diff --git a/synapse/__init__.py b/synapse/__init__.py
index fbd49a93e1..5bbaa62de2 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
except ImportError:
pass
-__version__ = "1.32.2"
+__version__ = "1.33.0rc1"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 088260c2e9..deb40f4610 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,16 +14,7 @@
import abc
import logging
-from typing import (
- TYPE_CHECKING,
- Dict,
- Hashable,
- Iterable,
- List,
- Optional,
- Set,
- Tuple,
-)
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
from prometheus_client import Counter
@@ -35,10 +26,7 @@ from synapse.events import EventBase
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
from synapse.federation.sender.transaction_manager import TransactionManager
from synapse.federation.units import Edu
-from synapse.logging.context import (
- make_deferred_yieldable,
- run_in_background,
-)
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics import (
LaterGauge,
event_processing_loop_counter,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 969c73c1e7..12df35f26e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2026,18 +2026,40 @@ class PresenceFederationQueue:
)
return result["updates"], result["upto_token"], result["limited"]
+ # If the from_token is the current token then there's nothing to return
+ # and we can trivially no-op.
+ if from_token == self._next_id - 1:
+ return [], upto_token, False
+
# We can find the correct position in the queue by noting that there is
# exactly one entry per stream ID, and that the last entry has an ID of
# `self._next_id - 1`, so we can count backwards from the end.
#
+ # Since we are returning all states in the range `from_token < stream_id
+ # <= upto_token` we look for the index with a `stream_id` of `from_token
+ # + 1`.
+ #
# Since the start of the queue is periodically truncated we need to
# handle the case where `from_token` stream ID has already been dropped.
- start_idx = max(from_token - self._next_id, -len(self._queue))
+ start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
to_send = [] # type: List[Tuple[int, Tuple[str, str]]]
limited = False
new_id = upto_token
for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+ if stream_id <= from_token:
+ # Paranoia check that we are actually only sending states that
+ # are have stream_id strictly greater than from_token. We should
+ # never hit this.
+ logger.warning(
+ "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+ stream_id,
+ from_token,
+ self._next_id,
+ len(self._queue),
+ )
+ continue
+
if stream_id > upto_token:
break
|