diff --git a/synapse/http/client.py b/synapse/http/client.py
index 91fe474f36..c9479c81ff 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -74,8 +74,9 @@ from twisted.web.iweb import (
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
from synapse.http.proxyagent import ProxyAgent
+from synapse.http.replicationagent import ReplicationAgent
from synapse.http.types import QueryParams
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import ISynapseReactor
from synapse.util import json_decoder
@@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient):
)
+class ReplicationClient(BaseHttpClient):
+ """Client for connecting to replication endpoints via HTTP and HTTPS.
+
+ Attributes:
+ agent: The custom Twisted Agent used for constructing the connection.
+ """
+
+ def __init__(
+ self,
+ hs: "HomeServer",
+ ):
+ """
+ Args:
+ hs: The HomeServer instance to pass in
+ """
+ super().__init__(hs)
+
+ # Use a pool, but a very small one.
+ pool = HTTPConnectionPool(self.reactor)
+ pool.maxPersistentPerHost = 5
+ pool.cachedConnectionTimeout = 2 * 60
+
+ self.agent: IAgent = ReplicationAgent(
+ hs.get_reactor(),
+ contextFactory=hs.get_http_client_context_factory(),
+ pool=pool,
+ )
+
+ async def request(
+ self,
+ method: str,
+ uri: str,
+ data: Optional[bytes] = None,
+ headers: Optional[Headers] = None,
+ ) -> IResponse:
+ """
+ Make a request, differs from BaseHttpClient.request in that it does not use treq.
+
+ Args:
+ method: HTTP method to use.
+ uri: URI to query.
+ data: Data to send in the request body, if applicable.
+ headers: Request headers.
+
+ Returns:
+ Response object, once the headers have been read.
+
+ Raises:
+ RequestTimedOutError if the request times out before the headers are read
+
+ """
+ outgoing_requests_counter.labels(method).inc()
+
+ logger.debug("Sending request %s %s", method, uri)
+
+ with start_active_span(
+ "outgoing-replication-request",
+ tags={
+ tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+ tags.HTTP_METHOD: method,
+ tags.HTTP_URL: uri,
+ },
+ finish_on_close=True,
+ ):
+ try:
+ body_producer = None
+ if data is not None:
+ body_producer = QuieterFileBodyProducer(
+ BytesIO(data),
+ cooperator=self._cooperator,
+ )
+
+ # Skip the fancy treq stuff, we don't need cookie handling, redirects,
+ # or buffered response bodies.
+ method_bytes = method.encode("ascii")
+ uri_bytes = uri.encode("ascii")
+
+ # To preserve the logging context, the timeout is treated
+ # in a similar way to `defer.gatherResults`:
+ # * Each logging context-preserving fork is wrapped in
+ # `run_in_background`. In this case there is only one,
+ # since the timeout fork is not logging-context aware.
+ # * The `Deferred` that joins the forks back together is
+ # wrapped in `make_deferred_yieldable` to restore the
+ # logging context regardless of the path taken.
+ # (The logic/comments for this came from MatrixFederationHttpClient)
+ request_deferred = run_in_background(
+ self.agent.request,
+ method_bytes,
+ uri_bytes,
+ headers,
+ bodyProducer=body_producer,
+ )
+
+ # we use our own timeout mechanism rather than twisted's as a workaround
+ # for https://twistedmatrix.com/trac/ticket/9534.
+ # (Updated url https://github.com/twisted/twisted/issues/9534)
+ request_deferred = timeout_deferred(
+ request_deferred,
+ 60,
+ self.hs.get_reactor(),
+ )
+
+ # turn timeouts into RequestTimedOutErrors
+ request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
+ response = await make_deferred_yieldable(request_deferred)
+
+ incoming_responses_counter.labels(method, response.code).inc()
+ logger.info(
+ "Received response to %s %s: %s",
+ method,
+ uri,
+ response.code,
+ )
+ return response
+ except Exception as e:
+ incoming_responses_counter.labels(method, "ERR").inc()
+ logger.info(
+ "Error sending request to %s %s: %s %s",
+ method,
+ uri,
+ type(e).__name__,
+ e.args[0],
+ )
+ set_tag(tags.ERROR, True)
+ set_tag("error_reason", e.args[0])
+ raise
+
+
def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
# The TCP connection has its own timeout (set by the 'connectTimeout' param
|