summary refs log tree commit diff
path: root/synapse/http/replicationagent.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/replicationagent.py')
-rw-r--r--synapse/http/replicationagent.py150
1 files changed, 150 insertions, 0 deletions
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
new file mode 100644
index 0000000000..5ecd08be0f
--- /dev/null
+++ b/synapse/http/replicationagent.py
@@ -0,0 +1,150 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import Optional
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.python.failure import Failure
+from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
+from twisted.web.error import SchemeNotSupported
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import (
+    IAgent,
+    IAgentEndpointFactory,
+    IBodyProducer,
+    IPolicyForHTTPS,
+    IResponse,
+)
+
+from synapse.types import ISynapseReactor
+
+logger = logging.getLogger(__name__)
+
+
+@implementer(IAgentEndpointFactory)
+class ReplicationEndpointFactory:
+    """Connect to a given TCP socket"""
+
+    def __init__(
+        self,
+        reactor: ISynapseReactor,
+        context_factory: IPolicyForHTTPS,
+    ) -> None:
+        self.reactor = reactor
+        self.context_factory = context_factory
+
+    def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
+        """
+        This part of the factory decides what kind of endpoint is being connected to.
+
+        Args:
+            uri: The pre-parsed URI object containing all the uri data
+
+        Returns: The correct client endpoint object
+        """
+        if uri.scheme in (b"http", b"https"):
+            endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
+            if uri.scheme == b"https":
+                endpoint = wrapClientTLS(
+                    self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
+                )
+            return endpoint
+        else:
+            raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")
+
+
+@implementer(IAgent)
+class ReplicationAgent(_AgentBase):
+    """
+    Client for connecting to replication endpoints via HTTP and HTTPS.
+
+    Much of this code is copied from Twisted's twisted.web.client.Agent.
+    """
+
+    def __init__(
+        self,
+        reactor: ISynapseReactor,
+        contextFactory: IPolicyForHTTPS,
+        connectTimeout: Optional[float] = None,
+        bindAddress: Optional[bytes] = None,
+        pool: Optional[HTTPConnectionPool] = None,
+    ):
+        """
+        Create a ReplicationAgent.
+
+        Args:
+            reactor: A reactor for this Agent to place outgoing connections.
+            contextFactory: A factory for TLS contexts, to control the
+                verification parameters of OpenSSL.  The default is to use a
+                BrowserLikePolicyForHTTPS, so unless you have special
+                requirements you can leave this as-is.
+            connectTimeout: The amount of time that this Agent will wait
+                for the peer to accept a connection.
+            bindAddress: The local address for client sockets to bind to.
+            pool: An HTTPConnectionPool instance, or None, in which
+                case a non-persistent HTTPConnectionPool instance will be
+                created.
+        """
+        _AgentBase.__init__(self, reactor, pool)
+        endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
+        self._endpointFactory = endpoint_factory
+
+    def request(
+        self,
+        method: bytes,
+        uri: bytes,
+        headers: Optional[Headers] = None,
+        bodyProducer: Optional[IBodyProducer] = None,
+    ) -> "defer.Deferred[IResponse]":
+        """
+        Issue a request to the server indicated by the given uri.
+
+        An existing connection from the connection pool may be used or a new
+        one may be created.
+
+        Currently, HTTP and HTTPS schemes are supported in uri.
+
+        This is copied from twisted.web.client.Agent, except:
+
+        * It uses a different pool key (combining the host & port).
+        * It does not call _ensureValidURI(...) since it breaks on some
+          UNIX paths.
+
+        See: twisted.web.iweb.IAgent.request
+        """
+        parsedURI = URI.fromBytes(uri)
+        try:
+            endpoint = self._endpointFactory.endpointForURI(parsedURI)
+        except SchemeNotSupported:
+            return defer.fail(Failure())
+
+        # This sets the Pool key to be:
+        #  (http(s), <host:ip>)
+        key = (parsedURI.scheme, parsedURI.netloc)
+
+        # _requestWithEndpoint comes from _AgentBase class
+        return self._requestWithEndpoint(
+            key,
+            endpoint,
+            method,
+            parsedURI,
+            headers,
+            bodyProducer,
+            parsedURI.originForm,
+        )