summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2017-04-04 15:40:51 +0100
committerGitHub <noreply@github.com>2017-04-04 15:40:51 +0100
commita76886726bbe68bcc7781ebb457cb1dc728efc4e (patch)
tree4682c31a49aa26abbbeebf7088e3bba3316c8a86 /synapse/replication/tcp/protocol.py
parentFiddle tcp replication logging (diff)
parentAdvance replication streams even if nothing is listening (diff)
downloadsynapse-a76886726bbe68bcc7781ebb457cb1dc728efc4e.tar.xz
Merge pull request #2098 from matrix-org/erikj/repl_tcp_fix
Advance replication streams even if nothing is listening
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py

index 6ae3c86a8d..6864204616 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -415,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): token, row = update[0], update[1] self.send_command(RdataCommand(stream_name, token, row)) - # Now we can send any updates that came in while we were subscribing - pending_rdata = self.pending_rdata.pop(stream_name, []) - for token, update in pending_rdata: - self.send_command(RdataCommand(stream_name, token, update)) - # We send a POSITION command to ensure that they have an up to # date token (especially useful if we didn't send any updates # above) self.send_command(PositionCommand(stream_name, current_token)) + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: