summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-11-16 16:27:21 +0000
committerErik Johnston <erik@matrix.org>2023-11-16 16:27:21 +0000
commitb20bdd3997227dd74403ce39977a37a3b89762ed (patch)
treeb35f4771a535bbbd7fe8955c2c238b62f0ad5f71 /synapse/replication/tcp/handler.py
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentSpeed up deleting device messages (#16643) (diff)
downloadsynapse-b20bdd3997227dd74403ce39977a37a3b89762ed.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py72
1 files changed, 53 insertions, 19 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index afd03137f0..c14a18ba2e 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler: if hs.config.redis.redis_enabled: self._notifier.add_lock_released_callback(self.on_lock_released) + # Marks if we should send POSITION commands for all streams ASAP. This + # is checked by the `ReplicationStreamer` which manages sending + # RDATA/POSITION commands + self._should_announce_positions = True + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -397,29 +402,23 @@ class ReplicationCommandHandler: return self._streams_to_replicate def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None: - self.send_positions_to_connection(conn) + self.send_positions_to_connection() - def send_positions_to_connection(self, conn: IReplicationConnection) -> None: + def send_positions_to_connection(self) -> None: """Send current position of all streams this process is source of to the connection. """ - # We respond with current position of all streams this instance - # replicates. - for stream in self.get_streams_to_replicate(): - # Note that we use the current token as the prev token here (rather - # than stream.last_token), as we can't be sure that there have been - # no rows written between last token and the current token (since we - # might be racing with the replication sending bg process). - current_token = stream.current_token(self._instance_name) - self.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - current_token, - current_token, - ) - ) + self._should_announce_positions = True + self._notifier.notify_replication() + + def should_announce_positions(self) -> bool: + """Check if we should send POSITION commands for all streams ASAP.""" + return self._should_announce_positions + + def will_announce_positions(self) -> None: + """Mark that we're about to send POSITIONs out for all streams.""" + self._should_announce_positions = False def on_USER_SYNC( self, conn: IReplicationConnection, cmd: UserSyncCommand @@ -588,6 +587,21 @@ class ReplicationCommandHandler: logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line()) + # Check if we can early discard this position. We can only do so for + # connected streams. + stream = self._streams[cmd.stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + self._add_command_to_stream_queue(conn, cmd) async def _process_position( @@ -599,6 +613,18 @@ class ReplicationCommandHandler: """ stream = self._streams[stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + # We're about to go and catch up with the stream, so remove from set # of connected streams. for streams in self._streams_by_connection.values(): @@ -626,8 +652,9 @@ class ReplicationCommandHandler: # for why this can happen. logger.info( - "Fetching replication rows for '%s' between %i and %i", + "Fetching replication rows for '%s' / %s between %i and %i", stream_name, + cmd.instance_name, current_token, cmd.new_token, ) @@ -657,6 +684,13 @@ class ReplicationCommandHandler: self._streams_by_connection.setdefault(conn, set()).add(stream_name) + def is_stream_connected( + self, conn: IReplicationConnection, stream_name: str + ) -> bool: + """Return if stream has been successfully connected and is ready to + receive updates""" + return stream_name in self._streams_by_connection.get(conn, ()) + def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand ) -> None: