diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py
index 7028ca6947..f0283b5105 100644
--- a/synapse/federation/transport/__init__.py
+++ b/synapse/federation/transport/__init__.py
@@ -21,9 +21,11 @@ support HTTPS), however individual pairings of servers may decide to
communicate over a different (albeit still reliable) protocol.
"""
-from .server import TransportLayerServer, FederationRateLimiter
+from .server import TransportLayerServer
from .client import TransportLayerClient
+from synapse.util.ratelimitutils import FederationRateLimiter
+
class TransportLayer(TransportLayerServer, TransportLayerClient):
"""This is a basic implementation of the transport layer that translates
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 390e54b9fb..fce9c0195e 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -16,11 +16,9 @@
from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
-from synapse.api.errors import Codes, SynapseError, LimitExceededError
-from synapse.util.async import sleep
+from synapse.api.errors import Codes, SynapseError
from synapse.util.logutils import log_function
-import collections
import logging
import simplejson as json
import re
@@ -29,206 +27,6 @@ import re
logger = logging.getLogger(__name__)
-class FederationRateLimiter(object):
- def __init__(self, clock, window_size, sleep_limit, sleep_msec,
- reject_limit, concurrent_requests):
- """
- Args:
- clock (Clock)
- window_size (int): The window size in milliseconds.
- sleep_limit (int): The number of requests received in the last
- `window_size` milliseconds before we artificially start
- delaying processing of requests.
- sleep_msec (int): The number of milliseconds to delay processing
- of incoming requests by.
- reject_limit (int): The maximum number of requests that are can be
- queued for processing before we start rejecting requests with
- a 429 Too Many Requests response.
- concurrent_requests (int): The number of concurrent requests to
- process.
- """
- self.clock = clock
-
- self.window_size = window_size
- self.sleep_limit = sleep_limit
- self.sleep_msec = sleep_msec
- self.reject_limit = reject_limit
- self.concurrent_requests = concurrent_requests
-
- self.ratelimiters = {}
-
- def ratelimit(self, host):
- """Used to ratelimit an incoming request from given host
-
- Example usage:
-
- with rate_limiter.ratelimit(origin) as wait_deferred:
- yield wait_deferred
- # Handle request ...
-
- Args:
- host (str): Origin of incoming request.
-
- Returns:
- _PerHostRatelimiter
- """
- return self.ratelimiters.setdefault(
- host,
- _PerHostRatelimiter(
- clock=self.clock,
- window_size=self.window_size,
- sleep_limit=self.sleep_limit,
- sleep_msec=self.sleep_msec,
- reject_limit=self.reject_limit,
- concurrent_requests=self.concurrent_requests,
- )
- ).ratelimit()
-
-
-class _PerHostRatelimiter(object):
- def __init__(self, clock, window_size, sleep_limit, sleep_msec,
- reject_limit, concurrent_requests):
- self.clock = clock
-
- self.window_size = window_size
- self.sleep_limit = sleep_limit
- self.sleep_msec = sleep_msec
- self.reject_limit = reject_limit
- self.concurrent_requests = concurrent_requests
-
- self.sleeping_requests = set()
- self.ready_request_queue = collections.OrderedDict()
- self.current_processing = set()
- self.request_times = []
-
- def is_empty(self):
- time_now = self.clock.time_msec()
- self.request_times[:] = [
- r for r in self.request_times
- if time_now - r < self.window_size
- ]
-
- return not (
- self.ready_request_queue
- or self.sleeping_requests
- or self.current_processing
- or self.request_times
- )
-
- def ratelimit(self):
- request_id = object()
-
- def on_enter():
- return self._on_enter(request_id)
-
- def on_exit(exc_type, exc_val, exc_tb):
- return self._on_exit(request_id)
-
- return ContextManagerFunction(on_enter, on_exit)
-
- def _on_enter(self, request_id):
- time_now = self.clock.time_msec()
- self.request_times[:] = [
- r for r in self.request_times
- if time_now - r < self.window_size
- ]
-
- queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
- if queue_size > self.reject_limit:
- raise LimitExceededError(
- retry_after_ms=int(
- self.window_size / self.sleep_limit
- ),
- )
-
- self.request_times.append(time_now)
-
- def queue_request():
- if len(self.current_processing) > self.concurrent_requests:
- logger.debug("Ratelimit [%s]: Queue req", id(request_id))
- queue_defer = defer.Deferred()
- self.ready_request_queue[request_id] = queue_defer
- return queue_defer
- else:
- return defer.succeed(None)
-
- logger.debug(
- "Ratelimit [%s]: len(self.request_times)=%d",
- id(request_id), len(self.request_times),
- )
-
- if len(self.request_times) > self.sleep_limit:
- logger.debug(
- "Ratelimit [%s]: sleeping req",
- id(request_id),
- )
- ret_defer = sleep(self.sleep_msec/1000.0)
-
- self.sleeping_requests.add(request_id)
-
- def on_wait_finished(_):
- logger.debug(
- "Ratelimit [%s]: Finished sleeping",
- id(request_id),
- )
- self.sleeping_requests.discard(request_id)
- queue_defer = queue_request()
- return queue_defer
-
- ret_defer.addBoth(on_wait_finished)
- else:
- ret_defer = queue_request()
-
- def on_start(r):
- logger.debug(
- "Ratelimit [%s]: Processing req",
- id(request_id),
- )
- self.current_processing.add(request_id)
- return r
-
- def on_err(r):
- self.current_processing.discard(request_id)
- return r
-
- def on_both(r):
- # Ensure that we've properly cleaned up.
- self.sleeping_requests.discard(request_id)
- self.ready_request_queue.pop(request_id, None)
- return r
-
- ret_defer.addCallbacks(on_start, on_err)
- ret_defer.addBoth(on_both)
- return ret_defer
-
- def _on_exit(self, request_id):
- logger.debug(
- "Ratelimit [%s]: Processed req",
- id(request_id),
- )
- self.current_processing.discard(request_id)
- try:
- request_id, deferred = self.ready_request_queue.popitem()
- self.current_processing.add(request_id)
- deferred.callback(None)
- except KeyError:
- pass
-
-
-class ContextManagerFunction(object):
- def __init__(self, on_enter, on_exit):
- self.on_enter = on_enter
- self.on_exit = on_exit
-
- def __enter__(self):
- if self.on_enter:
- return self.on_enter()
-
- def __exit__(self, exc_type, exc_val, exc_tb):
- if self.on_exit:
- return self.on_exit(exc_type, exc_val, exc_tb)
-
-
class TransportLayerServer(object):
"""Handles incoming federation HTTP requests"""
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
new file mode 100644
index 0000000000..259d5f6f88
--- /dev/null
+++ b/synapse/util/ratelimitutils.py
@@ -0,0 +1,226 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.errors import LimitExceededError
+
+from synapse.util.async import sleep
+
+import collections
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationRateLimiter(object):
+ def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+ reject_limit, concurrent_requests):
+ """
+ Args:
+ clock (Clock)
+ window_size (int): The window size in milliseconds.
+ sleep_limit (int): The number of requests received in the last
+ `window_size` milliseconds before we artificially start
+ delaying processing of requests.
+ sleep_msec (int): The number of milliseconds to delay processing
+ of incoming requests by.
+ reject_limit (int): The maximum number of requests that are can be
+ queued for processing before we start rejecting requests with
+ a 429 Too Many Requests response.
+ concurrent_requests (int): The number of concurrent requests to
+ process.
+ """
+ self.clock = clock
+
+ self.window_size = window_size
+ self.sleep_limit = sleep_limit
+ self.sleep_msec = sleep_msec
+ self.reject_limit = reject_limit
+ self.concurrent_requests = concurrent_requests
+
+ self.ratelimiters = {}
+
+ def ratelimit(self, host):
+ """Used to ratelimit an incoming request from given host
+
+ Example usage:
+
+ with rate_limiter.ratelimit(origin) as wait_deferred:
+ yield wait_deferred
+ # Handle request ...
+
+ Args:
+ host (str): Origin of incoming request.
+
+ Returns:
+ _PerHostRatelimiter
+ """
+ return self.ratelimiters.setdefault(
+ host,
+ _PerHostRatelimiter(
+ clock=self.clock,
+ window_size=self.window_size,
+ sleep_limit=self.sleep_limit,
+ sleep_msec=self.sleep_msec,
+ reject_limit=self.reject_limit,
+ concurrent_requests=self.concurrent_requests,
+ )
+ ).ratelimit()
+
+
+class _PerHostRatelimiter(object):
+ def __init__(self, clock, window_size, sleep_limit, sleep_msec,
+ reject_limit, concurrent_requests):
+ self.clock = clock
+
+ self.window_size = window_size
+ self.sleep_limit = sleep_limit
+ self.sleep_msec = sleep_msec
+ self.reject_limit = reject_limit
+ self.concurrent_requests = concurrent_requests
+
+ self.sleeping_requests = set()
+ self.ready_request_queue = collections.OrderedDict()
+ self.current_processing = set()
+ self.request_times = []
+
+ def is_empty(self):
+ time_now = self.clock.time_msec()
+ self.request_times[:] = [
+ r for r in self.request_times
+ if time_now - r < self.window_size
+ ]
+
+ return not (
+ self.ready_request_queue
+ or self.sleeping_requests
+ or self.current_processing
+ or self.request_times
+ )
+
+ def ratelimit(self):
+ request_id = object()
+
+ def on_enter():
+ return self._on_enter(request_id)
+
+ def on_exit(exc_type, exc_val, exc_tb):
+ return self._on_exit(request_id)
+
+ return ContextManagerFunction(on_enter, on_exit)
+
+ def _on_enter(self, request_id):
+ time_now = self.clock.time_msec()
+ self.request_times[:] = [
+ r for r in self.request_times
+ if time_now - r < self.window_size
+ ]
+
+ queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
+ if queue_size > self.reject_limit:
+ raise LimitExceededError(
+ retry_after_ms=int(
+ self.window_size / self.sleep_limit
+ ),
+ )
+
+ self.request_times.append(time_now)
+
+ def queue_request():
+ if len(self.current_processing) > self.concurrent_requests:
+ logger.debug("Ratelimit [%s]: Queue req", id(request_id))
+ queue_defer = defer.Deferred()
+ self.ready_request_queue[request_id] = queue_defer
+ return queue_defer
+ else:
+ return defer.succeed(None)
+
+ logger.debug(
+ "Ratelimit [%s]: len(self.request_times)=%d",
+ id(request_id), len(self.request_times),
+ )
+
+ if len(self.request_times) > self.sleep_limit:
+ logger.debug(
+ "Ratelimit [%s]: sleeping req",
+ id(request_id),
+ )
+ ret_defer = sleep(self.sleep_msec/1000.0)
+
+ self.sleeping_requests.add(request_id)
+
+ def on_wait_finished(_):
+ logger.debug(
+ "Ratelimit [%s]: Finished sleeping",
+ id(request_id),
+ )
+ self.sleeping_requests.discard(request_id)
+ queue_defer = queue_request()
+ return queue_defer
+
+ ret_defer.addBoth(on_wait_finished)
+ else:
+ ret_defer = queue_request()
+
+ def on_start(r):
+ logger.debug(
+ "Ratelimit [%s]: Processing req",
+ id(request_id),
+ )
+ self.current_processing.add(request_id)
+ return r
+
+ def on_err(r):
+ self.current_processing.discard(request_id)
+ return r
+
+ def on_both(r):
+ # Ensure that we've properly cleaned up.
+ self.sleeping_requests.discard(request_id)
+ self.ready_request_queue.pop(request_id, None)
+ return r
+
+ ret_defer.addCallbacks(on_start, on_err)
+ ret_defer.addBoth(on_both)
+ return ret_defer
+
+ def _on_exit(self, request_id):
+ logger.debug(
+ "Ratelimit [%s]: Processed req",
+ id(request_id),
+ )
+ self.current_processing.discard(request_id)
+ try:
+ request_id, deferred = self.ready_request_queue.popitem()
+ self.current_processing.add(request_id)
+ deferred.callback(None)
+ except KeyError:
+ pass
+
+
+class ContextManagerFunction(object):
+ def __init__(self, on_enter, on_exit):
+ self.on_enter = on_enter
+ self.on_exit = on_exit
+
+ def __enter__(self):
+ if self.on_enter:
+ return self.on_enter()
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self.on_exit:
+ return self.on_exit(exc_type, exc_val, exc_tb)
|