diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/presence.py | 78 |
1 files changed, 63 insertions, 15 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e120dd1f48..6460eb9952 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC): def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() self.store = hs.get_datastore() + self.presence_router = hs.get_presence_router() + self.state = hs.get_state_handler() + + self._federation = None + if hs.should_send_federation() or not hs.config.worker_app: + self._federation = hs.get_federation_sender() + + self._send_federation = hs.should_send_federation() self._busy_presence_enabled = hs.config.experimental.msc3026_enabled @@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC): """Process presence stream rows received over replication.""" pass + async def maybe_send_presence_to_interested_destinations( + self, states: List[UserPresenceState] + ): + """If this instance is a federation sender, send the states to all + destinations that are interested. + """ + + if not self._send_federation: + return + + # If this worker sends federation we must have a FederationSender. + assert self._federation + + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + states, + self.state, + ) + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" @@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler): self.hs = hs self.is_mine_id = hs.is_mine_id - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence # The number of ongoing syncs on this process, by user id. @@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler): users=users_to_states.keys(), ) + # If this is a federation sender, notify about presence updates. + await self.maybe_send_presence_to_interested_destinations(states) + async def process_replication_rows(self, token, rows): states = [ UserPresenceState( @@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler): self.server_name = hs.hostname self.wheel_timer = WheelTimer() self.notifier = hs.get_notifier() - self.federation = hs.get_federation_sender() - self.state = hs.get_state_handler() - self.presence_router = hs.get_presence_router() self._presence_enabled = hs.config.use_presence federation_registry = hs.get_federation_registry() @@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler): self.unpersisted_users_changes |= {s.user_id for s in new_states} self.unpersisted_users_changes -= set(to_notify.keys()) + # Check if we need to resend any presence states to remote hosts. We + # only do this for states that haven't been updated in a while to + # ensure that the remote host doesn't time the presence state out. + # + # Note that since these are states that have *not* been updated, + # they won't get sent down the normal presence replication stream, + # and so we have to explicitly send them via the federation stream. to_federation_ping = { user_id: state for user_id, state in to_federation_ping.items() @@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler): if to_federation_ping: federation_presence_out_counter.inc(len(to_federation_ping)) - self._push_to_remotes(to_federation_ping.values()) + hosts_and_states = await get_interested_remotes( + self.store, + self.presence_router, + list(to_federation_ping.values()), + self.state, + ) + + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + + for destinations, states in hosts_and_states: + self._federation.send_presence_to_destinations(states, destinations) async def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler): users=[UserID.from_string(u) for u in users_to_states], ) - self._push_to_remotes(states) - - def _push_to_remotes(self, states): - """Sends state updates to remote servers. - - Args: - states (list(UserPresenceState)) - """ - self.federation.send_presence(states) + # We only want to poke the local federation sender, if any, as other + # workers will receive the presence updates via the presence replication + # stream (which is updated by `store.update_presence`). + await self.maybe_send_presence_to_interested_destinations(states) async def incoming_presence(self, origin, content): """Called when we receive a `m.presence` EDU from a remote server.""" @@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler): user_presence_states ) + # Since this is master we know that we have a federation sender or + # queue, and so this will be defined. + assert self._federation + # Send out user presence updates for each destination for destination, user_state_set in presence_destinations.items(): - self.federation.send_presence_to_destinations( + self._federation.send_presence_to_destinations( destinations=[destination], states=user_state_set ) |