Diffstat (limited to 'synapse/util/ratelimitutils.py')
 synapse/util/ratelimitutils.py | 111 ++++++++++++++++++++++++++++++++------
 1 file changed, 99 insertions(+), 12 deletions(-)
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 6394cc39ac..f678b52cb4 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -18,6 +18,8 @@ import logging
 import typing
 from typing import Any, DefaultDict, Iterator, List, Set
 
+from prometheus_client.core import Counter
+
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError
@@ -27,6 +29,8 @@ from synapse.logging.context import (
     make_deferred_yieldable,
     run_in_background,
 )
+from synapse.logging.opentracing import start_active_span
+from synapse.metrics import Histogram, LaterGauge
 from synapse.util import Clock
 
 if typing.TYPE_CHECKING:
@@ -35,6 +39,32 @@ if typing.TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+# Track how much the ratelimiter is affecting requests
+rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
+rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
+queue_wait_timer = Histogram(
+    "synapse_rate_limit_queue_wait_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.1,
+        0.25,
+        0.5,
+        0.75,
+        1.0,
+        2.5,
+        5.0,
+        10.0,
+        20.0,
+        "+Inf",
+    ),
+)
+
+
 class FederationRateLimiter:
     def __init__(self, clock: Clock, config: FederationRatelimitSettings):
         def new_limiter() -> "_PerHostRatelimiter":
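
These metric objects follow the standard prometheus_client API (which
synapse.metrics builds on): a Counter is bumped with .inc(), and a Histogram
used via .time() records elapsed seconds into its buckets on exit. A minimal
self-contained sketch, with illustrative metric names rather than Synapse's:

    import time

    from prometheus_client import Counter, Histogram

    sleep_counter = Counter("example_rate_limit_sleep", "requests put to sleep")
    queue_wait = Histogram(
        "example_queue_wait_seconds",
        "time spent waiting for a slot",
        buckets=(0.005, 0.05, 0.5, 5.0, float("inf")),
    )

    sleep_counter.inc()      # one more slept request
    with queue_wait.time():  # observes the elapsed time when the block exits
        time.sleep(0.01)     # stands in for the real queue wait
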
@@ -44,6 +74,27 @@ class FederationRateLimiter:
             str, "_PerHostRatelimiter"
         ] = collections.defaultdict(new_limiter)
 
+        # We track the number of affected hosts per time-period so we can
+        # differentiate one really noisy homeserver from a general
+        # ratelimit tuning problem across the federation.
+        LaterGauge(
+            "synapse_rate_limit_sleep_affected_hosts",
+            "Number of hosts that had requests put to sleep",
+            [],
+            lambda: sum(
+                ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
+            ),
+        )
+        LaterGauge(
+            "synapse_rate_limit_reject_affected_hosts",
+            "Number of hosts that had requests rejected",
+            [],
+            lambda: sum(
+                ratelimiter.should_reject()
+                for ratelimiter in self.ratelimiters.values()
+            ),
+        )
+
     def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
         """Used to ratelimit an incoming request from a given host
 
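
LaterGauge is Synapse's helper for a gauge whose value is computed lazily at
scrape time rather than pushed on every change; here each scrape counts how
many per-host limiters are currently sleeping or rejecting. Plain
prometheus_client exposes the same pattern through Gauge.set_function(); a
rough equivalent, with illustrative names:

    from prometheus_client import Gauge

    ratelimiters = {}  # host -> limiter; stands in for self.ratelimiters

    affected = Gauge("example_sleep_affected_hosts", "hosts with slept requests")
    affected.set_function(
        # evaluated on every scrape, so the value is always current
        lambda: sum(rl.should_sleep() for rl in ratelimiters.values())
    )
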
@@ -59,7 +110,7 @@ class FederationRateLimiter:
         Returns:
             context manager which returns a deferred.
         """
-        return self.ratelimiters[host].ratelimit()
+        return self.ratelimiters[host].ratelimit(host)
 
 
 class _PerHostRatelimiter:
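
A hypothetical caller, following the shape this API exposes (a context manager
yielding a Deferred to wait on; a sketch, not a verbatim excerpt from Synapse):

    async def handle_incoming_request(
        rate_limiter: FederationRateLimiter, origin: str
    ) -> None:
        with rate_limiter.ratelimit(origin) as wait_deferred:
            # resolves once this request may proceed; fails with
            # LimitExceededError if the host is over the reject limit
            await wait_deferred
            ...  # handle the request; _on_exit runs when the block closes
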
@@ -94,19 +145,42 @@ class _PerHostRatelimiter:
         self.request_times: List[int] = []
 
     @contextlib.contextmanager
-    def ratelimit(self) -> "Iterator[defer.Deferred[None]]":
+    def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]":
         # `contextlib.contextmanager` takes a generator and turns it into a
         # context manager. The generator should only yield once with a value
         # to be returned by manager.
         # Exceptions will be reraised at the yield.
 
+        self.host = host
+
         request_id = object()
-        ret = self._on_enter(request_id)
+        # Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant
+        # type-checking, but we'd need Twisted >= 21.2.
+        ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id))
         try:
             yield ret
         finally:
             self._on_exit(request_id)
 
+    def should_reject(self) -> bool:
+        """
+        Whether to reject the request if we already have too many queued up
+        (either sleeping or in the ready queue).
+        """
+        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
+        return queue_size > self.reject_limit
+
+    def should_sleep(self) -> bool:
+        """
+        Whether to sleep the request if we already have too many requests coming
+        through within the window.
+        """
+        return len(self.request_times) > self.sleep_limit
+
+    async def _on_enter_with_tracing(self, request_id: object) -> None:
+        with start_active_span("ratelimit wait"), queue_wait_timer.time():
+            await self._on_enter(request_id)
+
     def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
         time_now = self.clock.time_msec()
 
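
The pattern above, a generator-based context manager yielding a Deferred built
with defer.ensureDeferred(), in miniature (a standalone sketch, independent of
Synapse):

    import contextlib
    from typing import Iterator

    from twisted.internet import defer

    async def _wait_for_slot() -> None:
        ...  # stands in for _on_enter_with_tracing

    @contextlib.contextmanager
    def ratelimit() -> "Iterator[defer.Deferred[None]]":
        # ensureDeferred starts the coroutine and wraps it in a Deferred;
        # Deferred.fromCoroutine (Twisted >= 21.2) is the better-typed spelling.
        ret = defer.ensureDeferred(_wait_for_slot())
        try:
            yield ret  # the value bound by the caller's `with` statement
        finally:
            ...        # cleanup runs even if the caller raised
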
@@ -117,8 +191,9 @@ class _PerHostRatelimiter:
 
         # reject the request if we already have too many queued up (either
         # sleeping or in the ready queue).
-        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
-        if queue_size > self.reject_limit:
+        if self.should_reject():
+            logger.debug("Ratelimiter(%s): rejecting request", self.host)
+            rate_limit_reject_counter.inc()
             raise LimitExceededError(
                 retry_after_ms=int(self.window_size / self.sleep_limit)
             )
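
The retry interval is derived from the window configuration. Illustrative
numbers (not necessarily the deployed defaults): a 1000 ms window with a
sleep_limit of 10 tells a rejected caller to retry after 1000 / 10 = 100 ms:

    window_size = 1000  # ms, illustrative
    sleep_limit = 10    # illustrative
    retry_after_ms = int(window_size / sleep_limit)  # == 100
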
@@ -130,7 +205,8 @@ class _PerHostRatelimiter:
                 queue_defer: defer.Deferred[None] = defer.Deferred()
                 self.ready_request_queue[request_id] = queue_defer
                 logger.info(
-                    "Ratelimiter: queueing request (queue now %i items)",
+                    "Ratelimiter(%s): queueing request (queue now %i items)",
+                    self.host,
                     len(self.ready_request_queue),
                 )
 
@@ -139,19 +215,28 @@ class _PerHostRatelimiter:
                 return defer.succeed(None)
 
         logger.debug(
-            "Ratelimit [%s]: len(self.request_times)=%d",
+            "Ratelimit(%s) [%s]: len(self.request_times)=%d",
+            self.host,
             id(request_id),
             len(self.request_times),
         )
 
-        if len(self.request_times) > self.sleep_limit:
-            logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec)
+        if self.should_sleep():
+            logger.debug(
+                "Ratelimiter(%s) [%s]: sleeping request for %f sec",
+                self.host,
+                id(request_id),
+                self.sleep_sec,
+            )
+            rate_limit_sleep_counter.inc()
             ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
 
             self.sleeping_requests.add(request_id)
 
             def on_wait_finished(_: Any) -> "defer.Deferred[None]":
-                logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
+                logger.debug(
+                    "Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id)
+                )
                 self.sleeping_requests.discard(request_id)
                 queue_defer = queue_request()
                 return queue_defer
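
The sleep path chains two Deferreds: first the timed sleep, then joining the
ready queue once the sleep fires. The chaining idiom in miniature (a sketch;
queue_request is a stand-in here):

    from twisted.internet import defer, reactor, task

    def queue_request() -> "defer.Deferred[None]":
        return defer.succeed(None)  # stand-in for joining the ready queue

    # deferLater plays the role of clock.sleep: a Deferred that fires after
    # the delay; its callback then hands the request over to the queue.
    d = task.deferLater(reactor, 0.5, lambda: None)
    d.addCallback(lambda _: queue_request())
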
@@ -161,7 +246,9 @@ class _PerHostRatelimiter:
             ret_defer = queue_request()
 
         def on_start(r: object) -> object:
-            logger.debug("Ratelimit [%s]: Processing req", id(request_id))
+            logger.debug(
+                "Ratelimit(%s) [%s]: Processing req", self.host, id(request_id)
+            )
             self.current_processing.add(request_id)
             return r
 
@@ -183,7 +270,7 @@ class _PerHostRatelimiter:
         return make_deferred_yieldable(ret_defer)
 
     def _on_exit(self, request_id: object) -> None:
-        logger.debug("Ratelimit [%s]: Processed req", id(request_id))
+        logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id))
         self.current_processing.discard(request_id)
         try:
             # start processing the next item on the queue.
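
On exit the limiter wakes the oldest queued request: the ready queue maps
request ids to Deferreds in insertion order, so the hand-off reduces to this
FIFO pattern (a sketch of the idea, not the file's exact code):

    from collections import OrderedDict

    from twisted.internet import defer

    ready_request_queue: "OrderedDict[object, defer.Deferred[None]]" = OrderedDict()

    def wake_next() -> None:
        try:
            # pop the oldest waiter and fire its Deferred so that request
            # can start processing
            _, next_defer = ready_request_queue.popitem(last=False)
            next_defer.callback(None)
        except KeyError:
            pass  # nothing queued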