summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/protocol.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index a6df04d851..53615b7ee3 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -488,16 +488,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 # process all previous updates in the batch as if they had the
                 # final token.
                 if not token or len(batch_updates) > 0:
-                    batch_updates.append(update)
-                    if token and not token > current_token:
+                    if token is None:
+                        # Store this update as part of the batch
+                        batch_updates.append(update)
+                    elif current_token <= current_token:
                         # This batch is older than current_token, dismiss
                         batch_updates = []
-                        continue
-                    if token:
+                    else:
+                        # Append final update of this batch before sending
+                        batch_updates.append(update)
+
                         # Send all updates that are part of this batch with the
                         # found token
                         for update in batch_updates:
                             self.send_command(RdataCommand(stream_name, token, update))
+
+                        # Clear saved batch updates
+                        batch_updates = []
                 else:
                     # Only send updates newer than the current token
                     if token > current_token: