summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/tcp/protocol.py28
1 files changed, 24 insertions, 4 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 49ae5b3355..a6df04d851 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     @defer.inlineCallbacks
     def subscribe_to_stream(self, stream_name, token):
-        """Subscribe the remote to a streams.
+        """Subscribe the remote to a stream.
 
         This invloves checking if they've missed anything and sending those
         updates down if they have. During that time new updates for the stream
@@ -478,10 +478,30 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
             # Now we can send any updates that came in while we were subscribing
             pending_rdata = self.pending_rdata.pop(stream_name, [])
+            batch_updates = []
             for token, update in pending_rdata:
-                # Only send updates newer than the current token
-                if token > current_token:
-                    self.send_command(RdataCommand(stream_name, token, update))
+                # If the token is null, it is part of a batch update. Batches
+                # are multiple updates that share a single token. To denote
+                # this, the token is set to None for all tokens in the batch
+                # except for the last. If we find a None token, we keep looking
+                # through tokens until we find one that is not None and then
+                # process all previous updates in the batch as if they had the
+                # final token.
+                if not token or len(batch_updates) > 0:
+                    batch_updates.append(update)
+                    if token and not token > current_token:
+                        # This batch is older than current_token, dismiss
+                        batch_updates = []
+                        continue
+                    if token:
+                        # Send all updates that are part of this batch with the
+                        # found token
+                        for update in batch_updates:
+                            self.send_command(RdataCommand(stream_name, token, update))
+                else:
+                    # Only send updates newer than the current token
+                    if token > current_token:
+                        self.send_command(RdataCommand(stream_name, token, update))
 
             # They're now fully subscribed
             self.replication_streams.add(stream_name)