diff options
Diffstat (limited to 'synapse/federation/transport')
-rw-r--r-- | synapse/federation/transport/server.py | 59 |
1 files changed, 51 insertions, 8 deletions
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a9e625f127..390e54b9fb 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -32,6 +32,21 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ self.clock = clock self.window_size = window_size @@ -43,9 +58,23 @@ class FederationRateLimiter(object): self.ratelimiters = {} def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ return self.ratelimiters.setdefault( host, - PerHostRatelimiter( + _PerHostRatelimiter( clock=self.clock, window_size=self.window_size, sleep_limit=self.sleep_limit, @@ -56,7 +85,7 @@ class FederationRateLimiter(object): ).ratelimit() -class PerHostRatelimiter(object): +class _PerHostRatelimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): self.clock = clock @@ -123,17 +152,25 @@ class PerHostRatelimiter(object): else: return defer.succeed(None) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) ret_defer = sleep(self.sleep_msec/1000.0) self.sleeping_requests.add(request_id) def on_wait_finished(_): - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -143,7 +180,10 @@ class PerHostRatelimiter(object): ret_defer = queue_request() def on_start(r): - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) self.current_processing.add(request_id) return r @@ -162,7 +202,10 @@ class PerHostRatelimiter(object): return ret_defer def _on_exit(self, request_id): - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) self.current_processing.discard(request_id) try: request_id, deferred = self.ready_request_queue.popitem() |