diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 29 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 365 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 72 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 99 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 88 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 16 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 6 | ||||
-rw-r--r-- | synapse/federation/units.py | 23 |
8 files changed, 361 insertions, 337 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0e22183280..eea64c1c9f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,9 +23,13 @@ from twisted.internet.defer import DeferredList from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersion, +) from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import event_type_from_format_version +from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( @@ -33,7 +38,7 @@ from synapse.logging.context import ( make_deferred_yieldable, preserve_fn, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -342,16 +347,15 @@ def _is_invite_via_3pid(event): ) -def event_from_pdu_json(pdu_json, event_format_version, outlier=False): - """Construct a FrozenEvent from an event json received over federation +def event_from_pdu_json( + pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False +) -> EventBase: + """Construct an EventBase from an event json received over federation Args: - pdu_json (object): pdu as received over federation - event_format_version (int): The event format version - outlier (bool): True to mark this event as an outlier - - Returns: - FrozenEvent + pdu_json: pdu as received over federation + room_version: The version of the room this event belongs to + outlier: True to mark this event as an outlier Raises: SynapseError: if the pdu is missing required fields or is otherwise @@ -370,8 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = event_type_from_format_version(event_format_version)(pdu_json) - + event = make_event_from_dict(pdu_json, room_version) event.internal_metadata.outlier = outlier return event diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f99d17a7de..4870e39652 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,18 @@ import copy import itertools import logging -from typing import Dict, Iterable +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Sequence, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -35,12 +46,14 @@ from synapse.api.errors import ( from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, + RoomVersion, RoomVersions, ) -from synapse.events import builder, room_version_to_event_format +from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function +from synapse.types import JsonDict from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -52,6 +65,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t PDU_RETRY_TIME_MS = 1 * 60 * 1000 +T = TypeVar("T") + class InvalidResponseError(RuntimeError): """Helper for _try_destination_list: indicates that the server returned a response @@ -170,21 +185,17 @@ class FederationClient(FederationBase): sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys(destination, content, timeout) - @defer.inlineCallbacks - @log_function - def backfill(self, dest, room_id, limit, extremities): - """Requests some more historic PDUs for the given context from the + async def backfill( + self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + ) -> List[EventBase]: + """Requests some more historic PDUs for the given room from the given destination server. Args: dest (str): The remote homeserver to ask. room_id (str): The room_id to backfill. - limit (int): The maximum number of PDUs to return. - extremities (list): List of PDU id and origins of the first pdus - we have seen from the context - - Returns: - Deferred: Results in the received PDUs. + limit (int): The maximum number of events to return. + extremities (list): our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -192,34 +203,37 @@ class FederationClient(FederationBase): if not extremities: return - transaction_data = yield self.transport_layer.backfill( + transaction_data = await self.transport_layer.backfill( dest, room_id, extremities, limit ) logger.debug("backfill transaction_data=%r", transaction_data) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) pdus = [ - event_from_pdu_json(p, format_ver, outlier=False) + event_from_pdu_json(p, room_version, outlier=False) for p in transaction_data["pdus"] ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield make_deferred_yieldable( + pdus[:] = await make_deferred_yieldable( defer.gatherResults( - self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True + self._check_sigs_and_hashes(room_version.identifier, pdus), + consumeErrors=True, ).addErrback(unwrapFirstError) ) return pdus - @defer.inlineCallbacks - @log_function - def get_pdu( - self, destinations, event_id, room_version, outlier=False, timeout=None - ): + async def get_pdu( + self, + destinations: Iterable[str], + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home servers. @@ -227,18 +241,17 @@ class FederationClient(FederationBase): one succeeds. Args: - destinations (list): Which homeservers to query - event_id (str): event to fetch - room_version (str): version of the room - outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if + destinations: Which homeservers to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` - timeout (int): How long to try (in ms) each destination for before + timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. Returns: - Deferred: Results in the requested PDU, or None if we were unable to find - it. + The requested PDU, or None if we were unable to find it. """ # TODO: Rate limit the number of times we try and get the same event. @@ -249,8 +262,6 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) - format_ver = room_version_to_event_format(room_version) - signed_pdu = None for destination in destinations: now = self._clock.time_msec() @@ -259,7 +270,7 @@ class FederationClient(FederationBase): continue try: - transaction_data = yield self.transport_layer.get_event( + transaction_data = await self.transport_layer.get_event( destination, event_id, timeout=timeout ) @@ -271,7 +282,7 @@ class FederationClient(FederationBase): ) pdu_list = [ - event_from_pdu_json(p, format_ver, outlier=outlier) + event_from_pdu_json(p, room_version, outlier=outlier) for p in transaction_data["pdus"] ] @@ -279,7 +290,9 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) + signed_pdu = await self._check_sigs_and_hash( + room_version.identifier, pdu + ) break @@ -309,15 +322,16 @@ class FederationClient(FederationBase): return signed_pdu - @defer.inlineCallbacks - def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + async def get_room_state_ids( + self, destination: str, room_id: str, event_id: str + ) -> Tuple[List[str], List[str]]: """Calls the /state_ids endpoint to fetch the state at a particular point in the room, and the auth events for the given event Returns: - Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) + a tuple of (state event_ids, auth event_ids) """ - result = yield self.transport_layer.get_room_state_ids( + result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -331,37 +345,39 @@ class FederationClient(FederationBase): return state_event_ids, auth_event_ids - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, room_id, event_id): - res = yield self.transport_layer.get_event_auth(destination, room_id, event_id) + async def get_event_auth(self, destination, room_id, event_id): + res = await self.transport_layer.get_event_auth(destination, room_id, event_id) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"] + event_from_pdu_json(p, room_version, outlier=True) + for p in res["auth_chain"] ] - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version + signed_auth = await self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True, room_version=room_version.identifier ) signed_auth.sort(key=lambda e: e.depth) return signed_auth - @defer.inlineCallbacks - def _try_destination_list(self, description, destinations, callback): + async def _try_destination_list( + self, + description: str, + destinations: Iterable[str], + callback: Callable[[str], Awaitable[T]], + ) -> T: """Try an operation on a series of servers, until it succeeds Args: - description (unicode): description of the operation we're doing, for logging + description: description of the operation we're doing, for logging - destinations (Iterable[unicode]): list of server_names to try + destinations: list of server_names to try - callback (callable): Function to run for each server. Passed a single - argument: the server_name to try. May return a deferred. + callback: Function to run for each server. Passed a single + argument: the server_name to try. If the callback raises a CodeMessageException with a 300/400 code, attempts to perform the operation stop immediately and the exception is @@ -372,7 +388,7 @@ class FederationClient(FederationBase): suppressed if the exception is an InvalidResponseError. Returns: - The [Deferred] result of callback, if it succeeds + The result of callback, if it succeeds Raises: SynapseError if the chosen remote server returns a 300/400 code, or @@ -383,7 +399,7 @@ class FederationClient(FederationBase): continue try: - res = yield callback(destination) + res = await callback(destination) return res except InvalidResponseError as e: logger.warning("Failed to %s via %s: %s", description, destination, e) @@ -402,12 +418,12 @@ class FederationClient(FederationBase): ) except Exception: logger.warning( - "Failed to %s via %s", description, destination, exc_info=1 + "Failed to %s via %s", description, destination, exc_info=True ) raise SynapseError(502, "Failed to %s via any server" % (description,)) - def make_membership_event( + async def make_membership_event( self, destinations: Iterable[str], room_id: str, @@ -415,7 +431,7 @@ class FederationClient(FederationBase): membership: str, content: dict, params: Dict[str, str], - ): + ) -> Tuple[str, EventBase, RoomVersion]: """ Creates an m.room.member event, with context, without participating in the room. @@ -436,19 +452,19 @@ class FederationClient(FederationBase): content: Any additional data to put into the content field of the event. params: Query parameters to include in the request. - Return: - Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of + + Returns: `(origin, event, room_version)` where origin is the remote homeserver which generated the event, and room_version is the version of the room. - Fails with a `UnsupportedRoomVersionError` if remote responds with - a room version we don't understand. + Raises: + UnsupportedRoomVersionError: if remote responds with + a room version we don't understand. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: @@ -457,9 +473,8 @@ class FederationClient(FederationBase): % (membership, ",".join(valid_memberships)) ) - @defer.inlineCallbacks - def send_request(destination): - ret = yield self.transport_layer.make_membership_event( + async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]: + ret = await self.transport_layer.make_membership_event( destination, room_id, user_id, membership, params ) @@ -492,88 +507,83 @@ class FederationClient(FederationBase): event_dict=pdu_dict, ) - return (destination, ev, room_version) + return destination, ev, room_version - return self._try_destination_list( + return await self._try_destination_list( "make_" + membership, destinations, send_request ) - def send_join(self, destinations, pdu, event_format_version): + async def send_join( + self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion + ) -> Dict[str, Any]: """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, and send the event out to the rest of the federation. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent - event_format_version (int): The event format version + pdu: event to be sent + room_version: the version of the room (according to the server that + did the make_join) - Return: - Deferred: resolves to a dict with members ``origin`` (a string - giving the serer the event was sent to, ``state`` (?) and + Returns: + a dict with members ``origin`` (a string + giving the server the event was sent to, ``state`` (?) and ``auth_chain``. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ - def check_authchain_validity(signed_auth_chain): - for e in signed_auth_chain: - if e.type == EventTypes.Create: - create_event = e - break - else: - raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,)) - - # the room version should be sane. - room_version = create_event.content.get("room_version", "1") - if room_version not in KNOWN_ROOM_VERSIONS: - # This shouldn't be possible, because the remote server should have - # rejected the join attempt during make_join. - raise InvalidResponseError( - "room appears to have unsupported version %s" % (room_version,) - ) - - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_join(destination, pdu) + async def send_request(destination) -> Dict[str, Any]: + content = await self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) state = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("state", []) ] auth_chain = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("auth_chain", []) ] pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} - room_version = None + create_event = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): - room_version = e.content.get( - "room_version", RoomVersions.V1.identifier - ) + create_event = e break - if room_version is None: + if create_event is None: # If the state doesn't have a create event then the room is # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") - valid_pdus = yield self._check_sigs_and_hash_and_fetch( + # the room version should be sane. + create_room_version = create_event.content.get( + "room_version", RoomVersions.V1.identifier + ) + if create_room_version != room_version.identifier: + # either the server that fulfilled the make_join, or the server that is + # handling the send_join, is lying. + raise InvalidResponseError( + "Unexpected room version %s in create event" + % (create_room_version,) + ) + + valid_pdus = await self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, - room_version=room_version, + room_version=room_version.identifier, ) valid_pdus_map = {p.event_id: p for p in valid_pdus} @@ -597,7 +607,17 @@ class FederationClient(FederationBase): for s in signed_state: s.internal_metadata = copy.deepcopy(s.internal_metadata) - check_authchain_validity(signed_auth) + # double-check that the same create event has ended up in the auth chain + auth_chain_create_events = [ + e.event_id + for e in signed_auth + if (e.type, e.state_key) == (EventTypes.Create, "") + ] + if auth_chain_create_events != [create_event.event_id]: + raise InvalidResponseError( + "Unexpected create event(s) in auth chain" + % (auth_chain_create_events,) + ) return { "state": signed_state, @@ -605,14 +625,13 @@ class FederationClient(FederationBase): "origin": destination, } - return self._try_destination_list("send_join", destinations, send_request) + return await self._try_destination_list("send_join", destinations, send_request) - @defer.inlineCallbacks - def _do_send_join(self, destination, pdu): + async def _do_send_join(self, destination: str, pdu: EventBase): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_join_v2( + content = await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -634,7 +653,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_join_v1( + resp = await self.transport_layer.send_join_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -645,51 +664,45 @@ class FederationClient(FederationBase): # content. return resp[1] - @defer.inlineCallbacks - def send_invite(self, destination, room_id, event_id, pdu): - room_version = yield self.store.get_room_version_id(room_id) + async def send_invite( + self, destination: str, room_id: str, event_id: str, pdu: EventBase, + ) -> EventBase: + room_version = await self.store.get_room_version(room_id) - content = yield self._do_send_invite(destination, pdu, room_version) + content = await self._do_send_invite(destination, pdu, room_version) pdu_dict = content["event"] logger.debug("Got response to send_invite: %s", pdu_dict) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(pdu_dict, format_ver) + pdu = event_from_pdu_json(pdu_dict, room_version) # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) # FIXME: We should handle signature failures more gracefully. return pdu - @defer.inlineCallbacks - def _do_send_invite(self, destination, pdu, room_version): + async def _do_send_invite( + self, destination: str, pdu: EventBase, room_version: RoomVersion + ) -> JsonDict: """Actually sends the invite, first trying v2 API and falling back to v1 API if necessary. - Args: - destination (str): Target server - pdu (FrozenEvent) - room_version (str) - Returns: - dict: The event as a dict as returned by the remote server + The event as a dict as returned by the remote server """ time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_invite_v2( + content = await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content={ "event": pdu.get_pdu_json(time_now), - "room_version": room_version, + "room_version": room_version.identifier, "invite_room_state": pdu.unsigned.get("invite_room_state", []), }, ) @@ -707,8 +720,7 @@ class FederationClient(FederationBase): # Otherwise, we assume that the remote server doesn't understand # the v2 invite API. That's ok provided the room uses old-style event # IDs. - v = KNOWN_ROOM_VERSIONS.get(room_version) - if v.event_format != EventFormatVersions.V1: + if room_version.event_format != EventFormatVersions.V1: raise SynapseError( 400, "User's homeserver does not support this room version", @@ -722,7 +734,7 @@ class FederationClient(FederationBase): # Didn't work, try v1 API. # Note the v1 API returns a tuple of `(200, content)` - _, content = yield self.transport_layer.send_invite_v1( + _, content = await self.transport_layer.send_invite_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -730,7 +742,7 @@ class FederationClient(FederationBase): ) return content - def send_leave(self, destinations, pdu): + async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None: """Sends a leave event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -739,34 +751,29 @@ class FederationClient(FederationBase): This is mostly useful to reject received invites. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent + pdu: event to be sent - Return: - Deferred: resolves to None. - - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError if no servers were reachable. """ - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_leave(destination, pdu) - + async def send_request(destination: str) -> None: + content = await self._do_send_leave(destination, pdu) logger.debug("Got content: %s", content) - return None - return self._try_destination_list("send_leave", destinations, send_request) + return await self._try_destination_list( + "send_leave", destinations, send_request + ) - @defer.inlineCallbacks - def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination, pdu): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_leave_v2( + content = await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -788,7 +795,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_leave_v1( + resp = await self.transport_layer.send_leave_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -820,34 +827,33 @@ class FederationClient(FederationBase): third_party_instance_id=third_party_instance_id, ) - @defer.inlineCallbacks - def get_missing_events( + async def get_missing_events( self, - destination, - room_id, - earliest_events_ids, - latest_events, - limit, - min_depth, - timeout, - ): + destination: str, + room_id: str, + earliest_events_ids: Sequence[str], + latest_events: Iterable[EventBase], + limit: int, + min_depth: int, + timeout: int, + ) -> List[EventBase]: """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. Args: - destination (str) - room_id (str) - earliest_events_ids (list): List of event ids. Effectively the + destination + room_id + earliest_events_ids: List of event ids. Effectively the events we expected to receive, but haven't. `get_missing_events` should only return events that didn't happen before these. - latest_events (list): List of events we have received that we don't + latest_events: List of events we have received that we don't have all previous events for. - limit (int): Maximum number of events to return. - min_depth (int): Minimum depth of events tor return. - timeout (int): Max time to wait in ms + limit: Maximum number of events to return. + min_depth: Minimum depth of events to return. + timeout: Max time to wait in ms """ try: - content = yield self.transport_layer.get_missing_events( + content = await self.transport_layer.get_missing_events( destination=destination, room_id=room_id, earliest_events=earliest_events_ids, @@ -857,15 +863,14 @@ class FederationClient(FederationBase): timeout=timeout, ) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) events = [ - event_from_pdu_json(e, format_ver) for e in content.get("events", []) + event_from_pdu_json(e, room_version) for e in content.get("events", []) ] - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False, room_version=room_version + signed_events = await self._check_sigs_and_hash_and_fetch( + destination, events, outlier=False, room_version=room_version.identifier ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index a4c97ed458..7f9da49326 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -38,7 +38,6 @@ from synapse.api.errors import ( UnsupportedRoomVersionError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction @@ -54,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import glob_to_regex, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -82,6 +81,8 @@ class FederationServer(FederationBase): self.handler = hs.get_handlers().federation_handler self.state = hs.get_state_handler() + self.device_handler = hs.get_device_handler() + self._server_linearizer = Linearizer("fed_server") self._transaction_linearizer = Linearizer("fed_txn_handler") @@ -234,24 +235,17 @@ class FederationServer(FederationBase): continue try: - room_version = await self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version(room_id) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue - - try: - format_ver = room_version_to_event_format(room_version) - except UnsupportedRoomVersionError: + except UnsupportedRoomVersionError as e: # this can happen if support for a given room version is withdrawn, # so that we still get events for said room. - logger.info( - "Ignoring PDU for room %s with unknown version %s", - room_id, - room_version, - ) + logger.info("Ignoring PDU: %s", e) continue - event = event_from_pdu_json(p, format_ver) + event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} @@ -302,7 +296,12 @@ class FederationServer(FederationBase): async def _process_edu(edu_dict): received_edus_counter.inc() - edu = Edu(**edu_dict) + edu = Edu( + origin=origin, + destination=self.server_name, + edu_type=edu_dict["edu_type"], + content=edu_dict["content"], + ) await self.registry.on_edu(edu.edu_type, origin, edu.content) await concurrently_execute( @@ -396,20 +395,21 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} - async def on_invite_request(self, origin, content, room_version): - if room_version not in KNOWN_ROOM_VERSIONS: + async def on_invite_request( + self, origin: str, content: JsonDict, room_version_id: str + ): + room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) + if not room_version: raise SynapseError( 400, "Homeserver does not support this room version", Codes.UNSUPPORTED_ROOM_VERSION, ) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(content, format_ver) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version) time_now = self._clock.time_msec() return {"event": ret_pdu.get_pdu_json(time_now)} @@ -417,16 +417,15 @@ class FederationServer(FederationBase): async def on_send_join_request(self, origin, content, room_id): logger.debug("on_send_join_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -448,16 +447,15 @@ class FederationServer(FederationBase): async def on_send_leave_request(self, origin, content, room_id): logger.debug("on_send_leave_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) await self.handler.on_send_leave_request(origin, pdu) return {} @@ -495,15 +493,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(e, format_ver) for e in content["auth_chain"] + event_from_pdu_json(e, room_version) for e in content["auth_chain"] ] signed_auth = await self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True, room_version=room_version + origin, auth_chain, outlier=True, room_version=room_version.identifier ) ret = await self.handler.on_query_auth( @@ -528,8 +525,9 @@ class FederationServer(FederationBase): def on_query_client_keys(self, origin, content): return self.on_query_request("client_keys", content) - def on_query_user_devices(self, origin, user_id): - return self.on_query_request("user_devices", user_id) + async def on_query_user_devices(self, origin: str, user_id: str): + keys = await self.device_handler.on_federation_query_user_devices(user_id) + return 200, keys @trace async def on_claim_client_keys(self, origin, content): @@ -570,7 +568,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - logger.info( + logger.debug( "on_get_missing_events: earliest_events: %r, latest_events: %r," " limit: %d", earliest_events, @@ -583,11 +581,11 @@ class FederationServer(FederationBase): ) if len(missing_events) < 5: - logger.info( + logger.debug( "Returning %d events: %r", len(missing_events), missing_events ) else: - logger.info("Returning %d events", len(missing_events)) + logger.debug("Returning %d events", len(missing_events)) time_now = self._clock.time_msec() diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 36c83c3027..233cb33daf 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, Hashable, Iterable, List, Optional, Set from six import itervalues @@ -23,6 +24,7 @@ from twisted.internet import defer import synapse import synapse.metrics +from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu @@ -39,6 +41,8 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.presence import UserPresenceState +from synapse.types import ReadReceipt from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -68,7 +72,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: dict[str, PerDestinationQueue] + self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] LaterGauge( "synapse_federation_transaction_queue_pending_destinations", @@ -84,7 +88,7 @@ class FederationSender(object): # Map of user_id -> UserPresenceState for all the pending presence # to be sent out by user_id. Entries here get processed and put in # pending_presence_by_dest - self.pending_presence = {} + self.pending_presence = {} # type: Dict[str, UserPresenceState] LaterGauge( "synapse_federation_transaction_queue_pending_pdus", @@ -116,20 +120,17 @@ class FederationSender(object): # and that there is a pending call to _flush_rrs_for_room in the system. self._queues_awaiting_rr_flush_by_room = ( {} - ) # type: dict[str, set[PerDestinationQueue]] + ) # type: Dict[str, Set[PerDestinationQueue]] self._rr_txn_interval_per_room_ms = ( - 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second + 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) - def _get_per_destination_queue(self, destination): + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination Args: - destination (str): server_name of remote server - - Returns: - PerDestinationQueue + destination: server_name of remote server """ queue = self._per_destination_queues.get(destination) if not queue: @@ -137,7 +138,7 @@ class FederationSender(object): self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id): + def notify_new_events(self, current_id: int) -> None: """This gets called when we have some new events we might want to send out to other servers. """ @@ -151,13 +152,12 @@ class FederationSender(object): "process_event_queue_for_federation", self._process_event_queue_loop ) - @defer.inlineCallbacks - def _process_event_queue_loop(self): + async def _process_event_queue_loop(self) -> None: try: self._is_processing = True while True: - last_token = yield self.store.get_federation_out_pos("events") - next_token, events = yield self.store.get_all_new_events_stream( + last_token = await self.store.get_federation_out_pos("events") + next_token, events = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 ) @@ -166,8 +166,7 @@ class FederationSender(object): if not events and next_token >= self._last_poked_id: break - @defer.inlineCallbacks - def handle_event(event): + async def handle_event(event: EventBase) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) @@ -184,7 +183,7 @@ class FederationSender(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - destinations = yield self.state.get_hosts_in_room_at_events( + destinations = await self.state.get_hosts_in_room_at_events( event.room_id, event_ids=event.prev_event_ids() ) except Exception: @@ -206,17 +205,16 @@ class FederationSender(object): self._send_pdu(event, destinations) - @defer.inlineCallbacks - def handle_room_events(events): + async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): for event in events: - yield handle_event(event) + await handle_event(event) - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background(handle_room_events, evs) @@ -226,11 +224,11 @@ class FederationSender(object): ) ) - yield self.store.update_federation_out_pos("events", next_token) + await self.store.update_federation_out_pos("events", next_token) if events: now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + ts = await self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_lag.labels( "federation_sender" @@ -254,7 +252,7 @@ class FederationSender(object): finally: self._is_processing = False - def _send_pdu(self, pdu, destinations): + def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -276,11 +274,11 @@ class FederationSender(object): self._get_per_destination_queue(destination).send_pdu(pdu, order) @defer.inlineCallbacks - def send_read_receipt(self, receipt): + def send_read_receipt(self, receipt: ReadReceipt): """Send a RR to any other servers in the room Args: - receipt (synapse.types.ReadReceipt): receipt to be sent + receipt: receipt to be sent """ # Some background on the rate-limiting going on here. @@ -343,7 +341,7 @@ class FederationSender(object): else: queue.flush_read_receipts_for_room(room_id) - def _schedule_rr_flush_for_room(self, room_id, n_domains): + def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None: # that is going to cause approximately len(domains) transactions, so now back # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM backoff_ms = self._rr_txn_interval_per_room_ms * n_domains @@ -352,7 +350,7 @@ class FederationSender(object): self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id) self._queues_awaiting_rr_flush_by_room[room_id] = set() - def _flush_rrs_for_room(self, room_id): + def _flush_rrs_for_room(self, room_id: str) -> None: queues = self._queues_awaiting_rr_flush_by_room.pop(room_id) logger.debug("Flushing RRs in %s to %s", room_id, queues) @@ -368,14 +366,11 @@ class FederationSender(object): @preserve_fn # the caller should not yield on this @defer.inlineCallbacks - def send_presence(self, states): + def send_presence(self, states: List[UserPresenceState]): """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and triggers a background task to process them and send out the transactions. - - Args: - states (list(UserPresenceState)) """ if not self.hs.config.use_presence: # No-op if presence is disabled. @@ -412,11 +407,10 @@ class FederationSender(object): finally: self._processing_pending_presence = False - def send_presence_to_destinations(self, states, destinations): + def send_presence_to_destinations( + self, states: List[UserPresenceState], destinations: List[str] + ) -> None: """Send the given presence states to the given destinations. - - Args: - states (list[UserPresenceState]) destinations (list[str]) """ @@ -431,12 +425,9 @@ class FederationSender(object): @measure_func("txnqueue._process_presence") @defer.inlineCallbacks - def _process_presence_inner(self, states): + def _process_presence_inner(self, states: List[UserPresenceState]): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination - - Args: - states (list(UserPresenceState)) """ hosts_and_states = yield get_interested_remotes(self.store, states, self.state) @@ -446,14 +437,20 @@ class FederationSender(object): continue self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu( + self, + destination: str, + edu_type: str, + content: dict, + key: Optional[Hashable] = None, + ): """Construct an Edu object, and queue it for sending Args: - destination (str): name of server to send to - edu_type (str): type of EDU to send - content (dict): content of EDU - key (Any|None): clobbering key for this edu + destination: name of server to send to + edu_type: type of EDU to send + content: content of EDU + key: clobbering key for this edu """ if destination == self.server_name: logger.info("Not sending EDU to ourselves") @@ -468,12 +465,12 @@ class FederationSender(object): self.send_edu(edu, key) - def send_edu(self, edu, key): + def send_edu(self, edu: Edu, key: Optional[Hashable]): """Queue an EDU for sending Args: - edu (Edu): edu to send - key (Any|None): clobbering key for this edu + edu: edu to send + key: clobbering key for this edu """ queue = self._get_per_destination_queue(edu.destination) if key: @@ -481,7 +478,7 @@ class FederationSender(object): else: queue.send_edu(edu) - def send_device_messages(self, destination): + def send_device_messages(self, destination: str): if destination == self.server_name: logger.warning("Not sending device update to ourselves") return @@ -501,5 +498,5 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() - def get_current_token(self): + def get_current_token(self) -> int: return 0 diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 5012aaea35..e13cd20ffa 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -15,11 +15,11 @@ # limitations under the License. import datetime import logging +from typing import Dict, Hashable, Iterable, List, Tuple from prometheus_client import Counter -from twisted.internet import defer - +import synapse.server from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -31,7 +31,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState -from synapse.types import StateMap +from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter # This is defined in the Matrix spec and enforced by the receiver. @@ -56,13 +56,18 @@ class PerDestinationQueue(object): Manages the per-destination transmission queues. Args: - hs (synapse.HomeServer): - transaction_sender (TransactionManager): - destination (str): the server_name of the destination that we are managing + hs + transaction_sender + destination: the server_name of the destination that we are managing transmission for. """ - def __init__(self, hs, transaction_manager, destination): + def __init__( + self, + hs: "synapse.server.HomeServer", + transaction_manager: "synapse.federation.sender.TransactionManager", + destination: str, + ): self._server_name = hs.hostname self._clock = hs.get_clock() self._store = hs.get_datastore() @@ -72,20 +77,20 @@ class PerDestinationQueue(object): self.transmission_loop_running = False # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: list[tuple[EventBase, int]] - self._pending_edus = [] # type: list[Edu] + self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + self._pending_edus = [] # type: List[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: StateMap[Edu] + self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu] # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: dict[str, UserPresenceState] + self._pending_presence = {} # type: Dict[str, UserPresenceState] # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs = {} + self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -95,50 +100,50 @@ class PerDestinationQueue(object): # stream_id of last successfully sent device list update. self._last_device_list_stream_id = 0 - def __str__(self): + def __str__(self) -> str: return "PerDestinationQueue[%s]" % self._destination - def pending_pdu_count(self): + def pending_pdu_count(self) -> int: return len(self._pending_pdus) - def pending_edu_count(self): + def pending_edu_count(self) -> int: return ( len(self._pending_edus) + len(self._pending_presence) + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu, order): + def send_pdu(self, pdu: EventBase, order: int) -> None: """Add a PDU to the queue, and start the transmission loop if neccessary Args: - pdu (EventBase): pdu to send - order (int): + pdu: pdu to send + order """ self._pending_pdus.append((pdu, order)) self.attempt_new_transaction() - def send_presence(self, states): + def send_presence(self, states: Iterable[UserPresenceState]) -> None: """Add presence updates to the queue. Start the transmission loop if neccessary. Args: - states (iterable[UserPresenceState]): presence to send + states: presence to send """ self._pending_presence.update({state.user_id: state for state in states}) self.attempt_new_transaction() - def queue_read_receipt(self, receipt): + def queue_read_receipt(self, receipt: ReadReceipt) -> None: """Add a RR to the list to be sent. Doesn't start the transmission loop yet (see flush_read_receipts_for_room) Args: - receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued + receipt: receipt to be queued """ self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} - def flush_read_receipts_for_room(self, room_id): + def flush_read_receipts_for_room(self, room_id: str) -> None: # if we don't have any read-receipts for this room, it may be that we've already # sent them out, so we don't need to flush. if room_id not in self._pending_rrs: @@ -146,15 +151,15 @@ class PerDestinationQueue(object): self._rrs_pending_flush = True self.attempt_new_transaction() - def send_keyed_edu(self, edu, key): + def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu self.attempt_new_transaction() - def send_edu(self, edu): + def send_edu(self, edu) -> None: self._pending_edus.append(edu) self.attempt_new_transaction() - def attempt_new_transaction(self): + def attempt_new_transaction(self) -> None: """Try to start a new transaction to this destination If there is already a transaction in progress to this destination, @@ -177,23 +182,22 @@ class PerDestinationQueue(object): self._transaction_transmission_loop, ) - @defer.inlineCallbacks - def _transaction_transmission_loop(self): - pending_pdus = [] + async def _transaction_transmission_loop(self) -> None: + pending_pdus = [] # type: List[Tuple[EventBase, int]] try: self.transmission_loop_running = True # This will throw if we wouldn't retry. We do this here so we fail # quickly, but we will later check this again in the http client, # hence why we throw the result away. - yield get_retry_limiter(self._destination, self._clock, self._store) + await get_retry_limiter(self._destination, self._clock, self._store) pending_pdus = [] while True: # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 - device_update_edus, dev_list_id = yield self._get_device_update_edus( + device_update_edus, dev_list_id = await self._get_device_update_edus( limit ) @@ -202,7 +206,7 @@ class PerDestinationQueue(object): ( to_device_edus, device_stream_id, - ) = yield self._get_to_device_message_edus(limit) + ) = await self._get_to_device_message_edus(limit) pending_edus = device_update_edus + to_device_edus @@ -269,7 +273,7 @@ class PerDestinationQueue(object): # END CRITICAL SECTION - success = yield self._transaction_manager.send_new_transaction( + success = await self._transaction_manager.send_new_transaction( self._destination, pending_pdus, pending_edus ) if success: @@ -280,7 +284,7 @@ class PerDestinationQueue(object): # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if to_device_edus: - yield self._store.delete_device_msgs_for_remote( + await self._store.delete_device_msgs_for_remote( self._destination, device_stream_id ) @@ -289,7 +293,7 @@ class PerDestinationQueue(object): logger.info( "Marking as sent %r %r", self._destination, dev_list_id ) - yield self._store.mark_as_sent_devices_by_remote( + await self._store.mark_as_sent_devices_by_remote( self._destination, dev_list_id ) @@ -334,7 +338,7 @@ class PerDestinationQueue(object): # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False - def _get_rr_edus(self, force_flush): + def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: return if not force_flush and not self._rrs_pending_flush: @@ -351,17 +355,16 @@ class PerDestinationQueue(object): self._rrs_pending_flush = False yield edu - def _pop_pending_edus(self, limit): + def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:] return pending_edus - @defer.inlineCallbacks - def _get_device_update_edus(self, limit): + async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_list = self._last_device_list_stream_id # Retrieve list of new device updates to send to the destination - now_stream_id, results = yield self._store.get_device_updates_by_remote( + now_stream_id, results = await self._store.get_device_updates_by_remote( self._destination, last_device_list, limit=limit ) edus = [ @@ -378,11 +381,10 @@ class PerDestinationQueue(object): return (edus, now_stream_id) - @defer.inlineCallbacks - def _get_to_device_message_edus(self, limit): + async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + contents, stream_id = await self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, limit ) edus = [ diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 5fed626d5b..3c2a02a3b3 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,14 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from canonicaljson import json -from twisted.internet import defer - +import synapse.server from synapse.api.errors import HttpResponseException +from synapse.events import EventBase from synapse.federation.persistence import TransactionActions -from synapse.federation.units import Transaction +from synapse.federation.units import Edu, Transaction from synapse.logging.opentracing import ( extract_text_map, set_tag, @@ -39,7 +40,7 @@ class TransactionManager(object): shared between PerDestinationQueue objects """ - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self._server_name = hs.hostname self.clock = hs.get_clock() # nb must be called this for @measure_func self._store = hs.get_datastore() @@ -50,8 +51,9 @@ class TransactionManager(object): self._next_txn_id = int(self.clock.time_msec()) @measure_func("_send_new_transaction") - @defer.inlineCallbacks - def send_new_transaction(self, destination, pending_pdus, pending_edus): + async def send_new_transaction( + self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu] + ): # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -127,7 +129,7 @@ class TransactionManager(object): return data try: - response = yield self._transport_layer.send_transaction( + response = await self._transport_layer.send_transaction( transaction, json_data_cb ) code = 200 diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 125eadd796..92a9ae2320 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -158,7 +158,7 @@ class Authenticator(object): origin, json_request, now, "Incoming request" ) - logger.info("Request from %s", origin) + logger.debug("Request from %s", origin) request.authenticated_entity = origin # If we get a valid signed request from the other side, its probably @@ -579,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = await self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1.identifier + origin, content, room_version_id=RoomVersions.V1.identifier ) # V1 federation API is defined to return a content of `[200, {...}]` @@ -606,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet): event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state content = await self.handler.on_invite_request( - origin, event, room_version=room_version + origin, event, room_version_id=room_version ) return 200, content diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b4d743cde7..6b32e0dcbf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -19,11 +19,15 @@ server protocol. import logging +import attr + +from synapse.types import JsonDict from synapse.util.jsonobject import JsonEncodedObject logger = logging.getLogger(__name__) +@attr.s(slots=True) class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. @@ -32,11 +36,24 @@ class Edu(JsonEncodedObject): internal ID or previous references graph. """ - valid_keys = ["origin", "destination", "edu_type", "content"] + edu_type = attr.ib(type=str) + content = attr.ib(type=dict) + origin = attr.ib(type=str) + destination = attr.ib(type=str) - required_keys = ["edu_type"] + def get_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + } - internal_keys = ["origin", "destination"] + def get_internal_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + "origin": self.origin, + "destination": self.destination, + } def get_context(self): return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") |