summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py33
1 files changed, 11 insertions, 22 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 9dd4e6b5bc..1668df4ce6 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -13,15 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
 
 from .logcontext import (
     PreserveLoggingContext, make_deferred_yieldable, run_in_background
 )
-from synapse.util import logcontext, unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError, Clock
 
 from contextlib import contextmanager
 
@@ -32,22 +31,6 @@ from six.moves import range
 logger = logging.getLogger(__name__)
 
 
-@defer.inlineCallbacks
-def sleep(seconds):
-    d = defer.Deferred()
-    with PreserveLoggingContext():
-        reactor.callLater(seconds, d.callback, seconds)
-        res = yield d
-    defer.returnValue(res)
-
-
-def run_on_reactor():
-    """ This will cause the rest of the function to be invoked upon the next
-    iteration of the main loop
-    """
-    return sleep(0)
-
-
 class ObservableDeferred(object):
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
@@ -180,13 +163,18 @@ class Linearizer(object):
             # do some work.
 
     """
-    def __init__(self, name=None):
+    def __init__(self, name=None, clock=None):
         if name is None:
             self.name = id(self)
         else:
             self.name = name
         self.key_to_defer = {}
 
+        if not clock:
+            from twisted.internet import reactor
+            clock = Clock(reactor)
+        self._clock = clock
+
     @defer.inlineCallbacks
     def queue(self, key):
         # If there is already a deferred in the queue, we pull it out so that
@@ -227,7 +215,7 @@ class Linearizer(object):
             # the context manager, but it needs to happen while we hold the
             # lock, and the context manager's exit code must be synchronous,
             # so actually this is the only sensible place.
-            yield run_on_reactor()
+            yield self._clock.sleep(0)
 
         else:
             logger.info("Acquired uncontended linearizer lock %r for key %r",
@@ -404,7 +392,7 @@ class DeferredTimeoutError(Exception):
     """
 
 
-def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
+def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     """
     Add a timeout to a deferred by scheduling it to be cancelled after
     timeout seconds.
@@ -419,6 +407,7 @@ def add_timeout_to_deferred(deferred, timeout, on_timeout_cancel=None):
     Args:
         deferred (defer.Deferred): deferred to be timed out
         timeout (Number): seconds to time out after
+        reactor (twisted.internet.reactor): the Twisted reactor to use
 
         on_timeout_cancel (callable): A callable which is called immediately
             after the deferred times out, and not if this deferred is