summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-11-16 12:32:17 +0000
committerGitHub <noreply@github.com>2023-11-16 12:32:17 +0000
commit898655fd1240138600c96cfa763603c3e5ca3e0e (patch)
treecf3fd89b9099f74df5414d6229e72d7fe0c44de2 /synapse/replication/tcp/handler.py
parentFix test not detecting tables with missing primary keys and missing replica i... (diff)
downloadsynapse-898655fd1240138600c96cfa763603c3e5ca3e0e.tar.xz
More efficiently handle no-op POSITION (#16640)
We may receive `POSITION` commands where we already know that worker has
advanced past that position, so there is no point in handling it.
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py34
1 files changed, 34 insertions, 0 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index afd03137f0..1748182663 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -588,6 +588,21 @@ class ReplicationCommandHandler:
 
         logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
 
+        # Check if we can early discard this position. We can only do so for
+        # connected streams.
+        stream = self._streams[cmd.stream_name]
+        if stream.can_discard_position(
+            cmd.instance_name, cmd.prev_token, cmd.new_token
+        ) and self.is_stream_connected(conn, cmd.stream_name):
+            logger.debug(
+                "Discarding redundant POSITION %s/%s %s %s",
+                cmd.instance_name,
+                cmd.stream_name,
+                cmd.prev_token,
+                cmd.new_token,
+            )
+            return
+
         self._add_command_to_stream_queue(conn, cmd)
 
     async def _process_position(
@@ -599,6 +614,18 @@ class ReplicationCommandHandler:
         """
         stream = self._streams[stream_name]
 
+        if stream.can_discard_position(
+            cmd.instance_name, cmd.prev_token, cmd.new_token
+        ) and self.is_stream_connected(conn, cmd.stream_name):
+            logger.debug(
+                "Discarding redundant POSITION %s/%s %s %s",
+                cmd.instance_name,
+                cmd.stream_name,
+                cmd.prev_token,
+                cmd.new_token,
+            )
+            return
+
         # We're about to go and catch up with the stream, so remove from set
         # of connected streams.
         for streams in self._streams_by_connection.values():
@@ -657,6 +684,13 @@ class ReplicationCommandHandler:
 
         self._streams_by_connection.setdefault(conn, set()).add(stream_name)
 
+    def is_stream_connected(
+        self, conn: IReplicationConnection, stream_name: str
+    ) -> bool:
+        """Return if stream has been successfully connected and is ready to
+        receive updates"""
+        return stream_name in self._streams_by_connection.get(conn, ())
+
     def on_REMOTE_SERVER_UP(
         self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
     ) -> None: