summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index bc1482a9bb..d7ef2398fa 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -485,15 +485,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.connecting_streams.add(stream_name)
 
         try:
-            # Get missing updates
-            updates, current_token = await self.streamer.get_stream_updates(
-                stream_name, token
-            )
-
-            # Send all the missing updates
-            for update in updates:
-                token, row = update[0], update[1]
-                self.send_command(RdataCommand(stream_name, token, row))
+            limited = True
+            while limited:
+                # Get missing updates
+                (
+                    updates,
+                    current_token,
+                    limited,
+                ) = await self.streamer.get_stream_updates(stream_name, token)
+
+                # Send all the missing updates
+                for update in updates:
+                    token, row = update[0], update[1]
+                    self.send_command(RdataCommand(stream_name, token, row))
 
             # We send a POSITION command to ensure that they have an up to
             # date token (especially useful if we didn't send any updates