summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/presence.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 969c73c1e7..12df35f26e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2026,18 +2026,40 @@ class PresenceFederationQueue:
             )
             return result["updates"], result["upto_token"], result["limited"]
 
+        # If the from_token is the current token then there's nothing to return
+        # and we can trivially no-op.
+        if from_token == self._next_id - 1:
+            return [], upto_token, False
+
         # We can find the correct position in the queue by noting that there is
         # exactly one entry per stream ID, and that the last entry has an ID of
         # `self._next_id - 1`, so we can count backwards from the end.
         #
+        # Since we are returning all states in the range `from_token < stream_id
+        # <= upto_token` we look for the index with a `stream_id` of `from_token
+        # + 1`.
+        #
         # Since the start of the queue is periodically truncated we need to
         # handle the case where `from_token` stream ID has already been dropped.
-        start_idx = max(from_token - self._next_id, -len(self._queue))
+        start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
 
         to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
         limited = False
         new_id = upto_token
         for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id <= from_token:
+                # Paranoia check that we are actually only sending states that
+                # are have stream_id strictly greater than from_token. We should
+                # never hit this.
+                logger.warning(
+                    "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+                    stream_id,
+                    from_token,
+                    self._next_id,
+                    len(self._queue),
+                )
+                continue
+
             if stream_id > upto_token:
                 break