diff options
Diffstat (limited to 'synapse/util/__init__.py')
-rw-r--r-- | synapse/util/__init__.py | 104 |
1 files changed, 39 insertions, 65 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 2a2360ab5d..680ea928c7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,20 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.api.errors import SynapseError -from synapse.util.logcontext import PreserveLoggingContext - -from twisted.internet import defer, reactor, task - -import time import logging +from itertools import islice -logger = logging.getLogger(__name__) +import attr +from twisted.internet import defer, task -class DeferredTimedOutError(SynapseError): - def __init__(self): - super(DeferredTimedOutError, self).__init__(504, "Timed out") +from synapse.util.logcontext import PreserveLoggingContext + +logger = logging.getLogger(__name__) def unwrapFirstError(failure): @@ -35,16 +31,27 @@ def unwrapFirstError(failure): return failure.value.subFailure +@attr.s class Clock(object): - """A small utility that obtains current time-of-day so that time may be - mocked during unit-tests. + """ + A Clock wraps a Twisted reactor and provides utilities on top of it. - TODO(paul): Also move the sleep() functionality into it + Args: + reactor: The Twisted reactor to use. """ + _reactor = attr.ib() + + @defer.inlineCallbacks + def sleep(self, seconds): + d = defer.Deferred() + with PreserveLoggingContext(): + self._reactor.callLater(seconds, d.callback, seconds) + res = yield d + defer.returnValue(res) def time(self): """Returns the current system time in seconds since epoch.""" - return time.time() + return self._reactor.seconds() def time_msec(self): """Returns the current system time in miliseconds since epoch.""" @@ -59,9 +66,10 @@ class Clock(object): f(function): The function to call repeatedly. msec(float): How long to wait between calls in milliseconds. """ - l = task.LoopingCall(f) - l.start(msec / 1000.0, now=False) - return l + call = task.LoopingCall(f) + call.clock = self._reactor + call.start(msec / 1000.0, now=False) + return call def call_later(self, delay, callback, *args, **kwargs): """Call something later @@ -77,61 +85,27 @@ class Clock(object): callback(*args, **kwargs) with PreserveLoggingContext(): - return reactor.callLater(delay, wrapped_callback, *args, **kwargs) + return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer, ignore_errs=False): try: timer.cancel() - except: + except Exception: if not ignore_errs: raise - def time_bound_deferred(self, given_deferred, time_out): - if given_deferred.called: - return given_deferred - - ret_deferred = defer.Deferred() - def timed_out_fn(): - e = DeferredTimedOutError() +def batch_iter(iterable, size): + """batch an iterable up into tuples with a maximum size - try: - ret_deferred.errback(e) - except: - pass + Args: + iterable (iterable): the iterable to slice + size (int): the maximum batch size - try: - given_deferred.cancel() - except: - pass - - timer = None - - def cancel(res): - try: - self.cancel_call_later(timer) - except: - pass - return res - - ret_deferred.addBoth(cancel) - - def success(res): - try: - ret_deferred.callback(res) - except: - pass - - return res - - def err(res): - try: - ret_deferred.errback(res) - except: - pass - - given_deferred.addCallbacks(callback=success, errback=err) - - timer = self.call_later(time_out, timed_out_fn) - - return ret_deferred + Returns: + an iterator over the chunks + """ + # make sure we can deal with iterables like lists too + sourceiter = iter(iterable) + # call islice until it returns an empty tuple + return iter(lambda: tuple(islice(sourceiter, size)), ()) |