diff --git a/synapse/api/presence.py b/synapse/api/presence.py
index b80aa83cb3..b78f419994 100644
--- a/synapse/api/presence.py
+++ b/synapse/api/presence.py
@@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState
from synapse.types import JsonDict
+@attr.s(slots=True, auto_attribs=True)
+class UserDevicePresenceState:
+ """
+ Represents the current presence state of a user's device.
+
+ user_id: The user ID.
+ device_id: The user's device ID.
+ state: The presence state, see PresenceState.
+ last_active_ts: Time in msec that the device last interacted with server.
+ last_sync_ts: Time in msec that the device last *completed* a sync
+ (or event stream).
+ """
+
+ user_id: str
+ device_id: Optional[str]
+ state: str
+ last_active_ts: int
+ last_sync_ts: int
+
+ @classmethod
+ def default(
+ cls, user_id: str, device_id: Optional[str]
+ ) -> "UserDevicePresenceState":
+ """Returns a default presence state."""
+ return cls(
+ user_id=user_id,
+ device_id=device_id,
+ state=PresenceState.OFFLINE,
+ last_active_ts=0,
+ last_sync_ts=0,
+ )
+
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class UserPresenceState:
"""Represents the current presence state of the user.
- user_id
- last_active: Time in msec that the user last interacted with server.
- last_federation_update: Time in msec since either a) we sent a presence
+ user_id: The user ID.
+ state: The presence state, see PresenceState.
+ last_active_ts: Time in msec that the user last interacted with server.
+ last_federation_update_ts: Time in msec since either a) we sent a presence
update to other servers or b) we received a presence update, depending
on if is a local user or not.
- last_user_sync: Time in msec that the user last *completed* a sync
+ last_user_sync_ts: Time in msec that the user last *completed* a sync
(or event stream).
status_msg: User set status message.
+ currently_active: True if the user is currently syncing.
"""
user_id: str
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f31e18328b..80190838b7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -13,13 +13,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-"""This module is responsible for keeping track of presence status of local
+"""
+This module is responsible for keeping track of presence status of local
and remote users.
The methods that define policy are:
- PresenceHandler._update_states
- PresenceHandler._handle_timeouts
- should_notify
+
+# Tracking local presence
+
+For local users, presence is tracked on a per-device basis. When a user has multiple
+devices the user presence state is derived by coalescing the presence from each
+device:
+
+ BUSY > ONLINE > UNAVAILABLE > OFFLINE
+
+The time that each device was last active and last synced is tracked in order to
+automatically downgrade a device's presence state:
+
+ A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
+ a period of time.
+
+ A device may go from any state -> OFFLINE, if it is not active and has not
+ synced for a period of time.
+
+The timeouts are handled using a wheel timer, which has coarse buckets. Timings
+do not need to be exact.
+
+Generally a device's presence state is updated whenever a user syncs (via the
+set_presence parameter), when the presence API is called, or if "pro-active"
+events occur, including:
+
+* Sending an event, receipt, read marker.
+* Updating typing status.
+
+The busy state has special status that it cannot is not downgraded by a call to
+sync with a lower priority state *and* it takes a long period of time to transition
+to offline.
+
+# Persisting (and restoring) presence
+
+For all users, presence is persisted on a per-user basis. Data is kept in-memory
+and persisted periodically. When Synapse starts each worker loads the current
+presence state and then tracks the presence stream to keep itself up-to-date.
+
+When restoring presence for local users a pseudo-device is created to match the
+user state; this device follows the normal timeout logic (see above) and will
+automatically be replaced with any information from currently available devices.
+
"""
import abc
import contextlib
@@ -30,6 +73,7 @@ from contextlib import contextmanager
from types import TracebackType
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Callable,
Collection,
@@ -49,7 +93,7 @@ from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
@@ -162,6 +206,7 @@ class BasePresenceHandler(abc.ABC):
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
+ # The combined status across all user devices.
self.user_to_current_state = {state.user_id: state for state in active_presence}
@abc.abstractmethod
@@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
lambda: len(self.user_to_current_state),
)
+ # The per-device presence state, maps user to devices to per-device presence state.
+ self._user_to_device_to_current_state: Dict[
+ str, Dict[Optional[str], UserDevicePresenceState]
+ ] = {}
+
now = self.clock.time_msec()
if self._presence_enabled:
for state in self.user_to_current_state.values():
+ # Create a psuedo-device to properly handle time outs. This will
+ # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
+ pseudo_device_id = None
+ self._user_to_device_to_current_state[state.user_id] = {
+ pseudo_device_id: UserDevicePresenceState(
+ user_id=state.user_id,
+ device_id=pseudo_device_id,
+ state=state.state,
+ last_active_ts=state.last_active_ts,
+ last_sync_ts=state.last_user_sync_ts,
+ )
+ }
+
self.wheel_timer.insert(
now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
@@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
# Keeps track of the number of *ongoing* syncs on other processes.
#
- # While any sync is ongoing on another process the user will never
+ # While any sync is ongoing on another process the user's device will never
# go offline.
#
# Each process has a unique identifier and an update frequency. If
@@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
timers_fired_counter.inc(len(states))
- syncing_user_ids = {
- user_id
- for (user_id, _), count in self._user_device_to_num_current_syncs.items()
+ # Set of user ID & device IDs which are currently syncing.
+ syncing_user_devices = {
+ user_id_device_id
+ for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
if count
}
- syncing_user_ids.update(
- user_id
- for user_id, _ in itertools.chain(
- *self.external_process_to_current_syncs.values()
- )
+ syncing_user_devices.update(
+ itertools.chain(*self.external_process_to_current_syncs.values())
)
changes = handle_timeouts(
states,
is_mine_fn=self.is_mine_id,
- syncing_user_ids=syncing_user_ids,
+ syncing_user_devices=syncing_user_devices,
+ user_to_devices=self._user_to_device_to_current_state,
now=now,
)
@@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
bump_active_time_counter.inc()
- prev_state = await self.current_state_for_user(user_id)
+ now = self.clock.time_msec()
+
+ # Update the device information & mark the device as online if it was
+ # unavailable.
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id,
+ UserDevicePresenceState.default(user_id, device_id),
+ )
+ device_state.last_active_ts = now
+ if device_state.state == PresenceState.UNAVAILABLE:
+ device_state.state = PresenceState.ONLINE
- new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
- if prev_state.state == PresenceState.UNAVAILABLE:
- new_fields["state"] = PresenceState.ONLINE
+ # Update the user state, this will always update last_active_ts and
+ # might update the presence state.
+ prev_state = await self.current_state_for_user(user_id)
+ new_fields: Dict[str, Any] = {
+ "last_active_ts": now,
+ "state": _combine_device_states(devices.values()),
+ }
await self._update_states([prev_state.copy_and_replace(**new_fields)])
@@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
if is_syncing and (user_id, device_id) not in process_presence:
process_presence.add((user_id, device_id))
elif not is_syncing and (user_id, device_id) in process_presence:
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id, UserDevicePresenceState.default(user_id, device_id)
+ )
+ device_state.last_sync_ts = sync_time_msec
+
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
@@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
- prev_states = await self.current_state_for_users(
- {user_id for user_id, device_id in process_presence}
- )
+
time_now_ms = self.clock.time_msec()
+ # Mark each device as having a last sync time.
+ updated_users = set()
+ for user_id, device_id in process_presence:
+ device_state = self._user_to_device_to_current_state.setdefault(
+ user_id, {}
+ ).setdefault(
+ device_id, UserDevicePresenceState.default(user_id, device_id)
+ )
+
+ device_state.last_sync_ts = time_now_ms
+ updated_users.add(user_id)
+
+ # Update each user (and insert into the appropriate timers to check if
+ # they've gone offline).
+ prev_states = await self.current_state_for_users(updated_users)
await self._update_states(
[
prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY
+ # Update the device specific information.
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+ device_state = devices.setdefault(
+ device_id,
+ UserDevicePresenceState.default(user_id, device_id),
+ )
+ device_state.state = presence
+ device_state.last_active_ts = now
+ if is_sync:
+ device_state.last_sync_ts = now
+
+ # Based on the state of each user's device calculate the new presence state.
+ presence = _combine_device_states(devices.values())
+
new_fields = {"state": presence}
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def handle_timeouts(
user_states: List[UserPresenceState],
is_mine_fn: Callable[[str], bool],
- syncing_user_ids: Set[str],
+ syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
+ user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
now: int,
) -> List[UserPresenceState]:
"""Checks the presence of users that have timed out and updates as
@@ -1882,7 +1993,8 @@ def handle_timeouts(
Args:
user_states: List of UserPresenceState's to check.
is_mine_fn: Function that returns if a user_id is ours
- syncing_user_ids: Set of user_ids with active syncs.
+ syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+ user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1891,9 +2003,16 @@ def handle_timeouts(
changes = {} # Actual changes we need to notify people about
for state in user_states:
- is_mine = is_mine_fn(state.user_id)
-
- new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
+ user_id = state.user_id
+ is_mine = is_mine_fn(user_id)
+
+ new_state = handle_timeout(
+ state,
+ is_mine,
+ syncing_user_devices,
+ user_to_devices.get(user_id, {}),
+ now,
+ )
if new_state:
changes[state.user_id] = new_state
@@ -1901,14 +2020,19 @@ def handle_timeouts(
def handle_timeout(
- state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+ state: UserPresenceState,
+ is_mine: bool,
+ syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
+ user_devices: Dict[Optional[str], UserDevicePresenceState],
+ now: int,
) -> Optional[UserPresenceState]:
"""Checks the presence of the user to see if any of the timers have elapsed
Args:
- state
+ state: UserPresenceState to check.
is_mine: Whether the user is ours
- syncing_user_ids: Set of user_ids with active syncs.
+ syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+ user_devices: A map of device ID to UserDevicePresenceState.
now: Current time in ms.
Returns:
@@ -1919,34 +2043,55 @@ def handle_timeout(
return None
changed = False
- user_id = state.user_id
if is_mine:
- if state.state == PresenceState.ONLINE:
- if now - state.last_active_ts > IDLE_TIMER:
- # Currently online, but last activity ages ago so auto
- # idle
- state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
- changed = True
- elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
- # So that we send down a notification that we've
- # stopped updating.
+ # Check per-device whether the device should be considered idle or offline
+ # due to timeouts.
+ device_changed = False
+ offline_devices = []
+ for device_id, device_state in user_devices.items():
+ if device_state.state == PresenceState.ONLINE:
+ if now - device_state.last_active_ts > IDLE_TIMER:
+ # Currently online, but last activity ages ago so auto
+ # idle
+ device_state.state = PresenceState.UNAVAILABLE
+ device_changed = True
+
+ # If there are have been no sync for a while (and none ongoing),
+ # set presence to offline.
+ if (state.user_id, device_id) not in syncing_device_ids:
+ # If the user has done something recently but hasn't synced,
+ # don't set them as offline.
+ sync_or_active = max(
+ device_state.last_sync_ts, device_state.last_active_ts
+ )
+
+ if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
+ # Mark the device as going offline.
+ offline_devices.append(device_id)
+ device_changed = True
+
+ # Offline devices are not needed and do not add information.
+ for device_id in offline_devices:
+ user_devices.pop(device_id)
+
+ # If the presence state of the devices changed, then (maybe) update
+ # the user's overall presence state.
+ if device_changed:
+ new_presence = _combine_device_states(user_devices.values())
+ if new_presence != state.state:
+ state = state.copy_and_replace(state=new_presence)
changed = True
+ if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ # So that we send down a notification that we've
+ # stopped updating.
+ changed = True
+
if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
# Need to send ping to other servers to ensure they don't
# timeout and set us to offline
changed = True
-
- # If there are have been no sync for a while (and none ongoing),
- # set presence to offline
- if user_id not in syncing_user_ids:
- # If the user has done something recently but hasn't synced,
- # don't set them as offline.
- sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
- if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
- state = state.copy_and_replace(state=PresenceState.OFFLINE)
- changed = True
else:
# We expect to be poked occasionally by the other side.
# This is to protect against forgetful/buggy servers, so that
@@ -2036,6 +2181,46 @@ def handle_update(
return new_state, persist_and_notify, federation_ping
+PRESENCE_BY_PRIORITY = {
+ PresenceState.BUSY: 4,
+ PresenceState.ONLINE: 3,
+ PresenceState.UNAVAILABLE: 2,
+ PresenceState.OFFLINE: 1,
+}
+
+
+def _combine_device_states(
+ device_states: Iterable[UserDevicePresenceState],
+) -> str:
+ """
+ Find the device to use presence information from.
+
+ Orders devices by priority, then last_active_ts.
+
+ Args:
+ device_states: An iterable of device presence states
+
+ Return:
+ The combined presence state.
+ """
+
+ # Based on (all) the user's devices calculate the new presence state.
+ presence = PresenceState.OFFLINE
+ last_active_ts = -1
+
+ # Find the device to use the presence state of based on the presence priority,
+ # but tie-break with how recently the device has been seen.
+ for device_state in device_states:
+ if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
+ PRESENCE_BY_PRIORITY[presence],
+ last_active_ts,
+ ):
+ presence = device_state.state
+ last_active_ts = device_state.last_active_ts
+
+ return presence
+
+
async def get_interested_parties(
store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
|