summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py78
1 files changed, 63 insertions, 15 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e120dd1f48..6460eb9952 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC):
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
+        self.presence_router = hs.get_presence_router()
+        self.state = hs.get_state_handler()
+
+        self._federation = None
+        if hs.should_send_federation() or not hs.config.worker_app:
+            self._federation = hs.get_federation_sender()
+
+        self._send_federation = hs.should_send_federation()
 
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
@@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC):
         """Process presence stream rows received over replication."""
         pass
 
+    async def maybe_send_presence_to_interested_destinations(
+        self, states: List[UserPresenceState]
+    ):
+        """If this instance is a federation sender, send the states to all
+        destinations that are interested.
+        """
+
+        if not self._send_federation:
+            return
+
+        # If this worker sends federation we must have a FederationSender.
+        assert self._federation
+
+        hosts_and_states = await get_interested_remotes(
+            self.store,
+            self.presence_router,
+            states,
+            self.state,
+        )
+
+        for destinations, states in hosts_and_states:
+            self._federation.send_presence_to_destinations(states, destinations)
+
 
 class _NullContextManager(ContextManager[None]):
     """A context manager which does nothing."""
@@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.hs = hs
         self.is_mine_id = hs.is_mine_id
 
-        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         # The number of ongoing syncs on this process, by user id.
@@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
             users=users_to_states.keys(),
         )
 
+        # If this is a federation sender, notify about presence updates.
+        await self.maybe_send_presence_to_interested_destinations(states)
+
     async def process_replication_rows(self, token, rows):
         states = [
             UserPresenceState(
@@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler):
         self.server_name = hs.hostname
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
-        self.federation = hs.get_federation_sender()
-        self.state = hs.get_state_handler()
-        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler):
             self.unpersisted_users_changes |= {s.user_id for s in new_states}
             self.unpersisted_users_changes -= set(to_notify.keys())
 
+            # Check if we need to resend any presence states to remote hosts. We
+            # only do this for states that haven't been updated in a while to
+            # ensure that the remote host doesn't time the presence state out.
+            #
+            # Note that since these are states that have *not* been updated,
+            # they won't get sent down the normal presence replication stream,
+            # and so we have to explicitly send them via the federation stream.
             to_federation_ping = {
                 user_id: state
                 for user_id, state in to_federation_ping.items()
@@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler):
             if to_federation_ping:
                 federation_presence_out_counter.inc(len(to_federation_ping))
 
-                self._push_to_remotes(to_federation_ping.values())
+                hosts_and_states = await get_interested_remotes(
+                    self.store,
+                    self.presence_router,
+                    list(to_federation_ping.values()),
+                    self.state,
+                )
+
+                # Since this is master we know that we have a federation sender or
+                # queue, and so this will be defined.
+                assert self._federation
+
+                for destinations, states in hosts_and_states:
+                    self._federation.send_presence_to_destinations(states, destinations)
 
     async def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler):
             users=[UserID.from_string(u) for u in users_to_states],
         )
 
-        self._push_to_remotes(states)
-
-    def _push_to_remotes(self, states):
-        """Sends state updates to remote servers.
-
-        Args:
-            states (list(UserPresenceState))
-        """
-        self.federation.send_presence(states)
+        # We only want to poke the local federation sender, if any, as other
+        # workers will receive the presence updates via the presence replication
+        # stream (which is updated by `store.update_presence`).
+        await self.maybe_send_presence_to_interested_destinations(states)
 
     async def incoming_presence(self, origin, content):
         """Called when we receive a `m.presence` EDU from a remote server."""
@@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler):
                     user_presence_states
                 )
 
+        # Since this is master we know that we have a federation sender or
+        # queue, and so this will be defined.
+        assert self._federation
+
         # Send out user presence updates for each destination
         for destination, user_state_set in presence_destinations.items():
-            self.federation.send_presence_to_destinations(
+            self._federation.send_presence_to_destinations(
                 destinations=[destination], states=user_state_set
             )