diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1453faf0ef..0729bb2863 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -17,9 +17,9 @@
from twisted.internet import defer, reactor
from .logcontext import (
- PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+ PreserveLoggingContext, make_deferred_yieldable, preserve_fn
)
-from synapse.util import unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError
from contextlib import contextmanager
@@ -53,6 +53,11 @@ class ObservableDeferred(object):
Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred.
+
+ NB that it does not attempt to do anything with logcontexts; in general
+ you should probably use `make_deferred_yieldable` on the deferreds
+ returned by `observe`, and ensure that the original deferred runs its
+ callbacks in the sentinel logcontext.
"""
__slots__ = ["_deferred", "_observers", "_result"]
@@ -68,7 +73,7 @@ class ObservableDeferred(object):
try:
# TODO: Handle errors here.
self._observers.pop().callback(r)
- except:
+ except Exception:
pass
return r
@@ -78,7 +83,7 @@ class ObservableDeferred(object):
try:
# TODO: Handle errors here.
self._observers.pop().errback(f)
- except:
+ except Exception:
pass
if consumeErrors:
@@ -155,7 +160,7 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return preserve_context_over_deferred(defer.gatherResults([
+ return logcontext.make_deferred_yieldable(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True)).addErrback(unwrapFirstError)
@@ -200,10 +205,29 @@ class Linearizer(object):
try:
with PreserveLoggingContext():
yield current_defer
- except:
+ except Exception:
logger.exception("Unexpected exception in Linearizer")
- logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ logger.info("Acquired linearizer lock %r for key %r", self.name,
+ key)
+
+ # if the code holding the lock completes synchronously, then it
+ # will recursively run the next claimant on the list. That can
+ # relatively rapidly lead to stack exhaustion. This is essentially
+ # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+ #
+ # In order to break the cycle, we add a cheeky `run_on_reactor()` here to
+ # ensure that we fall back to the reactor between each iteration.
+ #
+ # (There's no particular need for it to happen before we return
+ # the context manager, but it needs to happen while we hold the
+ # lock, and the context manager's exit code must be synchronous,
+ # so actually this is the only sensible place.)
+ yield run_on_reactor()
+
+ else:
+ logger.info("Acquired uncontended linearizer lock %r for key %r",
+ self.name, key)
@contextmanager
def _ctx_manager():
@@ -211,7 +235,8 @@ class Linearizer(object):
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
- new_defer.callback(None)
+ with PreserveLoggingContext():
+ new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
@@ -253,8 +278,13 @@ class Limiter(object):
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)
+
+ logger.info("Waiting to acquire limiter lock for key %r", key)
with PreserveLoggingContext():
yield new_defer
+ logger.info("Acquired limiter lock for key %r", key)
+ else:
+ logger.info("Acquired uncontended limiter lock for key %r", key)
entry[0] += 1
@@ -263,16 +293,21 @@ class Limiter(object):
try:
yield
finally:
+ logger.info("Releasing limiter lock for key %r", key)
+
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
- try:
- entry[1].pop(0).callback(None)
- except IndexError:
- # If nothing else is executing for this key then remove it
- # from the map
- if entry[0] == 0:
- self.key_to_defer.pop(key, None)
+
+ if entry[1]:
+ next_def = entry[1].pop(0)
+
+ with PreserveLoggingContext():
+ next_def.callback(None)
+ elif entry[0] == 0:
+ # We were the last thing for this key: remove it from the
+ # map.
+ del self.key_to_defer[key]
defer.returnValue(_ctx_manager())
@@ -316,7 +351,7 @@ class ReadWriteLock(object):
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
- yield curr_writer
+ yield make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager():
@@ -345,7 +380,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
- yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
+ yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():
|