summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-02-27 15:41:52 +0000
committerErik Johnston <erik@matrix.org>2015-02-27 15:41:52 +0000
commit0554d0708225afe13d141bd00e3aaca2509f3f78 (patch)
tree137eb28e98e8c086bccd78ca815f37daa5f07e9b /synapse/federation
parentDocument FederationRateLimiter (diff)
downloadsynapse-0554d0708225afe13d141bd00e3aaca2509f3f78.tar.xz
Move federation rate limiting out of transport layer
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/transport/__init__.py4
-rw-r--r--synapse/federation/transport/server.py204
2 files changed, 4 insertions, 204 deletions
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 7028ca6947..f0283b5105 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -21,9 +21,11 @@ support HTTPS), however individual pairings of servers may decide to
 communicate over a different (albeit still reliable) protocol.
 """
 
-from .server import TransportLayerServer, FederationRateLimiter
+from .server import TransportLayerServer
 from .client import TransportLayerClient
 
+from synapse.util.ratelimitutils import FederationRateLimiter
+
 
 class TransportLayer(TransportLayerServer, TransportLayerClient):
     """This is a basic implementation of the transport layer that translates
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 390e54b9fb..fce9c0195e 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,11 +16,9 @@
 from twisted.internet import defer
 
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError, LimitExceededError
-from synapse.util.async import sleep
+from synapse.api.errors import Codes, SynapseError
 from synapse.util.logutils import log_function
 
-import collections
 import logging
 import simplejson as json
 import re
@@ -29,206 +27,6 @@ import re
 logger = logging.getLogger(__name__)
 
 
-class FederationRateLimiter(object):
-    def __init__(self, clock, window_size, sleep_limit, sleep_msec,
-                 reject_limit, concurrent_requests):
-        """
-        Args:
-            clock (Clock)
-            window_size (int): The window size in milliseconds.
-            sleep_limit (int): The number of requests received in the last
-                `window_size` milliseconds before we artificially start
-                delaying processing of requests.
-            sleep_msec (int): The number of milliseconds to delay processing
-                of incoming requests by.
-            reject_limit (int): The maximum number of requests that are can be
-                queued for processing before we start rejecting requests with
-                a 429 Too Many Requests response.
-            concurrent_requests (int): The number of concurrent requests to
-                process.
-        """
-        self.clock = clock
-
-        self.window_size = window_size
-        self.sleep_limit = sleep_limit
-        self.sleep_msec = sleep_msec
-        self.reject_limit = reject_limit
-        self.concurrent_requests = concurrent_requests
-
-        self.ratelimiters = {}
-
-    def ratelimit(self, host):
-        """Used to ratelimit an incoming request from given host
-
-        Example usage:
-
-            with rate_limiter.ratelimit(origin) as wait_deferred:
-                yield wait_deferred
-                # Handle request ...
-
-        Args:
-            host (str): Origin of incoming request.
-
-        Returns:
-            _PerHostRatelimiter
-        """
-        return self.ratelimiters.setdefault(
-            host,
-            _PerHostRatelimiter(
-                clock=self.clock,
-                window_size=self.window_size,
-                sleep_limit=self.sleep_limit,
-                sleep_msec=self.sleep_msec,
-                reject_limit=self.reject_limit,
-                concurrent_requests=self.concurrent_requests,
-            )
-        ).ratelimit()
-
-
-class _PerHostRatelimiter(object):
-    def __init__(self, clock, window_size, sleep_limit, sleep_msec,
-                 reject_limit, concurrent_requests):
-        self.clock = clock
-
-        self.window_size = window_size
-        self.sleep_limit = sleep_limit
-        self.sleep_msec = sleep_msec
-        self.reject_limit = reject_limit
-        self.concurrent_requests = concurrent_requests
-
-        self.sleeping_requests = set()
-        self.ready_request_queue = collections.OrderedDict()
-        self.current_processing = set()
-        self.request_times = []
-
-    def is_empty(self):
-        time_now = self.clock.time_msec()
-        self.request_times[:] = [
-            r for r in self.request_times
-            if time_now - r < self.window_size
-        ]
-
-        return not (
-            self.ready_request_queue
-            or self.sleeping_requests
-            or self.current_processing
-            or self.request_times
-        )
-
-    def ratelimit(self):
-        request_id = object()
-
-        def on_enter():
-            return self._on_enter(request_id)
-
-        def on_exit(exc_type, exc_val, exc_tb):
-            return self._on_exit(request_id)
-
-        return ContextManagerFunction(on_enter, on_exit)
-
-    def _on_enter(self, request_id):
-        time_now = self.clock.time_msec()
-        self.request_times[:] = [
-            r for r in self.request_times
-            if time_now - r < self.window_size
-        ]
-
-        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
-        if queue_size > self.reject_limit:
-            raise LimitExceededError(
-                retry_after_ms=int(
-                    self.window_size / self.sleep_limit
-                ),
-            )
-
-        self.request_times.append(time_now)
-
-        def queue_request():
-            if len(self.current_processing) > self.concurrent_requests:
-                logger.debug("Ratelimit [%s]: Queue req", id(request_id))
-                queue_defer = defer.Deferred()
-                self.ready_request_queue[request_id] = queue_defer
-                return queue_defer
-            else:
-                return defer.succeed(None)
-
-        logger.debug(
-            "Ratelimit [%s]: len(self.request_times)=%d",
-            id(request_id), len(self.request_times),
-        )
-
-        if len(self.request_times) > self.sleep_limit:
-            logger.debug(
-                "Ratelimit [%s]: sleeping req",
-                id(request_id),
-            )
-            ret_defer = sleep(self.sleep_msec/1000.0)
-
-            self.sleeping_requests.add(request_id)
-
-            def on_wait_finished(_):
-                logger.debug(
-                    "Ratelimit [%s]: Finished sleeping",
-                    id(request_id),
-                )
-                self.sleeping_requests.discard(request_id)
-                queue_defer = queue_request()
-                return queue_defer
-
-            ret_defer.addBoth(on_wait_finished)
-        else:
-            ret_defer = queue_request()
-
-        def on_start(r):
-            logger.debug(
-                "Ratelimit [%s]: Processing req",
-                id(request_id),
-            )
-            self.current_processing.add(request_id)
-            return r
-
-        def on_err(r):
-            self.current_processing.discard(request_id)
-            return r
-
-        def on_both(r):
-            # Ensure that we've properly cleaned up.
-            self.sleeping_requests.discard(request_id)
-            self.ready_request_queue.pop(request_id, None)
-            return r
-
-        ret_defer.addCallbacks(on_start, on_err)
-        ret_defer.addBoth(on_both)
-        return ret_defer
-
-    def _on_exit(self, request_id):
-        logger.debug(
-            "Ratelimit [%s]: Processed req",
-            id(request_id),
-        )
-        self.current_processing.discard(request_id)
-        try:
-            request_id, deferred = self.ready_request_queue.popitem()
-            self.current_processing.add(request_id)
-            deferred.callback(None)
-        except KeyError:
-            pass
-
-
-class ContextManagerFunction(object):
-    def __init__(self, on_enter, on_exit):
-        self.on_enter = on_enter
-        self.on_exit = on_exit
-
-    def __enter__(self):
-        if self.on_enter:
-            return self.on_enter()
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        if self.on_exit:
-            return self.on_exit(exc_type, exc_val, exc_tb)
-
-
 class TransportLayerServer(object):
     """Handles incoming federation HTTP requests"""