summary refs log tree commit diff
path: root/synapse/util/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/__init__.py')
-rw-r--r--synapse/util/__init__.py104
1 files changed, 39 insertions, 65 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a2360ab5d..680ea928c7 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -13,20 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.errors import SynapseError
-from synapse.util.logcontext import PreserveLoggingContext
-
-from twisted.internet import defer, reactor, task
-
-import time
 import logging
+from itertools import islice
 
-logger = logging.getLogger(__name__)
+import attr
 
+from twisted.internet import defer, task
 
-class DeferredTimedOutError(SynapseError):
-    def __init__(self):
-        super(DeferredTimedOutError, self).__init__(504, "Timed out")
+from synapse.util.logcontext import PreserveLoggingContext
+
+logger = logging.getLogger(__name__)
 
 
 def unwrapFirstError(failure):
@@ -35,16 +31,27 @@ def unwrapFirstError(failure):
     return failure.value.subFailure
 
 
+@attr.s
 class Clock(object):
-    """A small utility that obtains current time-of-day so that time may be
-    mocked during unit-tests.
+    """
+    A Clock wraps a Twisted reactor and provides utilities on top of it.
 
-    TODO(paul): Also move the sleep() functionality into it
+    Args:
+        reactor: The Twisted reactor to use.
     """
+    _reactor = attr.ib()
+
+    @defer.inlineCallbacks
+    def sleep(self, seconds):
+        d = defer.Deferred()
+        with PreserveLoggingContext():
+            self._reactor.callLater(seconds, d.callback, seconds)
+            res = yield d
+        defer.returnValue(res)
 
     def time(self):
         """Returns the current system time in seconds since epoch."""
-        return time.time()
+        return self._reactor.seconds()
 
     def time_msec(self):
         """Returns the current system time in miliseconds since epoch."""
@@ -59,9 +66,10 @@ class Clock(object):
             f(function): The function to call repeatedly.
             msec(float): How long to wait between calls in milliseconds.
         """
-        l = task.LoopingCall(f)
-        l.start(msec / 1000.0, now=False)
-        return l
+        call = task.LoopingCall(f)
+        call.clock = self._reactor
+        call.start(msec / 1000.0, now=False)
+        return call
 
     def call_later(self, delay, callback, *args, **kwargs):
         """Call something later
@@ -77,61 +85,27 @@ class Clock(object):
                 callback(*args, **kwargs)
 
         with PreserveLoggingContext():
-            return reactor.callLater(delay, wrapped_callback, *args, **kwargs)
+            return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
 
     def cancel_call_later(self, timer, ignore_errs=False):
         try:
             timer.cancel()
-        except:
+        except Exception:
             if not ignore_errs:
                 raise
 
-    def time_bound_deferred(self, given_deferred, time_out):
-        if given_deferred.called:
-            return given_deferred
-
-        ret_deferred = defer.Deferred()
 
-        def timed_out_fn():
-            e = DeferredTimedOutError()
+def batch_iter(iterable, size):
+    """batch an iterable up into tuples with a maximum size
 
-            try:
-                ret_deferred.errback(e)
-            except:
-                pass
+    Args:
+        iterable (iterable): the iterable to slice
+        size (int): the maximum batch size
 
-            try:
-                given_deferred.cancel()
-            except:
-                pass
-
-        timer = None
-
-        def cancel(res):
-            try:
-                self.cancel_call_later(timer)
-            except:
-                pass
-            return res
-
-        ret_deferred.addBoth(cancel)
-
-        def success(res):
-            try:
-                ret_deferred.callback(res)
-            except:
-                pass
-
-            return res
-
-        def err(res):
-            try:
-                ret_deferred.errback(res)
-            except:
-                pass
-
-        given_deferred.addCallbacks(callback=success, errback=err)
-
-        timer = self.call_later(time_out, timed_out_fn)
-
-        return ret_deferred
+    Returns:
+        an iterator over the chunks
+    """
+    # make sure we can deal with iterables like lists too
+    sourceiter = iter(iterable)
+    # call islice until it returns an empty tuple
+    return iter(lambda: tuple(islice(sourceiter, size)), ())