1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 58a44029aa..cc34dfb322 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -144,6 +144,16 @@ class Stream:
"""
raise NotImplementedError()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ """Whether or not a position command for this stream can be discarded.
+
+ Useful for streams that can never go backwards and where we already know
+ the stream ID for the instance has advanced.
+ """
+ return False
+
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # These streams can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
def current_token_without_instance(
current_token: Callable[[], int]
|