summary refs log tree commit diff
path: root/synapse/replication/tcp/streams/_base.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-11-20 10:09:33 +0000
committerErik Johnston <erik@matrix.org>2023-11-20 10:09:33 +0000
commit9c3b906b3aa13d83f3dc5eb080dc7e77d6d00511 (patch)
tree69de0f066bc3f30c6d70aedb08f8f8d612a44d38 /synapse/replication/tcp/streams/_base.py
parentMerge remote-tracking branch 'origin/develop' into matrix-org-hotfixes (diff)
parentSpeed up how quickly we launch new tasks (#16660) (diff)
downloadsynapse-9c3b906b3aa13d83f3dc5eb080dc7e77d6d00511.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/replication/tcp/streams/_base.py')
-rw-r--r--synapse/replication/tcp/streams/_base.py16
1 files changed, 16 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py

index cc34dfb322..1f6402c2da 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py
@@ -305,6 +305,14 @@ class BackfillStream(Stream): # which means we need to negate it. return -self.store._backfill_id_gen.get_minimal_local_current_token() + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Backfill stream can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class PresenceStream(_StreamFromIdGen): @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -519,6 +527,14 @@ class CachesStream(Stream): return self.store._cache_id_gen.get_minimal_local_current_token() return self.current_token(self.local_instance_name) + def can_discard_position( + self, instance_name: str, prev_token: int, new_token: int + ) -> bool: + # Caches streams can't go backwards, so we know we can ignore any + # positions where the tokens are from before the current token. + + return new_token <= self.current_token(instance_name) + class DeviceListsStream(_StreamFromIdGen): """Either a user has updated their devices or a remote server needs to be