diff options
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r-- | synapse/federation/sender/__init__.py | 110 |
1 files changed, 100 insertions, 10 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index deb40f4610..d980e0d986 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,9 +14,12 @@ import abc import logging +from collections import OrderedDict from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple +import attr from prometheus_client import Counter +from typing_extensions import Literal from twisted.internet import defer @@ -33,8 +36,12 @@ from synapse.metrics import ( event_processing_loop_room_count, events_processed_counter, ) -from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.metrics.background_process_metrics import ( + run_as_background_process, + wrap_as_background_process, +) from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.util import Clock from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -137,6 +144,84 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() +@attr.s +class _PresenceQueue: + """A queue of destinations that need to be woken up due to new presence + updates. + + Staggers waking up of per destination queues to ensure that we don't attempt + to start TLS connections with many hosts all at once, leading to pinned CPU. + """ + + # The maximum duration in seconds between queuing up a destination and it + # being woken up. + _MAX_TIME_IN_QUEUE = 30.0 + + # The maximum duration in seconds between waking up consecutive destination + # queues. + _MAX_DELAY = 0.1 + + sender: "FederationSender" = attr.ib() + clock: Clock = attr.ib() + queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict) + processing: bool = attr.ib(default=False) + + def add_to_queue(self, destination: str) -> None: + """Add a destination to the queue to be woken up.""" + + self.queue[destination] = None + + if not self.processing: + self._handle() + + @wrap_as_background_process("_PresenceQueue.handle") + async def _handle(self) -> None: + """Background process to drain the queue.""" + + if not self.queue: + return + + assert not self.processing + self.processing = True + + try: + # We start with a delay that should drain the queue quickly enough that + # we process all destinations in the queue in _MAX_TIME_IN_QUEUE + # seconds. + # + # We also add an upper bound to the delay, to gracefully handle the + # case where the queue only has a few entries in it. + current_sleep_seconds = min( + self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue) + ) + + while self.queue: + destination, _ = self.queue.popitem(last=False) + + queue = self.sender._get_per_destination_queue(destination) + + if not queue._new_data_to_send: + # The per destination queue has already been woken up. + continue + + queue.attempt_new_transaction() + + await self.clock.sleep(current_sleep_seconds) + + if not self.queue: + break + + # More destinations may have been added to the queue, so we may + # need to reduce the delay to ensure everything gets processed + # within _MAX_TIME_IN_QUEUE seconds. + current_sleep_seconds = min( + current_sleep_seconds, self._MAX_TIME_IN_QUEUE / len(self.queue) + ) + + finally: + self.processing = False + + class FederationSender(AbstractFederationSender): def __init__(self, hs: "HomeServer"): self.hs = hs @@ -148,14 +233,14 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id - self._presence_router = None # type: Optional[PresenceRouter] + self._presence_router: Optional["PresenceRouter"] = None self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() self._federation_shard_config = hs.config.worker.federation_shard_config # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] + self._per_destination_queues: Dict[str, PerDestinationQueue] = {} LaterGauge( "synapse_federation_transaction_queue_pending_destinations", @@ -192,9 +277,7 @@ class FederationSender(AbstractFederationSender): # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, # and that there is a pending call to _flush_rrs_for_room in the system. - self._queues_awaiting_rr_flush_by_room = ( - {} - ) # type: Dict[str, Set[PerDestinationQueue]] + self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {} self._rr_txn_interval_per_room_ms = ( 1000.0 / hs.config.federation_rr_transactions_per_room_per_second @@ -210,6 +293,8 @@ class FederationSender(AbstractFederationSender): self._external_cache = hs.get_external_cache() + self._presence_queue = _PresenceQueue(self, self.clock) + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination @@ -265,7 +350,7 @@ class FederationSender(AbstractFederationSender): if not event.internal_metadata.should_proactively_send(): return - destinations = None # type: Optional[Set[str]] + destinations: Optional[Set[str]] = None if not event.prev_event_ids(): # If there are no prev event IDs then the state is empty # and so no remote servers in the room @@ -331,7 +416,7 @@ class FederationSender(AbstractFederationSender): for event in events: await handle_event(event) - events_by_room = {} # type: Dict[str, List[EventBase]] + events_by_room: Dict[str, List[EventBase]] = {} for event in events: events_by_room.setdefault(event.room_id, []).append(event) @@ -519,7 +604,12 @@ class FederationSender(AbstractFederationSender): self._instance_name, destination ): continue - self._get_per_destination_queue(destination).send_presence(states) + + self._get_per_destination_queue(destination).send_presence( + states, start_loop=False + ) + + self._presence_queue.add_to_queue(destination) def build_and_send_edu( self, @@ -628,7 +718,7 @@ class FederationSender(AbstractFederationSender): In order to reduce load spikes, adds a delay between each destination. """ - last_processed = None # type: Optional[str] + last_processed: Optional[str] = None while True: destinations_to_wake = ( |