diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cd7df0525f..11dff724e6 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,7 +30,6 @@ from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
- Awaitable,
Callable,
Collection,
Dict,
@@ -54,7 +53,10 @@ from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.replication.http.presence import (
ReplicationBumpPresenceActiveTime,
ReplicationPresenceSetState,
@@ -141,6 +143,8 @@ class BasePresenceHandler(abc.ABC):
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
+ self._presence_enabled = hs.config.server.use_presence
+
self._federation = None
if hs.should_send_federation():
self._federation = hs.get_federation_sender()
@@ -149,6 +153,15 @@ class BasePresenceHandler(abc.ABC):
self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
+ self.VALID_PRESENCE: Tuple[str, ...] = (
+ PresenceState.ONLINE,
+ PresenceState.UNAVAILABLE,
+ PresenceState.OFFLINE,
+ )
+
+ if self._busy_presence_enabled:
+ self.VALID_PRESENCE += (PresenceState.BUSY,)
+
active_presence = self.store.take_presence_startup_info()
self.user_to_current_state = {state.user_id: state for state in active_presence}
@@ -395,8 +408,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
self._presence_writer_instance = hs.config.worker.writers.presence[0]
- self._presence_enabled = hs.config.server.use_presence
-
# Route presence EDUs to the right worker
hs.get_federation_registry().register_instances_for_edu(
EduTypes.PRESENCE,
@@ -421,8 +432,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
)
- self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
hs.get_reactor().addSystemEventTrigger(
"before",
"shutdown",
@@ -490,7 +499,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
# what the spec wants: see comment in the BasePresenceHandler version
# of this function.
await self.set_state(
- UserID.from_string(user_id), {"presence": presence_state}, True
+ UserID.from_string(user_id),
+ {"presence": presence_state},
+ ignore_status_msg=True,
)
curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
@@ -601,22 +612,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
"""
presence = state["presence"]
- valid_presence = (
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- )
-
- if presence not in valid_presence or (
- presence == PresenceState.BUSY and not self._busy_presence_enabled
- ):
+ if presence not in self.VALID_PRESENCE:
raise SynapseError(400, "Invalid presence state")
user_id = target_user.to_string()
# If presence is disabled, no-op
- if not self.hs.config.server.use_presence:
+ if not self._presence_enabled:
return
# Proxy request to instance that writes presence
@@ -633,7 +635,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self.hs.config.server.use_presence:
+ if not self._presence_enabled:
return
# Proxy request to instance that writes presence
@@ -649,7 +651,6 @@ class PresenceHandler(BasePresenceHandler):
self.hs = hs
self.wheel_timer: WheelTimer[str] = WheelTimer()
self.notifier = hs.get_notifier()
- self._presence_enabled = hs.config.server.use_presence
federation_registry = hs.get_federation_registry()
@@ -700,8 +701,6 @@ class PresenceHandler(BasePresenceHandler):
self._on_shutdown,
)
- self._next_serial = 1
-
# Keeps track of the number of *ongoing* syncs on this process. While
# this is non zero a user will never go offline.
self.user_to_num_current_syncs: Dict[str, int] = {}
@@ -723,21 +722,16 @@ class PresenceHandler(BasePresenceHandler):
# Start a LoopingCall in 30s that fires every 5s.
# The initial delay is to allow disconnected clients a chance to
# reconnect before we treat them as offline.
- def run_timeout_handler() -> Awaitable[None]:
- return run_as_background_process(
- "handle_presence_timeouts", self._handle_timeouts
- )
-
self.clock.call_later(
- 30, self.clock.looping_call, run_timeout_handler, 5000
+ 30, self.clock.looping_call, self._handle_timeouts, 5000
)
- def run_persister() -> Awaitable[None]:
- return run_as_background_process(
- "persist_presence_changes", self._persist_unpersisted_changes
- )
-
- self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+ self.clock.call_later(
+ 60,
+ self.clock.looping_call,
+ self._persist_unpersisted_changes,
+ 60 * 1000,
+ )
LaterGauge(
"synapse_handlers_presence_wheel_timer_size",
@@ -783,6 +777,7 @@ class PresenceHandler(BasePresenceHandler):
)
logger.info("Finished _on_shutdown")
+ @wrap_as_background_process("persist_presence_changes")
async def _persist_unpersisted_changes(self) -> None:
"""We periodically persist the unpersisted changes, as otherwise they
may stack up and slow down shutdown times.
@@ -898,6 +893,7 @@ class PresenceHandler(BasePresenceHandler):
states, [destination]
)
+ @wrap_as_background_process("handle_presence_timeouts")
async def _handle_timeouts(self) -> None:
"""Checks the presence of users that have timed out and updates as
appropriate.
@@ -955,7 +951,7 @@ class PresenceHandler(BasePresenceHandler):
with the app.
"""
# If presence is disabled, no-op
- if not self.hs.config.server.use_presence:
+ if not self._presence_enabled:
return
user_id = user.to_string()
@@ -990,56 +986,51 @@ class PresenceHandler(BasePresenceHandler):
client that is being used by a user.
presence_state: The presence state indicated in the sync request
"""
- # Override if it should affect the user's presence, if presence is
- # disabled.
- if not self.hs.config.server.use_presence:
- affect_presence = False
+ if not affect_presence or not self._presence_enabled:
+ return _NullContextManager()
- if affect_presence:
- curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
- self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+ self.user_to_num_current_syncs[user_id] = curr_sync + 1
- prev_state = await self.current_state_for_user(user_id)
+ prev_state = await self.current_state_for_user(user_id)
- # If they're busy then they don't stop being busy just by syncing,
- # so just update the last sync time.
- if prev_state.state != PresenceState.BUSY:
- # XXX: We set_state separately here and just update the last_active_ts above
- # This keeps the logic as similar as possible between the worker and single
- # process modes. Using set_state will actually cause last_active_ts to be
- # updated always, which is not what the spec calls for, but synapse has done
- # this for... forever, I think.
- await self.set_state(
- UserID.from_string(user_id), {"presence": presence_state}, True
- )
- # Retrieve the new state for the logic below. This should come from the
- # in-memory cache.
- prev_state = await self.current_state_for_user(user_id)
+ # If they're busy then they don't stop being busy just by syncing,
+ # so just update the last sync time.
+ if prev_state.state != PresenceState.BUSY:
+ # XXX: We set_state separately here and just update the last_active_ts above
+ # This keeps the logic as similar as possible between the worker and single
+ # process modes. Using set_state will actually cause last_active_ts to be
+ # updated always, which is not what the spec calls for, but synapse has done
+ # this for... forever, I think.
+ await self.set_state(
+ UserID.from_string(user_id),
+ {"presence": presence_state},
+ ignore_status_msg=True,
+ )
+ # Retrieve the new state for the logic below. This should come from the
+ # in-memory cache.
+ prev_state = await self.current_state_for_user(user_id)
- # To keep the single process behaviour consistent with worker mode, run the
- # same logic as `update_external_syncs_row`, even though it looks weird.
- if prev_state.state == PresenceState.OFFLINE:
- await self._update_states(
- [
- prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=self.clock.time_msec(),
- last_user_sync_ts=self.clock.time_msec(),
- )
- ]
- )
- # otherwise, set the new presence state & update the last sync time,
- # but don't update last_active_ts as this isn't an indication that
- # they've been active (even though it's probably been updated by
- # set_state above)
- else:
- await self._update_states(
- [
- prev_state.copy_and_replace(
- last_user_sync_ts=self.clock.time_msec()
- )
- ]
- )
+ # To keep the single process behaviour consistent with worker mode, run the
+ # same logic as `update_external_syncs_row`, even though it looks weird.
+ if prev_state.state == PresenceState.OFFLINE:
+ await self._update_states(
+ [
+ prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=self.clock.time_msec(),
+ last_user_sync_ts=self.clock.time_msec(),
+ )
+ ]
+ )
+ # otherwise, set the new presence state & update the last sync time,
+ # but don't update last_active_ts as this isn't an indication that
+ # they've been active (even though it's probably been updated by
+ # set_state above)
+ else:
+ await self._update_states(
+ [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
+ )
async def _end() -> None:
try:
@@ -1061,8 +1052,7 @@ class PresenceHandler(BasePresenceHandler):
try:
yield
finally:
- if affect_presence:
- run_in_background(_end)
+ run_in_background(_end)
return _user_syncing()
@@ -1229,20 +1219,11 @@ class PresenceHandler(BasePresenceHandler):
status_msg = state.get("status_msg", None)
presence = state["presence"]
- valid_presence = (
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- )
-
- if presence not in valid_presence or (
- presence == PresenceState.BUSY and not self._busy_presence_enabled
- ):
+ if presence not in self.VALID_PRESENCE:
raise SynapseError(400, "Invalid presence state")
# If presence is disabled, no-op
- if not self.hs.config.server.use_presence:
+ if not self._presence_enabled:
return
user_id = target_user.to_string()
|