diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a2360ab5d..680ea928c7 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,20 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.api.errors import SynapseError
-from synapse.util.logcontext import PreserveLoggingContext
-
-from twisted.internet import defer, reactor, task
-
-import time
import logging
+from itertools import islice
-logger = logging.getLogger(__name__)
+import attr
+from twisted.internet import defer, task
-class DeferredTimedOutError(SynapseError):
- def __init__(self):
- super(DeferredTimedOutError, self).__init__(504, "Timed out")
+from synapse.util.logcontext import PreserveLoggingContext
+
+logger = logging.getLogger(__name__)
def unwrapFirstError(failure):
@@ -35,16 +31,27 @@ def unwrapFirstError(failure):
return failure.value.subFailure
+@attr.s
class Clock(object):
- """A small utility that obtains current time-of-day so that time may be
- mocked during unit-tests.
+ """
+ A Clock wraps a Twisted reactor and provides utilities on top of it.
- TODO(paul): Also move the sleep() functionality into it
+ Args:
+ reactor: The Twisted reactor to use.
"""
+ _reactor = attr.ib()
+
+ @defer.inlineCallbacks
+ def sleep(self, seconds):
+ d = defer.Deferred()
+ with PreserveLoggingContext():
+ self._reactor.callLater(seconds, d.callback, seconds)
+ res = yield d
+ defer.returnValue(res)
def time(self):
"""Returns the current system time in seconds since epoch."""
- return time.time()
+ return self._reactor.seconds()
def time_msec(self):
"""Returns the current system time in miliseconds since epoch."""
@@ -59,9 +66,10 @@ class Clock(object):
f(function): The function to call repeatedly.
msec(float): How long to wait between calls in milliseconds.
"""
- l = task.LoopingCall(f)
- l.start(msec / 1000.0, now=False)
- return l
+ call = task.LoopingCall(f)
+ call.clock = self._reactor
+ call.start(msec / 1000.0, now=False)
+ return call
def call_later(self, delay, callback, *args, **kwargs):
"""Call something later
@@ -77,61 +85,27 @@ class Clock(object):
callback(*args, **kwargs)
with PreserveLoggingContext():
- return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+ return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
def cancel_call_later(self, timer, ignore_errs=False):
try:
timer.cancel()
- except:
+ except Exception:
if not ignore_errs:
raise
- def time_bound_deferred(self, given_deferred, time_out):
- if given_deferred.called:
- return given_deferred
-
- ret_deferred = defer.Deferred()
- def timed_out_fn():
- e = DeferredTimedOutError()
+def batch_iter(iterable, size):
+ """batch an iterable up into tuples with a maximum size
- try:
- ret_deferred.errback(e)
- except:
- pass
+ Args:
+ iterable (iterable): the iterable to slice
+ size (int): the maximum batch size
- try:
- given_deferred.cancel()
- except:
- pass
-
- timer = None
-
- def cancel(res):
- try:
- self.cancel_call_later(timer)
- except:
- pass
- return res
-
- ret_deferred.addBoth(cancel)
-
- def success(res):
- try:
- ret_deferred.callback(res)
- except:
- pass
-
- return res
-
- def err(res):
- try:
- ret_deferred.errback(res)
- except:
- pass
-
- given_deferred.addCallbacks(callback=success, errback=err)
-
- timer = self.call_later(time_out, timed_out_fn)
-
- return ret_deferred
+ Returns:
+ an iterator over the chunks
+ """
+ # make sure we can deal with iterables like lists too
+ sourceiter = iter(iterable)
+ # call islice until it returns an empty tuple
+ return iter(lambda: tuple(islice(sourceiter, size)), ())
|