From 313987187ee04dce5e70db17c1ab9377f283be7e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Feb 2019 15:04:34 +0000 Subject: Fix tightloop over connecting to replication server If the client failed to process incoming commands during the initial set up of the replication connection it would immediately disconnect and reconnect, resulting in a tightloop. This can happen, for example, when subscribing to a stream that has a row that is too long in the backlog. The fix here is to not consider the connection successfully set up until the client has succesfully subscribed and caught up with the streams. This ensures that the retry logic timers aren't reset until then, meaning that if an error does happen during start up the client will continue backing off before retrying again. --- synapse/replication/tcp/client.py | 38 ++++++++++++++++++++++++++++++++++--- synapse/replication/tcp/commands.py | 5 ++++- 2 files changed, 39 insertions(+), 4 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 586dddb40b..914cd24b55 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,23 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = None + + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -115,6 +122,12 @@ class ReplicationClientHandler(object): Can be overriden in subclasses to handle more. """ + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(stream_name) + if not self.streams_connecting: + self.finished_connecting() + return self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): @@ -140,6 +153,10 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + + # Record which streams we're in the process of subscribing to + self.streams_connecting = set(args.keys()) + return args def get_currently_syncing_users(self): @@ -204,3 +221,18 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.finished_connecting() + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 327556f6a1..2098c32a77 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -127,8 +127,11 @@ class RdataCommand(Command): class PositionCommand(Command): - """Sent by the client to tell the client the stream postition without + """Sent by the server to tell the client the stream postition without needing to send an RDATA. + + Sent to the client after all missing updates for a stream have been sent + to the client and they're now up to date. """ NAME = "POSITION" -- cgit 1.4.1 From 25814921f1900e92b50830d6616762b174e82773 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Feb 2019 15:12:29 +0000 Subject: Increase the max delay between retry attempts Otherwise if you have many workers they can easily take out master with their connection attempts --- synapse/replication/tcp/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 914cd24b55..51f90655d0 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): Accepts a handler that will be called when new data is available or data is required. """ - maxDelay = 5 # Try at least once every N seconds + maxDelay = 30 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name -- cgit 1.4.1 From 6870fc496ff3da5075fec74e40515c03c929915f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 27 Feb 2019 10:22:52 +0000 Subject: Move connecting logic into ClientReplicationStreamProtocol --- synapse/replication/tcp/client.py | 18 ------------------ synapse/replication/tcp/protocol.py | 17 +++++++++++++++++ 2 files changed, 17 insertions(+), 18 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 51f90655d0..e558f90e1a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -89,11 +89,6 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} - # Set of stream names that have been subscribe to, but haven't yet - # caught up with. This is used to track when the client has been fully - # connected to the remote. - self.streams_connecting = None - # The factory used to create connections. self.factory = None @@ -122,12 +117,6 @@ class ReplicationClientHandler(object): Can be overriden in subclasses to handle more. """ - # When we get a `POSITION` command it means we've finished getting - # missing updates for the given stream, and are now up to date. - self.streams_connecting.discard(stream_name) - if not self.streams_connecting: - self.finished_connecting() - return self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): @@ -154,9 +143,6 @@ class ReplicationClientHandler(object): elif room_account_data: args["account_data"] = room_account_data - # Record which streams we're in the process of subscribing to - self.streams_connecting = set(args.keys()) - return args def get_currently_syncing_users(self): @@ -222,10 +208,6 @@ class ReplicationClientHandler(object): connection.send_command(cmd) self.pending_commands = [] - # This will happen if we don't actually subscribe to any streams - if not self.streams_connecting: - self.finished_connecting() - def finished_connecting(self): """Called when we have successfully subscribed and caught up to all streams we're interested in. diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 0b3fe6cbf5..6123c995b9 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -511,6 +511,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.server_name = server_name self.handler = handler + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = set() + # Map of stream to batched updates. See RdataCommand for info on how # batching works. self.pending_batches = {} @@ -533,6 +538,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # We've now finished connecting to so inform the client handler self.handler.update_connection(self) + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.handler.finished_connecting() + def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) @@ -562,6 +571,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): return self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(cmd.stream_name) + if not self.streams_connecting: + self.handler.finished_connecting() + return self.handler.on_position(cmd.stream_name, cmd.token) def on_SYNC(self, cmd): @@ -578,6 +593,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.id(), stream_name, token ) + self.streams_connecting.add(stream_name) + self.send_command(ReplicateCommand(stream_name, token)) def on_connection_closed(self): -- cgit 1.4.1