diff options
author | Erik Johnston <erik@matrix.org> | 2022-07-25 10:21:06 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-25 09:21:06 +0000 |
commit | 43adf2521cc6952dcc7f0e3006dbfe52db85721a (patch) | |
tree | 6c061b210fe82c3b4c3f5209d3de62b56acaa5f6 /synapse/storage | |
parent | Backfill remote event fetched by MSC3030 so we can paginate from it later (... (diff) | |
download | synapse-43adf2521cc6952dcc7f0e3006dbfe52db85721a.tar.xz |
Refactor presence so we can prune user in room caches (#13313)
See #10826 and #10786 for context as to why we had to disable pruning on those caches. Now that `get_users_who_share_room_with_user` is called frequently only for presence, we just need to make calls to it less frequent and then we can remove the various levels of caching that is going on.
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/_base.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 83 |
2 files changed, 72 insertions, 15 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a2f8310388..e30f9c76d4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -80,6 +80,10 @@ class SQLBaseStore(metaclass=ABCMeta): ) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) + # There's no easy way of invalidating this cache for just the users + # that have changed, so we just clear the entire thing. + self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) + for user_id in members_changed: self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index df6b82660e..e2cccc688c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -21,6 +21,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -55,6 +56,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -183,7 +185,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._check_safe_current_state_events_membership_updated_txn, ) - @cached(max_entries=100000, iterable=True, prune_unread_entries=False) + @cached(max_entries=100000, iterable=True) async def get_users_in_room(self, room_id: str) -> List[str]: return await self.db_pool.runInteraction( "get_users_in_room", self.get_users_in_room_txn, room_id @@ -561,7 +563,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results_dict.get("membership"), results_dict.get("event_id") - @cached(max_entries=500000, iterable=True, prune_unread_entries=False) + @cached(max_entries=500000, iterable=True) async def get_rooms_for_user_with_stream_ordering( self, user_id: str ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: @@ -732,25 +734,76 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) return frozenset(r.room_id for r in rooms) - @cached( - max_entries=500000, - cache_context=True, - iterable=True, - prune_unread_entries=False, + @cached(max_entries=10000) + async def does_pair_of_users_share_a_room( + self, user_id: str, other_user_id: str + ) -> bool: + raise NotImplementedError() + + @cachedList( + cached_method_name="does_pair_of_users_share_a_room", list_name="other_user_ids" ) - async def get_users_who_share_room_with_user( - self, user_id: str, cache_context: _CacheContext + async def _do_users_share_a_room( + self, user_id: str, other_user_ids: Collection[str] + ) -> Mapping[str, Optional[bool]]: + """Return mapping from user ID to whether they share a room with the + given user. + + Note: `None` and `False` are equivalent and mean they don't share a + room. + """ + + def do_users_share_a_room_txn( + txn: LoggingTransaction, user_ids: Collection[str] + ) -> Dict[str, bool]: + clause, args = make_in_list_sql_clause( + self.database_engine, "state_key", user_ids + ) + + # This query works by fetching both the list of rooms for the target + # user and the set of other users, and then checking if there is any + # overlap. + sql = f""" + SELECT b.state_key + FROM ( + SELECT room_id FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join' AND state_key = ? + ) AS a + INNER JOIN ( + SELECT room_id, state_key FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join' AND {clause} + ) AS b using (room_id) + LIMIT 1 + """ + + txn.execute(sql, (user_id, *args)) + return {u: True for u, in txn} + + to_return = {} + for batch_user_ids in batch_iter(other_user_ids, 1000): + res = await self.db_pool.runInteraction( + "do_users_share_a_room", do_users_share_a_room_txn, batch_user_ids + ) + to_return.update(res) + + return to_return + + async def do_users_share_a_room( + self, user_id: str, other_user_ids: Collection[str] ) -> Set[str]: + """Return the set of users who share a room with the first users""" + + user_dict = await self._do_users_share_a_room(user_id, other_user_ids) + + return {u for u, share_room in user_dict.items() if share_room} + + async def get_users_who_share_room_with_user(self, user_id: str) -> Set[str]: """Returns the set of users who share a room with `user_id`""" - room_ids = await self.get_rooms_for_user( - user_id, on_invalidate=cache_context.invalidate - ) + room_ids = await self.get_rooms_for_user(user_id) user_who_share_room = set() for room_id in room_ids: - user_ids = await self.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate - ) + user_ids = await self.get_users_in_room(room_id) user_who_share_room.update(user_ids) return user_who_share_room |