diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 895ea63ed3..741504ba9f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,6 @@ from typing import (
Callable,
Collection,
Dict,
- FrozenSet,
Generator,
Iterable,
List,
@@ -42,7 +41,6 @@ from typing import (
Set,
Tuple,
Type,
- Union,
)
from prometheus_client import Counter
@@ -68,7 +66,6 @@ from synapse.storage.databases.main import DataStore
from synapse.streams import EventSource
from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import _CacheContext, cached
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -1656,15 +1653,18 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
# doesn't return. C.f. #5503.
return [], max_token
- # Figure out which other users this user should receive updates for
- users_interested_in = await self._get_interested_in(user, explicit_room_id)
+ # Figure out which other users this user should explicitly receive
+ # updates for
+ additional_users_interested_in = (
+ await self.get_presence_router().get_interested_users(user.to_string())
+ )
# We have a set of users that we're interested in the presence of. We want to
# cross-reference that with the users that have actually changed their presence.
# Check whether this user should see all user updates
- if users_interested_in == PresenceRouter.ALL_USERS:
+ if additional_users_interested_in == PresenceRouter.ALL_USERS:
# Provide presence state for all users
presence_updates = await self._filter_all_presence_updates_for_user(
user_id, include_offline, from_key
@@ -1673,34 +1673,47 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
return presence_updates, max_token
# Make mypy happy. users_interested_in should now be a set
- assert not isinstance(users_interested_in, str)
+ assert not isinstance(additional_users_interested_in, str)
+
+ # We always care about our own presence.
+ additional_users_interested_in.add(user_id)
+
+ if explicit_room_id:
+ user_ids = await self.store.get_users_in_room(explicit_room_id)
+ additional_users_interested_in.update(user_ids)
# The set of users that we're interested in and that have had a presence update.
# We'll actually pull the presence updates for these users at the end.
- interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set()
+ interested_and_updated_users: Collection[str]
if from_key is not None:
# First get all users that have had a presence update
updated_users = stream_change_cache.get_all_entities_changed(from_key)
# Cross-reference users we're interested in with those that have had updates.
- # Use a slightly-optimised method for processing smaller sets of updates.
- if updated_users is not None and len(updated_users) < 500:
- # For small deltas, it's quicker to get all changes and then
- # cross-reference with the users we're interested in
+ if updated_users is not None:
+ # If we have the full list of changes for presence we can
+ # simply check which ones share a room with the user.
get_updates_counter.labels("stream").inc()
- for other_user_id in updated_users:
- if other_user_id in users_interested_in:
- # mypy thinks this variable could be a FrozenSet as it's possibly set
- # to one in the `get_entities_changed` call below, and `add()` is not
- # method on a FrozenSet. That doesn't affect us here though, as
- # `interested_and_updated_users` is clearly a set() above.
- interested_and_updated_users.add(other_user_id) # type: ignore
+
+ sharing_users = await self.store.do_users_share_a_room(
+ user_id, updated_users
+ )
+
+ interested_and_updated_users = (
+ sharing_users.union(additional_users_interested_in)
+ ).intersection(updated_users)
+
else:
# Too many possible updates. Find all users we can see and check
# if any of them have changed.
get_updates_counter.labels("full").inc()
+ users_interested_in = (
+ await self.store.get_users_who_share_room_with_user(user_id)
+ )
+ users_interested_in.update(additional_users_interested_in)
+
interested_and_updated_users = (
stream_change_cache.get_entities_changed(
users_interested_in, from_key
@@ -1709,7 +1722,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
else:
# No from_key has been specified. Return the presence for all users
# this user is interested in
- interested_and_updated_users = users_interested_in
+ interested_and_updated_users = (
+ await self.store.get_users_who_share_room_with_user(user_id)
+ )
+ interested_and_updated_users.update(additional_users_interested_in)
# Retrieve the current presence state for each user
users_to_state = await self.get_presence_handler().current_state_for_users(
@@ -1804,62 +1820,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
def get_current_key(self) -> int:
return self.store.get_current_presence_token()
- @cached(num_args=2, cache_context=True)
- async def _get_interested_in(
- self,
- user: UserID,
- explicit_room_id: Optional[str] = None,
- cache_context: Optional[_CacheContext] = None,
- ) -> Union[Set[str], str]:
- """Returns the set of users that the given user should see presence
- updates for.
-
- Args:
- user: The user to retrieve presence updates for.
- explicit_room_id: The users that are in the room will be returned.
-
- Returns:
- A set of user IDs to return presence updates for, or "ALL" to return all
- known updates.
- """
- user_id = user.to_string()
- users_interested_in = set()
- users_interested_in.add(user_id) # So that we receive our own presence
-
- # cache_context isn't likely to ever be None due to the @cached decorator,
- # but we can't have a non-optional argument after the optional argument
- # explicit_room_id either. Assert cache_context is not None so we can use it
- # without mypy complaining.
- assert cache_context
-
- # Check with the presence router whether we should poll additional users for
- # their presence information
- additional_users = await self.get_presence_router().get_interested_users(
- user.to_string()
- )
- if additional_users == PresenceRouter.ALL_USERS:
- # If the module requested that this user see the presence updates of *all*
- # users, then simply return that instead of calculating what rooms this
- # user shares
- return PresenceRouter.ALL_USERS
-
- # Add the additional users from the router
- users_interested_in.update(additional_users)
-
- # Find the users who share a room with this user
- users_who_share_room = await self.store.get_users_who_share_room_with_user(
- user_id, on_invalidate=cache_context.invalidate
- )
- users_interested_in.update(users_who_share_room)
-
- if explicit_room_id:
- user_ids = await self.store.get_users_in_room(
- explicit_room_id, on_invalidate=cache_context.invalidate
- )
- users_interested_in.update(user_ids)
-
- return users_interested_in
-
def handle_timeouts(
user_states: List[UserPresenceState],
|