Diffstat (limited to 'synapse/http/client.py')
-rw-r--r--  synapse/http/client.py  133
1 file changed, 132 insertions, 1 deletion
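
For orientation, here is a minimal, illustrative sketch (not part of the patch) of the logging-context-preserving timeout pattern that the new client below relies on; the `agent` and `reactor` arguments and the import paths are assumptions based on how client.py already uses these helpers:

    from synapse.logging.context import make_deferred_yieldable, run_in_background
    from synapse.util.async_helpers import timeout_deferred

    async def request_with_timeout(agent, reactor, method: bytes, uri: bytes):
        # Fork the request off in the background so the calling logcontext
        # is not leaked into the reactor...
        d = run_in_background(agent.request, method, uri)
        # ...apply our own timeout rather than Twisted's (workaround for
        # https://github.com/twisted/twisted/issues/9534)...
        d = timeout_deferred(d, 60, reactor)
        # ...and restore the calling logcontext when awaiting the result.
        return await make_deferred_yieldable(d)
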
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 91fe474f36..c9479c81ff 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -74,8 +74,9 @@ from twisted.web.iweb import (
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri
 from synapse.http.proxyagent import ProxyAgent
+from synapse.http.replicationagent import ReplicationAgent
 from synapse.http.types import QueryParams
-from synapse.logging.context import make_deferred_yieldable
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import ISynapseReactor
 from synapse.util import json_decoder
@@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient):
             )
 
 
+class ReplicationClient(BaseHttpClient):
+    """Client for connecting to replication endpoints via HTTP and HTTPS.
+
+    Attributes:
+        agent: The custom Twisted Agent used for constructing the connection.
+    """
+
+    def __init__(
+        self,
+        hs: "HomeServer",
+    ):
+        """
+        Args:
+            hs: The HomeServer instance to pass in
+        """
+        super().__init__(hs)
+
+        # Use a pool, but a very small one.
+        pool = HTTPConnectionPool(self.reactor)
+        pool.maxPersistentPerHost = 5
+        pool.cachedConnectionTimeout = 2 * 60
+
+        self.agent: IAgent = ReplicationAgent(
+            hs.get_reactor(),
+            contextFactory=hs.get_http_client_context_factory(),
+            pool=pool,
+        )
+
+    async def request(
+        self,
+        method: str,
+        uri: str,
+        data: Optional[bytes] = None,
+        headers: Optional[Headers] = None,
+    ) -> IResponse:
+        """
+        Make a request. Differs from BaseHttpClient.request in that it does not use treq.
+
+        Args:
+            method: HTTP method to use.
+            uri: URI to query.
+            data: Data to send in the request body, if applicable.
+            headers: Request headers.
+
+        Returns:
+            Response object, once the headers have been read.
+
+        Raises:
+            RequestTimedOutError if the request times out before the headers are read
+
+        """
+        outgoing_requests_counter.labels(method).inc()
+
+        logger.debug("Sending request %s %s", method, uri)
+
+        with start_active_span(
+            "outgoing-replication-request",
+            tags={
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
+                tags.HTTP_METHOD: method,
+                tags.HTTP_URL: uri,
+            },
+            finish_on_close=True,
+        ):
+            try:
+                body_producer = None
+                if data is not None:
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data),
+                        cooperator=self._cooperator,
+                    )
+
+                # Skip the fancy treq stuff, we don't need cookie handling, redirects,
+                # or buffered response bodies.
+                method_bytes = method.encode("ascii")
+                uri_bytes = uri.encode("ascii")
+
+                # To preserve the logging context, the timeout is treated
+                # in a similar way to `defer.gatherResults`:
+                # * Each logging context-preserving fork is wrapped in
+                #   `run_in_background`. In this case there is only one,
+                #   since the timeout fork is not logging-context aware.
+                # * The `Deferred` that joins the forks back together is
+                #   wrapped in `make_deferred_yieldable` to restore the
+                #   logging context regardless of the path taken.
+                # (The logic/comments for this came from MatrixFederationHttpClient)
+                request_deferred = run_in_background(
+                    self.agent.request,
+                    method_bytes,
+                    uri_bytes,
+                    headers,
+                    bodyProducer=body_producer,
+                )
+
+                # We use our own timeout mechanism rather than Twisted's as a workaround
+                # for https://twistedmatrix.com/trac/ticket/9534
+                # (updated URL: https://github.com/twisted/twisted/issues/9534).
+                request_deferred = timeout_deferred(
+                    request_deferred,
+                    60,
+                    self.hs.get_reactor(),
+                )
+
+                # turn timeouts into RequestTimedOutErrors
+                request_deferred.addErrback(_timeout_to_request_timed_out_error)
+
+                response = await make_deferred_yieldable(request_deferred)
+
+                incoming_responses_counter.labels(method, response.code).inc()
+                logger.info(
+                    "Received response to %s %s: %s",
+                    method,
+                    uri,
+                    response.code,
+                )
+                return response
+            except Exception as e:
+                incoming_responses_counter.labels(method, "ERR").inc()
+                logger.info(
+                    "Error sending request to  %s %s: %s %s",
+                    method,
+                    uri,
+                    type(e).__name__,
+                    e.args[0],
+                )
+                set_tag(tags.ERROR, True)
+                set_tag("error_reason", e.args[0])
+                raise
+
+
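
A minimal usage sketch for the class added above (illustrative only: `hs`, the port, and the endpoint path are assumptions, not part of this patch):

    from twisted.web.http_headers import Headers

    async def ping_replication(hs) -> int:
        client = ReplicationClient(hs)
        response = await client.request(
            "GET",
            "http://localhost:8034/_synapse/replication/example",
            headers=Headers({b"Accept": [b"application/json"]}),
        )
        # Only the response headers have been read at this point; the body
        # would still need to be streamed by the caller.
        return response.code
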
 def _timeout_to_request_timed_out_error(f: Failure) -> Failure:
     if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError):
         # The TCP connection has its own timeout (set by the 'connectTimeout' param