summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2022-07-25 10:21:06 +0100
committerGitHub <noreply@github.com>2022-07-25 09:21:06 +0000
commit43adf2521cc6952dcc7f0e3006dbfe52db85721a (patch)
tree6c061b210fe82c3b4c3f5209d3de62b56acaa5f6 /synapse/handlers
parentBackfill remote event fetched by MSC3030 so we can paginate from it later (... (diff)
downloadsynapse-43adf2521cc6952dcc7f0e3006dbfe52db85721a.tar.xz
Refactor presence so we can prune user in room caches (#13313)
See #10826 and #10786 for context as to why we had to disable pruning on
those caches.

Now that `get_users_who_share_room_with_user` is called frequently only
for presence, we just need to make calls to it less frequent and then we
can remove the various levels of caching that is going on.
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/presence.py112
1 files changed, 36 insertions, 76 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 895ea63ed3..741504ba9f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -34,7 +34,6 @@ from typing import (
     Callable,
     Collection,
     Dict,
-    FrozenSet,
     Generator,
     Iterable,
     List,
@@ -42,7 +41,6 @@ from typing import (
     Set,
     Tuple,
     Type,
-    Union,
 )
 
 from prometheus_client import Counter
@@ -68,7 +66,6 @@ from synapse.storage.databases.main import DataStore
 from synapse.streams import EventSource
 from synapse.types import JsonDict, StreamKeyType, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
-from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
 
@@ -1656,15 +1653,18 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
                 # doesn't return. C.f. #5503.
                 return [], max_token
 
-            # Figure out which other users this user should receive updates for
-            users_interested_in = await self._get_interested_in(user, explicit_room_id)
+            # Figure out which other users this user should explicitly receive
+            # updates for
+            additional_users_interested_in = (
+                await self.get_presence_router().get_interested_users(user.to_string())
+            )
 
             # We have a set of users that we're interested in the presence of. We want to
             # cross-reference that with the users that have actually changed their presence.
 
             # Check whether this user should see all user updates
 
-            if users_interested_in == PresenceRouter.ALL_USERS:
+            if additional_users_interested_in == PresenceRouter.ALL_USERS:
                 # Provide presence state for all users
                 presence_updates = await self._filter_all_presence_updates_for_user(
                     user_id, include_offline, from_key
@@ -1673,34 +1673,47 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
                 return presence_updates, max_token
 
             # Make mypy happy. users_interested_in should now be a set
-            assert not isinstance(users_interested_in, str)
+            assert not isinstance(additional_users_interested_in, str)
+
+            # We always care about our own presence.
+            additional_users_interested_in.add(user_id)
+
+            if explicit_room_id:
+                user_ids = await self.store.get_users_in_room(explicit_room_id)
+                additional_users_interested_in.update(user_ids)
 
             # The set of users that we're interested in and that have had a presence update.
             # We'll actually pull the presence updates for these users at the end.
-            interested_and_updated_users: Union[Set[str], FrozenSet[str]] = set()
+            interested_and_updated_users: Collection[str]
 
             if from_key is not None:
                 # First get all users that have had a presence update
                 updated_users = stream_change_cache.get_all_entities_changed(from_key)
 
                 # Cross-reference users we're interested in with those that have had updates.
-                # Use a slightly-optimised method for processing smaller sets of updates.
-                if updated_users is not None and len(updated_users) < 500:
-                    # For small deltas, it's quicker to get all changes and then
-                    # cross-reference with the users we're interested in
+                if updated_users is not None:
+                    # If we have the full list of changes for presence we can
+                    # simply check which ones share a room with the user.
                     get_updates_counter.labels("stream").inc()
-                    for other_user_id in updated_users:
-                        if other_user_id in users_interested_in:
-                            # mypy thinks this variable could be a FrozenSet as it's possibly set
-                            # to one in the `get_entities_changed` call below, and `add()` is not
-                            # method on a FrozenSet. That doesn't affect us here though, as
-                            # `interested_and_updated_users` is clearly a set() above.
-                            interested_and_updated_users.add(other_user_id)  # type: ignore
+
+                    sharing_users = await self.store.do_users_share_a_room(
+                        user_id, updated_users
+                    )
+
+                    interested_and_updated_users = (
+                        sharing_users.union(additional_users_interested_in)
+                    ).intersection(updated_users)
+
                 else:
                     # Too many possible updates. Find all users we can see and check
                     # if any of them have changed.
                     get_updates_counter.labels("full").inc()
 
+                    users_interested_in = (
+                        await self.store.get_users_who_share_room_with_user(user_id)
+                    )
+                    users_interested_in.update(additional_users_interested_in)
+
                     interested_and_updated_users = (
                         stream_change_cache.get_entities_changed(
                             users_interested_in, from_key
@@ -1709,7 +1722,10 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
             else:
                 # No from_key has been specified. Return the presence for all users
                 # this user is interested in
-                interested_and_updated_users = users_interested_in
+                interested_and_updated_users = (
+                    await self.store.get_users_who_share_room_with_user(user_id)
+                )
+                interested_and_updated_users.update(additional_users_interested_in)
 
             # Retrieve the current presence state for each user
             users_to_state = await self.get_presence_handler().current_state_for_users(
@@ -1804,62 +1820,6 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
     def get_current_key(self) -> int:
         return self.store.get_current_presence_token()
 
-    @cached(num_args=2, cache_context=True)
-    async def _get_interested_in(
-        self,
-        user: UserID,
-        explicit_room_id: Optional[str] = None,
-        cache_context: Optional[_CacheContext] = None,
-    ) -> Union[Set[str], str]:
-        """Returns the set of users that the given user should see presence
-        updates for.
-
-        Args:
-            user: The user to retrieve presence updates for.
-            explicit_room_id: The users that are in the room will be returned.
-
-        Returns:
-            A set of user IDs to return presence updates for, or "ALL" to return all
-            known updates.
-        """
-        user_id = user.to_string()
-        users_interested_in = set()
-        users_interested_in.add(user_id)  # So that we receive our own presence
-
-        # cache_context isn't likely to ever be None due to the @cached decorator,
-        # but we can't have a non-optional argument after the optional argument
-        # explicit_room_id either. Assert cache_context is not None so we can use it
-        # without mypy complaining.
-        assert cache_context
-
-        # Check with the presence router whether we should poll additional users for
-        # their presence information
-        additional_users = await self.get_presence_router().get_interested_users(
-            user.to_string()
-        )
-        if additional_users == PresenceRouter.ALL_USERS:
-            # If the module requested that this user see the presence updates of *all*
-            # users, then simply return that instead of calculating what rooms this
-            # user shares
-            return PresenceRouter.ALL_USERS
-
-        # Add the additional users from the router
-        users_interested_in.update(additional_users)
-
-        # Find the users who share a room with this user
-        users_who_share_room = await self.store.get_users_who_share_room_with_user(
-            user_id, on_invalidate=cache_context.invalidate
-        )
-        users_interested_in.update(users_who_share_room)
-
-        if explicit_room_id:
-            user_ids = await self.store.get_users_in_room(
-                explicit_room_id, on_invalidate=cache_context.invalidate
-            )
-            users_interested_in.update(user_ids)
-
-        return users_interested_in
-
 
 def handle_timeouts(
     user_states: List[UserPresenceState],