diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d9f2f017ed..9f1c2fe22a 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -21,6 +21,7 @@
#
import datetime
import logging
+from collections import OrderedDict
from types import TracebackType
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
@@ -68,6 +69,10 @@ sent_edus_by_type = Counter(
# If the retry interval is larger than this then we enter "catchup" mode
CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
+# Limit how many presence states we add to each presence EDU, to ensure that
+# they are bounded in size.
+MAX_PRESENCE_STATES_PER_EDU = 50
+
class PerDestinationQueue:
"""
@@ -144,7 +149,7 @@ class PerDestinationQueue:
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence: Dict[str, UserPresenceState] = {}
+ self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
# List of room_id -> receipt_type -> user_id -> receipt_dict,
#
@@ -399,7 +404,7 @@ class PerDestinationQueue:
# through another mechanism, because this is all volatile!
self._pending_edus = []
self._pending_edus_keyed = {}
- self._pending_presence = {}
+ self._pending_presence.clear()
self._pending_receipt_edus = []
self._start_catching_up()
@@ -721,22 +726,26 @@ class _TransactionQueueManager:
# Add presence EDU.
if self.queue._pending_presence:
+ # Only send max 50 presence entries in the EDU, to bound the amount
+ # of data we're sending.
+ presence_to_add: List[JsonDict] = []
+ while (
+ self.queue._pending_presence
+ and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
+ ):
+ _, presence = self.queue._pending_presence.popitem(last=False)
+ presence_to_add.append(
+ format_user_presence_state(presence, self.queue._clock.time_msec())
+ )
+
pending_edus.append(
Edu(
origin=self.queue._server_name,
destination=self.queue._destination,
edu_type=EduTypes.PRESENCE,
- content={
- "push": [
- format_user_presence_state(
- presence, self.queue._clock.time_msec()
- )
- for presence in self.queue._pending_presence.values()
- ]
- },
+ content={"push": presence_to_add},
)
)
- self.queue._pending_presence = {}
# Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
|