summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-01 12:48:36 +0100
committerErik Johnston <erik@matrix.org>2024-07-02 14:07:11 +0100
commit1609855ff8322e3d4d91f8aea322f9750ac24ba2 (patch)
tree0243a192103bd1405c6dc26651d1ccaa2f878d86 /synapse
parentFix regression when bounding future tokens (#17391) (diff)
downloadsynapse-1609855ff8322e3d4d91f8aea322f9750ac24ba2.tar.xz
Limit size of presence EDUs (#17371)
Otherwise they are unbounded.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/sender/per_destination_queue.py31
1 files changed, 20 insertions, 11 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d9f2f017ed..9f1c2fe22a 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -21,6 +21,7 @@
 #
 import datetime
 import logging
+from collections import OrderedDict
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, Type
 
@@ -68,6 +69,10 @@ sent_edus_by_type = Counter(
 # If the retry interval is larger than this then we enter "catchup" mode
 CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000
 
+# Limit how many presence states we add to each presence EDU, to ensure that
+# they are bounded in size.
+MAX_PRESENCE_STATES_PER_EDU = 50
+
 
 class PerDestinationQueue:
     """
@@ -144,7 +149,7 @@ class PerDestinationQueue:
 
         # Map of user_id -> UserPresenceState of pending presence to be sent to this
         # destination
-        self._pending_presence: Dict[str, UserPresenceState] = {}
+        self._pending_presence: OrderedDict[str, UserPresenceState] = OrderedDict()
 
         # List of room_id -> receipt_type -> user_id -> receipt_dict,
         #
@@ -399,7 +404,7 @@ class PerDestinationQueue:
                 # through another mechanism, because this is all volatile!
                 self._pending_edus = []
                 self._pending_edus_keyed = {}
-                self._pending_presence = {}
+                self._pending_presence.clear()
                 self._pending_receipt_edus = []
 
                 self._start_catching_up()
@@ -721,22 +726,26 @@ class _TransactionQueueManager:
 
         # Add presence EDU.
         if self.queue._pending_presence:
+            # Only send max 50 presence entries in the EDU, to bound the amount
+            # of data we're sending.
+            presence_to_add: List[JsonDict] = []
+            while (
+                self.queue._pending_presence
+                and len(presence_to_add) < MAX_PRESENCE_STATES_PER_EDU
+            ):
+                _, presence = self.queue._pending_presence.popitem(last=False)
+                presence_to_add.append(
+                    format_user_presence_state(presence, self.queue._clock.time_msec())
+                )
+
             pending_edus.append(
                 Edu(
                     origin=self.queue._server_name,
                     destination=self.queue._destination,
                     edu_type=EduTypes.PRESENCE,
-                    content={
-                        "push": [
-                            format_user_presence_state(
-                                presence, self.queue._clock.time_msec()
-                            )
-                            for presence in self.queue._pending_presence.values()
-                        ]
-                    },
+                    content={"push": presence_to_add},
                 )
             )
-            self.queue._pending_presence = {}
 
         # Add read receipt EDUs.
         pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))