diff options
author | Jason Little <realtyem@gmail.com> | 2023-05-09 13:25:20 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-09 14:25:20 -0400 |
commit | d3bd03559b14272dd68499ab7cff4b190858b285 (patch) | |
tree | ae8783b4d21c8cd3fc22f31c70c3d02484cf6a1c /synapse/http/client.py | |
parent | Add config option to prevent media downloads from listed domains. (#15197) (diff) | |
download | synapse-d3bd03559b14272dd68499ab7cff4b190858b285.tar.xz |
HTTP Replication Client (#15470)
Separate out a HTTP client for replication in preparation for also supporting using UNIX sockets. The major difference from the base class is that this does not use treq to handle HTTP requests.
Diffstat (limited to 'synapse/http/client.py')
-rw-r--r-- | synapse/http/client.py | 133 |
1 files changed, 132 insertions, 1 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index 91fe474f36..c9479c81ff 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -74,8 +74,9 @@ from twisted.web.iweb import ( from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri from synapse.http.proxyagent import ProxyAgent +from synapse.http.replicationagent import ReplicationAgent from synapse.http.types import QueryParams -from synapse.logging.context import make_deferred_yieldable +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.types import ISynapseReactor from synapse.util import json_decoder @@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient): ) +class ReplicationClient(BaseHttpClient): + """Client for connecting to replication endpoints via HTTP and HTTPS. + + Attributes: + agent: The custom Twisted Agent used for constructing the connection. + """ + + def __init__( + self, + hs: "HomeServer", + ): + """ + Args: + hs: The HomeServer instance to pass in + """ + super().__init__(hs) + + # Use a pool, but a very small one. + pool = HTTPConnectionPool(self.reactor) + pool.maxPersistentPerHost = 5 + pool.cachedConnectionTimeout = 2 * 60 + + self.agent: IAgent = ReplicationAgent( + hs.get_reactor(), + contextFactory=hs.get_http_client_context_factory(), + pool=pool, + ) + + async def request( + self, + method: str, + uri: str, + data: Optional[bytes] = None, + headers: Optional[Headers] = None, + ) -> IResponse: + """ + Make a request, differs from BaseHttpClient.request in that it does not use treq. + + Args: + method: HTTP method to use. + uri: URI to query. + data: Data to send in the request body, if applicable. + headers: Request headers. + + Returns: + Response object, once the headers have been read. + + Raises: + RequestTimedOutError if the request times out before the headers are read + + """ + outgoing_requests_counter.labels(method).inc() + + logger.debug("Sending request %s %s", method, uri) + + with start_active_span( + "outgoing-replication-request", + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.HTTP_METHOD: method, + tags.HTTP_URL: uri, + }, + finish_on_close=True, + ): + try: + body_producer = None + if data is not None: + body_producer = QuieterFileBodyProducer( + BytesIO(data), + cooperator=self._cooperator, + ) + + # Skip the fancy treq stuff, we don't need cookie handling, redirects, + # or buffered response bodies. + method_bytes = method.encode("ascii") + uri_bytes = uri.encode("ascii") + + # To preserve the logging context, the timeout is treated + # in a similar way to `defer.gatherResults`: + # * Each logging context-preserving fork is wrapped in + # `run_in_background`. In this case there is only one, + # since the timeout fork is not logging-context aware. + # * The `Deferred` that joins the forks back together is + # wrapped in `make_deferred_yieldable` to restore the + # logging context regardless of the path taken. + # (The logic/comments for this came from MatrixFederationHttpClient) + request_deferred = run_in_background( + self.agent.request, + method_bytes, + uri_bytes, + headers, + bodyProducer=body_producer, + ) + + # we use our own timeout mechanism rather than twisted's as a workaround + # for https://twistedmatrix.com/trac/ticket/9534. + # (Updated url https://github.com/twisted/twisted/issues/9534) + request_deferred = timeout_deferred( + request_deferred, + 60, + self.hs.get_reactor(), + ) + + # turn timeouts into RequestTimedOutErrors + request_deferred.addErrback(_timeout_to_request_timed_out_error) + + response = await make_deferred_yieldable(request_deferred) + + incoming_responses_counter.labels(method, response.code).inc() + logger.info( + "Received response to %s %s: %s", + method, + uri, + response.code, + ) + return response + except Exception as e: + incoming_responses_counter.labels(method, "ERR").inc() + logger.info( + "Error sending request to %s %s: %s %s", + method, + uri, + type(e).__name__, + e.args[0], + ) + set_tag(tags.ERROR, True) + set_tag("error_reason", e.args[0]) + raise + + def _timeout_to_request_timed_out_error(f: Failure) -> Failure: if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError): # The TCP connection has its own timeout (set by the 'connectTimeout' param |