summary refs log tree commit diff
path: root/synapse/util/async_helpers.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
committerErik Johnston <erik@matrix.org>2019-06-20 11:59:14 +0100
commit45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3 (patch)
tree07bb21377c6611db89f64f948a2e27645662ff0e /synapse/util/async_helpers.py
parentAdd descriptions and remove redundant set(..) (diff)
parentRun Black. (#5482) (diff)
downloadsynapse-45f28a9d2fc0466dcf2a05b0063b7caa3b7e12c3.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/histogram_extremities
Diffstat (limited to 'synapse/util/async_helpers.py')
-rw-r--r--synapse/util/async_helpers.py41
1 files changed, 24 insertions, 17 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 7253ba120f..7757b8708a 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -95,6 +95,7 @@ class ObservableDeferred(object):
             def remove(r):
                 self._observers.discard(d)
                 return r
+
             d.addBoth(remove)
 
             self._observers.add(d)
@@ -123,7 +124,9 @@ class ObservableDeferred(object):
 
     def __repr__(self):
         return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
-            id(self), self._result, self._deferred,
+            id(self),
+            self._result,
+            self._deferred,
         )
 
 
@@ -150,10 +153,12 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return logcontext.make_deferred_yieldable(defer.gatherResults([
-        run_in_background(_concurrently_execute_inner)
-        for _ in range(limit)
-    ], consumeErrors=True)).addErrback(unwrapFirstError)
+    return logcontext.make_deferred_yieldable(
+        defer.gatherResults(
+            [run_in_background(_concurrently_execute_inner) for _ in range(limit)],
+            consumeErrors=True,
+        )
+    ).addErrback(unwrapFirstError)
 
 
 def yieldable_gather_results(func, iter, *args, **kwargs):
@@ -169,10 +174,12 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
         Deferred[list]: Resolved when all functions have been invoked, or errors if
         one of the function calls fails.
     """
-    return logcontext.make_deferred_yieldable(defer.gatherResults([
-        run_in_background(func, item, *args, **kwargs)
-        for item in iter
-    ], consumeErrors=True)).addErrback(unwrapFirstError)
+    return logcontext.make_deferred_yieldable(
+        defer.gatherResults(
+            [run_in_background(func, item, *args, **kwargs) for item in iter],
+            consumeErrors=True,
+        )
+    ).addErrback(unwrapFirstError)
 
 
 class Linearizer(object):
@@ -185,6 +192,7 @@ class Linearizer(object):
             # do some work.
 
     """
+
     def __init__(self, name=None, max_count=1, clock=None):
         """
         Args:
@@ -197,6 +205,7 @@ class Linearizer(object):
 
         if not clock:
             from twisted.internet import reactor
+
             clock = Clock(reactor)
         self._clock = clock
         self.max_count = max_count
@@ -221,7 +230,7 @@ class Linearizer(object):
             res = self._await_lock(key)
         else:
             logger.debug(
-                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key
             )
             entry[0] += 1
             res = defer.succeed(None)
@@ -266,9 +275,7 @@ class Linearizer(object):
         """
         entry = self.key_to_defer[key]
 
-        logger.debug(
-            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
-        )
+        logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
 
         new_defer = make_deferred_yieldable(defer.Deferred())
         entry[1][new_defer] = 1
@@ -293,14 +300,14 @@ class Linearizer(object):
             logger.info("defer %r got err %r", new_defer, e)
             if isinstance(e, CancelledError):
                 logger.debug(
-                    "Cancelling wait for linearizer lock %r for key %r",
-                    self.name, key,
+                    "Cancelling wait for linearizer lock %r for key %r", self.name, key
                 )
 
             else:
                 logger.warn(
                     "Unexpected exception waiting for linearizer lock %r for key %r",
-                    self.name, key,
+                    self.name,
+                    key,
                 )
 
             # we just have to take ourselves back out of the queue.
@@ -438,7 +445,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
 
         try:
             deferred.cancel()
-        except:   # noqa: E722, if we throw any exception it'll break time outs
+        except:  # noqa: E722, if we throw any exception it'll break time outs
             logger.exception("Canceller failed during timeout")
 
         if not new_d.called: