diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 105e4e1fec..bac21ecf9c 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -18,41 +18,20 @@ The storage layer is split up into multiple parts to allow Synapse to run
against different configurations of databases (e.g. single or multiple
databases). The `DatabasePool` class represents connections to a single physical
database. The `databases` are classes that talk directly to a `DatabasePool`
-instance and have associated schemas, background updates, etc. On top of those
-there are classes that provide high level interfaces that combine calls to
-multiple `databases`.
+instance and have associated schemas, background updates, etc.
+
+On top of the databases are the StorageControllers, located in the
+`synapse.storage.controllers` module. These classes provide high level
+interfaces that combine calls to multiple `databases`. They are bundled into the
+`StorageControllers` singleton for ease of use, and exposed via
+`HomeServer.get_storage_controllers()`.
There are also schemas that get applied to every database, regardless of the
data stores associated with them (e.g. the schema version tables), which are
stored in `synapse.storage.schema`.
"""
-from typing import TYPE_CHECKING
from synapse.storage.databases import Databases
from synapse.storage.databases.main import DataStore
-from synapse.storage.persist_events import EventsPersistenceStorage
-from synapse.storage.purge_events import PurgeEventsStorage
-from synapse.storage.state import StateGroupStorage
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
__all__ = ["Databases", "DataStore"]
-
-
-class Storage:
- """The high level interfaces for talking to various storage layers."""
-
- def __init__(self, hs: "HomeServer", stores: Databases):
- # We include the main data store here mainly so that we don't have to
- # rewrite all the existing code to split it into high vs low level
- # interfaces.
- self.main = stores.main
-
- self.purge_events = PurgeEventsStorage(hs, stores)
- self.state = StateGroupStorage(hs, stores)
-
- self.persistence = None
- if stores.persist_events:
- self.persistence = EventsPersistenceStorage(hs, stores)
diff --git a/synapse/storage/controllers/__init__.py b/synapse/storage/controllers/__init__.py
new file mode 100644
index 0000000000..992261d07b
--- /dev/null
+++ b/synapse/storage/controllers/__init__.py
@@ -0,0 +1,46 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING
+
+from synapse.storage.controllers.persist_events import (
+ EventsPersistenceStorageController,
+)
+from synapse.storage.controllers.purge_events import PurgeEventsStorageController
+from synapse.storage.controllers.state import StateGroupStorageController
+from synapse.storage.databases import Databases
+from synapse.storage.databases.main import DataStore
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+__all__ = ["Databases", "DataStore"]
+
+
+class StorageControllers:
+ """The high level interfaces for talking to various storage controller layers."""
+
+ def __init__(self, hs: "HomeServer", stores: Databases):
+ # We include the main data store here mainly so that we don't have to
+ # rewrite all the existing code to split it into high vs low level
+ # interfaces.
+ self.main = stores.main
+
+ self.purge_events = PurgeEventsStorageController(hs, stores)
+ self.state = StateGroupStorageController(hs, stores)
+
+ self.persistence = None
+ if stores.persist_events:
+ self.persistence = EventsPersistenceStorageController(hs, stores)
diff --git a/synapse/storage/persist_events.py b/synapse/storage/controllers/persist_events.py
index a21dea91c8..ef8c135b12 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -272,7 +272,7 @@ class _EventPeristenceQueue(Generic[_PersistResult]):
pass
-class EventsPersistenceStorage:
+class EventsPersistenceStorageController:
"""High level interface for handling persisting newly received events.
Takes care of batching up events by room, and calculating the necessary
diff --git a/synapse/storage/purge_events.py b/synapse/storage/controllers/purge_events.py
index 30669beb7c..9ca50d6a09 100644
--- a/synapse/storage/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -24,7 +24,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-class PurgeEventsStorage:
+class PurgeEventsStorageController:
"""High level interface for purging rooms and event history."""
def __init__(self, hs: "HomeServer", stores: Databases):
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
new file mode 100644
index 0000000000..0f09953086
--- /dev/null
+++ b/synapse/storage/controllers/state.py
@@ -0,0 +1,351 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import (
+ TYPE_CHECKING,
+ Awaitable,
+ Collection,
+ Dict,
+ Iterable,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+)
+
+from synapse.events import EventBase
+from synapse.storage.state import StateFilter
+from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
+from synapse.types import MutableStateMap, StateMap
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+ from synapse.storage.databases import Databases
+
+logger = logging.getLogger(__name__)
+
+
+class StateGroupStorageController:
+ """High level interface to fetching state for event."""
+
+ def __init__(self, hs: "HomeServer", stores: "Databases"):
+ self._is_mine_id = hs.is_mine_id
+ self.stores = stores
+ self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
+
+ def notify_event_un_partial_stated(self, event_id: str) -> None:
+ self._partial_state_events_tracker.notify_un_partial_stated(event_id)
+
+ async def get_state_group_delta(
+ self, state_group: int
+ ) -> Tuple[Optional[int], Optional[StateMap[str]]]:
+ """Given a state group try to return a previous group and a delta between
+ the old and the new.
+
+ Args:
+ state_group: The state group used to retrieve state deltas.
+
+ Returns:
+ A tuple of the previous group and a state map of the event IDs which
+ make up the delta between the old and new state groups.
+ """
+
+ state_group_delta = await self.stores.state.get_state_group_delta(state_group)
+ return state_group_delta.prev_group, state_group_delta.delta_ids
+
+ async def get_state_groups_ids(
+ self, _room_id: str, event_ids: Collection[str]
+ ) -> Dict[int, MutableStateMap[str]]:
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id: id of the room for these events
+ event_ids: ids of the events
+
+ Returns:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+
+ Raises:
+ RuntimeError if we don't have a state group for one or more of the events
+ (ie they are outliers or unknown)
+ """
+ if not event_ids:
+ return {}
+
+ event_to_groups = await self.get_state_group_for_events(event_ids)
+
+ groups = set(event_to_groups.values())
+ group_to_state = await self.stores.state._get_state_for_groups(groups)
+
+ return group_to_state
+
+ async def get_state_ids_for_group(
+ self, state_group: int, state_filter: Optional[StateFilter] = None
+ ) -> StateMap[str]:
+ """Get the event IDs of all the state in the given state group
+
+ Args:
+ state_group: A state group for which we want to get the state IDs.
+ state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules
+
+ Returns:
+ Resolves to a map of (type, state_key) -> event_id
+ """
+ group_to_state = await self.get_state_for_groups((state_group,), state_filter)
+
+ return group_to_state[state_group]
+
+ async def get_state_groups(
+ self, room_id: str, event_ids: Collection[str]
+ ) -> Dict[int, List[EventBase]]:
+ """Get the state groups for the given list of event_ids
+
+ Args:
+ room_id: ID of the room for these events.
+ event_ids: The event IDs to retrieve state for.
+
+ Returns:
+ dict of state_group_id -> list of state events.
+ """
+ if not event_ids:
+ return {}
+
+ group_to_ids = await self.get_state_groups_ids(room_id, event_ids)
+
+ state_event_map = await self.stores.main.get_events(
+ [
+ ev_id
+ for group_ids in group_to_ids.values()
+ for ev_id in group_ids.values()
+ ],
+ get_prev_content=False,
+ )
+
+ return {
+ group: [
+ state_event_map[v]
+ for v in event_id_map.values()
+ if v in state_event_map
+ ]
+ for group, event_id_map in group_to_ids.items()
+ }
+
+ def _get_state_groups_from_groups(
+ self, groups: List[int], state_filter: StateFilter
+ ) -> Awaitable[Dict[int, StateMap[str]]]:
+ """Returns the state groups for a given set of groups, filtering on
+ types of state events.
+
+ Args:
+ groups: list of state group IDs to query
+ state_filter: The state filter used to fetch state
+ from the database.
+
+ Returns:
+ Dict of state group to state map.
+ """
+
+ return self.stores.state._get_state_groups_from_groups(groups, state_filter)
+
+ async def get_state_for_events(
+ self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
+ ) -> Dict[str, StateMap[EventBase]]:
+ """Given a list of event_ids and type tuples, return a list of state
+ dicts for each event.
+
+ Args:
+ event_ids: The events to fetch the state of.
+ state_filter: The state filter used to fetch state.
+
+ Returns:
+ A dict of (event_id) -> (type, state_key) -> [state_events]
+
+ Raises:
+ RuntimeError if we don't have a state group for one or more of the events
+ (ie they are outliers or unknown)
+ """
+ await_full_state = True
+ if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+ await_full_state = False
+
+ event_to_groups = await self.get_state_group_for_events(
+ event_ids, await_full_state=await_full_state
+ )
+
+ groups = set(event_to_groups.values())
+ group_to_state = await self.stores.state._get_state_for_groups(
+ groups, state_filter or StateFilter.all()
+ )
+
+ state_event_map = await self.stores.main.get_events(
+ [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
+ get_prev_content=False,
+ )
+
+ event_to_state = {
+ event_id: {
+ k: state_event_map[v]
+ for k, v in group_to_state[group].items()
+ if v in state_event_map
+ }
+ for event_id, group in event_to_groups.items()
+ }
+
+ return {event: event_to_state[event] for event in event_ids}
+
+ async def get_state_ids_for_events(
+ self,
+ event_ids: Collection[str],
+ state_filter: Optional[StateFilter] = None,
+ ) -> Dict[str, StateMap[str]]:
+ """
+ Get the state dicts corresponding to a list of events, containing the event_ids
+ of the state events (as opposed to the events themselves)
+
+ Args:
+ event_ids: events whose state should be returned
+ state_filter: The state filter used to fetch state from the database.
+
+ Returns:
+ A dict from event_id -> (type, state_key) -> event_id
+
+ Raises:
+ RuntimeError if we don't have a state group for one or more of the events
+ (ie they are outliers or unknown)
+ """
+ await_full_state = True
+ if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+ await_full_state = False
+
+ event_to_groups = await self.get_state_group_for_events(
+ event_ids, await_full_state=await_full_state
+ )
+
+ groups = set(event_to_groups.values())
+ group_to_state = await self.stores.state._get_state_for_groups(
+ groups, state_filter or StateFilter.all()
+ )
+
+ event_to_state = {
+ event_id: group_to_state[group]
+ for event_id, group in event_to_groups.items()
+ }
+
+ return {event: event_to_state[event] for event in event_ids}
+
+ async def get_state_for_event(
+ self, event_id: str, state_filter: Optional[StateFilter] = None
+ ) -> StateMap[EventBase]:
+ """
+ Get the state dict corresponding to a particular event
+
+ Args:
+ event_id: event whose state should be returned
+ state_filter: The state filter used to fetch state from the database.
+
+ Returns:
+ A dict from (type, state_key) -> state_event
+
+ Raises:
+ RuntimeError if we don't have a state group for the event (ie it is an
+ outlier or is unknown)
+ """
+ state_map = await self.get_state_for_events(
+ [event_id], state_filter or StateFilter.all()
+ )
+ return state_map[event_id]
+
+ async def get_state_ids_for_event(
+ self, event_id: str, state_filter: Optional[StateFilter] = None
+ ) -> StateMap[str]:
+ """
+ Get the state dict corresponding to a particular event
+
+ Args:
+ event_id: event whose state should be returned
+ state_filter: The state filter used to fetch state from the database.
+
+ Returns:
+ A dict from (type, state_key) -> state_event_id
+
+ Raises:
+ RuntimeError if we don't have a state group for the event (ie it is an
+ outlier or is unknown)
+ """
+ state_map = await self.get_state_ids_for_events(
+ [event_id], state_filter or StateFilter.all()
+ )
+ return state_map[event_id]
+
+ def get_state_for_groups(
+ self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
+ ) -> Awaitable[Dict[int, MutableStateMap[str]]]:
+ """Gets the state at each of a list of state groups, optionally
+ filtering by type/state_key
+
+ Args:
+ groups: list of state groups for which we want to get the state.
+ state_filter: The state filter used to fetch state.
+ from the database.
+
+ Returns:
+ Dict of state group to state map.
+ """
+ return self.stores.state._get_state_for_groups(
+ groups, state_filter or StateFilter.all()
+ )
+
+ async def get_state_group_for_events(
+ self,
+ event_ids: Collection[str],
+ await_full_state: bool = True,
+ ) -> Mapping[str, int]:
+ """Returns mapping event_id -> state_group
+
+ Args:
+ event_ids: events to get state groups for
+ await_full_state: if true, will block if we do not yet have complete
+ state at these events.
+ """
+ if await_full_state:
+ await self._partial_state_events_tracker.await_full_state(event_ids)
+
+ return await self.stores.main._get_state_group_for_events(event_ids)
+
+ async def store_state_group(
+ self,
+ event_id: str,
+ room_id: str,
+ prev_group: Optional[int],
+ delta_ids: Optional[StateMap[str]],
+ current_state_ids: StateMap[str],
+ ) -> int:
+ """Store a new set of state, returning a newly assigned state group.
+
+ Args:
+ event_id: The event ID for which the state was calculated.
+ room_id: ID of the room for which the state was calculated.
+ prev_group: A previous state group for the room, optional.
+ delta_ids: The delta between state at `prev_group` and
+ `current_state_ids`, if `prev_group` was given. Same format as
+ `current_state_ids`.
+ current_state_ids: The state to store. Map of (type, state_key)
+ to event_id.
+
+ Returns:
+ The state group ID
+ """
+ return await self.stores.state.store_state_group(
+ event_id, room_id, prev_group, delta_ids, current_state_ids
+ )
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ab630953ac..96aaffb53c 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -15,7 +15,6 @@
import logging
from typing import (
TYPE_CHECKING,
- Awaitable,
Callable,
Collection,
Dict,
@@ -32,15 +31,11 @@ import attr
from frozendict import frozendict
from synapse.api.constants import EventTypes
-from synapse.events import EventBase
-from synapse.storage.util.partial_state_events_tracker import PartialStateEventsTracker
from synapse.types import MutableStateMap, StateKey, StateMap
if TYPE_CHECKING:
from typing import FrozenSet # noqa: used within quoted type hint; flake8 sad
- from synapse.server import HomeServer
- from synapse.storage.databases import Databases
logger = logging.getLogger(__name__)
@@ -578,318 +573,3 @@ _ALL_NON_MEMBER_STATE_FILTER = StateFilter(
types=frozendict({EventTypes.Member: frozenset()}), include_others=True
)
_NONE_STATE_FILTER = StateFilter(types=frozendict(), include_others=False)
-
-
-class StateGroupStorage:
- """High level interface to fetching state for event."""
-
- def __init__(self, hs: "HomeServer", stores: "Databases"):
- self._is_mine_id = hs.is_mine_id
- self.stores = stores
- self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
-
- def notify_event_un_partial_stated(self, event_id: str) -> None:
- self._partial_state_events_tracker.notify_un_partial_stated(event_id)
-
- async def get_state_group_delta(
- self, state_group: int
- ) -> Tuple[Optional[int], Optional[StateMap[str]]]:
- """Given a state group try to return a previous group and a delta between
- the old and the new.
-
- Args:
- state_group: The state group used to retrieve state deltas.
-
- Returns:
- A tuple of the previous group and a state map of the event IDs which
- make up the delta between the old and new state groups.
- """
-
- state_group_delta = await self.stores.state.get_state_group_delta(state_group)
- return state_group_delta.prev_group, state_group_delta.delta_ids
-
- async def get_state_groups_ids(
- self, _room_id: str, event_ids: Collection[str]
- ) -> Dict[int, MutableStateMap[str]]:
- """Get the event IDs of all the state for the state groups for the given events
-
- Args:
- _room_id: id of the room for these events
- event_ids: ids of the events
-
- Returns:
- dict of state_group_id -> (dict of (type, state_key) -> event id)
-
- Raises:
- RuntimeError if we don't have a state group for one or more of the events
- (ie they are outliers or unknown)
- """
- if not event_ids:
- return {}
-
- event_to_groups = await self.get_state_group_for_events(event_ids)
-
- groups = set(event_to_groups.values())
- group_to_state = await self.stores.state._get_state_for_groups(groups)
-
- return group_to_state
-
- async def get_state_ids_for_group(
- self, state_group: int, state_filter: Optional[StateFilter] = None
- ) -> StateMap[str]:
- """Get the event IDs of all the state in the given state group
-
- Args:
- state_group: A state group for which we want to get the state IDs.
- state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules
-
- Returns:
- Resolves to a map of (type, state_key) -> event_id
- """
- group_to_state = await self.get_state_for_groups((state_group,), state_filter)
-
- return group_to_state[state_group]
-
- async def get_state_groups(
- self, room_id: str, event_ids: Collection[str]
- ) -> Dict[int, List[EventBase]]:
- """Get the state groups for the given list of event_ids
-
- Args:
- room_id: ID of the room for these events.
- event_ids: The event IDs to retrieve state for.
-
- Returns:
- dict of state_group_id -> list of state events.
- """
- if not event_ids:
- return {}
-
- group_to_ids = await self.get_state_groups_ids(room_id, event_ids)
-
- state_event_map = await self.stores.main.get_events(
- [
- ev_id
- for group_ids in group_to_ids.values()
- for ev_id in group_ids.values()
- ],
- get_prev_content=False,
- )
-
- return {
- group: [
- state_event_map[v]
- for v in event_id_map.values()
- if v in state_event_map
- ]
- for group, event_id_map in group_to_ids.items()
- }
-
- def _get_state_groups_from_groups(
- self, groups: List[int], state_filter: StateFilter
- ) -> Awaitable[Dict[int, StateMap[str]]]:
- """Returns the state groups for a given set of groups, filtering on
- types of state events.
-
- Args:
- groups: list of state group IDs to query
- state_filter: The state filter used to fetch state
- from the database.
-
- Returns:
- Dict of state group to state map.
- """
-
- return self.stores.state._get_state_groups_from_groups(groups, state_filter)
-
- async def get_state_for_events(
- self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
- ) -> Dict[str, StateMap[EventBase]]:
- """Given a list of event_ids and type tuples, return a list of state
- dicts for each event.
-
- Args:
- event_ids: The events to fetch the state of.
- state_filter: The state filter used to fetch state.
-
- Returns:
- A dict of (event_id) -> (type, state_key) -> [state_events]
-
- Raises:
- RuntimeError if we don't have a state group for one or more of the events
- (ie they are outliers or unknown)
- """
- await_full_state = True
- if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
- await_full_state = False
-
- event_to_groups = await self.get_state_group_for_events(
- event_ids, await_full_state=await_full_state
- )
-
- groups = set(event_to_groups.values())
- group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
- )
-
- state_event_map = await self.stores.main.get_events(
- [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
- get_prev_content=False,
- )
-
- event_to_state = {
- event_id: {
- k: state_event_map[v]
- for k, v in group_to_state[group].items()
- if v in state_event_map
- }
- for event_id, group in event_to_groups.items()
- }
-
- return {event: event_to_state[event] for event in event_ids}
-
- async def get_state_ids_for_events(
- self,
- event_ids: Collection[str],
- state_filter: Optional[StateFilter] = None,
- ) -> Dict[str, StateMap[str]]:
- """
- Get the state dicts corresponding to a list of events, containing the event_ids
- of the state events (as opposed to the events themselves)
-
- Args:
- event_ids: events whose state should be returned
- state_filter: The state filter used to fetch state from the database.
-
- Returns:
- A dict from event_id -> (type, state_key) -> event_id
-
- Raises:
- RuntimeError if we don't have a state group for one or more of the events
- (ie they are outliers or unknown)
- """
- await_full_state = True
- if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
- await_full_state = False
-
- event_to_groups = await self.get_state_group_for_events(
- event_ids, await_full_state=await_full_state
- )
-
- groups = set(event_to_groups.values())
- group_to_state = await self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
- )
-
- event_to_state = {
- event_id: group_to_state[group]
- for event_id, group in event_to_groups.items()
- }
-
- return {event: event_to_state[event] for event in event_ids}
-
- async def get_state_for_event(
- self, event_id: str, state_filter: Optional[StateFilter] = None
- ) -> StateMap[EventBase]:
- """
- Get the state dict corresponding to a particular event
-
- Args:
- event_id: event whose state should be returned
- state_filter: The state filter used to fetch state from the database.
-
- Returns:
- A dict from (type, state_key) -> state_event
-
- Raises:
- RuntimeError if we don't have a state group for the event (ie it is an
- outlier or is unknown)
- """
- state_map = await self.get_state_for_events(
- [event_id], state_filter or StateFilter.all()
- )
- return state_map[event_id]
-
- async def get_state_ids_for_event(
- self, event_id: str, state_filter: Optional[StateFilter] = None
- ) -> StateMap[str]:
- """
- Get the state dict corresponding to a particular event
-
- Args:
- event_id: event whose state should be returned
- state_filter: The state filter used to fetch state from the database.
-
- Returns:
- A dict from (type, state_key) -> state_event_id
-
- Raises:
- RuntimeError if we don't have a state group for the event (ie it is an
- outlier or is unknown)
- """
- state_map = await self.get_state_ids_for_events(
- [event_id], state_filter or StateFilter.all()
- )
- return state_map[event_id]
-
- def get_state_for_groups(
- self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
- ) -> Awaitable[Dict[int, MutableStateMap[str]]]:
- """Gets the state at each of a list of state groups, optionally
- filtering by type/state_key
-
- Args:
- groups: list of state groups for which we want to get the state.
- state_filter: The state filter used to fetch state.
- from the database.
-
- Returns:
- Dict of state group to state map.
- """
- return self.stores.state._get_state_for_groups(
- groups, state_filter or StateFilter.all()
- )
-
- async def get_state_group_for_events(
- self,
- event_ids: Collection[str],
- await_full_state: bool = True,
- ) -> Mapping[str, int]:
- """Returns mapping event_id -> state_group
-
- Args:
- event_ids: events to get state groups for
- await_full_state: if true, will block if we do not yet have complete
- state at these events.
- """
- if await_full_state:
- await self._partial_state_events_tracker.await_full_state(event_ids)
-
- return await self.stores.main._get_state_group_for_events(event_ids)
-
- async def store_state_group(
- self,
- event_id: str,
- room_id: str,
- prev_group: Optional[int],
- delta_ids: Optional[StateMap[str]],
- current_state_ids: StateMap[str],
- ) -> int:
- """Store a new set of state, returning a newly assigned state group.
-
- Args:
- event_id: The event ID for which the state was calculated.
- room_id: ID of the room for which the state was calculated.
- prev_group: A previous state group for the room, optional.
- delta_ids: The delta between state at `prev_group` and
- `current_state_ids`, if `prev_group` was given. Same format as
- `current_state_ids`.
- current_state_ids: The state to store. Map of (type, state_key)
- to event_id.
-
- Returns:
- The state group ID
- """
- return await self.stores.state.store_state_group(
- event_id, room_id, prev_group, delta_ids, current_state_ids
- )
|