diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 328 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 84 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 14 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 5 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 79 |
5 files changed, 356 insertions, 154 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 62d7ed13cf..c9f3c2d352 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -25,7 +25,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.api.constants import Membership +from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership from synapse.api.errors import ( CodeMessageException, FederationDeniedError, @@ -48,6 +48,13 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t PDU_RETRY_TIME_MS = 1 * 60 * 1000 +class InvalidResponseError(RuntimeError): + """Helper for _try_destination_list: indicates that the server returned a response + we couldn't parse + """ + pass + + class FederationClient(FederationBase): def __init__(self, hs): super(FederationClient, self).__init__(hs) @@ -458,8 +465,63 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks + def _try_destination_list(self, description, destinations, callback): + """Try an operation on a series of servers, until it succeeds + + Args: + description (unicode): description of the operation we're doing, for logging + + destinations (Iterable[unicode]): list of server_names to try + + callback (callable): Function to run for each server. Passed a single + argument: the server_name to try. May return a deferred. + + If the callback raises a CodeMessageException with a 300/400 code, + attempts to perform the operation stop immediately and the exception is + reraised. + + Otherwise, if the callback raises an Exception the error is logged and the + next server tried. Normally the stacktrace is logged but this is + suppressed if the exception is an InvalidResponseError. + + Returns: + The [Deferred] result of callback, if it succeeds + + Raises: + SynapseError if the chosen remote server returns a 300/400 code. + + RuntimeError if no servers were reachable. + """ + for destination in destinations: + if destination == self.server_name: + continue + + try: + res = yield callback(destination) + defer.returnValue(res) + except InvalidResponseError as e: + logger.warn( + "Failed to %s via %s: %s", + description, destination, e, + ) + except HttpResponseException as e: + if not 500 <= e.code < 600: + raise e.to_synapse_error() + else: + logger.warn( + "Failed to %s via %s: %i %s", + description, destination, e.code, e.message, + ) + except Exception: + logger.warn( + "Failed to %s via %s", + description, destination, exc_info=1, + ) + + raise RuntimeError("Failed to %s via any server" % (description, )) + def make_membership_event(self, destinations, room_id, user_id, membership, - content={},): + content, params): """ Creates an m.room.member event, with context, without participating in the room. @@ -475,13 +537,15 @@ class FederationClient(FederationBase): user_id (str): The user whose membership is being evented. membership (str): The "membership" property of the event. Must be one of "join" or "leave". - content (object): Any additional data to put into the content field + content (dict): Any additional data to put into the content field of the event. + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Return: Deferred: resolves to a tuple of (origin (str), event (object)) where origin is the remote homeserver which generated the event. - Fails with a ``CodeMessageException`` if the chosen remote server + Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. Fails with a ``RuntimeError`` if no servers were reachable. @@ -492,50 +556,37 @@ class FederationClient(FederationBase): "make_membership_event called with membership='%s', must be one of %s" % (membership, ",".join(valid_memberships)) ) - for destination in destinations: - if destination == self.server_name: - continue - try: - ret = yield self.transport_layer.make_membership_event( - destination, room_id, user_id, membership - ) + @defer.inlineCallbacks + def send_request(destination): + ret = yield self.transport_layer.make_membership_event( + destination, room_id, user_id, membership, params, + ) - pdu_dict = ret["event"] + pdu_dict = ret.get("event", None) + if not isinstance(pdu_dict, dict): + raise InvalidResponseError("Bad 'event' field in response") - logger.debug("Got response to make_%s: %s", membership, pdu_dict) + logger.debug("Got response to make_%s: %s", membership, pdu_dict) - pdu_dict["content"].update(content) + pdu_dict["content"].update(content) - # The protoevent received over the JSON wire may not have all - # the required fields. Lets just gloss over that because - # there's some we never care about - if "prev_state" not in pdu_dict: - pdu_dict["prev_state"] = [] + # The protoevent received over the JSON wire may not have all + # the required fields. Lets just gloss over that because + # there's some we never care about + if "prev_state" not in pdu_dict: + pdu_dict["prev_state"] = [] - ev = builder.EventBuilder(pdu_dict) + ev = builder.EventBuilder(pdu_dict) - defer.returnValue( - (destination, ev) - ) - break - except CodeMessageException as e: - if not 500 <= e.code < 600: - raise - else: - logger.warn( - "Failed to make_%s via %s: %s", - membership, destination, e.message - ) - except Exception as e: - logger.warn( - "Failed to make_%s via %s: %s", - membership, destination, e.message - ) + defer.returnValue( + (destination, ev) + ) - raise RuntimeError("Failed to send to any server.") + return self._try_destination_list( + "make_" + membership, destinations, send_request, + ) - @defer.inlineCallbacks def send_join(self, destinations, pdu): """Sends a join event to one of a list of homeservers. @@ -552,103 +603,111 @@ class FederationClient(FederationBase): giving the serer the event was sent to, ``state`` (?) and ``auth_chain``. - Fails with a ``CodeMessageException`` if the chosen remote server + Fails with a ``SynapseError`` if the chosen remote server returns a 300/400 code. Fails with a ``RuntimeError`` if no servers were reachable. """ - for destination in destinations: - if destination == self.server_name: - continue - - try: - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), + def check_authchain_validity(signed_auth_chain): + for e in signed_auth_chain: + if e.type == EventTypes.Create: + create_event = e + break + else: + raise InvalidResponseError( + "no %s in auth chain" % (EventTypes.Create,), ) - logger.debug("Got content: %s", content) + # the room version should be sane. + room_version = create_event.content.get("room_version", "1") + if room_version not in KNOWN_ROOM_VERSIONS: + # This shouldn't be possible, because the remote server should have + # rejected the join attempt during make_join. + raise InvalidResponseError( + "room appears to have unsupported version %s" % ( + room_version, + )) + + @defer.inlineCallbacks + def send_request(destination): + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - state = [ - event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + logger.debug("Got content: %s", content) - auth_chain = [ - event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + state = [ + event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - pdus = { - p.event_id: p - for p in itertools.chain(state, auth_chain) - } + auth_chain = [ + event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - valid_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, list(pdus.values()), - outlier=True, - ) + pdus = { + p.event_id: p + for p in itertools.chain(state, auth_chain) + } - valid_pdus_map = { - p.event_id: p - for p in valid_pdus - } - - # NB: We *need* to copy to ensure that we don't have multiple - # references being passed on, as that causes... issues. - signed_state = [ - copy.copy(valid_pdus_map[p.event_id]) - for p in state - if p.event_id in valid_pdus_map - ] + valid_pdus = yield self._check_sigs_and_hash_and_fetch( + destination, list(pdus.values()), + outlier=True, + ) - signed_auth = [ - valid_pdus_map[p.event_id] - for p in auth_chain - if p.event_id in valid_pdus_map - ] + valid_pdus_map = { + p.event_id: p + for p in valid_pdus + } - # NB: We *need* to copy to ensure that we don't have multiple - # references being passed on, as that causes... issues. - for s in signed_state: - s.internal_metadata = copy.deepcopy(s.internal_metadata) + # NB: We *need* to copy to ensure that we don't have multiple + # references being passed on, as that causes... issues. + signed_state = [ + copy.copy(valid_pdus_map[p.event_id]) + for p in state + if p.event_id in valid_pdus_map + ] - auth_chain.sort(key=lambda e: e.depth) + signed_auth = [ + valid_pdus_map[p.event_id] + for p in auth_chain + if p.event_id in valid_pdus_map + ] - defer.returnValue({ - "state": signed_state, - "auth_chain": signed_auth, - "origin": destination, - }) - except CodeMessageException as e: - if not 500 <= e.code < 600: - raise - else: - logger.exception( - "Failed to send_join via %s: %s", - destination, e.message - ) - except Exception as e: - logger.exception( - "Failed to send_join via %s: %s", - destination, e.message - ) + # NB: We *need* to copy to ensure that we don't have multiple + # references being passed on, as that causes... issues. + for s in signed_state: + s.internal_metadata = copy.deepcopy(s.internal_metadata) - raise RuntimeError("Failed to send to any server.") + check_authchain_validity(signed_auth) + + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + "origin": destination, + }) + return self._try_destination_list("send_join", destinations, send_request) @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): time_now = self._clock.time_msec() - code, content = yield self.transport_layer.send_invite( - destination=destination, - room_id=room_id, - event_id=event_id, - content=pdu.get_pdu_json(time_now), - ) + try: + code, content = yield self.transport_layer.send_invite( + destination=destination, + room_id=room_id, + event_id=event_id, + content=pdu.get_pdu_json(time_now), + ) + except HttpResponseException as e: + if e.code == 403: + raise e.to_synapse_error() + raise pdu_dict = content["event"] @@ -663,7 +722,6 @@ class FederationClient(FederationBase): defer.returnValue(pdu) - @defer.inlineCallbacks def send_leave(self, destinations, pdu): """Sends a leave event to one of a list of homeservers. @@ -680,35 +738,25 @@ class FederationClient(FederationBase): Return: Deferred: resolves to None. - Fails with a ``CodeMessageException`` if the chosen remote server - returns a non-200 code. + Fails with a ``SynapseError`` if the chosen remote server + returns a 300/400 code. Fails with a ``RuntimeError`` if no servers were reachable. """ - for destination in destinations: - if destination == self.server_name: - continue - - try: - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_leave( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + @defer.inlineCallbacks + def send_request(destination): + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_leave( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - logger.debug("Got content: %s", content) - defer.returnValue(None) - except CodeMessageException: - raise - except Exception as e: - logger.exception( - "Failed to send_leave via %s: %s", - destination, e.message - ) + logger.debug("Got content: %s", content) + defer.returnValue(None) - raise RuntimeError("Failed to send to any server.") + return self._try_destination_list("send_leave", destinations, send_request) def get_public_rooms(self, destination, limit=None, since_token=None, search_filter=None, include_all_networks=False, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 657935d1ac..3e0cd294a1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,14 +27,24 @@ from twisted.internet.abstract import isIPAddress from twisted.python import failure from synapse.api.constants import EventTypes -from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError +from synapse.api.errors import ( + AuthError, + FederationError, + IncompatibleRoomVersionError, + NotFoundError, + SynapseError, +) from synapse.crypto.event_signing import compute_event_signature from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.replication.http.federation import ( + ReplicationFederationSendEduRestServlet, + ReplicationGetQueryRestServlet, +) from synapse.types import get_domain_from_id -from synapse.util import async +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function @@ -61,8 +71,8 @@ class FederationServer(FederationBase): self.auth = hs.get_auth() self.handler = hs.get_handlers().federation_handler - self._server_linearizer = async.Linearizer("fed_server") - self._transaction_linearizer = async.Linearizer("fed_txn_handler") + self._server_linearizer = Linearizer("fed_server") + self._transaction_linearizer = Linearizer("fed_txn_handler") self.transaction_actions = TransactionActions(self.store) @@ -194,7 +204,7 @@ class FederationServer(FederationBase): event_id, f.getTraceback().rstrip(), ) - yield async.concurrently_execute( + yield concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT, ) @@ -323,12 +333,21 @@ class FederationServer(FederationBase): defer.returnValue((200, resp)) @defer.inlineCallbacks - def on_make_join_request(self, origin, room_id, user_id): + def on_make_join_request(self, origin, room_id, user_id, supported_versions): origin_host, _ = parse_server_name(origin) yield self.check_server_matches_acl(origin_host, room_id) + + room_version = yield self.store.get_room_version(room_id) + if room_version not in supported_versions: + logger.warn("Room version %s not in %s", room_version, supported_versions) + raise IncompatibleRoomVersionError(room_version=room_version) + pdu = yield self.handler.on_make_join_request(room_id, user_id) time_now = self._clock.time_msec() - defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + defer.returnValue({ + "event": pdu.get_pdu_json(time_now), + "room_version": room_version, + }) @defer.inlineCallbacks def on_invite_request(self, origin, content): @@ -426,6 +445,7 @@ class FederationServer(FederationBase): ret = yield self.handler.on_query_auth( origin, event_id, + room_id, signed_auth, content.get("rejects", []), content.get("missing", []), @@ -744,6 +764,8 @@ class FederationHandlerRegistry(object): if edu_type in self.edu_handlers: raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + logger.info("Registering federation EDU handler for %r", edu_type) + self.edu_handlers[edu_type] = handler def register_query_handler(self, query_type, handler): @@ -762,6 +784,8 @@ class FederationHandlerRegistry(object): "Already have a Query handler for %s" % (query_type,) ) + logger.info("Registering federation query handler for %r", query_type) + self.query_handlers[query_type] = handler @defer.inlineCallbacks @@ -784,3 +808,49 @@ class FederationHandlerRegistry(object): raise NotFoundError("No handler for Query type '%s'" % (query_type,)) return handler(args) + + +class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): + """A FederationHandlerRegistry for worker processes. + + When receiving EDU or queries it will check if an appropriate handler has + been registered on the worker, if there isn't one then it calls off to the + master process. + """ + + def __init__(self, hs): + self.config = hs.config + self.http_client = hs.get_simple_http_client() + self.clock = hs.get_clock() + + self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) + self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) + + super(ReplicationFederationHandlerRegistry, self).__init__() + + def on_edu(self, edu_type, origin, content): + """Overrides FederationHandlerRegistry + """ + handler = self.edu_handlers.get(edu_type) + if handler: + return super(ReplicationFederationHandlerRegistry, self).on_edu( + edu_type, origin, content, + ) + + return self._send_edu( + edu_type=edu_type, + origin=origin, + content=content, + ) + + def on_query(self, query_type, args): + """Overrides FederationHandlerRegistry + """ + handler = self.query_handlers.get(query_type) + if handler: + return handler(args) + + return self._get_query_client( + query_type=query_type, + args=args, + ) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 78f9d40a3a..94d7423d01 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, + event_processing_loop_counter, + event_processing_loop_room_count, events_processed_counter, sent_edus_counter, sent_transactions_counter, @@ -56,6 +58,7 @@ class TransactionQueue(object): """ def __init__(self, hs): + self.hs = hs self.server_name = hs.hostname self.store = hs.get_datastore() @@ -253,7 +256,13 @@ class TransactionQueue(object): synapse.metrics.event_processing_last_ts.labels( "federation_sender").set(ts) - events_processed_counter.inc(len(events)) + events_processed_counter.inc(len(events)) + + event_processing_loop_room_count.labels( + "federation_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( "federation_sender").set(next_token) @@ -300,6 +309,9 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ + if not self.hs.config.use_presence: + # No-op if presence is disabled. + return # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 625b4def9b..1054441ca5 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -195,7 +195,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_membership_event(self, destination, room_id, user_id, membership): + def make_membership_event(self, destination, room_id, user_id, membership, params): """Asks a remote server to build and sign us a membership event Note that this does not append any events to any graphs. @@ -205,6 +205,8 @@ class TransportLayerClient(object): room_id (str): room to join/leave user_id (str): user to be joined/left membership (str): one of join/leave + params (dict[str, str|Iterable[str]]): Query parameters to include in the + request. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result @@ -241,6 +243,7 @@ class TransportLayerClient(object): content = yield self.client.get_json( destination=destination, path=path, + args=params, retry_on_dns_fail=retry_on_dns_fail, timeout=20000, ignore_backoff=ignore_backoff, diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 3b5ea9515a..7a993fd1cf 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -165,7 +165,7 @@ def _parse_auth_header(header_bytes): param_dict = dict(kv.split("=") for kv in params) def strip_quotes(value): - if value.startswith(b"\""): + if value.startswith("\""): return value[1:-1] else: return value @@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes): class BaseFederationServlet(object): + """Abstract base class for federation servlet classes. + + The servlet object should have a PATH attribute which takes the form of a regexp to + match against the request path (excluding the /federation/v1 prefix). + + The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match + the appropriate HTTP method. These methods have the signature: + + on_<METHOD>(self, origin, content, query, **kwargs) + + With arguments: + + origin (unicode|None): The authenticated server_name of the calling server, + unless REQUIRE_AUTH is set to False and authentication failed. + + content (unicode|None): decoded json body of the request. None if the + request was a GET. + + query (dict[bytes, list[bytes]]): Query params from the request. url-decoded + (ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded + yet. + + **kwargs (dict[unicode, unicode]): the dict mapping keys to path + components as specified in the path match regexp. + + Returns: + Deferred[(int, object)|None]: either (response code, response object) to + return a JSON response, or None if the request has already been handled. + + Raises: + SynapseError: to return an error code + + Exception: other exceptions will be caught, logged, and a 500 will be + returned. + """ REQUIRE_AUTH = True def __init__(self, handler, authenticator, ratelimiter, server_name): @@ -204,6 +239,18 @@ class BaseFederationServlet(object): @defer.inlineCallbacks @functools.wraps(func) def new_func(request, *args, **kwargs): + """ A callback which can be passed to HttpServer.RegisterPaths + + Args: + request (twisted.web.http.Request): + *args: unused? + **kwargs (dict[unicode, unicode]): the dict mapping keys to path + components as specified in the path match regexp. + + Returns: + Deferred[(int, object)|None]: (response code, response object) as returned + by the callback method. None if the request has already been handled. + """ content = None if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? @@ -214,10 +261,10 @@ class BaseFederationServlet(object): except NoAuthenticationError: origin = None if self.REQUIRE_AUTH: - logger.exception("authenticate_request failed") + logger.warn("authenticate_request failed: missing authentication") raise - except Exception: - logger.exception("authenticate_request failed") + except Exception as e: + logger.warn("authenticate_request failed: %s", e) raise if origin: @@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet): PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)" @defer.inlineCallbacks - def on_GET(self, origin, content, query, context, user_id): + def on_GET(self, origin, _content, query, context, user_id): + """ + Args: + origin (unicode): The authenticated server_name of the calling server + + _content (None): (GETs don't have bodies) + + query (dict[bytes, list[bytes]]): Query params from the request. + + **kwargs (dict[unicode, unicode]): the dict mapping keys to path + components as specified in the path match regexp. + + Returns: + Deferred[(int, object)|None]: either (response code, response object) to + return a JSON response, or None if the request has already been handled. + """ + versions = query.get(b'ver') + if versions is not None: + supported_versions = [v.decode("utf-8") for v in versions] + else: + supported_versions = ["1"] + content = yield self.handler.on_make_join_request( origin, context, user_id, + supported_versions=supported_versions, ) defer.returnValue((200, content)) |