diff --git a/changelog.d/15470.misc b/changelog.d/15470.misc
new file mode 100644
index 0000000000..0af0b499c6
--- /dev/null
+++ b/changelog.d/15470.misc
@@ -0,0 +1 @@
+Create new `Client` for use with HTTP Replication between workers. Contributed by Jason Little.
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 91fe474f36..c9479c81ff 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -74,8 +74,9 @@ from twisted.web.iweb import (
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
+from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
@@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient):
)
+class ReplicationClient(BaseHttpClient):
+ """Client for connecting to replication endpoints via HTTP and HTTPS.
+
+ Attributes:
+ agent: The custom Twisted Agent used for constructing the connection.
+ """
+
+ def __init__(
+ self,
+ hs: "HomeServer",
+ ):
+ """
+ Args:
+ hs: The HomeServer instance to pass in
+ """
+ super().__init__(hs)
+
+ # Use a pool, but a very small one.
+ pool = HTTPConnectionPool(self.reactor)
+ pool.maxPersistentPerHost = 5
+ pool.cachedConnectionTimeout = 2 * 60
+
+ self.agent: IAgent = ReplicationAgent(
+ hs.get_reactor(),
+ contextFactory=hs.get_http_client_context_factory(),
+ pool=pool,
+ )
+
+ async def request(
+ self,
+ method: str,
+ uri: str,
+ data: Optional[bytes] = None,
+ headers: Optional[Headers] = None,
+ ) -> IResponse:
+ """
+ Make a request, differs from BaseHttpClient.request in that it does not use treq.
+
+ Args:
+ method: HTTP method to use.
+ uri: URI to query.
+ data: Data to send in the request body, if applicable.
+ headers: Request headers.
+
+ Returns:
+ Response object, once the headers have been read.
+
+ Raises:
+ RequestTimedOutError if the request times out before the headers are read
+
+ """
+ outgoing_requests_counter.labels(method).inc()
+
+ logger.debug("Sending request %s %s", method, uri)
+
+ with start_active_span(
+ "outgoing-replication-request",
+ tags={
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+ tags.HTTP_METHOD: method,
+ tags.HTTP_URL: uri,
+ },
+ finish_on_close=True,
+ ):
+ try:
+ body_producer = None
+ if data is not None:
+ body_producer = QuieterFileBodyProducer(
+ BytesIO(data),
+ cooperator=self._cooperator,
+ )
+
+ # Skip the fancy treq stuff, we don't need cookie handling, redirects,
+ # or buffered response bodies.
+ method_bytes = method.encode("ascii")
+ uri_bytes = uri.encode("ascii")
+
+ # To preserve the logging context, the timeout is treated
+ # in a similar way to `defer.gatherResults`:
+ # * Each logging context-preserving fork is wrapped in
+ # `run_in_background`. In this case there is only one,
+ # since the timeout fork is not logging-context aware.
+ # * The `Deferred` that joins the forks back together is
+ # wrapped in `make_deferred_yieldable` to restore the
+ # logging context regardless of the path taken.
+ # (The logic/comments for this came from MatrixFederationHttpClient)
+ request_deferred = run_in_background(
+ self.agent.request,
+ method_bytes,
+ uri_bytes,
+ headers,
+ bodyProducer=body_producer,
+ )
+
+ # we use our own timeout mechanism rather than twisted's as a workaround
+ # for https://twistedmatrix.com/trac/ticket/9534.
+ # (Updated url https://github.com/twisted/twisted/issues/9534)
+ request_deferred = timeout_deferred(
+ request_deferred,
+ 60,
+ self.hs.get_reactor(),
+ )
+
+ # turn timeouts into RequestTimedOutErrors
+ request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
+ response = await make_deferred_yieldable(request_deferred)
+
+ incoming_responses_counter.labels(method, response.code).inc()
+ logger.info(
+ "Received response to %s %s: %s",
+ method,
+ uri,
+ response.code,
+ )
+ return response
+ except Exception as e:
+ incoming_responses_counter.labels(method, "ERR").inc()
+ logger.info(
+ "Error sending request to %s %s: %s %s",
+ method,
+ uri,
+ type(e).__name__,
+ e.args[0],
+ )
+ set_tag(tags.ERROR, True)
+ set_tag("error_reason", e.args[0])
+ raise
+
+
def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
# The TCP connection has its own timeout (set by the 'connectTimeout' param
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
new file mode 100644
index 0000000000..5ecd08be0f
--- /dev/null
+++ b/synapse/http/replicationagent.py
@@ -0,0 +1,150 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import Optional
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.interfaces import IStreamClientEndpoint
+from twisted.python.failure import Failure
+from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
+from twisted.web.error import SchemeNotSupported
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import (
+ IAgent,
+ IAgentEndpointFactory,
+ IBodyProducer,
+ IPolicyForHTTPS,
+ IResponse,
+)
+
+from synapse.types import ISynapseReactor
+
+logger = logging.getLogger(__name__)
+
+
+@implementer(IAgentEndpointFactory)
+class ReplicationEndpointFactory:
+ """Connect to a given TCP socket"""
+
+ def __init__(
+ self,
+ reactor: ISynapseReactor,
+ context_factory: IPolicyForHTTPS,
+ ) -> None:
+ self.reactor = reactor
+ self.context_factory = context_factory
+
+ def endpointForURI(self, uri: URI) -> IStreamClientEndpoint:
+ """
+ This part of the factory decides what kind of endpoint is being connected to.
+
+ Args:
+ uri: The pre-parsed URI object containing all the uri data
+
+ Returns: The correct client endpoint object
+ """
+ if uri.scheme in (b"http", b"https"):
+ endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port)
+ if uri.scheme == b"https":
+ endpoint = wrapClientTLS(
+ self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint
+ )
+ return endpoint
+ else:
+ raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}")
+
+
+@implementer(IAgent)
+class ReplicationAgent(_AgentBase):
+ """
+ Client for connecting to replication endpoints via HTTP and HTTPS.
+
+ Much of this code is copied from Twisted's twisted.web.client.Agent.
+ """
+
+ def __init__(
+ self,
+ reactor: ISynapseReactor,
+ contextFactory: IPolicyForHTTPS,
+ connectTimeout: Optional[float] = None,
+ bindAddress: Optional[bytes] = None,
+ pool: Optional[HTTPConnectionPool] = None,
+ ):
+ """
+ Create a ReplicationAgent.
+
+ Args:
+ reactor: A reactor for this Agent to place outgoing connections.
+ contextFactory: A factory for TLS contexts, to control the
+ verification parameters of OpenSSL. The default is to use a
+ BrowserLikePolicyForHTTPS, so unless you have special
+ requirements you can leave this as-is.
+ connectTimeout: The amount of time that this Agent will wait
+ for the peer to accept a connection.
+ bindAddress: The local address for client sockets to bind to.
+ pool: An HTTPConnectionPool instance, or None, in which
+ case a non-persistent HTTPConnectionPool instance will be
+ created.
+ """
+ _AgentBase.__init__(self, reactor, pool)
+ endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory)
+ self._endpointFactory = endpoint_factory
+
+ def request(
+ self,
+ method: bytes,
+ uri: bytes,
+ headers: Optional[Headers] = None,
+ bodyProducer: Optional[IBodyProducer] = None,
+ ) -> "defer.Deferred[IResponse]":
+ """
+ Issue a request to the server indicated by the given uri.
+
+ An existing connection from the connection pool may be used or a new
+ one may be created.
+
+ Currently, HTTP and HTTPS schemes are supported in uri.
+
+ This is copied from twisted.web.client.Agent, except:
+
+ * It uses a different pool key (combining the host & port).
+ * It does not call _ensureValidURI(...) since it breaks on some
+ UNIX paths.
+
+ See: twisted.web.iweb.IAgent.request
+ """
+ parsedURI = URI.fromBytes(uri)
+ try:
+ endpoint = self._endpointFactory.endpointForURI(parsedURI)
+ except SchemeNotSupported:
+ return defer.fail(Failure())
+
+ # This sets the Pool key to be:
+ # (http(s), <host:ip>)
+ key = (parsedURI.scheme, parsedURI.netloc)
+
+ # _requestWithEndpoint comes from _AgentBase class
+ return self._requestWithEndpoint(
+ key,
+ endpoint,
+ method,
+ parsedURI,
+ headers,
+ bodyProducer,
+ parsedURI.originForm,
+ )
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 8c2c54c07a..23129962e9 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -194,7 +194,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
the `instance_map` config).
"""
clock = hs.get_clock()
- client = hs.get_simple_http_client()
+ client = hs.get_replication_client()
local_instance_name = hs.get_instance_name()
# The value of these option should match the replication listener settings
diff --git a/synapse/server.py b/synapse/server.py
index fd29c28173..b307295789 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,7 +107,11 @@ from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
-from synapse.http.client import InsecureInterceptableContextFactory, SimpleHttpClient
+from synapse.http.client import (
+ InsecureInterceptableContextFactory,
+ ReplicationClient,
+ SimpleHttpClient,
+)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
from synapse.media.media_repository import MediaRepository
from synapse.metrics.common_usage_metrics import CommonUsageMetricsManager
@@ -472,6 +476,13 @@ class HomeServer(metaclass=abc.ABCMeta):
return MatrixFederationHttpClient(self, tls_client_options_factory)
@cache_in_self
+ def get_replication_client(self) -> ReplicationClient:
+ """
+ An HTTP client for HTTP replication.
+ """
+ return ReplicationClient(self)
+
+ @cache_in_self
def get_room_creation_handler(self) -> RoomCreationHandler:
return RoomCreationHandler(self)
diff --git a/tests/test_state.py b/tests/test_state.py
index b20a26e1ff..2029d3d60a 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -228,6 +228,7 @@ class StateTestCase(unittest.TestCase):
"get_macaroon_generator",
"get_instance_name",
"get_simple_http_client",
+ "get_replication_client",
"hostname",
]
)
|