summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py241
1 files changed, 171 insertions, 70 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 747a791f83..bf1aa29502 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,48 +13,46 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import synapse.util.retryutils
-from twisted.internet import defer, reactor, protocol
-from twisted.internet.error import DNSLookupError
-from twisted.web.client import readBody, HTTPConnectionPool, Agent
-from twisted.web.http_headers import Headers
-from twisted.web._newclient import ResponseDone
-
-from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async import sleep
-from synapse.util import logcontext
-import synapse.metrics
-
-from canonicaljson import encode_canonical_json
-
-from synapse.api.errors import (
-    SynapseError, Codes, HttpResponseException,
-)
-
-from signedjson.sign import sign_json
-
 import cgi
-import simplejson as json
 import logging
 import random
 import sys
 import urllib
-import urlparse
 
+from six import string_types
+from six.moves.urllib import parse as urlparse
 
-logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
+from canonicaljson import encode_canonical_json, json
+from prometheus_client import Counter
+from signedjson.sign import sign_json
 
-metrics = synapse.metrics.get_metrics_for(__name__)
+from twisted.internet import defer, protocol, reactor
+from twisted.internet.error import DNSLookupError
+from twisted.web._newclient import ResponseDone
+from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.http_headers import Headers
 
-outgoing_requests_counter = metrics.register_counter(
-    "requests",
-    labels=["method"],
-)
-incoming_responses_counter = metrics.register_counter(
-    "responses",
-    labels=["method", "code"],
+import synapse.metrics
+import synapse.util.retryutils
+from synapse.api.errors import (
+    Codes,
+    FederationDeniedError,
+    HttpResponseException,
+    SynapseError,
 )
+from synapse.http import cancelled_to_request_timed_out_error
+from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.util import logcontext
+from synapse.util.async import add_timeout_to_deferred
+from synapse.util.logcontext import make_deferred_yieldable
+
+logger = logging.getLogger(__name__)
+outbound_logger = logging.getLogger("synapse.http.outbound")
+
+outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
+                                    "", ["method"])
+incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
+                                     "", ["method", "code"])
 
 
 MAX_LONG_RETRIES = 10
@@ -123,11 +122,22 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
                 code >= 300.
+
             Fails with ``NotRetryingDestination`` if we are not yet ready
                 to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+                is not on our federation whitelist
+
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if (
+            self.hs.config.federation_domain_whitelist and
+            destination not in self.hs.config.federation_domain_whitelist
+        ):
+            raise FederationDeniedError(destination)
+
         limiter = yield synapse.util.retryutils.get_retry_limiter(
             destination,
             self.clock,
@@ -173,21 +183,21 @@ class MatrixFederationHttpClient(object):
                         producer = body_callback(method, http_url_bytes, headers_dict)
 
                     try:
-                        def send_request():
-                            request_deferred = self.agent.request(
-                                method,
-                                url_bytes,
-                                Headers(headers_dict),
-                                producer
-                            )
-
-                            return self.clock.time_bound_deferred(
-                                request_deferred,
-                                time_out=timeout / 1000. if timeout else 60,
-                            )
-
-                        with logcontext.PreserveLoggingContext():
-                            response = yield send_request()
+                        request_deferred = self.agent.request(
+                            method,
+                            url_bytes,
+                            Headers(headers_dict),
+                            producer
+                        )
+                        add_timeout_to_deferred(
+                            request_deferred,
+                            timeout / 1000. if timeout else 60,
+                            self.hs.get_reactor(),
+                            cancelled_to_request_timed_out_error,
+                        )
+                        response = yield make_deferred_yieldable(
+                            request_deferred,
+                        )
 
                         log_result = "%d %s" % (response.code, response.phrase,)
                         break
@@ -204,18 +214,15 @@ class MatrixFederationHttpClient(object):
                             raise
 
                         logger.warn(
-                            "{%s} Sending request failed to %s: %s %s: %s - %s",
+                            "{%s} Sending request failed to %s: %s %s: %s",
                             txn_id,
                             destination,
                             method,
                             url_bytes,
-                            type(e).__name__,
                             _flatten_response_never_received(e),
                         )
 
-                        log_result = "%s - %s" % (
-                            type(e).__name__, _flatten_response_never_received(e),
-                        )
+                        log_result = _flatten_response_never_received(e)
 
                         if retries_left and not timeout:
                             if long_retries:
@@ -227,7 +234,7 @@ class MatrixFederationHttpClient(object):
                                 delay = min(delay, 2)
                                 delay *= random.uniform(0.8, 1.4)
 
-                            yield sleep(delay)
+                            yield self.clock.sleep(delay)
                             retries_left -= 1
                         else:
                             raise
@@ -253,14 +260,35 @@ class MatrixFederationHttpClient(object):
             defer.returnValue(response)
 
     def sign_request(self, destination, method, url_bytes, headers_dict,
-                     content=None):
+                     content=None, destination_is=None):
+        """
+        Signs a request by adding an Authorization header to headers_dict
+        Args:
+            destination (bytes|None): The desination home server of the request.
+                May be None if the destination is an identity server, in which case
+                destination_is must be non-None.
+            method (bytes): The HTTP method of the request
+            url_bytes (bytes): The URI path of the request
+            headers_dict (dict): Dictionary of request headers to append to
+            content (bytes): The body of the request
+            destination_is (bytes): As 'destination', but if the destination is an
+                identity server
+
+        Returns:
+            None
+        """
         request = {
             "method": method,
             "uri": url_bytes,
             "origin": self.server_name,
-            "destination": destination,
         }
 
+        if destination is not None:
+            request["destination"] = destination
+
+        if destination_is is not None:
+            request["destination_is"] = destination_is
+
         if content is not None:
             request["content"] = content
 
@@ -278,7 +306,8 @@ class MatrixFederationHttpClient(object):
         headers_dict[b"Authorization"] = auth_headers
 
     @defer.inlineCallbacks
-    def put_json(self, destination, path, data={}, json_data_callback=None,
+    def put_json(self, destination, path, args={}, data={},
+                 json_data_callback=None,
                  long_retries=False, timeout=None,
                  ignore_backoff=False,
                  backoff_on_404=False):
@@ -288,6 +317,7 @@ class MatrixFederationHttpClient(object):
             destination (str): The remote server to send the HTTP request
                 to.
             path (str): The HTTP path.
+            args (dict): query params
             data (dict): A dict containing the data that will be used as
                 the request body. This will be encoded as JSON.
             json_data_callback (callable): A callable returning the dict to
@@ -311,6 +341,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         if not json_data_callback:
@@ -331,6 +364,7 @@ class MatrixFederationHttpClient(object):
             path,
             body_callback=body_callback,
             headers_dict={"Content-Type": ["application/json"]},
+            query_bytes=encode_query_args(args),
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
@@ -347,7 +381,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
-                  timeout=None, ignore_backoff=False):
+                  timeout=None, ignore_backoff=False, args={}):
         """ Sends the specifed json data using POST
 
         Args:
@@ -362,6 +396,7 @@ class MatrixFederationHttpClient(object):
                 giving up. None indicates no timeout.
             ignore_backoff (bool): true to ignore the historical backoff data and
                 try the request anyway.
+            args (dict): query params
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body.
@@ -371,6 +406,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -383,6 +421,7 @@ class MatrixFederationHttpClient(object):
             destination,
             "POST",
             path,
+            query_bytes=encode_query_args(args),
             body_callback=body_callback,
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
@@ -424,16 +463,12 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
         logger.debug("get_json args: %s", args)
 
-        encoded_args = {}
-        for k, vs in args.items():
-            if isinstance(vs, basestring):
-                vs = [vs]
-            encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
-        query_bytes = urllib.urlencode(encoded_args, True)
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
         def body_callback(method, url_bytes, headers_dict):
@@ -444,7 +479,7 @@ class MatrixFederationHttpClient(object):
             destination,
             "GET",
             path,
-            query_bytes=query_bytes,
+            query_bytes=encode_query_args(args),
             body_callback=body_callback,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
@@ -461,6 +496,55 @@ class MatrixFederationHttpClient(object):
         defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
+    def delete_json(self, destination, path, long_retries=False,
+                    timeout=None, ignore_backoff=False, args={}):
+        """Send a DELETE request to the remote expecting some json response
+
+        Args:
+            destination (str): The remote server to send the HTTP request
+                to.
+            path (str): The HTTP path.
+            long_retries (bool): A boolean that indicates whether we should
+                retry for a short or long time.
+            timeout(int): How long to try (in ms) the destination for before
+                giving up. None indicates no timeout.
+            ignore_backoff (bool): true to ignore the historical backoff data and
+                try the request anyway.
+        Returns:
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
+        """
+
+        response = yield self._request(
+            destination,
+            "DELETE",
+            path,
+            query_bytes=encode_query_args(args),
+            headers_dict={"Content-Type": ["application/json"]},
+            long_retries=long_retries,
+            timeout=timeout,
+            ignore_backoff=ignore_backoff,
+        )
+
+        if 200 <= response.code < 300:
+            # We need to update the transactions table to say it was sent?
+            check_content_type_is_json(response.headers)
+
+        with logcontext.PreserveLoggingContext():
+            body = yield readBody(response)
+
+        defer.returnValue(json.loads(body))
+
+    @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
                  retry_on_dns_fail=True, max_size=None,
                  ignore_backoff=False):
@@ -481,11 +565,14 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
             to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
 
         encoded_args = {}
         for k, vs in args.items():
-            if isinstance(vs, basestring):
+            if isinstance(vs, string_types):
                 vs = [vs]
             encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
@@ -513,7 +600,7 @@ class MatrixFederationHttpClient(object):
                 length = yield _readBodyToFile(
                     response, output_stream, max_size
                 )
-        except:
+        except Exception:
             logger.exception("Failed to download body")
             raise
 
@@ -578,12 +665,14 @@ class _JsonProducer(object):
 
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
-        return ", ".join(
+        reasons = ", ".join(
             _flatten_response_never_received(f.value)
             for f in e.reasons
         )
+
+        return "%s:[%s]" % (type(e).__name__, reasons)
     else:
-        return "%s: %s" % (type(e).__name__, e.message,)
+        return repr(e)
 
 
 def check_content_type_is_json(headers):
@@ -598,7 +687,7 @@ def check_content_type_is_json(headers):
         RuntimeError if the
 
     """
-    c_type = headers.getRawHeaders("Content-Type")
+    c_type = headers.getRawHeaders(b"Content-Type")
     if c_type is None:
         raise RuntimeError(
             "No Content-Type header"
@@ -610,3 +699,15 @@ def check_content_type_is_json(headers):
         raise RuntimeError(
             "Content-Type not application/json: was '%s'" % c_type
         )
+
+
+def encode_query_args(args):
+    encoded_args = {}
+    for k, vs in args.items():
+        if isinstance(vs, string_types):
+            vs = [vs]
+        encoded_args[k] = [v.encode("UTF-8") for v in vs]
+
+    query_bytes = urllib.urlencode(encoded_args, True)
+
+    return query_bytes