diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 9a873c8e8e..34967bb414 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,6 +18,7 @@ import collections
import inspect
import logging
from contextlib import contextmanager
+from collections import deque
from typing import (
Any,
Awaitable,
@@ -30,6 +31,7 @@ from typing import (
Set,
TypeVar,
Union,
+ Deque,
)
import attr
@@ -37,7 +39,7 @@ from typing_extensions import ContextManager
from twisted.internet import defer
from twisted.internet.defer import CancelledError
-from twisted.internet.interfaces import IReactorTime
+from twisted.internet.interfaces import IReactorTime, IDelayedCall
from twisted.python import failure
from synapse.logging.context import (
@@ -552,3 +554,82 @@ def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]:
return value
return DoneAwaitable(value)
+
+
+@attr.s(slots=True)
+class _SmootherEntry:
+ scheduled_at_ms = attr.ib(type=int)
+ scheduled_for_ms = attr.ib(type=int)
+ defer = attr.ib(type=defer.Deferred)
+
+
+@attr.s(slots=True)
+class Smoother:
+ _reactor = attr.ib(type=IReactorTime)
+ _target_ms = attr.ib(type=int)
+
+ _queue = attr.ib(type=Deque[_SmootherEntry], factory=deque)
+ _last_run = attr.ib(type=int, default=0)
+ _next_call = attr.ib(type=Optional[IDelayedCall], default=None)
+
+ def _fire_next(self):
+ if not self._queue:
+ return
+
+ entry = self._queue.popleft()
+ entry.defer.callback(None)
+
+ async def smooth(self) -> None:
+ now = self._reactor.seconds() * 1000.0
+
+ if not self._queue:
+ scheduled_for_ms = (now + self._target_ms + self._last_run) / 2
+ print(scheduled_for_ms, now)
+ if scheduled_for_ms <= now:
+ self._last_run = now
+ return
+
+ entry = _SmootherEntry(
+ scheduled_at_ms=now,
+ scheduled_for_ms=scheduled_for_ms,
+ defer=defer.Deferred(),
+ )
+ self._queue.append(entry)
+
+ else:
+ last_entry = self._queue[-1]
+
+ scheduled_for_ms = (now + self._target_ms + last_entry.scheduled_for_ms) / 2
+
+ entry = _SmootherEntry(
+ scheduled_at_ms=now,
+ scheduled_for_ms=scheduled_for_ms,
+ defer=defer.Deferred(),
+ )
+ self._queue.append(entry)
+
+ step = self._target_ms / (len(self._queue) + 1)
+ for idx, entry in enumerate(self._queue):
+ new_time = now + (idx + 1) * step
+ print("Moving?", entry.scheduled_for_ms, new_time)
+ if new_time < entry.scheduled_for_ms:
+ entry.scheduled_for_ms = new_time
+
+ if self._next_call:
+ self._next_call.reset(
+ max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0
+ )
+ else:
+ self._reactor.callLater(
+ max(self._queue[0].scheduled_for_ms - now, 0) / 1000.0, self._fire_next
+ )
+
+ await make_deferred_yieldable(entry.defer)
+
+ self._last_run = now
+
+ self._reactor.callLater(
+ (self._queue[0].scheduled_for_ms - now) / 1000.0, self._fire_next,
+ )
+
+ return
|