diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 7253ba120f..7757b8708a 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -95,6 +95,7 @@ class ObservableDeferred(object):
def remove(r):
self._observers.discard(d)
return r
+
d.addBoth(remove)
self._observers.add(d)
@@ -123,7 +124,9 @@ class ObservableDeferred(object):
def __repr__(self):
return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
- id(self), self._result, self._deferred,
+ id(self),
+ self._result,
+ self._deferred,
)
@@ -150,10 +153,12 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return logcontext.make_deferred_yieldable(defer.gatherResults([
- run_in_background(_concurrently_execute_inner)
- for _ in range(limit)
- ], consumeErrors=True)).addErrback(unwrapFirstError)
+ return logcontext.make_deferred_yieldable(
+ defer.gatherResults(
+ [run_in_background(_concurrently_execute_inner) for _ in range(limit)],
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
def yieldable_gather_results(func, iter, *args, **kwargs):
@@ -169,10 +174,12 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
Deferred[list]: Resolved when all functions have been invoked, or errors if
one of the function calls fails.
"""
- return logcontext.make_deferred_yieldable(defer.gatherResults([
- run_in_background(func, item, *args, **kwargs)
- for item in iter
- ], consumeErrors=True)).addErrback(unwrapFirstError)
+ return logcontext.make_deferred_yieldable(
+ defer.gatherResults(
+ [run_in_background(func, item, *args, **kwargs) for item in iter],
+ consumeErrors=True,
+ )
+ ).addErrback(unwrapFirstError)
class Linearizer(object):
@@ -185,6 +192,7 @@ class Linearizer(object):
# do some work.
"""
+
def __init__(self, name=None, max_count=1, clock=None):
"""
Args:
@@ -197,6 +205,7 @@ class Linearizer(object):
if not clock:
from twisted.internet import reactor
+
clock = Clock(reactor)
self._clock = clock
self.max_count = max_count
@@ -221,7 +230,7 @@ class Linearizer(object):
res = self._await_lock(key)
else:
logger.debug(
- "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+ "Acquired uncontended linearizer lock %r for key %r", self.name, key
)
entry[0] += 1
res = defer.succeed(None)
@@ -266,9 +275,7 @@ class Linearizer(object):
"""
entry = self.key_to_defer[key]
- logger.debug(
- "Waiting to acquire linearizer lock %r for key %r", self.name, key,
- )
+ logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
new_defer = make_deferred_yieldable(defer.Deferred())
entry[1][new_defer] = 1
@@ -293,14 +300,14 @@ class Linearizer(object):
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.debug(
- "Cancelling wait for linearizer lock %r for key %r",
- self.name, key,
+ "Cancelling wait for linearizer lock %r for key %r", self.name, key
)
else:
logger.warn(
"Unexpected exception waiting for linearizer lock %r for key %r",
- self.name, key,
+ self.name,
+ key,
)
# we just have to take ourselves back out of the queue.
@@ -438,7 +445,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
try:
deferred.cancel()
- except: # noqa: E722, if we throw any exception it'll break time outs
+ except: # noqa: E722, if we throw any exception it'll break time outs
logger.exception("Canceller failed during timeout")
if not new_d.called:
|