summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events.py15
-rw-r--r--synapse/storage/databases/main/events_worker.py24
-rw-r--r--synapse/storage/databases/main/room.py31
-rw-r--r--synapse/storage/databases/main/state.py48
4 files changed, 118 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e3be537cee..2a1e567ce0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -963,6 +963,21 @@ class PersistEventsStore:
                 values=to_insert,
             )
 
+    async def update_current_state(
+        self,
+        room_id: str,
+        state_delta: DeltaState,
+        stream_id: int,
+    ) -> None:
+        """Update the current state stored in the datatabase for the given room"""
+
+        await self.db_pool.runInteraction(
+            "update_current_state",
+            self._update_current_state_txn,
+            state_delta_by_room={room_id: state_delta},
+            stream_id=stream_id,
+        )
+
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a60e3f4fdd..5288cdba03 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1979,3 +1979,27 @@ class EventsWorkerStore(SQLBaseStore):
             desc="is_partial_state_event",
         )
         return result is not None
+
+    async def get_partial_state_events_batch(self, room_id: str) -> List[str]:
+        """Get a list of events in the given room that have partial state"""
+        return await self.db_pool.runInteraction(
+            "get_partial_state_events_batch",
+            self._get_partial_state_events_batch_txn,
+            room_id,
+        )
+
+    @staticmethod
+    def _get_partial_state_events_batch_txn(
+        txn: LoggingTransaction, room_id: str
+    ) -> List[str]:
+        txn.execute(
+            """
+            SELECT event_id FROM partial_state_events AS pse
+                JOIN events USING (event_id)
+            WHERE pse.room_id = ?
+            ORDER BY events.stream_ordering
+            LIMIT 100
+            """,
+            (room_id,),
+        )
+        return [row[0] for row in txn]
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 18b1acd9e1..87e9482c60 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1077,6 +1077,37 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             get_rooms_for_retention_period_in_range_txn,
         )
 
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        # this can race with incoming events, so we watch out for FK errors.
+        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
+        #   is not atomic. I fear we need an application-level lock.
+        try:
+            await self.db_pool.runInteraction(
+                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
+            )
+            return True
+        except self.db_pool.engine.module.DatabaseError as e:
+            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
+            logger.warning(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    @staticmethod
+    def _clear_partial_state_room_txn(txn: LoggingTransaction, room_id: str) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index ecdc1fdc4c..eba35f3700 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -354,6 +355,53 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return {row["state_group"] for row in rows}
 
+    async def update_state_for_partial_state_event(
+        self,
+        event: EventBase,
+        context: EventContext,
+    ) -> None:
+        """Update the state group for a partial state event"""
+        await self.db_pool.runInteraction(
+            "update_state_for_partial_state_event",
+            self._update_state_for_partial_state_event_txn,
+            event,
+            context,
+        )
+
+    def _update_state_for_partial_state_event_txn(
+        self,
+        txn,
+        event: EventBase,
+        context: EventContext,
+    ):
+        # we shouldn't have any outliers here
+        assert not event.internal_metadata.is_outlier()
+
+        # anything that was rejected should have the same state as its
+        # predecessor.
+        if context.rejected:
+            assert context.state_group == context.state_group_before_event
+
+        self.db_pool.simple_update_txn(
+            txn,
+            table="event_to_state_groups",
+            keyvalues={"event_id": event.event_id},
+            updatevalues={"state_group": context.state_group},
+        )
+
+        self.db_pool.simple_delete_one_txn(
+            txn,
+            table="partial_state_events",
+            keyvalues={"event_id": event.event_id},
+        )
+
+        # TODO(faster_joins): need to do something about workers here
+        txn.call_after(
+            self._get_state_group_for_event.prefill,
+            (event.event_id,),
+            context.state_group,
+        )
+
 
 class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):