summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py29
1 files changed, 8 insertions, 21 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4acefc8a96..f196eff072 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -264,7 +264,7 @@ class BackfillStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_current_backfill_token),
-            db_query_to_update_function(store.get_all_new_backfill_event_rows),
+            store.get_all_new_backfill_event_rows,
         )
 
 
@@ -291,9 +291,7 @@ class PresenceStream(Stream):
         if hs.config.worker_app is None:
             # on the master, query the presence handler
             presence_handler = hs.get_presence_handler()
-            update_function = db_query_to_update_function(
-                presence_handler.get_all_presence_updates
-            )
+            update_function = presence_handler.get_all_presence_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -318,9 +316,7 @@ class TypingStream(Stream):
 
         if hs.config.worker_app is None:
             # on the master, query the typing handler
-            update_function = db_query_to_update_function(
-                typing_handler.get_all_typing_updates
-            )
+            update_function = typing_handler.get_all_typing_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -352,7 +348,7 @@ class ReceiptsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_max_receipt_stream_id),
-            db_query_to_update_function(store.get_all_updated_receipts),
+            store.get_all_updated_receipts,
         )
 
 
@@ -367,26 +363,17 @@ class PushRulesStream(Stream):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
+
         super(PushRulesStream, self).__init__(
-            hs.get_instance_name(), self._current_token, self._update_function
+            hs.get_instance_name(),
+            self._current_token,
+            self.store.get_all_push_rule_updates,
         )
 
     def _current_token(self, instance_name: str) -> int:
         push_rules_token, _ = self.store.get_push_rules_stream_token()
         return push_rules_token
 
-    async def _update_function(
-        self, instance_name: str, from_token: Token, to_token: Token, limit: int
-    ):
-        rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
-
-        limited = False
-        if len(rows) == limit:
-            to_token = rows[-1][0]
-            limited = True
-
-        return [(row[0], (row[2],)) for row in rows], to_token, limited
-
 
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher