summary refs log tree commit diff
path: root/synapse/util/async_helpers.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
committerRichard van der Hoff <richard@matrix.org>2019-06-24 10:00:13 +0100
commit5097aee740b542407e5bb13d19a3e3e6c2227316 (patch)
tree09a03650256e09cd0b5df59dbf2d7bb2ba14df6c /synapse/util/async_helpers.py
parentchangelog (diff)
parentImprove help and cmdline option names for --generate-config options (#5512) (diff)
downloadsynapse-5097aee740b542407e5bb13d19a3e3e6c2227316.tar.xz
Merge branch 'develop' into rav/cleanup_metrics
Diffstat (limited to 'synapse/util/async_helpers.py')
-rw-r--r--synapse/util/async_helpers.py41
1 files changed, 24 insertions, 17 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 7253ba120f..7757b8708a 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -95,6 +95,7 @@ class ObservableDeferred(object):
             def remove(r):
                 self._observers.discard(d)
                 return r
+
             d.addBoth(remove)
 
             self._observers.add(d)
@@ -123,7 +124,9 @@ class ObservableDeferred(object):
 
     def __repr__(self):
         return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
-            id(self), self._result, self._deferred,
+            id(self),
+            self._result,
+            self._deferred,
         )
 
 
@@ -150,10 +153,12 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return logcontext.make_deferred_yieldable(defer.gatherResults([
-        run_in_background(_concurrently_execute_inner)
-        for _ in range(limit)
-    ], consumeErrors=True)).addErrback(unwrapFirstError)
+    return logcontext.make_deferred_yieldable(
+        defer.gatherResults(
+            [run_in_background(_concurrently_execute_inner) for _ in range(limit)],
+            consumeErrors=True,
+        )
+    ).addErrback(unwrapFirstError)
 
 
 def yieldable_gather_results(func, iter, *args, **kwargs):
@@ -169,10 +174,12 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
         Deferred[list]: Resolved when all functions have been invoked, or errors if
         one of the function calls fails.
     """
-    return logcontext.make_deferred_yieldable(defer.gatherResults([
-        run_in_background(func, item, *args, **kwargs)
-        for item in iter
-    ], consumeErrors=True)).addErrback(unwrapFirstError)
+    return logcontext.make_deferred_yieldable(
+        defer.gatherResults(
+            [run_in_background(func, item, *args, **kwargs) for item in iter],
+            consumeErrors=True,
+        )
+    ).addErrback(unwrapFirstError)
 
 
 class Linearizer(object):
@@ -185,6 +192,7 @@ class Linearizer(object):
             # do some work.
 
     """
+
     def __init__(self, name=None, max_count=1, clock=None):
         """
         Args:
@@ -197,6 +205,7 @@ class Linearizer(object):
 
         if not clock:
             from twisted.internet import reactor
+
             clock = Clock(reactor)
         self._clock = clock
         self.max_count = max_count
@@ -221,7 +230,7 @@ class Linearizer(object):
             res = self._await_lock(key)
         else:
             logger.debug(
-                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key
             )
             entry[0] += 1
             res = defer.succeed(None)
@@ -266,9 +275,7 @@ class Linearizer(object):
         """
         entry = self.key_to_defer[key]
 
-        logger.debug(
-            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
-        )
+        logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
 
         new_defer = make_deferred_yieldable(defer.Deferred())
         entry[1][new_defer] = 1
@@ -293,14 +300,14 @@ class Linearizer(object):
             logger.info("defer %r got err %r", new_defer, e)
             if isinstance(e, CancelledError):
                 logger.debug(
-                    "Cancelling wait for linearizer lock %r for key %r",
-                    self.name, key,
+                    "Cancelling wait for linearizer lock %r for key %r", self.name, key
                 )
 
             else:
                 logger.warn(
                     "Unexpected exception waiting for linearizer lock %r for key %r",
-                    self.name, key,
+                    self.name,
+                    key,
                 )
 
             # we just have to take ourselves back out of the queue.
@@ -438,7 +445,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
 
         try:
             deferred.cancel()
-        except:   # noqa: E722, if we throw any exception it'll break time outs
+        except:  # noqa: E722, if we throw any exception it'll break time outs
             logger.exception("Canceller failed during timeout")
 
         if not new_d.called: