diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0047907cd9..6460eb9952 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1,4 +1,3 @@
-# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
@@ -23,6 +22,7 @@ The methods that define policy are:
- should_notify
"""
import abc
+import contextlib
import logging
from contextlib import contextmanager
from typing import (
@@ -49,6 +49,11 @@ from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http.presence import (
+ ReplicationBumpPresenceActiveTime,
+ ReplicationPresenceSetState,
+)
+from synapse.replication.tcp.commands import ClearUserSyncsCommand
from synapse.state import StateHandler
from synapse.storage.databases.main import DataStore
from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
@@ -105,6 +110,10 @@ FEDERATION_PING_INTERVAL = 25 * 60 * 1000
# are dead.
EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+# Delay before a worker tells the presence handler that a user has stopped
+# syncing.
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
@@ -114,6 +123,14 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastore()
+ self.presence_router = hs.get_presence_router()
+ self.state = hs.get_state_handler()
+
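+        # A FederationSender if this process sends federation traffic itself,
+        # the federation send queue if this is the master with a separate
+        # federation sender worker, and None on all other workers.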
+ self._federation = None
+ if hs.should_send_federation() or not hs.config.worker_app:
+ self._federation = hs.get_federation_sender()
+
+ self._send_federation = hs.should_send_federation()
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
@@ -209,6 +226,267 @@ class BasePresenceHandler(abc.ABC):
with the app.
"""
+ async def update_external_syncs_row(
+ self, process_id, user_id, is_syncing, sync_time_msec
+ ):
+ """Update the syncing users for an external process as a delta.
+
+ This is a no-op when presence is handled by a different worker.
+
+ Args:
+ process_id (str): An identifier for the process the users are
+ syncing against. This allows synapse to process updates
+                as users start and stop syncing against a given process.
+ user_id (str): The user who has started or stopped syncing
+ is_syncing (bool): Whether or not the user is now syncing
+            sync_time_msec (int): Time in ms when the user was last syncing
+ """
+ pass
+
+ async def update_external_syncs_clear(self, process_id):
+ """Marks all users that had been marked as syncing by a given process
+ as offline.
+
+ Used when the process has stopped/disappeared.
+
+ This is a no-op when presence is handled by a different worker.
+ """
+ pass
+
+ async def process_replication_rows(self, token, rows):
+ """Process presence stream rows received over replication."""
+ pass
+
+ async def maybe_send_presence_to_interested_destinations(
+ self, states: List[UserPresenceState]
+ ):
+ """If this instance is a federation sender, send the states to all
+ destinations that are interested.
+ """
+
+ if not self._send_federation:
+ return
+
+ # If this worker sends federation we must have a FederationSender.
+ assert self._federation
+
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self.presence_router,
+ states,
+ self.state,
+ )
+
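+        # Each entry pairs a set of remote servers with the presence states
+        # they are interested in (e.g. because they share a room with those
+        # users).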
+ for destinations, states in hosts_and_states:
+ self._federation.send_presence_to_destinations(states, destinations)
+
+
+class _NullContextManager(ContextManager[None]):
+ """A context manager which does nothing."""
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+
+class WorkerPresenceHandler(BasePresenceHandler):
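+    """Presence handler used on worker processes.
+
+    Tracks which users are currently syncing on this worker and proxies
+    presence writes (set_state, bump_presence_active_time) to the master
+    over replication.
+    """
+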
+ def __init__(self, hs):
+ super().__init__(hs)
+ self.hs = hs
+ self.is_mine_id = hs.is_mine_id
+
+ self._presence_enabled = hs.config.use_presence
+
+ # The number of ongoing syncs on this process, by user id.
+ # Empty if _presence_enabled is false.
+ self._user_to_num_current_syncs = {} # type: Dict[str, int]
+
+ self.notifier = hs.get_notifier()
+ self.instance_id = hs.get_instance_id()
+
+ # user_id -> last_sync_ms. Lists the users that have stopped syncing
+        # but whom we haven't yet notified the master about.
+ self.users_going_offline = {}
+
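+        # HTTP replication clients used to proxy presence writes to the master.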
+ self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
+ self._set_state_client = ReplicationPresenceSetState.make_client(hs)
+
+ self._send_stop_syncing_loop = self.clock.looping_call(
+ self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
+ )
+
+ self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
+
+ hs.get_reactor().addSystemEventTrigger(
+ "before",
+ "shutdown",
+ run_as_background_process,
+ "generic_presence.on_shutdown",
+ self._on_shutdown,
+ )
+
+ def _on_shutdown(self):
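+        # Tell the master that every user who was syncing via this process
+        # should now be treated as offline, since this worker is going away.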
+ if self._presence_enabled:
+ self.hs.get_tcp_replication().send_command(
+ ClearUserSyncsCommand(self.instance_id)
+ )
+
+ def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+ if self._presence_enabled:
+ self.hs.get_tcp_replication().send_user_sync(
+ self.instance_id, user_id, is_syncing, last_sync_ms
+ )
+
+ def mark_as_coming_online(self, user_id):
+ """A user has started syncing. Send a UserSync to the master, unless they
+ had recently stopped syncing.
+
+ Args:
+ user_id (str)
+ """
+ going_offline = self.users_going_offline.pop(user_id, None)
+ if not going_offline:
+            # The master doesn't currently think this user is syncing (either we
+            # never told it they were, or we've already told it they stopped), so
+            # send a UserSync to say they've started syncing.
+ self.send_user_sync(user_id, True, self.clock.time_msec())
+
+ def mark_as_going_offline(self, user_id):
+ """A user has stopped syncing. We wait before notifying the master as
+ its likely they'll come back soon. This allows us to avoid sending
+ a stopped syncing immediately followed by a started syncing notification
+ to the master
+
+ Args:
+ user_id (str)
+ """
+ self.users_going_offline[user_id] = self.clock.time_msec()
+
+ def send_stop_syncing(self):
+ """Check if there are any users who have stopped syncing a while ago
+ and haven't come back yet. If there are poke the master about them.
+ """
+ now = self.clock.time_msec()
+ for user_id, last_sync_ms in list(self.users_going_offline.items()):
+ if now - last_sync_ms > UPDATE_SYNCING_USERS_MS:
+ self.users_going_offline.pop(user_id, None)
+ self.send_user_sync(user_id, False, last_sync_ms)
+
+ async def user_syncing(
+ self, user_id: str, affect_presence: bool
+ ) -> ContextManager[None]:
+ """Record that a user is syncing.
+
+ Called by the sync and events servlets to record that a user has connected to
+ this worker and is waiting for some events.
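+
+        Returns a context manager that should be held open for the duration of
+        the request: when it exits, the per-user sync count is decremented and,
+        once it reaches zero, the user is marked as going offline.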
+ """
+ if not affect_presence or not self._presence_enabled:
+ return _NullContextManager()
+
+ curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
+ self._user_to_num_current_syncs[user_id] = curr_sync + 1
+
+        # If we went from no in-flight syncs to some, notify replication.
+ if self._user_to_num_current_syncs[user_id] == 1:
+ self.mark_as_coming_online(user_id)
+
+ def _end():
+ # We check that the user_id is in user_to_num_current_syncs because
+ # user_to_num_current_syncs may have been cleared if we are
+ # shutting down.
+ if user_id in self._user_to_num_current_syncs:
+ self._user_to_num_current_syncs[user_id] -= 1
+
+            # If we went from one in-flight sync to none, notify replication.
+ if self._user_to_num_current_syncs[user_id] == 0:
+ self.mark_as_going_offline(user_id)
+
+ @contextlib.contextmanager
+ def _user_syncing():
+ try:
+ yield
+ finally:
+ _end()
+
+ return _user_syncing()
+
+ async def notify_from_replication(self, states, stream_id):
+ parties = await get_interested_parties(self.store, self.presence_router, states)
+ room_ids_to_states, users_to_states = parties
+
+ self.notifier.on_new_event(
+ "presence_key",
+ stream_id,
+ rooms=room_ids_to_states.keys(),
+ users=users_to_states.keys(),
+ )
+
+ # If this is a federation sender, notify about presence updates.
+ await self.maybe_send_presence_to_interested_destinations(states)
+
+ async def process_replication_rows(self, token, rows):
+ states = [
+ UserPresenceState(
+ row.user_id,
+ row.state,
+ row.last_active_ts,
+ row.last_federation_update_ts,
+ row.last_user_sync_ts,
+ row.status_msg,
+ row.currently_active,
+ )
+ for row in rows
+ ]
+
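+        # Update the local cache of presence state, then wake up anything
+        # waiting on these users (sync streams, and interested remote servers
+        # if this worker is a federation sender) via notify_from_replication.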
+ for state in states:
+ self.user_to_current_state[state.user_id] = state
+
+ stream_id = token
+ await self.notify_from_replication(states, stream_id)
+
+ def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
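+        """Returns the users who currently have at least one active sync on
+        this worker, for reporting to the master over replication."""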
+ return [
+ user_id
+ for user_id, count in self._user_to_num_current_syncs.items()
+ if count > 0
+ ]
+
+ async def set_state(self, target_user, state, ignore_status_msg=False):
+ """Set the presence state of the user."""
+ presence = state["presence"]
+
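+        # BUSY is only a valid state when experimental MSC3026 support is
+        # enabled.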
+ valid_presence = (
+ PresenceState.ONLINE,
+ PresenceState.UNAVAILABLE,
+ PresenceState.OFFLINE,
+ PresenceState.BUSY,
+ )
+
+ if presence not in valid_presence or (
+ presence == PresenceState.BUSY and not self._busy_presence_enabled
+ ):
+ raise SynapseError(400, "Invalid presence state")
+
+ user_id = target_user.to_string()
+
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
+ # Proxy request to master
+ await self._set_state_client(
+ user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+ )
+
+ async def bump_presence_active_time(self, user):
+ """We've seen the user do something that indicates they're interacting
+ with the app.
+ """
+ # If presence is disabled, no-op
+ if not self.hs.config.use_presence:
+ return
+
+ # Proxy request to master
+ user_id = user.to_string()
+ await self._bump_active_client(user_id=user_id)
+
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "HomeServer"):
@@ -218,9 +496,6 @@ class PresenceHandler(BasePresenceHandler):
self.server_name = hs.hostname
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.federation = hs.get_federation_sender()
- self.state = hs.get_state_handler()
- self.presence_router = hs.get_presence_router()
self._presence_enabled = hs.config.use_presence
federation_registry = hs.get_federation_registry()
@@ -427,6 +702,13 @@ class PresenceHandler(BasePresenceHandler):
self.unpersisted_users_changes |= {s.user_id for s in new_states}
self.unpersisted_users_changes -= set(to_notify.keys())
+ # Check if we need to resend any presence states to remote hosts. We
+ # only do this for states that haven't been updated in a while to
+ # ensure that the remote host doesn't time the presence state out.
+ #
+ # Note that since these are states that have *not* been updated,
+ # they won't get sent down the normal presence replication stream,
+ # and so we have to explicitly send them via the federation stream.
to_federation_ping = {
user_id: state
for user_id, state in to_federation_ping.items()
@@ -435,7 +717,19 @@ class PresenceHandler(BasePresenceHandler):
if to_federation_ping:
federation_presence_out_counter.inc(len(to_federation_ping))
- self._push_to_remotes(to_federation_ping.values())
+ hosts_and_states = await get_interested_remotes(
+ self.store,
+ self.presence_router,
+ list(to_federation_ping.values()),
+ self.state,
+ )
+
+        # Since this is the master process we know that we have a federation
+        # sender or send queue, and so this will be defined.
+ assert self._federation
+
+ for destinations, states in hosts_and_states:
+ self._federation.send_presence_to_destinations(states, destinations)
async def _handle_timeouts(self):
"""Checks the presence of users that have timed out and updates as
@@ -675,15 +969,10 @@ class PresenceHandler(BasePresenceHandler):
users=[UserID.from_string(u) for u in users_to_states],
)
- self._push_to_remotes(states)
-
- def _push_to_remotes(self, states):
- """Sends state updates to remote servers.
-
- Args:
- states (list(UserPresenceState))
- """
- self.federation.send_presence(states)
+ # We only want to poke the local federation sender, if any, as other
+ # workers will receive the presence updates via the presence replication
+ # stream (which is updated by `store.update_presence`).
+ await self.maybe_send_presence_to_interested_destinations(states)
async def incoming_presence(self, origin, content):
"""Called when we receive a `m.presence` EDU from a remote server."""
@@ -919,9 +1208,13 @@ class PresenceHandler(BasePresenceHandler):
user_presence_states
)
+        # Since this is the master process we know that we have a federation
+        # sender or send queue, and so this will be defined.
+ assert self._federation
+
# Send out user presence updates for each destination
for destination, user_state_set in presence_destinations.items():
- self.federation.send_presence_to_destinations(
+ self._federation.send_presence_to_destinations(
destinations=[destination], states=user_state_set
)
|