diff --git a/changelog.d/10398.misc b/changelog.d/10398.misc
new file mode 100644
index 0000000000..326e54655a
--- /dev/null
+++ b/changelog.d/10398.misc
@@ -0,0 +1 @@
+Stagger sending of presence update to remote servers, reducing CPU spikes caused by starting many connections to remote servers at once.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 0960f033bc..d980e0d986 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,9 +14,12 @@
import abc
import logging
+from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
+import attr
from prometheus_client import Counter
+from typing_extensions import Literal
from twisted.internet import defer
@@ -33,8 +36,12 @@ from synapse.metrics import (
event_processing_loop_room_count,
events_processed_counter,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.util import Clock
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -137,6 +144,84 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
+@attr.s
+class _PresenceQueue:
+ """A queue of destinations that need to be woken up due to new presence
+ updates.
+
+ Staggers waking up of per destination queues to ensure that we don't attempt
+ to start TLS connections with many hosts all at once, leading to pinned CPU.
+ """
+
+ # The maximum duration in seconds between queuing up a destination and it
+ # being woken up.
+ _MAX_TIME_IN_QUEUE = 30.0
+
+ # The maximum duration in seconds between waking up consecutive destination
+ # queues.
+ _MAX_DELAY = 0.1
+
+ sender: "FederationSender" = attr.ib()
+ clock: Clock = attr.ib()
+ queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
+ processing: bool = attr.ib(default=False)
+
+ def add_to_queue(self, destination: str) -> None:
+ """Add a destination to the queue to be woken up."""
+
+ self.queue[destination] = None
+
+ if not self.processing:
+ self._handle()
+
+ @wrap_as_background_process("_PresenceQueue.handle")
+ async def _handle(self) -> None:
+ """Background process to drain the queue."""
+
+ if not self.queue:
+ return
+
+ assert not self.processing
+ self.processing = True
+
+ try:
+ # We start with a delay that should drain the queue quickly enough that
+ # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
+ # seconds.
+ #
+ # We also add an upper bound to the delay, to gracefully handle the
+ # case where the queue only has a few entries in it.
+ current_sleep_seconds = min(
+ self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ while self.queue:
+ destination, _ = self.queue.popitem(last=False)
+
+ queue = self.sender._get_per_destination_queue(destination)
+
+ if not queue._new_data_to_send:
+ # The per destination queue has already been woken up.
+ continue
+
+ queue.attempt_new_transaction()
+
+ await self.clock.sleep(current_sleep_seconds)
+
+ if not self.queue:
+ break
+
+ # More destinations may have been added to the queue, so we may
+ # need to reduce the delay to ensure everything gets processed
+ # within _MAX_TIME_IN_QUEUE seconds.
+ current_sleep_seconds = min(
+ current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ finally:
+ self.processing = False
+
+
class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -208,6 +293,8 @@ class FederationSender(AbstractFederationSender):
self._external_cache = hs.get_external_cache()
+ self._presence_queue = _PresenceQueue(self, self.clock)
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -517,7 +604,12 @@ class FederationSender(AbstractFederationSender):
self._instance_name, destination
):
continue
- self._get_per_destination_queue(destination).send_presence(states)
+
+ self._get_per_destination_queue(destination).send_presence(
+ states, start_loop=False
+ )
+
+ self._presence_queue.add_to_queue(destination)
def build_and_send_edu(
self,
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index d06a3aff19..c11d1f6d31 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -171,14 +171,24 @@ class PerDestinationQueue:
self.attempt_new_transaction()
- def send_presence(self, states: Iterable[UserPresenceState]) -> None:
- """Add presence updates to the queue. Start the transmission loop if necessary.
+ def send_presence(
+ self, states: Iterable[UserPresenceState], start_loop: bool = True
+ ) -> None:
+ """Add presence updates to the queue.
+
+ Args:
+ states: Presence updates to send
+ start_loop: Whether to start the transmission loop if not already
+ running.
Args:
states: presence to send
"""
self._pending_presence.update({state.user_id: state for state in states})
- self.attempt_new_transaction()
+ self._new_data_to_send = True
+
+ if start_loop:
+ self.attempt_new_transaction()
def queue_read_receipt(self, receipt: ReadReceipt) -> None:
"""Add a RR to the list to be sent. Doesn't start the transmission loop yet
diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
index c4ad33194d..3f41e99950 100644
--- a/tests/events/test_presence_router.py
+++ b/tests/events/test_presence_router.py
@@ -285,6 +285,10 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
presence_updates, _ = sync_presence(self, self.presence_receiving_user_two_id)
self.assertEqual(len(presence_updates), 3)
+ # We stagger sending of presence, so we need to wait a bit for them to
+ # get sent out.
+ self.reactor.advance(60)
+
# Test that sending to a remote user works
remote_user_id = "@far_away_person:island"
@@ -301,6 +305,10 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
self.module_api.send_local_online_presence_to([remote_user_id])
)
+ # We stagger sending of presence, so we need to wait a bit for them to
+ # get sent out.
+ self.reactor.advance(60)
+
# Check that the expected presence updates were sent
# We explicitly compare using sets as we expect that calling
# module_api.send_local_online_presence_to will create a presence
|