diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 72faf2ee38..a0cbeedc30 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -166,7 +166,7 @@ class DeviceWorkerHandler:
possibly_changed = set(changed)
possibly_left = set()
for room_id in rooms_changed:
- current_state_ids = await self.store.get_current_state_ids(room_id)
+ current_state_ids = await self._state_storage.get_current_state_ids(room_id)
# The user may have left the room
# TODO: Check if they actually did or if we were just invited.
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 4aa33df884..44e84698c4 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -45,6 +45,7 @@ class DirectoryHandler:
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.config = hs.config
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.require_membership = hs.config.server.require_membership_for_aliases
@@ -463,7 +464,11 @@ class DirectoryHandler:
making_public = visibility == "public"
if making_public:
room_aliases = await self.store.get_aliases_for_room(room_id)
- canonical_alias = await self.store.get_canonical_alias_for_room(room_id)
+ canonical_alias = (
+ await self._storage_controllers.state.get_canonical_alias_for_room(
+ room_id
+ )
+ )
if canonical_alias:
room_aliases.append(canonical_alias)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 659f279441..b212ee2172 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -750,7 +750,9 @@ class FederationHandler:
# Note that this requires the /send_join request to come back to the
# same server.
if room_version.msc3083_join_rules:
- state_ids = await self.store.get_current_state_ids(room_id)
+ state_ids = await self._state_storage_controller.get_current_state_ids(
+ room_id
+ )
if await self._event_auth_handler.has_restricted_join_rules(
state_ids, room_version
):
@@ -1552,6 +1554,9 @@ class FederationHandler:
success = await self.store.clear_partial_state_room(room_id)
if success:
logger.info("State resync complete for %s", room_id)
+ self._storage_controllers.state.notify_room_un_partial_stated(
+ room_id
+ )
# TODO(faster_joins) update room stats and user directory?
return
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ac911a2ddc..081625f0bd 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -217,7 +217,7 @@ class MessageHandler:
)
if membership == Membership.JOIN:
- state_ids = await self.store.get_filtered_current_state_ids(
+ state_ids = await self._state_storage_controller.get_current_state_ids(
room_id, state_filter=state_filter
)
room_state = await self.store.get_events(state_ids.values())
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index bf112b9e1e..895ea63ed3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -134,6 +134,7 @@ class BasePresenceHandler(abc.ABC):
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.presence_router = hs.get_presence_router()
self.state = hs.get_state_handler()
self.is_mine_id = hs.is_mine_id
@@ -1348,7 +1349,10 @@ class PresenceHandler(BasePresenceHandler):
self._event_pos,
room_max_stream_ordering,
)
- max_pos, deltas = await self.store.get_current_state_deltas(
+ (
+ max_pos,
+ deltas,
+ ) = await self._storage_controllers.state.get_current_state_deltas(
self._event_pos, room_max_stream_ordering
)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 05bb1e0225..338204287f 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -87,6 +87,7 @@ class LoginDict(TypedDict):
class RegistrationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.clock = hs.get_clock()
self.hs = hs
self.auth = hs.get_auth()
@@ -528,7 +529,7 @@ class RegistrationHandler:
if requires_invite:
# If the server is in the room, check if the room is public.
- state = await self.store.get_filtered_current_state_ids(
+ state = await self._storage_controllers.state.get_current_state_ids(
room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index e1341dd9bb..e2b0e519d4 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -107,6 +107,7 @@ class EventContext:
class RoomCreationHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.auth = hs.get_auth()
self.clock = hs.get_clock()
self.hs = hs
@@ -480,8 +481,10 @@ class RoomCreationHandler:
if room_type == RoomTypes.SPACE:
types_to_copy.append((EventTypes.SpaceChild, None))
- old_room_state_ids = await self.store.get_filtered_current_state_ids(
- old_room_id, StateFilter.from_types(types_to_copy)
+ old_room_state_ids = (
+ await self._storage_controllers.state.get_current_state_ids(
+ old_room_id, StateFilter.from_types(types_to_copy)
+ )
)
# map from event_id to BaseEvent
old_room_state_events = await self.store.get_events(old_room_state_ids.values())
@@ -558,8 +561,10 @@ class RoomCreationHandler:
)
# Transfer membership events
- old_room_member_state_ids = await self.store.get_filtered_current_state_ids(
- old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
+ old_room_member_state_ids = (
+ await self._storage_controllers.state.get_current_state_ids(
+ old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
+ )
)
# map from event_id to BaseEvent
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index f3577b5d5a..183d4ae3c4 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -50,6 +50,7 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler:
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.hs = hs
self.enable_room_list_search = hs.config.roomdirectory.enable_room_list_search
self.response_cache: ResponseCache[
@@ -274,7 +275,7 @@ class RoomListHandler:
if aliases:
result["aliases"] = aliases
- current_state_ids = await self.store.get_current_state_ids(
+ current_state_ids = await self._storage_controllers.state.get_current_state_ids(
room_id, on_invalidate=cache_context.invalidate
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00662dc961..70c674ff8e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -68,6 +68,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.auth = hs.get_auth()
self.state_handler = hs.get_state_handler()
self.config = hs.config
@@ -994,7 +995,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# If the host is in the room, but not one of the authorised hosts
# for restricted join rules, a remote join must be used.
room_version = await self.store.get_room_version(room_id)
- current_state_ids = await self.store.get_current_state_ids(room_id)
+ current_state_ids = await self._storage_controllers.state.get_current_state_ids(
+ room_id
+ )
# If restricted join rules are not being used, a local join can always
# be used.
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 75aee6a111..13098f56ed 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -90,6 +90,7 @@ class RoomSummaryHandler:
def __init__(self, hs: "HomeServer"):
self._event_auth_handler = hs.get_event_auth_handler()
self._store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self._event_serializer = hs.get_event_client_serializer()
self._server_name = hs.hostname
self._federation_client = hs.get_federation_client()
@@ -537,7 +538,7 @@ class RoomSummaryHandler:
Returns:
True if the room is accessible to the requesting user or server.
"""
- state_ids = await self._store.get_current_state_ids(room_id)
+ state_ids = await self._storage_controllers.state.get_current_state_ids(room_id)
# If there's no state for the room, it isn't known.
if not state_ids:
@@ -702,7 +703,9 @@ class RoomSummaryHandler:
# there should always be an entry
assert stats is not None, "unable to retrieve stats for %s" % (room_id,)
- current_state_ids = await self._store.get_current_state_ids(room_id)
+ current_state_ids = await self._storage_controllers.state.get_current_state_ids(
+ room_id
+ )
create_event = await self._store.get_event(
current_state_ids[(EventTypes.Create, "")]
)
@@ -760,7 +763,9 @@ class RoomSummaryHandler:
"""
# look for child rooms/spaces.
- current_state_ids = await self._store.get_current_state_ids(room_id)
+ current_state_ids = await self._storage_controllers.state.get_current_state_ids(
+ room_id
+ )
events = await self._store.get_events_as_list(
[
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 436cd971ce..f45e06eb0e 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -40,6 +40,7 @@ class StatsHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
self.clock = hs.get_clock()
@@ -105,7 +106,10 @@ class StatsHandler:
logger.debug(
"Processing room stats %s->%s", self.pos, room_max_stream_ordering
)
- max_pos, deltas = await self.store.get_current_state_deltas(
+ (
+ max_pos,
+ deltas,
+ ) = await self._storage_controllers.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a1d41358d9..b4ead79f97 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -506,8 +506,10 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids: FrozenSet[str] = frozenset()
if any(e.is_state() for e in recents):
- current_state_ids_map = await self.store.get_current_state_ids(
- room_id
+ current_state_ids_map = (
+ await self._state_storage_controller.get_current_state_ids(
+ room_id
+ )
)
current_state_ids = frozenset(current_state_ids_map.values())
@@ -574,8 +576,11 @@ class SyncHandler:
# ensure that we always include current state in the timeline
current_state_ids = frozenset()
if any(e.is_state() for e in loaded_recents):
- current_state_ids_map = await self.store.get_current_state_ids(
- room_id
+ # FIXME(faster_joins): We use the partial state here as
+ # we don't want to block `/sync` on finishing a lazy join.
+ # Is this the correct way of doing it?
+ current_state_ids_map = (
+ await self.store.get_partial_current_state_ids(room_id)
)
current_state_ids = frozenset(current_state_ids_map.values())
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 74f7fdfe6c..8c3c52e1ca 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -56,6 +56,7 @@ class UserDirectoryHandler(StateDeltasHandler):
super().__init__(hs)
self.store = hs.get_datastores().main
+ self._storage_controllers = hs.get_storage_controllers()
self.server_name = hs.hostname
self.clock = hs.get_clock()
self.notifier = hs.get_notifier()
@@ -174,7 +175,10 @@ class UserDirectoryHandler(StateDeltasHandler):
logger.debug(
"Processing user stats %s->%s", self.pos, room_max_stream_ordering
)
- max_pos, deltas = await self.store.get_current_state_deltas(
+ (
+ max_pos,
+ deltas,
+ ) = await self._storage_controllers.state.get_current_state_deltas(
self.pos, room_max_stream_ordering
)
|