diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e9f618bb5a..ebbc234334 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2045,18 +2045,40 @@ class PresenceFederationQueue:
)
return result["updates"], result["upto_token"], result["limited"]
+ # If the from_token is the current token then there's nothing to return
+ # and we can trivially no-op.
+ if from_token == self._next_id - 1:
+ return [], upto_token, False
+
# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
+ # Since we are returning all states in the range `from_token < stream_id
+ # <= upto_token` we look for the index with a `stream_id` of `from_token
+ # + 1`.
+ #
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
- start_idx = max(from_token - self._next_id, -len(self._queue))
+ start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+ if stream_id <= from_token:
+ # Paranoia check that we are actually only sending states that
+ # are have stream_id strictly greater than from_token. We should
+ # never hit this.
+ logger.warning(
+ "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+ stream_id,
+ from_token,
+ self._next_id,
+ len(self._queue),
+ )
+ continue
+
if stream_id > upto_token:
break
|