diff options
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 19 |
1 files changed, 10 insertions, 9 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index a860072ccf..4ae3cffb1e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -24,8 +24,8 @@ from synapse.replication.http.streams import ReplicationGetStreamUpdates logger = logging.getLogger(__name__) - -MAX_EVENTS_BEHIND = 500000 +# the number of rows to request from an update_function. +_STREAM_UPDATE_TARGET_ROW_COUNT = 100 # Some type aliases to make things a bit easier. @@ -56,7 +56,11 @@ StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool] # * from_token: the previous stream token: the starting point for fetching the # updates # * to_token: the new stream token: the point to get updates up to -# * limit: the maximum number of rows to return +# * target_row_count: a target for the number of rows to be returned. +# +# The update_function is expected to return up to _approximately_ target_row_count rows. +# If there are more updates available, it should set `limited` in the result, and +# it will be called again to get the next batch. # UpdateFunction = Callable[[Token, Token, int], Awaitable[StreamUpdateResult]] @@ -138,7 +142,7 @@ class Stream(object): return updates, current_token, limited async def get_updates_since( - self, from_token: Token, upto_token: Token, limit: int = 100 + self, from_token: Token, upto_token: Token ) -> StreamUpdateResult: """Like get_updates except allows specifying from when we should stream updates @@ -156,7 +160,7 @@ class Stream(object): return [], upto_token, False updates, upto_token, limited = await self.update_function( - from_token, upto_token, limit, + from_token, upto_token, _STREAM_UPDATE_TARGET_ROW_COUNT, ) return updates, upto_token, limited @@ -193,10 +197,7 @@ def make_http_update_function(hs, stream_name: str) -> UpdateFunction: from_token: int, upto_token: int, limit: int ) -> StreamUpdateResult: result = await client( - stream_name=stream_name, - from_token=from_token, - upto_token=upto_token, - limit=limit, + stream_name=stream_name, from_token=from_token, upto_token=upto_token, ) return result["updates"], result["upto_token"], result["limited"] |