summary refs log tree commit diff
path: root/synapse/app/generic_worker.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-14 14:01:39 +0100
committerGitHub <noreply@github.com>2020-05-14 14:01:39 +0100
commit4734a7bbe4d08d68c5f04dd76cd5bcfb4cd9b6be (patch)
treee01a3c3e134699ac16e402301906b9f62cdd263a /synapse/app/generic_worker.py
parentAdd `instance_map` config and route replication calls (#7495) (diff)
downloadsynapse-4734a7bbe4d08d68c5f04dd76cd5bcfb4cd9b6be.tar.xz
Move EventStream handling into default ReplicationDataHandler (#7493)
This is so that the logic can happen on both master and workers when we move event persistence out.
Diffstat (limited to 'synapse/app/generic_worker.py')
-rw-r--r--synapse/app/generic_worker.py33
1 files changed, 2 insertions, 31 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index bccb1140b2..2e3add7ac5 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -26,7 +26,6 @@ from twisted.web.resource import NoResource
 
 import synapse
 import synapse.events
-from synapse.api.constants import EventTypes
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.api.urls import (
     CLIENT_API_PREFIX,
@@ -81,11 +80,6 @@ from synapse.replication.tcp.streams import (
     ToDeviceStream,
     TypingStream,
 )
-from synapse.replication.tcp.streams.events import (
-    EventsStream,
-    EventsStreamEventRow,
-    EventsStreamRow,
-)
 from synapse.rest.admin import register_servlets_for_media_repo
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -633,7 +627,7 @@ class GenericWorkerServer(HomeServer):
 
 class GenericWorkerReplicationHandler(ReplicationDataHandler):
     def __init__(self, hs):
-        super(GenericWorkerReplicationHandler, self).__init__(hs.get_datastore())
+        super(GenericWorkerReplicationHandler, self).__init__(hs)
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
@@ -659,30 +653,7 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
                     stream_name, token, rows
                 )
 
-            if stream_name == EventsStream.NAME:
-                # We shouldn't get multiple rows per token for events stream, so
-                # we don't need to optimise this for multiple rows.
-                for row in rows:
-                    if row.type != EventsStreamEventRow.TypeId:
-                        continue
-                    assert isinstance(row, EventsStreamRow)
-
-                    event = await self.store.get_event(
-                        row.data.event_id, allow_rejected=True
-                    )
-                    if event.rejected_reason:
-                        continue
-
-                    extra_users = ()
-                    if event.type == EventTypes.Member:
-                        extra_users = (event.state_key,)
-                    max_token = self.store.get_room_max_stream_ordering()
-                    self.notifier.on_new_room_event(
-                        event, token, max_token, extra_users
-                    )
-
-                await self.pusher_pool.on_new_notifications(token, token)
-            elif stream_name == PushRulesStream.NAME:
+            if stream_name == PushRulesStream.NAME:
                 self.notifier.on_new_event(
                     "push_rules_key", token, users=[row.user_id for row in rows]
                 )