summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-07 12:11:35 +0100
committerGitHub <noreply@github.com>2020-07-07 12:11:35 +0100
commit67d7756fcfb43c2b01a83da10b4f36635fa7b441 (patch)
treef193847474b4ef687e522add3d011f66b9407b16 /synapse/replication
parentAdd libwebp dependency to Dockerfile (#7791) (diff)
downloadsynapse-67d7756fcfb43c2b01a83da10b4f36635fa7b441.tar.xz
Refactor getting replication updates from database v2. (#7740)
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/streams/_base.py56
1 files changed, 10 insertions, 46 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index f196eff072..9076bbe9f1 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -198,26 +198,6 @@ def current_token_without_instance(
     return lambda instance_name: current_token()
 
 
-def db_query_to_update_function(
-    query_function: Callable[[Token, Token, int], Awaitable[List[tuple]]]
-) -> UpdateFunction:
-    """Wraps a db query function which returns a list of rows to make it
-    suitable for use as an `update_function` for the Stream class
-    """
-
-    async def update_function(instance_name, from_token, upto_token, limit):
-        rows = await query_function(from_token, upto_token, limit)
-        updates = [(row[0], row[1:]) for row in rows]
-        limited = False
-        if len(updates) >= limit:
-            upto_token = updates[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
-
-    return update_function
-
-
 def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
     """Makes a suitable function for use as an `update_function` that queries
     the master process for updates.
@@ -393,7 +373,7 @@ class PushersStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_pushers_stream_token),
-            db_query_to_update_function(store.get_all_updated_pushers_rows),
+            store.get_all_updated_pushers_rows,
         )
 
 
@@ -421,26 +401,12 @@ class CachesStream(Stream):
     ROW_TYPE = CachesStreamRow
 
     def __init__(self, hs):
-        self.store = hs.get_datastore()
+        store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
-            self.store.get_cache_stream_token,
-            self._update_function,
-        )
-
-    async def _update_function(
-        self, instance_name: str, from_token: int, upto_token: int, limit: int
-    ):
-        rows = await self.store.get_all_updated_caches(
-            instance_name, from_token, upto_token, limit
+            store.get_cache_stream_token,
+            store.get_all_updated_caches,
         )
-        updates = [(row[0], row[1:]) for row in rows]
-        limited = False
-        if len(updates) >= limit:
-            upto_token = updates[-1][0]
-            limited = True
-
-        return updates, upto_token, limited
 
 
 class PublicRoomsStream(Stream):
@@ -465,7 +431,7 @@ class PublicRoomsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_current_public_room_stream_id),
-            db_query_to_update_function(store.get_all_new_public_rooms),
+            store.get_all_new_public_rooms,
         )
 
 
@@ -486,7 +452,7 @@ class DeviceListsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_device_stream_token),
-            db_query_to_update_function(store.get_all_device_list_changes_for_remotes),
+            store.get_all_device_list_changes_for_remotes,
         )
 
 
@@ -504,7 +470,7 @@ class ToDeviceStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_to_device_stream_token),
-            db_query_to_update_function(store.get_all_new_device_messages),
+            store.get_all_new_device_messages,
         )
 
 
@@ -524,7 +490,7 @@ class TagAccountDataStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_max_account_data_stream_id),
-            db_query_to_update_function(store.get_all_updated_tags),
+            store.get_all_updated_tags,
         )
 
 
@@ -612,7 +578,7 @@ class GroupServerStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_group_stream_token),
-            db_query_to_update_function(store.get_all_groups_changes),
+            store.get_all_groups_changes,
         )
 
 
@@ -630,7 +596,5 @@ class UserSignatureStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_device_stream_token),
-            db_query_to_update_function(
-                store.get_all_user_signature_changes_for_remotes
-            ),
+            store.get_all_user_signature_changes_for_remotes,
         )