diff --git a/synapse/util/async.py b/synapse/util/async_helpers.py
index a7094e2fb4..9b3f2f4b96 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async_helpers.py
@@ -188,62 +188,30 @@ class Linearizer(object):
# things blocked from executing.
self.key_to_defer = {}
- @defer.inlineCallbacks
def queue(self, key):
+ # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
+ # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
+ # propagated inside inlineCallbacks until Twisted 18.7)
entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
# If the number of things executing is greater than the maximum
# then add a deferred to the list of blocked items
- # When on of the things currently executing finishes it will callback
+ # When one of the things currently executing finishes it will callback
# this item so that it can continue executing.
if entry[0] >= self.max_count:
- new_defer = defer.Deferred()
- entry[1][new_defer] = 1
-
- logger.info(
- "Waiting to acquire linearizer lock %r for key %r", self.name, key,
- )
- try:
- yield make_deferred_yieldable(new_defer)
- except Exception as e:
- if isinstance(e, CancelledError):
- logger.info(
- "Cancelling wait for linearizer lock %r for key %r",
- self.name, key,
- )
- else:
- logger.warn(
- "Unexpected exception waiting for linearizer lock %r for key %r",
- self.name, key,
- )
-
- # we just have to take ourselves back out of the queue.
- del entry[1][new_defer]
- raise
-
- logger.info("Acquired linearizer lock %r for key %r", self.name, key)
- entry[0] += 1
-
- # if the code holding the lock completes synchronously, then it
- # will recursively run the next claimant on the list. That can
- # relatively rapidly lead to stack exhaustion. This is essentially
- # the same problem as http://twistedmatrix.com/trac/ticket/9304.
- #
- # In order to break the cycle, we add a cheeky sleep(0) here to
- # ensure that we fall back to the reactor between each iteration.
- #
- # (This needs to happen while we hold the lock, and the context manager's exit
- # code must be synchronous, so this is the only sensible place.)
- yield self._clock.sleep(0)
-
+ res = self._await_lock(key)
else:
logger.info(
"Acquired uncontended linearizer lock %r for key %r", self.name, key,
)
entry[0] += 1
+ res = defer.succeed(None)
+
+ # once we successfully get the lock, we need to return a context manager which
+ # will release the lock.
@contextmanager
- def _ctx_manager():
+ def _ctx_manager(_):
try:
yield
finally:
@@ -264,7 +232,64 @@ class Linearizer(object):
# map.
del self.key_to_defer[key]
- defer.returnValue(_ctx_manager())
+ res.addCallback(_ctx_manager)
+ return res
+
+ def _await_lock(self, key):
+ """Helper for queue: adds a deferred to the queue
+
+ Assumes that we've already checked that we've reached the limit of the number
+ of lock-holders we allow. Creates a new deferred which is added to the list, and
+ adds some management around cancellations.
+
+ Returns the deferred, which will callback once we have secured the lock.
+
+ """
+ entry = self.key_to_defer[key]
+
+ logger.info(
+ "Waiting to acquire linearizer lock %r for key %r", self.name, key,
+ )
+
+ new_defer = make_deferred_yieldable(defer.Deferred())
+ entry[1][new_defer] = 1
+
+ def cb(_r):
+ logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ entry[0] += 1
+
+ # if the code holding the lock completes synchronously, then it
+ # will recursively run the next claimant on the list. That can
+ # relatively rapidly lead to stack exhaustion. This is essentially
+ # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+ #
+ # In order to break the cycle, we add a cheeky sleep(0) here to
+ # ensure that we fall back to the reactor between each iteration.
+ #
+ # (This needs to happen while we hold the lock, and the context manager's exit
+ # code must be synchronous, so this is the only sensible place.)
+ return self._clock.sleep(0)
+
+ def eb(e):
+ logger.info("defer %r got err %r", new_defer, e)
+ if isinstance(e, CancelledError):
+ logger.info(
+ "Cancelling wait for linearizer lock %r for key %r",
+ self.name, key,
+ )
+
+ else:
+ logger.warn(
+ "Unexpected exception waiting for linearizer lock %r for key %r",
+ self.name, key,
+ )
+
+ # we just have to take ourselves back out of the queue.
+ del entry[1][new_defer]
+ return e
+
+ new_defer.addCallbacks(cb, eb)
+ return new_defer
class ReadWriteLock(object):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 861c24809c..187510576a 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -25,7 +25,7 @@ from six import itervalues, string_types
from twisted.internet import defer
from synapse.util import logcontext, unwrapFirstError
-from synapse.util.async import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a8491b42d5..afb03b2e1b 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -16,7 +16,7 @@ import logging
from twisted.internet import defer
-from synapse.util.async import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches import register_cache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py
index d03678b8c8..8318db8d2c 100644
--- a/synapse/util/caches/snapshot_cache.py
+++ b/synapse/util/caches/snapshot_cache.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.async import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred
class SnapshotCache(object):
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 8dcae50b39..07e83fadda 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -526,7 +526,7 @@ _to_ignore = [
"synapse.util.logcontext",
"synapse.http.server",
"synapse.storage._base",
- "synapse.util.async",
+ "synapse.util.async_helpers",
]
|