summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- synapse/replication/tcp/handler.py 72
-rw-r--r-- synapse/replication/tcp/redis.py 2
-rw-r--r-- synapse/replication/tcp/resource.py 17
-rw-r--r-- synapse/replication/tcp/streams/_base.py 18
4 files changed, 88 insertions, 21 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index afd03137f0..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler: if hs.config.redis.redis_enabled: self._notifier.add_lock_released_callback(self.on_lock_released) + # Marks if we should send POSITION commands for all streams ASAP. This + # is checked by the `ReplicationStreamer` which manages sending + # RDATA/POSITION commands + self._should_announce_positions = True + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -397,29 +402,23 @@ class ReplicationCommandHandler: return self._streams_to_replicate def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None: - self.send_positions_to_connection(conn) + self.send_positions_to_connection() - def send_positions_to_connection(self, conn: IReplicationConnection) -> None: + def send_positions_to_connection(self) -> None: """Send current position of all streams this process is source of to the connection. """ - # We respond with current position of all streams this instance - # replicates. - for stream in self.get_streams_to_replicate(): - # Note that we use the current token as the prev token here (rather - # than stream.last_token), as we can't be sure that there have been - # no rows written between last token and the current token (since we - # might be racing with the replication sending bg process). 
- current_token = stream.current_token(self._instance_name) - self.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - current_token, - current_token, - ) - ) + self._should_announce_positions = True + self._notifier.notify_replication() + + def should_announce_positions(self) -> bool: + """Check if we should send POSITION commands for all streams ASAP.""" + return self._should_announce_positions + + def will_announce_positions(self) -> None: + """Mark that we're about to send POSITIONs out for all streams.""" + self._should_announce_positions = False def on_USER_SYNC( self, conn: IReplicationConnection, cmd: UserSyncCommand @@ -588,6 +587,21 @@ class ReplicationCommandHandler: logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line()) + # Check if we can early discard this position. We can only do so for + # connected streams. + stream = self._streams[cmd.stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + self._add_command_to_stream_queue(conn, cmd) async def _process_position( @@ -599,6 +613,18 @@ class ReplicationCommandHandler: """ stream = self._streams[stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + # We're about to go and catch up with the stream, so remove from set # of connected streams. for streams in self._streams_by_connection.values(): @@ -626,8 +652,9 @@ class ReplicationCommandHandler: # for why this can happen. 
logger.info( - "Fetching replication rows for '%s' between %i and %i", + "Fetching replication rows for '%s' / %s between %i and %i", stream_name, + cmd.instance_name, current_token, cmd.new_token, ) @@ -657,6 +684,13 @@ class ReplicationCommandHandler: self._streams_by_connection.setdefault(conn, set()).add(stream_name) + def is_stream_connected( + self, conn: IReplicationConnection, stream_name: str + ) -> bool: + """Return if stream has been successfully connected and is ready to + receive updates""" + return stream_name in self._streams_by_connection.get(conn, ()) + def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand ) -> None: diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7e96145b3b..1fa37bb888 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol): # We send out our positions when there is a new connection in case the # other side missed updates. We do this for Redis connections as the # otherside won't know we've connected and so won't issue a REPLICATE. - self.synapse_handler.send_positions_to_connection(self) + self.synapse_handler.send_positions_to_connection() def messageReceived(self, pattern: str, channel: str, message: str) -> None: """Received a message from redis.""" diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer: # We check up front to see if anything has actually changed, as we get # poked because of changes that happened on other instances. - if all( + if not self.command_handler.should_announce_positions() and all( stream.last_token == stream.current_token(self._instance_name) for stream in self.streams ): @@ -158,6 +158,21 @@ class ReplicationStreamer: all_streams = list(all_streams) random.shuffle(all_streams) + if self.command_handler.should_announce_positions(): + # We need to send out POSITIONs for all streams, usually + # because a worker has reconnected. + self.command_handler.will_announce_positions() + + for stream in all_streams: + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + stream.last_token, + stream.last_token, + ) + ) + for stream in all_streams: if stream.last_token == stream.current_token( self._instance_name diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 58a44029aa..cc34dfb322 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -144,6 +144,16 @@ class Stream: """ raise NotImplementedError() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + """Whether or not a position command for this stream can be discarded. + + Useful for streams that can never go backwards and where we already know + the stream ID for the instance has advanced. + """ + return False + def discard_updates_and_advance(self) -> None: """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. @@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream): def minimal_local_current_token(self) -> Token: return self._stream_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # These streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + def current_token_without_instance( current_token: Callable[[], int]