summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py12
-rw-r--r--synapse/federation/federation_client.py38
-rw-r--r--synapse/federation/federation_server.py12
-rw-r--r--synapse/federation/send_queue.py47
-rw-r--r--synapse/federation/transport/server/_base.py39
5 files changed, 49 insertions, 99 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py

index f56344a3b9..addc0bf000 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py
@@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from collections import namedtuple from typing import TYPE_CHECKING from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership @@ -104,10 +103,6 @@ class FederationBase: return pdu -class PduToCheckSig(namedtuple("PduToCheckSig", ["pdu", "sender_domain", "deferreds"])): - pass - - async def _check_sigs_on_pdu( keyring: Keyring, room_version: RoomVersion, pdu: EventBase ) -> None: @@ -220,15 +215,12 @@ def _is_invite_via_3pid(event: EventBase) -> bool: ) -def event_from_pdu_json( - pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False -) -> EventBase: +def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventBase: """Construct an EventBase from an event json received over federation Args: pdu_json: pdu as received over federation room_version: The version of the room this event belongs to - outlier: True to mark this event as an outlier Raises: SynapseError: if the pdu is missing required fields or is otherwise @@ -252,6 +244,4 @@ def event_from_pdu_json( validate_canonicaljson(pdu_json) event = make_event_from_dict(pdu_json, room_version) - event.internal_metadata.outlier = outlier - return event diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index fee1477ab6..6ea4edfc71 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -265,14 +265,11 @@ class FederationClient(FederationBase): room_version = await self.store.get_room_version(room_id) - pdus = [ - event_from_pdu_json(p, room_version, outlier=False) - for p in transaction_data_pdus - ] + pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus] # Check signatures and hash of pdus, removing any from the list that fail checks pdus[:] = await self._check_sigs_and_hash_and_fetch( - dest, pdus, outlier=True, room_version=room_version + dest, pdus, room_version=room_version ) return pdus @@ -282,7 +279,6 @@ class FederationClient(FederationBase): destination: str, event_id: str, room_version: RoomVersion, - outlier: bool = False, timeout: Optional[int] = None, ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home @@ -292,9 +288,6 @@ class FederationClient(FederationBase): destination: Which homeserver to query event_id: event to fetch room_version: version of the room - outlier: Indicates whether the PDU is an `outlier`, i.e. if - it's from an arbitrary point in the context as opposed to part - of the current block of PDUs. Defaults to `False` timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. @@ -316,8 +309,7 @@ class FederationClient(FederationBase): ) pdu_list: List[EventBase] = [ - event_from_pdu_json(p, room_version, outlier=outlier) - for p in transaction_data["pdus"] + event_from_pdu_json(p, room_version) for p in transaction_data["pdus"] ] if pdu_list and pdu_list[0]: @@ -334,7 +326,6 @@ class FederationClient(FederationBase): destinations: Iterable[str], event_id: str, room_version: RoomVersion, - outlier: bool = False, timeout: Optional[int] = None, ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home @@ -347,9 +338,6 @@ class FederationClient(FederationBase): destinations: Which homeservers to query event_id: event to fetch room_version: version of the room - outlier: Indicates whether the PDU is an `outlier`, i.e. if - it's from an arbitrary point in the context as opposed to part - of the current block of PDUs. Defaults to `False` timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. @@ -377,7 +365,6 @@ class FederationClient(FederationBase): destination=destination, event_id=event_id, room_version=room_version, - outlier=outlier, timeout=timeout, ) @@ -435,7 +422,6 @@ class FederationClient(FederationBase): origin: str, pdus: Collection[EventBase], room_version: RoomVersion, - outlier: bool = False, ) -> List[EventBase]: """Takes a list of PDUs and checks the signatures and hashes of each one. If a PDU fails its signature check then we check if we have it in @@ -451,7 +437,6 @@ class FederationClient(FederationBase): origin pdu room_version - outlier: Whether the events are outliers or not Returns: A list of PDUs that have valid signatures and hashes. @@ -466,7 +451,6 @@ class FederationClient(FederationBase): valid_pdu = await self._check_sigs_and_hash_and_fetch_one( pdu=pdu, origin=origin, - outlier=outlier, room_version=room_version, ) @@ -482,7 +466,6 @@ class FederationClient(FederationBase): pdu: EventBase, origin: str, room_version: RoomVersion, - outlier: bool = False, ) -> Optional[EventBase]: """Takes a PDU and checks its signatures and hashes. If the PDU fails its signature check then we check if we have it in the database and if @@ -494,9 +477,6 @@ class FederationClient(FederationBase): origin pdu room_version - outlier: Whether the events are outliers or not - include_none: Whether to include None in the returned list - for events that have failed their checks Returns: The PDU (possibly redacted) if it has valid signatures and hashes. @@ -521,7 +501,6 @@ class FederationClient(FederationBase): destinations=[pdu_origin], event_id=pdu.event_id, room_version=room_version, - outlier=outlier, timeout=10000, ) except SynapseError: @@ -541,13 +520,10 @@ class FederationClient(FederationBase): room_version = await self.store.get_room_version(room_id) - auth_chain = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in res["auth_chain"] - ] + auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]] signed_auth = await self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version + destination, auth_chain, room_version=room_version ) return signed_auth @@ -816,7 +792,6 @@ class FederationClient(FederationBase): valid_pdu = await self._check_sigs_and_hash_and_fetch_one( pdu=event, origin=destination, - outlier=True, room_version=room_version, ) @@ -864,7 +839,6 @@ class FederationClient(FederationBase): valid_pdu = await self._check_sigs_and_hash_and_fetch_one( pdu=pdu, origin=destination, - outlier=True, room_version=room_version, ) @@ -1235,7 +1209,7 @@ class FederationClient(FederationBase): ] signed_events = await self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False, room_version=room_version + destination, events, room_version=room_version ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8e37e76206..ee71f289c8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py
@@ -28,9 +28,9 @@ from typing import ( Union, ) +from matrix_common.regex import glob_to_regex from prometheus_client import Counter, Gauge, Histogram -from twisted.internet import defer from twisted.internet.abstract import isIPAddress from twisted.python import failure @@ -66,8 +66,8 @@ from synapse.replication.http.federation import ( ) from synapse.storage.databases.main.lock import Lock from synapse.types import JsonDict, get_domain_from_id -from synapse.util import glob_to_regex, json_decoder, unwrapFirstError -from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util import json_decoder, unwrapFirstError +from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results from synapse.util.caches.response_cache import ResponseCache from synapse.util.stringutils import parse_server_name @@ -360,13 +360,13 @@ class FederationServer(FederationBase): # want to block things like to device messages from reaching clients # behind the potentially expensive handling of PDUs. pdu_results, _ = await make_deferred_yieldable( - defer.gatherResults( - [ + gather_results( + ( run_in_background( self._handle_pdus_in_txn, origin, transaction, request_time ), run_in_background(self._handle_edus_in_txn, origin, transaction), - ], + ), consumeErrors=True, ).addErrback(unwrapFirstError) ) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 63289a5a33..0d7c4f5067 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py
@@ -30,7 +30,6 @@ Events are replicated via a separate events stream. """ import logging -from collections import namedtuple from typing import ( TYPE_CHECKING, Dict, @@ -43,6 +42,7 @@ from typing import ( Type, ) +import attr from sortedcontainers import SortedDict from synapse.api.presence import UserPresenceState @@ -382,13 +382,11 @@ class BaseFederationRow: raise NotImplementedError() -class PresenceDestinationsRow( - BaseFederationRow, - namedtuple( - "PresenceDestinationsRow", - ("state", "destinations"), # UserPresenceState # list[str] - ), -): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PresenceDestinationsRow(BaseFederationRow): + state: UserPresenceState + destinations: List[str] + TypeId = "pd" @staticmethod @@ -404,17 +402,15 @@ class PresenceDestinationsRow( buff.presence_destinations.append((self.state, self.destinations)) -class KeyedEduRow( - BaseFederationRow, - namedtuple( - "KeyedEduRow", - ("key", "edu"), # tuple(str) - the edu key passed to send_edu # Edu - ), -): +@attr.s(slots=True, frozen=True, auto_attribs=True) +class KeyedEduRow(BaseFederationRow): """Streams EDUs that have an associated key that is ued to clobber. For example, typing EDUs clobber based on room_id. """ + key: Tuple[str, ...] # the edu key passed to send_edu + edu: Edu + TypeId = "k" @staticmethod @@ -428,9 +424,12 @@ class KeyedEduRow( buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu -class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu +@attr.s(slots=True, frozen=True, auto_attribs=True) +class EduRow(BaseFederationRow): """Streams EDUs that don't have keys. See KeyedEduRow""" + edu: Edu + TypeId = "e" @staticmethod @@ -453,14 +452,14 @@ _rowtypes: Tuple[Type[BaseFederationRow], ...] = ( TypeToRow = {Row.TypeId: Row for Row in _rowtypes} -ParsedFederationStreamData = namedtuple( - "ParsedFederationStreamData", - ( - "presence_destinations", # list of tuples of UserPresenceState and destinations - "keyed_edus", # dict of destination -> { key -> Edu } - "edus", # dict of destination -> [Edu] - ), -) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class ParsedFederationStreamData: + # list of tuples of UserPresenceState and destinations + presence_destinations: List[Tuple[UserPresenceState, List[str]]] + # dict of destination -> { key -> Edu } + keyed_edus: Dict[str, Dict[Tuple[str, ...], Edu]] + # dict of destination -> [Edu] + edus: Dict[str, List[Edu]] def process_rows_for_federation( diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index dc39e3537b..da1fbf8b63 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py
@@ -22,13 +22,11 @@ from synapse.api.urls import FEDERATION_V1_PREFIX from synapse.http.server import HttpServer, ServletCallback from synapse.http.servlet import parse_json_object_from_request from synapse.http.site import SynapseRequest -from synapse.logging import opentracing from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( - SynapseTags, - start_active_span, - start_active_span_from_request, - tags, + set_tag, + span_context_from_request, + start_active_span_follows_from, whitelisted_homeserver, ) from synapse.server import HomeServer @@ -279,30 +277,19 @@ class BaseFederationServlet: logger.warning("authenticate_request failed: %s", e) raise - request_tags = { - SynapseTags.REQUEST_ID: request.get_request_id(), - tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, - tags.HTTP_METHOD: request.get_method(), - tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientIP(), - "authenticated_entity": origin, - "servlet_name": request.request_metrics.name, - } - - # Only accept the span context if the origin is authenticated - # and whitelisted + # update the active opentracing span with the authenticated entity + set_tag("authenticated_entity", origin) + + # if the origin is authenticated and whitelisted, link to its span context + context = None if origin and whitelisted_homeserver(origin): - scope = start_active_span_from_request( - request, "incoming-federation-request", tags=request_tags - ) - else: - scope = start_active_span( - "incoming-federation-request", tags=request_tags - ) + context = span_context_from_request(request) - with scope: - opentracing.inject_response_headers(request.responseHeaders) + scope = start_active_span_follows_from( + "incoming-federation-request", contexts=(context,) if context else () + ) + with scope: if origin and self.RATELIMIT: with ratelimiter.ratelimit(origin) as d: await d