summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-07-20 13:11:43 +0100
committerRichard van der Hoff <richard@matrix.org>2018-07-20 13:11:43 +0100
commit7c712f95bbc7f405355d5714c92d65551f64fec2 (patch)
tree1c120e9f62a42824d496f05882c27570ac0ce58a /synapse
parentImprovements to the Limiter (diff)
downloadsynapse-7c712f95bbc7f405355d5714c92d65551f64fec2.tar.xz
Combine Limiter and Linearizer
Linearizer was effectively a Limiter with max_count=1, so rather than
maintaining two sets of code, let's combine them.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/util/async.py99
2 files changed, 12 insertions, 91 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8c12c6990f..abc07ea87c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
 from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.util.async import Linearizer, ReadWriteLock
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 22071ddef7..5a50d9700f 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit):
 
 
 class Linearizer(object):
-    """Linearizes access to resources based on a key. Useful to ensure only one
-    thing is happening at a time on a given resource.
-
-    Example:
-
-        with (yield linearizer.queue("test_key")):
-            # do some work.
-
-    """
-    def __init__(self, name=None, clock=None):
-        if name is None:
-            self.name = id(self)
-        else:
-            self.name = name
-        self.key_to_defer = {}
-
-        if not clock:
-            from twisted.internet import reactor
-            clock = Clock(reactor)
-        self._clock = clock
-
-    @defer.inlineCallbacks
-    def queue(self, key):
-        # If there is already a deferred in the queue, we pull it out so that
-        # we can wait on it later.
-        # Then we replace it with a deferred that we resolve *after* the
-        # context manager has exited.
-        # We only return the context manager after the previous deferred has
-        # resolved.
-        # This all has the net effect of creating a chain of deferreds that
-        # wait for the previous deferred before starting their work.
-        current_defer = self.key_to_defer.get(key)
-
-        new_defer = defer.Deferred()
-        self.key_to_defer[key] = new_defer
-
-        if current_defer:
-            logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key
-            )
-            try:
-                with PreserveLoggingContext():
-                    yield current_defer
-            except Exception:
-                logger.exception("Unexpected exception in Linearizer")
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name,
-                        key)
-
-            # if the code holding the lock completes synchronously, then it
-            # will recursively run the next claimant on the list. That can
-            # relatively rapidly lead to stack exhaustion. This is essentially
-            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-            #
-            # In order to break the cycle, we add a cheeky sleep(0) here to
-            # ensure that we fall back to the reactor between each iteration.
-            #
-            # (There's no particular need for it to happen before we return
-            # the context manager, but it needs to happen while we hold the
-            # lock, and the context manager's exit code must be synchronous,
-            # so actually this is the only sensible place.
-            yield self._clock.sleep(0)
-
-        else:
-            logger.info("Acquired uncontended linearizer lock %r for key %r",
-                        self.name, key)
-
-        @contextmanager
-        def _ctx_manager():
-            try:
-                yield
-            finally:
-                logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                current_d = self.key_to_defer.get(key)
-                if current_d is new_defer:
-                    self.key_to_defer.pop(key, None)
-
-        defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
     """Limits concurrent access to resources based on a key. Useful to ensure
-    only a few thing happen at a time on a given resource.
+    only a few things happen at a time on a given resource.
 
     Example:
 
@@ -249,7 +166,7 @@ class Limiter(object):
             # do some work.
 
     """
-    def __init__(self, max_count=1, name=None, clock=None):
+    def __init__(self, name=None, max_count=1, clock=None):
         """
         Args:
             max_count(int): The maximum number of concurrent accesses
@@ -283,10 +200,12 @@ class Limiter(object):
             new_defer = defer.Deferred()
             entry[1].append(new_defer)
 
-            logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
+            logger.info(
+                "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+            )
             yield make_deferred_yieldable(new_defer)
 
-            logger.info("Acquired limiter lock %r for key %r", self.name, key)
+            logger.info("Acquired linearizer lock %r for key %r", self.name, key)
             entry[0] += 1
 
             # if the code holding the lock completes synchronously, then it
@@ -302,7 +221,9 @@ class Limiter(object):
             yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
+            logger.info(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+            )
             entry[0] += 1
 
         @contextmanager
@@ -310,7 +231,7 @@ class Limiter(object):
             try:
                 yield
             finally:
-                logger.info("Releasing limiter lock %r for key %r", self.name, key)
+                logger.info("Releasing linearizer lock %r for key %r", self.name, key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them