summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-12 09:56:28 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-12 09:56:28 +0100
commit482d17b58b55e4a62c1b4df9484d1c3af80d94ff (patch)
treed936edf00491834d76c7c7aa651d2f884e0c307b /synapse/util/async.py
parentEnforce the specified API for report_event (diff)
parentMerge pull request #3505 from matrix-org/erikj/receipts_cahce (diff)
downloadsynapse-482d17b58b55e4a62c1b4df9484d1c3af80d94ff.tar.xz
Merge branch 'develop' into rav/enforce_report_api
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py47
1 files changed, 19 insertions, 28 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..5d0fb39130 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,41 +13,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+from contextlib import contextmanager
 
-from twisted.internet import defer, reactor
+from six.moves import range
+
+from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
+from synapse.util import Clock, logcontext, unwrapFirstError
+
 from .logcontext import (
-    PreserveLoggingContext, make_deferred_yieldable, run_in_background
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
 )
-from synapse.util import logcontext, unwrapFirstError
-
-from contextlib import contextmanager
-
-import logging
-
-from six.moves import range
 
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def sleep(seconds):
-    d = defer.Deferred()
-    with PreserveLoggingContext():
-        reactor.callLater(seconds, d.callback, seconds)
-        res = yield d
-    defer.returnValue(res)
-
-
-def run_on_reactor():
-    """ This will cause the rest of the function to be invoked upon the next
-    iteration of the main loop
-    """
-    return sleep(0)
-
-
 class ObservableDeferred(object):
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
@@ -180,13 +165,18 @@ class Linearizer(object):
             # do some work.
 
     """
-    def __init__(self, name=None):
+    def __init__(self, name=None, clock=None):
         if name is None:
             self.name = id(self)
         else:
             self.name = name
         self.key_to_defer = {}
 
+        if not clock:
+            from twisted.internet import reactor
+            clock = Clock(reactor)
+        self._clock = clock
+
     @defer.inlineCallbacks
     def queue(self, key):
         # If there is already a deferred in the queue, we pull it out so that
@@ -227,7 +217,7 @@ class Linearizer(object):
             # the context manager, but it needs to happen while we hold the
             # lock, and the context manager's exit code must be synchronous,
             # so actually this is the only sensible place.
-            yield run_on_reactor()
+            yield self._clock.sleep(0)
 
         else:
             logger.info("Acquired uncontended linearizer lock %r for key %r",
@@ -404,7 +394,7 @@ class DeferredTimeoutError(Exception):
     """
 
 
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     """
     Add a timeout to a deferred by scheduling it to be cancelled after
     timeout seconds.
@@ -419,6 +409,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
     Args:
         deferred (defer.Deferred): deferred to be timed out
         timeout (Number): seconds to time out after
+        reactor (twisted.internet.reactor): the Twisted reactor to use
 
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is