diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 4 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 83 |
2 files changed, 72 insertions, 15 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index a2f8310388..e30f9c76d4 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -80,6 +80,10 @@ class SQLBaseStore(metaclass=ABCMeta): ) self._attempt_to_invalidate_cache("get_local_users_in_room", (room_id,)) + # There's no easy way of invalidating this cache for just the users + # that have changed, so we just clear the entire thing. + self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None) + for user_id in members_changed: self._attempt_to_invalidate_cache( "get_user_in_room_with_profile", (room_id, user_id) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index df6b82660e..e2cccc688c 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -21,6 +21,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -55,6 +56,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList +from synapse.util.iterutils import batch_iter from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -183,7 +185,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): self._check_safe_current_state_events_membership_updated_txn, ) - @cached(max_entries=100000, iterable=True, prune_unread_entries=False) + @cached(max_entries=100000, iterable=True) async def get_users_in_room(self, room_id: str) -> List[str]: return await self.db_pool.runInteraction( "get_users_in_room", self.get_users_in_room_txn, room_id @@ -561,7 +563,7 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results_dict.get("membership"), results_dict.get("event_id") - @cached(max_entries=500000, iterable=True, prune_unread_entries=False) + @cached(max_entries=500000, iterable=True) async def get_rooms_for_user_with_stream_ordering( self, user_id: str ) -> FrozenSet[GetRoomsForUserWithStreamOrdering]: @@ -732,25 +734,76 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) return frozenset(r.room_id for r in rooms) - @cached( - max_entries=500000, - cache_context=True, - iterable=True, - prune_unread_entries=False, + @cached(max_entries=10000) + async def does_pair_of_users_share_a_room( + self, user_id: str, other_user_id: str + ) -> bool: + raise NotImplementedError() + + @cachedList( + cached_method_name="does_pair_of_users_share_a_room", list_name="other_user_ids" ) - async def get_users_who_share_room_with_user( - self, user_id: str, cache_context: _CacheContext + async def _do_users_share_a_room( + self, user_id: str, other_user_ids: Collection[str] + ) -> Mapping[str, Optional[bool]]: + """Return mapping from user ID to whether they share a room with the + given user. + + Note: `None` and `False` are equivalent and mean they don't share a + room. + """ + + def do_users_share_a_room_txn( + txn: LoggingTransaction, user_ids: Collection[str] + ) -> Dict[str, bool]: + clause, args = make_in_list_sql_clause( + self.database_engine, "state_key", user_ids + ) + + # This query works by fetching both the list of rooms for the target + # user and the set of other users, and then checking if there is any + # overlap. + sql = f""" + SELECT b.state_key + FROM ( + SELECT room_id FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join' AND state_key = ? + ) AS a + INNER JOIN ( + SELECT room_id, state_key FROM current_state_events + WHERE type = 'm.room.member' AND membership = 'join' AND {clause} + ) AS b using (room_id) + LIMIT 1 + """ + + txn.execute(sql, (user_id, *args)) + return {u: True for u, in txn} + + to_return = {} + for batch_user_ids in batch_iter(other_user_ids, 1000): + res = await self.db_pool.runInteraction( + "do_users_share_a_room", do_users_share_a_room_txn, batch_user_ids + ) + to_return.update(res) + + return to_return + + async def do_users_share_a_room( + self, user_id: str, other_user_ids: Collection[str] ) -> Set[str]: + """Return the set of users who share a room with the first users""" + + user_dict = await self._do_users_share_a_room(user_id, other_user_ids) + + return {u for u, share_room in user_dict.items() if share_room} + + async def get_users_who_share_room_with_user(self, user_id: str) -> Set[str]: """Returns the set of users who share a room with `user_id`""" - room_ids = await self.get_rooms_for_user( - user_id, on_invalidate=cache_context.invalidate - ) + room_ids = await self.get_rooms_for_user(user_id) user_who_share_room = set() for room_id in room_ids: - user_ids = await self.get_users_in_room( - room_id, on_invalidate=cache_context.invalidate - ) + user_ids = await self.get_users_in_room(room_id) user_who_share_room.update(user_ids) return user_who_share_room |