diff options
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/devices.py | 10 | ||||
-rw-r--r-- | synapse/storage/databases/main/roommember.py | 62 |
2 files changed, 72 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 8d845fe951..3b3a089b76 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -670,6 +670,16 @@ class DeviceWorkerStore(SQLBaseStore): device["device_id"]: db_to_json(device["content"]) for device in devices } + def get_cached_device_list_changes( + self, + from_key: int, + ) -> Optional[Set[str]]: + """Get set of users whose devices have changed since `from_key`, or None + if that information is not in our cache. + """ + + return self._device_list_stream_cache.get_all_entities_changed(from_key) + async def get_users_whose_devices_changed( self, from_key: int, user_ids: Iterable[str] ) -> Set[str]: diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 4489732fda..e48ec5f495 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -504,6 +504,68 @@ class RoomMemberWorkerStore(EventsWorkerStore): for room_id, instance, stream_id in txn ) + @cachedList( + cached_method_name="get_rooms_for_user_with_stream_ordering", + list_name="user_ids", + ) + async def get_rooms_for_users_with_stream_ordering( + self, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + """A batched version of `get_rooms_for_user_with_stream_ordering`. + + Returns: + Map from user_id to set of rooms that is currently in. + """ + return await self.db_pool.runInteraction( + "get_rooms_for_users_with_stream_ordering", + self._get_rooms_for_users_with_stream_ordering_txn, + user_ids, + ) + + def _get_rooms_for_users_with_stream_ordering_txn( + self, txn, user_ids: Collection[str] + ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]: + + clause, args = make_in_list_sql_clause( + self.database_engine, + "c.state_key", + user_ids, + ) + + if self._current_state_events_membership_up_to_date: + sql = f""" + SELECT c.state_key, room_id, e.instance_name, e.stream_ordering + FROM current_state_events AS c + INNER JOIN events AS e USING (room_id, event_id) + WHERE + c.type = 'm.room.member' + AND c.membership = ? + AND {clause} + """ + else: + sql = f""" + SELECT c.state_key, room_id, e.instance_name, e.stream_ordering + FROM current_state_events AS c + INNER JOIN room_memberships AS m USING (room_id, event_id) + INNER JOIN events AS e USING (room_id, event_id) + WHERE + c.type = 'm.room.member' + AND m.membership = ? + AND {clause} + """ + + txn.execute(sql, [Membership.JOIN] + args) + + result = {user_id: set() for user_id in user_ids} + for user_id, room_id, instance, stream_id in txn: + result[user_id].add( + GetRoomsForUserWithStreamOrdering( + room_id, PersistedEventPosition(instance, stream_id) + ) + ) + + return {user_id: frozenset(v) for user_id, v in result.items()} + async def get_users_server_still_shares_room_with( self, user_ids: Collection[str] ) -> Set[str]: |