# -*- coding: utf-8 -*-
# Copyright 2014, 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from twisted.internet import defer

from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError
from synapse.util.logutils import log_function

import logging
import simplejson as json
import re


logger = logging.getLogger(__name__)


class TransportLayerServer(object):
    """Handles incoming federation HTTP requests"""

    @defer.inlineCallbacks
    def _authenticate_request(self, request):
        """Check the signature carried in a request's Authorization headers.

        Builds a JSON object from the request method, URI, this server's
        name and (for PUT/POST) the decoded JSON body, attaches every
        signature found in the ``X-Matrix`` Authorization headers, and asks
        ``self.keyring`` to verify it.

        Args:
            request: The incoming HTTP request. Must expose ``method``,
                ``uri``, ``content`` and ``requestHeaders``.

        Returns:
            Deferred: Results in a tuple of `(origin, content)` where
            `content` is the decoded JSON body, or None for requests that
            are neither PUT nor POST.

        Raises:
            SynapseError: 400 if the body is not valid JSON or an
                ``X-Matrix`` header is malformed; 401 if no usable
                Authorization header is present.
        """
        json_request = {
            "method": request.method,
            "uri": request.uri,
            "destination": self.server_name,
            "signatures": {},
        }

        content = None
        origin = None

        if request.method in ["PUT", "POST"]:
            # TODO: Handle other method types? other content types?
            try:
                content_bytes = request.content.read()
                content = json.loads(content_bytes)
                json_request["content"] = content
            except Exception:
                raise SynapseError(400, "Unable to parse JSON", Codes.BAD_JSON)

        def parse_auth_header(header_str):
            """Split an ``X-Matrix`` header into `(origin, key, sig)`."""
            try:
                # Use the argument rather than reaching for the caller's
                # loop variable.
                params = header_str.split(" ")[1].split(",")
                param_dict = dict(kv.split("=") for kv in params)

                def strip_quotes(value):
                    if value.startswith("\""):
                        return value[1:-1]
                    else:
                        return value

                origin = strip_quotes(param_dict["origin"])
                key = strip_quotes(param_dict["key"])
                sig = strip_quotes(param_dict["sig"])
                return (origin, key, sig)
            except Exception:
                raise SynapseError(
                    400, "Malformed Authorization header", Codes.UNAUTHORIZED
                )

        auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")

        if not auth_headers:
            raise SynapseError(
                401, "Missing Authorization headers", Codes.UNAUTHORIZED,
            )

        for auth in auth_headers:
            if auth.startswith("X-Matrix"):
                (origin, key, sig) = parse_auth_header(auth)
                json_request["origin"] = origin
                json_request["signatures"].setdefault(origin, {})[key] = sig

        # Authorization headers were present, but none of them were X-Matrix.
        if not json_request["signatures"]:
            raise SynapseError(
                401, "Missing Authorization headers", Codes.UNAUTHORIZED,
            )

        yield self.keyring.verify_json_for_server(origin, json_request)

        defer.returnValue((origin, content))

    def _with_authentication(self, handler):
        """Wrap a servlet handler so that it authenticates the request first.

        Args:
            handler (callable): Called as
                `handler(origin, content, request.args, *args, **kwargs)`
                once the request has been authenticated and the
                per-origin rate limiter has let it through.

        Returns:
            callable: A function taking `(request, *args, **kwargs)` and
            returning a Deferred that results in the handler's response.
        """
        @defer.inlineCallbacks
        def new_handler(request, *args, **kwargs):
            try:
                (origin, content) = yield self._authenticate_request(request)
                with self.ratelimiter.ratelimit(origin) as d:
                    yield d
                    response = yield handler(
                        origin, content, request.args, *args, **kwargs
                    )
            except Exception:
                logger.exception("_authenticate_request failed")
                raise
            defer.returnValue(response)
        return new_handler

    @log_function
    def register_received_handler(self, handler):
        """ Register a handler that will be fired when we receive data.

        Args:
            handler (TransportReceivedHandler)
        """
        FederationSendServlet(
            handler, self._with_authentication, self.server_name
        ).register(self.server)

    @log_function
    def register_request_handler(self, handler):
        """ Register a handler that will be fired when we get asked for data.

        Args:
            handler (TransportRequestHandler)
        """
        for servletclass in (
            FederationPullServlet,
            FederationEventServlet,
            FederationStateServlet,
            FederationBackfillServlet,
            FederationQueryServlet,
            FederationMakeJoinServlet,
            # This slot previously repeated FederationEventServlet, which
            # left the /event_auth/ endpoint unregistered.
            FederationEventAuthServlet,
            FederationSendJoinServlet,
            FederationInviteServlet,
            FederationQueryAuthServlet,
            FederationGetMissingEventsServlet,
        ):
            servletclass(handler, self._with_authentication).register(
                self.server
            )


class BaseFederationServlet(object):
    """Base class for federation servlets.

    Subclasses define a `PATH` regex fragment (appended to the federation
    prefix) and any of `on_GET`, `on_PUT`, `on_POST`.
    """

    def __init__(self, handler, wrapper):
        self.handler = handler
        self.wrapper = wrapper

    def register(self, server):
        """Register each `on_<METHOD>` this servlet defines with `server`.

        Args:
            server: Object exposing `register_path(method, pattern, callback)`.
        """
        pattern = re.compile("^" + PREFIX + self.PATH)

        for method in ("GET", "PUT", "POST"):
            code = getattr(self, "on_%s" % (method), None)
            if code is None:
                continue

            server.register_path(method, pattern, self.wrapper(code))


class FederationSendServlet(BaseFederationServlet):
    """Servlet for PUT /send/<transaction_id>/"""

    PATH = "/send/([^/]*)/$"

    def __init__(self, handler, wrapper, server_name):
        super(FederationSendServlet, self).__init__(handler, wrapper)
        self.server_name = server_name

    # This is when someone is trying to send us a bunch of data.
    @defer.inlineCallbacks
    def on_PUT(self, origin, content, query, transaction_id):
        """ Called on PUT /send/<transaction_id>/

        Args:
            origin (str): The authenticated name of the sending server.
            content (dict): The decoded JSON body of the request.
            query (dict): The query parameters of the request.
            transaction_id (str): The transaction_id associated with this
                request. This is *not* None.

        Returns:
            Deferred: Results in a tuple of `(code, response)`, where
            `response` is a python dict to be converted into JSON that is
            used as the response body.
        """
        # Parse the request
        try:
            transaction_data = content

            logger.debug(
                "Decoded %s: %s",
                transaction_id, transaction_data
            )

            # We should ideally be getting this from the security layer.
            # origin = body["origin"]

            # Add some extra data to the transaction dict that isn't included
            # in the request body.
            transaction_data.update(
                transaction_id=transaction_id,
                destination=self.server_name
            )

        except Exception as e:
            logger.exception(e)
            # returnValue raises, so nothing after it would run.
            defer.returnValue((400, {"error": "Invalid transaction"}))

        try:
            code, response = yield self.handler.on_incoming_transaction(
                transaction_data
            )
        except Exception:
            logger.exception("on_incoming_transaction failed")
            raise

        defer.returnValue((code, response))


class FederationPullServlet(BaseFederationServlet):
    """Servlet for GET /pull/"""

    PATH = "/pull/$"

    # This is for when someone asks us for everything since version X
    def on_GET(self, origin, content, query):
        return self.handler.on_pull_request(query["origin"][0], query["v"])


class FederationEventServlet(BaseFederationServlet):
    """Servlet for GET /event/<event_id>/"""

    PATH = "/event/([^/]*)/$"

    # This is when someone asks for a data item for a given server data_id
    # pair.
    def on_GET(self, origin, content, query, event_id):
        return self.handler.on_pdu_request(origin, event_id)


class FederationStateServlet(BaseFederationServlet):
    """Servlet for GET /state/<context>/"""

    PATH = "/state/([^/]*)/$"

    # This is when someone asks for all data for a given context.
    def on_GET(self, origin, content, query, context):
        return self.handler.on_context_state_request(
            origin,
            context,
            query.get("event_id", [None])[0],
        )


class FederationBackfillServlet(BaseFederationServlet):
    """Servlet for GET /backfill/<context>/"""

    PATH = "/backfill/([^/]*)/$"

    def on_GET(self, origin, content, query, context):
        versions = query["v"]
        # .get() so that a missing "limit" reaches the 400 below instead of
        # raising KeyError.
        limits = query.get("limit")

        if not limits:
            return defer.succeed(
                (400, {"error": "Did not include limit param"})
            )

        limit = int(limits[-1])

        return self.handler.on_backfill_request(
            origin, context, versions, limit
        )


class FederationQueryServlet(BaseFederationServlet):
    """Servlet for GET /query/<query_type>"""

    PATH = "/query/([^/]*)$"

    # This is when we receive a server-server Query
    def on_GET(self, origin, content, query, query_type):
        return self.handler.on_query_request(
            query_type,
            {k: v[0].decode("utf-8") for k, v in query.items()}
        )


class FederationMakeJoinServlet(BaseFederationServlet):
    """Servlet for GET /make_join/<context>/<user_id>"""

    PATH = "/make_join/([^/]*)/([^/]*)$"

    @defer.inlineCallbacks
    def on_GET(self, origin, content, query, context, user_id):
        content = yield self.handler.on_make_join_request(context, user_id)
        defer.returnValue((200, content))


class FederationEventAuthServlet(BaseFederationServlet):
    """Servlet for GET /event_auth/<context>/<event_id>"""

    PATH = "/event_auth/([^/]*)/([^/]*)$"

    def on_GET(self, origin, content, query, context, event_id):
        return self.handler.on_event_auth(origin, context, event_id)


class FederationSendJoinServlet(BaseFederationServlet):
    """Servlet for PUT /send_join/<context>/<event_id>"""

    PATH = "/send_join/([^/]*)/([^/]*)$"

    @defer.inlineCallbacks
    def on_PUT(self, origin, content, query, context, event_id):
        # TODO(paul): assert that context/event_id parsed from path actually
        #   match those given in content
        content = yield self.handler.on_send_join_request(origin, content)
        defer.returnValue((200, content))


class FederationInviteServlet(BaseFederationServlet):
    """Servlet for PUT /invite/<context>/<event_id>"""

    PATH = "/invite/([^/]*)/([^/]*)$"

    @defer.inlineCallbacks
    def on_PUT(self, origin, content, query, context, event_id):
        # TODO(paul): assert that context/event_id parsed from path actually
        #   match those given in content
        content = yield self.handler.on_invite_request(origin, content)
        defer.returnValue((200, content))


class FederationQueryAuthServlet(BaseFederationServlet):
    """Servlet for POST /query_auth/<context>/<event_id>"""

    PATH = "/query_auth/([^/]*)/([^/]*)$"

    @defer.inlineCallbacks
    def on_POST(self, origin, content, query, context, event_id):
        new_content = yield self.handler.on_query_auth_request(
            origin, content, event_id
        )

        defer.returnValue((200, new_content))


class FederationGetMissingEventsServlet(BaseFederationServlet):
    """Servlet for POST /get_missing_events/<room_id>"""

    # TODO(paul): Why does this path alone end with "/?" optional?
    PATH = "/get_missing_events/([^/]*)/?$"

    @defer.inlineCallbacks
    def on_POST(self, origin, content, query, room_id):
        limit = int(content.get("limit", 10))
        min_depth = int(content.get("min_depth", 0))
        earliest_events = content.get("earliest_events", [])
        latest_events = content.get("latest_events", [])

        content = yield self.handler.on_get_missing_events(
            origin,
            room_id=room_id,
            earliest_events=earliest_events,
            latest_events=latest_events,
            min_depth=min_depth,
            limit=limit,
        )

        defer.returnValue((200, content))