summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2019-11-05 17:08:07 +0000
committerGitHub <noreply@github.com>2019-11-05 17:08:07 +0000
commit55a7da247a912ca2a4d539a63200fca0d6541f46 (patch)
treec0445991317e3dc26def157bfeb70d76d1b830c5 /synapse/http
parentApply suggestions from code review (diff)
parentImprove documentation for EventContext fields (#6319) (diff)
downloadsynapse-55a7da247a912ca2a4d539a63200fca0d6541f46.tar.xz
Merge branch 'develop' into rav/url_preview_limit_title
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/client.py21
-rw-r--r--synapse/http/connectproxyclient.py195
-rw-r--r--synapse/http/federation/srv_resolver.py2
-rw-r--r--synapse/http/matrixfederationclient.py10
-rw-r--r--synapse/http/proxyagent.py195
-rw-r--r--synapse/http/request_metrics.py2
-rw-r--r--synapse/http/server.py2
-rw-r--r--synapse/http/servlet.py4
-rw-r--r--synapse/http/site.py4
9 files changed, 419 insertions, 16 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index cdf828a4ff..d4c285445e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -45,6 +45,7 @@ from synapse.http import (
     cancelled_to_request_timed_out_error,
     redact_uri,
 )
+from synapse.http.proxyagent import ProxyAgent
 from synapse.logging.context import make_deferred_yieldable
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.util.async_helpers import timeout_deferred
@@ -183,7 +184,15 @@ class SimpleHttpClient(object):
     using HTTP in Matrix
     """
 
-    def __init__(self, hs, treq_args={}, ip_whitelist=None, ip_blacklist=None):
+    def __init__(
+        self,
+        hs,
+        treq_args={},
+        ip_whitelist=None,
+        ip_blacklist=None,
+        http_proxy=None,
+        https_proxy=None,
+    ):
         """
         Args:
             hs (synapse.server.HomeServer)
@@ -192,6 +201,8 @@ class SimpleHttpClient(object):
                 we may not request.
             ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
                request if it were otherwise caught in a blacklist.
+            http_proxy (bytes): proxy server to use for http connections. host[:port]
+            https_proxy (bytes): proxy server to use for https connections. host[:port]
         """
         self.hs = hs
 
@@ -236,11 +247,13 @@ class SimpleHttpClient(object):
         # The default context factory in Twisted 14.0.0 (which we require) is
         # BrowserLikePolicyForHTTPS which will do regular cert validation
         # 'like a browser'
-        self.agent = Agent(
+        self.agent = ProxyAgent(
             self.reactor,
             connectTimeout=15,
             contextFactory=self.hs.get_http_client_context_factory(),
             pool=pool,
+            http_proxy=http_proxy,
+            https_proxy=https_proxy,
         )
 
         if self._ip_blacklist:
@@ -535,7 +548,7 @@ class SimpleHttpClient(object):
             b"Content-Length" in resp_headers
             and int(resp_headers[b"Content-Length"][0]) > max_size
         ):
-            logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
+            logger.warning("Requested URL is too large > %r bytes" % (self.max_size,))
             raise SynapseError(
                 502,
                 "Requested file is too large > %r bytes" % (self.max_size,),
@@ -543,7 +556,7 @@ class SimpleHttpClient(object):
             )
 
         if response.code > 299:
-            logger.warn("Got %d when downloading %s" % (response.code, url))
+            logger.warning("Got %d when downloading %s" % (response.code, url))
             raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
 
         # TODO: if our Content-Type is HTML or something, just read the first
diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py
new file mode 100644
index 0000000000..be7b2ceb8e
--- /dev/null
+++ b/synapse/http/connectproxyclient.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from zope.interface import implementer
+
+from twisted.internet import defer, protocol
+from twisted.internet.error import ConnectError
+from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.internet.protocol import connectionDone
+from twisted.web import http
+
+logger = logging.getLogger(__name__)
+
+
+class ProxyConnectError(ConnectError):
+    pass
+
+
+@implementer(IStreamClientEndpoint)
+class HTTPConnectProxyEndpoint(object):
+    """An Endpoint implementation which will send a CONNECT request to an http proxy
+
+    Wraps an existing HostnameEndpoint for the proxy.
+
+    When we get the connect() request from the connection pool (via the TLS wrapper),
+    we'll first connect to the proxy endpoint with a ProtocolFactory which will make the
+    CONNECT request. Once that completes, we invoke the protocolFactory which was passed
+    in.
+
+    Args:
+        reactor: the Twisted reactor to use for the connection
+        proxy_endpoint (IStreamClientEndpoint): the endpoint to use to connect to the
+            proxy
+        host (bytes): hostname that we want to CONNECT to
+        port (int): port that we want to connect to
+    """
+
+    def __init__(self, reactor, proxy_endpoint, host, port):
+        self._reactor = reactor
+        self._proxy_endpoint = proxy_endpoint
+        self._host = host
+        self._port = port
+
+    def __repr__(self):
+        return "<HTTPConnectProxyEndpoint %s>" % (self._proxy_endpoint,)
+
+    def connect(self, protocolFactory):
+        f = HTTPProxiedClientFactory(self._host, self._port, protocolFactory)
+        d = self._proxy_endpoint.connect(f)
+        # once the tcp socket connects successfully, we need to wait for the
+        # CONNECT to complete.
+        d.addCallback(lambda conn: f.on_connection)
+        return d
+
+
+class HTTPProxiedClientFactory(protocol.ClientFactory):
+    """ClientFactory wrapper that triggers an HTTP proxy CONNECT on connect.
+
+    Once the CONNECT completes, invokes the original ClientFactory to build the
+    HTTP Protocol object and run the rest of the connection.
+
+    Args:
+        dst_host (bytes): hostname that we want to CONNECT to
+        dst_port (int): port that we want to connect to
+        wrapped_factory (protocol.ClientFactory): The original Factory
+    """
+
+    def __init__(self, dst_host, dst_port, wrapped_factory):
+        self.dst_host = dst_host
+        self.dst_port = dst_port
+        self.wrapped_factory = wrapped_factory
+        self.on_connection = defer.Deferred()
+
+    def startedConnecting(self, connector):
+        return self.wrapped_factory.startedConnecting(connector)
+
+    def buildProtocol(self, addr):
+        wrapped_protocol = self.wrapped_factory.buildProtocol(addr)
+
+        return HTTPConnectProtocol(
+            self.dst_host, self.dst_port, wrapped_protocol, self.on_connection
+        )
+
+    def clientConnectionFailed(self, connector, reason):
+        logger.debug("Connection to proxy failed: %s", reason)
+        if not self.on_connection.called:
+            self.on_connection.errback(reason)
+        return self.wrapped_factory.clientConnectionFailed(connector, reason)
+
+    def clientConnectionLost(self, connector, reason):
+        logger.debug("Connection to proxy lost: %s", reason)
+        if not self.on_connection.called:
+            self.on_connection.errback(reason)
+        return self.wrapped_factory.clientConnectionLost(connector, reason)
+
+
+class HTTPConnectProtocol(protocol.Protocol):
+    """Protocol that wraps an existing Protocol to do a CONNECT handshake at connect
+
+    Args:
+        host (bytes): The original HTTP(s) hostname or IPv4 or IPv6 address literal
+            to put in the CONNECT request
+
+        port (int): The original HTTP(s) port to put in the CONNECT request
+
+        wrapped_protocol (interfaces.IProtocol): the original protocol (probably
+            HTTPChannel or TLSMemoryBIOProtocol, but could be anything really)
+
+        connected_deferred (Deferred): a Deferred which will be callbacked with
+            wrapped_protocol when the CONNECT completes
+    """
+
+    def __init__(self, host, port, wrapped_protocol, connected_deferred):
+        self.host = host
+        self.port = port
+        self.wrapped_protocol = wrapped_protocol
+        self.connected_deferred = connected_deferred
+        self.http_setup_client = HTTPConnectSetupClient(self.host, self.port)
+        self.http_setup_client.on_connected.addCallback(self.proxyConnected)
+
+    def connectionMade(self):
+        self.http_setup_client.makeConnection(self.transport)
+
+    def connectionLost(self, reason=connectionDone):
+        if self.wrapped_protocol.connected:
+            self.wrapped_protocol.connectionLost(reason)
+
+        self.http_setup_client.connectionLost(reason)
+
+        if not self.connected_deferred.called:
+            self.connected_deferred.errback(reason)
+
+    def proxyConnected(self, _):
+        self.wrapped_protocol.makeConnection(self.transport)
+
+        self.connected_deferred.callback(self.wrapped_protocol)
+
+        # Get any pending data from the http buf and forward it to the original protocol
+        buf = self.http_setup_client.clearLineBuffer()
+        if buf:
+            self.wrapped_protocol.dataReceived(buf)
+
+    def dataReceived(self, data):
+        # if we've set up the HTTP protocol, we can send the data there
+        if self.wrapped_protocol.connected:
+            return self.wrapped_protocol.dataReceived(data)
+
+        # otherwise, we must still be setting up the connection: send the data to the
+        # setup client
+        return self.http_setup_client.dataReceived(data)
+
+
+class HTTPConnectSetupClient(http.HTTPClient):
+    """HTTPClient protocol to send a CONNECT message for proxies and read the response.
+
+    Args:
+        host (bytes): The hostname to send in the CONNECT message
+        port (int): The port to send in the CONNECT message
+    """
+
+    def __init__(self, host, port):
+        self.host = host
+        self.port = port
+        self.on_connected = defer.Deferred()
+
+    def connectionMade(self):
+        logger.debug("Connected to proxy, sending CONNECT")
+        self.sendCommand(b"CONNECT", b"%s:%d" % (self.host, self.port))
+        self.endHeaders()
+
+    def handleStatus(self, version, status, message):
+        logger.debug("Got Status: %s %s %s", status, message, version)
+        if status != b"200":
+            raise ProxyConnectError("Unexpected status on CONNECT: %s" % status)
+
+    def handleEndHeaders(self):
+        logger.debug("End Headers")
+        self.on_connected.callback(None)
+
+    def handleResponse(self, body):
+        pass
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
index 3fe4ffb9e5..021b233a7d 100644
--- a/synapse/http/federation/srv_resolver.py
+++ b/synapse/http/federation/srv_resolver.py
@@ -148,7 +148,7 @@ class SrvResolver(object):
             # Try something in the cache, else rereaise
             cache_entry = self._cache.get(service_name, None)
             if cache_entry:
-                logger.warn(
+                logger.warning(
                     "Failed to resolve %r, falling back to cache. %r", service_name, e
                 )
                 return list(cache_entry)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 3f7c93ffcb..691380abda 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -149,7 +149,7 @@ def _handle_json_response(reactor, timeout_sec, request, response):
 
         body = yield make_deferred_yieldable(d)
     except Exception as e:
-        logger.warn(
+        logger.warning(
             "{%s} [%s] Error reading response: %s",
             request.txn_id,
             request.destination,
@@ -457,7 +457,7 @@ class MatrixFederationHttpClient(object):
                         except Exception as e:
                             # Eh, we're already going to raise an exception so lets
                             # ignore if this fails.
-                            logger.warn(
+                            logger.warning(
                                 "{%s} [%s] Failed to get error response: %s %s: %s",
                                 request.txn_id,
                                 request.destination,
@@ -478,7 +478,7 @@ class MatrixFederationHttpClient(object):
 
                     break
                 except RequestSendFailed as e:
-                    logger.warn(
+                    logger.warning(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
                         request.destination,
@@ -513,7 +513,7 @@ class MatrixFederationHttpClient(object):
                         raise
 
                 except Exception as e:
-                    logger.warn(
+                    logger.warning(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
                         request.destination,
@@ -889,7 +889,7 @@ class MatrixFederationHttpClient(object):
             d.addTimeout(self.default_timeout, self.reactor)
             length = yield make_deferred_yieldable(d)
         except Exception as e:
-            logger.warn(
+            logger.warning(
                 "{%s} [%s] Error reading response: %s",
                 request.txn_id,
                 request.destination,
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
new file mode 100644
index 0000000000..332da02a8d
--- /dev/null
+++ b/synapse/http/proxyagent.py
@@ -0,0 +1,195 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import re
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.python.failure import Failure
+from twisted.web.client import URI, BrowserLikePolicyForHTTPS, _AgentBase
+from twisted.web.error import SchemeNotSupported
+from twisted.web.iweb import IAgent
+
+from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
+
+logger = logging.getLogger(__name__)
+
+_VALID_URI = re.compile(br"\A[\x21-\x7e]+\Z")
+
+
+@implementer(IAgent)
+class ProxyAgent(_AgentBase):
+    """An Agent implementation which will use an HTTP proxy if one was requested
+
+    Args:
+        reactor: twisted reactor to place outgoing
+            connections.
+
+        contextFactory (IPolicyForHTTPS): A factory for TLS contexts, to control the
+            verification parameters of OpenSSL.  The default is to use a
+            `BrowserLikePolicyForHTTPS`, so unless you have special
+            requirements you can leave this as-is.
+
+        connectTimeout (float): The amount of time that this Agent will wait
+            for the peer to accept a connection.
+
+        bindAddress (bytes): The local address for client sockets to bind to.
+
+        pool (HTTPConnectionPool|None): connection pool to be used. If None, a
+            non-persistent pool instance will be created.
+    """
+
+    def __init__(
+        self,
+        reactor,
+        contextFactory=BrowserLikePolicyForHTTPS(),
+        connectTimeout=None,
+        bindAddress=None,
+        pool=None,
+        http_proxy=None,
+        https_proxy=None,
+    ):
+        _AgentBase.__init__(self, reactor, pool)
+
+        self._endpoint_kwargs = {}
+        if connectTimeout is not None:
+            self._endpoint_kwargs["timeout"] = connectTimeout
+        if bindAddress is not None:
+            self._endpoint_kwargs["bindAddress"] = bindAddress
+
+        self.http_proxy_endpoint = _http_proxy_endpoint(
+            http_proxy, reactor, **self._endpoint_kwargs
+        )
+
+        self.https_proxy_endpoint = _http_proxy_endpoint(
+            https_proxy, reactor, **self._endpoint_kwargs
+        )
+
+        self._policy_for_https = contextFactory
+        self._reactor = reactor
+
+    def request(self, method, uri, headers=None, bodyProducer=None):
+        """
+        Issue a request to the server indicated by the given uri.
+
+        Supports `http` and `https` schemes.
+
+        An existing connection from the connection pool may be used or a new one may be
+        created.
+
+        See also: twisted.web.iweb.IAgent.request
+
+        Args:
+            method (bytes): The request method to use, such as `GET`, `POST`, etc
+
+            uri (bytes): The location of the resource to request.
+
+            headers (Headers|None): Extra headers to send with the request
+
+            bodyProducer (IBodyProducer|None): An object which can generate bytes to
+                make up the body of this request (for example, the properly encoded
+                contents of a file for a file upload). Or, None if the request is to
+                have no body.
+
+        Returns:
+            Deferred[IResponse]: completes when the header of the response has
+                 been received (regardless of the response status code).
+        """
+        uri = uri.strip()
+        if not _VALID_URI.match(uri):
+            raise ValueError("Invalid URI {!r}".format(uri))
+
+        parsed_uri = URI.fromBytes(uri)
+        pool_key = (parsed_uri.scheme, parsed_uri.host, parsed_uri.port)
+        request_path = parsed_uri.originForm
+
+        if parsed_uri.scheme == b"http" and self.http_proxy_endpoint:
+            # Cache *all* connections under the same key, since we are only
+            # connecting to a single destination, the proxy:
+            pool_key = ("http-proxy", self.http_proxy_endpoint)
+            endpoint = self.http_proxy_endpoint
+            request_path = uri
+        elif parsed_uri.scheme == b"https" and self.https_proxy_endpoint:
+            endpoint = HTTPConnectProxyEndpoint(
+                self._reactor,
+                self.https_proxy_endpoint,
+                parsed_uri.host,
+                parsed_uri.port,
+            )
+        else:
+            # not using a proxy
+            endpoint = HostnameEndpoint(
+                self._reactor, parsed_uri.host, parsed_uri.port, **self._endpoint_kwargs
+            )
+
+        logger.debug("Requesting %s via %s", uri, endpoint)
+
+        if parsed_uri.scheme == b"https":
+            tls_connection_creator = self._policy_for_https.creatorForNetloc(
+                parsed_uri.host, parsed_uri.port
+            )
+            endpoint = wrapClientTLS(tls_connection_creator, endpoint)
+        elif parsed_uri.scheme == b"http":
+            pass
+        else:
+            return defer.fail(
+                Failure(
+                    SchemeNotSupported("Unsupported scheme: %r" % (parsed_uri.scheme,))
+                )
+            )
+
+        return self._requestWithEndpoint(
+            pool_key, endpoint, method, parsed_uri, headers, bodyProducer, request_path
+        )
+
+
+def _http_proxy_endpoint(proxy, reactor, **kwargs):
+    """Parses an http proxy setting and returns an endpoint for the proxy
+
+    Args:
+        proxy (bytes|None):  the proxy setting
+        reactor: reactor to be used to connect to the proxy
+        kwargs: other args to be passed to HostnameEndpoint
+
+    Returns:
+        interfaces.IStreamClientEndpoint|None: endpoint to use to connect to the proxy,
+            or None
+    """
+    if proxy is None:
+        return None
+
+    # currently we only support hostname:port. Some apps also support
+    # protocol://<host>[:port], which allows a way of requiring a TLS connection to the
+    # proxy.
+
+    host, port = parse_host_port(proxy, default_port=1080)
+    return HostnameEndpoint(reactor, host, port, **kwargs)
+
+
+def parse_host_port(hostport, default_port=None):
+    # could have sworn we had one of these somewhere else...
+    if b":" in hostport:
+        host, port = hostport.rsplit(b":", 1)
+        try:
+            port = int(port)
+            return host, port
+        except ValueError:
+            # the thing after the : wasn't a valid port; presumably this is an
+            # IPv6 address.
+            pass
+
+    return hostport, default_port
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 46af27c8f6..58f9cc61c8 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -170,7 +170,7 @@ class RequestMetrics(object):
             tag = context.tag
 
             if context != self.start_context:
-                logger.warn(
+                logger.warning(
                     "Context have unexpectedly changed %r, %r",
                     context,
                     self.start_context,
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2ccb210fd6..943d12c907 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -454,7 +454,7 @@ def respond_with_json(
     # the Deferred fires, but since the flag is RIGHT THERE it seems like
     # a waste.
     if request._disconnected:
-        logger.warn(
+        logger.warning(
             "Not sending response to request %s, already disconnected.", request
         )
         return
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 274c1a6a87..e9a5e46ced 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -219,13 +219,13 @@ def parse_json_value_from_request(request, allow_empty_body=False):
     try:
         content_unicode = content_bytes.decode("utf8")
     except UnicodeDecodeError:
-        logger.warn("Unable to decode UTF-8")
+        logger.warning("Unable to decode UTF-8")
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
     try:
         content = json.loads(content_unicode)
     except Exception as e:
-        logger.warn("Unable to parse JSON: %s", e)
+        logger.warning("Unable to parse JSON: %s", e)
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
     return content
diff --git a/synapse/http/site.py b/synapse/http/site.py
index df5274c177..ff8184a3d0 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -199,7 +199,7 @@ class SynapseRequest(Request):
         # It's useful to log it here so that we can get an idea of when
         # the client disconnects.
         with PreserveLoggingContext(self.logcontext):
-            logger.warn(
+            logger.warning(
                 "Error processing request %r: %s %s", self, reason.type, reason.value
             )
 
@@ -305,7 +305,7 @@ class SynapseRequest(Request):
         try:
             self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
         except Exception as e:
-            logger.warn("Failed to stop metrics: %r", e)
+            logger.warning("Failed to stop metrics: %r", e)
 
 
 class XForwardedForRequest(SynapseRequest):