author | Andrew Morgan <andrew@amorgan.xyz> | 2021-03-18 12:43:35 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-03-18 16:35:21 +0000 |
commit | 18f50fb85d68285d7aabefcb06bd6c38ac4c53b2 (patch) | |
tree | 4eca5d6a6029202f7109f162da53ae705716952a | |
parent | Add a storage method to get the current presence state for all users (diff) | |
download | synapse-18f50fb85d68285d7aabefcb06bd6c38ac4c53b2.tar.xz |
Update PresenceHandler to call PresenceRouter methods when applicable
This big ol' change does three high-level things:

1. It modifies `_get_interested_in` to ask the loaded PresenceRouter if there are any users - in addition to those that share a room with the user in question - that it thinks should have their presence status queried. The PresenceRouter can return either a Set of users or "ALL".

2. It modifies `get_new_events` (which is mainly run when a user is syncing and needs to check for presence updates) to support receiving "ALL" from `_get_interested_in`. What happens then depends on whether a `from_key` was provided to `get_new_events`. We also now call `get_users_for_states` to filter the UserPresenceState objects after querying ALL of them from a given `from_key`.

3. It also modifies `get_new_events` to take into account whether the syncing user is included in `ModuleApi.send_full_presence_to_local_users`. If so, we send them all current user presence state (filtering it through `get_users_for_states` again) and then remove the user ID from the set, ensuring the same doesn't happen on their next sync.

All of this is mainly to support redirecting presence for local users as they sync, though the same method is also called for appservice users.
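To make the router interface concrete, here is a minimal sketch (not part of this commit) of a presence router module implementing the two methods the handler now calls, `get_interested_users` and `get_users_for_states`. The class name, the `MONITOR_USER` constant, and the routing policy are hypothetical:

```python
from typing import Dict, Iterable, Set, Union

from synapse.api.presence import UserPresenceState
from synapse.module_api import ModuleApi

# Hypothetical user that should observe everyone's presence.
MONITOR_USER = "@presence-monitor:example.org"


class ExamplePresenceRouter:
    """Routes every presence update to a single monitoring user, and asks
    Synapse to query *all* users' presence on that user's behalf.
    """

    def __init__(self, config: dict, module_api: ModuleApi):
        self._module_api = module_api

    async def get_users_for_states(
        self, state_updates: Iterable[UserPresenceState]
    ) -> Dict[str, Set[UserPresenceState]]:
        # Fan every update out to the monitoring user, on top of the
        # shared-room routing Synapse already performs.
        return {MONITOR_USER: set(state_updates)}

    async def get_interested_users(self, user_id: str) -> Union[Set[str], str]:
        # Returning "ALL" is what sends _get_interested_in (and, in turn,
        # get_new_events) down the new all-users code path in this commit.
        if user_id == MONITOR_USER:
            return "ALL"
        # No additional users beyond the default room-sharing set.
        return set()
```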
-rw-r--r-- | synapse/handlers/presence.py | 228 |
1 file changed, 191 insertions, 37 deletions
```diff
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 872a540b8c..21d5ce8010 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,15 +25,26 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import TYPE_CHECKING, Dict, Iterable, List, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)

 from prometheus_client import Counter
-from typing_extensions import ContextManager
+from typing_extensions import ContextManager, Literal

 import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
 from synapse.api.presence import UserPresenceState
+from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
@@ -42,7 +53,7 @@ from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer

@@ -207,6 +218,7 @@ class PresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.federation = hs.get_federation_sender()
         self.state = hs.get_state_handler()
+        self.presence_router = hs.get_presence_router()
         self._presence_enabled = hs.config.use_presence

         federation_registry = hs.get_federation_registry()
@@ -651,7 +663,7 @@ class PresenceHandler(BasePresenceHandler):
         """
         stream_id, max_token = await self.store.update_presence(states)

-        parties = await get_interested_parties(self.store, states)
+        parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties

         self.notifier.on_new_event(
@@ -1033,7 +1045,12 @@ class PresenceEventSource:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
         #
+        # Same with get_module_api, get_presence_router
+        #
+        #   AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler
         self.get_presence_handler = hs.get_presence_handler
+        self.get_module_api = hs.get_module_api
+        self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
@@ -1047,7 +1064,7 @@ class PresenceEventSource:
         include_offline=True,
         explicit_room_id=None,
         **kwargs
-    ):
+    ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
         #  1. Get the rooms the user is in.
         #  2. Get the list of user in the rooms.
@@ -1060,7 +1077,17 @@
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.

+        user_id = user.to_string()
+        stream_change_cache = self.store.presence_stream_cache
+
         with Measure(self.clock, "presence.get_new_events"):
+            if user_id in self.get_module_api().send_full_presence_to_local_users:
+                # This user has been specified by a module to receive all current, online
+                # user presence. Removing from_key and setting include_offline to false
+                # will effectively do this.
+                from_key = None
+                include_offline = False
+
             if from_key is not None:
                 from_key = int(from_key)
@@ -1083,59 +1110,186 @@
                 # doesn't return. C.f. #5503.
                 return [], max_token

-            presence = self.get_presence_handler()
-            stream_change_cache = self.store.presence_stream_cache
-
+            # Figure out which other users this user should receive updates for
             users_interested_in = await self._get_interested_in(user, explicit_room_id)

-            user_ids_changed = set()  # type: Collection[str]
-            changed = None
-            if from_key:
-                changed = stream_change_cache.get_all_entities_changed(from_key)
-
-            if changed is not None and len(changed) < 500:
-                assert isinstance(user_ids_changed, set)
+            # We have a set of users that we're interested in the presence of. We want to
+            # cross-reference that with the users that have actually changed their presence.

-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
-                get_updates_counter.labels("stream").inc()
-                for other_user_id in changed:
-                    if other_user_id in users_interested_in:
-                        user_ids_changed.add(other_user_id)
-            else:
-                # Too many possible updates. Find all users we can see and check
-                # if any of them have changed.
-                get_updates_counter.labels("full").inc()
+            # Check whether this user should see all user updates
+            if users_interested_in == "ALL":
                 if from_key:
-                    user_ids_changed = stream_change_cache.get_entities_changed(
-                        users_interested_in, from_key
+                    # We need to return all new presence updates to this user, regardless of whether
+                    # they share a room with that user
+                    return await self._filter_all_presence_updates_for_user(
+                        user_id, max_token, from_key, include_offline
                     )
                 else:
-                    user_ids_changed = users_interested_in
+                    # This user should receive all user presence, and hasn't provided a from_key.
+                    # Send all currently known user presence states.
+                    users_to_state = await self.store.get_presence_for_all_users(
+                        include_offline=include_offline
+                    )

-            updates = await presence.current_state_for_users(user_ids_changed)
+                    return list(users_to_state.values()), max_token

-            if include_offline:
-                return (list(updates.values()), max_token)
-            else:
-                return (
-                    [s for s in updates.values() if s.state != PresenceState.OFFLINE],
-                    max_token,
+            # The set of users that we're interested in and that have had a presence update.
+            # We'll actually pull the presence updates for these users at the end.
+            interested_and_updated_users = (
+                set()
+            )  # type: Union[Set[str], FrozenSet[str]]
+
+            if from_key:
+                # First get all users that have had a presence update
+                updated_users = stream_change_cache.get_all_entities_changed(from_key)
+
+                # Cross-reference users we're interested in with those that have had updates.
+                # Use a slightly-optimised method for processing smaller sets of updates.
+                if updated_users is not None and len(updated_users) < 500:
+                    # For small deltas, it's quicker to get all changes and then
+                    # cross-reference with the users we're interested in
+                    get_updates_counter.labels("stream").inc()
+                    for other_user_id in updated_users:
+                        if other_user_id in users_interested_in:
+                            # mypy thinks this variable could be a FrozenSet as it's possibly set
+                            # to one in the `get_entities_changed` call below, and `add()` is not
+                            # a method on a FrozenSet. That doesn't affect us here though, as
+                            # `interested_and_updated_users` is clearly a set() above.
+                            interested_and_updated_users.add(other_user_id)  # type: ignore
+                else:
+                    # Too many possible updates. Find all users we can see and check
+                    # if any of them have changed.
+                    get_updates_counter.labels("full").inc()
+
+                    interested_and_updated_users = (
+                        stream_change_cache.get_entities_changed(
+                            users_interested_in, from_key
+                        )
+                    )
+            else:
+                # No from_key has been specified. Return the presence for all users
+                # this user is interested in
+                interested_and_updated_users = users_interested_in
+
+            # Retrieve the current presence state for each user
+            users_to_state = await self.get_presence_handler().current_state_for_users(
+                interested_and_updated_users
                 )
+            presence_updates = list(users_to_state.values())
+
+            # Remove the user from the list of users to receive all presence
+            if user_id in self.get_module_api().send_full_presence_to_local_users:
+                self.get_module_api().send_full_presence_to_local_users.remove(user_id)
+
+            if not include_offline:
+                # Filter out offline presence states
+                presence_updates = self._filter_offline_presence_state(presence_updates)
+
+            return presence_updates, max_token
+
+    async def _filter_all_presence_updates_for_user(
+        self,
+        user_id: str,
+        max_token: int,
+        from_key: int,
+        include_offline: bool,
+    ) -> Tuple[List[UserPresenceState], int]:
+        # Only return updates since the last sync
+        updated_users = self.store.presence_stream_cache.get_all_entities_changed(
+            from_key
+        )
+        if not updated_users:
+            updated_users = []
+
+        # Get the actual presence update for each change
+        users_to_state = await self.get_presence_handler().current_state_for_users(
+            updated_users
+        )
+
+        # TODO: This feels wildly inefficient, and it's unfortunate we need to ask the
+        # module for information on a number of users when we then only take the info
+        # for a single user
+
+        # Filter through the presence router
+        users_to_state_set = await self.get_presence_router().get_users_for_states(
+            users_to_state.values()
+        )
+
+        # We only want the mapping for the syncing user
+        presence_updates = list(users_to_state_set[user_id])
+
+        # Remove the user from the list of users to receive all presence
+        if user_id in self.get_module_api().send_full_presence_to_local_users:
+            self.get_module_api().send_full_presence_to_local_users.remove(user_id)
+
+        if not include_offline:
+            # Filter out offline states
+            presence_updates = self._filter_offline_presence_state(presence_updates)
+
+        # Return presence updates for all users since the last sync
+        return presence_updates, max_token
+
+    def _filter_offline_presence_state(
+        self, presence_updates: Iterable[UserPresenceState]
+    ) -> List[UserPresenceState]:
+        """Given an iterable containing user presence updates, return a list with any
+        offline presence states removed.
+
+        Args:
+            presence_updates: Presence states to filter
+
+        Returns:
+            A new list with any offline presence states removed.
+        """
+        return [
+            update
+            for update in presence_updates
+            if update.state != PresenceState.OFFLINE
+        ]

     def get_current_key(self):
         return self.store.get_current_presence_token()

     @cached(num_args=2, cache_context=True)
-    async def _get_interested_in(self, user, explicit_room_id, cache_context):
+    async def _get_interested_in(
+        self,
+        user: UserID,
+        explicit_room_id: Optional[str] = None,
+        cache_context: Optional[_CacheContext] = None,
+    ) -> Union[Set[str], Literal["ALL"]]:
         """Returns the set of users that the given user should see presence
-        updates for
+        updates for.
+
+        Args:
+            user: The user to retrieve presence updates for.
+            explicit_room_id: A
         """
         user_id = user.to_string()

         users_interested_in = set()
         users_interested_in.add(user_id)  # So that we receive our own presence

+        # cache_context isn't likely to ever be None due to the @cached decorator,
+        # but we can't have a non-optional argument after the optional argument
+        # explicit_room_id either. Assert cache_context is not None so we can use it
+        # without mypy complaining.
+        assert cache_context
+
+        # Check with the presence router whether we should poll additional users for
+        # their presence information
+        additional_users = await self.get_presence_router().get_interested_users(
+            user.to_string()
+        )
+        if additional_users == "ALL":
+            # If the module requested that this user see the presence updates of *all*
+            # users, then simply return that instead of calculating what rooms this
+            # user shares
+            return "ALL"
+
+        # Add the additional users from the router
+        users_interested_in.update(additional_users)
+
+        # Find the users who share a room with this user
         users_who_share_room = await self.store.get_users_who_share_room_with_user(
             user_id, on_invalidate=cache_context.invalidate
         )
```
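For completeness, the `send_full_presence_to_local_users` handling in `get_new_events` above implies that a module flags users roughly as follows. This is a hedged sketch based only on the commit message; `module_api` is assumed to be the running `ModuleApi` instance, and mutating the set directly (rather than through a dedicated helper) is an assumption:

```python
from synapse.module_api import ModuleApi


def flag_for_full_presence(module_api: ModuleApi, user_id: str) -> None:
    # get_new_events checks membership of this set: when it finds the syncing
    # user here, it drops from_key (so all current, online presence is sent)
    # and then removes the user so their next sync is incremental again.
    module_api.send_full_presence_to_local_users.add(user_id)
```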