diff options
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 241 |
1 files changed, 171 insertions, 70 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 747a791f83..bf1aa29502 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,48 +13,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.retryutils -from twisted.internet import defer, reactor, protocol -from twisted.internet.error import DNSLookupError -from twisted.web.client import readBody, HTTPConnectionPool, Agent -from twisted.web.http_headers import Headers -from twisted.web._newclient import ResponseDone - -from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep -from synapse.util import logcontext -import synapse.metrics - -from canonicaljson import encode_canonical_json - -from synapse.api.errors import ( - SynapseError, Codes, HttpResponseException, -) - -from signedjson.sign import sign_json - import cgi -import simplejson as json import logging import random import sys import urllib -import urlparse +from six import string_types +from six.moves.urllib import parse as urlparse -logger = logging.getLogger(__name__) -outbound_logger = logging.getLogger("synapse.http.outbound") +from canonicaljson import encode_canonical_json, json +from prometheus_client import Counter +from signedjson.sign import sign_json -metrics = synapse.metrics.get_metrics_for(__name__) +from twisted.internet import defer, protocol, reactor +from twisted.internet.error import DNSLookupError +from twisted.web._newclient import ResponseDone +from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.http_headers import Headers -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], +import synapse.metrics +import synapse.util.retryutils +from synapse.api.errors import ( + Codes, + FederationDeniedError, + HttpResponseException, + SynapseError, ) +from synapse.http import cancelled_to_request_timed_out_error +from synapse.http.endpoint import matrix_federation_endpoint +from synapse.util import logcontext +from synapse.util.async import add_timeout_to_deferred +from synapse.util.logcontext import make_deferred_yieldable + +logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") + +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", + "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", + "", ["method", "code"]) MAX_LONG_RETRIES = 10 @@ -123,11 +122,22 @@ class MatrixFederationHttpClient(object): Fails with ``HTTPRequestException``: if we get an HTTP response code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist + (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if ( + self.hs.config.federation_domain_whitelist and + destination not in self.hs.config.federation_domain_whitelist + ): + raise FederationDeniedError(destination) + limiter = yield synapse.util.retryutils.get_retry_limiter( destination, self.clock, @@ -173,21 +183,21 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, http_url_bytes, headers_dict) try: - def send_request(): - request_deferred = self.agent.request( - method, - url_bytes, - Headers(headers_dict), - producer - ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) - - with logcontext.PreserveLoggingContext(): - response = yield send_request() + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + add_timeout_to_deferred( + request_deferred, + timeout / 1000. if timeout else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break @@ -204,18 +214,15 @@ class MatrixFederationHttpClient(object): raise logger.warn( - "{%s} Sending request failed to %s: %s %s: %s - %s", + "{%s} Sending request failed to %s: %s %s: %s", txn_id, destination, method, url_bytes, - type(e).__name__, _flatten_response_never_received(e), ) - log_result = "%s - %s" % ( - type(e).__name__, _flatten_response_never_received(e), - ) + log_result = _flatten_response_never_received(e) if retries_left and not timeout: if long_retries: @@ -227,7 +234,7 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - yield sleep(delay) + yield self.clock.sleep(delay) retries_left -= 1 else: raise @@ -253,14 +260,35 @@ class MatrixFederationHttpClient(object): defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, - content=None): + content=None, destination_is=None): + """ + Signs a request by adding an Authorization header to headers_dict + Args: + destination (bytes|None): The desination home server of the request. + May be None if the destination is an identity server, in which case + destination_is must be non-None. + method (bytes): The HTTP method of the request + url_bytes (bytes): The URI path of the request + headers_dict (dict): Dictionary of request headers to append to + content (bytes): The body of the request + destination_is (bytes): As 'destination', but if the destination is an + identity server + + Returns: + None + """ request = { "method": method, "uri": url_bytes, "origin": self.server_name, - "destination": destination, } + if destination is not None: + request["destination"] = destination + + if destination_is is not None: + request["destination_is"] = destination_is + if content is not None: request["content"] = content @@ -278,7 +306,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None, + def put_json(self, destination, path, args={}, data={}, + json_data_callback=None, long_retries=False, timeout=None, ignore_backoff=False, backoff_on_404=False): @@ -288,6 +317,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. + args (dict): query params data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to @@ -311,6 +341,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ if not json_data_callback: @@ -331,6 +364,7 @@ class MatrixFederationHttpClient(object): path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + query_bytes=encode_query_args(args), long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -347,7 +381,7 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def post_json(self, destination, path, data={}, long_retries=False, - timeout=None, ignore_backoff=False): + timeout=None, ignore_backoff=False, args={}): """ Sends the specifed json data using POST Args: @@ -362,6 +396,7 @@ class MatrixFederationHttpClient(object): giving up. None indicates no timeout. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + args (dict): query params Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. @@ -371,6 +406,9 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ def body_callback(method, url_bytes, headers_dict): @@ -383,6 +421,7 @@ class MatrixFederationHttpClient(object): destination, "POST", path, + query_bytes=encode_query_args(args), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, @@ -424,16 +463,12 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug("get_json args: %s", args) - encoded_args = {} - for k, vs in args.items(): - if isinstance(vs, basestring): - vs = [vs] - encoded_args[k] = [v.encode("UTF-8") for v in vs] - - query_bytes = urllib.urlencode(encoded_args, True) logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) def body_callback(method, url_bytes, headers_dict): @@ -444,7 +479,7 @@ class MatrixFederationHttpClient(object): destination, "GET", path, - query_bytes=query_bytes, + query_bytes=encode_query_args(args), body_callback=body_callback, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, @@ -461,6 +496,55 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks + def delete_json(self, destination, path, long_retries=False, + timeout=None, ignore_backoff=False, args={}): + """Send a DELETE request to the remote expecting some json response + + Args: + destination (str): The remote server to send the HTTP request + to. + path (str): The HTTP path. + long_retries (bool): A boolean that indicates whether we should + retry for a short or long time. + timeout(int): How long to try (in ms) the destination for before + giving up. None indicates no timeout. + ignore_backoff (bool): true to ignore the historical backoff data and + try the request anyway. + Returns: + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist + """ + + response = yield self._request( + destination, + "DELETE", + path, + query_bytes=encode_query_args(args), + headers_dict={"Content-Type": ["application/json"]}, + long_retries=long_retries, + timeout=timeout, + ignore_backoff=ignore_backoff, + ) + + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + check_content_type_is_json(response.headers) + + with logcontext.PreserveLoggingContext(): + body = yield readBody(response) + + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks def get_file(self, destination, path, output_stream, args={}, retry_on_dns_fail=True, max_size=None, ignore_backoff=False): @@ -481,11 +565,14 @@ class MatrixFederationHttpClient(object): Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ encoded_args = {} for k, vs in args.items(): - if isinstance(vs, basestring): + if isinstance(vs, string_types): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] @@ -513,7 +600,7 @@ class MatrixFederationHttpClient(object): length = yield _readBodyToFile( response, output_stream, max_size ) - except: + except Exception: logger.exception("Failed to download body") raise @@ -578,12 +665,14 @@ class _JsonProducer(object): def _flatten_response_never_received(e): if hasattr(e, "reasons"): - return ", ".join( + reasons = ", ".join( _flatten_response_never_received(f.value) for f in e.reasons ) + + return "%s:[%s]" % (type(e).__name__, reasons) else: - return "%s: %s" % (type(e).__name__, e.message,) + return repr(e) def check_content_type_is_json(headers): @@ -598,7 +687,7 @@ def check_content_type_is_json(headers): RuntimeError if the """ - c_type = headers.getRawHeaders("Content-Type") + c_type = headers.getRawHeaders(b"Content-Type") if c_type is None: raise RuntimeError( "No Content-Type header" @@ -610,3 +699,15 @@ def check_content_type_is_json(headers): raise RuntimeError( "Content-Type not application/json: was '%s'" % c_type ) + + +def encode_query_args(args): + encoded_args = {} + for k, vs in args.items(): + if isinstance(vs, string_types): + vs = [vs] + encoded_args[k] = [v.encode("UTF-8") for v in vs] + + query_bytes = urllib.urlencode(encoded_args, True) + + return query_bytes |