diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4489732fda..e48ec5f495 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -504,6 +504,68 @@ class RoomMemberWorkerStore(EventsWorkerStore):
for room_id, instance, stream_id in txn
)
+ @cachedList(
+ cached_method_name="get_rooms_for_user_with_stream_ordering",
+ list_name="user_ids",
+ )
+ async def get_rooms_for_users_with_stream_ordering(
+ self, user_ids: Collection[str]
+ ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
+ """A batched version of `get_rooms_for_user_with_stream_ordering`.
+
+ Returns:
+ Map from user_id to set of rooms that is currently in.
+ """
+ return await self.db_pool.runInteraction(
+ "get_rooms_for_users_with_stream_ordering",
+ self._get_rooms_for_users_with_stream_ordering_txn,
+ user_ids,
+ )
+
+ def _get_rooms_for_users_with_stream_ordering_txn(
+ self, txn, user_ids: Collection[str]
+ ) -> Dict[str, FrozenSet[GetRoomsForUserWithStreamOrdering]]:
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "c.state_key",
+ user_ids,
+ )
+
+ if self._current_state_events_membership_up_to_date:
+ sql = f"""
+ SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND c.membership = ?
+ AND {clause}
+ """
+ else:
+ sql = f"""
+ SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+ FROM current_state_events AS c
+ INNER JOIN room_memberships AS m USING (room_id, event_id)
+ INNER JOIN events AS e USING (room_id, event_id)
+ WHERE
+ c.type = 'm.room.member'
+ AND m.membership = ?
+ AND {clause}
+ """
+
+ txn.execute(sql, [Membership.JOIN] + args)
+
+ result = {user_id: set() for user_id in user_ids}
+ for user_id, room_id, instance, stream_id in txn:
+ result[user_id].add(
+ GetRoomsForUserWithStreamOrdering(
+ room_id, PersistedEventPosition(instance, stream_id)
+ )
+ )
+
+ return {user_id: frozenset(v) for user_id, v in result.items()}
+
async def get_users_server_still_shares_room_with(
self, user_ids: Collection[str]
) -> Set[str]:
|