diff options
author | Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> | 2019-03-06 15:48:29 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-06 15:48:29 +0000 |
commit | 7b8a157b7996582429028bb51fe2000cb946abcf (patch) | |
tree | 6e2fd14dede86ff3f16e4b7a147c4d728ab67176 /synapse/replication/tcp/protocol.py | |
parent | Merge pull request #4804 from matrix-org/babolivier/ratelimit_registration_im... (diff) | |
parent | Simplify token replication logic (diff) | |
download | synapse-7b8a157b7996582429028bb51fe2000cb946abcf.tar.xz |
Merge pull request #4792 from matrix-org/anoa/replication_tokens
Support batch updates in the worker sender
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 49ae5b3355..55630ba9a7 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -451,7 +451,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): - """Subscribe the remote to a streams. + """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those updates down if they have. During that time new updates for the stream @@ -478,11 +478,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) + updates = [] for token, update in pending_rdata: - # Only send updates newer than the current token - if token > current_token: + # If the token is null, it is part of a batch update. Batches + # are multiple updates that share a single token. To denote + # this, the token is set to None for all tokens in the batch + # except for the last. If we find a None token, we keep looking + # through tokens until we find one that is not None and then + # process all previous updates in the batch as if they had the + # final token. + if token is None: + # Store this update as part of a batch + updates.append(update) + continue + + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] + continue + + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: |