summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/presence.py136
-rw-r--r--synapse/module_api/__init__.py63
-rw-r--r--synapse/replication/http/presence.py11
-rw-r--r--synapse/rest/admin/server_notice_servlet.py8
-rw-r--r--synapse/storage/databases/main/presence.py58
-rw-r--r--synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql34
6 files changed, 245 insertions, 65 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6fd1f34289..f5a049d754 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -222,9 +222,21 @@ class BasePresenceHandler(abc.ABC):
 
     @abc.abstractmethod
     async def set_state(
-        self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
+        self,
+        target_user: UserID,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+        force_notify: bool = False,
     ) -> None:
-        """Set the presence state of the user. """
+        """Set the presence state of the user.
+
+        Args:
+            target_user: The ID of the user to set the presence state of.
+            state: The presence state as a JSON dictionary.
+            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
+                If False, the user's current status will be updated.
+            force_notify: Whether to force notification of the update to clients.
+        """
 
     @abc.abstractmethod
     async def bump_presence_active_time(self, user: UserID):
@@ -296,6 +308,51 @@ class BasePresenceHandler(abc.ABC):
         for destinations, states in hosts_and_states:
             self._federation.send_presence_to_destinations(states, destinations)
 
+    async def send_full_presence_to_users(self, user_ids: Collection[str]):
+        """
+        Adds to the list of users who should receive a full snapshot of presence
+        upon their next sync. Note that this only works for local users.
+
+        Then, grabs the current presence state for a given set of users and adds it
+        to the top of the presence stream.
+
+        Args:
+            user_ids: The IDs of the local users to send full presence to.
+        """
+        # Retrieve one of the users from the given set
+        if not user_ids:
+            raise Exception(
+                "send_full_presence_to_users must be called with at least one user"
+            )
+        user_id = next(iter(user_ids))
+
+        # Mark all users as receiving full presence on their next sync
+        await self.store.add_users_to_send_full_presence_to(user_ids)
+
+        # Add a new entry to the presence stream. Since we use stream tokens to determine whether a
+        # local user should receive a full snapshot of presence when they sync, we need to bump the
+        # presence stream so that subsequent syncs with no presence activity in between won't result
+        # in the client receiving multiple full snapshots of presence.
+        #
+        # If we bump the stream ID, then the user will get a higher stream token next sync, and thus
+        # correctly won't receive a second snapshot.
+
+        # Get the current presence state for one of the users (defaults to offline if not found)
+        current_presence_state = await self.get_state(UserID.from_string(user_id))
+
+        # Convert the UserPresenceState object into a serializable dict
+        state = {
+            "presence": current_presence_state.state,
+            "status_message": current_presence_state.status_msg,
+        }
+
+        # Copy the presence state to the tip of the presence stream.
+
+        # We set force_notify=True here so that this presence update is guaranteed to
+        # increment the presence stream ID (which resending the current user's presence
+        # otherwise would not do).
+        await self.set_state(UserID.from_string(user_id), state, force_notify=True)
+
 
 class _NullContextManager(ContextManager[None]):
     """A context manager which does nothing."""
@@ -480,8 +537,17 @@ class WorkerPresenceHandler(BasePresenceHandler):
         target_user: UserID,
         state: JsonDict,
         ignore_status_msg: bool = False,
+        force_notify: bool = False,
     ) -> None:
-        """Set the presence state of the user."""
+        """Set the presence state of the user.
+
+        Args:
+            target_user: The ID of the user to set the presence state of.
+            state: The presence state as a JSON dictionary.
+            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
+                If False, the user's current status will be updated.
+            force_notify: Whether to force notification of the update to clients.
+        """
         presence = state["presence"]
 
         valid_presence = (
@@ -508,6 +574,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             user_id=user_id,
             state=state,
             ignore_status_msg=ignore_status_msg,
+            force_notify=force_notify,
         )
 
     async def bump_presence_active_time(self, user: UserID) -> None:
@@ -677,13 +744,19 @@ class PresenceHandler(BasePresenceHandler):
                 [self.user_to_current_state[user_id] for user_id in unpersisted]
             )
 
-    async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None:
+    async def _update_states(
+        self, new_states: Iterable[UserPresenceState], force_notify: bool = False
+    ) -> None:
         """Updates presence of users. Sets the appropriate timeouts. Pokes
         the notifier and federation if and only if the changed presence state
         should be sent to clients/servers.
 
         Args:
             new_states: The new user presence state updates to process.
+            force_notify: Whether to force notifying clients of this presence state update,
+                even if it doesn't change the state of a user's presence (e.g online -> online).
+                This is currently used to bump the max presence stream ID without changing any
+                user's presence (see PresenceHandler.add_users_to_send_full_presence_to).
         """
         now = self.clock.time_msec()
 
@@ -720,6 +793,9 @@ class PresenceHandler(BasePresenceHandler):
                     now=now,
                 )
 
+                if force_notify:
+                    should_notify = True
+
                 self.user_to_current_state[user_id] = new_state
 
                 if should_notify:
@@ -1058,9 +1134,21 @@ class PresenceHandler(BasePresenceHandler):
             await self._update_states(updates)
 
     async def set_state(
-        self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
+        self,
+        target_user: UserID,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+        force_notify: bool = False,
     ) -> None:
-        """Set the presence state of the user."""
+        """Set the presence state of the user.
+
+        Args:
+            target_user: The ID of the user to set the presence state of.
+            state: The presence state as a JSON dictionary.
+            ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
+                If False, the user's current status will be updated.
+            force_notify: Whether to force notification of the update to clients.
+        """
         status_msg = state.get("status_msg", None)
         presence = state["presence"]
 
@@ -1091,7 +1179,9 @@ class PresenceHandler(BasePresenceHandler):
         ):
             new_fields["last_active_ts"] = self.clock.time_msec()
 
-        await self._update_states([prev_state.copy_and_replace(**new_fields)])
+        await self._update_states(
+            [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
+        )
 
     async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
         """Returns whether a user can see another user's presence."""
@@ -1389,11 +1479,10 @@ class PresenceEventSource:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
         #
-        # Same with get_module_api, get_presence_router
+        # Same with get_presence_router:
         #
         #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
         self.get_presence_handler = hs.get_presence_handler
-        self.get_module_api = hs.get_module_api
         self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
@@ -1424,16 +1513,21 @@ class PresenceEventSource:
         stream_change_cache = self.store.presence_stream_cache
 
         with Measure(self.clock, "presence.get_new_events"):
-            if user_id in self.get_module_api()._send_full_presence_to_local_users:
-                # This user has been specified by a module to receive all current, online
-                # user presence. Removing from_key and setting include_offline to false
-                # will do effectively this.
-                from_key = None
-                include_offline = False
-
             if from_key is not None:
                 from_key = int(from_key)
 
+                # Check if this user should receive all current, online user presence. We only
+                # bother to do this if from_key is set, as otherwise the user will receive all
+                # user presence anyways.
+                if await self.store.should_user_receive_full_presence_with_token(
+                    user_id, from_key
+                ):
+                    # This user has been specified by a module to receive all current, online
+                    # user presence. Removing from_key and setting include_offline to false
+                    # will do effectively this.
+                    from_key = None
+                    include_offline = False
+
             max_token = self.store.get_current_presence_token()
             if from_key == max_token:
                 # This is necessary as due to the way stream ID generators work
@@ -1467,12 +1561,6 @@ class PresenceEventSource:
                     user_id, include_offline, from_key
                 )
 
-                # Remove the user from the list of users to receive all presence
-                if user_id in self.get_module_api()._send_full_presence_to_local_users:
-                    self.get_module_api()._send_full_presence_to_local_users.remove(
-                        user_id
-                    )
-
                 return presence_updates, max_token
 
             # Make mypy happy. users_interested_in should now be a set
@@ -1522,10 +1610,6 @@ class PresenceEventSource:
             )
             presence_updates = list(users_to_state.values())
 
-        # Remove the user from the list of users to receive all presence
-        if user_id in self.get_module_api()._send_full_presence_to_local_users:
-            self.get_module_api()._send_full_presence_to_local_users.remove(user_id)
-
         if not include_offline:
             # Filter out offline presence states
             presence_updates = self._filter_offline_presence_state(presence_updates)
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index a1a2b9aecc..cecdc96bf5 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -56,14 +56,6 @@ class ModuleApi:
         self._http_client = hs.get_simple_http_client()  # type: SimpleHttpClient
         self._public_room_list_manager = PublicRoomListManager(hs)
 
-        # The next time these users sync, they will receive the current presence
-        # state of all local users. Users are added by send_local_online_presence_to,
-        # and removed after a successful sync.
-        #
-        # We make this a private variable to deter modules from accessing it directly,
-        # though other classes in Synapse will still do so.
-        self._send_full_presence_to_local_users = set()
-
     @property
     def http_client(self):
         """Allows making outbound HTTP requests to remote resources.
@@ -405,39 +397,44 @@ class ModuleApi:
         Updates to remote users will be sent immediately, whereas local users will receive
         them on their next sync attempt.
 
-        Note that this method can only be run on the main or federation_sender worker
-        processes.
+        Note that this method can only be run on the process that is configured to write to the
+        presence stream. By default this is the main process.
         """
-        if not self._hs.should_send_federation():
+        if self._hs._instance_name not in self._hs.config.worker.writers.presence:
             raise Exception(
                 "send_local_online_presence_to can only be run "
-                "on processes that send federation",
+                "on the process that is configured to write to the "
+                "presence stream (by default this is the main process)",
             )
 
+        local_users = set()
+        remote_users = set()
         for user in users:
             if self._hs.is_mine_id(user):
-                # Modify SyncHandler._generate_sync_entry_for_presence to call
-                # presence_source.get_new_events with an empty `from_key` if
-                # that user's ID were in a list modified by ModuleApi somewhere.
-                # That user would then get all presence state on next incremental sync.
-
-                # Force a presence initial_sync for this user next time
-                self._send_full_presence_to_local_users.add(user)
+                local_users.add(user)
             else:
-                # Retrieve presence state for currently online users that this user
-                # is considered interested in
-                presence_events, _ = await self._presence_stream.get_new_events(
-                    UserID.from_string(user), from_key=None, include_offline=False
-                )
-
-                # Send to remote destinations.
-
-                # We pull out the presence handler here to break a cyclic
-                # dependency between the presence router and module API.
-                presence_handler = self._hs.get_presence_handler()
-                await presence_handler.maybe_send_presence_to_interested_destinations(
-                    presence_events
-                )
+                remote_users.add(user)
+
+        # We pull out the presence handler here to break a cyclic
+        # dependency between the presence router and module API.
+        presence_handler = self._hs.get_presence_handler()
+
+        if local_users:
+            # Force a presence initial_sync for these users next time they sync.
+            await presence_handler.send_full_presence_to_users(local_users)
+
+        for user in remote_users:
+            # Retrieve presence state for currently online users that this user
+            # is considered interested in.
+            presence_events, _ = await self._presence_stream.get_new_events(
+                UserID.from_string(user), from_key=None, include_offline=False
+            )
+
+            # Send to remote destinations.
+            destination = UserID.from_string(user).domain
+            presence_handler.get_federation_queue().send_presence_to_destinations(
+                presence_events, destination
+            )
 
 
 class PublicRoomListManager:
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index f25307620d..bb00247953 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         {
             "state": { ... },
             "ignore_status_msg": false,
+            "force_notify": false
         }
 
         200 OK
@@ -91,17 +92,23 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         self._presence_handler = hs.get_presence_handler()
 
     @staticmethod
-    async def _serialize_payload(user_id, state, ignore_status_msg=False):
+    async def _serialize_payload(
+        user_id, state, ignore_status_msg=False, force_notify=False
+    ):
         return {
             "state": state,
             "ignore_status_msg": ignore_status_msg,
+            "force_notify": force_notify,
         }
 
     async def _handle_request(self, request, user_id):
         content = parse_json_object_from_request(request)
 
         await self._presence_handler.set_state(
-            UserID.from_string(user_id), content["state"], content["ignore_status_msg"]
+            UserID.from_string(user_id),
+            content["state"],
+            content["ignore_status_msg"],
+            content["force_notify"],
         )
 
         return (
diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py
index cc3ab5854b..b5e4c474ef 100644
--- a/synapse/rest/admin/server_notice_servlet.py
+++ b/synapse/rest/admin/server_notice_servlet.py
@@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.txns = HttpTransactionCache(hs)
-        self.snm = hs.get_server_notices_manager()
 
     def register(self, json_resource: HttpServer):
         PATTERN = "/send_server_notice"
@@ -77,7 +76,10 @@ class SendServerNoticeServlet(RestServlet):
         event_type = body.get("type", EventTypes.Message)
         state_key = body.get("state_key")
 
-        if not self.snm.is_enabled():
+        # We grab the server notices manager here as its initialisation has a check for worker processes,
+        # but worker processes still need to initialise SendServerNoticeServlet (as it is part of the
+        # admin api).
+        if not self.hs.get_server_notices_manager().is_enabled():
             raise SynapseError(400, "Server notices are not enabled on this server")
 
         user_id = body["user_id"]
@@ -85,7 +87,7 @@ class SendServerNoticeServlet(RestServlet):
         if not self.hs.is_mine_id(user_id):
             raise SynapseError(400, "Server notices can only be sent to local users")
 
-        event = await self.snm.send_notice(
+        event = await self.hs.get_server_notices_manager().send_notice(
             user_id=body["user_id"],
             type=event_type,
             state_key=state_key,
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index db22fab23e..669a2af884 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import TYPE_CHECKING, Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
 from synapse.api.presence import PresenceState, UserPresenceState
 from synapse.replication.tcp.streams import PresenceStream
@@ -57,6 +57,7 @@ class PresenceStore(SQLBaseStore):
                 db_conn, "presence_stream", "stream_id"
             )
 
+        self.hs = hs
         self._presence_on_startup = self._get_active_presence(db_conn)
 
         presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
@@ -210,6 +211,61 @@ class PresenceStore(SQLBaseStore):
 
         return {row["user_id"]: UserPresenceState(**row) for row in rows}
 
+    async def should_user_receive_full_presence_with_token(
+        self,
+        user_id: str,
+        from_token: int,
+    ) -> bool:
+        """Check whether the given user should receive full presence using the stream token
+        they're updating from.
+
+        Args:
+            user_id: The ID of the user to check.
+            from_token: The stream token included in their /sync token.
+
+        Returns:
+            True if the user should have full presence sent to them, False otherwise.
+        """
+
+        def _should_user_receive_full_presence_with_token_txn(txn):
+            sql = """
+                SELECT 1 FROM users_to_send_full_presence_to
+                WHERE user_id = ?
+                AND presence_stream_id >= ?
+            """
+            txn.execute(sql, (user_id, from_token))
+            return bool(txn.fetchone())
+
+        return await self.db_pool.runInteraction(
+            "should_user_receive_full_presence_with_token",
+            _should_user_receive_full_presence_with_token_txn,
+        )
+
+    async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]):
+        """Adds to the list of users who should receive a full snapshot of presence
+        upon their next sync.
+
+        Args:
+            user_ids: An iterable of user IDs.
+        """
+        # Add user entries to the table, updating the presence_stream_id column if the user already
+        # exists in the table.
+        await self.db_pool.simple_upsert_many(
+            table="users_to_send_full_presence_to",
+            key_names=("user_id",),
+            key_values=[(user_id,) for user_id in user_ids],
+            value_names=("presence_stream_id",),
+            # We save the current presence stream ID token along with the user ID entry so
+            # that when a user /sync's, even if they syncing multiple times across separate
+            # devices at different times, each device will receive full presence once - when
+            # the presence stream ID in their sync token is less than the one in the table
+            # for their user ID.
+            value_values=(
+                (self._presence_id_gen.get_current_token(),) for _ in user_ids
+            ),
+            desc="add_users_to_send_full_presence_to",
+        )
+
     async def get_presence_for_all_users(
         self,
         include_offline: bool = True,
diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
new file mode 100644
index 0000000000..07b0f53ecf
--- /dev/null
+++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql
@@ -0,0 +1,34 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of a list of users who should, upon their next
+-- sync request, receive presence for all currently online users that they are
+-- "interested" in.
+
+-- The motivation for a DB table over an in-memory list is so that this list
+-- can be added to and retrieved from by any worker. Specifically, we don't
+-- want to duplicate work across multiple sync workers.
+
+CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to(
+    -- The user ID to send full presence to.
+    user_id TEXT PRIMARY KEY,
+    -- A presence stream ID token - the current presence stream token when the row was last upserted.
+    -- If a user calls /sync and this token is part of the update they're to receive, we also include
+    -- full user presence in the response.
+    -- This allows multiple devices for a user to receive full presence whenever they next call /sync.
+    presence_stream_id BIGINT,
+    FOREIGN KEY (user_id)
+        REFERENCES users (name)
+);
\ No newline at end of file