diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 4e837a918e..e77eba90ad 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,9 +15,12 @@
from synapse.util.logcontext import LoggingContext
-from twisted.internet import reactor
+from twisted.internet import defer, reactor, task
import time
+import logging
+
+logger = logging.getLogger(__name__)
class Clock(object):
@@ -35,6 +38,14 @@ class Clock(object):
"""Returns the current system time in miliseconds since epoch."""
return self.time() * 1000
+ def looping_call(self, f, msec):
+ l = task.LoopingCall(f)
+ l.start(msec/1000.0, now=False)
+ return l
+
+ def stop_looping_call(self, loop):
+ loop.stop()
+
def call_later(self, delay, callback):
current_context = LoggingContext.current_context()
@@ -45,3 +56,53 @@ class Clock(object):
def cancel_call_later(self, timer):
timer.cancel()
+
+ def time_bound_deferred(self, given_deferred, time_out):
+ if given_deferred.called:
+ return given_deferred
+
+ ret_deferred = defer.Deferred()
+
+ def timed_out_fn():
+ try:
+ ret_deferred.errback(RuntimeError("Timed out"))
+ except:
+ pass
+
+ try:
+ given_deferred.cancel()
+ except:
+ pass
+
+ timer = None
+
+ def cancel(res):
+ try:
+ self.cancel_call_later(timer)
+ except:
+ pass
+ return res
+
+ ret_deferred.addBoth(cancel)
+
+ def sucess(res):
+ try:
+ ret_deferred.callback(res)
+ except:
+ pass
+
+ return res
+
+ def err(res):
+ try:
+ ret_deferred.errback(res)
+ except:
+ pass
+
+ return res
+
+ given_deferred.addCallbacks(callback=sucess, errback=err)
+
+ timer = self.call_later(time_out, timed_out_fn)
+
+ return ret_deferred
|