diff options
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r-- | synapse/handlers/federation_event.py | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 479d936dc0..c74117c19a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -64,6 +64,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ) from synapse.state import StateResolutionStore +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter from synapse.types import ( @@ -275,7 +276,16 @@ class FederationEventHandler: affected=pdu.event_id, ) - await self._process_received_pdu(origin, pdu, state_ids=None) + try: + await self._process_received_pdu(origin, pdu, state_ids=None) + except PartialStateConflictError: + # The room was un-partial stated while we were processing the PDU. + # Try once more, with full state this time. + logger.info( + "Room %s was un-partial stated while processing the PDU, trying again.", + room_id, + ) + await self._process_received_pdu(origin, pdu, state_ids=None) async def on_send_membership_event( self, origin: str, event: EventBase @@ -306,6 +316,9 @@ class FederationEventHandler: Raises: SynapseError if the event is not accepted into the room + PartialStateConflictError if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should + retry exactly once in this case. """ logger.debug( "on_send_membership_event: Got event: %s, signatures: %s", @@ -423,6 +436,8 @@ class FederationEventHandler: Raises: SynapseError if the response is in some way invalid. + PartialStateConflictError if the homeserver is already in the room and it + has been un-partial stated. """ create_event = None for e in state: @@ -1084,10 +1099,14 @@ class FederationEventHandler: state_ids: Normally None, but if we are handling a gap in the graph (ie, we are missing one or more prev_events), the resolved state at the - event + event. Must not be partial state. backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) + + PartialStateConflictError: if the room was un-partial stated in between + computing the state at the event and persisting it. The caller should retry + exactly once in this case. Will never be raised if `state_ids` is provided. """ logger.debug("Processing event: %s", event) assert not event.internal_metadata.outlier @@ -1933,6 +1952,9 @@ class FederationEventHandler: event: The event itself. context: The event context. backfilled: True if the event was backfilled. + + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # this method should not be called on outliers (those code paths call # persist_events_and_notify directly.) @@ -1985,6 +2007,10 @@ class FederationEventHandler: Returns: The stream ID after which all events have been persisted. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ if not event_and_contexts: return self._store.get_room_max_stream_ordering() @@ -1993,14 +2019,19 @@ class FederationEventHandler: if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` - for batch in batch_iter(event_and_contexts, 200): - result = await self._send_events( - instance_name=instance, - store=self._store, - room_id=room_id, - event_and_contexts=batch, - backfilled=backfilled, - ) + try: + for batch in batch_iter(event_and_contexts, 200): + result = await self._send_events( + instance_name=instance, + store=self._store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) + except SynapseError as e: + if e.code == HTTPStatus.CONFLICT: + raise PartialStateConflictError() + raise return result["max_stream_id"] else: assert self._storage_controllers.persistence |