summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-03-18 12:43:35 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-03-18 16:35:21 +0000
commit18f50fb85d68285d7aabefcb06bd6c38ac4c53b2 (patch)
tree4eca5d6a6029202f7109f162da53ae705716952a
parentAdd a storage method to get the current presence state for all users (diff)
downloadsynapse-18f50fb85d68285d7aabefcb06bd6c38ac4c53b2.tar.xz
Update PresenceHandler to call PresenceRouter methods when applicable
This big ol' change does three high-level things:

1. It modifies `_get_interested_in` to ask the loaded PresenceRouter if
there are any users - in addition to those that share a room with the
user in question - that it thinks should have their presence status
queried. PresenceRouter can either return a Set of users, or "ALL".

2. It modifies `get_new_events` (which is mainly run when a user is
syncing and needs to check for presence updates) to support receiving
"ALL" from `_get_interested_in`. What happens then depends on whether a
`from_key` was provided to `get_new_events`. We also now call
`get_users_for_states` to filter the UserPresenceState objects after
querying ALL of them from a given `from_key`.

3. It also modifies `get_new_events` to take into account whether the
syncing user is included in
`ModuleApi.send_full_presence_to_local_users`. If so, then we're going
to send them all current user presence state (filtering it through
`get_users_for_states` again). We then remove the user ID from the set
to ensure the same doesn't happen on the next sync.

This is mainly all to support redirecting presence for local users as
they sync, though the same method is called for appservice users.
-rw-r--r--synapse/handlers/presence.py228
1 files changed, 191 insertions, 37 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 872a540b8c..21d5ce8010 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,15 +25,26 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 from prometheus_client import Counter
-from typing_extensions import ContextManager
+from typing_extensions import ContextManager, Literal
 
 import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
@@ -207,6 +218,7 @@ class PresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
         self.state = hs.get_state_handler()
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence
 
         federation_registry = hs.get_federation_registry()
@@ -651,7 +663,7 @@ class PresenceHandler(BasePresenceHandler):
         """
         stream_id, max_token = await self.store.update_presence(states)
 
-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
@@ -1033,7 +1045,12 @@ class PresenceEventSource:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
         #
+        # Same with get_module_api, get_presence_router
+        #
+        #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
         self.get_presence_handler = hs.get_presence_handler
+        self.get_module_api = hs.get_module_api
+        self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -1047,7 +1064,7 @@ class PresenceEventSource:
         include_offline=True,
         explicit_room_id=None,
         **kwargs
-    ):
+    ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1060,7 +1077,17 @@ class PresenceEventSource:
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.
 
+        user_id = user.to_string()
+        stream_change_cache = self.store.presence_stream_cache
+
         with Measure(self.clock, "presence.get_new_events"):
+            if user_id in self.get_module_api().send_full_presence_to_local_users:
+                # This user has been specified by a module to receive all current, online
+                # user presence. Removing from_key and setting include_offline to false
+                # will do effectively this.
+                from_key = None
+                include_offline = False
+
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1083,59 +1110,186 @@ class PresenceEventSource:
                 # doesn't return. C.f. #5503.
                 return [], max_token
 
-            presence = self.get_presence_handler()
-            stream_change_cache = self.store.presence_stream_cache
-
+            # Figure out which other users this user should receive updates for
             users_interested_in = await self._get_interested_in(user, explicit_room_id)
 
-            user_ids_changed = set()  # type: Collection[str]
-            changed = None
-            if from_key:
-                changed = stream_change_cache.get_all_entities_changed(from_key)
-
-            if changed is not None and len(changed) < 500:
-                assert isinstance(user_ids_changed, set)
+            # We have a set of users that we're interested in the presence of. We want to
+            # cross-reference that with the users that have actually changed their presence.
 
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
-                get_updates_counter.labels("stream").inc()
-                for other_user_id in changed:
-                    if other_user_id in users_interested_in:
-                        user_ids_changed.add(other_user_id)
-            else:
-                # Too many possible updates. Find all users we can see and check
-                # if any of them have changed.
-                get_updates_counter.labels("full").inc()
+            # Check whether this user should see all user updates
 
+            if users_interested_in == "ALL":
                 if from_key:
-                    user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key
+                    # We need to return all new presence updates to this user, regardless of whether
+                    # they share a room with that user
+                    return await self._filter_all_presence_updates_for_user(
+                        user_id, max_token, from_key, include_offline
                     )
                 else:
-                    user_ids_changed = users_interested_in
+                    # This user should receive all user presence, and hasn't provided a from_key.
+                    # Send all currently known user presence states.
+                    users_to_state = await self.store.get_presence_for_all_users(
+                        include_offline=include_offline
+                    )
 
-            updates = await presence.current_state_for_users(user_ids_changed)
+                    return list(users_to_state.values()), max_token
 
-        if include_offline:
-            return (list(updates.values()), max_token)
-        else:
-            return (
-                [s for s in updates.values() if s.state != PresenceState.OFFLINE],
-                max_token,
+            # The set of users that we're interested in and that have had a presence update.
+            # We'll actually pull the presence updates for these users at the end.
+            interested_and_updated_users = (
+                set()
+            )  # type: Union[Set[str], FrozenSet[str]]
+
+            if from_key:
+                # First get all users that have had a presence update
+                updated_users = stream_change_cache.get_all_entities_changed(from_key)
+
+                # Cross-reference users we're interested in with those that have had updates.
+                # Use a slightly-optimised method for processing smaller sets of updates.
+                if updated_users is not None and len(updated_users) < 500:
+                    # For small deltas, it's quicker to get all changes and then
+                    # cross-reference with the users we're interested in
+                    get_updates_counter.labels("stream").inc()
+                    for other_user_id in updated_users:
+                        if other_user_id in users_interested_in:
+                            # mypy thinks this variable could be a FrozenSet as it's possibly set
+                            # to one in the `get_entities_changed` call below, and `add()` is not
+                            # a method on a FrozenSet. That doesn't affect us here though, as
+                            # `interested_and_updated_users` is clearly a set() above.
+                            interested_and_updated_users.add(other_user_id)  # type: ignore
+                else:
+                    # Too many possible updates. Find all users we can see and check
+                    # if any of them have changed.
+                    get_updates_counter.labels("full").inc()
+
+                    interested_and_updated_users = (
+                        stream_change_cache.get_entities_changed(
+                            users_interested_in, from_key
+                        )
+                    )
+            else:
+                # No from_key has been specified. Return the presence for all users
+                # this user is interested in
+                interested_and_updated_users = users_interested_in
+
+            # Retrieve the current presence state for each user
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                interested_and_updated_users
             )
+            presence_updates = list(users_to_state.values())
+
+        # Remove the user from the list of users to receive all presence
+        if user_id in self.get_module_api().send_full_presence_to_local_users:
+            self.get_module_api().send_full_presence_to_local_users.remove(user_id)
+
+        if not include_offline:
+            # Filter out offline presence states
+            presence_updates = self._filter_offline_presence_state(presence_updates)
+
+        return presence_updates, max_token
+
+    async def _filter_all_presence_updates_for_user(
+        self,
+        user_id: str,
+        max_token: int,
+        from_key: int,
+        include_offline: bool,
+    ) -> Tuple[List[UserPresenceState], int]:
+        # Only return updates since the last sync
+        updated_users = self.store.presence_stream_cache.get_all_entities_changed(
+            from_key
+        )
+        if not updated_users:
+            updated_users = []
+
+        # Get the actual presence update for each change
+        users_to_state = await self.get_presence_handler().current_state_for_users(
+            updated_users
+        )
+
+        # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
+        # module for information on a number of users when we then only take the info
+        # for a single user
+
+        # Filter through the presence router
+        users_to_state_set = await self.get_presence_router().get_users_for_states(
+            users_to_state.values()
+        )
+
+        # We only want the mapping for the syncing user
+        presence_updates = list(users_to_state_set[user_id])
+
+        # Remove the user from the list of users to receive all presence
+        if user_id in self.get_module_api().send_full_presence_to_local_users:
+            self.get_module_api().send_full_presence_to_local_users.remove(user_id)
+
+        if not include_offline:
+            # Filter out offline states
+            presence_updates = self._filter_offline_presence_state(presence_updates)
+
+        # Return presence updates for all users since the last sync
+        return presence_updates, max_token
+
+    def _filter_offline_presence_state(
+        self, presence_updates: Iterable[UserPresenceState]
+    ) -> List[UserPresenceState]:
+        """Given an iterable containing user presence updates, return a list with any offline
+        presence states removed.
+
+        Args:
+            presence_updates: Presence states to filter
+
+        Returns:
+            A new list with any offline presence states removed.
+        """
+        return [
+            update
+            for update in presence_updates
+            if update.state != PresenceState.OFFLINE
+        ]
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
 
     @cached(num_args=2, cache_context=True)
-    async def _get_interested_in(self, user, explicit_room_id, cache_context):
+    async def _get_interested_in(
+        self,
+        user: UserID,
+        explicit_room_id: Optional[str] = None,
+        cache_context: Optional[_CacheContext] = None,
+    ) -> Union[Set[str], Literal["ALL"]]:
         """Returns the set of users that the given user should see presence
-        updates for
+        updates for.
+
+        Args:
+            user: The user to retrieve presence updates for.
+            explicit_room_id: A
         """
         user_id = user.to_string()
         users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence
 
+        # cache_context isn't likely to ever be None due to the @cached decorator,
+        # but we can't have a non-optional argument after the optional argument
+        # explicit_room_id either. Assert cache_context is not None so we can use it
+        # without mypy complaining.
+        assert cache_context
+
+        # Check with the presence router whether we should poll additional users for
+        # their presence information
+        additional_users = await self.get_presence_router().get_interested_users(
+            user.to_string()
+        )
+        if additional_users == "ALL":
+            # If the module requested that this user see the presence updates of *all*
+            # users, then simply return that instead of calculating what rooms this
+            # user shares
+            return "ALL"
+
+        # Add the additional users from the router
+        users_interested_in.update(additional_users)
+
+        # Find the users who share a room with this user
         users_who_share_room = await self.store.get_users_who_share_room_with_user(
             user_id, on_invalidate=cache_context.invalidate
         )