From 58114f8a17a5b52a9b90b89b3c7d9b595307c9a8 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 8 Mar 2021 08:25:43 -0500 Subject: Create a SynapseReactor type which incorporates the necessary reactor interfaces. (#9528) This helps fix some type hints when running with Twisted 21.2.0. --- synapse/http/client.py | 5 +++-- synapse/http/federation/matrix_federation_agent.py | 3 ++- synapse/http/matrixfederationclient.py | 8 ++++---- 3 files changed, 9 insertions(+), 7 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index 72901e3f95..af34d583ad 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -63,6 +63,7 @@ from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_u from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags +from synapse.types import ISynapseReactor from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred @@ -199,7 +200,7 @@ class _IPBlacklistingResolver: return r -@implementer(IReactorPluggableNameResolver) +@implementer(ISynapseReactor) class BlacklistingReactorWrapper: """ A Reactor wrapper which will prevent DNS resolution to blacklisted IP @@ -324,7 +325,7 @@ class SimpleHttpClient: # filters out blacklisted IP addresses, to prevent DNS rebinding. self.reactor = BlacklistingReactorWrapper( hs.get_reactor(), self._ip_whitelist, self._ip_blacklist - ) + ) # type: ISynapseReactor else: self.reactor = hs.get_reactor() diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index b07aa59c08..5935a125fd 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -35,6 +35,7 @@ from synapse.http.client import BlacklistingAgentWrapper from synapse.http.federation.srv_resolver import Server, SrvResolver from synapse.http.federation.well_known_resolver import WellKnownResolver from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.types import ISynapseReactor from synapse.util import Clock logger = logging.getLogger(__name__) @@ -68,7 +69,7 @@ class MatrixFederationAgent: def __init__( self, - reactor: IReactorCore, + reactor: ISynapseReactor, tls_client_options_factory: Optional[FederationPolicyForHTTPS], user_agent: bytes, ip_blacklist: IPSet, diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 0f107714ea..da6866addf 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -59,7 +59,7 @@ from synapse.logging.opentracing import ( start_active_span, tags, ) -from synapse.types import JsonDict +from synapse.types import ISynapseReactor, JsonDict from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred from synapse.util.metrics import Measure @@ -237,14 +237,14 @@ class MatrixFederationHttpClient: # addresses, to prevent DNS rebinding. self.reactor = BlacklistingReactorWrapper( hs.get_reactor(), None, hs.config.federation_ip_range_blacklist - ) + ) # type: ISynapseReactor user_agent = hs.version_string if hs.config.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix) user_agent = user_agent.encode("ascii") - self.agent = MatrixFederationAgent( + federation_agent = MatrixFederationAgent( self.reactor, tls_client_options_factory, user_agent, @@ -254,7 +254,7 @@ class MatrixFederationHttpClient: # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names self.agent = BlacklistingAgentWrapper( - self.agent, + federation_agent, ip_blacklist=hs.config.federation_ip_range_blacklist, ) -- cgit 1.5.1 From 7fdc6cefb3017f3c4bbe1d824e1de249ac29d219 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 9 Mar 2021 07:41:32 -0500 Subject: Fix additional type hints. (#9543) Type hint fixes due to Twisted 21.2.0 adding type hints. --- changelog.d/9543.misc | 1 + synapse/config/logger.py | 5 ++++- synapse/federation/federation_server.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/http/federation/well_known_resolver.py | 3 ++- synapse/logging/context.py | 6 ++++-- tests/replication/_base.py | 27 ++++++++++++++++---------- tests/server.py | 2 +- tests/test_utils/logging_setup.py | 2 +- 9 files changed, 32 insertions(+), 18 deletions(-) create mode 100644 changelog.d/9543.misc (limited to 'synapse/http') diff --git a/changelog.d/9543.misc b/changelog.d/9543.misc new file mode 100644 index 0000000000..14c7b78dd9 --- /dev/null +++ b/changelog.d/9543.misc @@ -0,0 +1 @@ +Fix incorrect type hints. diff --git a/synapse/config/logger.py b/synapse/config/logger.py index e56cf846f5..999aecce5c 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -21,8 +21,10 @@ import threading from string import Template import yaml +from zope.interface import implementer from twisted.logger import ( + ILogObserver, LogBeginner, STDLibLogObserver, eventAsText, @@ -227,7 +229,8 @@ def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> threadlocal = threading.local() - def _log(event): + @implementer(ILogObserver) + def _log(event: dict) -> None: if "log_text" in event: if event["log_text"].startswith("DNSDatagramProtocol starting on "): return diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7657697bfa..ffc735ba25 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -361,7 +361,7 @@ class FederationServer(FederationBase): logger.error( "Failed to handle PDU %s", event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), + exc_info=(f.type, f.value, f.getTracebackObject()), # type: ignore ) await concurrently_execute( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 059064a4eb..66dc886c81 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -285,7 +285,7 @@ class PaginationHandler: except Exception: f = Failure() logger.error( - "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) + "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject()) # type: ignore ) self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED finally: diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py index 4def7d7633..ecd63e6596 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py @@ -322,7 +322,8 @@ def _cache_period_from_headers( def _parse_cache_control(headers: Headers) -> Dict[bytes, Optional[bytes]]: cache_controls = {} - for hdr in headers.getRawHeaders(b"cache-control", []): + cache_control_headers = headers.getRawHeaders(b"cache-control") or [] + for hdr in cache_control_headers: for directive in hdr.split(b","): splits = [x.strip() for x in directive.split(b"=", 1)] k = splits[0].lower() diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 78e27bfb00..1a7ea4fa96 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -669,7 +669,7 @@ def preserve_fn(f): return g -def run_in_background(f, *args, **kwargs): +def run_in_background(f, *args, **kwargs) -> defer.Deferred: """Calls a function, ensuring that the current context is restored after return from the function, and that the sentinel context is set once the deferred returned by the function completes. @@ -697,8 +697,10 @@ def run_in_background(f, *args, **kwargs): if isinstance(res, types.CoroutineType): res = defer.ensureDeferred(res) + # At this point we should have a Deferred, if not then f was a synchronous + # function, wrap it in a Deferred for consistency. if not isinstance(res, defer.Deferred): - return res + return defer.succeed(res) if res.called and not res.paused: # The function should have maintained the logcontext, so we can diff --git a/tests/replication/_base.py b/tests/replication/_base.py index f6a6aed35e..20940c8107 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -22,6 +22,7 @@ from twisted.internet.protocol import Protocol from twisted.internet.task import LoopingCall from twisted.web.http import HTTPChannel from twisted.web.resource import Resource +from twisted.web.server import Request, Site from synapse.app.generic_worker import ( GenericWorkerReplicationHandler, @@ -32,7 +33,10 @@ from synapse.http.site import SynapseRequest, SynapseSite from synapse.replication.http import ReplicationRestResource from synapse.replication.tcp.handler import ReplicationCommandHandler from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol -from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.replication.tcp.resource import ( + ReplicationStreamProtocolFactory, + ServerReplicationStreamProtocol, +) from synapse.server import HomeServer from synapse.util import Clock @@ -59,7 +63,9 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): # build a replication server server_factory = ReplicationStreamProtocolFactory(hs) self.streamer = hs.get_replication_streamer() - self.server = server_factory.buildProtocol(None) + self.server = server_factory.buildProtocol( + None + ) # type: ServerReplicationStreamProtocol # Make a new HomeServer object for the worker self.reactor.lookups["testserv"] = "1.2.3.4" @@ -155,9 +161,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): request_factory = OneShotRequestFactory() # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor) - channel.requestFactory = request_factory - channel.site = self.site + channel = _PushHTTPChannel(self.reactor, request_factory, self.site) # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -188,8 +192,9 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): fetching updates for given stream. """ + path = request.path # type: bytes # type: ignore self.assertRegex( - request.path, + path, br"^/_synapse/replication/get_repl_stream_updates/%s/[^/]+$" % (stream_name.encode("ascii"),), ) @@ -390,9 +395,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): request_factory = OneShotRequestFactory() # Set up the server side protocol - channel = _PushHTTPChannel(self.reactor) - channel.requestFactory = request_factory - channel.site = self._hs_to_site[hs] + channel = _PushHTTPChannel(self.reactor, request_factory, self._hs_to_site[hs]) # Connect client to server and vice versa. client_to_server_transport = FakeTransport( @@ -475,9 +478,13 @@ class _PushHTTPChannel(HTTPChannel): makes it very hard to test. """ - def __init__(self, reactor: IReactorTime): + def __init__( + self, reactor: IReactorTime, request_factory: Callable[..., Request], site: Site + ): super().__init__() self.reactor = reactor + self.requestFactory = request_factory + self.site = site self._pull_to_push_producer = None # type: Optional[_PullToPushProducer] diff --git a/tests/server.py b/tests/server.py index 939a0008ca..863f6da738 100644 --- a/tests/server.py +++ b/tests/server.py @@ -188,7 +188,7 @@ class FakeSite: def make_request( reactor, - site: Site, + site: Union[Site, FakeSite], method, path, content=b"", diff --git a/tests/test_utils/logging_setup.py b/tests/test_utils/logging_setup.py index 52ae5c5713..74568b34f8 100644 --- a/tests/test_utils/logging_setup.py +++ b/tests/test_utils/logging_setup.py @@ -28,7 +28,7 @@ class ToTwistedHandler(logging.Handler): def emit(self, record): log_entry = self.format(record) log_level = record.levelname.lower().replace("warning", "warn") - self.tx_log.emit( + self.tx_log.emit( # type: ignore twisted.logger.LogLevel.levelWithName(log_level), "{entry}", entry=log_entry ) -- cgit 1.5.1 From 9cd18cc5886a5f44625c1c4730f146ec189b833e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Mar 2021 13:15:12 +0000 Subject: Retry 5xx errors in federation client (#9567) Fixes #8915 --- changelog.d/9567.bugfix | 1 + synapse/http/matrixfederationclient.py | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) create mode 100644 changelog.d/9567.bugfix (limited to 'synapse/http') diff --git a/changelog.d/9567.bugfix b/changelog.d/9567.bugfix new file mode 100644 index 0000000000..e7322c2b5e --- /dev/null +++ b/changelog.d/9567.bugfix @@ -0,0 +1 @@ +Fix bug where federation requests were not correctly retried on 5xx responses. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da6866addf..5f01ebd3d4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -534,9 +534,10 @@ class MatrixFederationHttpClient: response.code, response_phrase, body ) - # Retry if the error is a 429 (Too Many Requests), - # otherwise just raise a standard HttpResponseException - if response.code == 429: + # Retry if the error is a 5xx or a 429 (Too Many + # Requests), otherwise just raise a standard + # `HttpResponseException` + if 500 <= response.code < 600 or response.code == 429: raise RequestSendFailed(exc, can_retry=True) from exc else: raise exc -- cgit 1.5.1