1 files changed, 14 insertions, 23 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index dac4fbeef7..55630ba9a7 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
- batch_updates = []
+ updates = []
for token, update in pending_rdata:
# If the token is null, it is part of a batch update. Batches
# are multiple updates that share a single token. To denote
@@ -489,34 +489,25 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# final token.
if token is None:
# Store this update as part of a batch
- batch_updates.append(update)
+ updates.append(update)
continue
- if len(batch_updates) > 0:
- # There is an ongoing batch and this is the end
- if current_token <= current_token:
- # This batch is older than current_token, dismiss it
- batch_updates = []
- else:
- # This is the end of the batch. Append final update of
- # this batch before sending
- batch_updates.append(update)
-
- # Send all updates that are part of this batch with the
- # found token
- for update in batch_updates:
- self.send_command(RdataCommand(stream_name, token, update))
-
- # Clear saved batch updates
- batch_updates = []
+ if token <= current_token:
+ # This update or batch of updates is older than
+ # current_token, dismiss it
+ updates = []
continue
- # This is an update that's not part of a batch.
- #
- # Only send updates newer than the current token
- if token > current_token:
+ updates.append(update)
+
+ # Send all updates that are part of this batch with the
+ # found token
+ for update in updates:
self.send_command(RdataCommand(stream_name, token, update))
+ # Clear stored updates
+ updates = []
+
# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
|