diff --git a/synapse/http/client.py b/synapse/http/client.py
index 09ea93e10d..ca2cdbc6e2 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1037,7 +1037,12 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
if reason.check(ResponseDone):
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
- # stolen from https://github.com/twisted/treq/pull/49/files
+ # This applies to requests which don't set `Content-Length` or a
+ # `Transfer-Encoding` in the response because in this case the end of the
+ # response is indicated by the connection being closed, an event which may
+ # also be due to a transient network problem or other error. But since this
+ # behavior is expected of some servers (like YouTube), let's ignore it.
+ # Stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
else:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cc4e258b0f..b00396fdc7 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -72,6 +72,7 @@ from synapse.http.client import (
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -393,17 +394,32 @@ class MatrixFederationHttpClient:
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
- federation_agent = MatrixFederationAgent(
- self.reactor,
- tls_client_options_factory,
- user_agent.encode("ascii"),
- hs.config.server.federation_ip_range_allowlist,
- hs.config.server.federation_ip_range_blocklist,
+ outbound_federation_restricted_to = (
+ hs.config.worker.outbound_federation_restricted_to
)
+ if hs.get_instance_name() in outbound_federation_restricted_to:
+ # Talk to federation directly
+ federation_agent: IAgent = MatrixFederationAgent(
+ self.reactor,
+ tls_client_options_factory,
+ user_agent.encode("ascii"),
+ hs.config.server.federation_ip_range_allowlist,
+ hs.config.server.federation_ip_range_blocklist,
+ )
+ else:
+ # We need to talk to federation via the proxy via one of the configured
+ # locations
+ federation_proxies = outbound_federation_restricted_to.locations
+ federation_agent = ProxyAgent(
+ self.reactor,
+ self.reactor,
+ tls_client_options_factory,
+ federation_proxies=federation_proxies,
+ )
# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
- self.agent = BlocklistingAgentWrapper(
+ self.agent: IAgent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
@@ -412,7 +428,6 @@ class MatrixFederationHttpClient:
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
-
self.max_long_retry_delay_seconds = (
hs.config.federation.max_long_retry_delay_ms / 1000
)
@@ -1141,6 +1156,101 @@ class MatrixFederationHttpClient:
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
+ json_dict, _ = await self.get_json_with_headers(
+ destination=destination,
+ path=path,
+ args=args,
+ retry_on_dns_fail=retry_on_dns_fail,
+ timeout=timeout,
+ ignore_backoff=ignore_backoff,
+ try_trailing_slash_on_400=try_trailing_slash_on_400,
+ parser=parser,
+ )
+ return json_dict
+
+ @overload
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = None,
+ retry_on_dns_fail: bool = True,
+ timeout: Optional[int] = None,
+ ignore_backoff: bool = False,
+ try_trailing_slash_on_400: bool = False,
+ parser: Literal[None] = None,
+ ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+ ...
+
+ @overload
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = ...,
+ retry_on_dns_fail: bool = ...,
+ timeout: Optional[int] = ...,
+ ignore_backoff: bool = ...,
+ try_trailing_slash_on_400: bool = ...,
+ parser: ByteParser[T] = ...,
+ ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+ ...
+
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = None,
+ retry_on_dns_fail: bool = True,
+ timeout: Optional[int] = None,
+ ignore_backoff: bool = False,
+ try_trailing_slash_on_400: bool = False,
+ parser: Optional[ByteParser[T]] = None,
+ ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+ """GETs some json from the given host homeserver and path
+
+ Args:
+ destination: The remote server to send the HTTP request to.
+
+ path: The HTTP path.
+
+ args: A dictionary used to create query strings, defaults to
+ None.
+
+ retry_on_dns_fail: true if the request should be retried on DNS failures
+
+ timeout: number of milliseconds to wait for the response.
+ self._default_timeout (60s) by default.
+
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
+ ignore_backoff: true to ignore the historical backoff data
+ and try the request anyway.
+
+ try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+ response we should try appending a trailing slash to the end of
+ the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+ parser: The parser to use to decode the response. Defaults to
+ parsing as JSON.
+
+ Returns:
+ Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+ decoded JSON body and a dict of the response headers.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
+ """
request = MatrixFederationRequest(
method="GET", destination=destination, path=path, query=args
)
@@ -1156,6 +1266,8 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ headers = dict(response.headers.getAllRawHeaders())
+
if timeout is not None:
_sec_timeout = timeout / 1000
else:
@@ -1173,7 +1285,7 @@ class MatrixFederationHttpClient:
parser=parser,
)
- return body
+ return body, headers
async def delete_json(
self,
diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py
new file mode 100644
index 0000000000..0874d67760
--- /dev/null
+++ b/synapse/http/proxy.py
@@ -0,0 +1,249 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import logging
+import urllib.parse
+from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
+
+from twisted.internet import protocol
+from twisted.internet.interfaces import ITCPTransport
+from twisted.internet.protocol import connectionDone
+from twisted.python import failure
+from twisted.python.failure import Failure
+from twisted.web.client import ResponseDone
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import IAgent, IResponse
+from twisted.web.resource import IResource
+from twisted.web.server import Site
+
+from synapse.api.errors import Codes
+from synapse.http import QuieterFileBodyProducer
+from synapse.http.server import _AsyncResource
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.types import ISynapseReactor
+from synapse.util.async_helpers import timeout_deferred
+
+if TYPE_CHECKING:
+ from synapse.http.site import SynapseRequest
+
+logger = logging.getLogger(__name__)
+
+# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
+# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
+# consumed by the immediate recipient and not be forwarded on.
+HOP_BY_HOP_HEADERS = {
+ "Connection",
+ "Keep-Alive",
+ "Proxy-Authenticate",
+ "Proxy-Authorization",
+ "TE",
+ "Trailers",
+ "Transfer-Encoding",
+ "Upgrade",
+}
+
+
+def parse_connection_header_value(
+ connection_header_value: Optional[bytes],
+) -> Set[str]:
+ """
+ Parse the `Connection` header to determine which headers we should not be copied
+ over from the remote response.
+
+ As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
+
+ Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
+
+ Even though "close" is a special directive, let's just treat it as just another
+ header for simplicity. If people want to check for this directive, they can simply
+ check for `"Close" in headers`.
+
+ Args:
+ connection_header_value: The value of the `Connection` header.
+
+ Returns:
+ The set of header names that should not be copied over from the remote response.
+ The keys are capitalized in canonical capitalization.
+ """
+ headers = Headers()
+ extra_headers_to_remove: Set[str] = set()
+ if connection_header_value:
+ extra_headers_to_remove = {
+ headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
+ for connection_option in connection_header_value.split(b",")
+ }
+
+ return extra_headers_to_remove
+
+
+class ProxyResource(_AsyncResource):
+ """
+ A stub resource that proxies any requests with a `matrix-federation://` scheme
+ through the given `federation_agent` to the remote homeserver and ferries back the
+ info.
+ """
+
+ isLeaf = True
+
+ def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
+ super().__init__(True)
+
+ self.reactor = reactor
+ self.agent = federation_agent
+
+ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
+ uri = urllib.parse.urlparse(request.uri)
+ assert uri.scheme == b"matrix-federation"
+
+ headers = Headers()
+ for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
+ header_value = request.getHeader(header_name)
+ if header_value:
+ headers.addRawHeader(header_name, header_value)
+
+ request_deferred = run_in_background(
+ self.agent.request,
+ request.method,
+ request.uri,
+ headers=headers,
+ bodyProducer=QuieterFileBodyProducer(request.content),
+ )
+ request_deferred = timeout_deferred(
+ request_deferred,
+ # This should be set longer than the timeout in `MatrixFederationHttpClient`
+ # so that it has enough time to complete and pass us the data before we give
+ # up.
+ timeout=90,
+ reactor=self.reactor,
+ )
+
+ response = await make_deferred_yieldable(request_deferred)
+
+ return response.code, response
+
+ def _send_response(
+ self,
+ request: "SynapseRequest",
+ code: int,
+ response_object: Any,
+ ) -> None:
+ response = cast(IResponse, response_object)
+ response_headers = cast(Headers, response.headers)
+
+ request.setResponseCode(code)
+
+ # The `Connection` header also defines which headers should not be copied over.
+ connection_header = response_headers.getRawHeaders(b"connection")
+ extra_headers_to_remove = parse_connection_header_value(
+ connection_header[0] if connection_header else None
+ )
+
+ # Copy headers.
+ for k, v in response_headers.getAllRawHeaders():
+ # Do not copy over any hop-by-hop headers. These are meant to only be
+ # consumed by the immediate recipient and not be forwarded on.
+ header_key = k.decode("ascii")
+ if (
+ header_key in HOP_BY_HOP_HEADERS
+ or header_key in extra_headers_to_remove
+ ):
+ continue
+
+ request.responseHeaders.setRawHeaders(k, v)
+
+ response.deliverBody(_ProxyResponseBody(request))
+
+ def _send_error_response(
+ self,
+ f: failure.Failure,
+ request: "SynapseRequest",
+ ) -> None:
+ request.setResponseCode(502)
+ request.setHeader(b"Content-Type", b"application/json")
+ request.write(
+ (
+ json.dumps(
+ {
+ "errcode": Codes.UNKNOWN,
+ "err": "ProxyResource: Error when proxying request: %s %s -> %s"
+ % (
+ request.method.decode("ascii"),
+ request.uri.decode("ascii"),
+ f,
+ ),
+ }
+ )
+ ).encode()
+ )
+ request.finish()
+
+
+class _ProxyResponseBody(protocol.Protocol):
+ """
+ A protocol that proxies the given remote response data back out to the given local
+ request.
+ """
+
+ transport: Optional[ITCPTransport] = None
+
+ def __init__(self, request: "SynapseRequest") -> None:
+ self._request = request
+
+ def dataReceived(self, data: bytes) -> None:
+ # Avoid sending response data to the local request that already disconnected
+ if self._request._disconnected and self.transport is not None:
+ # Close the connection (forcefully) since all the data will get
+ # discarded anyway.
+ self.transport.abortConnection()
+ return
+
+ self._request.write(data)
+
+ def connectionLost(self, reason: Failure = connectionDone) -> None:
+ # If the local request is already finished (successfully or failed), don't
+ # worry about sending anything back.
+ if self._request.finished:
+ return
+
+ if reason.check(ResponseDone):
+ self._request.finish()
+ else:
+ # Abort the underlying request since our remote request also failed.
+ self._request.transport.abortConnection()
+
+
+class ProxySite(Site):
+ """
+ Proxies any requests with a `matrix-federation://` scheme through the given
+ `federation_agent`. Otherwise, behaves like a normal `Site`.
+ """
+
+ def __init__(
+ self,
+ resource: IResource,
+ reactor: ISynapseReactor,
+ federation_agent: IAgent,
+ ):
+ super().__init__(resource, reactor=reactor)
+
+ self._proxy_resource = ProxyResource(reactor, federation_agent)
+
+ def getResourceFor(self, request: "SynapseRequest") -> IResource:
+ uri = urllib.parse.urlparse(request.uri)
+ if uri.scheme == b"matrix-federation":
+ return self._proxy_resource
+
+ return super().getResourceFor(request)
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 7bdc4acae7..1fa3adbef2 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import random
import re
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
from urllib.parse import urlparse
from urllib.request import ( # type: ignore[attr-defined]
getproxies_environment,
@@ -24,7 +25,12 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
+from twisted.internet.interfaces import (
+ IProtocol,
+ IProtocolFactory,
+ IReactorCore,
+ IStreamClientEndpoint,
+)
from twisted.python.failure import Failure
from twisted.web.client import (
URI,
@@ -36,8 +42,10 @@ from twisted.web.error import SchemeNotSupported
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
+from synapse.config.workers import InstanceLocationConfig
from synapse.http import redact_uri
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
+from synapse.logging.context import run_in_background
logger = logging.getLogger(__name__)
@@ -74,6 +82,10 @@ class ProxyAgent(_AgentBase):
use_proxy: Whether proxy settings should be discovered and used
from conventional environment variables.
+ federation_proxies: An optional list of locations to proxy outbound federation
+ traffic through (only requests that use the `matrix-federation://` scheme
+ will be proxied).
+
Raises:
ValueError if use_proxy is set and the environment variables
contain an invalid proxy specification.
@@ -89,6 +101,7 @@ class ProxyAgent(_AgentBase):
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
use_proxy: bool = False,
+ federation_proxies: Collection[InstanceLocationConfig] = (),
):
contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
@@ -127,6 +140,27 @@ class ProxyAgent(_AgentBase):
self._policy_for_https = contextFactory
self._reactor = reactor
+ self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
+ if federation_proxies:
+ endpoints = []
+ for federation_proxy in federation_proxies:
+ endpoint = HostnameEndpoint(
+ self.proxy_reactor,
+ federation_proxy.host,
+ federation_proxy.port,
+ )
+
+ if federation_proxy.tls:
+ tls_connection_creator = self._policy_for_https.creatorForNetloc(
+ federation_proxy.host,
+ federation_proxy.port,
+ )
+ endpoint = wrapClientTLS(tls_connection_creator, endpoint)
+
+ endpoints.append(endpoint)
+
+ self._federation_proxy_endpoint = _ProxyEndpoints(endpoints)
+
def request(
self,
method: bytes,
@@ -214,6 +248,14 @@ class ProxyAgent(_AgentBase):
parsed_uri.port,
self.https_proxy_creds,
)
+ elif (
+ parsed_uri.scheme == b"matrix-federation"
+ and self._federation_proxy_endpoint
+ ):
+ # Cache *all* connections under the same key, since we are only
+ # connecting to a single destination, the proxy:
+ endpoint = self._federation_proxy_endpoint
+ request_path = uri
else:
# not using a proxy
endpoint = HostnameEndpoint(
@@ -233,6 +275,11 @@ class ProxyAgent(_AgentBase):
endpoint = wrapClientTLS(tls_connection_creator, endpoint)
elif parsed_uri.scheme == b"http":
pass
+ elif (
+ parsed_uri.scheme == b"matrix-federation"
+ and self._federation_proxy_endpoint
+ ):
+ pass
else:
return defer.fail(
Failure(
@@ -337,3 +384,31 @@ def parse_proxy(
credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
return url.scheme, url.hostname, url.port or default_port, credentials
+
+
+@implementer(IStreamClientEndpoint)
+class _ProxyEndpoints:
+ """An endpoint that randomly iterates through a given list of endpoints at
+ each connection attempt.
+ """
+
+ def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None:
+ assert endpoints
+ self._endpoints = endpoints
+
+ def connect(
+ self, protocol_factory: IProtocolFactory
+ ) -> "defer.Deferred[IProtocol]":
+ """Implements IStreamClientEndpoint interface"""
+
+ return run_in_background(self._do_connect, protocol_factory)
+
+ async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
+ failures: List[Failure] = []
+ for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
+ try:
+ return await endpoint.connect(protocol_factory)
+ except Exception:
+ failures.append(Failure())
+
+ failures.pop().raiseException()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 933172c873..ff3153a9d9 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ import html
import logging
import types
import urllib
+import urllib.parse
from http import HTTPStatus
from http.client import FOUND
from inspect import isawaitable
@@ -65,7 +66,6 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.config.homeserver import HomeServerConfig
-from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
@@ -76,6 +76,7 @@ from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
import opentracing
+ from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -102,7 +103,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499
def return_json_error(
- f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
+ f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig]
) -> None:
"""Sends a JSON error response to clients."""
@@ -220,8 +221,8 @@ def return_html_error(
def wrap_async_request_handler(
- h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]]
-) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]:
+ h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
+) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
"""Wraps an async request handler so that it calls request.processing.
This helps ensure that work done by the request handler after the request is completed
@@ -235,7 +236,7 @@ def wrap_async_request_handler(
"""
async def wrapped_async_request_handler(
- self: "_AsyncResource", request: SynapseRequest
+ self: "_AsyncResource", request: "SynapseRequest"
) -> None:
with request.processing():
await h(self, request)
@@ -300,7 +301,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
self._extract_context = extract_context
- def render(self, request: SynapseRequest) -> int:
+ def render(self, request: "SynapseRequest") -> int:
"""This gets called by twisted every time someone sends us a request."""
request.render_deferred = defer.ensureDeferred(
self._async_render_wrapper(request)
@@ -308,7 +309,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
return NOT_DONE_YET
@wrap_async_request_handler
- async def _async_render_wrapper(self, request: SynapseRequest) -> None:
+ async def _async_render_wrapper(self, request: "SynapseRequest") -> None:
"""This is a wrapper that delegates to `_async_render` and handles
exceptions, return values, metrics, etc.
"""
@@ -326,9 +327,15 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
# of our stack, and thus gives us a sensible stack
# trace.
f = failure.Failure()
+ logger.exception(
+ "Error handling request",
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
self._send_error_response(f, request)
- async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]:
+ async def _async_render(
+ self, request: "SynapseRequest"
+ ) -> Optional[Tuple[int, Any]]:
"""Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
no appropriate method exists. Can be overridden in sub classes for
different routing.
@@ -358,7 +365,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
@abc.abstractmethod
def _send_response(
self,
- request: SynapseRequest,
+ request: "SynapseRequest",
code: int,
response_object: Any,
) -> None:
@@ -368,7 +375,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
def _send_error_response(
self,
f: failure.Failure,
- request: SynapseRequest,
+ request: "SynapseRequest",
) -> None:
raise NotImplementedError()
@@ -384,7 +391,7 @@ class DirectServeJsonResource(_AsyncResource):
def _send_response(
self,
- request: SynapseRequest,
+ request: "SynapseRequest",
code: int,
response_object: Any,
) -> None:
@@ -401,7 +408,7 @@ class DirectServeJsonResource(_AsyncResource):
def _send_error_response(
self,
f: failure.Failure,
- request: SynapseRequest,
+ request: "SynapseRequest",
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_json_error(f, request, None)
@@ -473,7 +480,7 @@ class JsonResource(DirectServeJsonResource):
)
def _get_handler_for_request(
- self, request: SynapseRequest
+ self, request: "SynapseRequest"
) -> Tuple[ServletCallback, str, Dict[str, str]]:
"""Finds a callback method to handle the given request.
@@ -503,7 +510,7 @@ class JsonResource(DirectServeJsonResource):
# Huh. No one wanted to handle that? Fiiiiiine.
raise UnrecognizedRequestError(code=404)
- async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
+ async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
request.is_render_cancellable = is_function_cancellable(callback)
@@ -535,7 +542,7 @@ class JsonResource(DirectServeJsonResource):
def _send_error_response(
self,
f: failure.Failure,
- request: SynapseRequest,
+ request: "SynapseRequest",
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_json_error(f, request, self.hs.config)
@@ -551,7 +558,7 @@ class DirectServeHtmlResource(_AsyncResource):
def _send_response(
self,
- request: SynapseRequest,
+ request: "SynapseRequest",
code: int,
response_object: Any,
) -> None:
@@ -565,7 +572,7 @@ class DirectServeHtmlResource(_AsyncResource):
def _send_error_response(
self,
f: failure.Failure,
- request: SynapseRequest,
+ request: "SynapseRequest",
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_html_error(f, request, self.ERROR_TEMPLATE)
@@ -592,7 +599,7 @@ class UnrecognizedRequestResource(resource.Resource):
errcode of M_UNRECOGNIZED.
"""
- def render(self, request: SynapseRequest) -> int:
+ def render(self, request: "SynapseRequest") -> int:
f = failure.Failure(UnrecognizedRequestError(code=404))
return_json_error(f, request, None)
# A response has already been sent but Twisted requires either NOT_DONE_YET
@@ -622,7 +629,7 @@ class RootRedirect(resource.Resource):
class OptionsResource(resource.Resource):
"""Responds to OPTION requests for itself and all children."""
- def render_OPTIONS(self, request: SynapseRequest) -> bytes:
+ def render_OPTIONS(self, request: "SynapseRequest") -> bytes:
request.setResponseCode(204)
request.setHeader(b"Content-Length", b"0")
@@ -737,7 +744,7 @@ def _encode_json_bytes(json_object: object) -> bytes:
def respond_with_json(
- request: SynapseRequest,
+ request: "SynapseRequest",
code: int,
json_object: Any,
send_cors: bool = False,
@@ -787,7 +794,7 @@ def respond_with_json(
def respond_with_json_bytes(
- request: SynapseRequest,
+ request: "SynapseRequest",
code: int,
json_bytes: bytes,
send_cors: bool = False,
@@ -825,7 +832,7 @@ def respond_with_json_bytes(
async def _async_write_json_to_request_in_thread(
- request: SynapseRequest,
+ request: "SynapseRequest",
json_encoder: Callable[[Any], bytes],
json_object: Any,
) -> None:
@@ -883,7 +890,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
_ByteProducer(request, bytes_generator)
-def set_cors_headers(request: SynapseRequest) -> None:
+def set_cors_headers(request: "SynapseRequest") -> None:
"""Set the CORS headers so that javascript running in a web browsers can
use this API
@@ -981,7 +988,7 @@ def set_clickjacking_protection_headers(request: Request) -> None:
def respond_with_redirect(
- request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False
+ request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False
) -> None:
"""
Write a 302 (or other specified status code) response to the request, if it is still alive.
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5b5a7c1e59..0ee2598345 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -21,25 +21,28 @@ from zope.interface import implementer
from twisted.internet.address import UNIXAddress
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress, IReactorTime
+from twisted.internet.interfaces import IAddress
from twisted.python.failure import Failure
from twisted.web.http import HTTPChannel
+from twisted.web.iweb import IAgent
from twisted.web.resource import IResource, Resource
-from twisted.web.server import Request, Site
+from twisted.web.server import Request
from synapse.config.server import ListenerConfig
from synapse.http import get_request_user_agent, redact_uri
+from synapse.http.proxy import ProxySite
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import (
ContextRequest,
LoggingContext,
PreserveLoggingContext,
)
-from synapse.types import Requester
+from synapse.types import ISynapseReactor, Requester
if TYPE_CHECKING:
import opentracing
+
logger = logging.getLogger(__name__)
_next_request_seq = 0
@@ -102,7 +105,7 @@ class SynapseRequest(Request):
# A boolean indicating whether `render_deferred` should be cancelled if the
# client disconnects early. Expected to be set by the coroutine started by
# `Resource.render`, if rendering is asynchronous.
- self.is_render_cancellable = False
+ self.is_render_cancellable: bool = False
global _next_request_seq
self.request_seq = _next_request_seq
@@ -601,7 +604,7 @@ class _XForwardedForAddress:
host: str
-class SynapseSite(Site):
+class SynapseSite(ProxySite):
"""
Synapse-specific twisted http Site
@@ -623,7 +626,8 @@ class SynapseSite(Site):
resource: IResource,
server_version_string: str,
max_request_body_size: int,
- reactor: IReactorTime,
+ reactor: ISynapseReactor,
+ federation_agent: IAgent,
):
"""
@@ -638,7 +642,11 @@ class SynapseSite(Site):
dropping the connection
reactor: reactor to be used to manage connection timeouts
"""
- Site.__init__(self, resource, reactor=reactor)
+ super().__init__(
+ resource=resource,
+ reactor=reactor,
+ federation_agent=federation_agent,
+ )
self.site_tag = site_tag
self.reactor = reactor
@@ -649,7 +657,9 @@ class SynapseSite(Site):
request_id_header = config.http_options.request_id_header
- self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886
+ self.experimental_cors_msc3886: bool = (
+ config.http_options.experimental_cors_msc3886
+ )
def request_factory(channel: HTTPChannel, queued: bool) -> Request:
return request_class(
|