summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-12-12 10:30:23 +0000
committerGitHub <noreply@github.com>2016-12-12 10:30:23 +0000
commitd53a80af25d518da801c03efa911ba5774ab526b (patch)
treed412b71edf38f6852c1fa374615348d86ed0cea7 /synapse/util/async.py
parentMerge pull request #1686 from matrix-org/rav/fix_federation_key_fails (diff)
parentComments (diff)
downloadsynapse-d53a80af25d518da801c03efa911ba5774ab526b.tar.xz
Merge pull request #1620 from matrix-org/erikj/concurrent_room_access
Limit the number of events that can be created on a given room concurrently
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py58
1 files changed, 58 insertions, 0 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 347fb1e380..16ed183d4c 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -197,6 +197,64 @@ class Linearizer(object):
         defer.returnValue(_ctx_manager())
 
 
+class Limiter(object):
+    """Limits concurrent access to resources based on a key. Useful to ensure
+    only a few thing happen at a time on a given resource.
+
+    Example:
+
+        with (yield limiter.queue("test_key")):
+            # do some work.
+
+    """
+    def __init__(self, max_count):
+        """
+        Args:
+            max_count(int): The maximum number of concurrent access
+        """
+        self.max_count = max_count
+
+        # key_to_defer is a map from the key to a 2 element list where
+        # the first element is the number of things executing
+        # the second element is a list of deferreds for the things blocked from
+        # executing.
+        self.key_to_defer = {}
+
+    @defer.inlineCallbacks
+    def queue(self, key):
+        entry = self.key_to_defer.setdefault(key, [0, []])
+
+        # If the number of things executing is greater than the maximum
+        # then add a deferred to the list of blocked items
+        # When on of the things currently executing finishes it will callback
+        # this item so that it can continue executing.
+        if entry[0] >= self.max_count:
+            new_defer = defer.Deferred()
+            entry[1].append(new_defer)
+            with PreserveLoggingContext():
+                yield new_defer
+
+        entry[0] += 1
+
+        @contextmanager
+        def _ctx_manager():
+            try:
+                yield
+            finally:
+                # We've finished executing so check if there are any things
+                # blocked waiting to execute and start one of them
+                entry[0] -= 1
+                try:
+                    entry[1].pop(0).callback(None)
+                except IndexError:
+                    # If nothing else is executing for this key then remove it
+                    # from the map
+                    if entry[0] == 0:
+                        self.key_to_defer.pop(key, None)
+
+        defer.returnValue(_ctx_manager())
+
+
 class ReadWriteLock(object):
     """A deferred style read write lock.