diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 49ae5b3355..a6df04d851 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
- """Subscribe the remote to a streams.
+ """Subscribe the remote to a stream.
This invloves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream
@@ -478,10 +478,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
+ batch_updates = []
for token, update in pending_rdata:
- # Only send updates newer than the current token
- if token > current_token:
- self.send_command(RdataCommand(stream_name, token, update))
+ # If the token is None, the update is part of a batch. A batch
+ # is a run of updates that share a single token. To denote
+ # this, the token is set to None for every update in the batch
+ # except the last. When we find a None token, we keep collecting
+ # updates until we reach one whose token is not None, and then
+ # process all the collected updates as if they carried that
+ # final token.
+ if not token or len(batch_updates) > 0:
+ batch_updates.append(update)
+ if token and not token > current_token:
+ # This batch is older than current_token, dismiss
+ batch_updates = []
+ continue
+ if token:
+ # Send all updates that are part of this batch with the
+ # found token
+ for update in batch_updates:
+ self.send_command(RdataCommand(stream_name, token, update))
+ else:
+ # Only send updates newer than the current token
+ if token > current_token:
+ self.send_command(RdataCommand(stream_name, token, update))
# They're now fully subscribed
self.replication_streams.add(stream_name)
|