summary refs log tree commit diff
path: root/synapse/federation/transport
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transport')
-rw-r--r--synapse/federation/transport/server.py59
1 files changed, 51 insertions, 8 deletions
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a9e625f127..390e54b9fb 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -32,6 +32,21 @@ logger = logging.getLogger(__name__)
 class FederationRateLimiter(object):
     def __init__(self, clock, window_size, sleep_limit, sleep_msec,
                  reject_limit, concurrent_requests):
+        """
+        Args:
+            clock (Clock)
+            window_size (int): The window size in milliseconds.
+            sleep_limit (int): The number of requests received in the last
+                `window_size` milliseconds before we artificially start
+                delaying processing of requests.
+            sleep_msec (int): The number of milliseconds to delay processing
+                of incoming requests by.
+            reject_limit (int): The maximum number of requests that are can be
+                queued for processing before we start rejecting requests with
+                a 429 Too Many Requests response.
+            concurrent_requests (int): The number of concurrent requests to
+                process.
+        """
         self.clock = clock
 
         self.window_size = window_size
@@ -43,9 +58,23 @@ class FederationRateLimiter(object):
         self.ratelimiters = {}
 
     def ratelimit(self, host):
+        """Used to ratelimit an incoming request from given host
+
+        Example usage:
+
+            with rate_limiter.ratelimit(origin) as wait_deferred:
+                yield wait_deferred
+                # Handle request ...
+
+        Args:
+            host (str): Origin of incoming request.
+
+        Returns:
+            _PerHostRatelimiter
+        """
         return self.ratelimiters.setdefault(
             host,
-            PerHostRatelimiter(
+            _PerHostRatelimiter(
                 clock=self.clock,
                 window_size=self.window_size,
                 sleep_limit=self.sleep_limit,
@@ -56,7 +85,7 @@ class FederationRateLimiter(object):
         ).ratelimit()
 
 
-class PerHostRatelimiter(object):
+class _PerHostRatelimiter(object):
     def __init__(self, clock, window_size, sleep_limit, sleep_msec,
                  reject_limit, concurrent_requests):
         self.clock = clock
@@ -123,17 +152,25 @@ class PerHostRatelimiter(object):
             else:
                 return defer.succeed(None)
 
-        logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times))
-        logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times))
+        logger.debug(
+            "Ratelimit [%s]: len(self.request_times)=%d",
+            id(request_id), len(self.request_times),
+        )
 
         if len(self.request_times) > self.sleep_limit:
-            logger.debug("Ratelimit [%s]: sleeping req", id(request_id))
+            logger.debug(
+                "Ratelimit [%s]: sleeping req",
+                id(request_id),
+            )
             ret_defer = sleep(self.sleep_msec/1000.0)
 
             self.sleeping_requests.add(request_id)
 
             def on_wait_finished(_):
-                logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
+                logger.debug(
+                    "Ratelimit [%s]: Finished sleeping",
+                    id(request_id),
+                )
                 self.sleeping_requests.discard(request_id)
                 queue_defer = queue_request()
                 return queue_defer
@@ -143,7 +180,10 @@ class PerHostRatelimiter(object):
             ret_defer = queue_request()
 
         def on_start(r):
-            logger.debug("Ratelimit [%s]: Processing req", id(request_id))
+            logger.debug(
+                "Ratelimit [%s]: Processing req",
+                id(request_id),
+            )
             self.current_processing.add(request_id)
             return r
 
@@ -162,7 +202,10 @@ class PerHostRatelimiter(object):
         return ret_defer
 
     def _on_exit(self, request_id):
-        logger.debug("Ratelimit [%s]: Processed req", id(request_id))
+        logger.debug(
+            "Ratelimit [%s]: Processed req",
+            id(request_id),
+        )
         self.current_processing.discard(request_id)
         try:
             request_id, deferred = self.ready_request_queue.popitem()