diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 6a8e844d63..4b2a16a6a9 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,7 +18,7 @@ import collections
import inspect
import itertools
import logging
-from contextlib import asynccontextmanager, contextmanager
+from contextlib import asynccontextmanager
from typing import (
Any,
AsyncIterator,
@@ -29,7 +29,6 @@ from typing import (
Generic,
Hashable,
Iterable,
- Iterator,
List,
Optional,
Set,
@@ -342,7 +341,7 @@ class Linearizer:
Example:
- with await limiter.queue("test_key"):
+ async with limiter.queue("test_key"):
# do some work.
"""
@@ -383,95 +382,53 @@ class Linearizer:
# non-empty.
return bool(entry.deferreds)
- def queue(self, key: Hashable) -> defer.Deferred:
- # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
- # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
- # propagated inside inlineCallbacks until Twisted 18.7)
+ def queue(self, key: Hashable) -> AsyncContextManager[None]:
+ @asynccontextmanager
+ async def _ctx_manager() -> AsyncIterator[None]:
+ entry = await self._acquire_lock(key)
+ try:
+ yield
+ finally:
+ self._release_lock(key, entry)
+
+ return _ctx_manager()
+
+ async def _acquire_lock(self, key: Hashable) -> _LinearizerEntry:
+ """Acquires a linearizer lock, waiting if necessary.
+
+ Returns once we have secured the lock.
+ """
entry = self.key_to_defer.setdefault(
key, _LinearizerEntry(0, collections.OrderedDict())
)
- # If the number of things executing is greater than the maximum
- # then add a deferred to the list of blocked items
- # When one of the things currently executing finishes it will callback
- # this item so that it can continue executing.
- if entry.count >= self.max_count:
- res = self._await_lock(key)
- else:
+ if entry.count < self.max_count:
+ # The number of things executing is less than the maximum.
logger.debug(
"Acquired uncontended linearizer lock %r for key %r", self.name, key
)
entry.count += 1
- res = defer.succeed(None)
-
- # once we successfully get the lock, we need to return a context manager which
- # will release the lock.
-
- @contextmanager
- def _ctx_manager(_: None) -> Iterator[None]:
- try:
- yield
- finally:
- logger.debug("Releasing linearizer lock %r for key %r", self.name, key)
-
- # We've finished executing so check if there are any things
- # blocked waiting to execute and start one of them
- entry.count -= 1
-
- if entry.deferreds:
- (next_def, _) = entry.deferreds.popitem(last=False)
-
- # we need to run the next thing in the sentinel context.
- with PreserveLoggingContext():
- next_def.callback(None)
- elif entry.count == 0:
- # We were the last thing for this key: remove it from the
- # map.
- del self.key_to_defer[key]
-
- res.addCallback(_ctx_manager)
- return res
-
- def _await_lock(self, key: Hashable) -> defer.Deferred:
- """Helper for queue: adds a deferred to the queue
-
- Assumes that we've already checked that we've reached the limit of the number
- of lock-holders we allow. Creates a new deferred which is added to the list, and
- adds some management around cancellations.
-
- Returns the deferred, which will callback once we have secured the lock.
-
- """
- entry = self.key_to_defer[key]
+ return entry
+ # Otherwise, the number of things executing is at the maximum and we have to
+ # add a deferred to the list of blocked items.
+ # When one of the things currently executing finishes it will callback
+ # this item so that it can continue executing.
logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred())
entry.deferreds[new_defer] = 1
- def cb(_r: None) -> "defer.Deferred[None]":
- logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
- entry.count += 1
-
- # if the code holding the lock completes synchronously, then it
- # will recursively run the next claimant on the list. That can
- # relatively rapidly lead to stack exhaustion. This is essentially
- # the same problem as http://twistedmatrix.com/trac/ticket/9304.
- #
- # In order to break the cycle, we add a cheeky sleep(0) here to
- # ensure that we fall back to the reactor between each iteration.
- #
- # (This needs to happen while we hold the lock, and the context manager's exit
- # code must be synchronous, so this is the only sensible place.)
- return self._clock.sleep(0)
-
- def eb(e: Failure) -> Failure:
+ try:
+ await new_defer
+ except Exception as e:
logger.info("defer %r got err %r", new_defer, e)
if isinstance(e, CancelledError):
logger.debug(
- "Cancelling wait for linearizer lock %r for key %r", self.name, key
+ "Cancelling wait for linearizer lock %r for key %r",
+ self.name,
+ key,
)
-
else:
logger.warning(
"Unexpected exception waiting for linearizer lock %r for key %r",
@@ -481,10 +438,43 @@ class Linearizer:
# we just have to take ourselves back out of the queue.
del entry.deferreds[new_defer]
- return e
+ raise
+
+ logger.debug("Acquired linearizer lock %r for key %r", self.name, key)
+ entry.count += 1
- new_defer.addCallbacks(cb, eb)
- return new_defer
+ # if the code holding the lock completes synchronously, then it
+ # will recursively run the next claimant on the list. That can
+ # relatively rapidly lead to stack exhaustion. This is essentially
+ # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+ #
+ # In order to break the cycle, we add a cheeky sleep(0) here to
+ # ensure that we fall back to the reactor between each iteration.
+ #
+ # This needs to happen while we hold the lock. We could put it on the
+ # exit path, but that would slow down the uncontended case.
+ await self._clock.sleep(0)
+
+ return entry
+
+ def _release_lock(self, key: Hashable, entry: _LinearizerEntry) -> None:
+ """Releases a held linearizer lock."""
+ logger.debug("Releasing linearizer lock %r for key %r", self.name, key)
+
+ # We've finished executing so check if there are any things
+ # blocked waiting to execute and start one of them
+ entry.count -= 1
+
+ if entry.deferreds:
+ (next_def, _) = entry.deferreds.popitem(last=False)
+
+ # we need to run the next thing in the sentinel context.
+ with PreserveLoggingContext():
+ next_def.callback(None)
+ elif entry.count == 0:
+ # We were the last thing for this key: remove it from the
+ # map.
+ del self.key_to_defer[key]
class ReadWriteLock:
|