diff options
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/client.py | 133 | ||||
-rw-r--r-- | synapse/http/replicationagent.py | 150 |
2 files changed, 282 insertions, 1 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index 91fe474f36..c9479c81ff 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -74,8 +74,9 @@ from twisted.web.iweb import ( from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri from synapse.http.proxyagent import ProxyAgent +from synapse.http.replicationagent import ReplicationAgent from synapse.http.types import QueryParams -from synapse.logging.context import make_deferred_yieldable +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.types import ISynapseReactor from synapse.util import json_decoder @@ -819,6 +820,136 @@ class SimpleHttpClient(BaseHttpClient): ) +class ReplicationClient(BaseHttpClient): + """Client for connecting to replication endpoints via HTTP and HTTPS. + + Attributes: + agent: The custom Twisted Agent used for constructing the connection. + """ + + def __init__( + self, + hs: "HomeServer", + ): + """ + Args: + hs: The HomeServer instance to pass in + """ + super().__init__(hs) + + # Use a pool, but a very small one. + pool = HTTPConnectionPool(self.reactor) + pool.maxPersistentPerHost = 5 + pool.cachedConnectionTimeout = 2 * 60 + + self.agent: IAgent = ReplicationAgent( + hs.get_reactor(), + contextFactory=hs.get_http_client_context_factory(), + pool=pool, + ) + + async def request( + self, + method: str, + uri: str, + data: Optional[bytes] = None, + headers: Optional[Headers] = None, + ) -> IResponse: + """ + Make a request, differs from BaseHttpClient.request in that it does not use treq. + + Args: + method: HTTP method to use. + uri: URI to query. + data: Data to send in the request body, if applicable. + headers: Request headers. + + Returns: + Response object, once the headers have been read. + + Raises: + RequestTimedOutError if the request times out before the headers are read + + """ + outgoing_requests_counter.labels(method).inc() + + logger.debug("Sending request %s %s", method, uri) + + with start_active_span( + "outgoing-replication-request", + tags={ + tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT, + tags.HTTP_METHOD: method, + tags.HTTP_URL: uri, + }, + finish_on_close=True, + ): + try: + body_producer = None + if data is not None: + body_producer = QuieterFileBodyProducer( + BytesIO(data), + cooperator=self._cooperator, + ) + + # Skip the fancy treq stuff, we don't need cookie handling, redirects, + # or buffered response bodies. + method_bytes = method.encode("ascii") + uri_bytes = uri.encode("ascii") + + # To preserve the logging context, the timeout is treated + # in a similar way to `defer.gatherResults`: + # * Each logging context-preserving fork is wrapped in + # `run_in_background`. In this case there is only one, + # since the timeout fork is not logging-context aware. + # * The `Deferred` that joins the forks back together is + # wrapped in `make_deferred_yieldable` to restore the + # logging context regardless of the path taken. + # (The logic/comments for this came from MatrixFederationHttpClient) + request_deferred = run_in_background( + self.agent.request, + method_bytes, + uri_bytes, + headers, + bodyProducer=body_producer, + ) + + # we use our own timeout mechanism rather than twisted's as a workaround + # for https://twistedmatrix.com/trac/ticket/9534. + # (Updated url https://github.com/twisted/twisted/issues/9534) + request_deferred = timeout_deferred( + request_deferred, + 60, + self.hs.get_reactor(), + ) + + # turn timeouts into RequestTimedOutErrors + request_deferred.addErrback(_timeout_to_request_timed_out_error) + + response = await make_deferred_yieldable(request_deferred) + + incoming_responses_counter.labels(method, response.code).inc() + logger.info( + "Received response to %s %s: %s", + method, + uri, + response.code, + ) + return response + except Exception as e: + incoming_responses_counter.labels(method, "ERR").inc() + logger.info( + "Error sending request to %s %s: %s %s", + method, + uri, + type(e).__name__, + e.args[0], + ) + set_tag(tags.ERROR, True) + set_tag("error_reason", e.args[0]) + raise + + def _timeout_to_request_timed_out_error(f: Failure) -> Failure: if f.check(twisted_error.TimeoutError, twisted_error.ConnectingCancelledError): # The TCP connection has its own timeout (set by the 'connectTimeout' param diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py new file mode 100644 index 0000000000..5ecd08be0f --- /dev/null +++ b/synapse/http/replicationagent.py @@ -0,0 +1,150 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import Optional + +from zope.interface import implementer + +from twisted.internet import defer +from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.internet.interfaces import IStreamClientEndpoint +from twisted.python.failure import Failure +from twisted.web.client import URI, HTTPConnectionPool, _AgentBase +from twisted.web.error import SchemeNotSupported +from twisted.web.http_headers import Headers +from twisted.web.iweb import ( + IAgent, + IAgentEndpointFactory, + IBodyProducer, + IPolicyForHTTPS, + IResponse, +) + +from synapse.types import ISynapseReactor + +logger = logging.getLogger(__name__) + + +@implementer(IAgentEndpointFactory) +class ReplicationEndpointFactory: + """Connect to a given TCP socket""" + + def __init__( + self, + reactor: ISynapseReactor, + context_factory: IPolicyForHTTPS, + ) -> None: + self.reactor = reactor + self.context_factory = context_factory + + def endpointForURI(self, uri: URI) -> IStreamClientEndpoint: + """ + This part of the factory decides what kind of endpoint is being connected to. + + Args: + uri: The pre-parsed URI object containing all the uri data + + Returns: The correct client endpoint object + """ + if uri.scheme in (b"http", b"https"): + endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port) + if uri.scheme == b"https": + endpoint = wrapClientTLS( + self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint + ) + return endpoint + else: + raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}") + + +@implementer(IAgent) +class ReplicationAgent(_AgentBase): + """ + Client for connecting to replication endpoints via HTTP and HTTPS. + + Much of this code is copied from Twisted's twisted.web.client.Agent. + """ + + def __init__( + self, + reactor: ISynapseReactor, + contextFactory: IPolicyForHTTPS, + connectTimeout: Optional[float] = None, + bindAddress: Optional[bytes] = None, + pool: Optional[HTTPConnectionPool] = None, + ): + """ + Create a ReplicationAgent. + + Args: + reactor: A reactor for this Agent to place outgoing connections. + contextFactory: A factory for TLS contexts, to control the + verification parameters of OpenSSL. The default is to use a + BrowserLikePolicyForHTTPS, so unless you have special + requirements you can leave this as-is. + connectTimeout: The amount of time that this Agent will wait + for the peer to accept a connection. + bindAddress: The local address for client sockets to bind to. + pool: An HTTPConnectionPool instance, or None, in which + case a non-persistent HTTPConnectionPool instance will be + created. + """ + _AgentBase.__init__(self, reactor, pool) + endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory) + self._endpointFactory = endpoint_factory + + def request( + self, + method: bytes, + uri: bytes, + headers: Optional[Headers] = None, + bodyProducer: Optional[IBodyProducer] = None, + ) -> "defer.Deferred[IResponse]": + """ + Issue a request to the server indicated by the given uri. + + An existing connection from the connection pool may be used or a new + one may be created. + + Currently, HTTP and HTTPS schemes are supported in uri. + + This is copied from twisted.web.client.Agent, except: + + * It uses a different pool key (combining the host & port). + * It does not call _ensureValidURI(...) since it breaks on some + UNIX paths. + + See: twisted.web.iweb.IAgent.request + """ + parsedURI = URI.fromBytes(uri) + try: + endpoint = self._endpointFactory.endpointForURI(parsedURI) + except SchemeNotSupported: + return defer.fail(Failure()) + + # This sets the Pool key to be: + # (http(s), <host:ip>) + key = (parsedURI.scheme, parsedURI.netloc) + + # _requestWithEndpoint comes from _AgentBase class + return self._requestWithEndpoint( + key, + endpoint, + method, + parsedURI, + headers, + bodyProducer, + parsedURI.originForm, + ) |