diff options
Diffstat (limited to 'synapse/util/async_helpers.py')
-rw-r--r-- | synapse/util/async_helpers.py | 41 |
1 files changed, 24 insertions, 17 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 7253ba120f..7757b8708a 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -95,6 +95,7 @@ class ObservableDeferred(object): def remove(r): self._observers.discard(d) return r + d.addBoth(remove) self._observers.add(d) @@ -123,7 +124,9 @@ class ObservableDeferred(object): def __repr__(self): return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % ( - id(self), self._result, self._deferred, + id(self), + self._result, + self._deferred, ) @@ -150,10 +153,12 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return logcontext.make_deferred_yieldable(defer.gatherResults([ - run_in_background(_concurrently_execute_inner) - for _ in range(limit) - ], consumeErrors=True)).addErrback(unwrapFirstError) + return logcontext.make_deferred_yieldable( + defer.gatherResults( + [run_in_background(_concurrently_execute_inner) for _ in range(limit)], + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) def yieldable_gather_results(func, iter, *args, **kwargs): @@ -169,10 +174,12 @@ def yieldable_gather_results(func, iter, *args, **kwargs): Deferred[list]: Resolved when all functions have been invoked, or errors if one of the function calls fails. """ - return logcontext.make_deferred_yieldable(defer.gatherResults([ - run_in_background(func, item, *args, **kwargs) - for item in iter - ], consumeErrors=True)).addErrback(unwrapFirstError) + return logcontext.make_deferred_yieldable( + defer.gatherResults( + [run_in_background(func, item, *args, **kwargs) for item in iter], + consumeErrors=True, + ) + ).addErrback(unwrapFirstError) class Linearizer(object): @@ -185,6 +192,7 @@ class Linearizer(object): # do some work. """ + def __init__(self, name=None, max_count=1, clock=None): """ Args: @@ -197,6 +205,7 @@ class Linearizer(object): if not clock: from twisted.internet import reactor + clock = Clock(reactor) self._clock = clock self.max_count = max_count @@ -221,7 +230,7 @@ class Linearizer(object): res = self._await_lock(key) else: logger.debug( - "Acquired uncontended linearizer lock %r for key %r", self.name, key, + "Acquired uncontended linearizer lock %r for key %r", self.name, key ) entry[0] += 1 res = defer.succeed(None) @@ -266,9 +275,7 @@ class Linearizer(object): """ entry = self.key_to_defer[key] - logger.debug( - "Waiting to acquire linearizer lock %r for key %r", self.name, key, - ) + logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key) new_defer = make_deferred_yieldable(defer.Deferred()) entry[1][new_defer] = 1 @@ -293,14 +300,14 @@ class Linearizer(object): logger.info("defer %r got err %r", new_defer, e) if isinstance(e, CancelledError): logger.debug( - "Cancelling wait for linearizer lock %r for key %r", - self.name, key, + "Cancelling wait for linearizer lock %r for key %r", self.name, key ) else: logger.warn( "Unexpected exception waiting for linearizer lock %r for key %r", - self.name, key, + self.name, + key, ) # we just have to take ourselves back out of the queue. @@ -438,7 +445,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): try: deferred.cancel() - except: # noqa: E722, if we throw any exception it'll break time outs + except: # noqa: E722, if we throw any exception it'll break time outs logger.exception("Canceller failed during timeout") if not new_d.called: |