summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/7493.misc1
-rw-r--r--synapse/app/generic_worker.py33
-rw-r--r--synapse/replication/tcp/client.py37
-rw-r--r--synapse/server.py2
-rw-r--r--synapse/server.pyi3
5 files changed, 40 insertions, 36 deletions
diff --git a/changelog.d/7493.misc b/changelog.d/7493.misc
new file mode 100644
index 0000000000..575c55a99b
--- /dev/null
+++ b/changelog.d/7493.misc
@@ -0,0 +1 @@
+Move EventStream handling into default ReplicationDataHandler.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index bccb1140b2..2e3add7ac5 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -26,7 +26,6 @@ from twisted.web.resource import NoResource
 
 import synapse
 import synapse.events
-from synapse.api.constants import EventTypes
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
@@ -81,11 +80,6 @@ from synapse.replication.tcp.streams import (
     ToDeviceStream,
     TypingStream,
 )
-from synapse.replication.tcp.streams.events import (
-    EventsStream,
-    EventsStreamEventRow,
-    EventsStreamRow,
-)
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -633,7 +627,7 @@ class GenericWorkerServer(HomeServer):
 
 class GenericWorkerReplicationHandler(ReplicationDataHandler):
     def __init__(self, hs):
-        super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
+        super(GenericWorkerReplicationHandler, self).__init__(hs)
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
@@ -659,30 +653,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
                     stream_name, token, rows
                 )
 
-            if stream_name == EventsStream.NAME:
-                # We shouldn't get multiple rows per token for events stream, so
-                # we don't need to optimise this for multiple rows.
-                for row in rows:
-                    if row.type != EventsStreamEventRow.TypeId:
-                        continue
-                    assert isinstance(row, EventsStreamRow)
-
-                    event = await self.store.get_event(
-                        row.data.event_id, allow_rejected=True
-                    )
-                    if event.rejected_reason:
-                        continue
-
-                    extra_users = ()
-                    if event.type == EventTypes.Member:
-                        extra_users = (event.state_key,)
-                    max_token = self.store.get_room_max_stream_ordering()
-                    self.notifier.on_new_room_event(
-                        event, token, max_token, extra_users
-                    )
-
-                await self.pusher_pool.on_new_notifications(token, token)
-            elif stream_name == PushRulesStream.NAME:
+            if stream_name == PushRulesStream.NAME:
                 self.notifier.on_new_event(
                     "push_rules_key", token, users=[row.user_id for row in rows]
                 )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 20cb8a654f..28826302f5 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -16,12 +16,17 @@
 """
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
 from twisted.internet.protocol import ReconnectingClientFactory
 
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.api.constants import EventTypes
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
+from synapse.replication.tcp.streams.events import (
+    EventsStream,
+    EventsStreamEventRow,
+    EventsStreamRow,
+)
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -83,8 +88,10 @@ class ReplicationDataHandler:
     to handle updates in additional ways.
     """
 
-    def __init__(self, store: BaseSlavedStore):
-        self.store = store
+    def __init__(self, hs: "HomeServer"):
+        self.store = hs.get_datastore()
+        self.pusher_pool = hs.get_pusherpool()
+        self.notifier = hs.get_notifier()
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
@@ -102,6 +109,28 @@ class ReplicationDataHandler:
         """
         self.store.process_replication_rows(stream_name, instance_name, token, rows)
 
+        if stream_name == EventsStream.NAME:
+            # We shouldn't get multiple rows per token for events stream, so
+            # we don't need to optimise this for multiple rows.
+            for row in rows:
+                if row.type != EventsStreamEventRow.TypeId:
+                    continue
+                assert isinstance(row, EventsStreamRow)
+
+                event = await self.store.get_event(
+                    row.data.event_id, allow_rejected=True
+                )
+                if event.rejected_reason:
+                    continue
+
+                extra_users = ()  # type: Tuple[str, ...]
+                if event.type == EventTypes.Member:
+                    extra_users = (event.state_key,)
+                max_token = self.store.get_room_max_stream_ordering()
+                self.notifier.on_new_room_event(event, token, max_token, extra_users)
+
+            await self.pusher_pool.on_new_notifications(token, token)
+
     async def on_position(self, stream_name: str, instance_name: str, token: int):
         self.store.process_replication_rows(stream_name, instance_name, token, [])
 
diff --git a/synapse/server.py b/synapse/server.py
index b4aea81e24..c530f1aa1a 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -581,7 +581,7 @@ class HomeServer(object):
         return ReplicationStreamer(self)
 
     def build_replication_data_handler(self):
-        return ReplicationDataHandler(self.get_datastore())
+        return ReplicationDataHandler(self)
 
     def remove_pusher(self, app_id, push_key, user_id):
         return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 31a9cc0389..9e7fad7e6e 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -19,6 +19,7 @@ import synapse.handlers.room_member
 import synapse.handlers.set_password
 import synapse.http.client
 import synapse.notifier
+import synapse.push.pusherpool
 import synapse.replication.tcp.client
 import synapse.replication.tcp.handler
 import synapse.rest.media.v1.media_repository
@@ -133,3 +134,5 @@ class HomeServer(object):
         pass
     def get_macaroon_generator(self) -> synapse.handlers.auth.MacaroonGenerator:
         pass
+    def get_pusherpool(self) -> synapse.push.pusherpool.PusherPool:
+        pass