diff options
author | Erik Johnston <erik@matrix.org> | 2021-01-28 19:28:22 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2021-01-28 19:28:22 +0000 |
commit | a1b6dea0b7d89c46df3c2650ebb1e35cecc8b8f4 (patch) | |
tree | 693de9ad67afde9700f6de6a06d1a99958834fab | |
parent | Ratelimit 3PID /requestToken API (#9238) (diff) | |
download | synapse-a1b6dea0b7d89c46df3c2650ebb1e35cecc8b8f4.tar.xz |
Add smoother
-rw-r--r-- | synapse/util/async_helpers.py | 83 | ||||
-rw-r--r-- | tests/util/test_async_utils.py | 43 |
2 files changed, 124 insertions, 2 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9a873c8e8e..34967bb414 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -18,6 +18,7 @@ import collections import inspect import logging from contextlib import contextmanager +from collections import deque from typing import ( Any, Awaitable, @@ -30,6 +31,7 @@ from typing import ( Set, TypeVar, Union, + Deque, ) import attr @@ -37,7 +39,7 @@ from typing_extensions import ContextManager from twisted.internet import defer from twisted.internet.defer import CancelledError -from twisted.internet.interfaces import IReactorTime +from twisted.internet.interfaces import IReactorTime, IDelayedCall from twisted.python import failure from synapse.logging.context import ( @@ -552,3 +554,82 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]: return value return DoneAwaitable(value) + + +@attr.s(slots=True) +class _SmootherEntry: + scheduled_at_ms = attr.ib(type=int) + scheduled_for_ms = attr.ib(type=int) + defer = attr.ib(type=defer.Deferred) + + +@attr.s(slots=True) +class Smoother: + _reactor = attr.ib(type=IReactorTime) + _target_ms = attr.ib(type=int) + + _queue = attr.ib(type=Deque[_SmootherEntry], factory=deque) + _last_run = attr.ib(type=int, default=0) + _next_call = attr.ib(type=Optional[IDelayedCall], default=None) + + def _fire_next(self): + if not self._queue: + return + + entry = self._queue.popleft() + entry.defer.callback(None) + + async def smooth(self) -> None: + now = self._reactor.seconds() * 1000.0 + + if not self._queue: + scheduled_for_ms = (now + self._target_ms + self._last_run) / 2 + print(scheduled_for_ms, now) + if scheduled_for_ms <= now: + self._last_run = now + return + + entry = _SmootherEntry( + scheduled_at_ms=now, + scheduled_for_ms=scheduled_for_ms, + defer=defer.Deferred(), + ) + self._queue.append(entry) + + else: + last_entry = self._queue[-1] + + scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2 + + entry = _SmootherEntry( + scheduled_at_ms=now, + scheduled_for_ms=scheduled_for_ms, + defer=defer.Deferred(), + ) + self._queue.append(entry) + + step = self._target_ms / (len(self._queue) + 1) + for idx, entry in enumerate(self._queue): + new_time = now + (idx + 1) * step + print("Moving?", entry.scheduled_for_ms, new_time) + if new_time < entry.scheduled_for_ms: + entry.scheduled_for_ms = new_time + + if self._next_call: + self._next_call.reset( + max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0 + ) + else: + self._reactor.callLater( + max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next + ) + + await make_deferred_yieldable(entry.defer) + + self._last_run = now + + self._reactor.callLater( + (self._queue[0].scheduled_for_ms - now) / 1000.0, self._fire_next, + ) + + return diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_utils.py index 17fd86d02d..80a491e646 100644 --- a/tests/util/test_async_utils.py +++ b/tests/util/test_async_utils.py @@ -22,7 +22,7 @@ from synapse.logging.context import ( PreserveLoggingContext, current_context, ) -from synapse.util.async_helpers import timeout_deferred +from synapse.util.async_helpers import timeout_deferred, Smoother from tests.unittest import TestCase @@ -105,3 +105,44 @@ class TimeoutDeferredTest(TestCase): ) self.failureResultOf(timing_out_d, defer.TimeoutError) self.assertIs(current_context(), context_one) + + +class TestSmoother(TestCase): + def setUp(self): + self.clock = Clock() + + self.smoother = Smoother(self.clock, 10 * 1000) + + def test_first(self): + self.clock.advance(100) + + d = self.smoother.smooth() + self.successResultOf(d) + + def test_multiple_at_same_time(self): + self.clock.advance(100) + + d = self.smoother.smooth() + self.successResultOf(d) + + d = self.smoother.smooth() + self.assertNoResult(d) + self.assertAlmostEqual( + self.smoother._queue[0].scheduled_for_ms, + self.clock.seconds() * 1000 + self.smoother._target_ms / 2, + ) + + d = self.smoother.smooth() + self.assertNoResult(d) + self.assertAlmostEqual( + self.smoother._queue[0].scheduled_for_ms, + self.clock.seconds() * 1000 + self.smoother._target_ms / 3, + ) + self.assertAlmostEqual( + self.smoother._queue[1].scheduled_for_ms, + self.clock.seconds() * 1000 + 2 * self.smoother._target_ms / 3, + ) + + self.clock.advance(100) + + self.assertNot(self.smoother._queue) |