Also discard 'caches' and 'backfill' stream POSITIONS (#16655)
Follow on from #16640
1 files changed, 16 insertions, 0 deletions
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index cc34dfb322..1f6402c2da 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -305,6 +305,14 @@ class BackfillStream(Stream):
# which means we need to negate it.
return -self.store._backfill_id_gen.get_minimal_local_current_token()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # Backfill stream can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
class PresenceStream(_StreamFromIdGen):
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -519,6 +527,14 @@ class CachesStream(Stream):
return self.store._cache_id_gen.get_minimal_local_current_token()
return self.current_token(self.local_instance_name)
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # Caches streams can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
class DeviceListsStream(_StreamFromIdGen):
"""Either a user has updated their devices or a remote server needs to be
|