diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-17 23:19:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-17 22:19:43 +0100 |
commit | 5d4028f217f178fcd384d5bfddd92225b4e78c51 (patch) | |
tree | 3123cad0bdeea027069318cbb12a1bb65b4dc7de /synapse/storage | |
parent | Use HTTPStatus constants in place of literals in tests. (#13297) (diff) | |
download | synapse-5d4028f217f178fcd384d5bfddd92225b4e78c51.tar.xz |
Make all `process_replication_rows` methods async (#13304)
More prep work for asyncronous caching, also makes all process_replication_rows methods consistent (presence handler already is so). Signed off by Nick @ Beeper (@Fizzadar)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/presence.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/tags.py | 4 |
8 files changed, 21 insertions, 15 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b8c8dcd76b..822108e83b 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -47,7 +47,7 @@ class SQLBaseStore(metaclass=ABCMeta): self.database_engine = database.engine self.db_pool = database - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 9af9f4f18e..337b22294e 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -414,7 +414,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -437,7 +437,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) async def add_account_data_to_room( self, user_id: str, room_id: str, account_data_type: str, content: JsonDict diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2367ddeea3..048ff3e1b7 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -119,7 +119,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): "get_all_updated_caches", get_all_updated_caches_txn ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any] ) -> None: if stream_name == EventsStream.NAME: @@ -154,7 +154,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None: data = row.data diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 422e0e65ca..45fe58c104 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -128,7 +128,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): prefilled_cache=device_outbox_prefill, ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -148,7 +148,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): self._device_federation_outbox_stream_cache.entity_has_changed( row.entity, token ) - return super().process_replication_rows(stream_name, instance_name, token, rows) + return await super().process_replication_rows( + stream_name, instance_name, token, rows + ) def get_to_device_stream_token(self) -> int: return self._device_inbox_id_gen.get_current_token() diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f3935bfead..5310d4eda2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -280,7 +280,7 @@ class EventsWorkerStore(SQLBaseStore): id_column="chain_id", ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -292,7 +292,7 @@ class EventsWorkerStore(SQLBaseStore): elif stream_name == BackfillStream.NAME: self._backfill_id_gen.advance(instance_name, -token) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) async def have_censored_event(self, event_id: str) -> bool: """Check if an event has been censored, i.e. if the content of the event has been erased diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 9769a18a9d..9fe3124b35 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -431,7 +431,7 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) self._presence_on_startup = [] return active_on_startup - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -443,4 +443,6 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) for row in rows: self.presence_stream_cache.entity_has_changed(row.user_id, token) self._get_presence_for_user.invalidate((row.user_id,)) - return super().process_replication_rows(stream_name, instance_name, token, rows) + return await super().process_replication_rows( + stream_name, instance_name, token, rows + ) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0090c9f225..f85862d968 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -589,7 +589,7 @@ class ReceiptsWorkerStore(SQLBaseStore): "get_unread_event_push_actions_by_room_for_user", (room_id,) ) - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -604,7 +604,9 @@ class ReceiptsWorkerStore(SQLBaseStore): ) self._receipts_stream_cache.entity_has_changed(row.room_id, token) - return super().process_replication_rows(stream_name, instance_name, token, rows) + return await super().process_replication_rows( + stream_name, instance_name, token, rows + ) def _insert_linearized_receipt_txn( self, diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index b0f5de67a3..5e8905369c 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -292,7 +292,7 @@ class TagsWorkerStore(AccountDataWorkerStore): # than the id that the client has. pass - def process_replication_rows( + async def process_replication_rows( self, stream_name: str, instance_name: str, @@ -305,7 +305,7 @@ class TagsWorkerStore(AccountDataWorkerStore): self.get_tags_for_user.invalidate((row.user_id,)) self._account_data_stream_cache.entity_has_changed(row.user_id, token) - super().process_replication_rows(stream_name, instance_name, token, rows) + await super().process_replication_rows(stream_name, instance_name, token, rows) class TagsStore(TagsWorkerStore): |