summary refs log tree commit diff
path: root/synapse/replication/tcp/streams
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-10-28 12:11:45 +0000
committerGitHub <noreply@github.com>2020-10-28 12:11:45 +0000
commita6ea1a957e8e38ca3f98d4da32ee49a40fcb4807 (patch)
tree43068698506ed93e735b48b785e58fc1d1b464b9 /synapse/replication/tcp/streams
parentTell Black to format code for Python 3.5 (#8664) (diff)
downloadsynapse-a6ea1a957e8e38ca3f98d4da32ee49a40fcb4807.tar.xz
Don't pull event from DB when handling replication traffic. (#8669)
I was trying to make it so that we didn't have to start a background task when handling RDATA, but that is a bigger job (due to all the code in `generic_worker`). However I still think not pulling the event from the DB may help reduce some DB usage due to replication, even if most workers will simply go and pull that event from the DB later anyway.

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse/replication/tcp/streams')
-rw-r--r--synapse/replication/tcp/streams/events.py21
1 files changed, 13 insertions, 8 deletions
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 82e9e0d64e..86a62b71eb 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import heapq
 from collections.abc import Iterable
-from typing import List, Tuple, Type
+from typing import TYPE_CHECKING, List, Optional, Tuple, Type
 
 import attr
 
 from ._base import Stream, StreamUpdateResult, Token
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 """Handling of the 'events' replication stream
 
 This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ class BaseEventsStreamRow:
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()  # str
-    room_id = attr.ib()  # str
-    type = attr.ib()  # str
-    state_key = attr.ib()  # str, optional
-    redacts = attr.ib()  # str, optional
-    relates_to = attr.ib()  # str, optional
+    event_id = attr.ib(type=str)
+    room_id = attr.ib(type=str)
+    type = attr.ib(type=str)
+    state_key = attr.ib(type=Optional[str])
+    redacts = attr.ib(type=Optional[str])
+    relates_to = attr.ib(type=Optional[str])
+    membership = attr.ib(type=Optional[str])
+    rejected = attr.ib(type=bool)
 
 
 @attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):
 
     NAME = "events"
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),