From 7c712f95bbc7f405355d5714c92d65551f64fec2 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Fri, 20 Jul 2018 13:11:43 +0100
Subject: Combine Limiter and Linearizer

Linearizer was effectively a Limiter with max_count=1, so rather than
maintaining two sets of code, let's combine them.
---
 synapse/handlers/message.py |  4 +-
 synapse/util/async.py       | 99 +++++----------------------------------------
 2 files changed, 12 insertions(+), 91 deletions(-)

diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8c12c6990f..abc07ea87c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.replication.http.send_event import send_event_to_master
 from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.util.async import Linearizer, ReadWriteLock
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.logcontext import run_in_background
 from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
 
         # We arbitrarily limit concurrent event creation for a room to 5.
         # This is to stop us from diverging history *too* much.
-        self.limiter = Limiter(max_count=5, name="room_event_creation_limit")
+        self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
 
         self.action_generator = hs.get_action_generator()
 
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 22071ddef7..5a50d9700f 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -157,91 +157,8 @@ def concurrently_execute(func, args, limit):
 
 
 class Linearizer(object):
-    """Linearizes access to resources based on a key. Useful to ensure only one
-    thing is happening at a time on a given resource.
-
-    Example:
-
-        with (yield linearizer.queue("test_key")):
-            # do some work.
-
-    """
-    def __init__(self, name=None, clock=None):
-        if name is None:
-            self.name = id(self)
-        else:
-            self.name = name
-        self.key_to_defer = {}
-
-        if not clock:
-            from twisted.internet import reactor
-            clock = Clock(reactor)
-        self._clock = clock
-
-    @defer.inlineCallbacks
-    def queue(self, key):
-        # If there is already a deferred in the queue, we pull it out so that
-        # we can wait on it later.
-        # Then we replace it with a deferred that we resolve *after* the
-        # context manager has exited.
-        # We only return the context manager after the previous deferred has
-        # resolved.
-        # This all has the net effect of creating a chain of deferreds that
-        # wait for the previous deferred before starting their work.
-        current_defer = self.key_to_defer.get(key)
-
-        new_defer = defer.Deferred()
-        self.key_to_defer[key] = new_defer
-
-        if current_defer:
-            logger.info(
-                "Waiting to acquire linearizer lock %r for key %r", self.name, key
-            )
-            try:
-                with PreserveLoggingContext():
-                    yield current_defer
-            except Exception:
-                logger.exception("Unexpected exception in Linearizer")
-
-            logger.info("Acquired linearizer lock %r for key %r", self.name,
-                        key)
-
-            # if the code holding the lock completes synchronously, then it
-            # will recursively run the next claimant on the list. That can
-            # relatively rapidly lead to stack exhaustion. This is essentially
-            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
-            #
-            # In order to break the cycle, we add a cheeky sleep(0) here to
-            # ensure that we fall back to the reactor between each iteration.
-            #
-            # (There's no particular need for it to happen before we return
-            # the context manager, but it needs to happen while we hold the
-            # lock, and the context manager's exit code must be synchronous,
-            # so actually this is the only sensible place.
-            yield self._clock.sleep(0)
-
-        else:
-            logger.info("Acquired uncontended linearizer lock %r for key %r",
-                        self.name, key)
-
-        @contextmanager
-        def _ctx_manager():
-            try:
-                yield
-            finally:
-                logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                current_d = self.key_to_defer.get(key)
-                if current_d is new_defer:
-                    self.key_to_defer.pop(key, None)
-
-        defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
     """Limits concurrent access to resources based on a key. Useful to ensure
-    only a few thing happen at a time on a given resource.
+    only a few things happen at a time on a given resource.
 
     Example:
 
@@ -249,7 +166,7 @@ class Limiter(object):
             # do some work.
 
     """
-    def __init__(self, max_count=1, name=None, clock=None):
+    def __init__(self, name=None, max_count=1, clock=None):
         """
         Args:
             max_count(int): The maximum number of concurrent accesses
@@ -283,10 +200,12 @@ class Limiter(object):
         new_defer = defer.Deferred()
         entry[1].append(new_defer)
 
-        logger.info("Waiting to acquire limiter lock %r for key %r", self.name, key)
+        logger.info(
+            "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+        )
         yield make_deferred_yieldable(new_defer)
 
-        logger.info("Acquired limiter lock %r for key %r", self.name, key)
+        logger.info("Acquired linearizer lock %r for key %r", self.name, key)
         entry[0] += 1
 
         # if the code holding the lock completes synchronously, then it
@@ -302,7 +221,9 @@ class Limiter(object):
             yield self._clock.sleep(0)
 
         else:
-            logger.info("Acquired uncontended limiter lock %r for key %r", self.name, key)
+            logger.info(
+                "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+            )
             entry[0] += 1
 
         @contextmanager
@@ -310,7 +231,7 @@ class Limiter(object):
             try:
                 yield
             finally:
-                logger.info("Releasing limiter lock %r for key %r", self.name, key)
+                logger.info("Releasing linearizer lock %r for key %r", self.name, key)
 
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
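
For illustration, here is a minimal sketch (not part of the patch) of how the
merged class covers both of the old use cases. It relies only on the API
visible in this diff: the name/max_count constructor arguments, and queue(key)
returning a context manager via a Deferred. The names per_key_lock and do_work
are invented for the example.

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    # The default max_count=1 gives the old Linearizer behaviour: a per-key
    # mutex, so at most one caller runs per key at a time.
    per_key_lock = Linearizer(name="per_key_lock")

    # max_count=5 gives the old Limiter behaviour, as used by
    # EventCreationHandler above: up to five concurrent holders per key.
    limiter = Linearizer(name="room_event_creation_limit", max_count=5)

    @defer.inlineCallbacks
    def do_work(key):
        # queue() hands back a context manager; while it is held, no more
        # than max_count callers can be inside the block for this key.
        with (yield per_key_lock.queue(key)):
            pass  # do some work.

Since a Limiter with max_count=1 admits exactly one holder per key, the old
Linearizer semantics fall out of the merged implementation for free, which is
the premise of the commit.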