summary refs log tree commit diff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2021-09-01 10:48:08 +0100
committerBrendan Abolivier <babolivier@matrix.org>2021-09-01 10:48:08 +0100
commit6c9461eb41391aa34e0da81d1f4e8ecf947ec7ad (patch)
tree14fc91666fed9fc7b585fd044b18b58c0e2dd9cf /synapse/federation/federation_server.py
parentMerge tag 'v1.37.0' into babolivier/dinsic_1.41.0 (diff)
parentFixup changelog (diff)
downloadsynapse-6c9461eb41391aa34e0da81d1f4e8ecf947ec7ad.tar.xz
Merge tag 'v1.37.1' into babolivier/dinsic_1.41.0
Synapse 1.37.1 (2021-06-30)
===========================

This release resolves issues (such as [#9490](https://github.com/matrix-org/synapse/issues/9490)) where one busy room could cause head-of-line blocking, starving Synapse from processing events in other rooms, and causing all federated traffic to fall behind. Synapse 1.37.1 processes inbound federation traffic asynchronously, ensuring that one busy room won't impact others. Please upgrade to Synapse 1.37.1 as soon as possible, in order to increase resilience to other traffic spikes.

No significant changes since v1.37.1rc1.

Synapse 1.37.1rc1 (2021-06-29)
==============================

Features
--------

- Handle inbound events from federation asynchronously. ([\#10269](https://github.com/matrix-org/synapse/issues/10269), [\#10272](https://github.com/matrix-org/synapse/issues/10272))
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py99
1 files changed, 96 insertions, 3 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py

index 3466523c3c..1d050e54e2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -44,8 +44,7 @@ from synapse.api.errors import ( SynapseError, UnsupportedRoomVersionError, ) -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.config.api import DEFAULT_ROOM_STATE_TYPES +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.events import EventBase from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions @@ -58,10 +57,12 @@ from synapse.logging.context import ( ) from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace from synapse.logging.utils import log_function +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) +from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict from synapse.util import glob_to_regex, json_decoder, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute @@ -97,6 +98,11 @@ last_pdu_ts_metric = Gauge( ) +# The name of the lock to use when process events in a room received over +# federation. +_INBOUND_EVENT_HANDLING_LOCK_NAME = "federation_inbound_pdu" + + class FederationServer(FederationBase): def __init__(self, hs: "HomeServer"): super().__init__(hs) @@ -835,7 +841,94 @@ class FederationServer(FederationBase): except SynapseError as e: raise FederationError("ERROR", e.code, e.msg, affected=pdu.event_id) - await self.handler.on_receive_pdu(origin, pdu, sent_to_us_directly=True) + # Add the event to our staging area + await self.store.insert_received_event_to_staging(origin, pdu) + + # Try and acquire the processing lock for the room, if we get it start a + # background process for handling the events in the room. + lock = await self.store.try_acquire_lock( + _INBOUND_EVENT_HANDLING_LOCK_NAME, pdu.room_id + ) + if lock: + self._process_incoming_pdus_in_room_inner( + pdu.room_id, room_version, lock, origin, pdu + ) + + @wrap_as_background_process("_process_incoming_pdus_in_room_inner") + async def _process_incoming_pdus_in_room_inner( + self, + room_id: str, + room_version: RoomVersion, + lock: Lock, + latest_origin: str, + latest_event: EventBase, + ) -> None: + """Process events in the staging area for the given room. + + The latest_origin and latest_event args are the latest origin and event + received. + """ + + # The common path is for the event we just received be the only event in + # the room, so instead of pulling the event out of the DB and parsing + # the event we just pull out the next event ID and check if that matches. + next_origin, next_event_id = await self.store.get_next_staged_event_id_for_room( + room_id + ) + if next_origin == latest_origin and next_event_id == latest_event.event_id: + origin = latest_origin + event = latest_event + else: + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + if not next: + return + + origin, event = next + + # We loop round until there are no more events in the room in the + # staging area, or we fail to get the lock (which means another process + # has started processing). + while True: + async with lock: + try: + await self.handler.on_receive_pdu( + origin, event, sent_to_us_directly=True + ) + except FederationError as e: + # XXX: Ideally we'd inform the remote we failed to process + # the event, but we can't return an error in the transaction + # response (as we've already responded). + logger.warning("Error handling PDU %s: %s", event.event_id, e) + except Exception: + f = failure.Failure() + logger.error( + "Failed to handle PDU %s", + event.event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore + ) + + await self.store.remove_received_event_from_staging( + origin, event.event_id + ) + + # We need to do this check outside the lock to avoid a race between + # a new event being inserted by another instance and it attempting + # to acquire the lock. + next = await self.store.get_next_staged_event_for_room( + room_id, room_version + ) + if not next: + break + + origin, event = next + + lock = await self.store.try_acquire_lock( + _INBOUND_EVENT_HANDLING_LOCK_NAME, room_id + ) + if not lock: + return def __str__(self) -> str: return "<ReplicationLayer(%s)>" % self.server_name