diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async_helpers.py | 144 |
1 files changed, 67 insertions, 77 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 6a8e844d63..4b2a16a6a9 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -18,7 +18,7 @@ import collections import inspect import itertools import logging -from contextlib import asynccontextmanager, contextmanager +from contextlib import asynccontextmanager from typing import ( Any, AsyncIterator, @@ -29,7 +29,6 @@ from typing import ( Generic, Hashable, Iterable, - Iterator, List, Optional, Set, @@ -342,7 +341,7 @@ class Linearizer: Example: - with await limiter.queue("test_key"): + async with limiter.queue("test_key"): # do some work. """ @@ -383,95 +382,53 @@ class Linearizer: # non-empty. return bool(entry.deferreds) - def queue(self, key: Hashable) -> defer.Deferred: - # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. - # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not - # propagated inside inlineCallbacks until Twisted 18.7) + def queue(self, key: Hashable) -> AsyncContextManager[None]: + @asynccontextmanager + async def _ctx_manager() -> AsyncIterator[None]: + entry = await self._acquire_lock(key) + try: + yield + finally: + self._release_lock(key, entry) + + return _ctx_manager() + + async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry: + """Acquires a linearizer lock, waiting if necessary. + + Returns once we have secured the lock. + """ entry = self.key_to_defer.setdefault( key, _LinearizerEntry(0, collections.OrderedDict()) ) - # If the number of things executing is greater than the maximum - # then add a deferred to the list of blocked items - # When one of the things currently executing finishes it will callback - # this item so that it can continue executing. - if entry.count >= self.max_count: - res = self._await_lock(key) - else: + if entry.count < self.max_count: + # The number of things executing is less than the maximum. logger.debug( "Acquired uncontended linearizer lock %r for key %r", self.name, key ) entry.count += 1 - res = defer.succeed(None) - - # once we successfully get the lock, we need to return a context manager which - # will release the lock. - - @contextmanager - def _ctx_manager(_: None) -> Iterator[None]: - try: - yield - finally: - logger.debug("Releasing linearizer lock %r for key %r", self.name, key) - - # We've finished executing so check if there are any things - # blocked waiting to execute and start one of them - entry.count -= 1 - - if entry.deferreds: - (next_def, _) = entry.deferreds.popitem(last=False) - - # we need to run the next thing in the sentinel context. - with PreserveLoggingContext(): - next_def.callback(None) - elif entry.count == 0: - # We were the last thing for this key: remove it from the - # map. - del self.key_to_defer[key] - - res.addCallback(_ctx_manager) - return res - - def _await_lock(self, key: Hashable) -> defer.Deferred: - """Helper for queue: adds a deferred to the queue - - Assumes that we've already checked that we've reached the limit of the number - of lock-holders we allow. Creates a new deferred which is added to the list, and - adds some management around cancellations. - - Returns the deferred, which will callback once we have secured the lock. - - """ - entry = self.key_to_defer[key] + return entry + # Otherwise, the number of things executing is at the maximum and we have to + # add a deferred to the list of blocked items. + # When one of the things currently executing finishes it will callback + # this item so that it can continue executing. logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key) new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred()) entry.deferreds[new_defer] = 1 - def cb(_r: None) -> "defer.Deferred[None]": - logger.debug("Acquired linearizer lock %r for key %r", self.name, key) - entry.count += 1 - - # if the code holding the lock completes synchronously, then it - # will recursively run the next claimant on the list. That can - # relatively rapidly lead to stack exhaustion. This is essentially - # the same problem as http://twistedmatrix.com/trac/ticket/9304. - # - # In order to break the cycle, we add a cheeky sleep(0) here to - # ensure that we fall back to the reactor between each iteration. - # - # (This needs to happen while we hold the lock, and the context manager's exit - # code must be synchronous, so this is the only sensible place.) - return self._clock.sleep(0) - - def eb(e: Failure) -> Failure: + try: + await new_defer + except Exception as e: logger.info("defer %r got err %r", new_defer, e) if isinstance(e, CancelledError): logger.debug( - "Cancelling wait for linearizer lock %r for key %r", self.name, key + "Cancelling wait for linearizer lock %r for key %r", + self.name, + key, ) - else: logger.warning( "Unexpected exception waiting for linearizer lock %r for key %r", @@ -481,10 +438,43 @@ class Linearizer: # we just have to take ourselves back out of the queue. del entry.deferreds[new_defer] - return e + raise + + logger.debug("Acquired linearizer lock %r for key %r", self.name, key) + entry.count += 1 - new_defer.addCallbacks(cb, eb) - return new_defer + # if the code holding the lock completes synchronously, then it + # will recursively run the next claimant on the list. That can + # relatively rapidly lead to stack exhaustion. This is essentially + # the same problem as http://twistedmatrix.com/trac/ticket/9304. + # + # In order to break the cycle, we add a cheeky sleep(0) here to + # ensure that we fall back to the reactor between each iteration. + # + # This needs to happen while we hold the lock. We could put it on the + # exit path, but that would slow down the uncontended case. + await self._clock.sleep(0) + + return entry + + def _release_lock(self, key: Hashable, entry: _LinearizerEntry) -> None: + """Releases a held linearizer lock.""" + logger.debug("Releasing linearizer lock %r for key %r", self.name, key) + + # We've finished executing so check if there are any things + # blocked waiting to execute and start one of them + entry.count -= 1 + + if entry.deferreds: + (next_def, _) = entry.deferreds.popitem(last=False) + + # we need to run the next thing in the sentinel context. + with PreserveLoggingContext(): + next_def.callback(None) + elif entry.count == 0: + # We were the last thing for this key: remove it from the + # map. + del self.key_to_defer[key] class ReadWriteLock: |