summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-02-26 15:04:34 +0000
committerErik Johnston <erik@matrix.org>2019-02-26 15:05:41 +0000
commit313987187ee04dce5e70db17c1ab9377f283be7e (patch)
treefc529070b91290f9ea6d31afc652454c23243ecc /synapse/replication/tcp/client.py
parentMerge pull request #4746 from matrix-org/anoa/public_rooms_federate_develop (diff)
downloadsynapse-313987187ee04dce5e70db17c1ab9377f283be7e.tar.xz
Fix tightloop over connecting to replication server
If the client failed to process incoming commands during the initial set
up of the replication connection it would immediately disconnect and
reconnect, resulting in a tightloop.

This can happen, for example, when subscribing to a stream that has a
row that is too long in the backlog.

The fix here is to not consider the connection successfully set up until
the client has succesfully subscribed and caught up with the streams.
This ensures that the retry logic timers aren't reset until then,
meaning that if an error does happen during start up the client will
continue backing off before retrying again.
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py38
1 files changed, 35 insertions, 3 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 586dddb40b..914cd24b55 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
 
     def buildProtocol(self, addr):
         logger.info("Connected to replication: %r", addr)
-        self.resetDelay()
         return ClientReplicationStreamProtocol(
             self.client_name, self.server_name, self._clock, self.handler
         )
@@ -90,15 +89,23 @@ class ReplicationClientHandler(object):
         # Used for tests.
         self.awaiting_syncs = {}
 
+        # Set of stream names that have been subscribe to, but haven't yet
+        # caught up with. This is used to track when the client has been fully
+        # connected to the remote.
+        self.streams_connecting = None
+
+        # The factory used to create connections.
+        self.factory = None
+
     def start_replication(self, hs):
         """Helper method to start a replication connection to the remote server
         using TCP.
         """
         client_name = hs.config.worker_name
-        factory = ReplicationClientFactory(hs, client_name, self)
+        self.factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        hs.get_reactor().connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, self.factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
@@ -115,6 +122,12 @@ class ReplicationClientHandler(object):
 
         Can be overriden in subclasses to handle more.
         """
+        # When we get a `POSITION` command it means we've finished getting
+        # missing updates for the given stream, and are now up to date.
+        self.streams_connecting.discard(stream_name)
+        if not self.streams_connecting:
+            self.finished_connecting()
+
         return self.store.process_replication_rows(stream_name, token, [])
 
     def on_sync(self, data):
@@ -140,6 +153,10 @@ class ReplicationClientHandler(object):
             args["account_data"] = user_account_data
         elif room_account_data:
             args["account_data"] = room_account_data
+
+        # Record which streams we're in the process of subscribing to
+        self.streams_connecting = set(args.keys())
+
         return args
 
     def get_currently_syncing_users(self):
@@ -204,3 +221,18 @@ class ReplicationClientHandler(object):
             for cmd in self.pending_commands:
                 connection.send_command(cmd)
             self.pending_commands = []
+
+            # This will happen if we don't actually subscribe to any streams
+            if not self.streams_connecting:
+                self.finished_connecting()
+
+    def finished_connecting(self):
+        """Called when we have successfully subscribed and caught up to all
+        streams we're interested in.
+        """
+        logger.info("Finished connecting to server")
+
+        # We don't reset the delay any earlier as otherwise if there is a
+        # problem during start up we'll end up tight looping connecting to the
+        # server.
+        self.factory.resetDelay()