diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2019-03-05 13:58:30 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2019-03-05 13:58:30 +0000 |
commit | b9f61630927752422fb80cf7ece083741aefd399 (patch) | |
tree | 807e627754d352b6ee1f705749ab2b2b42e87e3f /synapse | |
parent | Clean up logic and add comments (diff) | |
download | synapse-b9f61630927752422fb80cf7ece083741aefd399.tar.xz |
Simplify token replication logic
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 37 |
1 files changed, 14 insertions, 23 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index dac4fbeef7..55630ba9a7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -478,7 +478,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) - batch_updates = [] + updates = [] for token, update in pending_rdata: # If the token is null, it is part of a batch update. Batches # are multiple updates that share a single token. To denote @@ -489,34 +489,25 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # final token. if token is None: # Store this update as part of a batch - batch_updates.append(update) + updates.append(update) continue - if len(batch_updates) > 0: - # There is an ongoing batch and this is the end - if current_token <= current_token: - # This batch is older than current_token, dismiss it - batch_updates = [] - else: - # This is the end of the batch. Append final update of - # this batch before sending - batch_updates.append(update) - - # Send all updates that are part of this batch with the - # found token - for update in batch_updates: - self.send_command(RdataCommand(stream_name, token, update)) - - # Clear saved batch updates - batch_updates = [] + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] continue - # This is an update that's not part of a batch. - # - # Only send updates newer than the current token - if token > current_token: + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: |