diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index ffd4c61993..f35cebc710 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -28,7 +28,7 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
The API looks like:
- GET /_synapse/replication/get_repl_stream_updates/events?from_token=0&to_token=10&limit=100
+ GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&to_token=10
200 OK
@@ -38,6 +38,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
limited: False,
}
+ If there are more rows than can sensibly be returned in one lump, `limited` will be
+ set to true, and the caller should call again with a new `from_token`.
+
"""
NAME = "get_repl_stream_updates"
@@ -52,8 +55,8 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
self.streams = hs.get_replication_streamer().get_streams()
@staticmethod
- def _serialize_payload(stream_name, from_token, upto_token, limit):
- return {"from_token": from_token, "upto_token": upto_token, "limit": limit}
+ def _serialize_payload(stream_name, from_token, upto_token):
+ return {"from_token": from_token, "upto_token": upto_token}
async def _handle_request(self, request, stream_name):
stream = self.streams.get(stream_name)
@@ -62,10 +65,9 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
from_token = parse_integer(request, "from_token", required=True)
upto_token = parse_integer(request, "upto_token", required=True)
- limit = parse_integer(request, "limit", required=True)
updates, upto_token, limited = await stream.get_updates_since(
- from_token, upto_token, limit
+ from_token, upto_token
)
return (
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index a860072ccf..112bfead56 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -193,10 +193,7 @@ def make_http_update_function(hs, stream_name: str) -> UpdateFunction:
from_token: int, upto_token: int, limit: int
) -> StreamUpdateResult:
result = await client(
- stream_name=stream_name,
- from_token=from_token,
- upto_token=upto_token,
- limit=limit,
+ stream_name=stream_name, from_token=from_token, upto_token=upto_token,
)
return result["updates"], result["upto_token"], result["limited"]
|