diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 57 |
1 files changed, 56 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 687984e7a8..1d4ceac0f1 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -23,7 +23,9 @@ from prometheus_client import Counter from twisted.internet.protocol import Factory from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol +from synapse.replication.tcp.streams import EventsStream from synapse.util.metrics import Measure stream_updates_counter = Counter( @@ -84,6 +86,23 @@ class ReplicationStreamer: # Set of streams to replicate. self.streams = self.command_handler.get_streams_to_replicate() + # If we have streams then we must have redis enabled or on master + assert ( + not self.streams + or hs.config.redis.redis_enabled + or not hs.config.worker.worker_app + ) + + # If we are replicating an event stream we want to periodically check if + # we should send updated POSITIONs. We do this as a looping call rather + # explicitly poking when the position advances (without new data to + # replicate) to reduce replication traffic (otherwise each writer would + # likely send a POSITION for each new event received over replication). + # + # Note that if the position hasn't advanced then we won't send anything. + if any(EventsStream.NAME == s.NAME for s in self.streams): + self.clock.looping_call(self.on_notifier_poke, 1000) + def on_notifier_poke(self): """Checks if there is actually any new data and sends it to the connections if there are. @@ -91,13 +110,23 @@ class ReplicationStreamer: This should get called each time new data is available, even if it is currently being executed, so that nothing gets missed """ - if not self.command_handler.connected(): + if not self.command_handler.connected() or not self.streams: # Don't bother if nothing is listening. We still need to advance # the stream tokens otherwise they'll fall behind forever for stream in self.streams: stream.discard_updates_and_advance() return + # We check up front to see if anything has actually changed, as we get + # poked because of changes that happened on other instances. + if all( + stream.last_token == stream.current_token(self._instance_name) + for stream in self.streams + ): + return + + # If there are updates then we need to set this even if we're already + # looping, as the loop needs to know that he might need to loop again. self.pending_updates = True if self.is_looping: @@ -136,6 +165,8 @@ class ReplicationStreamer: self._replication_torture_level / 1000.0 ) + last_token = stream.last_token + logger.debug( "Getting stream: %s: %s -> %s", stream.NAME, @@ -159,6 +190,30 @@ class ReplicationStreamer: ) stream_updates_counter.labels(stream.NAME).inc(len(updates)) + else: + # The token has advanced but there is no data to + # send, so we send a `POSITION` to inform other + # workers of the updated position. + if stream.NAME == EventsStream.NAME: + # XXX: We only do this for the EventStream as it + # turns out that e.g. account data streams share + # their "current token" with each other, meaning + # that it is *not* safe to send a POSITION. + logger.info( + "Sending position: %s -> %s", + stream.NAME, + current_token, + ) + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + last_token, + current_token, + ) + ) + continue + # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do # this by setting the current token to all but the last of |