Reduce the number of caches stream `POSITION` commands we send (#16561)
Follow-up to #16557; this change actually implements it correctly.
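
The core of the change is a fast path in `Stream.get_updates`: when the last token this instance published is already at or past its minimal local current token, there is nothing new to fetch, so the stream returns no rows immediately. Below is a minimal, self-contained sketch of that fast path; `ToyStream` and its hard-coded token bookkeeping are hypothetical stand-ins for illustration, not Synapse's real classes.

```python
# Minimal sketch of the fast path this PR adds to Stream.get_updates().
# ToyStream is a hypothetical stand-in, not Synapse's real Stream class.

import asyncio
from typing import Any, List, Tuple

Token = int


class ToyStream:
    def __init__(self, current: Token, minimal: Token) -> None:
        self.last_token: Token = 0  # last token we published to the stream
        self._current = current     # global current position of the stream
        self._minimal = minimal     # minimal current token for *this* instance

    def current_token(self) -> Token:
        return self._current

    def minimal_local_current_token(self) -> Token:
        return self._minimal

    async def get_updates(self) -> Tuple[List[Any], Token, bool]:
        current_token = self.current_token()

        # Fast path (what the first hunk below adds): if this instance's
        # minimal current token has not advanced past what we last published,
        # there are no local updates to send, so skip the fetch entirely.
        if self.last_token >= self.minimal_local_current_token():
            self.last_token = current_token
            return [], current_token, False

        # Slow path: fetch rows between last_token and current_token
        # (elided in this sketch).
        updates: List[Any] = []
        self.last_token = current_token
        return updates, current_token, False


async def demo() -> None:
    # Other instances advanced the stream to 5, but nothing local changed.
    stream = ToyStream(current=5, minimal=0)
    print(await stream.get_updates())  # -> ([], 5, False): fast path taken


asyncio.run(demo())
```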
2 files changed, 11 insertions, 0 deletions
diff --git a/changelog.d/16561.bugfix b/changelog.d/16561.bugfix
new file mode 100644
index 0000000000..4f4a0380cd
--- /dev/null
+++ b/changelog.d/16561.bugfix
@@ -0,0 +1 @@
+Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 5c4d228f3d..58a44029aa 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -161,6 +161,14 @@ class Stream:
and `limited` is whether there are more updates to fetch.
"""
current_token = self.current_token(self.local_instance_name)
+
+ # If the minimum current token for the local instance is less than or
+ # equal to the last thing we published, we know that there are no
+ # updates.
+ if self.last_token >= self.minimal_local_current_token():
+ self.last_token = current_token
+ return [], current_token, False
+
updates, current_token, limited = await self.get_updates_since(
self.local_instance_name, self.last_token, current_token
)
@@ -489,6 +497,8 @@ class CachesStream(Stream):
return self.store.get_cache_stream_token_for_writer(instance_name)
def minimal_local_current_token(self) -> Token:
+ if self.store._cache_id_gen:
+ return self.store._cache_id_gen.get_minimal_local_current_token()
return self.current_token(self.local_instance_name)
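
The second hunk makes `CachesStream.minimal_local_current_token` delegate to the cache id generator's `get_minimal_local_current_token` when one exists. Roughly speaking, the "minimal local current token" only advances once all in-flight local writes below it have completed, which is what lets the fast path above safely skip the fetch. The sketch below illustrates that idea with a hypothetical `ToyIdGenerator`; it is not Synapse's `MultiWriterIdGenerator`.

```python
# Rough sketch of what "minimal local current token" means for a multi-writer
# id generator. ToyIdGenerator is hypothetical, not Synapse's
# MultiWriterIdGenerator.

from typing import Set


class ToyIdGenerator:
    def __init__(self) -> None:
        self._current: int = 0               # last token handed out locally
        self._unfinished: Set[int] = set()   # handed out but not yet persisted

    def get_next(self) -> int:
        self._current += 1
        self._unfinished.add(self._current)
        return self._current

    def mark_finished(self, token: int) -> None:
        self._unfinished.discard(token)

    def get_minimal_local_current_token(self) -> int:
        # Stop just before the oldest in-flight write, or report the current
        # token if everything handed out has been persisted.
        if self._unfinished:
            return min(self._unfinished) - 1
        return self._current


gen = ToyIdGenerator()
first = gen.get_next()    # token 1, still in flight
second = gen.get_next()   # token 2, still in flight
gen.mark_finished(second)
assert gen.get_minimal_local_current_token() == 0  # token 1 still pending
gen.mark_finished(first)
assert gen.get_minimal_local_current_token() == 2  # fully caught up
```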