diff options
author | Erik Johnston <erik@matrix.org> | 2017-04-04 13:19:26 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-04-04 13:19:26 +0100 |
commit | 023ee197be23c02fa5f2e5bffba23fb0da2bf35b (patch) | |
tree | c0abea570b1eb5f83e01d43d61c9b46b7b9957c4 /synapse/replication/tcp/protocol.py | |
parent | Merge pull request #2082 from matrix-org/erikj/repl_tcp_server (diff) | |
download | synapse-023ee197be23c02fa5f2e5bffba23fb0da2bf35b.tar.xz |
Advance replication streams even if nothing is listening
Otherwise the streams don't advance and steadily fall behind, so when a worker does connect either a) they'll be streamed lots of old updates or b) the connection will fail as the streams are too far behind.
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 80f732b455..743f0207fa 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -414,16 +414,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): token, row = update[0], update[1] self.send_command(RdataCommand(stream_name, token, row)) - # Now we can send any updates that came in while we were subscribing - pending_rdata = self.pending_rdata.pop(stream_name, []) - for token, update in pending_rdata: - self.send_command(RdataCommand(stream_name, token, update)) - # We send a POSITION command to ensure that they have an up to # date token (especially useful if we didn't send any updates # above) self.send_command(PositionCommand(stream_name, current_token)) + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: |