From 023ee197be23c02fa5f2e5bffba23fb0da2bf35b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 4 Apr 2017 13:19:26 +0100 Subject: Advance replication streams even if nothing is listening Otherwise the streams don't advance and steadily fall behind, so when a worker does connect either a) they'll be streamed lots of old updates or b) the connection will fail as the streams are too far behind. --- synapse/replication/tcp/protocol.py | 12 +++++++----- synapse/replication/tcp/resource.py | 2 +- synapse/replication/tcp/streams.py | 7 +++++++ 3 files changed, 15 insertions(+), 6 deletions(-) (limited to 'synapse/replication/tcp') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 80f732b455..743f0207fa 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -414,16 +414,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): token, row = update[0], update[1] self.send_command(RdataCommand(stream_name, token, row)) - # Now we can send any updates that came in while we were subscribing - pending_rdata = self.pending_rdata.pop(stream_name, []) - for token, update in pending_rdata: - self.send_command(RdataCommand(stream_name, token, update)) - # We send a POSITION command to ensure that they have an up to # date token (especially useful if we didn't send any updates # above) self.send_command(PositionCommand(stream_name, current_token)) + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 0d7ea57318..8b2c4c3043 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -124,7 +124,7 @@ class ReplicationStreamer(object): # Don't bother if nothing is listening. We still need to advance # the stream tokens otherwise they'll fall beihind forever for stream in self.streams: - stream.advance_current_token() + stream.discard_updates_and_advance() return # If we're in the process of checking for new updates, mark that fact diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index fada40c6ef..4de4ebe84d 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -89,6 +89,13 @@ class Stream(object): """ self.upto_token = self.current_token() + def discard_updates_and_advance(self): + """Called when the stream should advance but the updates would be discarded, + e.g. when there are no currently connected workers. + """ + self.upto_token = self.current_token() + self.last_token = self.upto_token + @defer.inlineCallbacks def get_updates(self): """Gets all updates since the last time this function was called (or -- cgit 1.4.1