summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/config/ratelimiting.py36
-rw-r--r--synapse/federation/transport/__init__.py12
-rw-r--r--synapse/federation/transport/server.py14
-rw-r--r--synapse/util/ratelimitutils.py216
4 files changed, 275 insertions, 3 deletions
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 17c7e64ce7..862c07ef8c 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -22,6 +22,12 @@ class RatelimitConfig(Config):
         self.rc_messages_per_second = args.rc_messages_per_second
         self.rc_message_burst_count = args.rc_message_burst_count
 
+        self.federation_rc_window_size = args.federation_rc_window_size
+        self.federation_rc_sleep_limit = args.federation_rc_sleep_limit
+        self.federation_rc_sleep_delay = args.federation_rc_sleep_delay
+        self.federation_rc_reject_limit = args.federation_rc_reject_limit
+        self.federation_rc_concurrent = args.federation_rc_concurrent
+
     @classmethod
     def add_arguments(cls, parser):
         super(RatelimitConfig, cls).add_arguments(parser)
@@ -34,3 +40,33 @@ class RatelimitConfig(Config):
             "--rc-message-burst-count", type=float, default=10,
             help="number of message a client can send before being throttled"
         )
+
+        rc_group.add_argument(
+            "--federation-rc-window-size", type=int, default=10000,
+            help="The federation window size in milliseconds",
+        )
+
+        rc_group.add_argument(
+            "--federation-rc-sleep-limit", type=int, default=10,
+            help="The number of federation requests from a single server"
+                 " in a window before the server will delay processing the"
+                 " request.",
+        )
+
+        rc_group.add_argument(
+            "--federation-rc-sleep-delay", type=int, default=500,
+            help="The duration in milliseconds to delay processing events from"
+                 " remote servers by if they go over the sleep limit.",
+        )
+
+        rc_group.add_argument(
+            "--federation-rc-reject-limit", type=int, default=50,
+            help="The maximum number of concurrent federation requests allowed"
+                 " from a single server",
+        )
+
+        rc_group.add_argument(
+            "--federation-rc-concurrent", type=int, default=3,
+            help="The number of federation requests to concurrently process"
+                 " from a single server",
+        )
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 6800ac46c5..2a671b9aec 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -24,6 +24,8 @@ communicate over a different (albeit still reliable) protocol.
 from .server import TransportLayerServer
 from .client import TransportLayerClient
 
+from synapse.util.ratelimitutils import FederationRateLimiter
+
 
 class TransportLayer(TransportLayerServer, TransportLayerClient):
     """This is a basic implementation of the transport layer that translates
@@ -55,8 +57,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient):
                 send requests
         """
         self.keyring = homeserver.get_keyring()
+        self.clock = homeserver.get_clock()
         self.server_name = server_name
         self.server = server
         self.client = client
         self.request_handler = None
         self.received_handler = None
+
+        self.ratelimiter = FederationRateLimiter(
+            self.clock,
+            window_size=homeserver.config.federation_rc_window_size,
+            sleep_limit=homeserver.config.federation_rc_sleep_limit,
+            sleep_msec=homeserver.config.federation_rc_sleep_delay,
+            reject_limit=homeserver.config.federation_rc_reject_limit,
+            concurrent_requests=homeserver.config.federation_rc_concurrent,
+        )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2ffb37aa18..fce9c0195e 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -98,15 +98,23 @@ class TransportLayerServer(object):
         def new_handler(request, *args, **kwargs):
             try:
                 (origin, content) = yield self._authenticate_request(request)
-                response = yield handler(
-                    origin, content, request.args, *args, **kwargs
-                )
+                with self.ratelimiter.ratelimit(origin) as d:
+                    yield d
+                    response = yield handler(
+                        origin, content, request.args, *args, **kwargs
+                    )
             except:
                 logger.exception("_authenticate_request failed")
                 raise
             defer.returnValue(response)
         return new_handler
 
+    def rate_limit_origin(self, handler):
+        def new_handler(origin, *args, **kwargs):
+            response = yield handler(origin, *args, **kwargs)
+            defer.returnValue(response)
+        return new_handler()
+
     @log_function
     def register_received_handler(self, handler):
         """ Register a handler that will be fired when we receive data.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
new file mode 100644
index 0000000000..d4457af950
--- /dev/null
+++ b/synapse/util/ratelimitutils.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import LimitExceededError
+
+from synapse.util.async import sleep
+
+import collections
+import contextlib
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationRateLimiter(object):
+    def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+                 reject_limit, concurrent_requests):
+        """
+        Args:
+            clock (Clock)
+            window_size (int): The window size in milliseconds.
+            sleep_limit (int): The number of requests received in the last
+                `window_size` milliseconds before we artificially start
+                delaying processing of requests.
+            sleep_msec (int): The number of milliseconds to delay processing
+                of incoming requests by.
+            reject_limit (int): The maximum number of requests that are can be
+                queued for processing before we start rejecting requests with
+                a 429 Too Many Requests response.
+            concurrent_requests (int): The number of concurrent requests to
+                process.
+        """
+        self.clock = clock
+
+        self.window_size = window_size
+        self.sleep_limit = sleep_limit
+        self.sleep_msec = sleep_msec
+        self.reject_limit = reject_limit
+        self.concurrent_requests = concurrent_requests
+
+        self.ratelimiters = {}
+
+    def ratelimit(self, host):
+        """Used to ratelimit an incoming request from given host
+
+        Example usage:
+
+            with rate_limiter.ratelimit(origin) as wait_deferred:
+                yield wait_deferred
+                # Handle request ...
+
+        Args:
+            host (str): Origin of incoming request.
+
+        Returns:
+            _PerHostRatelimiter
+        """
+        return self.ratelimiters.setdefault(
+            host,
+            _PerHostRatelimiter(
+                clock=self.clock,
+                window_size=self.window_size,
+                sleep_limit=self.sleep_limit,
+                sleep_msec=self.sleep_msec,
+                reject_limit=self.reject_limit,
+                concurrent_requests=self.concurrent_requests,
+            )
+        ).ratelimit()
+
+
+class _PerHostRatelimiter(object):
+    def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+                 reject_limit, concurrent_requests):
+        self.clock = clock
+
+        self.window_size = window_size
+        self.sleep_limit = sleep_limit
+        self.sleep_msec = sleep_msec
+        self.reject_limit = reject_limit
+        self.concurrent_requests = concurrent_requests
+
+        self.sleeping_requests = set()
+        self.ready_request_queue = collections.OrderedDict()
+        self.current_processing = set()
+        self.request_times = []
+
+    def is_empty(self):
+        time_now = self.clock.time_msec()
+        self.request_times[:] = [
+            r for r in self.request_times
+            if time_now - r < self.window_size
+        ]
+
+        return not (
+            self.ready_request_queue
+            or self.sleeping_requests
+            or self.current_processing
+            or self.request_times
+        )
+
+    @contextlib.contextmanager
+    def ratelimit(self):
+        # `contextlib.contextmanager` takes a generator and turns it into a
+        # context manager. The generator should only yield once with a value
+        # to be returned by manager.
+        # Exceptions will be reraised at the yield.
+
+        request_id = object()
+        ret = self._on_enter(request_id)
+        try:
+            yield ret
+        finally:
+            self._on_exit(request_id)
+
+    def _on_enter(self, request_id):
+        time_now = self.clock.time_msec()
+        self.request_times[:] = [
+            r for r in self.request_times
+            if time_now - r < self.window_size
+        ]
+
+        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
+        if queue_size > self.reject_limit:
+            raise LimitExceededError(
+                retry_after_ms=int(
+                    self.window_size / self.sleep_limit
+                ),
+            )
+
+        self.request_times.append(time_now)
+
+        def queue_request():
+            if len(self.current_processing) > self.concurrent_requests:
+                logger.debug("Ratelimit [%s]: Queue req", id(request_id))
+                queue_defer = defer.Deferred()
+                self.ready_request_queue[request_id] = queue_defer
+                return queue_defer
+            else:
+                return defer.succeed(None)
+
+        logger.debug(
+            "Ratelimit [%s]: len(self.request_times)=%d",
+            id(request_id), len(self.request_times),
+        )
+
+        if len(self.request_times) > self.sleep_limit:
+            logger.debug(
+                "Ratelimit [%s]: sleeping req",
+                id(request_id),
+            )
+            ret_defer = sleep(self.sleep_msec/1000.0)
+
+            self.sleeping_requests.add(request_id)
+
+            def on_wait_finished(_):
+                logger.debug(
+                    "Ratelimit [%s]: Finished sleeping",
+                    id(request_id),
+                )
+                self.sleeping_requests.discard(request_id)
+                queue_defer = queue_request()
+                return queue_defer
+
+            ret_defer.addBoth(on_wait_finished)
+        else:
+            ret_defer = queue_request()
+
+        def on_start(r):
+            logger.debug(
+                "Ratelimit [%s]: Processing req",
+                id(request_id),
+            )
+            self.current_processing.add(request_id)
+            return r
+
+        def on_err(r):
+            self.current_processing.discard(request_id)
+            return r
+
+        def on_both(r):
+            # Ensure that we've properly cleaned up.
+            self.sleeping_requests.discard(request_id)
+            self.ready_request_queue.pop(request_id, None)
+            return r
+
+        ret_defer.addCallbacks(on_start, on_err)
+        ret_defer.addBoth(on_both)
+        return ret_defer
+
+    def _on_exit(self, request_id):
+        logger.debug(
+            "Ratelimit [%s]: Processed req",
+            id(request_id),
+        )
+        self.current_processing.discard(request_id)
+        try:
+            request_id, deferred = self.ready_request_queue.popitem()
+            self.current_processing.add(request_id)
+            deferred.callback(None)
+        except KeyError:
+            pass