diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..d980e0d986 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,9 +14,12 @@
import abc
import logging
+from collections import OrderedDict
from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
+import attr
from prometheus_client import Counter
+from typing_extensions import Literal
from twisted.internet import defer
@@ -33,8 +36,12 @@ from synapse.metrics import (
event_processing_loop_room_count,
events_processed_counter,
)
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
+from synapse.util import Clock
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -137,6 +144,84 @@ class AbstractFederationSender(metaclass=abc.ABCMeta):
raise NotImplementedError()
+@attr.s
+class _PresenceQueue:
+ """A queue of destinations that need to be woken up due to new presence
+ updates.
+
+ Staggers waking up of per destination queues to ensure that we don't attempt
+ to start TLS connections with many hosts all at once, leading to pinned CPU.
+ """
+
+ # The maximum duration in seconds between queuing up a destination and it
+ # being woken up.
+ _MAX_TIME_IN_QUEUE = 30.0
+
+ # The maximum duration in seconds between waking up consecutive destination
+ # queues.
+ _MAX_DELAY = 0.1
+
+ sender: "FederationSender" = attr.ib()
+ clock: Clock = attr.ib()
+ queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
+ processing: bool = attr.ib(default=False)
+
+ def add_to_queue(self, destination: str) -> None:
+ """Add a destination to the queue to be woken up."""
+
+ self.queue[destination] = None
+
+ if not self.processing:
+ self._handle()
+
+ @wrap_as_background_process("_PresenceQueue.handle")
+ async def _handle(self) -> None:
+ """Background process to drain the queue."""
+
+ if not self.queue:
+ return
+
+ assert not self.processing
+ self.processing = True
+
+ try:
+ # We start with a delay that should drain the queue quickly enough that
+ # we process all destinations in the queue in _MAX_TIME_IN_QUEUE
+ # seconds.
+ #
+ # We also add an upper bound to the delay, to gracefully handle the
+ # case where the queue only has a few entries in it.
+ current_sleep_seconds = min(
+ self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ while self.queue:
+ destination, _ = self.queue.popitem(last=False)
+
+ queue = self.sender._get_per_destination_queue(destination)
+
+ if not queue._new_data_to_send:
+ # The per destination queue has already been woken up.
+ continue
+
+ queue.attempt_new_transaction()
+
+ await self.clock.sleep(current_sleep_seconds)
+
+ if not self.queue:
+ break
+
+ # More destinations may have been added to the queue, so we may
+ # need to reduce the delay to ensure everything gets processed
+ # within _MAX_TIME_IN_QUEUE seconds.
+ current_sleep_seconds = min(
+ current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue)
+ )
+
+ finally:
+ self.processing = False
+
+
class FederationSender(AbstractFederationSender):
def __init__(self, hs: "HomeServer"):
self.hs = hs
@@ -148,14 +233,14 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
- self._presence_router = None # type: Optional[PresenceRouter]
+ self._presence_router: Optional["PresenceRouter"] = None
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
+ self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +277,7 @@ class FederationSender(AbstractFederationSender):
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room = (
- {}
- ) # type: Dict[str, Set[PerDestinationQueue]]
+ self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -210,6 +293,8 @@ class FederationSender(AbstractFederationSender):
self._external_cache = hs.get_external_cache()
+ self._presence_queue = _PresenceQueue(self, self.clock)
+
def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
"""Get or create a PerDestinationQueue for the given destination
@@ -265,7 +350,7 @@ class FederationSender(AbstractFederationSender):
if not event.internal_metadata.should_proactively_send():
return
- destinations = None # type: Optional[Set[str]]
+ destinations: Optional[Set[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
@@ -331,7 +416,7 @@ class FederationSender(AbstractFederationSender):
for event in events:
await handle_event(event)
- events_by_room = {} # type: Dict[str, List[EventBase]]
+ events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -519,7 +604,12 @@ class FederationSender(AbstractFederationSender):
self._instance_name, destination
):
continue
- self._get_per_destination_queue(destination).send_presence(states)
+
+ self._get_per_destination_queue(destination).send_presence(
+ states, start_loop=False
+ )
+
+ self._presence_queue.add_to_queue(destination)
def build_and_send_edu(
self,
@@ -628,7 +718,7 @@ class FederationSender(AbstractFederationSender):
In order to reduce load spikes, adds a delay between each destination.
"""
- last_processed = None # type: Optional[str]
+ last_processed: Optional[str] = None
while True:
destinations_to_wake = (
|