diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 17c7e64ce7..862c07ef8c 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -22,6 +22,12 @@ class RatelimitConfig(Config):
self.rc_messages_per_second = args.rc_messages_per_second
self.rc_message_burst_count = args.rc_message_burst_count
+ self.federation_rc_window_size = args.federation_rc_window_size
+ self.federation_rc_sleep_limit = args.federation_rc_sleep_limit
+ self.federation_rc_sleep_delay = args.federation_rc_sleep_delay
+ self.federation_rc_reject_limit = args.federation_rc_reject_limit
+ self.federation_rc_concurrent = args.federation_rc_concurrent
+
@classmethod
def add_arguments(cls, parser):
super(RatelimitConfig, cls).add_arguments(parser)
@@ -34,3 +40,33 @@ class RatelimitConfig(Config):
"--rc-message-burst-count", type=float, default=10,
help="number of message a client can send before being throttled"
)
+
+ rc_group.add_argument(
+ "--federation-rc-window-size", type=int, default=10000,
+ help="The federation window size in milliseconds",
+ )
+
+ rc_group.add_argument(
+ "--federation-rc-sleep-limit", type=int, default=10,
+ help="The number of federation requests from a single server"
+ " in a window before the server will delay processing the"
+ " request.",
+ )
+
+ rc_group.add_argument(
+ "--federation-rc-sleep-delay", type=int, default=500,
+ help="The duration in milliseconds to delay processing events from"
+ " remote servers by if they go over the sleep limit.",
+ )
+
+ rc_group.add_argument(
+ "--federation-rc-reject-limit", type=int, default=50,
+ help="The maximum number of concurrent federation requests allowed"
+ " from a single server",
+ )
+
+ rc_group.add_argument(
+ "--federation-rc-concurrent", type=int, default=3,
+ help="The number of federation requests to concurrently process"
+ " from a single server",
+ )
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 6800ac46c5..2a671b9aec 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -24,6 +24,8 @@ communicate over a different (albeit still reliable) protocol.
from .server import TransportLayerServer
from .client import TransportLayerClient
+from synapse.util.ratelimitutils import FederationRateLimiter
+
class TransportLayer(TransportLayerServer, TransportLayerClient):
"""This is a basic implementation of the transport layer that translates
@@ -55,8 +57,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient):
send requests
"""
self.keyring = homeserver.get_keyring()
+ self.clock = homeserver.get_clock()
self.server_name = server_name
self.server = server
self.client = client
self.request_handler = None
self.received_handler = None
+
+ self.ratelimiter = FederationRateLimiter(
+ self.clock,
+ window_size=homeserver.config.federation_rc_window_size,
+ sleep_limit=homeserver.config.federation_rc_sleep_limit,
+ sleep_msec=homeserver.config.federation_rc_sleep_delay,
+ reject_limit=homeserver.config.federation_rc_reject_limit,
+ concurrent_requests=homeserver.config.federation_rc_concurrent,
+ )
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2ffb37aa18..fce9c0195e 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -98,15 +98,23 @@ class TransportLayerServer(object):
def new_handler(request, *args, **kwargs):
try:
(origin, content) = yield self._authenticate_request(request)
- response = yield handler(
- origin, content, request.args, *args, **kwargs
- )
+ with self.ratelimiter.ratelimit(origin) as d:
+ yield d
+ response = yield handler(
+ origin, content, request.args, *args, **kwargs
+ )
except:
logger.exception("_authenticate_request failed")
raise
defer.returnValue(response)
return new_handler
+ def rate_limit_origin(self, handler):
+ def new_handler(origin, *args, **kwargs):
+ response = yield handler(origin, *args, **kwargs)
+ defer.returnValue(response)
+ return new_handler()
+
@log_function
def register_received_handler(self, handler):
""" Register a handler that will be fired when we receive data.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
new file mode 100644
index 0000000000..d4457af950
--- /dev/null
+++ b/synapse/util/ratelimitutils.py
@@ -0,0 +1,216 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import LimitExceededError
+
+from synapse.util.async import sleep
+
+import collections
+import contextlib
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationRateLimiter(object):
+ def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+ reject_limit, concurrent_requests):
+ """
+ Args:
+ clock (Clock)
+ window_size (int): The window size in milliseconds.
+ sleep_limit (int): The number of requests received in the last
+ `window_size` milliseconds before we artificially start
+ delaying processing of requests.
+ sleep_msec (int): The number of milliseconds to delay processing
+ of incoming requests by.
+ reject_limit (int): The maximum number of requests that are can be
+ queued for processing before we start rejecting requests with
+ a 429 Too Many Requests response.
+ concurrent_requests (int): The number of concurrent requests to
+ process.
+ """
+ self.clock = clock
+
+ self.window_size = window_size
+ self.sleep_limit = sleep_limit
+ self.sleep_msec = sleep_msec
+ self.reject_limit = reject_limit
+ self.concurrent_requests = concurrent_requests
+
+ self.ratelimiters = {}
+
+ def ratelimit(self, host):
+ """Used to ratelimit an incoming request from given host
+
+ Example usage:
+
+ with rate_limiter.ratelimit(origin) as wait_deferred:
+ yield wait_deferred
+ # Handle request ...
+
+ Args:
+ host (str): Origin of incoming request.
+
+ Returns:
+ _PerHostRatelimiter
+ """
+ return self.ratelimiters.setdefault(
+ host,
+ _PerHostRatelimiter(
+ clock=self.clock,
+ window_size=self.window_size,
+ sleep_limit=self.sleep_limit,
+ sleep_msec=self.sleep_msec,
+ reject_limit=self.reject_limit,
+ concurrent_requests=self.concurrent_requests,
+ )
+ ).ratelimit()
+
+
+class _PerHostRatelimiter(object):
+ def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+ reject_limit, concurrent_requests):
+ self.clock = clock
+
+ self.window_size = window_size
+ self.sleep_limit = sleep_limit
+ self.sleep_msec = sleep_msec
+ self.reject_limit = reject_limit
+ self.concurrent_requests = concurrent_requests
+
+ self.sleeping_requests = set()
+ self.ready_request_queue = collections.OrderedDict()
+ self.current_processing = set()
+ self.request_times = []
+
+ def is_empty(self):
+ time_now = self.clock.time_msec()
+ self.request_times[:] = [
+ r for r in self.request_times
+ if time_now - r < self.window_size
+ ]
+
+ return not (
+ self.ready_request_queue
+ or self.sleeping_requests
+ or self.current_processing
+ or self.request_times
+ )
+
+ @contextlib.contextmanager
+ def ratelimit(self):
+ # `contextlib.contextmanager` takes a generator and turns it into a
+ # context manager. The generator should only yield once with a value
+ # to be returned by manager.
+ # Exceptions will be reraised at the yield.
+
+ request_id = object()
+ ret = self._on_enter(request_id)
+ try:
+ yield ret
+ finally:
+ self._on_exit(request_id)
+
+ def _on_enter(self, request_id):
+ time_now = self.clock.time_msec()
+ self.request_times[:] = [
+ r for r in self.request_times
+ if time_now - r < self.window_size
+ ]
+
+ queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
+ if queue_size > self.reject_limit:
+ raise LimitExceededError(
+ retry_after_ms=int(
+ self.window_size / self.sleep_limit
+ ),
+ )
+
+ self.request_times.append(time_now)
+
+ def queue_request():
+ if len(self.current_processing) > self.concurrent_requests:
+ logger.debug("Ratelimit [%s]: Queue req", id(request_id))
+ queue_defer = defer.Deferred()
+ self.ready_request_queue[request_id] = queue_defer
+ return queue_defer
+ else:
+ return defer.succeed(None)
+
+ logger.debug(
+ "Ratelimit [%s]: len(self.request_times)=%d",
+ id(request_id), len(self.request_times),
+ )
+
+ if len(self.request_times) > self.sleep_limit:
+ logger.debug(
+ "Ratelimit [%s]: sleeping req",
+ id(request_id),
+ )
+ ret_defer = sleep(self.sleep_msec/1000.0)
+
+ self.sleeping_requests.add(request_id)
+
+ def on_wait_finished(_):
+ logger.debug(
+ "Ratelimit [%s]: Finished sleeping",
+ id(request_id),
+ )
+ self.sleeping_requests.discard(request_id)
+ queue_defer = queue_request()
+ return queue_defer
+
+ ret_defer.addBoth(on_wait_finished)
+ else:
+ ret_defer = queue_request()
+
+ def on_start(r):
+ logger.debug(
+ "Ratelimit [%s]: Processing req",
+ id(request_id),
+ )
+ self.current_processing.add(request_id)
+ return r
+
+ def on_err(r):
+ self.current_processing.discard(request_id)
+ return r
+
+ def on_both(r):
+ # Ensure that we've properly cleaned up.
+ self.sleeping_requests.discard(request_id)
+ self.ready_request_queue.pop(request_id, None)
+ return r
+
+ ret_defer.addCallbacks(on_start, on_err)
+ ret_defer.addBoth(on_both)
+ return ret_defer
+
+ def _on_exit(self, request_id):
+ logger.debug(
+ "Ratelimit [%s]: Processed req",
+ id(request_id),
+ )
+ self.current_processing.discard(request_id)
+ try:
+ request_id, deferred = self.ready_request_queue.popitem()
+ self.current_processing.add(request_id)
+ deferred.callback(None)
+ except KeyError:
+ pass
|