diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 1101881a2d..7deb38f2a7 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -13,17 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
-from synapse.api.errors import LimitExceededError
-
-from synapse.util.async import sleep
-from synapse.util.logcontext import preserve_fn
-
import collections
import contextlib
import logging
+from twisted.internet import defer
+
+from synapse.api.errors import LimitExceededError
+from synapse.util.logcontext import (
+ PreserveLoggingContext,
+ make_deferred_yieldable,
+ run_in_background,
+)
logger = logging.getLogger(__name__)
@@ -91,13 +92,22 @@ class _PerHostRatelimiter(object):
self.window_size = window_size
self.sleep_limit = sleep_limit
- self.sleep_msec = sleep_msec
+ self.sleep_sec = sleep_msec / 1000.0
self.reject_limit = reject_limit
self.concurrent_requests = concurrent_requests
+ # request_id objects for requests which have been slept
self.sleeping_requests = set()
+
+ # map from request_id object to Deferred for requests which are ready
+ # for processing but have been queued
self.ready_request_queue = collections.OrderedDict()
+
+ # request id objects for requests which are in progress
self.current_processing = set()
+
+ # times at which we have recently (within the last window_size ms)
+ # received requests.
self.request_times = []
@contextlib.contextmanager
@@ -116,11 +126,15 @@ class _PerHostRatelimiter(object):
def _on_enter(self, request_id):
time_now = self.clock.time_msec()
+
+ # remove any entries from request_times which aren't within the window
self.request_times[:] = [
r for r in self.request_times
if time_now - r < self.window_size
]
+ # reject the request if we already have too many queued up (either
+ # sleeping or in the ready queue).
queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
if queue_size > self.reject_limit:
raise LimitExceededError(
@@ -133,9 +147,13 @@ class _PerHostRatelimiter(object):
def queue_request():
if len(self.current_processing) > self.concurrent_requests:
- logger.debug("Ratelimit [%s]: Queue req", id(request_id))
queue_defer = defer.Deferred()
self.ready_request_queue[request_id] = queue_defer
+ logger.info(
+ "Ratelimiter: queueing request (queue now %i items)",
+ len(self.ready_request_queue),
+ )
+
return queue_defer
else:
return defer.succeed(None)
@@ -147,10 +165,9 @@ class _PerHostRatelimiter(object):
if len(self.request_times) > self.sleep_limit:
logger.debug(
- "Ratelimit [%s]: sleeping req",
- id(request_id),
+ "Ratelimiter: sleeping request for %f sec", self.sleep_sec,
)
- ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
+ ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
self.sleeping_requests.add(request_id)
@@ -176,6 +193,9 @@ class _PerHostRatelimiter(object):
return r
def on_err(r):
+ # XXX: why is this necessary? this is called before we start
+ # processing the request so why would the request be in
+ # current_processing?
self.current_processing.discard(request_id)
return r
@@ -187,7 +207,7 @@ class _PerHostRatelimiter(object):
ret_defer.addCallbacks(on_start, on_err)
ret_defer.addBoth(on_both)
- return ret_defer
+ return make_deferred_yieldable(ret_defer)
def _on_exit(self, request_id):
logger.debug(
@@ -196,8 +216,10 @@ class _PerHostRatelimiter(object):
)
self.current_processing.discard(request_id)
try:
- request_id, deferred = self.ready_request_queue.popitem()
- self.current_processing.add(request_id)
- deferred.callback(None)
+ # start processing the next item on the queue.
+ _, deferred = self.ready_request_queue.popitem(last=False)
+
+ with PreserveLoggingContext():
+ deferred.callback(None)
except KeyError:
pass
|