1 files changed, 13 insertions, 9 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index bc1482a9bb..d7ef2398fa 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -485,15 +485,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.connecting_streams.add(stream_name)
try:
- # Get missing updates
- updates, current_token = await self.streamer.get_stream_updates(
- stream_name, token
- )
-
- # Send all the missing updates
- for update in updates:
- token, row = update[0], update[1]
- self.send_command(RdataCommand(stream_name, token, row))
+ limited = True
+ while limited:
+ # Get missing updates
+ (
+ updates,
+ current_token,
+ limited,
+ ) = await self.streamer.get_stream_updates(stream_name, token)
+
+ # Send all the missing updates
+ for update in updates:
+ token, row = update[0], update[1]
+ self.send_command(RdataCommand(stream_name, token, row))
# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates
|