diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/federation.py | 39 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 51 | ||||
-rw-r--r-- | synapse/handlers/message.py | 79 |
3 files changed, 119 insertions, 50 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 34cc5ecd11..3c44b4bf86 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,7 @@ from synapse.api.errors import ( FederationDeniedError, FederationError, HttpResponseException, + LimitExceededError, NotFoundError, RequestSendFailed, SynapseError, @@ -64,6 +65,7 @@ from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationStoreRoomOnOutlierMembershipRestServlet, ) +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import JsonDict, StateMap, get_domain_from_id @@ -549,15 +551,29 @@ class FederationHandler: # https://github.com/matrix-org/synapse/issues/12998 await self.store.store_partial_state_room(room_id, ret.servers_in_room) - max_stream_id = await self._federation_event_handler.process_remote_join( - origin, - room_id, - auth_chain, - state, - event, - room_version_obj, - partial_state=ret.partial_state, - ) + try: + max_stream_id = ( + await self._federation_event_handler.process_remote_join( + origin, + room_id, + auth_chain, + state, + event, + room_version_obj, + partial_state=ret.partial_state, + ) + ) + except PartialStateConflictError as e: + # The homeserver was already in the room and it is no longer partial + # stated. We ought to be doing a local join instead. Turn the error into + # a 429, as a hint to the client to try again. + # TODO(faster_joins): `_should_perform_remote_join` suggests that we may + # do a remote join for restricted rooms even if we have full state. + logger.error( + "Room %s was un-partial stated while processing remote join.", + room_id, + ) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) if ret.partial_state: # Kick off the process of asynchronously fetching the state for this @@ -1567,11 +1583,6 @@ class FederationHandler: # we raced against more events arriving with partial state. Go round # the loop again. We've already logged a warning, so no need for more. - # TODO(faster_joins): there is still a race here, whereby incoming events which raced - # with us will fail to be persisted after the call to `clear_partial_state_room` due to - # having partial state. - # https://github.com/matrix-org/synapse/issues/12988 - # continue events = await self.store.get_events_as_list( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 479d936dc0..c74117c19a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -64,6 +64,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.state import StateResolutionStore +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( @@ -275,7 +276,16 @@ class FederationEventHandler: affected=pdu.event_id, ) - await self._process_received_pdu(origin, pdu, state_ids=None) + try: + await self._process_received_pdu(origin, pdu, state_ids=None) + except PartialStateConflictError: + # The room was un-partial stated while we were processing the PDU. + # Try once more, with full state this time. + logger.info( + "Room %s was un-partial stated while processing the PDU, trying again.", + room_id, + ) + await self._process_received_pdu(origin, pdu, state_ids=None) async def on_send_membership_event( self, origin: str, event: EventBase @@ -306,6 +316,9 @@ class FederationEventHandler: Raises: SynapseError if the event is not accepted into the room + PartialStateConflictError if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should + retry exactly once in this case. """ logger.debug( "on_send_membership_event: Got event: %s, signatures: %s", @@ -423,6 +436,8 @@ class FederationEventHandler: Raises: SynapseError if the response is in some way invalid. + PartialStateConflictError if the homeserver is already in the room and it + has been un-partial stated. """ create_event = None for e in state: @@ -1084,10 +1099,14 @@ class FederationEventHandler: state_ids: Normally None, but if we are handling a gap in the graph (ie, we are missing one or more prev_events), the resolved state at the - event + event. Must not be partial state. backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) + + PartialStateConflictError: if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should retry + exactly once in this case. Will never be raised if `state_ids` is provided. """ logger.debug("Processing event: %s", event) assert not event.internal_metadata.outlier @@ -1933,6 +1952,9 @@ class FederationEventHandler: event: The event itself. context: The event context. backfilled: True if the event was backfilled. + + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # this method should not be called on outliers (those code paths call # persist_events_and_notify directly.) @@ -1985,6 +2007,10 @@ class FederationEventHandler: Returns: The stream ID after which all events have been persisted. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ if not event_and_contexts: return self._store.get_room_max_stream_ordering() @@ -1993,14 +2019,19 @@ class FederationEventHandler: if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` - for batch in batch_iter(event_and_contexts, 200): - result = await self._send_events( - instance_name=instance, - store=self._store, - room_id=room_id, - event_and_contexts=batch, - backfilled=backfilled, - ) + try: + for batch in batch_iter(event_and_contexts, 200): + result = await self._send_events( + instance_name=instance, + store=self._store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise return result["max_stream_id"] else: assert self._storage_controllers.persistence diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c6b40a5b7a..1980e37dae 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,6 +37,7 @@ from synapse.api.errors import ( AuthError, Codes, ConsentNotGivenError, + LimitExceededError, NotFoundError, ShadowBanError, SynapseError, @@ -53,6 +54,7 @@ from synapse.handlers.directory import DirectoryHandler from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( @@ -1250,6 +1252,8 @@ class EventCreationHandler: Raises: ShadowBanError if the requester has been shadow-banned. + SynapseError(503) if attempting to persist a partial state event in + a room that has been un-partial stated. """ extra_users = extra_users or [] @@ -1300,24 +1304,35 @@ class EventCreationHandler: # We now persist the event (and update the cache in parallel, since we # don't want to block on it). - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_event, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, + try: + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_event, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ), + run_in_background( + self.cache_joined_hosts_for_event, event, context + ).addErrback( + log_failure, "cache_joined_hosts_for_event failed" + ), ), - run_in_background( - self.cache_joined_hosts_for_event, event, context - ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), - ), - consumeErrors=True, + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) + except PartialStateConflictError as e: + # The event context needs to be recomputed. + # Turn the error into a 429, as a hint to the client to try again. + logger.info( + "Room %s was un-partial stated while persisting client event.", + event.room_id, ) - ).addErrback(unwrapFirstError) + raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) return result @@ -1332,6 +1347,9 @@ class EventCreationHandler: """Actually persists the event. Should only be called by `handle_new_client_event`, and see its docstring for documentation of the arguments. + + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # Skip push notification actions for historical messages @@ -1348,16 +1366,21 @@ class EventCreationHandler: # If we're a worker we need to hit out to the master. writer_instance = self._events_shard_config.get_instance(event.room_id) if writer_instance != self._instance_name: - result = await self.send_event( - instance_name=writer_instance, - event_id=event.event_id, - store=self.store, - requester=requester, - event=event, - context=context, - ratelimit=ratelimit, - extra_users=extra_users, - ) + try: + result = await self.send_event( + instance_name=writer_instance, + event_id=event.event_id, + store=self.store, + requester=requester, + event=event, + context=context, + ratelimit=ratelimit, + extra_users=extra_users, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise stream_id = result["stream_id"] event_id = result["event_id"] if event_id != event.event_id: @@ -1485,6 +1508,10 @@ class EventCreationHandler: The persisted event. This may be different than the given event if it was de-duplicated (e.g. because we had already persisted an event with the same transaction ID.) + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ extra_users = extra_users or [] |