summary refs log tree commit diff
path: root/synapse/storage/persist_events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/persist_events.py')
-rw-r--r--synapse/storage/persist_events.py56
1 files changed, 56 insertions, 0 deletions
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return event, pos, self.main_store.get_room_max_token()
 
+    async def update_current_state(self, room_id: str) -> None:
+        """Recalculate the current state for a room, and persist it"""
+        state = await self._calculate_current_state(room_id)
+        delta = await self._calculate_state_delta(room_id, state)
+
+        # TODO(faster_joins): get a real stream ordering, to make this work correctly
+        #    across workers.
+        #
+        # TODO(faster_joins): this can race against event persistence, in which case we
+        #    will end up with incorrect state. Perhaps we should make this a job we
+        #    farm out to the event persister, somehow.
+        stream_id = self.main_store.get_room_max_stream_ordering()
+        await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+    async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+        """Calculate the current state of a room, based on the forward extremities
+
+        Args:
+            room_id: room for which to calculate current state
+
+        Returns:
+            map from (type, state_key) to event id for the  current state in the room
+        """
+        latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+        state_groups = set(
+            (
+                await self.main_store._get_state_group_for_events(latest_event_ids)
+            ).values()
+        )
+
+        state_maps_by_state_group = await self.state_store._get_state_for_groups(
+            state_groups
+        )
+
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            return state_maps_by_state_group[state_groups.pop()]
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
+        logger.debug("calling resolve_state_groups from preserve_events")
+
+        # Avoid a circular import.
+        from synapse.state import StateResolutionStore
+
+        room_version = await self.main_store.get_room_version_id(room_id)
+        res = await self._state_resolution_handler.resolve_state_groups(
+            room_id,
+            room_version,
+            state_maps_by_state_group,
+            event_map=None,
+            state_res_store=StateResolutionStore(self.main_store),
+        )
+
+        return res.state
+
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],