diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a2360ab5d..756d8ffa32 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -59,9 +59,9 @@ class Clock(object):
f(function): The function to call repeatedly.
msec(float): How long to wait between calls in milliseconds.
"""
- l = task.LoopingCall(f)
- l.start(msec / 1000.0, now=False)
- return l
+ call = task.LoopingCall(f)
+ call.start(msec / 1000.0, now=False)
+ return call
def call_later(self, delay, callback, *args, **kwargs):
"""Call something later
@@ -82,7 +82,7 @@ class Clock(object):
def cancel_call_later(self, timer, ignore_errs=False):
try:
timer.cancel()
- except:
+ except Exception:
if not ignore_errs:
raise
@@ -97,12 +97,12 @@ class Clock(object):
try:
ret_deferred.errback(e)
- except:
+ except Exception:
pass
try:
given_deferred.cancel()
- except:
+ except Exception:
pass
timer = None
@@ -110,7 +110,7 @@ class Clock(object):
def cancel(res):
try:
self.cancel_call_later(timer)
- except:
+ except Exception:
pass
return res
@@ -119,7 +119,7 @@ class Clock(object):
def success(res):
try:
ret_deferred.callback(res)
- except:
+ except Exception:
pass
return res
@@ -127,7 +127,7 @@ class Clock(object):
def err(res):
try:
ret_deferred.errback(res)
- except:
+ except Exception:
pass
given_deferred.addCallbacks(callback=success, errback=err)
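
A hedged usage sketch of the Clock.looping_call API touched above (caller code, not part of this patch; prune_cache is a hypothetical callback, and the no-argument Clock() constructor is assumed for this version of synapse.util):

from synapse.util import Clock


def prune_cache():
    """Hypothetical periodic task."""
    pass


clock = Clock()  # assumption: Clock takes no constructor arguments here

# looping_call takes the interval in milliseconds and returns the underlying
# twisted LoopingCall; call loop.stop() to cancel it later.
loop = clock.looping_call(prune_cache, 60 * 1000)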
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1453faf0ef..0729bb2863 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -17,9 +17,9 @@
from twisted.internet import defer, reactor
from .logcontext import (
- PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+ PreserveLoggingContext, make_deferred_yieldable, preserve_fn
)
-from synapse.util import unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError
from contextlib import contextmanager
@@ -53,6 +53,11 @@ class ObservableDeferred(object):
Cancelling or otherwise resolving an observer will not affect the original
ObservableDeferred.
+
+ NB that it does not attempt to do anything with logcontexts; in general
+ you should probably wrap the deferreds returned by `observe` with
+ `make_deferred_yieldable`, and ensure that the original deferred runs
+ its callbacks in the sentinel logcontext.
"""
__slots__ = ["_deferred", "_observers", "_result"]
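
By way of illustration, a caller-side sketch of the pattern the docstring above recommends (assumed usage, not part of this change; get_thing is a hypothetical function returning a deferred):

from twisted.internet import defer

from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn


@defer.inlineCallbacks
def wait_for_thing(get_thing):
    # preserve_fn ensures the wrapped deferred runs its callbacks in the
    # sentinel logcontext, as the docstring recommends.
    observable = ObservableDeferred(preserve_fn(get_thing)())

    # each waiter wraps its observer with make_deferred_yieldable so its own
    # logcontext is restored when the result arrives.
    result = yield make_deferred_yieldable(observable.observe())
    defer.returnValue(result)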
@@ -68,7 +73,7 @@ class ObservableDeferred(object):
try:
# TODO: Handle errors here.
self._observers.pop().callback(r)
- except:
+ except Exception:
pass
return r
@@ -78,7 +83,7 @@ class ObservableDeferred(object):
try:
# TODO: Handle errors here.
self._observers.pop().errback(f)
- except:
+ except Exception:
pass
if consumeErrors:
@@ -155,7 +160,7 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return preserve_context_over_deferred(defer.gatherResults([
+ return logcontext.make_deferred_yieldable(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
], consumeErrors=True)).addErrback(unwrapFirstError)
@@ -200,10 +205,29 @@ class Linearizer(object):
try:
with PreserveLoggingContext():
yield current_defer
- except:
+ except Exception:
logger.exception("Unexpected exception in Linearizer")
- logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ logger.info("Acquired linearizer lock %r for key %r", self.name,
+ key)
+
+ # if the code holding the lock completes synchronously, then it
+ # will recursively run the next claimant on the list. That can
+ # relatively rapidly lead to stack exhaustion. This is essentially
+ # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+ #
+ # In order to break the cycle, we add a cheeky yield on run_on_reactor()
+ # here to ensure that we fall back to the reactor between each iteration.
+ #
+ # (There's no particular need for it to happen before we return
+ # the context manager, but it needs to happen while we hold the
+ # lock, and the context manager's exit code must be synchronous,
+ # so actually this is the only sensible place.)
+ yield run_on_reactor()
+
+ else:
+ logger.info("Acquired uncontended linearizer lock %r for key %r",
+ self.name, key)
@contextmanager
def _ctx_manager():
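
For context, a caller-side sketch of how the Linearizer is used (assumed usage, not part of the patch; do_work is hypothetical). The run_on_reactor() above means that when one holder exits synchronously, the next waiter is resumed from the reactor rather than recursively on the current stack:

from twisted.internet import defer

from synapse.util.async import Linearizer

linearizer = Linearizer(name="example")


@defer.inlineCallbacks
def handle(room_id):
    # queue() returns a deferred which resolves to a context manager; only
    # one caller at a time runs the body below for a given key.
    with (yield linearizer.queue(room_id)):
        yield do_work(room_id)  # hypothetical critical-section work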
@@ -211,7 +235,8 @@ class Linearizer(object):
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
- new_defer.callback(None)
+ with PreserveLoggingContext():
+ new_defer.callback(None)
current_d = self.key_to_defer.get(key)
if current_d is new_defer:
self.key_to_defer.pop(key, None)
@@ -253,8 +278,13 @@ class Limiter(object):
if entry[0] >= self.max_count:
new_defer = defer.Deferred()
entry[1].append(new_defer)
+
+ logger.info("Waiting to acquire limiter lock for key %r", key)
with PreserveLoggingContext():
yield new_defer
+ logger.info("Acquired limiter lock for key %r", key)
+ else:
+ logger.info("Acquired uncontended limiter lock for key %r", key)
entry[0] += 1
@@ -263,16 +293,21 @@ class Limiter(object):
try:
yield
finally:
+ logger.info("Releasing limiter lock for key %r", key)
+
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
- try:
- entry[1].pop(0).callback(None)
- except IndexError:
- # If nothing else is executing for this key then remove it
- # from the map
- if entry[0] == 0:
- self.key_to_defer.pop(key, None)
+
+ if entry[1]:
+ next_def = entry[1].pop(0)
+
+ with PreserveLoggingContext():
+ next_def.callback(None)
+ elif entry[0] == 0:
+ # We were the last thing for this key: remove it from the
+ # map.
+ del self.key_to_defer[key]
defer.returnValue(_ctx_manager())
@@ -316,7 +351,7 @@ class ReadWriteLock(object):
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
- yield curr_writer
+ yield make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager():
@@ -345,7 +380,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
- yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
+ yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index e68f94ce77..734331caaa 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,32 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+import logging
-from synapse.util.logcontext import (
- PreserveLoggingContext, preserve_context_over_fn
-)
+from twisted.internet import defer
from synapse.util import unwrapFirstError
-
-import logging
-
+from synapse.util.logcontext import PreserveLoggingContext
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_left_room", user=user, room_id=room_id
- )
+ with PreserveLoggingContext():
+ distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
- return preserve_context_over_fn(
- distributor.fire,
- "user_joined_room", user=user, room_id=room_id
- )
+ with PreserveLoggingContext():
+ distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 990216145e..48c9f6802d 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -42,7 +42,7 @@ try:
def get_thread_resource_usage():
return resource.getrusage(RUSAGE_THREAD)
-except:
+except Exception:
# If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
# won't track resource usage by returning None.
def get_thread_resource_usage():
@@ -261,67 +261,6 @@ class PreserveLoggingContext(object):
)
-class _PreservingContextDeferred(defer.Deferred):
- """A deferred that ensures that all callbacks and errbacks are called with
- the given logging context.
- """
- def __init__(self, context):
- self._log_context = context
- defer.Deferred.__init__(self)
-
- def addCallbacks(self, callback, errback=None,
- callbackArgs=None, callbackKeywords=None,
- errbackArgs=None, errbackKeywords=None):
- callback = self._wrap_callback(callback)
- errback = self._wrap_callback(errback)
- return defer.Deferred.addCallbacks(
- self, callback,
- errback=errback,
- callbackArgs=callbackArgs,
- callbackKeywords=callbackKeywords,
- errbackArgs=errbackArgs,
- errbackKeywords=errbackKeywords,
- )
-
- def _wrap_callback(self, f):
- def g(res, *args, **kwargs):
- with PreserveLoggingContext(self._log_context):
- res = f(res, *args, **kwargs)
- return res
- return g
-
-
-def preserve_context_over_fn(fn, *args, **kwargs):
- """Takes a function and invokes it with the given arguments, but removes
- and restores the current logging context while doing so.
-
- If the result is a deferred, call preserve_context_over_deferred before
- returning it.
- """
- with PreserveLoggingContext():
- res = fn(*args, **kwargs)
-
- if isinstance(res, defer.Deferred):
- return preserve_context_over_deferred(res)
- else:
- return res
-
-
-def preserve_context_over_deferred(deferred, context=None):
- """Given a deferred wrap it such that any callbacks added later to it will
- be invoked with the current context.
-
- Deprecated: this almost certainly doesn't do want you want, ie make
- the deferred follow the synapse logcontext rules: try
- ``make_deferred_yieldable`` instead.
- """
- if context is None:
- context = LoggingContext.current_context()
- d = _PreservingContextDeferred(context)
- deferred.chainDeferred(d)
- return d
-
-
def preserve_fn(f):
"""Wraps a function, to ensure that the current context is restored after
return from the function, and that the sentinel context is set once the
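
The deprecation note on the removed preserve_context_over_deferred points callers at make_deferred_yieldable; a hedged sketch of that replacement pattern (caller code, not part of this patch; fetch_something is a hypothetical function returning a deferred):

from twisted.internet import defer

from synapse.util.logcontext import make_deferred_yieldable


@defer.inlineCallbacks
def caller(fetch_something):
    # Before: result = yield preserve_context_over_deferred(fetch_something())
    # After: make_deferred_yieldable follows the synapse logcontext rules
    # while we wait, and restores our logcontext when the deferred completes.
    result = yield make_deferred_yieldable(fetch_something())
    defer.returnValue(result)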
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
new file mode 100644
index 0000000000..cdbc4bffd7
--- /dev/null
+++ b/synapse/util/logformatter.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import StringIO
+import logging
+import traceback
+
+
+class LogFormatter(logging.Formatter):
+ """Log formatter which gives more detail for exceptions
+
+ This is the same as the standard log formatter, except that when logging
+ exceptions [typically via log.foo("msg", exc_info=1)], it prints the
+ call stack that led up to the point at which the exception was caught.
+ (Normally only stack frames between the point the exception was raised and
+ where it was caught are logged).
+ """
+ def __init__(self, *args, **kwargs):
+ super(LogFormatter, self).__init__(*args, **kwargs)
+
+ def formatException(self, ei):
+ sio = StringIO.StringIO()
+ (typ, val, tb) = ei
+
+ # log the stack above the exception capture point if possible, but
+ # check that we actually have an f_back attribute to work around
+ # https://twistedmatrix.com/trac/ticket/9305
+
+ if tb and hasattr(tb.tb_frame, 'f_back'):
+ sio.write("Capture point (most recent call last):\n")
+ traceback.print_stack(tb.tb_frame.f_back, None, sio)
+
+ traceback.print_exception(typ, val, tb, None, sio)
+ s = sio.getvalue()
+ sio.close()
+ if s[-1:] == "\n":
+ s = s[:-1]
+ return s
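
A hedged sketch of wiring the new formatter into python logging (configuration assumed, not prescribed by this patch):

import logging

from synapse.util.logformatter import LogFormatter

handler = logging.StreamHandler()
# LogFormatter accepts the same arguments as logging.Formatter; records logged
# with exc_info will additionally include the "Capture point" stack described
# in the docstring above.
handler.setFormatter(
    LogFormatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logging.getLogger().addHandler(handler)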
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4fa9d1a03c..1adedbb361 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -189,7 +189,7 @@ class RetryDestinationLimiter(object):
yield self.store.set_destination_retry_timings(
self.destination, retry_last_ts, self.retry_interval
)
- except:
+ except Exception:
logger.exception(
"Failed to store set_destination_retry_timings",
)
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 7412fc57a4..b70f9a6b0a 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -91,7 +91,4 @@ class WheelTimer(object):
return ret
def __len__(self):
- l = 0
- for entry in self.entries:
- l += len(entry.queue)
- return l
+ return sum(len(entry.queue) for entry in self.entries)