summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-18 17:46:04 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-18 17:46:04 +0100
commitef8415adc2bb14fce19bbe454357bef2b701eca2 (patch)
tree313bf881fe0583a5129ed0e472667e6be566b6d4 /synapse/util
parentfix changelog (diff)
parentFix seven contrib files with Python syntax errors (#5446) (diff)
downloadsynapse-dbkr/3pid_verification_logging.tar.xz
Merge remote-tracking branch 'origin/develop' into dbkr/3pid_verification_logging github/dbkr/3pid_verification_logging dbkr/3pid_verification_logging
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async_helpers.py21
-rw-r--r--synapse/util/distributor.py1
-rw-r--r--synapse/util/logcontext.py22
-rw-r--r--synapse/util/manhole.py59
-rw-r--r--synapse/util/ratelimitutils.py47
-rw-r--r--synapse/util/retryutils.py55
-rw-r--r--synapse/util/stringutils.py9
7 files changed, 150 insertions, 64 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py

index f0e4a0e10c..7253ba120f 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py
@@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd. +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -156,6 +156,25 @@ def concurrently_execute(func, args, limit): ], consumeErrors=True)).addErrback(unwrapFirstError) +def yieldable_gather_results(func, iter, *args, **kwargs): + """Executes the function with each argument concurrently. + + Args: + func (func): Function to execute that returns a Deferred + iter (iter): An iterable that yields items that get passed as the first + argument to the function + *args: Arguments to be passed to each call to func + + Returns + Deferred[list]: Resolved when all functions have been invoked, or errors if + one of the function calls fails. + """ + return logcontext.make_deferred_yieldable(defer.gatherResults([ + run_in_background(func, item, *args, **kwargs) + for item in iter + ], consumeErrors=True)).addErrback(unwrapFirstError) + + class Linearizer(object): """Limits concurrent access to resources based on a key. Useful to ensure only a few things happen at a time on a given resource. diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 194da87639..e14c8bdfda 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py
@@ -27,6 +27,7 @@ def user_left_room(distributor, user, room_id): distributor.fire("user_left_room", user=user, room_id=room_id) +# XXX: this is no longer used. We should probably kill it. def user_joined_room(distributor, user, room_id): distributor.fire("user_joined_room", user=user, room_id=room_id) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 311b49e18a..fe412355d8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py
@@ -226,6 +226,8 @@ class LoggingContext(object): self.request = request def __str__(self): + if self.request: + return str(self.request) return "%s@%x" % (self.name, id(self)) @classmethod @@ -274,12 +276,10 @@ class LoggingContext(object): current = self.set_current_context(self.previous_context) if current is not self: if current is self.sentinel: - logger.warn("Expected logging context %s has been lost", self) + logger.warning("Expected logging context %s was lost", self) else: - logger.warn( - "Current logging context %s is not expected context %s", - current, - self + logger.warning( + "Expected logging context %s but found %s", self, current ) self.previous_context = None self.alive = False @@ -433,10 +433,14 @@ class PreserveLoggingContext(object): context = LoggingContext.set_current_context(self.current_context) if context != self.new_context: - logger.warn( - "Unexpected logging context: %s is not %s", - context, self.new_context, - ) + if context is LoggingContext.sentinel: + logger.warning("Expected logging context %s was lost", self.new_context) + else: + logger.warning( + "Expected logging context %s but found %s", + self.new_context, + context, + ) if self.current_context is not LoggingContext.sentinel: if not self.current_context.alive: diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 9cb7e9c9ab..628a2962d9 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py
@@ -1,4 +1,5 @@ # Copyright 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -11,10 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import sys +import traceback from twisted.conch import manhole_ssh from twisted.conch.insults import insults -from twisted.conch.manhole import ColoredManhole +from twisted.conch.manhole import ColoredManhole, ManholeInterpreter from twisted.conch.ssh.keys import Key from twisted.cred import checkers, portal @@ -79,7 +82,7 @@ def manhole(username, password, globals): rlm = manhole_ssh.TerminalRealm() rlm.chainedProtocolFactory = lambda: insults.ServerProtocol( - ColoredManhole, + SynapseManhole, dict(globals, __name__="__console__") ) @@ -88,3 +91,55 @@ def manhole(username, password, globals): factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY) return factory + + +class SynapseManhole(ColoredManhole): + """Overrides connectionMade to create our own ManholeInterpreter""" + def connectionMade(self): + super(SynapseManhole, self).connectionMade() + + # replace the manhole interpreter with our own impl + self.interpreter = SynapseManholeInterpreter(self, self.namespace) + + # this would also be a good place to add more keyHandlers. + + +class SynapseManholeInterpreter(ManholeInterpreter): + def showsyntaxerror(self, filename=None): + """Display the syntax error that just occurred. + + Overrides the base implementation, ignoring sys.excepthook. We always want + any syntax errors to be sent to the terminal, rather than sentry. + """ + type, value, tb = sys.exc_info() + sys.last_type = type + sys.last_value = value + sys.last_traceback = tb + if filename and type is SyntaxError: + # Work hard to stuff the correct filename in the exception + try: + msg, (dummy_filename, lineno, offset, line) = value.args + except ValueError: + # Not the format we expect; leave it alone + pass + else: + # Stuff in the right filename + value = SyntaxError(msg, (filename, lineno, offset, line)) + sys.last_value = value + lines = traceback.format_exception_only(type, value) + self.write(''.join(lines)) + + def showtraceback(self): + """Display the exception that just occurred. + + Overrides the base implementation, ignoring sys.excepthook. We always want + any syntax errors to be sent to the terminal, rather than sentry. + """ + sys.last_type, sys.last_value, last_tb = ei = sys.exc_info() + sys.last_traceback = last_tb + try: + # We remove the first stack item because it is our own code. + lines = traceback.format_exception(ei[0], ei[1], last_tb.tb_next) + self.write(''.join(lines)) + finally: + last_tb = ei = None diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 7deb38f2a7..b146d137f4 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py
@@ -30,31 +30,14 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): """ Args: clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. + config (FederationRateLimitConfig) """ self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - + self._config = config self.ratelimiters = {} def ratelimit(self, host): @@ -76,25 +59,25 @@ class FederationRateLimiter(object): host, _PerHostRatelimiter( clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, + config=self._config, ) ).ratelimit() class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): + def __init__(self, clock, config): + """ + Args: + clock (Clock) + config (FederationRateLimitConfig) + """ self.clock = clock - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_sec = sleep_msec / 1000.0 - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests + self.window_size = config.window_size + self.sleep_limit = config.sleep_limit + self.sleep_sec = config.sleep_delay / 1000.0 + self.reject_limit = config.reject_limit + self.concurrent_requests = config.concurrent # request_id objects for requests which have been slept self.sleeping_requests = set() diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 26cce7d197..1a77456498 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py
@@ -46,8 +46,7 @@ class NotRetryingDestination(Exception): @defer.inlineCallbacks -def get_retry_limiter(destination, clock, store, ignore_backoff=False, - **kwargs): +def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs): """For a given destination check if we have previously failed to send a request there and are waiting before retrying the destination. If we are not ready to retry the destination, this will raise a @@ -60,8 +59,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, clock (synapse.util.clock): timing source store (synapse.storage.transactions.TransactionStore): datastore ignore_backoff (bool): true to ignore the historical backoff data and - try the request anyway. We will still update the next - retry_interval on success/failure. + try the request anyway. We will still reset the retry_interval on success. Example usage: @@ -75,13 +73,12 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, """ retry_last_ts, retry_interval = (0, 0) - retry_timings = yield store.get_destination_retry_timings( - destination - ) + retry_timings = yield store.get_destination_retry_timings(destination) if retry_timings: retry_last_ts, retry_interval = ( - retry_timings["retry_last_ts"], retry_timings["retry_interval"] + retry_timings["retry_last_ts"], + retry_timings["retry_interval"], ) now = int(clock.time_msec()) @@ -93,22 +90,36 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, destination=destination, ) + # if we are ignoring the backoff data, we should also not increment the backoff + # when we get another failure - otherwise a server can very quickly reach the + # maximum backoff even though it might only have been down briefly + backoff_on_failure = not ignore_backoff + defer.returnValue( RetryDestinationLimiter( destination, clock, store, retry_interval, + backoff_on_failure=backoff_on_failure, **kwargs ) ) class RetryDestinationLimiter(object): - def __init__(self, destination, clock, store, retry_interval, - min_retry_interval=10 * 60 * 1000, - max_retry_interval=24 * 60 * 60 * 1000, - multiplier_retry_interval=5, backoff_on_404=False): + def __init__( + self, + destination, + clock, + store, + retry_interval, + min_retry_interval=10 * 60 * 1000, + max_retry_interval=24 * 60 * 60 * 1000, + multiplier_retry_interval=5, + backoff_on_404=False, + backoff_on_failure=True, + ): """Marks the destination as "down" if an exception is thrown in the context, except for CodeMessageException with code < 500. @@ -128,6 +139,9 @@ class RetryDestinationLimiter(object): multiplier_retry_interval (int): The multiplier to use to increase the retry interval after a failed request. backoff_on_404 (bool): Back off if we get a 404 + + backoff_on_failure (bool): set to False if we should not increase the + retry interval on a failure. """ self.clock = clock self.store = store @@ -138,6 +152,7 @@ class RetryDestinationLimiter(object): self.max_retry_interval = max_retry_interval self.multiplier_retry_interval = multiplier_retry_interval self.backoff_on_404 = backoff_on_404 + self.backoff_on_failure = backoff_on_failure def __enter__(self): pass @@ -173,10 +188,13 @@ class RetryDestinationLimiter(object): if not self.retry_interval: return - logger.debug("Connection to %s was successful; clearing backoff", - self.destination) + logger.debug( + "Connection to %s was successful; clearing backoff", self.destination + ) retry_last_ts = 0 self.retry_interval = 0 + elif not self.backoff_on_failure: + return else: # We couldn't connect. if self.retry_interval: @@ -190,7 +208,10 @@ class RetryDestinationLimiter(object): logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", - self.destination, exc_type, exc_val, self.retry_interval + self.destination, + exc_type, + exc_val, + self.retry_interval, ) retry_last_ts = int(self.clock.time_msec()) @@ -201,9 +222,7 @@ class RetryDestinationLimiter(object): self.destination, retry_last_ts, self.retry_interval ) except Exception: - logger.exception( - "Failed to store destination_retry_timings", - ) + logger.exception("Failed to store destination_retry_timings") # we deliberately do this in the background. synapse.util.logcontext.run_in_background(store_retry_timings) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index fdcb375f95..69dffd8244 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py
@@ -24,14 +24,19 @@ _string_with_symbols = ( string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" ) +# random_string and random_string_with_symbols are used for a range of things, +# some cryptographically important, some less so. We use SystemRandom to make sure +# we get cryptographically-secure randoms. +rand = random.SystemRandom() + def random_string(length): - return ''.join(random.choice(string.ascii_letters) for _ in range(length)) + return ''.join(rand.choice(string.ascii_letters) for _ in range(length)) def random_string_with_symbols(length): return ''.join( - random.choice(_string_with_symbols) for _ in range(length) + rand.choice(_string_with_symbols) for _ in range(length) )