diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 1e35046e07..2b31ce54bb 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -29,12 +29,15 @@ from typing import (
from synapse.api.constants import EventTypes
from synapse.events import EventBase
+from synapse.logging.opentracing import tag_args, trace
+from synapse.storage.roommember import ProfileInfo
from synapse.storage.state import StateFilter
from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
from synapse.types import MutableStateMap, StateMap
+from synapse.util.cancellation import cancellable
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -179,6 +182,7 @@ class StateStorageController:
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
+ @trace
async def get_state_for_events(
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
@@ -225,10 +229,14 @@ class StateStorageController:
return {event: event_to_state[event] for event in event_ids}
+ @trace
+ @tag_args
+ @cancellable
async def get_state_ids_for_events(
self,
event_ids: Collection[str],
state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
@@ -237,6 +245,9 @@ class StateStorageController:
Args:
event_ids: events whose state should be returned
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at these events and `state_filter` is not satisfied by partial state.
+ Defaults to `True`.
Returns:
A dict from event_id -> (type, state_key) -> event_id
@@ -245,8 +256,12 @@ class StateStorageController:
RuntimeError if we don't have a state group for one or more of the events
(ie they are outliers or unknown)
"""
- await_full_state = True
- if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+ if (
+ await_full_state
+ and state_filter
+ and not state_filter.must_await_full_state(self._is_mine_id)
+ ):
+ # Full state is not required if the state filter is restrictive enough.
await_full_state = False
event_to_groups = await self.get_state_group_for_events(
@@ -287,8 +302,12 @@ class StateStorageController:
)
return state_map[event_id]
+ @trace
async def get_state_ids_for_event(
- self, event_id: str, state_filter: Optional[StateFilter] = None
+ self,
+ event_id: str,
+ state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
) -> StateMap[str]:
"""
Get the state dict corresponding to a particular event
@@ -296,6 +315,9 @@ class StateStorageController:
Args:
event_id: event whose state should be returned
state_filter: The state filter used to fetch state from the database.
+ await_full_state: if `True`, will block if we do not yet have complete state
+ at the event and `state_filter` is not satisfied by partial state.
+ Defaults to `True`.
Returns:
A dict from (type, state_key) -> state_event_id
@@ -305,7 +327,9 @@ class StateStorageController:
outlier or is unknown)
"""
state_map = await self.get_state_ids_for_events(
- [event_id], state_filter or StateFilter.all()
+ [event_id],
+ state_filter or StateFilter.all(),
+ await_full_state=await_full_state,
)
return state_map[event_id]
@@ -327,6 +351,9 @@ class StateStorageController:
groups, state_filter or StateFilter.all()
)
+ @trace
+ @tag_args
+ @cancellable
async def get_state_group_for_events(
self,
event_ids: Collection[str],
@@ -375,10 +402,12 @@ class StateStorageController:
event_id, room_id, prev_group, delta_ids, current_state_ids
)
+ @cancellable
async def get_current_state_ids(
self,
room_id: str,
state_filter: Optional[StateFilter] = None,
+ await_full_state: bool = True,
on_invalidate: Optional[Callable[[], None]] = None,
) -> StateMap[str]:
"""Get the current state event ids for a room based on the
@@ -391,13 +420,17 @@ class StateStorageController:
room_id: The room to get the state IDs of. state_filter: The state
filter used to fetch state from the
database.
+ await_full_state: if true, will block if we do not yet have complete
+ state for the room.
on_invalidate: Callback for when the `get_current_state_ids` cache
for the room gets invalidated.
Returns:
The current state of the room.
"""
- if not state_filter or state_filter.must_await_full_state(self._is_mine_id):
+ if await_full_state and (
+ not state_filter or state_filter.must_await_full_state(self._is_mine_id)
+ ):
await self._partial_state_room_tracker.await_full_state(room_id)
if state_filter and not state_filter.is_full():
@@ -468,6 +501,7 @@ class StateStorageController:
prev_stream_id, max_stream_id
)
+ @trace
async def get_current_state(
self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
@@ -496,8 +530,67 @@ class StateStorageController:
return state_map.get(key)
async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
- """Get current hosts in room based on current state."""
+ """Get current hosts in room based on current state.
+
+ Blocks until we have full state for the given room. This only happens for rooms
+ with partial state.
+ """
await self._partial_state_room_tracker.await_full_state(room_id)
return await self.stores.main.get_current_hosts_in_room(room_id)
+
+ async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
+ """Get current hosts in room based on current state.
+
+ Blocks until we have full state for the given room. This only happens for rooms
+ with partial state.
+
+ Returns:
+ A list of hosts in the room, sorted by longest in the room first. (aka.
+ sorted by join with the lowest depth first).
+ """
+
+ await self._partial_state_room_tracker.await_full_state(room_id)
+
+ return await self.stores.main.get_current_hosts_in_room_ordered(room_id)
+
+ async def get_current_hosts_in_room_or_partial_state_approximation(
+ self, room_id: str
+ ) -> Collection[str]:
+ """Get approximation of current hosts in room based on current state.
+
+ For rooms with full state, this is equivalent to `get_current_hosts_in_room`,
+ with the same order of results.
+
+ For rooms with partial state, no blocking occurs. Instead, the list of hosts
+ in the room at the time of joining is combined with the list of hosts which
+ joined the room afterwards. The returned list may include hosts that are not
+ actually in the room and exclude hosts that are in the room, since we may
+ calculate state incorrectly during the partial state phase. The order of results
+ is arbitrary for rooms with partial state.
+ """
+ # We have to read this list first to mitigate races with un-partial stating.
+ # This will be empty for rooms with full state.
+ hosts_at_join = await self.stores.main.get_partial_state_servers_at_join(
+ room_id
+ )
+
+ hosts_from_state = await self.stores.main.get_current_hosts_in_room(room_id)
+
+ hosts = set(hosts_at_join)
+ hosts.update(hosts_from_state)
+
+ return hosts
+
+ async def get_users_in_room_with_profiles(
+ self, room_id: str
+ ) -> Dict[str, ProfileInfo]:
+ """
+ Get the current users in the room with their profiles.
+ If the room is currently partial-stated, this will block until the room has
+ full state.
+ """
+ await self._partial_state_room_tracker.await_full_state(room_id)
+
+ return await self.stores.main.get_users_in_room_with_profiles(room_id)
|