author:    Erik Johnston <erik@matrix.org>    2022-07-18 14:28:14 +0100
committer: GitHub <noreply@github.com>        2022-07-18 14:28:14 +0100
commit:    f721f1baba9cdefc0bff540c3b93710b36eecee9 (patch)
tree:      5d754da53f48e96620b28de63295bda6193eb80b /synapse/replication
parent:    Don't pull out full state when sending dummy events (#13310) (diff)
download:  synapse-f721f1baba9cdefc0bff540c3b93710b36eecee9.tar.xz
Revert "Make all `process_replication_rows` methods async (#13304)" (#13312)
This reverts commit 5d4028f217f178fcd384d5bfddd92225b4e78c51.
Diffstat (limited to 'synapse/replication')
-rw-r--r--  synapse/replication/slave/storage/devices.py    | 6
-rw-r--r--  synapse/replication/slave/storage/push_rule.py  | 6
-rw-r--r--  synapse/replication/slave/storage/pushers.py    | 6
-rw-r--r--  synapse/replication/tcp/client.py               | 6
4 files changed, 8 insertions, 16 deletions
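For context, the effect of this revert is that `process_replication_rows` becomes a plain synchronous method on the worker stores again: each override advances its stream ID generator, invalidates caches, and chains to `super()`, and the replication client calls it without awaiting. The following is a minimal standalone sketch of that pattern, with simplified stand-in class and attribute names rather than the real Synapse API:

# Minimal sketch of the synchronous pattern this revert restores.
# Class and attribute names here are simplified stand-ins, not the real Synapse code.
from typing import Any, Iterable


class BaseStore:
    def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        # Base implementation: nothing to do for streams this store ignores.
        pass


class PusherStoreSketch(BaseStore):
    def __init__(self) -> None:
        self._current_token = 0

    def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        if stream_name == "pushers":
            # Advance the locally tracked stream position, then chain to super()
            # so any mixed-in stores also see the rows -- no awaiting anywhere.
            self._current_token = max(self._current_token, token)
        return super().process_replication_rows(stream_name, instance_name, token, rows)


# Caller side (cf. ReplicationDataHandler.on_rdata in the diff below): a plain call, no `await`.
store = PusherStoreSketch()
store.process_replication_rows("pushers", "master", 5, rows=[])
assert store._current_token == 5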
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 22f7999721..a48cc02069 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -49,7 +49,7 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    async def process_replication_rows(
+    def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == DeviceListsStream.NAME:
@@ -59,9 +59,7 @@ class SlavedDeviceStore(DeviceWorkerStore, BaseSlavedStore):
             self._device_list_id_gen.advance(instance_name, token)
             for row in rows:
                 self._user_signature_stream_cache.entity_has_changed(row.user_id, token)
-        return await super().process_replication_rows(
-            stream_name, instance_name, token, rows
-        )
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
 
     def _invalidate_caches_for_devices(
         self, token: int, rows: Iterable[DeviceListsStream.DeviceListsStreamRow]
diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py
index e1838a81a9..52ee3f7e58 100644
--- a/synapse/replication/slave/storage/push_rule.py
+++ b/synapse/replication/slave/storage/push_rule.py
@@ -24,7 +24,7 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
     def get_max_push_rules_stream_id(self) -> int:
         return self._push_rules_stream_id_gen.get_current_token()
 
-    async def process_replication_rows(
+    def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushRulesStream.NAME:
@@ -33,6 +33,4 @@ class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore):
                 self.get_push_rules_for_user.invalidate((row.user_id,))
                 self.get_push_rules_enabled_for_user.invalidate((row.user_id,))
                 self.push_rules_stream_cache.entity_has_changed(row.user_id, token)
-        return await super().process_replication_rows(
-            stream_name, instance_name, token, rows
-        )
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/slave/storage/pushers.py b/synapse/replication/slave/storage/pushers.py
index fb3f5653af..de642bba71 100644
--- a/synapse/replication/slave/storage/pushers.py
+++ b/synapse/replication/slave/storage/pushers.py
@@ -40,11 +40,9 @@ class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
     def get_pushers_stream_token(self) -> int:
         return self._pushers_id_gen.get_current_token()
 
-    async def process_replication_rows(
+    def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
     ) -> None:
         if stream_name == PushersStream.NAME:
             self._pushers_id_gen.advance(instance_name, token)
-        return await super().process_replication_rows(
-            stream_name, instance_name, token, rows
-        )
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index f9722ccb4f..2f59245058 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -144,15 +144,13 @@ class ReplicationDataHandler:
             token: stream token for this batch of rows
             rows: a list of Stream.ROW_TYPE objects as returned by Stream.parse_row.
         """
-        await self.store.process_replication_rows(
-            stream_name, instance_name, token, rows
-        )
+        self.store.process_replication_rows(stream_name, instance_name, token, rows)
 
         if self.send_handler:
             await self.send_handler.process_replication_rows(stream_name, token, rows)
 
         if stream_name == TypingStream.NAME:
-            await self._typing_handler.process_replication_rows(token, rows)
+            self._typing_handler.process_replication_rows(token, rows)
             self.notifier.on_new_event(
                 StreamKeyType.TYPING, token, rooms=[row.room_id for row in rows]
             )