summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/controllers/persist_events.py12
-rw-r--r--synapse/storage/databases/main/events.py80
-rw-r--r--synapse/storage/databases/main/room.py22
3 files changed, 93 insertions, 21 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 4bcb99d06e..c248fccc81 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -315,6 +315,10 @@ class EventsPersistenceStorageController:
             if they were deduplicated due to an event already existing that
             matched the transaction ID; the existing event is returned in such
             a case.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
@@ -363,6 +367,10 @@ class EventsPersistenceStorageController:
             latest persisted event. The returned event may not match the given
             event if it was deduplicated due to an existing event matching the
             transaction ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         # add_to_queue returns a map from event ID to existing event ID if the
         # event was deduplicated. (The dict may also include other entries if
@@ -453,6 +461,10 @@ class EventsPersistenceStorageController:
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         replaced_events: Dict[str, str] = {}
         if not events_and_contexts:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a3e12f1e9b..8a0e4e9589 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
 import itertools
 import logging
 from collections import OrderedDict
+from http import HTTPStatus
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -35,6 +36,7 @@ from prometheus_client import Counter
 
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
@@ -69,6 +71,24 @@ event_counter = Counter(
 )
 
 
+class PartialStateConflictError(SynapseError):
+    """An internal error raised when attempting to persist an event with partial state
+    after the room containing the event has been un-partial stated.
+
+    This error should be handled by recomputing the event context and trying again.
+
+    This error has an HTTP status code so that it can be transported over replication.
+    It should not be exposed to clients.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(
+            HTTPStatus.CONFLICT,
+            msg="Cannot persist partial state event in un-partial stated room",
+            errcode=Codes.UNKNOWN,
+        )
+
+
 @attr.s(slots=True, auto_attribs=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
@@ -154,6 +174,10 @@ class PersistEventsStore:
 
         Returns:
             Resolves when the events have been persisted
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
 
         # We want to calculate the stream orderings as late as possible, as
@@ -354,6 +378,9 @@ class PersistEventsStore:
                 For each room, a list of the event ids which are the forward
                 extremities.
 
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         state_delta_for_room = state_delta_for_room or {}
         new_forward_extremities = new_forward_extremities or {}
@@ -1304,6 +1331,10 @@ class PersistEventsStore:
 
         Returns:
             new list, without events which are already in the events table.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
@@ -2215,6 +2246,11 @@ class PersistEventsStore:
         txn: LoggingTransaction,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
+        """
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
+        """
         state_groups = {}
         for event, context in events_and_contexts:
             if event.internal_metadata.is_outlier():
@@ -2239,19 +2275,37 @@ class PersistEventsStore:
         # if we have partial state for these events, record the fact. (This happens
         # here rather than in _store_event_txn because it also needs to happen when
         # we de-outlier an event.)
-        self.db_pool.simple_insert_many_txn(
-            txn,
-            table="partial_state_events",
-            keys=("room_id", "event_id"),
-            values=[
-                (
-                    event.room_id,
-                    event.event_id,
-                )
-                for event, ctx in events_and_contexts
-                if ctx.partial_state
-            ],
-        )
+        try:
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="partial_state_events",
+                keys=("room_id", "event_id"),
+                values=[
+                    (
+                        event.room_id,
+                        event.event_id,
+                    )
+                    for event, ctx in events_and_contexts
+                    if ctx.partial_state
+                ],
+            )
+        except self.db_pool.engine.module.IntegrityError:
+            logger.info(
+                "Cannot persist events %s in rooms %s: room has been un-partial stated",
+                [
+                    event.event_id
+                    for event, ctx in events_and_contexts
+                    if ctx.partial_state
+                ],
+                list(
+                    {
+                        event.room_id
+                        for event, ctx in events_and_contexts
+                        if ctx.partial_state
+                    }
+                ),
+            )
+            raise PartialStateConflictError()
 
         self.db_pool.simple_upsert_many_txn(
             txn,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index d8026e3fac..13d6a1d5c0 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1156,19 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         return room_servers
 
     async def clear_partial_state_room(self, room_id: str) -> bool:
-        # this can race with incoming events, so we watch out for FK errors.
-        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
-        #   is not atomic. I fear we need an application-level lock.
-        #   https://github.com/matrix-org/synapse/issues/12988
+        """Clears the partial state flag for a room.
+
+        Args:
+            room_id: The room whose partial state flag is to be cleared.
+
+        Returns:
+            `True` if the partial state flag has been cleared successfully.
+
+            `False` if the partial state flag could not be cleared because the room
+            still contains events with partial state.
+        """
         try:
             await self.db_pool.runInteraction(
                 "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
             )
             return True
-        except self.db_pool.engine.module.DatabaseError as e:
-            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
-            #   https://github.com/matrix-org/synapse/issues/12988
-            logger.warning(
+        except self.db_pool.engine.module.IntegrityError as e:
+            # Assume that any `IntegrityError`s are due to partial state events.
+            logger.info(
                 "Exception while clearing lazy partial-state-room %s, retrying: %s",
                 room_id,
                 e,