diff options
author | Erik Johnston <erikj@matrix.org> | 2023-11-17 14:14:29 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-11-17 14:14:29 +0000 |
commit | 6fec2d035f5584335af7c1b115ca48a13a8da5fa (patch) | |
tree | a3db760b81af5004c7be83ae3c700d6b469ef899 | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-6fec2d035f5584335af7c1b115ca48a13a8da5fa.tar.xz |
Also discard 'caches' and 'backfill' stream POSITIONS (#16655)
Follow on from #16640
-rw-r--r-- | changelog.d/16655.misc | 1 | ||||
-rw-r--r-- | synapse/replication/tcp/streams/_base.py | 16 |
2 files changed, 17 insertions, 0 deletions
diff --git a/changelog.d/16655.misc b/changelog.d/16655.misc new file mode 100644 index 0000000000..3b1cc2185d --- /dev/null +++ b/changelog.d/16655.misc @@ -0,0 +1 @@ +More efficiently handle no-op `POSITION` over replication. diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index cc34dfb322..1f6402c2da 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -305,6 +305,14 @@ class BackfillStream(Stream): # which means we need to negate it. return -self.store._backfill_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Backfill stream can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class PresenceStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -519,6 +527,14 @@ class CachesStream(Stream): return self.store._cache_id_gen.get_minimal_local_current_token() return self.current_token(self.local_instance_name) + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Caches streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class DeviceListsStream(_StreamFromIdGen): """Either a user has updated their devices or a remote server needs to be |