diff options
-rw-r--r-- | synapse/util/async.py | 111 |
1 files changed, 68 insertions, 43 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index a7094e2fb4..9b3f2f4b96 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -188,62 +188,30 @@ class Linearizer(object): # things blocked from executing. self.key_to_defer = {} - @defer.inlineCallbacks def queue(self, key): + # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. + # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not + # propagated inside inlineCallbacks until Twisted 18.7) entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()]) # If the number of things executing is greater than the maximum # then add a deferred to the list of blocked items - # When on of the things currently executing finishes it will callback + # When one of the things currently executing finishes it will callback # this item so that it can continue executing. if entry[0] >= self.max_count: - new_defer = defer.Deferred() - entry[1][new_defer] = 1 - - logger.info( - "Waiting to acquire linearizer lock %r for key %r", self.name, key, - ) - try: - yield make_deferred_yieldable(new_defer) - except Exception as e: - if isinstance(e, CancelledError): - logger.info( - "Cancelling wait for linearizer lock %r for key %r", - self.name, key, - ) - else: - logger.warn( - "Unexpected exception waiting for linearizer lock %r for key %r", - self.name, key, - ) - - # we just have to take ourselves back out of the queue. - del entry[1][new_defer] - raise - - logger.info("Acquired linearizer lock %r for key %r", self.name, key) - entry[0] += 1 - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's exit - # code must be synchronous, so this is the only sensible place.) - yield self._clock.sleep(0) - + res = self._await_lock(key) else: logger.info( "Acquired uncontended linearizer lock %r for key %r", self.name, key, ) entry[0] += 1 + res = defer.succeed(None) + + # once we successfully get the lock, we need to return a context manager which + # will release the lock. @contextmanager - def _ctx_manager(): + def _ctx_manager(_): try: yield finally: @@ -264,7 +232,64 @@ class Linearizer(object): # map. del self.key_to_defer[key] - defer.returnValue(_ctx_manager()) + res.addCallback(_ctx_manager) + return res + + def _await_lock(self, key): + """Helper for queue: adds a deferred to the queue + + Assumes that we've already checked that we've reached the limit of the number + of lock-holders we allow. Creates a new deferred which is added to the list, and + adds some management around cancellations. + + Returns the deferred, which will callback once we have secured the lock. + + """ + entry = self.key_to_defer[key] + + logger.info( + "Waiting to acquire linearizer lock %r for key %r", self.name, key, + ) + + new_defer = make_deferred_yieldable(defer.Deferred()) + entry[1][new_defer] = 1 + + def cb(_r): + logger.info("Acquired linearizer lock %r for key %r", self.name, key) + entry[0] += 1 + + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # (This needs to happen while we hold the lock, and the context manager's exit + # code must be synchronous, so this is the only sensible place.) + return self._clock.sleep(0) + + def eb(e): + logger.info("defer %r got err %r", new_defer, e) + if isinstance(e, CancelledError): + logger.info( + "Cancelling wait for linearizer lock %r for key %r", + self.name, key, + ) + + else: + logger.warn( + "Unexpected exception waiting for linearizer lock %r for key %r", + self.name, key, + ) + + # we just have to take ourselves back out of the queue. + del entry[1][new_defer] + return e + + new_defer.addCallbacks(cb, eb) + return new_defer class ReadWriteLock(object): |