summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16170.misc1
-rw-r--r--synapse/handlers/presence.py155
-rw-r--r--synapse/replication/http/presence.py10
-rw-r--r--tests/handlers/test_presence.py37
4 files changed, 99 insertions, 104 deletions
diff --git a/changelog.d/16170.misc b/changelog.d/16170.misc
new file mode 100644
index 0000000000..c950b54367
--- /dev/null
+++ b/changelog.d/16170.misc
@@ -0,0 +1 @@
+Simplify presence code when using workers.
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index e8e9db4b91..c395dcdb43 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -151,15 +151,13 @@ class BasePresenceHandler(abc.ABC):
 
         self._federation_queue = PresenceFederationQueue(hs, self)
 
-        self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
-
         self.VALID_PRESENCE: Tuple[str, ...] = (
             PresenceState.ONLINE,
             PresenceState.UNAVAILABLE,
             PresenceState.OFFLINE,
         )
 
-        if self._busy_presence_enabled:
+        if hs.config.experimental.msc3026_enabled:
             self.VALID_PRESENCE += (PresenceState.BUSY,)
 
         active_presence = self.store.take_presence_startup_info()
@@ -255,17 +253,19 @@ class BasePresenceHandler(abc.ABC):
         self,
         target_user: UserID,
         state: JsonDict,
-        ignore_status_msg: bool = False,
         force_notify: bool = False,
+        is_sync: bool = False,
     ) -> None:
         """Set the presence state of the user.
 
         Args:
             target_user: The ID of the user to set the presence state of.
             state: The presence state as a JSON dictionary.
-            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
-                If False, the user's current status will be updated.
             force_notify: Whether to force notification of the update to clients.
+            is_sync: True if this update was from a sync, which results in
+                *not* overriding a previously set BUSY status, updating the
+                user's last_user_sync_ts, and ignoring the "status_msg" field of
+                the `state` dict.
         """
 
     @abc.abstractmethod
@@ -491,23 +491,18 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not affect_presence or not self._presence_enabled:
             return _NullContextManager()
 
-        prev_state = await self.current_state_for_user(user_id)
-        if prev_state.state != PresenceState.BUSY:
-            # We set state here but pass ignore_status_msg = True as we don't want to
-            # cause the status message to be cleared.
-            # Note that this causes last_active_ts to be incremented which is not
-            # what the spec wants: see comment in the BasePresenceHandler version
-            # of this function.
-            await self.set_state(
-                UserID.from_string(user_id),
-                {"presence": presence_state},
-                ignore_status_msg=True,
-            )
+        # Note that this causes last_active_ts to be incremented which is not
+        # what the spec wants.
+        await self.set_state(
+            UserID.from_string(user_id),
+            state={"presence": presence_state},
+            is_sync=True,
+        )
 
         curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
         self._user_to_num_current_syncs[user_id] = curr_sync + 1
 
-        # If we went from no in flight sync to some, notify replication
+        # If this is the first in-flight sync, notify replication
         if self._user_to_num_current_syncs[user_id] == 1:
             self.mark_as_coming_online(user_id)
 
@@ -518,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             if user_id in self._user_to_num_current_syncs:
                 self._user_to_num_current_syncs[user_id] -= 1
 
-                # If we went from one in flight sync to non, notify replication
+                # If there are no more in-flight syncs, notify replication
                 if self._user_to_num_current_syncs[user_id] == 0:
                     self.mark_as_going_offline(user_id)
 
@@ -598,17 +593,19 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self,
         target_user: UserID,
         state: JsonDict,
-        ignore_status_msg: bool = False,
         force_notify: bool = False,
+        is_sync: bool = False,
     ) -> None:
         """Set the presence state of the user.
 
         Args:
             target_user: The ID of the user to set the presence state of.
             state: The presence state as a JSON dictionary.
-            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
-                If False, the user's current status will be updated.
             force_notify: Whether to force notification of the update to clients.
+            is_sync: True if this update was from a sync, which results in
+                *not* overriding a previously set BUSY status, updating the
+                user's last_user_sync_ts, and ignoring the "status_msg" field of
+                the `state` dict.
         """
         presence = state["presence"]
 
@@ -626,8 +623,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
             instance_name=self._presence_writer_instance,
             user_id=user_id,
             state=state,
-            ignore_status_msg=ignore_status_msg,
             force_notify=force_notify,
+            is_sync=is_sync,
         )
 
     async def bump_presence_active_time(self, user: UserID) -> None:
@@ -992,45 +989,13 @@ class PresenceHandler(BasePresenceHandler):
         curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
         self.user_to_num_current_syncs[user_id] = curr_sync + 1
 
-        prev_state = await self.current_state_for_user(user_id)
-
-        # If they're busy then they don't stop being busy just by syncing,
-        # so just update the last sync time.
-        if prev_state.state != PresenceState.BUSY:
-            # XXX: We set_state separately here and just update the last_active_ts above
-            # This keeps the logic as similar as possible between the worker and single
-            # process modes. Using set_state will actually cause last_active_ts to be
-            # updated always, which is not what the spec calls for, but synapse has done
-            # this for... forever, I think.
-            await self.set_state(
-                UserID.from_string(user_id),
-                {"presence": presence_state},
-                ignore_status_msg=True,
-            )
-            # Retrieve the new state for the logic below. This should come from the
-            # in-memory cache.
-            prev_state = await self.current_state_for_user(user_id)
-
-        # To keep the single process behaviour consistent with worker mode, run the
-        # same logic as `update_external_syncs_row`, even though it looks weird.
-        if prev_state.state == PresenceState.OFFLINE:
-            await self._update_states(
-                [
-                    prev_state.copy_and_replace(
-                        state=PresenceState.ONLINE,
-                        last_active_ts=self.clock.time_msec(),
-                        last_user_sync_ts=self.clock.time_msec(),
-                    )
-                ]
-            )
-        # otherwise, set the new presence state & update the last sync time,
-        # but don't update last_active_ts as this isn't an indication that
-        # they've been active (even though it's probably been updated by
-        # set_state above)
-        else:
-            await self._update_states(
-                [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
-            )
+        # Note that this causes last_active_ts to be incremented which is not
+        # what the spec wants.
+        await self.set_state(
+            UserID.from_string(user_id),
+            state={"presence": presence_state},
+            is_sync=True,
+        )
 
         async def _end() -> None:
             try:
@@ -1080,32 +1045,27 @@ class PresenceHandler(BasePresenceHandler):
                 process_id, set()
             )
 
-            updates = []
+            # USER_SYNC is sent when a user starts or stops syncing on a remote
+            # process. (But only for the initial and last device.)
+            #
+            # When a user *starts* syncing it also calls set_state(...) which
+            # will update the state, last_active_ts, and last_user_sync_ts.
+            # Simply ensure the user is tracked as syncing in this case.
+            #
+            # When a user *stops* syncing, update the last_user_sync_ts and mark
+            # them as no longer syncing. Note this doesn't quite match the
+            # monolith behaviour, which updates last_user_sync_ts at the end of
+            # every sync, not just the last in-flight sync.
             if is_syncing and user_id not in process_presence:
-                if prev_state.state == PresenceState.OFFLINE:
-                    updates.append(
-                        prev_state.copy_and_replace(
-                            state=PresenceState.ONLINE,
-                            last_active_ts=sync_time_msec,
-                            last_user_sync_ts=sync_time_msec,
-                        )
-                    )
-                else:
-                    updates.append(
-                        prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
-                    )
                 process_presence.add(user_id)
-            elif user_id in process_presence:
-                updates.append(
-                    prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+            elif not is_syncing and user_id in process_presence:
+                new_state = prev_state.copy_and_replace(
+                    last_user_sync_ts=sync_time_msec
                 )
+                await self._update_states([new_state])
 
-            if not is_syncing:
                 process_presence.discard(user_id)
 
-            if updates:
-                await self._update_states(updates)
-
             self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
 
     async def update_external_syncs_clear(self, process_id: str) -> None:
@@ -1204,17 +1164,19 @@ class PresenceHandler(BasePresenceHandler):
         self,
         target_user: UserID,
         state: JsonDict,
-        ignore_status_msg: bool = False,
         force_notify: bool = False,
+        is_sync: bool = False,
     ) -> None:
         """Set the presence state of the user.
 
         Args:
             target_user: The ID of the user to set the presence state of.
             state: The presence state as a JSON dictionary.
-            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
-                If False, the user's current status will be updated.
             force_notify: Whether to force notification of the update to clients.
+            is_sync: True if this update was from a sync, which results in
+                *not* overriding a previously set BUSY status, updating the
+                user's last_user_sync_ts, and ignoring the "status_msg" field of
+                the `state` dict.
         """
         status_msg = state.get("status_msg", None)
         presence = state["presence"]
@@ -1227,18 +1189,27 @@ class PresenceHandler(BasePresenceHandler):
             return
 
         user_id = target_user.to_string()
+        now = self.clock.time_msec()
 
         prev_state = await self.current_state_for_user(user_id)
 
+        # Syncs do not override a previous presence of busy.
+        #
+        # TODO: This is a hack for lack of multi-device support. Unfortunately
+        # removing this requires coordination with clients.
+        if prev_state.state == PresenceState.BUSY and is_sync:
+            presence = PresenceState.BUSY
+
         new_fields = {"state": presence}
 
-        if not ignore_status_msg:
-            new_fields["status_msg"] = status_msg
+        if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
+            new_fields["last_active_ts"] = now
 
-        if presence == PresenceState.ONLINE or (
-            presence == PresenceState.BUSY and self._busy_presence_enabled
-        ):
-            new_fields["last_active_ts"] = self.clock.time_msec()
+        if is_sync:
+            new_fields["last_user_sync_ts"] = now
+        else:
+            # Syncs do not override the status message.
+            new_fields["status_msg"] = status_msg
 
         await self._update_states(
             [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index db16aac9c2..a24fb9310b 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -73,8 +73,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
 
         {
             "state": { ... },
-            "ignore_status_msg": false,
-            "force_notify": false
+            "force_notify": false,
+            "is_sync": false
         }
 
         200 OK
@@ -96,13 +96,13 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
     async def _serialize_payload(  # type: ignore[override]
         user_id: str,
         state: JsonDict,
-        ignore_status_msg: bool = False,
         force_notify: bool = False,
+        is_sync: bool = False,
     ) -> JsonDict:
         return {
             "state": state,
-            "ignore_status_msg": ignore_status_msg,
             "force_notify": force_notify,
+            "is_sync": is_sync,
         }
 
     async def _handle_request(  # type: ignore[override]
@@ -111,8 +111,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         await self._presence_handler.set_state(
             UserID.from_string(user_id),
             content["state"],
-            content["ignore_status_msg"],
             content["force_notify"],
+            content.get("is_sync", False),
         )
 
         return (200, {})
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 1aebcc16ad..a3fdcf7f93 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -641,13 +641,20 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
         """Test that if an external process doesn't update the records for a while
         we time out their syncing users presence.
         """
-        process_id = "1"
 
-        # Notify handler that a user is now syncing.
+        # Create a worker and use it to handle /sync traffic instead.
+        # This is used to test that presence changes get replicated from workers
+        # to the main process correctly.
+        worker_to_sync_against = self.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "synchrotron"}
+        )
+        worker_presence_handler = worker_to_sync_against.get_presence_handler()
+
         self.get_success(
-            self.presence_handler.update_external_syncs_row(
-                process_id, self.user_id, True, self.clock.time_msec()
-            )
+            worker_presence_handler.user_syncing(
+                self.user_id, True, PresenceState.ONLINE
+            ),
+            by=0.1,
         )
 
         # Check that if we wait a while without telling the handler the user has
@@ -820,7 +827,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
             # This is used to test that presence changes get replicated from workers
             # to the main process correctly.
             worker_to_sync_against = self.make_worker_hs(
-                "synapse.app.generic_worker", {"worker_name": "presence_writer"}
+                "synapse.app.generic_worker", {"worker_name": "synchrotron"}
             )
 
         # Set presence to BUSY
@@ -832,7 +839,8 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
         self.get_success(
             worker_to_sync_against.get_presence_handler().user_syncing(
                 self.user_id, True, PresenceState.ONLINE
-            )
+            ),
+            by=0.1,
         )
 
         # Check against the main process that the user's presence did not change.
@@ -840,6 +848,21 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
         # we should still be busy
         self.assertEqual(state.state, PresenceState.BUSY)
 
+        # Advance such that the device would be discarded if it was not busy,
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(IDLE_TIMER / 1000)
+        self.reactor.pump([5])
+
+        # The account should still be busy.
+        state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
+        self.assertEqual(state.state, PresenceState.BUSY)
+
+        # Ensure that a /presence call can set the user *off* busy.
+        self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
+
+        state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
+        self.assertEqual(state.state, PresenceState.ONLINE)
+
     def _set_presencestate_with_status_msg(
         self, state: str, status_msg: Optional[str]
     ) -> None: