diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e8e9db4b91..c395dcdb43 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -151,15 +151,13 @@ class BasePresenceHandler(abc.ABC):
self._federation_queue = PresenceFederationQueue(hs, self)
- self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)
- if self._busy_presence_enabled:
+ if hs.config.experimental.msc3026_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)
active_presence = self.store.take_presence_startup_info()
@@ -255,17 +253,19 @@ class BasePresenceHandler(abc.ABC):
self,
target_user: UserID,
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
@abc.abstractmethod
@@ -491,23 +491,18 @@ class WorkerPresenceHandler(BasePresenceHandler):
if not affect_presence or not self._presence_enabled:
return _NullContextManager()
- prev_state = await self.current_state_for_user(user_id)
- if prev_state.state != PresenceState.BUSY:
- # We set state here but pass ignore_status_msg = True as we don't want to
- # cause the status message to be cleared.
- # Note that this causes last_active_ts to be incremented which is not
- # what the spec wants: see comment in the BasePresenceHandler version
- # of this function.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ state={"presence": presence_state},
+ is_sync=True,
+ )
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1
- # If we went from no in flight sync to some, notify replication
+ # If this is the first in-flight sync, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)
@@ -518,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1
- # If we went from one in flight sync to non, notify replication
+ # If there are no more in-flight syncs, notify replication
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)
@@ -598,17 +593,19 @@ class WorkerPresenceHandler(BasePresenceHandler):
self,
target_user: UserID,
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
presence = state["presence"]
@@ -626,8 +623,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
instance_name=self._presence_writer_instance,
user_id=user_id,
state=state,
- ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
+ is_sync=is_sync,
)
async def bump_presence_active_time(self, user: UserID) -> None:
@@ -992,45 +989,13 @@ class PresenceHandler(BasePresenceHandler):
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
- prev_state = await self.current_state_for_user(user_id)
-
- # If they're busy then they don't stop being busy just by syncing,
- # so just update the last sync time.
- if prev_state.state != PresenceState.BUSY:
- # XXX: We set_state separately here and just update the last_active_ts above
- # This keeps the logic as similar as possible between the worker and single
- # process modes. Using set_state will actually cause last_active_ts to be
- # updated always, which is not what the spec calls for, but synapse has done
- # this for... forever, I think.
- await self.set_state(
- UserID.from_string(user_id),
- {"presence": presence_state},
- ignore_status_msg=True,
- )
- # Retrieve the new state for the logic below. This should come from the
- # in-memory cache.
- prev_state = await self.current_state_for_user(user_id)
-
- # To keep the single process behaviour consistent with worker mode, run the
- # same logic as `update_external_syncs_row`, even though it looks weird.
- if prev_state.state == PresenceState.OFFLINE:
- await self._update_states(
- [
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=self.clock.time_msec(),
- last_user_sync_ts=self.clock.time_msec(),
- )
- ]
- )
- # otherwise, set the new presence state & update the last sync time,
- # but don't update last_active_ts as this isn't an indication that
- # they've been active (even though it's probably been updated by
- # set_state above)
- else:
- await self._update_states(
- [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
- )
+ # Note that this causes last_active_ts to be incremented which is not
+ # what the spec wants.
+ await self.set_state(
+ UserID.from_string(user_id),
+ state={"presence": presence_state},
+ is_sync=True,
+ )
async def _end() -> None:
try:
@@ -1080,32 +1045,27 @@ class PresenceHandler(BasePresenceHandler):
process_id, set()
)
- updates = []
+ # USER_SYNC is sent when a user starts or stops syncing on a remote
+ # process. (But only for the initial and last device.)
+ #
+ # When a user *starts* syncing it also calls set_state(...) which
+ # will update the state, last_active_ts, and last_user_sync_ts.
+ # Simply ensure the user is tracked as syncing in this case.
+ #
+ # When a user *stops* syncing, update the last_user_sync_ts and mark
+ # them as no longer syncing. Note this doesn't quite match the
+ # monolith behaviour, which updates last_user_sync_ts at the end of
+ # every sync, not just the last in-flight sync.
if is_syncing and user_id not in process_presence:
- if prev_state.state == PresenceState.OFFLINE:
- updates.append(
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=sync_time_msec,
- last_user_sync_ts=sync_time_msec,
- )
- )
- else:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
- )
process_presence.add(user_id)
- elif user_id in process_presence:
- updates.append(
- prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+ elif not is_syncing and user_id in process_presence:
+ new_state = prev_state.copy_and_replace(
+ last_user_sync_ts=sync_time_msec
)
+ await self._update_states([new_state])
- if not is_syncing:
process_presence.discard(user_id)
- if updates:
- await self._update_states(updates)
-
self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
async def update_external_syncs_clear(self, process_id: str) -> None:
@@ -1204,17 +1164,19 @@ class PresenceHandler(BasePresenceHandler):
self,
target_user: UserID,
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
- ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
- If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
+ is_sync: True if this update was from a sync, which results in
+ *not* overriding a previously set BUSY status, updating the
+ user's last_user_sync_ts, and ignoring the "status_msg" field of
+ the `state` dict.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]
@@ -1227,18 +1189,27 @@ class PresenceHandler(BasePresenceHandler):
return
user_id = target_user.to_string()
+ now = self.clock.time_msec()
prev_state = await self.current_state_for_user(user_id)
+ # Syncs do not override a previous presence of busy.
+ #
+ # TODO: This is a hack for lack of multi-device support. Unfortunately
+ # removing this requires coordination with clients.
+ if prev_state.state == PresenceState.BUSY and is_sync:
+ presence = PresenceState.BUSY
+
new_fields = {"state": presence}
- if not ignore_status_msg:
- new_fields["status_msg"] = status_msg
+ if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
+ new_fields["last_active_ts"] = now
- if presence == PresenceState.ONLINE or (
- presence == PresenceState.BUSY and self._busy_presence_enabled
- ):
- new_fields["last_active_ts"] = self.clock.time_msec()
+ if is_sync:
+ new_fields["last_user_sync_ts"] = now
+ else:
+ # Syncs do not override the status message.
+ new_fields["status_msg"] = status_msg
await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index db16aac9c2..a24fb9310b 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -73,8 +73,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
{
"state": { ... },
- "ignore_status_msg": false,
- "force_notify": false
+ "force_notify": false,
+ "is_sync": false
}
200 OK
@@ -96,13 +96,13 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
async def _serialize_payload( # type: ignore[override]
user_id: str,
state: JsonDict,
- ignore_status_msg: bool = False,
force_notify: bool = False,
+ is_sync: bool = False,
) -> JsonDict:
return {
"state": state,
- "ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
+ "is_sync": is_sync,
}
async def _handle_request( # type: ignore[override]
@@ -111,8 +111,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
await self._presence_handler.set_state(
UserID.from_string(user_id),
content["state"],
- content["ignore_status_msg"],
content["force_notify"],
+ content.get("is_sync", False),
)
return (200, {})
|