summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2022-04-12 14:23:43 +0100
committerGitHub <noreply@github.com>2022-04-12 13:23:43 +0000
commit320186319ac4f1d16f8f964d92db8921a4b1073e (patch)
tree965d970fdea98a16bd2c23af3aabb0c9493eceb8 /synapse/storage
parentRemove references to unstable identifiers from MSC3440. (#12382) (diff)
downloadsynapse-320186319ac4f1d16f8f964d92db8921a4b1073e.tar.xz
Resync state after partial-state join (#12394)
We work through all the events with partial state, updating the state at each
of them. Once it's done, we recalculate the state for the whole room, and then
mark the room as having complete state.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py15
-rw-r--r--synapse/storage/databases/main/events_worker.py24
-rw-r--r--synapse/storage/databases/main/room.py31
-rw-r--r--synapse/storage/databases/main/state.py48
-rw-r--r--synapse/storage/persist_events.py56
5 files changed, 174 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e3be537cee..2a1e567ce0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore:
                 values=to_insert,
             )
 
+    async def update_current_state(
+        self,
+        room_id: str,
+        state_delta: DeltaState,
+        stream_id: int,
+    ) -> None:
+        """Update the current state stored in the datatabase for the given room"""
+
+        await self.db_pool.runInteraction(
+            "update_current_state",
+            self._update_current_state_txn,
+            state_delta_by_room={room_id: state_delta},
+            stream_id=stream_id,
+        )
+
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..5288cdba03 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore):
             desc="is_partial_state_event",
         )
         return result is not None
+
+    async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+        """Get a list of events in the given room that have partial state"""
+        return await self.db_pool.runInteraction(
+            "get_partial_state_events_batch",
+            self._get_partial_state_events_batch_txn,
+            room_id,
+        )
+
+    @staticmethod
+    def _get_partial_state_events_batch_txn(
+        txn: LoggingTransaction, room_id: str
+    ) -> List[str]:
+        txn.execute(
+            """
+            SELECT event_id FROM partial_state_events AS pse
+                JOIN events USING (event_id)
+            WHERE pse.room_id = ?
+            ORDER BY events.stream_ordering
+            LIMIT 100
+            """,
+            (room_id,),
+        )
+        return [row[0] for row in txn]
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             get_rooms_for_retention_period_in_range_txn,
         )
 
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        # this can race with incoming events, so we watch out for FK errors.
+        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+        #   is not atomic. I fear we need an application-level lock.
+        try:
+            await self.db_pool.runInteraction(
+                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+            )
+            return True
+        except self.db_pool.engine.module.DatabaseError as e:
+            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+            logger.warning(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    @staticmethod
+    def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..eba35f3700 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return {row["state_group"] for row in rows}
 
+    async def update_state_for_partial_state_event(
+        self,
+        event: EventBase,
+        context: EventContext,
+    ) -> None:
+        """Update the state group for a partial state event"""
+        await self.db_pool.runInteraction(
+            "update_state_for_partial_state_event",
+            self._update_state_for_partial_state_event_txn,
+            event,
+            context,
+        )
+
+    def _update_state_for_partial_state_event_txn(
+        self,
+        txn,
+        event: EventBase,
+        context: EventContext,
+    ):
+        # we shouldn't have any outliers here
+        assert not event.internal_metadata.is_outlier()
+
+        # anything that was rejected should have the same state as its
+        # predecessor.
+        if context.rejected:
+            assert context.state_group == context.state_group_before_event
+
+        self.db_pool.simple_update_txn(
+            txn,
+            table="event_to_state_groups",
+            keyvalues={"event_id": event.event_id},
+            updatevalues={"state_group": context.state_group},
+        )
+
+        self.db_pool.simple_delete_one_txn(
+            txn,
+            table="partial_state_events",
+            keyvalues={"event_id": event.event_id},
+        )
+
+        # TODO(faster_joins): need to do something about workers here
+        txn.call_after(
+            self._get_state_group_for_event.prefill,
+            (event.event_id,),
+            context.state_group,
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index b402922817..e496ba7bed 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -376,6 +376,62 @@ class EventsPersistenceStorage:
         pos = PersistedEventPosition(self._instance_name, event_stream_id)
         return event, pos, self.main_store.get_room_max_token()
 
+    async def update_current_state(self, room_id: str) -> None:
+        """Recalculate the current state for a room, and persist it"""
+        state = await self._calculate_current_state(room_id)
+        delta = await self._calculate_state_delta(room_id, state)
+
+        # TODO(faster_joins): get a real stream ordering, to make this work correctly
+        #    across workers.
+        #
+        # TODO(faster_joins): this can race against event persistence, in which case we
+        #    will end up with incorrect state. Perhaps we should make this a job we
+        #    farm out to the event persister, somehow.
+        stream_id = self.main_store.get_room_max_stream_ordering()
+        await self.persist_events_store.update_current_state(room_id, delta, stream_id)
+
+    async def _calculate_current_state(self, room_id: str) -> StateMap[str]:
+        """Calculate the current state of a room, based on the forward extremities
+
+        Args:
+            room_id: room for which to calculate current state
+
+        Returns:
+            map from (type, state_key) to event id for the  current state in the room
+        """
+        latest_event_ids = await self.main_store.get_latest_event_ids_in_room(room_id)
+        state_groups = set(
+            (
+                await self.main_store._get_state_group_for_events(latest_event_ids)
+            ).values()
+        )
+
+        state_maps_by_state_group = await self.state_store._get_state_for_groups(
+            state_groups
+        )
+
+        if len(state_groups) == 1:
+            # If there is only one state group, then we know what the current
+            # state is.
+            return state_maps_by_state_group[state_groups.pop()]
+
+        # Ok, we need to defer to the state handler to resolve our state sets.
+        logger.debug("calling resolve_state_groups from preserve_events")
+
+        # Avoid a circular import.
+        from synapse.state import StateResolutionStore
+
+        room_version = await self.main_store.get_room_version_id(room_id)
+        res = await self._state_resolution_handler.resolve_state_groups(
+            room_id,
+            room_version,
+            state_maps_by_state_group,
+            event_map=None,
+            state_res_store=StateResolutionStore(self.main_store),
+        )
+
+        return res.state
+
     async def _persist_event_batch(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],