summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-20 12:43:23 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-20 12:50:27 +0100
commit8462c26485fb4f19fc52accc05870c0ea4c8eb6a (patch)
treebcf4724005702d8798fbda90439439ff4e9c3693 /synapse
parentAdd a sleep to the Limiter to fix stack overflows. (diff)
downloadsynapse-8462c26485fb4f19fc52accc05870c0ea4c8eb6a.tar.xz
Improvements to the Limiter
* give them names, to improve logging
* use a deque rather than a list for efficiency
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/util/async.py33
2 files changed, 21 insertions, 14 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..8c12c6990f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5)
+        self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 7d5acecb1c..22071ddef7 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections
 import logging
 from contextlib import contextmanager
 
@@ -248,11 +249,16 @@ class Limiter(object):
             # do some work.
 
     """
-    def __init__(self, max_count, clock=None):
+    def __init__(self, max_count=1, name=None, clock=None):
         """
         Args:
-            max_count(int): The maximum number of concurrent access
+            max_count(int): The maximum number of concurrent accesses
         """
+        if name is None:
+            self.name = id(self)
+        else:
+            self.name = name
+
         if not clock:
             from twisted.internet import reactor
             clock = Clock(reactor)
@@ -260,14 +266,14 @@ class Limiter(object):
         self.max_count = max_count
 
         # key_to_defer is a map from the key to a 2 element list where
-        # the first element is the number of things executing
-        # the second element is a list of deferreds for the things blocked from
+        # the first element is the number of things executing, and
+        # the second element is a deque of deferreds for the things blocked from
         # executing.
         self.key_to_defer = {}
 
     @defer.inlineCallbacks
     def queue(self, key):
-        entry = self.key_to_defer.setdefault(key, [0, []])
+        entry = self.key_to_defer.setdefault(key, [0, collections.deque()])
 
         # If the number of things executing is greater than the maximum
         # then add a deferred to the list of blocked items
@@ -277,10 +283,10 @@ class Limiter(object):
             new_defer = defer.Deferred()
             entry[1].append(new_defer)
 
-            logger.info("Waiting to acquire limiter lock for key %r", key)
-            with PreserveLoggingContext():
-                yield new_defer
-            logger.info("Acquired limiter lock for key %r", key)
+            logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
+            yield make_deferred_yieldable(new_defer)
+
+            logger.info("Acquired limiter lock %r for key %r", self.name, key)
             entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
@@ -296,7 +302,7 @@ class Limiter(object):
             yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended limiter lock for key %r", key)
+            logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
             entry[0] += 1
 
         @contextmanager
@@ -304,15 +310,16 @@ class Limiter(object):
             try:
                 yield
             finally:
-                logger.info("Releasing limiter lock for key %r", key)
+                logger.info("Releasing limiter lock %r for key %r", self.name, key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
                 entry[0] -= 1
 
                 if entry[1]:
-                    next_def = entry[1].pop(0)
+                    next_def = entry[1].popleft()
 
+                    # we need to run the next thing in the sentinel context.
                     with PreserveLoggingContext():
                         next_def.callback(None)
                 elif entry[0] == 0: