diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
pos = PersistedEventPosition(self._instance_name, event_stream_id)
return event, pos, self.main_store.get_room_max_token()
+ async def update_current_state(self, room_id: str) -> None:
+ """Recalculate the current state for a room, and persist it"""
+ state = await self._calculate_current_state(room_id)
+ delta = await self._calculate_state_delta(room_id, state)
+
+ # TODO(faster_joins): get a real stream ordering, to make this work correctly
+ # across workers.
+ #
+ # TODO(faster_joins): this can race against event persistence, in which case we
+ # will end up with incorrect state. Perhaps we should make this a job we
+ # farm out to the event persister, somehow.
+ stream_id = self.main_store.get_room_max_stream_ordering()
+ await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+ async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+ """Calculate the current state of a room, based on the forward extremities
+
+ Args:
+ room_id: room for which to calculate current state
+
+ Returns:
+ map from (type, state_key) to event id for the current state in the room
+ """
+ latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+ state_groups = set(
+ (
+ await self.main_store._get_state_group_for_events(latest_event_ids)
+ ).values()
+ )
+
+ state_maps_by_state_group = await self.state_store._get_state_for_groups(
+ state_groups
+ )
+
+ if len(state_groups) == 1:
+ # If there is only one state group, then we know what the current
+ # state is.
+ return state_maps_by_state_group[state_groups.pop()]
+
+ # Ok, we need to defer to the state handler to resolve our state sets.
+ logger.debug("calling resolve_state_groups from preserve_events")
+
+ # Avoid a circular import.
+ from synapse.state import StateResolutionStore
+
+ room_version = await self.main_store.get_room_version_id(room_id)
+ res = await self._state_resolution_handler.resolve_state_groups(
+ room_id,
+ room_version,
+ state_maps_by_state_group,
+ event_map=None,
+ state_res_store=StateResolutionStore(self.main_store),
+ )
+
+ return res.state
+
async def _persist_event_batch(
self,
events_and_contexts: List[Tuple[EventBase, EventContext]],
|