From 04819239bae2b39ee42bfdb6f9b83c6d9fe34169 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 6 Apr 2021 14:38:30 +0100 Subject: Add a Synapse Module for configuring presence update routing (#9491) At the moment, if you'd like to share presence between local or remote users, those users must be sharing a room together. This isn't always the most convenient or useful situation though. This PR adds a module to Synapse that will allow deployments to set up extra logic on where presence updates should be routed. The module must implement two methods, `get_users_for_states` and `get_interested_users`. These methods are given presence updates or user IDs and must return information that Synapse will use to decide where presence updates are routed. A method is additionally added to `ModuleApi` which allows triggering a set of users to receive the current, online presence information for all users they are considered interested in. This is the equivalent of those users receiving presence information during an initial sync. The goal of this module is to be fairly generic and useful for a variety of applications, with the hard requirements being: * Sending state for a specific set or all known users to a defined set of local and remote users. * The ability to trigger an initial sync for specific users, so they receive all current state. 
--- synapse/federation/sender/__init__.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) (limited to 'synapse/federation/sender/__init__.py') diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 8babb1ebbe..98bfce22ff 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -44,6 +44,7 @@ from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func if TYPE_CHECKING: + from synapse.events.presence_router import PresenceRouter from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -162,6 +163,7 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id + self._presence_router = None # type: Optional[PresenceRouter] self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() @@ -584,7 +586,22 @@ class FederationSender(AbstractFederationSender): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination """ - hosts_and_states = await get_interested_remotes(self.store, states, self.state) + # We pull the presence router here instead of __init__ + # to prevent a dependency cycle: + # + # AuthHandler -> Notifier -> FederationSender + # -> PresenceRouter -> ModuleApi -> AuthHandler + if self._presence_router is None: + self._presence_router = self.hs.get_presence_router() + + assert self._presence_router is not None + + hosts_and_states = await get_interested_remotes( + self.store, + self._presence_router, + states, + self.state, + ) for destinations, states in hosts_and_states: for destination in destinations: -- cgit 1.5.1 From 3a569fb2000e972efe2e145d57ffd9441ee41665 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 8 Apr 2021 17:30:01 +0100 Subject: Fix sharded federation sender sometimes using 100% CPU. 
We pull all destinations requiring catchup from the DB in batches. However, if all those destinations get filtered out (due to the federation sender being sharded), then the `last_processed` destination doesn't get updated, and we keep requesting the same set repeatedly. --- changelog.d/9770.bugfix | 1 + synapse/federation/sender/__init__.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 changelog.d/9770.bugfix (limited to 'synapse/federation/sender/__init__.py') diff --git a/changelog.d/9770.bugfix b/changelog.d/9770.bugfix new file mode 100644 index 0000000000..baf93138de --- /dev/null +++ b/changelog.d/9770.bugfix @@ -0,0 +1 @@ +Fix bug where sharded federation senders could get stuck repeatedly querying the DB in a loop, using lots of CPU. diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 98bfce22ff..d821dcbf6a 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -734,16 +734,18 @@ class FederationSender(AbstractFederationSender): self._catchup_after_startup_timer = None break + last_processed = destinations_to_wake[-1] + destinations_to_wake = [ d for d in destinations_to_wake if self._federation_shard_config.should_handle(self._instance_name, d) ] - for last_processed in destinations_to_wake: + for destination in destinations_to_wake: logger.info( "Destination %s has outstanding catch-up, waking up.", last_processed, ) - self.wake_destination(last_processed) + self.wake_destination(destination) await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC) -- cgit 1.5.1