diff options
author | Erik Johnston <erik@matrix.org> | 2022-07-18 14:28:14 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-18 14:28:14 +0100 |
commit | f721f1baba9cdefc0bff540c3b93710b36eecee9 (patch) | |
tree | 5d754da53f48e96620b28de63295bda6193eb80b /synapse/storage | |
parent | Don't pull out full state when sending dummy events (#13310) (diff) | |
download | synapse-f721f1baba9cdefc0bff540c3b93710b36eecee9.tar.xz |
Revert "Make all `process_replication_rows` methods async (#13304)" (#13312)
This reverts commit 5d4028f217f178fcd384d5bfddd92225b4e78c51.
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/presence.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/tags.py | 4 |
8 files changed, 15 insertions, 21 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 822108e83b..b8c8dcd76b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -47,7 +47,7 @@ class SQLBaseStore(metaclass=ABCMeta): self.database_engine = database.engine self.db_pool = database - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 337b22294e..9af9f4f18e 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -414,7 +414,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -437,7 +437,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) async def add_account_data_to_room( self, user_id: str, room_id: str, account_data_type: str, content: JsonDict diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 048ff3e1b7..2367ddeea3 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -119,7 +119,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): "get_all_updated_caches", get_all_updated_caches_txn ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == EventsStream.NAME: @@ -154,7 +154,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 45fe58c104..422e0e65ca 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -128,7 +128,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): prefilled_cache=device_outbox_prefill, ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -148,9 +148,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._device_federation_outbox_stream_cache.entity_has_changed( row.entity, token ) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) def get_to_device_stream_token(self) -> int: return self._device_inbox_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 5310d4eda2..f3935bfead 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -280,7 +280,7 @@ class EventsWorkerStore(SQLBaseStore): id_column="chain_id", ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -292,7 +292,7 @@ class EventsWorkerStore(SQLBaseStore): elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) async def have_censored_event(self, event_id: str) -> bool: """Check if an event has been censored, i.e. if the content of the event has been erased diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 9fe3124b35..9769a18a9d 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -431,7 +431,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) self._presence_on_startup = [] return active_on_startup - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -443,6 +443,4 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) for row in rows: self.presence_stream_cache.entity_has_changed(row.user_id, token) self._get_presence_for_user.invalidate((row.user_id,)) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index f85862d968..0090c9f225 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -589,7 +589,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "get_unread_event_push_actions_by_room_for_user", (room_id,) ) - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -604,9 +604,7 @@ class ReceiptsWorkerStore(SQLBaseStore): ) self._receipts_stream_cache.entity_has_changed(row.room_id, token) - return await super().process_replication_rows( - stream_name, instance_name, token, rows - ) + return super().process_replication_rows(stream_name, instance_name, token, rows) def _insert_linearized_receipt_txn( self, diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 5e8905369c..b0f5de67a3 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -292,7 +292,7 @@ class TagsWorkerStore(AccountDataWorkerStore): # than the id that the client has. pass - async def process_replication_rows( + def process_replication_rows( self, stream_name: str, instance_name: str, @@ -305,7 +305,7 @@ class TagsWorkerStore(AccountDataWorkerStore): self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - await super().process_replication_rows(stream_name, instance_name, token, rows) + super().process_replication_rows(stream_name, instance_name, token, rows) class TagsStore(TagsWorkerStore): |