diff options
author | Sean Quah <8349537+squahtx@users.noreply.github.com> | 2022-07-05 16:12:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-05 16:12:52 +0100 |
commit | 68db233f0cf16a20f21fd927374121966976d9c7 (patch) | |
tree | dc9054e39534b5933140d688abaa9221e3381627 /synapse/storage | |
parent | Type `tests.utils` (#13028) (diff) | |
download | synapse-68db233f0cf16a20f21fd927374121966976d9c7.tar.xz |
Handle race between persisting an event and un-partial stating a room (#13100)
Whenever we want to persist an event, we first compute an event context, which includes the state at the event and a flag indicating whether the state is partial. After a lot of processing, we finally try to store the event in the database, which can fail for partial state events when the containing room has been un-partial stated in the meantime. We detect the race as a foreign key constraint failure in the data store layer and turn it into a special `PartialStateConflictError` exception, which makes its way up to the method in which we computed the event context. To make things difficult, the exception needs to cross a replication request: `/fed_send_events` for events coming over federation and `/send_event` for events from clients. We transport the `PartialStateConflictError` as a `409 Conflict` over replication and turn `409`s back into `PartialStateConflictError`s on the worker making the request. All client events go through `EventCreationHandler.handle_new_client_event`, which is called in *a lot* of places. Instead of trying to update all the code which creates client events, we turn the `PartialStateConflictError` into a `429 Too Many Requests` in `EventCreationHandler.handle_new_client_event` and hope that clients take it as a hint to retry their request. On the federation event side, there are 7 places which compute event contexts. 4 of them use outlier event contexts: `FederationEventHandler._auth_and_persist_outliers_inner`, `FederationHandler.do_knock`, `FederationHandler.on_invite_request` and `FederationHandler.do_remotely_reject_invite`. These events won't have the partial state flag, so we do not need to do anything for then. The remaining 3 paths which create events are `FederationEventHandler.process_remote_join`, `FederationEventHandler.on_send_membership_event` and `FederationEventHandler._process_received_pdu`. We can't experience the race in `process_remote_join`, unless we're handling an additional join into a partial state room, which currently blocks, so we make no attempt to handle it correctly. `on_send_membership_event` is only called by `FederationServer._on_send_membership_event`, so we catch the `PartialStateConflictError` there and retry just once. `_process_received_pdu` is called by `on_receive_pdu` for incoming events and `_process_pulled_event` for backfill. The latter should never try to persist partial state events, so we ignore it. We catch the `PartialStateConflictError` in `on_receive_pdu` and retry just once. Refering to the graph of code paths in https://github.com/matrix-org/synapse/issues/12988#issuecomment-1156857648 may make the above make more sense. Signed-off-by: Sean Quah <seanq@matrix.org>
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/controllers/persist_events.py | 12 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 80 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 22 |
3 files changed, 93 insertions, 21 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 4bcb99d06e..c248fccc81 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -315,6 +315,10 @@ class EventsPersistenceStorageController: if they were deduplicated due to an event already existing that matched the transaction ID; the existing event is returned in such a case. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {} for event, ctx in events_and_contexts: @@ -363,6 +367,10 @@ class EventsPersistenceStorageController: latest persisted event. The returned event may not match the given event if it was deduplicated due to an existing event matching the transaction ID. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # add_to_queue returns a map from event ID to existing event ID if the # event was deduplicated. (The dict may also include other entries if @@ -453,6 +461,10 @@ class EventsPersistenceStorageController: Returns: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ replaced_events: Dict[str, str] = {} if not events_and_contexts: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a3e12f1e9b..8a0e4e9589 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict +from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -35,6 +36,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -69,6 +71,24 @@ event_counter = Counter( ) +class PartialStateConflictError(SynapseError): + """An internal error raised when attempting to persist an event with partial state + after the room containing the event has been un-partial stated. + + This error should be handled by recomputing the event context and trying again. + + This error has an HTTP status code so that it can be transported over replication. + It should not be exposed to clients. + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.CONFLICT, + msg="Cannot persist partial state event in un-partial stated room", + errcode=Codes.UNKNOWN, + ) + + @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -154,6 +174,10 @@ class PersistEventsStore: Returns: Resolves when the events have been persisted + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # We want to calculate the stream orderings as late as possible, as @@ -354,6 +378,9 @@ class PersistEventsStore: For each room, a list of the event ids which are the forward extremities. + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ state_delta_for_room = state_delta_for_room or {} new_forward_extremities = new_forward_extremities or {} @@ -1304,6 +1331,10 @@ class PersistEventsStore: Returns: new list, without events which are already in the events table. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" @@ -2215,6 +2246,11 @@ class PersistEventsStore: txn: LoggingTransaction, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: + """ + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. + """ state_groups = {} for event, context in events_and_contexts: if event.internal_metadata.is_outlier(): @@ -2239,19 +2275,37 @@ class PersistEventsStore: # if we have partial state for these events, record the fact. (This happens # here rather than in _store_event_txn because it also needs to happen when # we de-outlier an event.) - self.db_pool.simple_insert_many_txn( - txn, - table="partial_state_events", - keys=("room_id", "event_id"), - values=[ - ( - event.room_id, - event.event_id, - ) - for event, ctx in events_and_contexts - if ctx.partial_state - ], - ) + try: + self.db_pool.simple_insert_many_txn( + txn, + table="partial_state_events", + keys=("room_id", "event_id"), + values=[ + ( + event.room_id, + event.event_id, + ) + for event, ctx in events_and_contexts + if ctx.partial_state + ], + ) + except self.db_pool.engine.module.IntegrityError: + logger.info( + "Cannot persist events %s in rooms %s: room has been un-partial stated", + [ + event.event_id + for event, ctx in events_and_contexts + if ctx.partial_state + ], + list( + { + event.room_id + for event, ctx in events_and_contexts + if ctx.partial_state + } + ), + ) + raise PartialStateConflictError() self.db_pool.simple_upsert_many_txn( txn, diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index d8026e3fac..13d6a1d5c0 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1156,19 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return room_servers async def clear_partial_state_room(self, room_id: str) -> bool: - # this can race with incoming events, so we watch out for FK errors. - # TODO(faster_joins): this still doesn't completely fix the race, since the persist process - # is not atomic. I fear we need an application-level lock. - # https://github.com/matrix-org/synapse/issues/12988 + """Clears the partial state flag for a room. + + Args: + room_id: The room whose partial state flag is to be cleared. + + Returns: + `True` if the partial state flag has been cleared successfully. + + `False` if the partial state flag could not be cleared because the room + still contains events with partial state. + """ try: await self.db_pool.runInteraction( "clear_partial_state_room", self._clear_partial_state_room_txn, room_id ) return True - except self.db_pool.engine.module.DatabaseError as e: - # TODO(faster_joins): how do we distinguish between FK errors and other errors? - # https://github.com/matrix-org/synapse/issues/12988 - logger.warning( + except self.db_pool.engine.module.IntegrityError as e: + # Assume that any `IntegrityError`s are due to partial state events. + logger.info( "Exception while clearing lazy partial-state-room %s, retrying: %s", room_id, e, |