summary refs log tree commit diff
path: root/synapse/util/ratelimitutils.py
diff options
context:
space:
mode:
authorMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
committerMichael Telatynski <7t3chguy@gmail.com>2018-07-24 17:17:46 +0100
commit87951d3891efb5bccedf72c12b3da0d6ab482253 (patch)
treede7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/util/ratelimitutils.py
parentif inviter_display_name == ""||None then default to inviter MXID (diff)
parentMerge pull request #3595 from matrix-org/erikj/use_deltas (diff)
downloadsynapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/util/ratelimitutils.py')
-rw-r--r--synapse/util/ratelimitutils.py54
1 files changed, 38 insertions, 16 deletions
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 1101881a2d..7deb38f2a7 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -13,17 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-from synapse.api.errors import LimitExceededError
-
-from synapse.util.async import sleep
-from synapse.util.logcontext import preserve_fn
-
 import collections
 import contextlib
 import logging
 
+from twisted.internet import defer
+
+from synapse.api.errors import LimitExceededError
+from synapse.util.logcontext import (
+    PreserveLoggingContext,
+    make_deferred_yieldable,
+    run_in_background,
+)
 
 logger = logging.getLogger(__name__)
 
@@ -91,13 +92,22 @@ class _PerHostRatelimiter(object):
 
         self.window_size = window_size
         self.sleep_limit = sleep_limit
-        self.sleep_msec = sleep_msec
+        self.sleep_sec = sleep_msec / 1000.0
         self.reject_limit = reject_limit
         self.concurrent_requests = concurrent_requests
 
+        # request_id objects for requests which have been slept
         self.sleeping_requests = set()
+
+        # map from request_id object to Deferred for requests which are ready
+        # for processing but have been queued
         self.ready_request_queue = collections.OrderedDict()
+
+        # request id objects for requests which are in progress
         self.current_processing = set()
+
+        # times at which we have recently (within the last window_size ms)
+        # received requests.
         self.request_times = []
 
     @contextlib.contextmanager
@@ -116,11 +126,15 @@ class _PerHostRatelimiter(object):
 
     def _on_enter(self, request_id):
         time_now = self.clock.time_msec()
+
+        # remove any entries from request_times which aren't within the window
         self.request_times[:] = [
             r for r in self.request_times
             if time_now - r < self.window_size
         ]
 
+        # reject the request if we already have too many queued up (either
+        # sleeping or in the ready queue).
         queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
         if queue_size > self.reject_limit:
             raise LimitExceededError(
@@ -133,9 +147,13 @@ class _PerHostRatelimiter(object):
 
         def queue_request():
             if len(self.current_processing) > self.concurrent_requests:
-                logger.debug("Ratelimit [%s]: Queue req", id(request_id))
                 queue_defer = defer.Deferred()
                 self.ready_request_queue[request_id] = queue_defer
+                logger.info(
+                    "Ratelimiter: queueing request (queue now %i items)",
+                    len(self.ready_request_queue),
+                )
+
                 return queue_defer
             else:
                 return defer.succeed(None)
@@ -147,10 +165,9 @@ class _PerHostRatelimiter(object):
 
         if len(self.request_times) > self.sleep_limit:
             logger.debug(
-                "Ratelimit [%s]: sleeping req",
-                id(request_id),
+                "Ratelimiter: sleeping request for %f sec", self.sleep_sec,
             )
-            ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
+            ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
 
             self.sleeping_requests.add(request_id)
 
@@ -176,6 +193,9 @@ class _PerHostRatelimiter(object):
             return r
 
         def on_err(r):
+            # XXX: why is this necessary? this is called before we start
+            # processing the request so why would the request be in
+            # current_processing?
             self.current_processing.discard(request_id)
             return r
 
@@ -187,7 +207,7 @@ class _PerHostRatelimiter(object):
 
         ret_defer.addCallbacks(on_start, on_err)
         ret_defer.addBoth(on_both)
-        return ret_defer
+        return make_deferred_yieldable(ret_defer)
 
     def _on_exit(self, request_id):
         logger.debug(
@@ -196,8 +216,10 @@ class _PerHostRatelimiter(object):
         )
         self.current_processing.discard(request_id)
         try:
-            request_id, deferred = self.ready_request_queue.popitem()
-            self.current_processing.add(request_id)
-            deferred.callback(None)
+            # start processing the next item on the queue.
+            _, deferred = self.ready_request_queue.popitem(last=False)
+
+            with PreserveLoggingContext():
+                deferred.callback(None)
         except KeyError:
             pass