diff --git a/changelog.d/9900.bugfix b/changelog.d/9900.bugfix
new file mode 100644
index 0000000000..a8470fca3f
--- /dev/null
+++ b/changelog.d/9900.bugfix
@@ -0,0 +1 @@
+Fix tight loop handling presence replication when using workers. Introduced in v1.33.0rc1.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e9f618bb5a..ebbc234334 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2045,18 +2045,40 @@ class PresenceFederationQueue:
)
return result["updates"], result["upto_token"], result["limited"]
+ # If the from_token is the current token then there's nothing to return
+ # and we can trivially no-op.
+ if from_token == self._next_id - 1:
+ return [], upto_token, False
+
# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
+ # Since we are returning all states in the range `from_token < stream_id
+ # <= upto_token` we look for the index with a `stream_id` of `from_token
+ # + 1`.
+ #
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
- start_idx = max(from_token - self._next_id, -len(self._queue))
+ start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+ if stream_id <= from_token:
+ # Paranoia check that we are actually only sending states that
+ # are have stream_id strictly greater than from_token. We should
+ # never hit this.
+ logger.warning(
+ "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+ stream_id,
+ from_token,
+ self._next_id,
+ len(self._queue),
+ )
+ continue
+
if stream_id > upto_token:
break
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 61271cd084..ce330e79cc 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -509,6 +509,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(rows, expected_rows)
+ now_token = self.queue.get_current_token(self.instance_name)
+ rows, upto_token, limited = self.get_success(
+ self.queue.get_replication_rows("master", upto_token, now_token, 10)
+ )
+ self.assertEqual(upto_token, now_token)
+ self.assertFalse(limited)
+ self.assertCountEqual(rows, [])
+
def test_send_and_get_split(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
@@ -538,6 +546,20 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertCountEqual(rows, expected_rows)
+ now_token = self.queue.get_current_token(self.instance_name)
+ rows, upto_token, limited = self.get_success(
+ self.queue.get_replication_rows("master", upto_token, now_token, 10)
+ )
+
+ self.assertEqual(upto_token, now_token)
+ self.assertFalse(limited)
+
+ expected_rows = [
+ (2, ("dest3", "@user3:test")),
+ ]
+
+ self.assertCountEqual(rows, expected_rows)
+
def test_clear_queue_all(self):
state1 = UserPresenceState.default("@user1:test")
state2 = UserPresenceState.default("@user2:test")
|