summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/presence.py169
1 files changed, 75 insertions, 94 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cd7df0525f..11dff724e6 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,7 +30,6 @@ from types import TracebackType
 from typing import (
     TYPE_CHECKING,
     Any,
-    Awaitable,
     Callable,
     Collection,
     Dict,
@@ -54,7 +53,10 @@ from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.replication.http.presence import (
     ReplicationBumpPresenceActiveTime,
     ReplicationPresenceSetState,
@@ -141,6 +143,8 @@ class BasePresenceHandler(abc.ABC):
         self.state = hs.get_state_handler()
         self.is_mine_id = hs.is_mine_id
 
+        self._presence_enabled = hs.config.server.use_presence
+
         self._federation = None
         if hs.should_send_federation():
             self._federation = hs.get_federation_sender()
@@ -149,6 +153,15 @@ class BasePresenceHandler(abc.ABC):
 
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
+        self.VALID_PRESENCE: Tuple[str, ...] = (
+            PresenceState.ONLINE,
+            PresenceState.UNAVAILABLE,
+            PresenceState.OFFLINE,
+        )
+
+        if self._busy_presence_enabled:
+            self.VALID_PRESENCE += (PresenceState.BUSY,)
+
         active_presence = self.store.take_presence_startup_info()
         self.user_to_current_state = {state.user_id: state for state in active_presence}
 
@@ -395,8 +408,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         self._presence_writer_instance = hs.config.worker.writers.presence[0]
 
-        self._presence_enabled = hs.config.server.use_presence
-
         # Route presence EDUs to the right worker
         hs.get_federation_registry().register_instances_for_edu(
             EduTypes.PRESENCE,
@@ -421,8 +432,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
             self.send_stop_syncing, UPDATE_SYNCING_USERS_MS
         )
 
-        self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
         hs.get_reactor().addSystemEventTrigger(
             "before",
             "shutdown",
@@ -490,7 +499,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
             # what the spec wants: see comment in the BasePresenceHandler version
             # of this function.
             await self.set_state(
-                UserID.from_string(user_id), {"presence": presence_state}, True
+                UserID.from_string(user_id),
+                {"presence": presence_state},
+                ignore_status_msg=True,
             )
 
         curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
@@ -601,22 +612,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
         """
         presence = state["presence"]
 
-        valid_presence = (
-            PresenceState.ONLINE,
-            PresenceState.UNAVAILABLE,
-            PresenceState.OFFLINE,
-            PresenceState.BUSY,
-        )
-
-        if presence not in valid_presence or (
-            presence == PresenceState.BUSY and not self._busy_presence_enabled
-        ):
+        if presence not in self.VALID_PRESENCE:
             raise SynapseError(400, "Invalid presence state")
 
         user_id = target_user.to_string()
 
         # If presence is disabled, no-op
-        if not self.hs.config.server.use_presence:
+        if not self._presence_enabled:
             return
 
         # Proxy request to instance that writes presence
@@ -633,7 +635,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
         with the app.
         """
         # If presence is disabled, no-op
-        if not self.hs.config.server.use_presence:
+        if not self._presence_enabled:
             return
 
         # Proxy request to instance that writes presence
@@ -649,7 +651,6 @@ class PresenceHandler(BasePresenceHandler):
         self.hs = hs
         self.wheel_timer: WheelTimer[str] = WheelTimer()
         self.notifier = hs.get_notifier()
-        self._presence_enabled = hs.config.server.use_presence
 
         federation_registry = hs.get_federation_registry()
 
@@ -700,8 +701,6 @@ class PresenceHandler(BasePresenceHandler):
             self._on_shutdown,
         )
 
-        self._next_serial = 1
-
         # Keeps track of the number of *ongoing* syncs on this process. While
         # this is non zero a user will never go offline.
         self.user_to_num_current_syncs: Dict[str, int] = {}
@@ -723,21 +722,16 @@ class PresenceHandler(BasePresenceHandler):
             # Start a LoopingCall in 30s that fires every 5s.
             # The initial delay is to allow disconnected clients a chance to
             # reconnect before we treat them as offline.
-            def run_timeout_handler() -> Awaitable[None]:
-                return run_as_background_process(
-                    "handle_presence_timeouts", self._handle_timeouts
-                )
-
             self.clock.call_later(
-                30, self.clock.looping_call, run_timeout_handler, 5000
+                30, self.clock.looping_call, self._handle_timeouts, 5000
             )
 
-            def run_persister() -> Awaitable[None]:
-                return run_as_background_process(
-                    "persist_presence_changes", self._persist_unpersisted_changes
-                )
-
-            self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
+            self.clock.call_later(
+                60,
+                self.clock.looping_call,
+                self._persist_unpersisted_changes,
+                60 * 1000,
+            )
 
         LaterGauge(
             "synapse_handlers_presence_wheel_timer_size",
@@ -783,6 +777,7 @@ class PresenceHandler(BasePresenceHandler):
             )
         logger.info("Finished _on_shutdown")
 
+    @wrap_as_background_process("persist_presence_changes")
     async def _persist_unpersisted_changes(self) -> None:
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
@@ -898,6 +893,7 @@ class PresenceHandler(BasePresenceHandler):
                         states, [destination]
                     )
 
+    @wrap_as_background_process("handle_presence_timeouts")
     async def _handle_timeouts(self) -> None:
         """Checks the presence of users that have timed out and updates as
         appropriate.
@@ -955,7 +951,7 @@ class PresenceHandler(BasePresenceHandler):
         with the app.
         """
         # If presence is disabled, no-op
-        if not self.hs.config.server.use_presence:
+        if not self._presence_enabled:
             return
 
         user_id = user.to_string()
@@ -990,56 +986,51 @@ class PresenceHandler(BasePresenceHandler):
                 client that is being used by a user.
             presence_state: The presence state indicated in the sync request
         """
-        # Override if it should affect the user's presence, if presence is
-        # disabled.
-        if not self.hs.config.server.use_presence:
-            affect_presence = False
+        if not affect_presence or not self._presence_enabled:
+            return _NullContextManager()
 
-        if affect_presence:
-            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
-            self.user_to_num_current_syncs[user_id] = curr_sync + 1
+        curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+        self.user_to_num_current_syncs[user_id] = curr_sync + 1
 
-            prev_state = await self.current_state_for_user(user_id)
+        prev_state = await self.current_state_for_user(user_id)
 
-            # If they're busy then they don't stop being busy just by syncing,
-            # so just update the last sync time.
-            if prev_state.state != PresenceState.BUSY:
-                # XXX: We set_state separately here and just update the last_active_ts above
-                # This keeps the logic as similar as possible between the worker and single
-                # process modes. Using set_state will actually cause last_active_ts to be
-                # updated always, which is not what the spec calls for, but synapse has done
-                # this for... forever, I think.
-                await self.set_state(
-                    UserID.from_string(user_id), {"presence": presence_state}, True
-                )
-                # Retrieve the new state for the logic below. This should come from the
-                # in-memory cache.
-                prev_state = await self.current_state_for_user(user_id)
+        # If they're busy then they don't stop being busy just by syncing,
+        # so just update the last sync time.
+        if prev_state.state != PresenceState.BUSY:
+            # XXX: We set_state separately here and just update the last_active_ts above
+            # This keeps the logic as similar as possible between the worker and single
+            # process modes. Using set_state will actually cause last_active_ts to be
+            # updated always, which is not what the spec calls for, but synapse has done
+            # this for... forever, I think.
+            await self.set_state(
+                UserID.from_string(user_id),
+                {"presence": presence_state},
+                ignore_status_msg=True,
+            )
+            # Retrieve the new state for the logic below. This should come from the
+            # in-memory cache.
+            prev_state = await self.current_state_for_user(user_id)
 
-            # To keep the single process behaviour consistent with worker mode, run the
-            # same logic as `update_external_syncs_row`, even though it looks weird.
-            if prev_state.state == PresenceState.OFFLINE:
-                await self._update_states(
-                    [
-                        prev_state.copy_and_replace(
-                            state=PresenceState.ONLINE,
-                            last_active_ts=self.clock.time_msec(),
-                            last_user_sync_ts=self.clock.time_msec(),
-                        )
-                    ]
-                )
-            # otherwise, set the new presence state & update the last sync time,
-            # but don't update last_active_ts as this isn't an indication that
-            # they've been active (even though it's probably been updated by
-            # set_state above)
-            else:
-                await self._update_states(
-                    [
-                        prev_state.copy_and_replace(
-                            last_user_sync_ts=self.clock.time_msec()
-                        )
-                    ]
-                )
+        # To keep the single process behaviour consistent with worker mode, run the
+        # same logic as `update_external_syncs_row`, even though it looks weird.
+        if prev_state.state == PresenceState.OFFLINE:
+            await self._update_states(
+                [
+                    prev_state.copy_and_replace(
+                        state=PresenceState.ONLINE,
+                        last_active_ts=self.clock.time_msec(),
+                        last_user_sync_ts=self.clock.time_msec(),
+                    )
+                ]
+            )
+        # otherwise, set the new presence state & update the last sync time,
+        # but don't update last_active_ts as this isn't an indication that
+        # they've been active (even though it's probably been updated by
+        # set_state above)
+        else:
+            await self._update_states(
+                [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
+            )
 
         async def _end() -> None:
             try:
@@ -1061,8 +1052,7 @@ class PresenceHandler(BasePresenceHandler):
             try:
                 yield
             finally:
-                if affect_presence:
-                    run_in_background(_end)
+                run_in_background(_end)
 
         return _user_syncing()
 
@@ -1229,20 +1219,11 @@ class PresenceHandler(BasePresenceHandler):
         status_msg = state.get("status_msg", None)
         presence = state["presence"]
 
-        valid_presence = (
-            PresenceState.ONLINE,
-            PresenceState.UNAVAILABLE,
-            PresenceState.OFFLINE,
-            PresenceState.BUSY,
-        )
-
-        if presence not in valid_presence or (
-            presence == PresenceState.BUSY and not self._busy_presence_enabled
-        ):
+        if presence not in self.VALID_PRESENCE:
             raise SynapseError(400, "Invalid presence state")
 
         # If presence is disabled, no-op
-        if not self.hs.config.server.use_presence:
+        if not self._presence_enabled:
             return
 
         user_id = target_user.to_string()