From 5d4028f217f178fcd384d5bfddd92225b4e78c51 Mon Sep 17 00:00:00 2001
From: Nick Mills-Barrett
Date: Sun, 17 Jul 2022 23:19:43 +0200
Subject: Make all `process_replication_rows` methods async (#13304)

More prep work for asynchronous caching; this also makes all
`process_replication_rows` methods consistent (the presence handler
already is).

Signed off by Nick @ Beeper (@Fizzadar)
---
 changelog.d/13304.misc                          | 1 +
 synapse/handlers/typing.py                      | 4 ++--
 synapse/replication/slave/storage/devices.py    | 6 ++++--
 synapse/replication/slave/storage/push_rule.py  | 6 ++++--
 synapse/replication/slave/storage/pushers.py    | 6 ++++--
 synapse/replication/tcp/client.py               | 6 ++++--
 synapse/storage/_base.py                        | 2 +-
 synapse/storage/databases/main/account_data.py  | 4 ++--
 synapse/storage/databases/main/cache.py         | 4 ++--
 synapse/storage/databases/main/deviceinbox.py   | 6 ++++--
 synapse/storage/databases/main/events_worker.py | 4 ++--
 synapse/storage/databases/main/presence.py      | 6 ++++--
 synapse/storage/databases/main/receipts.py      | 6 ++++--
 synapse/storage/databases/main/tags.py          | 4 ++--
 14 files changed, 40 insertions(+), 25 deletions(-)
 create mode 100644 changelog.d/13304.misc

diff --git a/changelog.d/13304.misc b/changelog.d/13304.misc
new file mode 100644
index 0000000000..156d3d71d7
--- /dev/null
+++ b/changelog.d/13304.misc
@@ -0,0 +1 @@
+Make all replication row processing methods asynchronous. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d104ea07fe..26edeeca3c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -158,7 +158,7 @@ class FollowerTypingHandler:
         except Exception:
             logger.exception("Error pushing typing notif to remotes")
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, token: int, rows: List[TypingStream.TypingStreamRow]
     ) -> None:
         """Should be called whenever we receive updates for typing stream."""
@@ -444,7 +444,7 @@ class TypingWriterHandler(FollowerTypingHandler):
 
         return rows, current_id, limited
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, token: int, rows: List[TypingStream.TypingStreamRow]
     ) -> None:
         # The writing process should never get updates from replication.
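
Every hunk below applies the same mechanical pattern, so here is a sketch of
the before/after shape to ease review. This is illustrative code, not code
from this patch: BaseStore, ExampleStore, and the "example" stream name are
hypothetical stand-ins.

    import asyncio
    from typing import Any, Iterable

    class BaseStore:
        # Stand-in for SQLBaseStore: the no-op root of the super() chain.
        async def process_replication_rows(
            self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
        ) -> None:
            pass

    class ExampleStore(BaseStore):
        # Before this patch, overrides like this were plain "def"s that
        # returned super().process_replication_rows(...) directly. Making
        # the override a coroutine and awaiting the parent means any store
        # in the MRO can later perform asynchronous cache invalidation.
        async def process_replication_rows(
            self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
        ) -> None:
            if stream_name == "example":  # hypothetical stream
                print(f"advancing example stream to {token}")
            return await super().process_replication_rows(
                stream_name, instance_name, token, rows
            )

    asyncio.run(ExampleStore().process_replication_rows("example", "master", 7, []))
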
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index a48cc02069..22f7999721 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -49,7 +49,7 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
@@ -59,7 +59,9 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
             self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
 
     def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index 52ee3f7e58..e1838a81a9 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -24,7 +24,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
     def get_max_push_rules_stream_id(self) -> int:
         return self._push_rules_stream_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushRulesStream.NAME:
@@ -33,4 +33,6 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
             self.get_push_rules_for_user.invalidate((row.user_id,))
             self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
             self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index de642bba71..fb3f5653af 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -40,9 +40,11 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(instance_name, token)
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 2f59245058..f9722ccb4f 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -144,13 +144,15 @@ class ReplicationDataHandler:
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
""" - self.store.process_replication_rows(stream_name, instance_name, token, rows) + await self.store.process_replication_rows( + stream_name, instance_name, token, rows + ) if self.send_handler: await self.send_handler.process_replication_rows(stream_name, token, rows) if stream_name == TypingStream.NAME: - self._typing_handler.process_replication_rows(token, rows) + await self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b8c8dcd76b..822108e83b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -47,7 +47,7 @@ class SQLBaseStore(metaclass=ABCMeta): self.database_engine = database.engine self.db_pool = database - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 9af9f4f18e..337b22294e 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -414,7 +414,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -437,7 +437,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) async def add_account_data_to_room( self, user_id: str, room_id: str, account_data_type: str, content: JsonDict diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2367ddeea3..048ff3e1b7 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -119,7 +119,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): "get_all_updated_caches", get_all_updated_caches_txn ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == EventsStream.NAME: @@ -154,7 +154,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 422e0e65ca..45fe58c104 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -128,7 +128,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): prefilled_cache=device_outbox_prefill, ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -148,7 +148,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._device_federation_outbox_stream_cache.entity_has_changed( row.entity, token ) - return super().process_replication_rows(stream_name, instance_name, token, rows) + return await super().process_replication_rows( + stream_name, instance_name, 
+            stream_name, instance_name, token, rows
+        )
 
     def get_to_device_stream_token(self) -> int:
         return self._device_inbox_id_gen.get_current_token()
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f3935bfead..5310d4eda2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -280,7 +280,7 @@ class EventsWorkerStore(SQLBaseStore):
             id_column="chain_id",
         )
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -292,7 +292,7 @@ class EventsWorkerStore(SQLBaseStore):
         elif stream_name == BackfillStream.NAME:
             self._backfill_id_gen.advance(instance_name, -token)
 
-        super().process_replication_rows(stream_name, instance_name, token, rows)
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
 
     async def have_censored_event(self, event_id: str) -> bool:
         """Check if an event has been censored, i.e. if the content of the event has been erased
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 9769a18a9d..9fe3124b35 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -431,7 +431,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
             self._presence_on_startup = []
         return active_on_startup
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -443,4 +443,6 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore)
         for row in rows:
             self.presence_stream_cache.entity_has_changed(row.user_id, token)
             self._get_presence_for_user.invalidate((row.user_id,))
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0090c9f225..f85862d968 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -589,7 +589,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             "get_unread_event_push_actions_by_room_for_user", (room_id,)
         )
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -604,7 +604,9 @@ class ReceiptsWorkerStore(SQLBaseStore):
             )
             self._receipts_stream_cache.entity_has_changed(row.room_id, token)
 
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
+        return await super().process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
 
     def _insert_linearized_receipt_txn(
         self,
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index b0f5de67a3..5e8905369c 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -292,7 +292,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             # than the id that the client has.
             pass
 
-    def process_replication_rows(
+    async def process_replication_rows(
         self,
         stream_name: str,
         instance_name: str,
@@ -305,7 +305,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
             self.get_tags_for_user.invalidate((row.user_id,))
             self._account_data_stream_cache.entity_has_changed(row.user_id, token)
 
-        super().process_replication_rows(stream_name, instance_name, token, rows)
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
 
 
 class TagsStore(TagsWorkerStore):
-- 
cgit 1.4.1
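
A caller-side note on the change above: now that process_replication_rows is
a coroutine, every call site must await it, as the tcp/client.py hunk does.
A bare call would only create the coroutine object (Python emits just a
RuntimeWarning), so stream tokens would never advance and caches would never
be invalidated. A minimal illustration, assuming "store" is any object with
the now-async method:

    # Simplified stand-in for the ReplicationDataHandler.on_rdata call site.
    async def on_rdata(store, stream_name: str, instance_name: str, token: int, rows: list) -> None:
        # The await is load-bearing: without it, nothing in the store's
        # process_replication_rows chain would ever execute.
        await store.process_replication_rows(stream_name, instance_name, token, rows)
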