diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a9e625f127..390e54b9fb 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -32,6 +32,21 @@ logger = logging.getLogger(__name__)
class FederationRateLimiter(object):
def __init__(self, clock, window_size, sleep_limit, sleep_msec,
reject_limit, concurrent_requests):
+ """
+ Args:
+ clock (Clock)
+ window_size (int): The window size in milliseconds.
+ sleep_limit (int): The number of requests received in the last
+ `window_size` milliseconds before we artificially start
+ delaying processing of requests.
+ sleep_msec (int): The number of milliseconds to delay processing
+ of incoming requests by.
+ reject_limit (int): The maximum number of requests that are can be
+ queued for processing before we start rejecting requests with
+ a 429 Too Many Requests response.
+ concurrent_requests (int): The number of concurrent requests to
+ process.
+ """
self.clock = clock
self.window_size = window_size
@@ -43,9 +58,23 @@ class FederationRateLimiter(object):
self.ratelimiters = {}
def ratelimit(self, host):
+ """Used to ratelimit an incoming request from given host
+
+ Example usage:
+
+ with rate_limiter.ratelimit(origin) as wait_deferred:
+ yield wait_deferred
+ # Handle request ...
+
+ Args:
+ host (str): Origin of incoming request.
+
+ Returns:
+ _PerHostRatelimiter
+ """
return self.ratelimiters.setdefault(
host,
- PerHostRatelimiter(
+ _PerHostRatelimiter(
clock=self.clock,
window_size=self.window_size,
sleep_limit=self.sleep_limit,
@@ -56,7 +85,7 @@ class FederationRateLimiter(object):
).ratelimit()
-class PerHostRatelimiter(object):
+class _PerHostRatelimiter(object):
def __init__(self, clock, window_size, sleep_limit, sleep_msec,
reject_limit, concurrent_requests):
self.clock = clock
@@ -123,17 +152,25 @@ class PerHostRatelimiter(object):
else:
return defer.succeed(None)
- logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times))
- logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times))
+ logger.debug(
+ "Ratelimit [%s]: len(self.request_times)=%d",
+ id(request_id), len(self.request_times),
+ )
if len(self.request_times) > self.sleep_limit:
- logger.debug("Ratelimit [%s]: sleeping req", id(request_id))
+ logger.debug(
+ "Ratelimit [%s]: sleeping req",
+ id(request_id),
+ )
ret_defer = sleep(self.sleep_msec/1000.0)
self.sleeping_requests.add(request_id)
def on_wait_finished(_):
- logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
+ logger.debug(
+ "Ratelimit [%s]: Finished sleeping",
+ id(request_id),
+ )
self.sleeping_requests.discard(request_id)
queue_defer = queue_request()
return queue_defer
@@ -143,7 +180,10 @@ class PerHostRatelimiter(object):
ret_defer = queue_request()
def on_start(r):
- logger.debug("Ratelimit [%s]: Processing req", id(request_id))
+ logger.debug(
+ "Ratelimit [%s]: Processing req",
+ id(request_id),
+ )
self.current_processing.add(request_id)
return r
@@ -162,7 +202,10 @@ class PerHostRatelimiter(object):
return ret_defer
def _on_exit(self, request_id):
- logger.debug("Ratelimit [%s]: Processed req", id(request_id))
+ logger.debug(
+ "Ratelimit [%s]: Processed req",
+ id(request_id),
+ )
self.current_processing.discard(request_id)
try:
request_id, deferred = self.ready_request_queue.popitem()
|