summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-11-16 12:32:17 +0000
committerGitHub <noreply@github.com>2023-11-16 12:32:17 +0000
commit898655fd1240138600c96cfa763603c3e5ca3e0e (patch)
treecf3fd89b9099f74df5414d6229e72d7fe0c44de2 /synapse/replication/tcp/streams/_base.py
parentFix test not detecting tables with missing primary keys and missing replica i... (diff)
downloadsynapse-898655fd1240138600c96cfa763603c3e5ca3e0e.tar.xz
More efficiently handle no-op POSITION (#16640)
We may receive `POSITION` commands where we already know that worker has
advanced past that position, so there is no point in handling it.
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/streams/_base.py18
1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 58a44029aa..cc34dfb322 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -144,6 +144,16 @@ class Stream:
         """
         raise NotImplementedError()
 
+    def can_discard_position(
+        self, instance_name: str, prev_token: int, new_token: int
+    ) -> bool:
+        """Whether or not a position command for this stream can be discarded.
+
+        Useful for streams that can never go backwards and where we already know
+        the stream ID for the instance has advanced.
+        """
+        return False
+
     def discard_updates_and_advance(self) -> None:
         """Called when the stream should advance but the updates would be discarded,
         e.g. when there are no currently connected workers.
@@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream):
     def minimal_local_current_token(self) -> Token:
         return self._stream_id_gen.get_minimal_local_current_token()
 
+    def can_discard_position(
+        self, instance_name: str, prev_token: int, new_token: int
+    ) -> bool:
+        # These streams can't go backwards, so we know we can ignore any
+        # positions where the tokens are from before the current token.
+
+        return new_token <= self.current_token(instance_name)
+
 
 def current_token_without_instance(
     current_token: Callable[[], int]