From f721f1baba9cdefc0bff540c3b93710b36eecee9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 18 Jul 2022 14:28:14 +0100 Subject: Revert "Make all `process_replication_rows` methods async (#13304)" (#13312) This reverts commit 5d4028f217f178fcd384d5bfddd92225b4e78c51. --- synapse/handlers/typing.py | 4 ++-- synapse/replication/slave/storage/devices.py | 6 ++---- synapse/replication/slave/storage/push_rule.py | 6 ++---- synapse/replication/slave/storage/pushers.py | 6 ++---- synapse/replication/tcp/client.py | 6 ++---- synapse/storage/_base.py | 2 +- synapse/storage/databases/main/account_data.py | 4 ++-- synapse/storage/databases/main/cache.py | 4 ++-- synapse/storage/databases/main/deviceinbox.py | 6 ++---- synapse/storage/databases/main/events_worker.py | 4 ++-- synapse/storage/databases/main/presence.py | 6 ++---- synapse/storage/databases/main/receipts.py | 6 ++---- synapse/storage/databases/main/tags.py | 4 ++-- 13 files changed, 25 insertions(+), 39 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 26edeeca3c..d104ea07fe 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -158,7 +158,7 @@ class FollowerTypingHandler: except Exception: logger.exception("Error pushing typing notif to remotes") - async def process_replication_rows( + def process_replication_rows( self, token: int, rows: List[TypingStream.TypingStreamRow] ) -> None: """Should be called whenever we receive updates for typing stream.""" @@ -444,7 +444,7 @@ class TypingWriterHandler(FollowerTypingHandler): return rows, current_id, limited - async def process_replication_rows( + def process_replication_rows( self, token: int, rows: List[TypingStream.TypingStreamRow] ) -> None: # The writing process should never get updates from replication. diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 22f7999721..a48cc02069 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -49,7 +49,7 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore): def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == DeviceListsStream.NAME: @@ -59,9 +59,7 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore): self._device_list_id_gen.advance(instance_name, token) for row in rows: self._user_signature_stream_cache.entity_has_changed(row.user_id, token) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) def _invalidate_caches_for_devices( self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow] diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index e1838a81a9..52ee3f7e58 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -24,7 +24,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): def get_max_push_rules_stream_id(self) -> int: return self._push_rules_stream_id_gen.get_current_token() - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == PushRulesStream.NAME: @@ -33,6 +33,4 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): self.get_push_rules_for_user.invalidate((row.user_id,)) self.get_push_rules_enabled_for_user.invalidate((row.user_id,)) self.push_rules_stream_cache.entity_has_changed(row.user_id, token) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py index fb3f5653af..de642bba71 100644 --- a/synapse/replication/slave/storage/pushers.py +++ b/synapse/replication/slave/storage/pushers.py @@ -40,11 +40,9 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore): def get_pushers_stream_token(self) -> int: return self._pushers_id_gen.get_current_token() - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == PushersStream.NAME: self._pushers_id_gen.advance(instance_name, token) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index f9722ccb4f..2f59245058 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -144,15 +144,13 @@ class ReplicationDataHandler: token: stream token for this batch of rows rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row. """ - await self.store.process_replication_rows( - stream_name, instance_name, token, rows - ) + self.store.process_replication_rows(stream_name, instance_name, token, rows) if self.send_handler: await self.send_handler.process_replication_rows(stream_name, token, rows) if stream_name == TypingStream.NAME: - await self._typing_handler.process_replication_rows(token, rows) + self._typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows] ) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 822108e83b..b8c8dcd76b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -47,7 +47,7 @@ class SQLBaseStore(metaclass=ABCMeta): self.database_engine = database.engine self.db_pool = database - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 337b22294e..9af9f4f18e 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -414,7 +414,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -437,7 +437,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) async def add_account_data_to_room( self, user_id: str, room_id: str, account_data_type: str, content: JsonDict diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 048ff3e1b7..2367ddeea3 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -119,7 +119,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): "get_all_updated_caches", get_all_updated_caches_txn ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == EventsStream.NAME: @@ -154,7 +154,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 45fe58c104..422e0e65ca 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -128,7 +128,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): prefilled_cache=device_outbox_prefill, ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -148,9 +148,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._device_federation_outbox_stream_cache.entity_has_changed( row.entity, token ) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) def get_to_device_stream_token(self) -> int: return self._device_inbox_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 5310d4eda2..f3935bfead 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -280,7 +280,7 @@ class EventsWorkerStore(SQLBaseStore): id_column="chain_id", ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -292,7 +292,7 @@ class EventsWorkerStore(SQLBaseStore): elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) async def have_censored_event(self, event_id: str) -> bool: """Check if an event has been censored, i.e. if the content of the event has been erased diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 9fe3124b35..9769a18a9d 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -431,7 +431,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) self._presence_on_startup = [] return active_on_startup - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -443,6 +443,4 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) for row in rows: self.presence_stream_cache.entity_has_changed(row.user_id, token) self._get_presence_for_user.invalidate((row.user_id,)) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index f85862d968..0090c9f225 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -589,7 +589,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "get_unread_event_push_actions_by_room_for_user", (room_id,) ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -604,9 +604,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) self._receipts_stream_cache.entity_has_changed(row.room_id, token) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) def _insert_linearized_receipt_txn( self, diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 5e8905369c..b0f5de67a3 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -292,7 +292,7 @@ class TagsWorkerStore(AccountDataWorkerStore): # than the id that the client has. pass - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -305,7 +305,7 @@ class TagsWorkerStore(AccountDataWorkerStore): self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) class TagsStore(TagsWorkerStore): -- cgit 1.4.1