1 files changed, 11 insertions, 4 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index a6df04d851..53615b7ee3 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -488,16 +488,23 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# process all previous updates in the batch as if they had the
# final token.
if not token or len(batch_updates) > 0:
- batch_updates.append(update)
- if token and not token > current_token:
+ if token is None:
+ # Store this update as part of the batch
+ batch_updates.append(update)
+ elif current_token <= current_token:
# This batch is older than current_token, dismiss
batch_updates = []
- continue
- if token:
+ else:
+ # Append final update of this batch before sending
+ batch_updates.append(update)
+
# Send all updates that are part of this batch with the
# found token
for update in batch_updates:
self.send_command(RdataCommand(stream_name, token, update))
+
+ # Clear saved batch updates
+ batch_updates = []
else:
# Only send updates newer than the current token
if token > current_token:
|