summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-01-28 19:28:22 +0000
committerErik Johnston <erik@matrix.org>2021-01-28 19:28:22 +0000
commita1b6dea0b7d89c46df3c2650ebb1e35cecc8b8f4 (patch)
tree693de9ad67afde9700f6de6a06d1a99958834fab
parentRatelimit 3PID /requestToken API (#9238) (diff)
downloadsynapse-a1b6dea0b7d89c46df3c2650ebb1e35cecc8b8f4.tar.xz
Add smoother
-rw-r--r--synapse/util/async_helpers.py83
-rw-r--r--tests/util/test_async_utils.py43
2 files changed, 124 insertions, 2 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 9a873c8e8e..34967bb414 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,6 +18,7 @@ import collections
 import inspect
 import logging
 from contextlib import contextmanager
+from collections import deque
 from typing import (
     Any,
     Awaitable,
@@ -30,6 +31,7 @@ from typing import (
     Set,
     TypeVar,
     Union,
+    Deque,
 )
 
 import attr
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager
 
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
-from twisted.internet.interfaces import IReactorTime
+from twisted.internet.interfaces import IReactorTime, IDelayedCall
 from twisted.python import failure
 
 from synapse.logging.context import (
@@ -552,3 +554,82 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
         return value
 
     return DoneAwaitable(value)
+
+
+@attr.s(slots=True)
+class _SmootherEntry:
+    scheduled_at_ms = attr.ib(type=int)
+    scheduled_for_ms = attr.ib(type=int)
+    defer = attr.ib(type=defer.Deferred)
+
+
+@attr.s(slots=True)
+class Smoother:
+    _reactor = attr.ib(type=IReactorTime)
+    _target_ms = attr.ib(type=int)
+
+    _queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
+    _last_run = attr.ib(type=int, default=0)
+    _next_call = attr.ib(type=Optional[IDelayedCall], default=None)
+
+    def _fire_next(self):
+        if not self._queue:
+            return
+
+        entry = self._queue.popleft()
+        entry.defer.callback(None)
+
+    async def smooth(self) -> None:
+        now = self._reactor.seconds() * 1000.0
+
+        if not self._queue:
+            scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
+            print(scheduled_for_ms, now)
+            if scheduled_for_ms <= now:
+                self._last_run = now
+                return
+
+            entry = _SmootherEntry(
+                scheduled_at_ms=now,
+                scheduled_for_ms=scheduled_for_ms,
+                defer=defer.Deferred(),
+            )
+            self._queue.append(entry)
+
+        else:
+            last_entry = self._queue[-1]
+
+            scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2
+
+            entry = _SmootherEntry(
+                scheduled_at_ms=now,
+                scheduled_for_ms=scheduled_for_ms,
+                defer=defer.Deferred(),
+            )
+            self._queue.append(entry)
+
+            step = self._target_ms / (len(self._queue) + 1)
+            for idx, entry in enumerate(self._queue):
+                new_time = now + (idx + 1) * step
+                print("Moving?", entry.scheduled_for_ms, new_time)
+                if new_time < entry.scheduled_for_ms:
+                    entry.scheduled_for_ms = new_time
+
+        if self._next_call:
+            self._next_call.reset(
+                max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
+            )
+        else:
+            self._reactor.callLater(
+                max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
+            )
+
+        await make_deferred_yieldable(entry.defer)
+
+        self._last_run = now
+
+        self._reactor.callLater(
+            (self._queue[0].scheduled_for_ms - now) / 1000.0, self._fire_next,
+        )
+
+        return
diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_utils.py
index 17fd86d02d..80a491e646 100644
--- a/tests/util/test_async_utils.py
+++ b/tests/util/test_async_utils.py
@@ -22,7 +22,7 @@ from synapse.logging.context import (
     PreserveLoggingContext,
     current_context,
 )
-from synapse.util.async_helpers import timeout_deferred
+from synapse.util.async_helpers import timeout_deferred, Smoother
 
 from tests.unittest import TestCase
 
@@ -105,3 +105,44 @@ class TimeoutDeferredTest(TestCase):
             )
             self.failureResultOf(timing_out_d, defer.TimeoutError)
             self.assertIs(current_context(), context_one)
+
+
+class TestSmoother(TestCase):
+    def setUp(self):
+        self.clock = Clock()
+
+        self.smoother = Smoother(self.clock, 10 * 1000)
+
+    def test_first(self):
+        self.clock.advance(100)
+
+        d = self.smoother.smooth()
+        self.successResultOf(d)
+
+    def test_multiple_at_same_time(self):
+        self.clock.advance(100)
+
+        d = self.smoother.smooth()
+        self.successResultOf(d)
+
+        d = self.smoother.smooth()
+        self.assertNoResult(d)
+        self.assertAlmostEqual(
+            self.smoother._queue[0].scheduled_for_ms,
+            self.clock.seconds() * 1000 + self.smoother._target_ms / 2,
+        )
+
+        d = self.smoother.smooth()
+        self.assertNoResult(d)
+        self.assertAlmostEqual(
+            self.smoother._queue[0].scheduled_for_ms,
+            self.clock.seconds() * 1000 + self.smoother._target_ms / 3,
+        )
+        self.assertAlmostEqual(
+            self.smoother._queue[1].scheduled_for_ms,
+            self.clock.seconds() * 1000 + 2 * self.smoother._target_ms / 3,
+        )
+
+        self.clock.advance(100)
+
+        self.assertNot(self.smoother._queue)