diff options
Diffstat (limited to 'synapse/util/ratelimitutils.py')
-rw-r--r-- | synapse/util/ratelimitutils.py | 37 |
1 files changed, 29 insertions, 8 deletions
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index e48324d926..434b02b97b 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -18,6 +18,8 @@ import logging import typing from typing import Any, DefaultDict, Iterator, List, Set +from prometheus_client.core import Counter + from twisted.internet import defer from synapse.api.errors import LimitExceededError @@ -37,6 +39,9 @@ if typing.TYPE_CHECKING: logger = logging.getLogger(__name__) +# Track how much the ratelimiter is affecting requests +rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "") +rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "") queue_wait_timer = Histogram( "synapse_rate_limit_queue_wait_time_seconds", "sec", @@ -84,7 +89,7 @@ class FederationRateLimiter: Returns: context manager which returns a deferred. """ - return self.ratelimiters[host].ratelimit() + return self.ratelimiters[host].ratelimit(host) class _PerHostRatelimiter: @@ -119,12 +124,14 @@ class _PerHostRatelimiter: self.request_times: List[int] = [] @contextlib.contextmanager - def ratelimit(self) -> "Iterator[defer.Deferred[None]]": + def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]": # `contextlib.contextmanager` takes a generator and turns it into a # context manager. The generator should only yield once with a value # to be returned by manager. # Exceptions will be reraised at the yield. + self.host = host + request_id = object() ret = self._on_enter(request_id) try: @@ -144,6 +151,8 @@ class _PerHostRatelimiter: # sleeping or in the ready queue). queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) if queue_size > self.reject_limit: + logger.debug("Ratelimiter(%s): rejecting request", self.host) + rate_limit_reject_counter.inc() raise LimitExceededError( retry_after_ms=int(self.window_size / self.sleep_limit) ) @@ -155,7 +164,8 @@ class _PerHostRatelimiter: queue_defer: defer.Deferred[None] = defer.Deferred() self.ready_request_queue[request_id] = queue_defer logger.info( - "Ratelimiter: queueing request (queue now %i items)", + "Ratelimiter(%s): queueing request (queue now %i items)", + self.host, len(self.ready_request_queue), ) @@ -164,19 +174,28 @@ class _PerHostRatelimiter: return defer.succeed(None) logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", + "Ratelimit(%s) [%s]: len(self.request_times)=%d", + self.host, id(request_id), len(self.request_times), ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec) + logger.debug( + "Ratelimiter(%s) [%s]: sleeping request for %f sec", + self.host, + id(request_id), + self.sleep_sec, + ) + rate_limit_sleep_counter.inc() ret_defer = run_in_background(self.clock.sleep, self.sleep_sec) self.sleeping_requests.add(request_id) def on_wait_finished(_: Any) -> "defer.Deferred[None]": - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id) + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -186,7 +205,9 @@ class _PerHostRatelimiter: ret_defer = queue_request() def on_start(r: object) -> object: - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit(%s) [%s]: Processing req", self.host, id(request_id) + ) self.current_processing.add(request_id) return r @@ -217,7 +238,7 @@ class _PerHostRatelimiter: return make_deferred_yieldable(ret_defer) def _on_exit(self, request_id: object) -> None: - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id)) self.current_processing.discard(request_id) try: # start processing the next item on the queue. |