From c9bf644fa0c2c06f8143b14ccdb655feebed97df Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Mon, 10 Jul 2023 11:10:20 -0500 Subject: Revert "Federation outbound proxy" (#15910) Revert "Federation outbound proxy (#15773)" This reverts commit b07b14b494ae1dd564b4c44f844c9a9545b3d08a. --- synapse/app/_base.py | 2 - synapse/app/generic_worker.py | 1 - synapse/app/homeserver.py | 1 - synapse/config/workers.py | 40 +----- synapse/http/client.py | 7 +- synapse/http/matrixfederationclient.py | 132 ++--------------- synapse/http/proxy.py | 249 --------------------------------- synapse/http/proxyagent.py | 79 +---------- synapse/http/server.py | 55 ++++---- synapse/http/site.py | 26 ++-- 10 files changed, 46 insertions(+), 546 deletions(-) delete mode 100644 synapse/http/proxy.py (limited to 'synapse') diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 938ab40f27..936b1b0430 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -386,7 +386,6 @@ def listen_unix( def listen_http( - hs: "HomeServer", listener_config: ListenerConfig, root_resource: Resource, version_string: str, @@ -407,7 +406,6 @@ def listen_http( version_string, max_request_body_size=max_request_body_size, reactor=reactor, - federation_agent=hs.get_federation_http_client().agent, ) if isinstance(listener_config, TCPListenerConfig): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index dc79efcc14..7406c3948c 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -221,7 +221,6 @@ class GenericWorkerServer(HomeServer): root_resource = create_resource_tree(resources, OptionsResource()) _base.listen_http( - self, listener_config, root_resource, self.version_string, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index f188c7265a..84236ac299 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -139,7 +139,6 @@ class SynapseHomeServer(HomeServer): root_resource = OptionsResource() ports = listen_http( - self, listener_config, create_resource_tree(resources, root_resource), self.version_string, diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 5c81eb5c67..ccfe75eaf3 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -15,7 +15,7 @@ import argparse import logging -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Union import attr from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr @@ -154,27 +154,6 @@ class WriterLocations: ) -@attr.s(auto_attribs=True) -class OutboundFederationRestrictedTo: - """Whether we limit outbound federation to a certain set of instances. - - Attributes: - instances: optional list of instances that can make outbound federation - requests. If None then all instances can make federation requests. - locations: list of instance locations to connect to proxy via. - """ - - instances: Optional[List[str]] - locations: List[InstanceLocationConfig] = attr.Factory(list) - - def __contains__(self, instance: str) -> bool: - # It feels a bit dirty to return `True` if `instances` is `None`, but it makes - # sense in downstream usage in the sense that if - # `outbound_federation_restricted_to` is not configured, then any instance can - # talk to federation (no restrictions so always return `True`). - return self.instances is None or instance in self.instances - - class WorkerConfig(Config): """The workers are processes run separately to the main synapse process. They have their own pid_file and listener configuration. They use the @@ -386,23 +365,6 @@ class WorkerConfig(Config): new_option_name="update_user_directory_from_worker", ) - outbound_federation_restricted_to = config.get( - "outbound_federation_restricted_to", None - ) - self.outbound_federation_restricted_to = OutboundFederationRestrictedTo( - outbound_federation_restricted_to - ) - if outbound_federation_restricted_to: - for instance in outbound_federation_restricted_to: - if instance not in self.instance_map: - raise ConfigError( - "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config." - % (instance,) - ) - self.outbound_federation_restricted_to.locations.append( - self.instance_map[instance] - ) - def _should_this_worker_perform_duty( self, config: Dict[str, Any], diff --git a/synapse/http/client.py b/synapse/http/client.py index ca2cdbc6e2..09ea93e10d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1037,12 +1037,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): - # This applies to requests which don't set `Content-Length` or a - # `Transfer-Encoding` in the response because in this case the end of the - # response is indicated by the connection being closed, an event which may - # also be due to a transient network problem or other error. But since this - # behavior is expected of some servers (like YouTube), let's ignore it. - # Stolen from https://github.com/twisted/treq/pull/49/files + # stolen from https://github.com/twisted/treq/pull/49/files # http://twistedmatrix.com/trac/ticket/4840 self.deferred.callback(self.length) else: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b00396fdc7..cc4e258b0f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IBodyProducer, IResponse +from twisted.web.iweb import IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -72,7 +72,6 @@ from synapse.http.client import ( read_body_with_max_size, ) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent -from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -394,32 +393,17 @@ class MatrixFederationHttpClient: if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - outbound_federation_restricted_to = ( - hs.config.worker.outbound_federation_restricted_to + federation_agent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, ) - if hs.get_instance_name() in outbound_federation_restricted_to: - # Talk to federation directly - federation_agent: IAgent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, - ) - else: - # We need to talk to federation via the proxy via one of the configured - # locations - federation_proxies = outbound_federation_restricted_to.locations - federation_agent = ProxyAgent( - self.reactor, - self.reactor, - tls_client_options_factory, - federation_proxies=federation_proxies, - ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent: IAgent = BlocklistingAgentWrapper( + self.agent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) @@ -428,6 +412,7 @@ class MatrixFederationHttpClient: self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 + self.max_long_retry_delay_seconds = ( hs.config.federation.max_long_retry_delay_ms / 1000 ) @@ -1146,101 +1131,6 @@ class MatrixFederationHttpClient: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Raises: - HttpResponseException: If we get an HTTP response code >= 300 - (except 429). - NotRetryingDestination: If we are not yet ready to retry this - server. - FederationDeniedError: If this destination is not on our - federation whitelist - RequestSendFailed: If there were problems connecting to the - remote, due to e.g. DNS failures, connection timeouts etc. - """ - json_dict, _ = await self.get_json_with_headers( - destination=destination, - path=path, - args=args, - retry_on_dns_fail=retry_on_dns_fail, - timeout=timeout, - ignore_backoff=ignore_backoff, - try_trailing_slash_on_400=try_trailing_slash_on_400, - parser=parser, - ) - return json_dict - - @overload - async def get_json_with_headers( - self, - destination: str, - path: str, - args: Optional[QueryParams] = None, - retry_on_dns_fail: bool = True, - timeout: Optional[int] = None, - ignore_backoff: bool = False, - try_trailing_slash_on_400: bool = False, - parser: Literal[None] = None, - ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: - ... - - @overload - async def get_json_with_headers( - self, - destination: str, - path: str, - args: Optional[QueryParams] = ..., - retry_on_dns_fail: bool = ..., - timeout: Optional[int] = ..., - ignore_backoff: bool = ..., - try_trailing_slash_on_400: bool = ..., - parser: ByteParser[T] = ..., - ) -> Tuple[T, Dict[bytes, List[bytes]]]: - ... - - async def get_json_with_headers( - self, - destination: str, - path: str, - args: Optional[QueryParams] = None, - retry_on_dns_fail: bool = True, - timeout: Optional[int] = None, - ignore_backoff: bool = False, - try_trailing_slash_on_400: bool = False, - parser: Optional[ByteParser[T]] = None, - ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: - """GETs some json from the given host homeserver and path - - Args: - destination: The remote server to send the HTTP request to. - - path: The HTTP path. - - args: A dictionary used to create query strings, defaults to - None. - - retry_on_dns_fail: true if the request should be retried on DNS failures - - timeout: number of milliseconds to wait for the response. - self._default_timeout (60s) by default. - - Note that we may make several attempts to send the request; this - timeout applies to the time spent waiting for response headers for - *each* attempt (including connection time) as well as the time spent - reading the response body after a 200 response. - - ignore_backoff: true to ignore the historical backoff data - and try the request anyway. - - try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED - response we should try appending a trailing slash to the end of - the request. Workaround for #3622 in Synapse <= v0.99.3. - - parser: The parser to use to decode the response. Defaults to - parsing as JSON. - - Returns: - Succeeds when we get a 2xx HTTP response. The result will be a tuple of the - decoded JSON body and a dict of the response headers. - Raises: HttpResponseException: If we get an HTTP response code >= 300 (except 429). @@ -1266,8 +1156,6 @@ class MatrixFederationHttpClient: timeout=timeout, ) - headers = dict(response.headers.getAllRawHeaders()) - if timeout is not None: _sec_timeout = timeout / 1000 else: @@ -1285,7 +1173,7 @@ class MatrixFederationHttpClient: parser=parser, ) - return body, headers + return body async def delete_json( self, diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py deleted file mode 100644 index 0874d67760..0000000000 --- a/synapse/http/proxy.py +++ /dev/null @@ -1,249 +0,0 @@ -# Copyright 2023 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import json -import logging -import urllib.parse -from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast - -from twisted.internet import protocol -from twisted.internet.interfaces import ITCPTransport -from twisted.internet.protocol import connectionDone -from twisted.python import failure -from twisted.python.failure import Failure -from twisted.web.client import ResponseDone -from twisted.web.http_headers import Headers -from twisted.web.iweb import IAgent, IResponse -from twisted.web.resource import IResource -from twisted.web.server import Site - -from synapse.api.errors import Codes -from synapse.http import QuieterFileBodyProducer -from synapse.http.server import _AsyncResource -from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.types import ISynapseReactor -from synapse.util.async_helpers import timeout_deferred - -if TYPE_CHECKING: - from synapse.http.site import SynapseRequest - -logger = logging.getLogger(__name__) - -# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616 -# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be -# consumed by the immediate recipient and not be forwarded on. -HOP_BY_HOP_HEADERS = { - "Connection", - "Keep-Alive", - "Proxy-Authenticate", - "Proxy-Authorization", - "TE", - "Trailers", - "Transfer-Encoding", - "Upgrade", -} - - -def parse_connection_header_value( - connection_header_value: Optional[bytes], -) -> Set[str]: - """ - Parse the `Connection` header to determine which headers we should not be copied - over from the remote response. - - As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1 - - Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}` - - Even though "close" is a special directive, let's just treat it as just another - header for simplicity. If people want to check for this directive, they can simply - check for `"Close" in headers`. - - Args: - connection_header_value: The value of the `Connection` header. - - Returns: - The set of header names that should not be copied over from the remote response. - The keys are capitalized in canonical capitalization. - """ - headers = Headers() - extra_headers_to_remove: Set[str] = set() - if connection_header_value: - extra_headers_to_remove = { - headers._canonicalNameCaps(connection_option.strip()).decode("ascii") - for connection_option in connection_header_value.split(b",") - } - - return extra_headers_to_remove - - -class ProxyResource(_AsyncResource): - """ - A stub resource that proxies any requests with a `matrix-federation://` scheme - through the given `federation_agent` to the remote homeserver and ferries back the - info. - """ - - isLeaf = True - - def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent): - super().__init__(True) - - self.reactor = reactor - self.agent = federation_agent - - async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: - uri = urllib.parse.urlparse(request.uri) - assert uri.scheme == b"matrix-federation" - - headers = Headers() - for header_name in (b"User-Agent", b"Authorization", b"Content-Type"): - header_value = request.getHeader(header_name) - if header_value: - headers.addRawHeader(header_name, header_value) - - request_deferred = run_in_background( - self.agent.request, - request.method, - request.uri, - headers=headers, - bodyProducer=QuieterFileBodyProducer(request.content), - ) - request_deferred = timeout_deferred( - request_deferred, - # This should be set longer than the timeout in `MatrixFederationHttpClient` - # so that it has enough time to complete and pass us the data before we give - # up. - timeout=90, - reactor=self.reactor, - ) - - response = await make_deferred_yieldable(request_deferred) - - return response.code, response - - def _send_response( - self, - request: "SynapseRequest", - code: int, - response_object: Any, - ) -> None: - response = cast(IResponse, response_object) - response_headers = cast(Headers, response.headers) - - request.setResponseCode(code) - - # The `Connection` header also defines which headers should not be copied over. - connection_header = response_headers.getRawHeaders(b"connection") - extra_headers_to_remove = parse_connection_header_value( - connection_header[0] if connection_header else None - ) - - # Copy headers. - for k, v in response_headers.getAllRawHeaders(): - # Do not copy over any hop-by-hop headers. These are meant to only be - # consumed by the immediate recipient and not be forwarded on. - header_key = k.decode("ascii") - if ( - header_key in HOP_BY_HOP_HEADERS - or header_key in extra_headers_to_remove - ): - continue - - request.responseHeaders.setRawHeaders(k, v) - - response.deliverBody(_ProxyResponseBody(request)) - - def _send_error_response( - self, - f: failure.Failure, - request: "SynapseRequest", - ) -> None: - request.setResponseCode(502) - request.setHeader(b"Content-Type", b"application/json") - request.write( - ( - json.dumps( - { - "errcode": Codes.UNKNOWN, - "err": "ProxyResource: Error when proxying request: %s %s -> %s" - % ( - request.method.decode("ascii"), - request.uri.decode("ascii"), - f, - ), - } - ) - ).encode() - ) - request.finish() - - -class _ProxyResponseBody(protocol.Protocol): - """ - A protocol that proxies the given remote response data back out to the given local - request. - """ - - transport: Optional[ITCPTransport] = None - - def __init__(self, request: "SynapseRequest") -> None: - self._request = request - - def dataReceived(self, data: bytes) -> None: - # Avoid sending response data to the local request that already disconnected - if self._request._disconnected and self.transport is not None: - # Close the connection (forcefully) since all the data will get - # discarded anyway. - self.transport.abortConnection() - return - - self._request.write(data) - - def connectionLost(self, reason: Failure = connectionDone) -> None: - # If the local request is already finished (successfully or failed), don't - # worry about sending anything back. - if self._request.finished: - return - - if reason.check(ResponseDone): - self._request.finish() - else: - # Abort the underlying request since our remote request also failed. - self._request.transport.abortConnection() - - -class ProxySite(Site): - """ - Proxies any requests with a `matrix-federation://` scheme through the given - `federation_agent`. Otherwise, behaves like a normal `Site`. - """ - - def __init__( - self, - resource: IResource, - reactor: ISynapseReactor, - federation_agent: IAgent, - ): - super().__init__(resource, reactor=reactor) - - self._proxy_resource = ProxyResource(reactor, federation_agent) - - def getResourceFor(self, request: "SynapseRequest") -> IResource: - uri = urllib.parse.urlparse(request.uri) - if uri.scheme == b"matrix-federation": - return self._proxy_resource - - return super().getResourceFor(request) diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py index 1fa3adbef2..7bdc4acae7 100644 --- a/synapse/http/proxyagent.py +++ b/synapse/http/proxyagent.py @@ -12,9 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import random import re -from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple +from typing import Any, Dict, Optional, Tuple from urllib.parse import urlparse from urllib.request import ( # type: ignore[attr-defined] getproxies_environment, @@ -25,12 +24,7 @@ from zope.interface import implementer from twisted.internet import defer from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet.interfaces import ( - IProtocol, - IProtocolFactory, - IReactorCore, - IStreamClientEndpoint, -) +from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint from twisted.python.failure import Failure from twisted.web.client import ( URI, @@ -42,10 +36,8 @@ from twisted.web.error import SchemeNotSupported from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse -from synapse.config.workers import InstanceLocationConfig from synapse.http import redact_uri from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials -from synapse.logging.context import run_in_background logger = logging.getLogger(__name__) @@ -82,10 +74,6 @@ class ProxyAgent(_AgentBase): use_proxy: Whether proxy settings should be discovered and used from conventional environment variables. - federation_proxies: An optional list of locations to proxy outbound federation - traffic through (only requests that use the `matrix-federation://` scheme - will be proxied). - Raises: ValueError if use_proxy is set and the environment variables contain an invalid proxy specification. @@ -101,7 +89,6 @@ class ProxyAgent(_AgentBase): bindAddress: Optional[bytes] = None, pool: Optional[HTTPConnectionPool] = None, use_proxy: bool = False, - federation_proxies: Collection[InstanceLocationConfig] = (), ): contextFactory = contextFactory or BrowserLikePolicyForHTTPS() @@ -140,27 +127,6 @@ class ProxyAgent(_AgentBase): self._policy_for_https = contextFactory self._reactor = reactor - self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None - if federation_proxies: - endpoints = [] - for federation_proxy in federation_proxies: - endpoint = HostnameEndpoint( - self.proxy_reactor, - federation_proxy.host, - federation_proxy.port, - ) - - if federation_proxy.tls: - tls_connection_creator = self._policy_for_https.creatorForNetloc( - federation_proxy.host, - federation_proxy.port, - ) - endpoint = wrapClientTLS(tls_connection_creator, endpoint) - - endpoints.append(endpoint) - - self._federation_proxy_endpoint = _ProxyEndpoints(endpoints) - def request( self, method: bytes, @@ -248,14 +214,6 @@ class ProxyAgent(_AgentBase): parsed_uri.port, self.https_proxy_creds, ) - elif ( - parsed_uri.scheme == b"matrix-federation" - and self._federation_proxy_endpoint - ): - # Cache *all* connections under the same key, since we are only - # connecting to a single destination, the proxy: - endpoint = self._federation_proxy_endpoint - request_path = uri else: # not using a proxy endpoint = HostnameEndpoint( @@ -275,11 +233,6 @@ class ProxyAgent(_AgentBase): endpoint = wrapClientTLS(tls_connection_creator, endpoint) elif parsed_uri.scheme == b"http": pass - elif ( - parsed_uri.scheme == b"matrix-federation" - and self._federation_proxy_endpoint - ): - pass else: return defer.fail( Failure( @@ -384,31 +337,3 @@ def parse_proxy( credentials = ProxyCredentials(b"".join([url.username, b":", url.password])) return url.scheme, url.hostname, url.port or default_port, credentials - - -@implementer(IStreamClientEndpoint) -class _ProxyEndpoints: - """An endpoint that randomly iterates through a given list of endpoints at - each connection attempt. - """ - - def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None: - assert endpoints - self._endpoints = endpoints - - def connect( - self, protocol_factory: IProtocolFactory - ) -> "defer.Deferred[IProtocol]": - """Implements IStreamClientEndpoint interface""" - - return run_in_background(self._do_connect, protocol_factory) - - async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol: - failures: List[Failure] = [] - for endpoint in random.sample(self._endpoints, k=len(self._endpoints)): - try: - return await endpoint.connect(protocol_factory) - except Exception: - failures.append(Failure()) - - failures.pop().raiseException() diff --git a/synapse/http/server.py b/synapse/http/server.py index ff3153a9d9..933172c873 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -18,7 +18,6 @@ import html import logging import types import urllib -import urllib.parse from http import HTTPStatus from http.client import FOUND from inspect import isawaitable @@ -66,6 +65,7 @@ from synapse.api.errors import ( UnrecognizedRequestError, ) from synapse.config.homeserver import HomeServerConfig +from synapse.http.site import SynapseRequest from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import active_span, start_active_span, trace_servlet from synapse.util import json_encoder @@ -76,7 +76,6 @@ from synapse.util.iterutils import chunk_seq if TYPE_CHECKING: import opentracing - from synapse.http.site import SynapseRequest from synapse.server import HomeServer logger = logging.getLogger(__name__) @@ -103,7 +102,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499 def return_json_error( - f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig] + f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig] ) -> None: """Sends a JSON error response to clients.""" @@ -221,8 +220,8 @@ def return_html_error( def wrap_async_request_handler( - h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]] -) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]: + h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]] +) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]: """Wraps an async request handler so that it calls request.processing. This helps ensure that work done by the request handler after the request is completed @@ -236,7 +235,7 @@ def wrap_async_request_handler( """ async def wrapped_async_request_handler( - self: "_AsyncResource", request: "SynapseRequest" + self: "_AsyncResource", request: SynapseRequest ) -> None: with request.processing(): await h(self, request) @@ -301,7 +300,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): self._extract_context = extract_context - def render(self, request: "SynapseRequest") -> int: + def render(self, request: SynapseRequest) -> int: """This gets called by twisted every time someone sends us a request.""" request.render_deferred = defer.ensureDeferred( self._async_render_wrapper(request) @@ -309,7 +308,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): return NOT_DONE_YET @wrap_async_request_handler - async def _async_render_wrapper(self, request: "SynapseRequest") -> None: + async def _async_render_wrapper(self, request: SynapseRequest) -> None: """This is a wrapper that delegates to `_async_render` and handles exceptions, return values, metrics, etc. """ @@ -327,15 +326,9 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): # of our stack, and thus gives us a sensible stack # trace. f = failure.Failure() - logger.exception( - "Error handling request", - exc_info=(f.type, f.value, f.getTracebackObject()), - ) self._send_error_response(f, request) - async def _async_render( - self, request: "SynapseRequest" - ) -> Optional[Tuple[int, Any]]: + async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]: """Delegates to `_async_render_` methods, or returns a 400 if no appropriate method exists. Can be overridden in sub classes for different routing. @@ -365,7 +358,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): @abc.abstractmethod def _send_response( self, - request: "SynapseRequest", + request: SynapseRequest, code: int, response_object: Any, ) -> None: @@ -375,7 +368,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta): def _send_error_response( self, f: failure.Failure, - request: "SynapseRequest", + request: SynapseRequest, ) -> None: raise NotImplementedError() @@ -391,7 +384,7 @@ class DirectServeJsonResource(_AsyncResource): def _send_response( self, - request: "SynapseRequest", + request: SynapseRequest, code: int, response_object: Any, ) -> None: @@ -408,7 +401,7 @@ class DirectServeJsonResource(_AsyncResource): def _send_error_response( self, f: failure.Failure, - request: "SynapseRequest", + request: SynapseRequest, ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, None) @@ -480,7 +473,7 @@ class JsonResource(DirectServeJsonResource): ) def _get_handler_for_request( - self, request: "SynapseRequest" + self, request: SynapseRequest ) -> Tuple[ServletCallback, str, Dict[str, str]]: """Finds a callback method to handle the given request. @@ -510,7 +503,7 @@ class JsonResource(DirectServeJsonResource): # Huh. No one wanted to handle that? Fiiiiiine. raise UnrecognizedRequestError(code=404) - async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]: + async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]: callback, servlet_classname, group_dict = self._get_handler_for_request(request) request.is_render_cancellable = is_function_cancellable(callback) @@ -542,7 +535,7 @@ class JsonResource(DirectServeJsonResource): def _send_error_response( self, f: failure.Failure, - request: "SynapseRequest", + request: SynapseRequest, ) -> None: """Implements _AsyncResource._send_error_response""" return_json_error(f, request, self.hs.config) @@ -558,7 +551,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_response( self, - request: "SynapseRequest", + request: SynapseRequest, code: int, response_object: Any, ) -> None: @@ -572,7 +565,7 @@ class DirectServeHtmlResource(_AsyncResource): def _send_error_response( self, f: failure.Failure, - request: "SynapseRequest", + request: SynapseRequest, ) -> None: """Implements _AsyncResource._send_error_response""" return_html_error(f, request, self.ERROR_TEMPLATE) @@ -599,7 +592,7 @@ class UnrecognizedRequestResource(resource.Resource): errcode of M_UNRECOGNIZED. """ - def render(self, request: "SynapseRequest") -> int: + def render(self, request: SynapseRequest) -> int: f = failure.Failure(UnrecognizedRequestError(code=404)) return_json_error(f, request, None) # A response has already been sent but Twisted requires either NOT_DONE_YET @@ -629,7 +622,7 @@ class RootRedirect(resource.Resource): class OptionsResource(resource.Resource): """Responds to OPTION requests for itself and all children.""" - def render_OPTIONS(self, request: "SynapseRequest") -> bytes: + def render_OPTIONS(self, request: SynapseRequest) -> bytes: request.setResponseCode(204) request.setHeader(b"Content-Length", b"0") @@ -744,7 +737,7 @@ def _encode_json_bytes(json_object: object) -> bytes: def respond_with_json( - request: "SynapseRequest", + request: SynapseRequest, code: int, json_object: Any, send_cors: bool = False, @@ -794,7 +787,7 @@ def respond_with_json( def respond_with_json_bytes( - request: "SynapseRequest", + request: SynapseRequest, code: int, json_bytes: bytes, send_cors: bool = False, @@ -832,7 +825,7 @@ def respond_with_json_bytes( async def _async_write_json_to_request_in_thread( - request: "SynapseRequest", + request: SynapseRequest, json_encoder: Callable[[Any], bytes], json_object: Any, ) -> None: @@ -890,7 +883,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: _ByteProducer(request, bytes_generator) -def set_cors_headers(request: "SynapseRequest") -> None: +def set_cors_headers(request: SynapseRequest) -> None: """Set the CORS headers so that javascript running in a web browsers can use this API @@ -988,7 +981,7 @@ def set_clickjacking_protection_headers(request: Request) -> None: def respond_with_redirect( - request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False + request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False ) -> None: """ Write a 302 (or other specified status code) response to the request, if it is still alive. diff --git a/synapse/http/site.py b/synapse/http/site.py index 0ee2598345..5b5a7c1e59 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -21,28 +21,25 @@ from zope.interface import implementer from twisted.internet.address import UNIXAddress from twisted.internet.defer import Deferred -from twisted.internet.interfaces import IAddress +from twisted.internet.interfaces import IAddress, IReactorTime from twisted.python.failure import Failure from twisted.web.http import HTTPChannel -from twisted.web.iweb import IAgent from twisted.web.resource import IResource, Resource -from twisted.web.server import Request +from twisted.web.server import Request, Site from synapse.config.server import ListenerConfig from synapse.http import get_request_user_agent, redact_uri -from synapse.http.proxy import ProxySite from synapse.http.request_metrics import RequestMetrics, requests_counter from synapse.logging.context import ( ContextRequest, LoggingContext, PreserveLoggingContext, ) -from synapse.types import ISynapseReactor, Requester +from synapse.types import Requester if TYPE_CHECKING: import opentracing - logger = logging.getLogger(__name__) _next_request_seq = 0 @@ -105,7 +102,7 @@ class SynapseRequest(Request): # A boolean indicating whether `render_deferred` should be cancelled if the # client disconnects early. Expected to be set by the coroutine started by # `Resource.render`, if rendering is asynchronous. - self.is_render_cancellable: bool = False + self.is_render_cancellable = False global _next_request_seq self.request_seq = _next_request_seq @@ -604,7 +601,7 @@ class _XForwardedForAddress: host: str -class SynapseSite(ProxySite): +class SynapseSite(Site): """ Synapse-specific twisted http Site @@ -626,8 +623,7 @@ class SynapseSite(ProxySite): resource: IResource, server_version_string: str, max_request_body_size: int, - reactor: ISynapseReactor, - federation_agent: IAgent, + reactor: IReactorTime, ): """ @@ -642,11 +638,7 @@ class SynapseSite(ProxySite): dropping the connection reactor: reactor to be used to manage connection timeouts """ - super().__init__( - resource=resource, - reactor=reactor, - federation_agent=federation_agent, - ) + Site.__init__(self, resource, reactor=reactor) self.site_tag = site_tag self.reactor = reactor @@ -657,9 +649,7 @@ class SynapseSite(ProxySite): request_id_header = config.http_options.request_id_header - self.experimental_cors_msc3886: bool = ( - config.http_options.experimental_cors_msc3886 - ) + self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886 def request_factory(channel: HTTPChannel, queued: bool) -> Request: return request_class( -- cgit 1.4.1