summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py29
1 files changed, 18 insertions, 11 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 53615b7ee3..dac4fbeef7 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -487,15 +487,19 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 # through tokens until we find one that is not None and then
                 # process all previous updates in the batch as if they had the
                 # final token.
-                if not token or len(batch_updates) > 0:
-                    if token is None:
-                        # Store this update as part of the batch
-                        batch_updates.append(update)
-                    elif current_token <= current_token:
-                        # This batch is older than current_token, dismiss
+                if token is None:
+                    # Store this update as part of a batch
+                    batch_updates.append(update)
+                    continue
+
+                if len(batch_updates) > 0:
+                    # There is an ongoing batch and this is the end
+                    if current_token <= current_token:
+                        # This batch is older than current_token, dismiss it
                         batch_updates = []
                     else:
-                        # Append final update of this batch before sending
+                        # This is the end of the batch. Append final update of
+                        # this batch before sending
                         batch_updates.append(update)
 
                         # Send all updates that are part of this batch with the
@@ -505,10 +509,13 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
                         # Clear saved batch updates
                         batch_updates = []
-                else:
-                    # Only send updates newer than the current token
-                    if token > current_token:
-                        self.send_command(RdataCommand(stream_name, token, update))
+                    continue
+
+                # This is an update that's not part of a batch.
+                #
+                # Only send updates newer than the current token
+                if token > current_token:
+                    self.send_command(RdataCommand(stream_name, token, update))
 
             # They're now fully subscribed
             self.replication_streams.add(stream_name)