summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-03-08 11:44:20 +0000
committerErik Johnston <erik@matrix.org>2019-03-08 11:44:20 +0000
commit8c4896668fd602e2822dfc48e992ed48bbc5adf7 (patch)
tree99c0052e1697e74379050ad7af4cef2d62f21c2e /synapse/replication/tcp/protocol.py
parentFactor out soft fail checks (diff)
parentMerge pull request #4829 from matrix-org/erikj/device_list_seen_updates (diff)
downloadsynapse-8c4896668fd602e2822dfc48e992ed48bbc5adf7.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/soft_fail_impl
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py31
1 files changed, 28 insertions, 3 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 49ae5b3355..55630ba9a7 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     @defer.inlineCallbacks
     def subscribe_to_stream(self, stream_name, token):
-        """Subscribe the remote to a streams.
+        """Subscribe the remote to a stream.
 
         This invloves checking if they've missed anything and sending those
         updates down if they have. During that time new updates for the stream
@@ -478,11 +478,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
             # Now we can send any updates that came in while we were subscribing
             pending_rdata = self.pending_rdata.pop(stream_name, [])
+            updates = []
             for token, update in pending_rdata:
-                # Only send updates newer than the current token
-                if token > current_token:
+                # If the token is null, it is part of a batch update. Batches
+                # are multiple updates that share a single token. To denote
+                # this, the token is set to None for all tokens in the batch
+                # except for the last. If we find a None token, we keep looking
+                # through tokens until we find one that is not None and then
+                # process all previous updates in the batch as if they had the
+                # final token.
+                if token is None:
+                    # Store this update as part of a batch
+                    updates.append(update)
+                    continue
+
+                if token <= current_token:
+                    # This update or batch of updates is older than
+                    # current_token, dismiss it
+                    updates = []
+                    continue
+
+                updates.append(update)
+
+                # Send all updates that are part of this batch with the
+                # found token
+                for update in updates:
                     self.send_command(RdataCommand(stream_name, token, update))
 
+                # Clear stored updates
+                updates = []
+
             # They're now fully subscribed
             self.replication_streams.add(stream_name)
         except Exception as e: