summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/client.py22
-rw-r--r--synapse/replication/tcp/commands.py5
-rw-r--r--synapse/replication/tcp/protocol.py83
-rw-r--r--synapse/replication/tcp/resource.py19
-rw-r--r--synapse/replication/tcp/streams.py11
5 files changed, 116 insertions, 24 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py

index 586dddb40b..e558f90e1a 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py
@@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory): Accepts a handler that will be called when new data is available or data is required. """ - maxDelay = 5 # Try at least once every N seconds + maxDelay = 30 # Try at least once every N seconds def __init__(self, hs, client_name, handler): self.client_name = client_name @@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory): def buildProtocol(self, addr): logger.info("Connected to replication: %r", addr) - self.resetDelay() return ClientReplicationStreamProtocol( self.client_name, self.server_name, self._clock, self.handler ) @@ -90,15 +89,18 @@ class ReplicationClientHandler(object): # Used for tests. self.awaiting_syncs = {} + # The factory used to create connections. + self.factory = None + def start_replication(self, hs): """Helper method to start a replication connection to the remote server using TCP. """ client_name = hs.config.worker_name - factory = ReplicationClientFactory(hs, client_name, self) + self.factory = ReplicationClientFactory(hs, client_name, self) host = hs.config.worker_replication_host port = hs.config.worker_replication_port - hs.get_reactor().connectTCP(host, port, factory) + hs.get_reactor().connectTCP(host, port, self.factory) def on_rdata(self, stream_name, token, rows): """Called when we get new replication data. By default this just pokes @@ -140,6 +142,7 @@ class ReplicationClientHandler(object): args["account_data"] = user_account_data elif room_account_data: args["account_data"] = room_account_data + return args def get_currently_syncing_users(self): @@ -204,3 +207,14 @@ class ReplicationClientHandler(object): for cmd in self.pending_commands: connection.send_command(cmd) self.pending_commands = [] + + def finished_connecting(self): + """Called when we have successfully subscribed and caught up to all + streams we're interested in. + """ + logger.info("Finished connecting to server") + + # We don't reset the delay any earlier as otherwise if there is a + # problem during start up we'll end up tight looping connecting to the + # server. + self.factory.resetDelay() diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 327556f6a1..2098c32a77 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -127,8 +127,11 @@ class RdataCommand(Command): class PositionCommand(Command): - """Sent by the client to tell the client the stream postition without + """Sent by the server to tell the client the stream postition without needing to send an RDATA. + + Sent to the client after all missing updates for a stream have been sent + to the client and they're now up to date. """ NAME = "POSITION" diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 429471c345..02e5bf6cc8 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -223,14 +223,25 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): return # Now lets try and call on_<CMD_NAME> function - try: - run_as_background_process( - "replication-" + cmd.get_logcontext_id(), - getattr(self, "on_%s" % (cmd_name,)), - cmd, - ) - except Exception: - logger.exception("[%s] Failed to handle line: %r", self.id(), line) + run_as_background_process( + "replication-" + cmd.get_logcontext_id(), + self.handle_command, + cmd, + ) + + def handle_command(self, cmd): + """Handle a command we have received over the replication stream. + + By default delegates to on_<COMMAND> + + Args: + cmd (synapse.replication.tcp.commands.Command): received command + + Returns: + Deferred + """ + handler = getattr(self, "on_%s" % (cmd.NAME,)) + return handler(cmd) def close(self): logger.warn("[%s] Closing connection", self.id()) @@ -364,8 +375,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.transport.unregisterProducer() def __str__(self): + addr = None + if self.transport: + addr = str(self.transport.getPeer()) return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % ( - self.name, self.conn_id, self.addr, + self.name, self.conn_id, addr, ) def id(self): @@ -381,12 +395,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS - def __init__(self, server_name, clock, streamer, addr): + def __init__(self, server_name, clock, streamer): BaseReplicationStreamProtocol.__init__(self, clock) # Old style class self.server_name = server_name self.streamer = streamer - self.addr = addr # The streams the client has subscribed to and is up to date with self.replication_streams = set() @@ -451,7 +464,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): - """Subscribe the remote to a streams. + """Subscribe the remote to a stream. This invloves checking if they've missed anything and sending those updates down if they have. During that time new updates for the stream @@ -478,11 +491,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): # Now we can send any updates that came in while we were subscribing pending_rdata = self.pending_rdata.pop(stream_name, []) + updates = [] for token, update in pending_rdata: - # Only send updates newer than the current token - if token > current_token: + # If the token is null, it is part of a batch update. Batches + # are multiple updates that share a single token. To denote + # this, the token is set to None for all tokens in the batch + # except for the last. If we find a None token, we keep looking + # through tokens until we find one that is not None and then + # process all previous updates in the batch as if they had the + # final token. + if token is None: + # Store this update as part of a batch + updates.append(update) + continue + + if token <= current_token: + # This update or batch of updates is older than + # current_token, dismiss it + updates = [] + continue + + updates.append(update) + + # Send all updates that are part of this batch with the + # found token + for update in updates: self.send_command(RdataCommand(stream_name, token, update)) + # Clear stored updates + updates = [] + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: @@ -526,6 +564,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.server_name = server_name self.handler = handler + # Set of stream names that have been subscribe to, but haven't yet + # caught up with. This is used to track when the client has been fully + # connected to the remote. + self.streams_connecting = set() + # Map of stream to batched updates. See RdataCommand for info on how # batching works. self.pending_batches = {} @@ -548,6 +591,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): # We've now finished connecting to so inform the client handler self.handler.update_connection(self) + # This will happen if we don't actually subscribe to any streams + if not self.streams_connecting: + self.handler.finished_connecting() + def on_SERVER(self, cmd): if cmd.data != self.server_name: logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data) @@ -577,6 +624,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): return self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): + # When we get a `POSITION` command it means we've finished getting + # missing updates for the given stream, and are now up to date. + self.streams_connecting.discard(cmd.stream_name) + if not self.streams_connecting: + self.handler.finished_connecting() + return self.handler.on_position(cmd.stream_name, cmd.token) def on_SYNC(self, cmd): @@ -593,6 +646,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.id(), stream_name, token ) + self.streams_connecting.add(stream_name) + self.send_command(ReplicateCommand(stream_name, token)) def on_connection_closed(self): diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..7fc346c7b6 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@ """ import logging +import random from six import itervalues @@ -56,7 +57,6 @@ class ReplicationStreamProtocolFactory(Factory): self.server_name, self.clock, self.streamer, - addr ) @@ -74,6 +74,8 @@ class ReplicationStreamer(object): self.notifier = hs.get_notifier() self._server_notices_sender = hs.get_server_notices_sender() + self._replication_torture_level = hs.config.replication_torture_level + # Current connections. self.connections = [] @@ -157,10 +159,23 @@ class ReplicationStreamer(object): for stream in self.streams: stream.advance_current_token() - for stream in self.streams: + all_streams = self.streams + + if self._replication_torture_level is not None: + # there is no guarantee about ordering between the streams, + # so let's shuffle them around a bit when we are in torture mode. + all_streams = list(all_streams) + random.shuffle(all_streams) + + for stream in all_streams: if stream.last_token == stream.upto_token: continue + if self._replication_torture_level: + yield self.clock.sleep( + self._replication_torture_level / 1000.0 + ) + logger.debug( "Getting stream: %s: %s -> %s", stream.NAME, stream.last_token, stream.upto_token diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..e23084baae 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py
@@ -23,7 +23,7 @@ Each stream is defined by the following information: current_token: The function that returns the current token for the stream update_function: The function that returns a list of updates between two tokens """ - +import itertools import logging from collections import namedtuple @@ -195,8 +195,8 @@ class Stream(object): limit=MAX_EVENTS_BEHIND + 1, ) - if len(rows) >= MAX_EVENTS_BEHIND: - raise Exception("stream %s has fallen behind" % (self.NAME)) + # never turn more than MAX_EVENTS_BEHIND + 1 into updates. + rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1) else: rows = yield self.update_function( from_token, current_token, @@ -204,6 +204,11 @@ class Stream(object): updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows] + # check we didn't get more rows than the limit. + # doing it like this allows the update_function to be a generator. + if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND: + raise Exception("stream %s has fallen behind" % (self.NAME)) + defer.returnValue((updates, current_token)) def current_token(self):