diff options
author | Erik Johnston <erikj@matrix.org> | 2023-11-16 12:32:17 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-16 12:32:17 +0000 |
commit | 898655fd1240138600c96cfa763603c3e5ca3e0e (patch) | |
tree | cf3fd89b9099f74df5414d6229e72d7fe0c44de2 /synapse/replication/tcp/handler.py | |
parent | Fix test not detecting tables with missing primary keys and missing replica i... (diff) | |
download | synapse-898655fd1240138600c96cfa763603c3e5ca3e0e.tar.xz |
More efficiently handle no-op POSITION (#16640)
We may receive `POSITION` commands where we already know that worker has advanced past that position, so there is no point in handling it.
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r-- | synapse/replication/tcp/handler.py | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index afd03137f0..1748182663 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -588,6 +588,21 @@ class ReplicationCommandHandler: logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line()) + # Check if we can early discard this position. We can only do so for + # connected streams. + stream = self._streams[cmd.stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + self._add_command_to_stream_queue(conn, cmd) async def _process_position( @@ -599,6 +614,18 @@ class ReplicationCommandHandler: """ stream = self._streams[stream_name] + if stream.can_discard_position( + cmd.instance_name, cmd.prev_token, cmd.new_token + ) and self.is_stream_connected(conn, cmd.stream_name): + logger.debug( + "Discarding redundant POSITION %s/%s %s %s", + cmd.instance_name, + cmd.stream_name, + cmd.prev_token, + cmd.new_token, + ) + return + # We're about to go and catch up with the stream, so remove from set # of connected streams. for streams in self._streams_by_connection.values(): @@ -657,6 +684,13 @@ class ReplicationCommandHandler: self._streams_by_connection.setdefault(conn, set()).add(stream_name) + def is_stream_connected( + self, conn: IReplicationConnection, stream_name: str + ) -> bool: + """Return if stream has been successfully connected and is ready to + receive updates""" + return stream_name in self._streams_by_connection.get(conn, ()) + def on_REMOTE_SERVER_UP( self, conn: IReplicationConnection, cmd: RemoteServerUpCommand ) -> None: |