summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-01-07 15:07:19 +0000
committerBrendan Abolivier <babolivier@matrix.org>2020-01-07 15:07:19 +0000
commit4be582d7c85b45d14d0320330b98c9251b3a95ed (patch)
tree8d09864ce61b48fe5c4e44a6016713f239d667da /synapse/app/synchrotron.py
parentApply suggestions from code review (diff)
parentFixup changelog (diff)
downloadsynapse-4be582d7c85b45d14d0320330b98c9251b3a95ed.tar.xz
Merge branch 'develop' into babolivier/retention_doc
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index dd2132e608..03031ee34d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams.events import EventsStreamEventRow
+from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow
 from synapse.rest.client.v1 import events
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
     def get_currently_syncing_users(self):
         return self.presence_handler.get_currently_syncing_users()
 
-    @defer.inlineCallbacks
-    def process_and_notify(self, stream_name, token, rows):
+    async def process_and_notify(self, stream_name, token, rows):
         try:
             if stream_name == "events":
                 # We shouldn't get multiple rows per token for events stream, so
@@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler):
                 for row in rows:
                     if row.type != EventsStreamEventRow.TypeId:
                         continue
-                    event = yield self.store.get_event(row.data.event_id)
+                    assert isinstance(row, EventsStreamRow)
+
+                    event = await self.store.get_event(
+                        row.data.event_id, allow_rejected=True
+                    )
+                    if event.rejected_reason:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
@@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler):
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
-                    room_ids = yield self.store.get_rooms_for_user(row.user_id)
+                    room_ids = await self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
                 self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
-                yield self.presence_handler.process_replication_rows(token, rows)
+                await self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
                     "groups_key", token, users=[row.user_id for row in rows]