diff options
author | Erik Johnston <erikj@jki.re> | 2019-02-27 11:00:59 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-02-27 11:00:59 +0000 |
commit | 7590e9fa285afac717132b16cbcba4e8b19943dd (patch) | |
tree | 40ab9e389596559b7119054ffc62e57c64bf4f90 /synapse/replication/tcp/client.py | |
parent | 0.99.2rc1 (diff) | |
parent | Move connecting logic into ClientReplicationStreamProtocol (diff) | |
download | synapse-7590e9fa285afac717132b16cbcba4e8b19943dd.tar.xz |
Merge pull request #4749 from matrix-org/erikj/replication_connection_backoff
Fix tightloop over connecting to replication server
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r-- | synapse/replication/tcp/client.py | 22 |
1 files changed, 18 insertions, 4 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 586dddb40b..e558f90e1a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): Accepts a handler that will be called when new data is available or data is required. """ - maxDelay = 5 # Try at least once every N seconds + maxDelay = 30 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,18 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -140,6 +142,7 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + return args def get_currently_syncing_users(self): @@ -204,3 +207,14 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() |