summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/protocol.py37
1 files changed, 14 insertions, 23 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index dac4fbeef7..55630ba9a7 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
             # Now we can send any updates that came in while we were subscribing
             pending_rdata = self.pending_rdata.pop(stream_name, [])
-            batch_updates = []
+            updates = []
             for token, update in pending_rdata:
                 # If the token is null, it is part of a batch update. Batches
                 # are multiple updates that share a single token. To denote
@@ -489,34 +489,25 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 # final token.
                 if token is None:
                     # Store this update as part of a batch
-                    batch_updates.append(update)
+                    updates.append(update)
                     continue
 
-                if len(batch_updates) > 0:
-                    # There is an ongoing batch and this is the end
-                    if current_token <= current_token:
-                        # This batch is older than current_token, dismiss it
-                        batch_updates = []
-                    else:
-                        # This is the end of the batch. Append final update of
-                        # this batch before sending
-                        batch_updates.append(update)
-
-                        # Send all updates that are part of this batch with the
-                        # found token
-                        for update in batch_updates:
-                            self.send_command(RdataCommand(stream_name, token, update))
-
-                        # Clear saved batch updates
-                        batch_updates = []
+                if token <= current_token:
+                    # This update or batch of updates is older than
+                    # current_token, dismiss it
+                    updates = []
                     continue
 
-                # This is an update that's not part of a batch.
-                #
-                # Only send updates newer than the current token
-                if token > current_token:
+                updates.append(update)
+
+                # Send all updates that are part of this batch with the
+                # found token
+                for update in updates:
                     self.send_command(RdataCommand(stream_name, token, update))
 
+                # Clear stored updates
+                updates = []
+
             # They're now fully subscribed
             self.replication_streams.add(stream_name)
         except Exception as e: