Comments
2 files changed, 15 insertions, 0 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 90519e33e3..4d0515ddfb 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -50,6 +50,8 @@ class MessageHandler(BaseHandler):
self.pagination_lock = ReadWriteLock()
+ # We arbitrarily limit concurrent event creation for a room to 5.
+ # This is to stop us from diverging history *too* much.
self.limiter = Limiter(max_count=5)
@defer.inlineCallbacks
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 2a680fff5e..16ed183d4c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -213,12 +213,21 @@ class Limiter(object):
max_count(int): The maximum number of concurrent access
"""
self.max_count = max_count
+
+ # key_to_defer is a map from the key to a 2 element list where
+ # the first element is the number of things executing
+ # the second element is a list of deferreds for the things blocked from
+ # executing.
self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
entry = self.key_to_defer.setdefault(key, [0, []])
+ # If the number of things executing is greater than the maximum
+ # then add a deferred to the list of blocked items
+ # When on of the things currently executing finishes it will callback
+ # this item so that it can continue executing.
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)
@@ -232,10 +241,14 @@ class Limiter(object):
try:
yield
finally:
+ # We've finished executing so check if there are any things
+ # blocked waiting to execute and start one of them
entry[0] -= 1
try:
entry[1].pop(0).callback(None)
except IndexError:
+ # If nothing else is executing for this key then remove it
+ # from the map
if entry[0] == 0:
self.key_to_defer.pop(key, None)
|