summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13100.misc1
-rw-r--r--synapse/federation/federation_server.py18
-rw-r--r--synapse/handlers/federation.py39
-rw-r--r--synapse/handlers/federation_event.py51
-rw-r--r--synapse/handlers/message.py79
-rw-r--r--synapse/replication/http/federation.py3
-rw-r--r--synapse/replication/http/send_event.py3
-rw-r--r--synapse/storage/controllers/persist_events.py12
-rw-r--r--synapse/storage/databases/main/events.py80
-rw-r--r--synapse/storage/databases/main/room.py22
10 files changed, 234 insertions, 74 deletions
diff --git a/changelog.d/13100.misc b/changelog.d/13100.misc
new file mode 100644
index 0000000000..28f2fe0349
--- /dev/null
+++ b/changelog.d/13100.misc
@@ -0,0 +1 @@
+Faster room joins: Handle race between persisting an event and un-partial stating a room.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 3e1518f1f6..5dfdc86740 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -67,6 +67,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
     ReplicationGetQueryRestServlet,
 )
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.lock import Lock
 from synapse.types import JsonDict, StateMap, get_domain_from_id
 from synapse.util import json_decoder, unwrapFirstError
@@ -882,9 +883,20 @@ class FederationServer(FederationBase):
             logger.warning("%s", errmsg)
             raise SynapseError(403, errmsg, Codes.FORBIDDEN)
 
-        return await self._federation_event_handler.on_send_membership_event(
-            origin, event
-        )
+        try:
+            return await self._federation_event_handler.on_send_membership_event(
+                origin, event
+            )
+        except PartialStateConflictError:
+            # The room was un-partial stated while we were persisting the event.
+            # Try once more, with full state this time.
+            logger.info(
+                "Room %s was un-partial stated during `on_send_membership_event`, trying again.",
+                room_id,
+            )
+            return await self._federation_event_handler.on_send_membership_event(
+                origin, event
+            )
 
     async def on_event_auth(
         self, origin: str, room_id: str, event_id: str
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 34cc5ecd11..3c44b4bf86 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -45,6 +45,7 @@ from synapse.api.errors import (
     FederationDeniedError,
     FederationError,
     HttpResponseException,
+    LimitExceededError,
     NotFoundError,
     RequestSendFailed,
     SynapseError,
@@ -64,6 +65,7 @@ from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
     ReplicationStoreRoomOnOutlierMembershipRestServlet,
 )
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import JsonDict, StateMap, get_domain_from_id
@@ -549,15 +551,29 @@ class FederationHandler:
                 #   https://github.com/matrix-org/synapse/issues/12998
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
-            max_stream_id = await self._federation_event_handler.process_remote_join(
-                origin,
-                room_id,
-                auth_chain,
-                state,
-                event,
-                room_version_obj,
-                partial_state=ret.partial_state,
-            )
+            try:
+                max_stream_id = (
+                    await self._federation_event_handler.process_remote_join(
+                        origin,
+                        room_id,
+                        auth_chain,
+                        state,
+                        event,
+                        room_version_obj,
+                        partial_state=ret.partial_state,
+                    )
+                )
+            except PartialStateConflictError as e:
+                # The homeserver was already in the room and it is no longer partial
+                # stated. We ought to be doing a local join instead. Turn the error into
+                # a 429, as a hint to the client to try again.
+                # TODO(faster_joins): `_should_perform_remote_join` suggests that we may
+                #   do a remote join for restricted rooms even if we have full state.
+                logger.error(
+                    "Room %s was un-partial stated while processing remote join.",
+                    room_id,
+                )
+                raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
 
             if ret.partial_state:
                 # Kick off the process of asynchronously fetching the state for this
@@ -1567,11 +1583,6 @@ class FederationHandler:
 
                 # we raced against more events arriving with partial state. Go round
                 # the loop again. We've already logged a warning, so no need for more.
-                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
-                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
-                #   having partial state.
-                #   https://github.com/matrix-org/synapse/issues/12988
-                #
                 continue
 
             events = await self.store.get_events_as_list(
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 479d936dc0..c74117c19a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -64,6 +64,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEventsRestServlet,
 )
 from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -275,7 +276,16 @@ class FederationEventHandler:
                     affected=pdu.event_id,
                 )
 
-        await self._process_received_pdu(origin, pdu, state_ids=None)
+        try:
+            await self._process_received_pdu(origin, pdu, state_ids=None)
+        except PartialStateConflictError:
+            # The room was un-partial stated while we were processing the PDU.
+            # Try once more, with full state this time.
+            logger.info(
+                "Room %s was un-partial stated while processing the PDU, trying again.",
+                room_id,
+            )
+            await self._process_received_pdu(origin, pdu, state_ids=None)
 
     async def on_send_membership_event(
         self, origin: str, event: EventBase
@@ -306,6 +316,9 @@ class FederationEventHandler:
 
         Raises:
             SynapseError if the event is not accepted into the room
+            PartialStateConflictError if the room was un-partial stated in between
+                computing the state at the event and persisting it. The caller should
+                retry exactly once in this case.
         """
         logger.debug(
             "on_send_membership_event: Got event: %s, signatures: %s",
@@ -423,6 +436,8 @@ class FederationEventHandler:
 
         Raises:
             SynapseError if the response is in some way invalid.
+            PartialStateConflictError if the homeserver is already in the room and it
+                has been un-partial stated.
         """
         create_event = None
         for e in state:
@@ -1084,10 +1099,14 @@ class FederationEventHandler:
 
             state_ids: Normally None, but if we are handling a gap in the graph
                 (ie, we are missing one or more prev_events), the resolved state at the
-                event
+                event. Must not be partial state.
 
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
+
+        PartialStateConflictError: if the room was un-partial stated in between
+            computing the state at the event and persisting it. The caller should retry
+            exactly once in this case. Will never be raised if `state_ids` is provided.
         """
         logger.debug("Processing event: %s", event)
         assert not event.internal_metadata.outlier
@@ -1933,6 +1952,9 @@ class FederationEventHandler:
             event: The event itself.
             context: The event context.
             backfilled: True if the event was backfilled.
+
+        PartialStateConflictError: if attempting to persist a partial state event in
+            a room that has been un-partial stated.
         """
         # this method should not be called on outliers (those code paths call
         # persist_events_and_notify directly.)
@@ -1985,6 +2007,10 @@ class FederationEventHandler:
 
         Returns:
             The stream ID after which all events have been persisted.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         if not event_and_contexts:
             return self._store.get_room_max_stream_ordering()
@@ -1993,14 +2019,19 @@ class FederationEventHandler:
         if instance != self._instance_name:
             # Limit the number of events sent over replication. We choose 200
             # here as that is what we default to in `max_request_body_size(..)`
-            for batch in batch_iter(event_and_contexts, 200):
-                result = await self._send_events(
-                    instance_name=instance,
-                    store=self._store,
-                    room_id=room_id,
-                    event_and_contexts=batch,
-                    backfilled=backfilled,
-                )
+            try:
+                for batch in batch_iter(event_and_contexts, 200):
+                    result = await self._send_events(
+                        instance_name=instance,
+                        store=self._store,
+                        room_id=room_id,
+                        event_and_contexts=batch,
+                        backfilled=backfilled,
+                    )
+            except SynapseError as e:
+                if e.code == HTTPStatus.CONFLICT:
+                    raise PartialStateConflictError()
+                raise
             return result["max_stream_id"]
         else:
             assert self._storage_controllers.persistence
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c6b40a5b7a..1980e37dae 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,6 +37,7 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
+    LimitExceededError,
     NotFoundError,
     ShadowBanError,
     SynapseError,
@@ -53,6 +54,7 @@ from synapse.handlers.directory import DirectoryHandler
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -1250,6 +1252,8 @@ class EventCreationHandler:
 
         Raises:
             ShadowBanError if the requester has been shadow-banned.
+            SynapseError(503) if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         extra_users = extra_users or []
 
@@ -1300,24 +1304,35 @@ class EventCreationHandler:
 
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
-        result, _ = await make_deferred_yieldable(
-            gather_results(
-                (
-                    run_in_background(
-                        self._persist_event,
-                        requester=requester,
-                        event=event,
-                        context=context,
-                        ratelimit=ratelimit,
-                        extra_users=extra_users,
+        try:
+            result, _ = await make_deferred_yieldable(
+                gather_results(
+                    (
+                        run_in_background(
+                            self._persist_event,
+                            requester=requester,
+                            event=event,
+                            context=context,
+                            ratelimit=ratelimit,
+                            extra_users=extra_users,
+                        ),
+                        run_in_background(
+                            self.cache_joined_hosts_for_event, event, context
+                        ).addErrback(
+                            log_failure, "cache_joined_hosts_for_event failed"
+                        ),
                     ),
-                    run_in_background(
-                        self.cache_joined_hosts_for_event, event, context
-                    ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
-                ),
-                consumeErrors=True,
+                    consumeErrors=True,
+                )
+            ).addErrback(unwrapFirstError)
+        except PartialStateConflictError as e:
+            # The event context needs to be recomputed.
+            # Turn the error into a 429, as a hint to the client to try again.
+            logger.info(
+                "Room %s was un-partial stated while persisting client event.",
+                event.room_id,
             )
-        ).addErrback(unwrapFirstError)
+            raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
 
         return result
 
@@ -1332,6 +1347,9 @@ class EventCreationHandler:
         """Actually persists the event. Should only be called by
         `handle_new_client_event`, and see its docstring for documentation of
         the arguments.
+
+        PartialStateConflictError: if attempting to persist a partial state event in
+            a room that has been un-partial stated.
         """
 
         # Skip push notification actions for historical messages
@@ -1348,16 +1366,21 @@ class EventCreationHandler:
             # If we're a worker we need to hit out to the master.
             writer_instance = self._events_shard_config.get_instance(event.room_id)
             if writer_instance != self._instance_name:
-                result = await self.send_event(
-                    instance_name=writer_instance,
-                    event_id=event.event_id,
-                    store=self.store,
-                    requester=requester,
-                    event=event,
-                    context=context,
-                    ratelimit=ratelimit,
-                    extra_users=extra_users,
-                )
+                try:
+                    result = await self.send_event(
+                        instance_name=writer_instance,
+                        event_id=event.event_id,
+                        store=self.store,
+                        requester=requester,
+                        event=event,
+                        context=context,
+                        ratelimit=ratelimit,
+                        extra_users=extra_users,
+                    )
+                except SynapseError as e:
+                    if e.code == HTTPStatus.CONFLICT:
+                        raise PartialStateConflictError()
+                    raise
                 stream_id = result["stream_id"]
                 event_id = result["event_id"]
                 if event_id != event.event_id:
@@ -1485,6 +1508,10 @@ class EventCreationHandler:
             The persisted event. This may be different than the given event if
             it was de-duplicated (e.g. because we had already persisted an
             event with the same transaction ID.)
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         extra_users = extra_users or []
 
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index eed29cd597..d3abafed28 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -60,6 +60,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         {
             "max_stream_id": 32443,
         }
+
+    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
+    context that needs to be recomputed due to the un-partial stating of a room.
     """
 
     NAME = "fed_send_events"
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c2b2588ea5..486f04723c 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -59,6 +59,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         { "stream_id": 12345, "event_id": "$abcdef..." }
 
+    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
+    context that needs to be recomputed due to the un-partial stating of a room.
+
     The returned event ID may not match the sent event if it was deduplicated.
     """
 
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 4bcb99d06e..c248fccc81 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -315,6 +315,10 @@ class EventsPersistenceStorageController:
             if they were deduplicated due to an event already existing that
             matched the transaction ID; the existing event is returned in such
             a case.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
@@ -363,6 +367,10 @@ class EventsPersistenceStorageController:
             latest persisted event. The returned event may not match the given
             event if it was deduplicated due to an existing event matching the
             transaction ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         # add_to_queue returns a map from event ID to existing event ID if the
         # event was deduplicated. (The dict may also include other entries if
@@ -453,6 +461,10 @@ class EventsPersistenceStorageController:
         Returns:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         replaced_events: Dict[str, str] = {}
         if not events_and_contexts:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a3e12f1e9b..8a0e4e9589 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
 import itertools
 import logging
 from collections import OrderedDict
+from http import HTTPStatus
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -35,6 +36,7 @@ from prometheus_client import Counter
 
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
@@ -69,6 +71,24 @@ event_counter = Counter(
 )
 
 
+class PartialStateConflictError(SynapseError):
+    """An internal error raised when attempting to persist an event with partial state
+    after the room containing the event has been un-partial stated.
+
+    This error should be handled by recomputing the event context and trying again.
+
+    This error has an HTTP status code so that it can be transported over replication.
+    It should not be exposed to clients.
+    """
+
+    def __init__(self) -> None:
+        super().__init__(
+            HTTPStatus.CONFLICT,
+            msg="Cannot persist partial state event in un-partial stated room",
+            errcode=Codes.UNKNOWN,
+        )
+
+
 @attr.s(slots=True, auto_attribs=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
@@ -154,6 +174,10 @@ class PersistEventsStore:
 
         Returns:
             Resolves when the events have been persisted
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
 
         # We want to calculate the stream orderings as late as possible, as
@@ -354,6 +378,9 @@ class PersistEventsStore:
                 For each room, a list of the event ids which are the forward
                 extremities.
 
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         state_delta_for_room = state_delta_for_room or {}
         new_forward_extremities = new_forward_extremities or {}
@@ -1304,6 +1331,10 @@ class PersistEventsStore:
 
         Returns:
             new list, without events which are already in the events table.
+
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)"
@@ -2215,6 +2246,11 @@ class PersistEventsStore:
         txn: LoggingTransaction,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
+        """
+        Raises:
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
+        """
         state_groups = {}
         for event, context in events_and_contexts:
             if event.internal_metadata.is_outlier():
@@ -2239,19 +2275,37 @@ class PersistEventsStore:
         # if we have partial state for these events, record the fact. (This happens
         # here rather than in _store_event_txn because it also needs to happen when
         # we de-outlier an event.)
-        self.db_pool.simple_insert_many_txn(
-            txn,
-            table="partial_state_events",
-            keys=("room_id", "event_id"),
-            values=[
-                (
-                    event.room_id,
-                    event.event_id,
-                )
-                for event, ctx in events_and_contexts
-                if ctx.partial_state
-            ],
-        )
+        try:
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="partial_state_events",
+                keys=("room_id", "event_id"),
+                values=[
+                    (
+                        event.room_id,
+                        event.event_id,
+                    )
+                    for event, ctx in events_and_contexts
+                    if ctx.partial_state
+                ],
+            )
+        except self.db_pool.engine.module.IntegrityError:
+            logger.info(
+                "Cannot persist events %s in rooms %s: room has been un-partial stated",
+                [
+                    event.event_id
+                    for event, ctx in events_and_contexts
+                    if ctx.partial_state
+                ],
+                list(
+                    {
+                        event.room_id
+                        for event, ctx in events_and_contexts
+                        if ctx.partial_state
+                    }
+                ),
+            )
+            raise PartialStateConflictError()
 
         self.db_pool.simple_upsert_many_txn(
             txn,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index d8026e3fac..13d6a1d5c0 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1156,19 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         return room_servers
 
     async def clear_partial_state_room(self, room_id: str) -> bool:
-        # this can race with incoming events, so we watch out for FK errors.
-        # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
-        #   is not atomic. I fear we need an application-level lock.
-        #   https://github.com/matrix-org/synapse/issues/12988
+        """Clears the partial state flag for a room.
+
+        Args:
+            room_id: The room whose partial state flag is to be cleared.
+
+        Returns:
+            `True` if the partial state flag has been cleared successfully.
+
+            `False` if the partial state flag could not be cleared because the room
+            still contains events with partial state.
+        """
         try:
             await self.db_pool.runInteraction(
                 "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
             )
             return True
-        except self.db_pool.engine.module.DatabaseError as e:
-            # TODO(faster_joins): how do we distinguish between FK errors and other errors?
-            #   https://github.com/matrix-org/synapse/issues/12988
-            logger.warning(
+        except self.db_pool.engine.module.IntegrityError as e:
+            # Assume that any `IntegrityError`s are due to partial state events.
+            logger.info(
                 "Exception while clearing lazy partial-state-room %s, retrying: %s",
                 room_id,
                 e,