diff options
author | Erik Johnston <erikj@matrix.org> | 2023-11-16 12:32:17 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-16 12:32:17 +0000 |
commit | 898655fd1240138600c96cfa763603c3e5ca3e0e (patch) | |
tree | cf3fd89b9099f74df5414d6229e72d7fe0c44de2 /synapse/replication/tcp/streams/_base.py | |
parent | Fix test not detecting tables with missing primary keys and missing replica i... (diff) | |
download | synapse-898655fd1240138600c96cfa763603c3e5ca3e0e.tar.xz |
More efficiently handle no-op POSITION (#16640)
We may receive `POSITION` commands where we already know that worker has advanced past that position, so there is no point in handling it.
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 18 |
1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 58a44029aa..cc34dfb322 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -144,6 +144,16 @@ class Stream: """ raise NotImplementedError() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + """Whether or not a position command for this stream can be discarded. + + Useful for streams that can never go backwards and where we already know + the stream ID for the instance has advanced. + """ + return False + def discard_updates_and_advance(self) -> None: """Called when the stream should advance but the updates would be discarded, e.g. when there are no currently connected workers. @@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream): def minimal_local_current_token(self) -> Token: return self._stream_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # These streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + def current_token_without_instance( current_token: Callable[[], int] |