diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 586dddb40b..e558f90e1a 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -39,7 +39,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
Accepts a handler that will be called when new data is available or data
is required.
"""
- maxDelay = 5 # Try at least once every N seconds
+ maxDelay = 30 # Try at least once every N seconds
def __init__(self, hs, client_name, handler):
self.client_name = client_name
@@ -54,7 +54,6 @@ class ReplicationClientFactory(ReconnectingClientFactory):
def buildProtocol(self, addr):
logger.info("Connected to replication: %r", addr)
- self.resetDelay()
return ClientReplicationStreamProtocol(
self.client_name, self.server_name, self._clock, self.handler
)
@@ -90,15 +89,18 @@ class ReplicationClientHandler(object):
# Used for tests.
self.awaiting_syncs = {}
+ # The factory used to create connections.
+ self.factory = None
+
def start_replication(self, hs):
"""Helper method to start a replication connection to the remote server
using TCP.
"""
client_name = hs.config.worker_name
- factory = ReplicationClientFactory(hs, client_name, self)
+ self.factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
- hs.get_reactor().connectTCP(host, port, factory)
+ hs.get_reactor().connectTCP(host, port, self.factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
@@ -140,6 +142,7 @@ class ReplicationClientHandler(object):
args["account_data"] = user_account_data
elif room_account_data:
args["account_data"] = room_account_data
+
return args
def get_currently_syncing_users(self):
@@ -204,3 +207,14 @@ class ReplicationClientHandler(object):
for cmd in self.pending_commands:
connection.send_command(cmd)
self.pending_commands = []
+
+ def finished_connecting(self):
+ """Called when we have successfully subscribed and caught up to all
+ streams we're interested in.
+ """
+ logger.info("Finished connecting to server")
+
+ # We don't reset the delay any earlier: otherwise, if there is a
+ # problem during start up, we'll end up tight-looping reconnecting to
+ # the server.
+ self.factory.resetDelay()
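For context: ReplicationClientFactory inherits Twisted's
ReconnectingClientFactory, which backs off exponentially between connection
attempts up to maxDelay, and resetDelay() restarts the backoff from
initialDelay. A minimal standalone sketch of that interaction, assuming a
hypothetical echo service (only the Twisted APIs shown are real):

    # Sketch, not part of this PR: deferring resetDelay() means a
    # connection that dies during start up keeps its backed-off retry
    # delay instead of restarting from initialDelay.
    from twisted.internet import protocol, reactor

    class EchoClient(protocol.Protocol):
        def connectionMade(self):
            # Deliberately not calling self.factory.resetDelay() here.
            print("connected")

    class EchoClientFactory(protocol.ReconnectingClientFactory):
        protocol = EchoClient
        initialDelay = 1.0  # first retry after ~1s
        factor = 2.0        # double the delay after each failure...
        maxDelay = 30       # ...but never wait longer than 30s

        def caught_up(self):
            # Only when the application layer says we are fully caught
            # up do we reset the backoff, mirroring finished_connecting().
            self.resetDelay()

    if __name__ == "__main__":
        reactor.connectTCP("localhost", 9092, EchoClientFactory())
        reactor.run()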
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 327556f6a1..2098c32a77 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -127,8 +127,11 @@ class RdataCommand(Command):
class PositionCommand(Command):
- """Sent by the client to tell the client the stream postition without
+ """Sent by the server to tell the client the stream postition without
needing to send an RDATA.
+
+ Sent to the client after all missing updates for a stream have been
+ sent, at which point the client is up to date.
"""
NAME = "POSITION"
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 429471c345..02e5bf6cc8 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -223,14 +223,25 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
return
# Now lets try and call on_<CMD_NAME> function
- try:
- run_as_background_process(
- "replication-" + cmd.get_logcontext_id(),
- getattr(self, "on_%s" % (cmd_name,)),
- cmd,
- )
- except Exception:
- logger.exception("[%s] Failed to handle line: %r", self.id(), line)
+ run_as_background_process(
+ "replication-" + cmd.get_logcontext_id(),
+ self.handle_command,
+ cmd,
+ )
+
+ def handle_command(self, cmd):
+ """Handle a command we have received over the replication stream.
+
+ By default delegates to on_<COMMAND>
+
+ Args:
+ cmd (synapse.replication.tcp.commands.Command): received command
+
+ Returns:
+ Deferred
+ """
+ handler = getattr(self, "on_%s" % (cmd.NAME,))
+ return handler(cmd)
def close(self):
logger.warn("[%s] Closing connection", self.id())
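The dispatch in handle_command is purely a naming convention: a command's
NAME selects the on_<NAME> method. A toy sketch of the same pattern outside
Synapse:

    # Toy sketch (not Synapse code) of the on_<COMMAND> dispatch used by
    # handle_command() above.
    class Dispatcher(object):
        def handle_command(self, cmd):
            handler = getattr(self, "on_%s" % (cmd.NAME,), None)
            if handler is None:
                raise ValueError("unknown command %s" % (cmd.NAME,))
            return handler(cmd)

        def on_PING(self, cmd):
            return "pong"

    class Ping(object):
        NAME = "PING"

    print(Dispatcher().handle_command(Ping()))  # pong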
@@ -364,8 +375,11 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.transport.unregisterProducer()
def __str__(self):
+ addr = None
+ if self.transport:
+ addr = str(self.transport.getPeer())
return "ReplicationConnection<name=%s,conn_id=%s,addr=%s>" % (
- self.name, self.conn_id, self.addr,
+ self.name, self.conn_id, addr,
)
def id(self):
@@ -381,12 +395,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
VALID_INBOUND_COMMANDS = VALID_CLIENT_COMMANDS
VALID_OUTBOUND_COMMANDS = VALID_SERVER_COMMANDS
- def __init__(self, server_name, clock, streamer, addr):
+ def __init__(self, server_name, clock, streamer):
BaseReplicationStreamProtocol.__init__(self, clock) # Old style class
self.server_name = server_name
self.streamer = streamer
- self.addr = addr
# The streams the client has subscribed to and is up to date with
self.replication_streams = set()
@@ -451,7 +464,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
@defer.inlineCallbacks
def subscribe_to_stream(self, stream_name, token):
- """Subscribe the remote to a streams.
+ """Subscribe the remote to a stream.
This involves checking if they've missed anything and sending those
updates down if they have. During that time new updates for the stream
@@ -478,11 +491,36 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
# Now we can send any updates that came in while we were subscribing
pending_rdata = self.pending_rdata.pop(stream_name, [])
+ updates = []
for token, update in pending_rdata:
- # Only send updates newer than the current token
- if token > current_token:
+ # If the token is None, the update is part of a batch. A batch
+ # is multiple updates that share a single token: to denote this,
+ # the token is set to None for every update in the batch except
+ # the last. When we see a None token we keep buffering updates
+ # until we hit a non-None token, then process the whole batch as
+ # if each update carried that final token.
+ if token is None:
+ # Store this update as part of a batch
+ updates.append(update)
+ continue
+
+ if token <= current_token:
+ # This update or batch of updates is older than
+ # current_token; discard it
+ updates = []
+ continue
+
+ updates.append(update)
+
+ # Send all updates that are part of this batch with the
+ # found token
+ for update in updates:
self.send_command(RdataCommand(stream_name, token, update))
+ # Clear stored updates
+ updates = []
+
# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
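The loop above coalesces batched RDATA: every row in a batch except the
last carries a None token, stale batches are dropped wholesale, and fresh
batches are flushed with the batch's final token. The same logic as a
standalone function (a hypothetical helper, not part of the PR), which
makes the edge cases easy to check:

    # Rows in a batch share one token, sent as None for all but the last
    # row. Batches at or below current_token are dropped entirely; newer
    # ones are flushed with the batch's final token.
    def coalesce(pending_rdata, current_token):
        sent = []
        updates = []
        for token, update in pending_rdata:
            if token is None:
                updates.append(update)  # mid-batch row: keep buffering
                continue
            if token <= current_token:
                updates = []            # stale batch: drop it entirely
                continue
            updates.append(update)
            # Flush, stamping every buffered row with the final token.
            sent.extend((token, u) for u in updates)
            updates = []
        return sent

    rows = [(None, "a"), (None, "b"), (5, "c"), (None, "d"), (2, "e")]
    print(coalesce(rows, 3))  # [(5, 'a'), (5, 'b'), (5, 'c')]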
@@ -526,6 +564,11 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.server_name = server_name
self.handler = handler
+ # Set of stream names that we have subscribed to, but haven't yet
+ # caught up with. This is used to track when the client has fully
+ # connected to the remote.
+ self.streams_connecting = set()
+
# Map of stream to batched updates. See RdataCommand for info on how
# batching works.
self.pending_batches = {}
@@ -548,6 +591,10 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
# We've now finished connecting, so inform the client handler
self.handler.update_connection(self)
+ # This will happen if we don't actually subscribe to any streams
+ if not self.streams_connecting:
+ self.handler.finished_connecting()
+
def on_SERVER(self, cmd):
if cmd.data != self.server_name:
logger.error("[%s] Connected to wrong remote: %r", self.id(), cmd.data)
@@ -577,6 +624,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
return self.handler.on_rdata(stream_name, cmd.token, rows)
def on_POSITION(self, cmd):
+ # When we get a `POSITION` command it means we've finished getting
+ # missing updates for the given stream, and are now up to date.
+ self.streams_connecting.discard(cmd.stream_name)
+ if not self.streams_connecting:
+ self.handler.finished_connecting()
+
return self.handler.on_position(cmd.stream_name, cmd.token)
def on_SYNC(self, cmd):
@@ -593,6 +646,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.id(), stream_name, token
)
+ self.streams_connecting.add(stream_name)
+
self.send_command(ReplicateCommand(stream_name, token))
def on_connection_closed(self):
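Putting the client-side pieces together: on_REPLICATE adds a stream to
streams_connecting, on_POSITION removes it, and finished_connecting fires
once the set drains (or immediately, if nothing was subscribed). A minimal
sketch of that state machine, with hypothetical names:

    # Sketch of the streams_connecting tracking added above.
    class ConnectTracker(object):
        def __init__(self, on_finished):
            self.streams_connecting = set()
            self.on_finished = on_finished

        def subscribe(self, stream_name):
            # Mirrors on_REPLICATE: we now await a POSITION for this stream.
            self.streams_connecting.add(stream_name)

        def on_position(self, stream_name):
            # Mirrors on_POSITION: this stream has caught up.
            self.streams_connecting.discard(stream_name)
            if not self.streams_connecting:
                self.on_finished()

    def report():
        print("finished connecting")

    tracker = ConnectTracker(report)
    tracker.subscribe("events")
    tracker.subscribe("typing")
    tracker.on_position("events")
    tracker.on_position("typing")  # prints "finished connecting"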
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index fd59f1595f..7fc346c7b6 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -16,6 +16,7 @@
"""
import logging
+import random
from six import itervalues
@@ -56,7 +57,6 @@ class ReplicationStreamProtocolFactory(Factory):
self.server_name,
self.clock,
self.streamer,
- addr
)
@@ -74,6 +74,8 @@ class ReplicationStreamer(object):
self.notifier = hs.get_notifier()
self._server_notices_sender = hs.get_server_notices_sender()
+ self._replication_torture_level = hs.config.replication_torture_level
+
# Current connections.
self.connections = []
@@ -157,10 +159,23 @@ class ReplicationStreamer(object):
for stream in self.streams:
stream.advance_current_token()
- for stream in self.streams:
+ all_streams = self.streams
+
+ if self._replication_torture_level is not None:
+ # there is no guarantee about ordering between the streams,
+ # so let's shuffle them around a bit when we are in torture mode.
+ all_streams = list(all_streams)
+ random.shuffle(all_streams)
+
+ for stream in all_streams:
if stream.last_token == stream.upto_token:
continue
+ if self._replication_torture_level:
+ yield self.clock.sleep(
+ self._replication_torture_level / 1000.0
+ )
+
logger.debug(
"Getting stream: %s: %s -> %s",
stream.NAME, stream.last_token, stream.upto_token
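Torture mode shuffles the stream order on every pass and sleeps
replication_torture_level milliseconds between streams, so nothing can
silently depend on cross-stream ordering. A standalone sketch of the loop's
shape, with time.sleep standing in for the Twisted clock:

    import random
    import time

    def poke_streams(streams, torture_level_ms=None):
        if torture_level_ms is not None:
            # No ordering guarantee between streams, so exercise that.
            streams = list(streams)
            random.shuffle(streams)
        for stream in streams:
            if torture_level_ms:
                time.sleep(torture_level_ms / 1000.0)
            print("updating stream %s" % (stream,))

    poke_streams(["events", "typing", "receipts"], torture_level_ms=5)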
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..e23084baae 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -23,7 +23,7 @@ Each stream is defined by the following information:
current_token: The function that returns the current token for the stream
update_function: The function that returns a list of updates between two tokens
"""
-
+import itertools
import logging
from collections import namedtuple
@@ -195,8 +195,8 @@ class Stream(object):
limit=MAX_EVENTS_BEHIND + 1,
)
- if len(rows) >= MAX_EVENTS_BEHIND:
- raise Exception("stream %s has fallen behind" % (self.NAME))
+ # never turn more than MAX_EVENTS_BEHIND + 1 rows into updates.
+ rows = itertools.islice(rows, MAX_EVENTS_BEHIND + 1)
else:
rows = yield self.update_function(
from_token, current_token,
@@ -204,6 +204,11 @@ class Stream(object):
updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
+ # check we didn't get more rows than the limit.
+ # doing it like this allows the update_function to be a generator.
+ if self._LIMITED and len(updates) >= MAX_EVENTS_BEHIND:
+ raise Exception("stream %s has fallen behind" % (self.NAME))
+
defer.returnValue((updates, current_token))
def current_token(self):
|