diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8c12c6990f..abc07ea87c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.util.async import Linearizer, ReadWriteLock
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 22071ddef7..5a50d9700f 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit):
class Linearizer(object):
- """Linearizes access to resources based on a key. Useful to ensure only one
- thing is happening at a time on a given resource.
-
- Example:
-
- with (yield linearizer.queue("test_key")):
- # do some work.
-
- """
- def __init__(self, name=None, clock=None):
- if name is None:
- self.name = id(self)
- else:
- self.name = name
- self.key_to_defer = {}
-
- if not clock:
- from twisted.internet import reactor
- clock = Clock(reactor)
- self._clock = clock
-
- @defer.inlineCallbacks
- def queue(self, key):
- # If there is already a deferred in the queue, we pull it out so that
- # we can wait on it later.
- # Then we replace it with a deferred that we resolve *after* the
- # context manager has exited.
- # We only return the context manager after the previous deferred has
- # resolved.
- # This all has the net effect of creating a chain of deferreds that
- # wait for the previous deferred before starting their work.
- current_defer = self.key_to_defer.get(key)
-
- new_defer = defer.Deferred()
- self.key_to_defer[key] = new_defer
-
- if current_defer:
- logger.info(
- "Waiting to acquire linearizer lock %r for key %r", self.name, key
- )
- try:
- with PreserveLoggingContext():
- yield current_defer
- except Exception:
- logger.exception("Unexpected exception in Linearizer")
-
- logger.info("Acquired linearizer lock %r for key %r", self.name,
- key)
-
- # if the code holding the lock completes synchronously, then it
- # will recursively run the next claimant on the list. That can
- # relatively rapidly lead to stack exhaustion. This is essentially
- # the same problem as http://twistedmatrix.com/trac/ticket/9304.
- #
- # In order to break the cycle, we add a cheeky sleep(0) here to
- # ensure that we fall back to the reactor between each iteration.
- #
- # (There's no particular need for it to happen before we return
- # the context manager, but it needs to happen while we hold the
- # lock, and the context manager's exit code must be synchronous,
- # so actually this is the only sensible place.
- yield self._clock.sleep(0)
-
- else:
- logger.info("Acquired uncontended linearizer lock %r for key %r",
- self.name, key)
-
- @contextmanager
- def _ctx_manager():
- try:
- yield
- finally:
- logger.info("Releasing linearizer lock %r for key %r", self.name, key)
- with PreserveLoggingContext():
- new_defer.callback(None)
- current_d = self.key_to_defer.get(key)
- if current_d is new_defer:
- self.key_to_defer.pop(key, None)
-
- defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
"""Limits concurrent access to resources based on a key. Useful to ensure
- only a few thing happen at a time on a given resource.
+ only a few things happen at a time on a given resource.
Example:
@@ -249,7 +166,7 @@ class Limiter(object):
# do some work.
"""
- def __init__(self, max_count=1, name=None, clock=None):
+ def __init__(self, name=None, max_count=1, clock=None):
"""
Args:
max_count(int): The maximum number of concurrent accesses
@@ -283,10 +200,12 @@ class Limiter(object):
new_defer = defer.Deferred()
entry[1].append(new_defer)
- logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
+ logger.info(
+ "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+ )
yield make_deferred_yieldable(new_defer)
- logger.info("Acquired limiter lock %r for key %r", self.name, key)
+ logger.info("Acquired linearizer lock %r for key %r", self.name, key)
entry[0] += 1
# if the code holding the lock completes synchronously, then it
@@ -302,7 +221,9 @@ class Limiter(object):
yield self._clock.sleep(0)
else:
- logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
+ logger.info(
+ "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+ )
entry[0] += 1
@contextmanager
@@ -310,7 +231,7 @@ class Limiter(object):
try:
yield
finally:
- logger.info("Releasing limiter lock %r for key %r", self.name, key)
+ logger.info("Releasing linearizer lock %r for key %r", self.name, key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
|