diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e3be537cee..2a1e567ce0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore:
values=to_insert,
)
+ async def update_current_state(
+ self,
+ room_id: str,
+ state_delta: DeltaState,
+ stream_id: int,
+ ) -> None:
+ """Update the current state stored in the datatabase for the given room"""
+
+ await self.db_pool.runInteraction(
+ "update_current_state",
+ self._update_current_state_txn,
+ state_delta_by_room={room_id: state_delta},
+ stream_id=stream_id,
+ )
+
def _update_current_state_txn(
self,
txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..5288cdba03 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore):
desc="is_partial_state_event",
)
return result is not None
+
+ async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+ """Get a list of events in the given room that have partial state"""
+ return await self.db_pool.runInteraction(
+ "get_partial_state_events_batch",
+ self._get_partial_state_events_batch_txn,
+ room_id,
+ )
+
+ @staticmethod
+ def _get_partial_state_events_batch_txn(
+ txn: LoggingTransaction, room_id: str
+ ) -> List[str]:
+ txn.execute(
+ """
+ SELECT event_id FROM partial_state_events AS pse
+ JOIN events USING (event_id)
+ WHERE pse.room_id = ?
+ ORDER BY events.stream_ordering
+ LIMIT 100
+ """,
+ (room_id,),
+ )
+ return [row[0] for row in txn]
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
get_rooms_for_retention_period_in_range_txn,
)
+ async def clear_partial_state_room(self, room_id: str) -> bool:
+ # this can race with incoming events, so we watch out for FK errors.
+ # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+ # is not atomic. I fear we need an application-level lock.
+ try:
+ await self.db_pool.runInteraction(
+ "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+ )
+ return True
+ except self.db_pool.engine.module.DatabaseError as e:
+ # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+ logger.warning(
+ "Exception while clearing lazy partial-state-room %s, retrying: %s",
+ room_id,
+ e,
+ )
+ return False
+
+ @staticmethod
+ def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+ DatabasePool.simple_delete_txn(
+ txn,
+ table="partial_state_rooms_servers",
+ keyvalues={"room_id": room_id},
+ )
+ DatabasePool.simple_delete_one_txn(
+ txn,
+ table="partial_state_rooms",
+ keyvalues={"room_id": room_id},
+ )
+
class _BackgroundUpdates:
REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..eba35f3700 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
DatabasePool,
@@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
return {row["state_group"] for row in rows}
+ async def update_state_for_partial_state_event(
+ self,
+ event: EventBase,
+ context: EventContext,
+ ) -> None:
+ """Update the state group for a partial state event"""
+ await self.db_pool.runInteraction(
+ "update_state_for_partial_state_event",
+ self._update_state_for_partial_state_event_txn,
+ event,
+ context,
+ )
+
+ def _update_state_for_partial_state_event_txn(
+ self,
+ txn,
+ event: EventBase,
+ context: EventContext,
+ ):
+ # we shouldn't have any outliers here
+ assert not event.internal_metadata.is_outlier()
+
+ # anything that was rejected should have the same state as its
+ # predecessor.
+ if context.rejected:
+ assert context.state_group == context.state_group_before_event
+
+ self.db_pool.simple_update_txn(
+ txn,
+ table="event_to_state_groups",
+ keyvalues={"event_id": event.event_id},
+ updatevalues={"state_group": context.state_group},
+ )
+
+ self.db_pool.simple_delete_one_txn(
+ txn,
+ table="partial_state_events",
+ keyvalues={"event_id": event.event_id},
+ )
+
+ # TODO(faster_joins): need to do something about workers here
+ txn.call_after(
+ self._get_state_group_for_event.prefill,
+ (event.event_id,),
+ context.state_group,
+ )
+
class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()
+ async def update_current_state(self, room_id: str) -> None:
+ """Recalculate the current state for a room, and persist it"""
+ state = await self._calculate_current_state(room_id)
+ delta = await self._calculate_state_delta(room_id, state)
+
+ # TODO(faster_joins): get a real stream ordering, to make this work correctly
+ # across workers.
+ #
+ # TODO(faster_joins): this can race against event persistence, in which case we
+ # will end up with incorrect state. Perhaps we should make this a job we
+ # farm out to the event persister, somehow.
+ stream_id = self.main_store.get_room_max_stream_ordering()
+ await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+ async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+ """Calculate the current state of a room, based on the forward extremities
+
+ Args:
+ room_id: room for which to calculate current state
+
+ Returns:
+ map from (type, state_key) to event id for the current state in the room
+ """
+ latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+ state_groups = set(
+ (
+ await self.main_store._get_state_group_for_events(latest_event_ids)
+ ).values()
+ )
+
+ state_maps_by_state_group = await self.state_store._get_state_for_groups(
+ state_groups
+ )
+
+ if len(state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ return state_maps_by_state_group[state_groups.pop()]
+
+ # Ok, we need to defer to the state handler to resolve our state sets.
+ logger.debug("calling resolve_state_groups from preserve_events")
+
+ # Avoid a circular import.
+ from synapse.state import StateResolutionStore
+
+ room_version = await self.main_store.get_room_version_id(room_id)
+ res = await self._state_resolution_handler.resolve_state_groups(
+ room_id,
+ room_version,
+ state_maps_by_state_group,
+ event_map=None,
+ state_res_store=StateResolutionStore(self.main_store),
+ )
+
+ return res.state
+
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
|