diff options
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r-- | synapse/replication/tcp/client.py | 38 |
1 files changed, 35 insertions, 3 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 586dddb40b..914cd24b55 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,23 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = None + + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -115,6 +122,12 @@ class ReplicationClientHandler(object): Can be overriden in subclasses to handle more. """ + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(stream_name) + if not self.streams_connecting: + self.finished_connecting() + return self.store.process_replication_rows(stream_name, token, []) def on_sync(self, data): @@ -140,6 +153,10 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + + # Record which streams we're in the process of subscribing to + self.streams_connecting = set(args.keys()) + return args def get_currently_syncing_users(self): @@ -204,3 +221,18 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.finished_connecting() + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() |