diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2020-01-07 15:07:19 +0000 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2020-01-07 15:07:19 +0000 |
commit | 4be582d7c85b45d14d0320330b98c9251b3a95ed (patch) | |
tree | 8d09864ce61b48fe5c4e44a6016713f239d667da /synapse/app/synchrotron.py | |
parent | Apply suggestions from code review (diff) | |
parent | Fixup changelog (diff) | |
download | synapse-4be582d7c85b45d14d0320330b98c9251b3a95ed.tar.xz |
Merge branch 'develop' into babolivier/retention_doc
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r-- | synapse/app/synchrotron.py | 18 |
1 files changed, 12 insertions, 6 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index dd2132e608..03031ee34d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,7 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams.events import EventsStreamEventRow +from synapse.replication.tcp.streams.events import EventsStreamEventRow, EventsStreamRow from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet @@ -371,8 +371,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def get_currently_syncing_users(self): return self.presence_handler.get_currently_syncing_users() - @defer.inlineCallbacks - def process_and_notify(self, stream_name, token, rows): + async def process_and_notify(self, stream_name, token, rows): try: if stream_name == "events": # We shouldn't get multiple rows per token for events stream, so @@ -380,7 +379,14 @@ class SyncReplicationHandler(ReplicationClientHandler): for row in rows: if row.type != EventsStreamEventRow.TypeId: continue - event = yield self.store.get_event(row.data.event_id) + assert isinstance(row, EventsStreamRow) + + event = await self.store.get_event( + row.data.event_id, allow_rejected=True + ) + if event.rejected_reason: + continue + extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) @@ -412,11 +418,11 @@ class SyncReplicationHandler(ReplicationClientHandler): elif stream_name == "device_lists": all_room_ids = set() for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) + room_ids = await self.store.get_rooms_for_user(row.user_id) all_room_ids.update(room_ids) self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) + await self.presence_handler.process_replication_rows(token, rows) elif stream_name == "receipts": self.notifier.on_new_event( "groups_key", token, users=[row.user_id for row in rows] |