summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py19
1 files changed, 10 insertions, 9 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index a860072ccf..4ae3cffb1e 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -24,8 +24,8 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates
 
 logger = logging.getLogger(__name__)
 
-
-MAX_EVENTS_BEHIND = 500000
+# the number of rows to request from an update_function.
+_STREAM_UPDATE_TARGET_ROW_COUNT = 100
 
 
 # Some type aliases to make things a bit easier.
@@ -56,7 +56,11 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool]
 #  * from_token: the previous stream token: the starting point for fetching the
 #    updates
 #  * to_token: the new stream token: the point to get updates up to
-#  * limit: the maximum number of rows to return
+#  * target_row_count: a target for the number of rows to be returned.
+#
+# The update_function is expected to return up to _approximately_ target_row_count rows.
+# If there are more updates available, it should set `limited` in the result, and
+# it will be called again to get the next batch.
 #
 UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]]
 
@@ -138,7 +142,7 @@ class Stream(object):
         return updates, current_token, limited
 
     async def get_updates_since(
-        self, from_token: Token, upto_token: Token, limit: int = 100
+        self, from_token: Token, upto_token: Token
     ) -> StreamUpdateResult:
         """Like get_updates except allows specifying from when we should
         stream updates
@@ -156,7 +160,7 @@ class Stream(object):
             return [], upto_token, False
 
         updates, upto_token, limited = await self.update_function(
-            from_token, upto_token, limit,
+            from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT,
         )
         return updates, upto_token, limited
 
@@ -193,10 +197,7 @@ def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
         from_token: int, upto_token: int, limit: int
     ) -> StreamUpdateResult:
         result = await client(
-            stream_name=stream_name,
-            from_token=from_token,
-            upto_token=upto_token,
-            limit=limit,
+            stream_name=stream_name, from_token=from_token, upto_token=upto_token,
         )
         return result["updates"], result["upto_token"], result["limited"]