diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e120dd1f48..6460eb9952 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -123,6 +123,14 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
+ self.presence_router = hs.get_presence_router()
+ self.state = hs.get_state_handler()
+
+ self._federation = None
+ if hs.should_send_federation() or not hs.config.worker_app:
+ self._federation = hs.get_federation_sender()
+
+ self._send_federation = hs.should_send_federation()
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
@@ -249,6 +257,29 @@ class BasePresenceHandler(abc.ABC):
"""Process presence stream rows received over replication."""
pass
+ async def maybe_send_presence_to_interested_destinations(
+ self, states: List[UserPresenceState]
+ ):
+ """If this instance is a federation sender, send the states to all
+ destinations that are interested.
+ """
+
+ if not self._send_federation:
+ return
+
+ # If this worker sends federation we must have a FederationSender.
+ assert self._federation
+
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self.presence_router,
+ states,
+ self.state,
+ )
+
+ for destinations, states in hosts_and_states:
+ self._federation.send_presence_to_destinations(states, destinations)
+
class _NullContextManager(ContextManager[None]):
"""A context manager which does nothing."""
@@ -263,7 +294,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.hs = hs
self.is_mine_id = hs.is_mine_id
- self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
# The number of ongoing syncs on this process, by user id.
@@ -388,6 +418,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
users=users_to_states.keys(),
)
+ # If this is a federation sender, notify about presence updates.
+ await self.maybe_send_presence_to_interested_destinations(states)
+
async def process_replication_rows(self, token, rows):
states = [
UserPresenceState(
@@ -463,9 +496,6 @@ class PresenceHandler(BasePresenceHandler):
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.federation = hs.get_federation_sender()
- self.state = hs.get_state_handler()
- self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
federation_registry = hs.get_federation_registry()
@@ -672,6 +702,13 @@ class PresenceHandler(BasePresenceHandler):
self.unpersisted_users_changes |= {s.user_id for s in new_states}
self.unpersisted_users_changes -= set(to_notify.keys())
+ # Check if we need to resend any presence states to remote hosts. We
+ # only do this for states that haven't been updated in a while to
+ # ensure that the remote host doesn't time the presence state out.
+ #
+ # Note that since these are states that have *not* been updated,
+ # they won't get sent down the normal presence replication stream,
+ # and so we have to explicitly send them via the federation stream.
to_federation_ping = {
user_id: state
for user_id, state in to_federation_ping.items()
@@ -680,7 +717,19 @@ class PresenceHandler(BasePresenceHandler):
if to_federation_ping:
federation_presence_out_counter.inc(len(to_federation_ping))
- self._push_to_remotes(to_federation_ping.values())
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self.presence_router,
+ list(to_federation_ping.values()),
+ self.state,
+ )
+
+ # Since this is master we know that we have a federation sender or
+ # queue, and so this will be defined.
+ assert self._federation
+
+ for destinations, states in hosts_and_states:
+ self._federation.send_presence_to_destinations(states, destinations)
async def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -920,15 +969,10 @@ class PresenceHandler(BasePresenceHandler):
users=[UserID.from_string(u) for u in users_to_states],
)
- self._push_to_remotes(states)
-
- def _push_to_remotes(self, states):
- """Sends state updates to remote servers.
-
- Args:
- states (list(UserPresenceState))
- """
- self.federation.send_presence(states)
+ # We only want to poke the local federation sender, if any, as other
+ # workers will receive the presence updates via the presence replication
+ # stream (which is updated by `store.update_presence`).
+ await self.maybe_send_presence_to_interested_destinations(states)
async def incoming_presence(self, origin, content):
"""Called when we receive a `m.presence` EDU from a remote server."""
@@ -1164,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler):
user_presence_states
)
+ # Since this is master we know that we have a federation sender or
+ # queue, and so this will be defined.
+ assert self._federation
+
# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
- self.federation.send_presence_to_destinations(
+ self._federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)
|