summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-12 16:06:49 +0000
committerErik Johnston <erik@matrix.org>2016-02-17 14:29:28 +0000
commit9e7900da1ebe892104ec61e8d3d45c4d956bd323 (patch)
treeafd9b725aa54d51157ef1a9eb3903453307f652c
parentMerge pull request #581 from Rugvip/event_id (diff)
downloadsynapse-9e7900da1ebe892104ec61e8d3d45c4d956bd323.tar.xz
Add wheeltimer impl
-rw-r--r--synapse/util/wheel_timer.py88
-rw-r--r--tests/util/test_wheel_timer.py74
2 files changed, 162 insertions, 0 deletions
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
new file mode 100644
index 0000000000..b447b2456c
--- /dev/null
+++ b/synapse/util/wheel_timer.py
@@ -0,0 +1,88 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class _Entry(object):
+    __slots__ = ["end_key", "queue"]
+
+    def __init__(self, end_key):
+        self.end_key = end_key
+        self.queue = []
+
+
+class WheelTimer(object):
+    """Stores arbitrary objects that will be returned after their timers have
+    expired.
+    """
+
+    def __init__(self, bucket_size=5000):
+        """
+        Args:
+            bucket_size (int): Size of buckets in ms. Corresponds roughly to the
+                accuracy of the timer.
+        """
+        self.bucket_size = bucket_size
+        self.entries = []
+        self.current_tick = 0
+
+    def insert(self, now, obj, then):
+        """Inserts object into timer.
+
+        Args:
+            now (int): Current time in msec
+            obj (object): Object to be inserted
+            then (int): When to return the object strictly after.
+        """
+        then_key = int(then / self.bucket_size) + 1
+        for entry in self.entries:
+            # Add to first bucket we find. This should gracefully handle inserts
+            # for times in the past.
+            if entry.end_key >= then_key:
+                entry.queue.append(obj)
+                return
+
+        next_key = int(now / self.bucket_size) + 1
+        if self.entries:
+            last_key = self.entries[-1].end_key
+        else:
+            last_key = next_key
+
+        # Handle the case when `then` is in the past and `entries` is empty.
+        then_key = max(last_key, then_key)
+
+        # Add empty entries between the end of the current list and when we want
+        # to insert. This ensures there are no gaps.
+        self.entries.extend(
+            _Entry(key) for key in xrange(last_key, then_key + 1)
+        )
+
+        self.entries[-1].queue.append(obj)
+
+    def fetch(self, now):
+        """Fetch any objects that have timed out
+
+        Args:
+            now (ms): Current time in msec
+
+        Returns:
+            list: List of objects that have timed out
+        """
+        now_key = int(now / self.bucket_size)
+
+        ret = []
+        while self.entries and self.entries[0].end_key <= now_key:
+            ret.extend(self.entries.pop(0).queue)
+
+        return ret
diff --git a/tests/util/test_wheel_timer.py b/tests/util/test_wheel_timer.py
new file mode 100644
index 0000000000..c44567e52e
--- /dev/null
+++ b/tests/util/test_wheel_timer.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .. import unittest
+
+from synapse.util.wheel_timer import WheelTimer
+
+
+class WheelTimerTestCase(unittest.TestCase):
+    def test_single_insert_fetch(self):
+        wheel = WheelTimer(bucket_size=5)
+
+        obj = object()
+        wheel.insert(100, obj, 150)
+
+        self.assertListEqual(wheel.fetch(101), [])
+        self.assertListEqual(wheel.fetch(110), [])
+        self.assertListEqual(wheel.fetch(120), [])
+        self.assertListEqual(wheel.fetch(130), [])
+        self.assertListEqual(wheel.fetch(149), [])
+        self.assertListEqual(wheel.fetch(156), [obj])
+        self.assertListEqual(wheel.fetch(170), [])
+
+    def test_mutli_insert(self):
+        wheel = WheelTimer(bucket_size=5)
+
+        obj1 = object()
+        obj2 = object()
+        obj3 = object()
+        wheel.insert(100, obj1, 150)
+        wheel.insert(105, obj2, 130)
+        wheel.insert(106, obj3, 160)
+
+        self.assertListEqual(wheel.fetch(110), [])
+        self.assertListEqual(wheel.fetch(135), [obj2])
+        self.assertListEqual(wheel.fetch(149), [])
+        self.assertListEqual(wheel.fetch(158), [obj1])
+        self.assertListEqual(wheel.fetch(160), [])
+        self.assertListEqual(wheel.fetch(200), [obj3])
+        self.assertListEqual(wheel.fetch(210), [])
+
+    def test_insert_past(self):
+        wheel = WheelTimer(bucket_size=5)
+
+        obj = object()
+        wheel.insert(100, obj, 50)
+        self.assertListEqual(wheel.fetch(120), [obj])
+
+    def test_insert_past_mutli(self):
+        wheel = WheelTimer(bucket_size=5)
+
+        obj1 = object()
+        obj2 = object()
+        obj3 = object()
+        wheel.insert(100, obj1, 150)
+        wheel.insert(100, obj2, 140)
+        wheel.insert(100, obj3, 50)
+        self.assertListEqual(wheel.fetch(110), [obj3])
+        self.assertListEqual(wheel.fetch(120), [])
+        self.assertListEqual(wheel.fetch(147), [obj2])
+        self.assertListEqual(wheel.fetch(200), [obj1])
+        self.assertListEqual(wheel.fetch(240), [])