From e4ab8676b4b5a3336ef49bb68a0e6dabbf030df4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Apr 2021 14:42:50 +0100 Subject: Fix tight loop handling presence replication. (#9900) Only affects workers. Introduced in #9819. Fixes #9899. --- synapse/handlers/presence.py | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 969c73c1e7..12df35f26e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -2026,18 +2026,40 @@ class PresenceFederationQueue: ) return result["updates"], result["upto_token"], result["limited"] + # If the from_token is the current token then there's nothing to return + # and we can trivially no-op. + if from_token == self._next_id - 1: + return [], upto_token, False + # We can find the correct position in the queue by noting that there is # exactly one entry per stream ID, and that the last entry has an ID of # `self._next_id - 1`, so we can count backwards from the end. # + # Since we are returning all states in the range `from_token < stream_id + # <= upto_token` we look for the index with a `stream_id` of `from_token + # + 1`. + # # Since the start of the queue is periodically truncated we need to # handle the case where `from_token` stream ID has already been dropped. - start_idx = max(from_token - self._next_id, -len(self._queue)) + start_idx = max(from_token + 1 - self._next_id, -len(self._queue)) to_send = [] # type: List[Tuple[int, Tuple[str, str]]] limited = False new_id = upto_token for _, stream_id, destinations, user_ids in self._queue[start_idx:]: + if stream_id <= from_token: + # Paranoia check that we are actually only sending states that + # are have stream_id strictly greater than from_token. We should + # never hit this. + logger.warning( + "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)", + stream_id, + from_token, + self._next_id, + len(self._queue), + ) + continue + if stream_id > upto_token: break -- cgit 1.4.1