summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/client.py133
-rw-r--r--synapse/http/replicationagent.py150
-rw-r--r--synapse/replication/http/_base.py2
-rw-r--r--synapse/server.py13
4 files changed, 295 insertions, 3 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 91fe474f36..c9479c81ff 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -74,8 +74,9 @@ from twisted.web.iweb import (
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
 from synapse.http.proxyagent import ProxyAgent
+from synapse.http.replicationagent import ReplicationAgent
 from synapse.http.types import QueryParams
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import ISynapseReactor
 from synapse.util import json_decoder
@@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient):
             )
 
 
+class ReplicationClient(BaseHttpClient):
+    """Client for connecting to replication endpoints via HTTP and HTTPS.
+
+    Attributes:
+        agent: The custom Twisted Agent used for constructing the connection.
+    """
+
+    def __init__(
+        self,
+        hs: "HomeServer",
+    ):
+        """
+        Args:
+            hs: The HomeServer instance to pass in
+        """
+        super().__init__(hs)
+
+        # Use a pool, but a very small one.
+        pool = HTTPConnectionPool(self.reactor)
+        pool.maxPersistentPerHost = 5
+        pool.cachedConnectionTimeout = 2 * 60
+
+        self.agent: IAgent = ReplicationAgent(
+            hs.get_reactor(),
+            contextFactory=hs.get_http_client_context_factory(),
+            pool=pool,
+        )
+
+    async def request(
+        self,
+        method: str,
+        uri: str,
+        data: Optional[bytes] = None,
+        headers: Optional[Headers] = None,
+    ) -> IResponse:
+        """
+        Make a request, differs from BaseHttpClient.request in that it does not use treq.
+
+        Args:
+            method: HTTP method to use.
+            uri: URI to query.
+            data: Data to send in the request body, if applicable.
+            headers: Request headers.
+
+        Returns:
+            Response object, once the headers have been read.
+
+        Raises:
+            RequestTimedOutError if the request times out before the headers are read
+
+        """
+        outgoing_requests_counter.labels(method).inc()
+
+        logger.debug("Sending request %s %s", method, uri)
+
+        with start_active_span(
+            "outgoing-replication-request",
+            tags={
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+                tags.HTTP_METHOD: method,
+                tags.HTTP_URL: uri,
+            },
+            finish_on_close=True,
+        ):
+            try:
+                body_producer = None
+                if data is not None:
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data),
+                        cooperator=self._cooperator,
+                    )
+
+                # Skip the fancy treq stuff, we don't need cookie handling, redirects,
+                # or buffered response bodies.
+                method_bytes = method.encode("ascii")
+                uri_bytes = uri.encode("ascii")
+
+                # To preserve the logging context, the timeout is treated
+                # in a similar way to `defer.gatherResults`:
+                # * Each logging context-preserving fork is wrapped in
+                #   `run_in_background`. In this case there is only one,
+                #   since the timeout fork is not logging-context aware.
+                # * The `Deferred` that joins the forks back together is
+                #   wrapped in `make_deferred_yieldable` to restore the
+                #   logging context regardless of the path taken.
+                # (The logic/comments for this came from MatrixFederationHttpClient)
+                request_deferred = run_in_background(
+                    self.agent.request,
+                    method_bytes,
+                    uri_bytes,
+                    headers,
+                    bodyProducer=body_producer,
+                )
+
+                # we use our own timeout mechanism rather than twisted's as a workaround
+                # for https://twistedmatrix.com/trac/ticket/9534.
+                # (Updated url https://github.com/twisted/twisted/issues/9534)
+                request_deferred = timeout_deferred(
+                    request_deferred,
+                    60,
+                    self.hs.get_reactor(),
+                )
+
+                # turn timeouts into RequestTimedOutErrors
+                request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
+                response = await make_deferred_yieldable(request_deferred)
+
+                incoming_responses_counter.labels(method, response.code).inc()
+                logger.info(
+                    "Received response to %s %s: %s",
+                    method,
+                    uri,
+                    response.code,
+                )
+                return response
+            except Exception as e:
+                incoming_responses_counter.labels(method, "ERR").inc()
+                logger.info(
+                    "Error sending request to  %s %s: %s %s",
+                    method,
+                    uri,
+                    type(e).__name__,
+                    e.args[0],
+                )
+                set_tag(tags.ERROR, True)
+                set_tag("error_reason", e.args[0])
+                raise
+
+
 def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
     if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
         # The TCP connection has its own timeout (set by the 'connectTimeout' param
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
new file mode 100644
index 0000000000..5ecd08be0f
--- /dev/null
+++ b/synapse/http/replicationagent.py
@@ -0,0 +1,150 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import Optional
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.python.failure import Failure
+from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
+from twisted.web.error import SchemeNotSupported
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import (
+    IAgent,
+    IAgentEndpointFactory,
+    IBodyProducer,
+    IPolicyForHTTPS,
+    IResponse,
+)
+
+from synapse.types import ISynapseReactor
+
+logger = logging.getLogger(__name__)
+
+
+@implementer(IAgentEndpointFactory)
+class ReplicationEndpointFactory:
+    """Connect to a given TCP socket"""
+
+    def __init__(
+        self,
+        reactor: ISynapseReactor,
+        context_factory: IPolicyForHTTPS,
+    ) -> None:
+        self.reactor = reactor
+        self.context_factory = context_factory
+
+    def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
+        """
+        This part of the factory decides what kind of endpoint is being connected to.
+
+        Args:
+            uri: The pre-parsed URI object containing all the uri data
+
+        Returns: The correct client endpoint object
+        """
+        if uri.scheme in (b"http", b"https"):
+            endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
+            if uri.scheme == b"https":
+                endpoint = wrapClientTLS(
+                    self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
+                )
+            return endpoint
+        else:
+            raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")
+
+
+@implementer(IAgent)
+class ReplicationAgent(_AgentBase):
+    """
+    Client for connecting to replication endpoints via HTTP and HTTPS.
+
+    Much of this code is copied from Twisted's twisted.web.client.Agent.
+    """
+
+    def __init__(
+        self,
+        reactor: ISynapseReactor,
+        contextFactory: IPolicyForHTTPS,
+        connectTimeout: Optional[float] = None,
+        bindAddress: Optional[bytes] = None,
+        pool: Optional[HTTPConnectionPool] = None,
+    ):
+        """
+        Create a ReplicationAgent.
+
+        Args:
+            reactor: A reactor for this Agent to place outgoing connections.
+            contextFactory: A factory for TLS contexts, to control the
+                verification parameters of OpenSSL.  The default is to use a
+                BrowserLikePolicyForHTTPS, so unless you have special
+                requirements you can leave this as-is.
+            connectTimeout: The amount of time that this Agent will wait
+                for the peer to accept a connection.
+            bindAddress: The local address for client sockets to bind to.
+            pool: An HTTPConnectionPool instance, or None, in which
+                case a non-persistent HTTPConnectionPool instance will be
+                created.
+        """
+        _AgentBase.__init__(self, reactor, pool)
+        endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
+        self._endpointFactory = endpoint_factory
+
+    def request(
+        self,
+        method: bytes,
+        uri: bytes,
+        headers: Optional[Headers] = None,
+        bodyProducer: Optional[IBodyProducer] = None,
+    ) -> "defer.Deferred[IResponse]":
+        """
+        Issue a request to the server indicated by the given uri.
+
+        An existing connection from the connection pool may be used or a new
+        one may be created.
+
+        Currently, HTTP and HTTPS schemes are supported in uri.
+
+        This is copied from twisted.web.client.Agent, except:
+
+        * It uses a different pool key (combining the host & port).
+        * It does not call _ensureValidURI(...) since it breaks on some
+          UNIX paths.
+
+        See: twisted.web.iweb.IAgent.request
+        """
+        parsedURI = URI.fromBytes(uri)
+        try:
+            endpoint = self._endpointFactory.endpointForURI(parsedURI)
+        except SchemeNotSupported:
+            return defer.fail(Failure())
+
+        # This sets the Pool key to be:
+        #  (http(s), <host:ip>)
+        key = (parsedURI.scheme, parsedURI.netloc)
+
+        # _requestWithEndpoint comes from _AgentBase class
+        return self._requestWithEndpoint(
+            key,
+            endpoint,
+            method,
+            parsedURI,
+            headers,
+            bodyProducer,
+            parsedURI.originForm,
+        )
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 8c2c54c07a..23129962e9 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -194,7 +194,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         the `instance_map` config).
         """
         clock = hs.get_clock()
-        client = hs.get_simple_http_client()
+        client = hs.get_replication_client()
         local_instance_name = hs.get_instance_name()
 
         # The value of these option should match the replication listener settings
diff --git a/synapse/server.py b/synapse/server.py
index fd29c28173..b307295789 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,7 +107,11 @@ from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
-from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
+from synapse.http.client import (
+    InsecureInterceptableContextFactory,
+    ReplicationClient,
+    SimpleHttpClient,
+)
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.media.media_repository import MediaRepository
 from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
@@ -472,6 +476,13 @@ class HomeServer(metaclass=abc.ABCMeta):
         return MatrixFederationHttpClient(self, tls_client_options_factory)
 
     @cache_in_self
+    def get_replication_client(self) -> ReplicationClient:
+        """
+        An HTTP client for HTTP replication.
+        """
+        return ReplicationClient(self)
+
+    @cache_in_self
     def get_room_creation_handler(self) -> RoomCreationHandler:
         return RoomCreationHandler(self)