summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py18
-rw-r--r--synapse/util/async.py69
-rw-r--r--synapse/util/distributor.py22
-rw-r--r--synapse/util/logcontext.py63
-rw-r--r--synapse/util/logformatter.py51
-rw-r--r--synapse/util/retryutils.py2
-rw-r--r--synapse/util/wheel_timer.py5
7 files changed, 122 insertions, 108 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a2360ab5d..756d8ffa32 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -59,9 +59,9 @@ class Clock(object):
             f(function): The function to call repeatedly.
             msec(float): How long to wait between calls in milliseconds.
         """
-        l = task.LoopingCall(f)
-        l.start(msec / 1000.0, now=False)
-        return l
+        call = task.LoopingCall(f)
+        call.start(msec / 1000.0, now=False)
+        return call
 
     def call_later(self, delay, callback, *args, **kwargs):
         """Call something later
@@ -82,7 +82,7 @@ class Clock(object):
     def cancel_call_later(self, timer, ignore_errs=False):
         try:
             timer.cancel()
-        except:
+        except Exception:
             if not ignore_errs:
                 raise
 
@@ -97,12 +97,12 @@ class Clock(object):
 
             try:
                 ret_deferred.errback(e)
-            except:
+            except Exception:
                 pass
 
             try:
                 given_deferred.cancel()
-            except:
+            except Exception:
                 pass
 
         timer = None
@@ -110,7 +110,7 @@ class Clock(object):
         def cancel(res):
             try:
                 self.cancel_call_later(timer)
-            except:
+            except Exception:
                 pass
             return res
 
@@ -119,7 +119,7 @@ class Clock(object):
         def success(res):
             try:
                 ret_deferred.callback(res)
-            except:
+            except Exception:
                 pass
 
             return res
@@ -127,7 +127,7 @@ class Clock(object):
         def err(res):
             try:
                 ret_deferred.errback(res)
-            except:
+            except Exception:
                 pass
 
         given_deferred.addCallbacks(callback=success, errback=err)
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1453faf0ef..0729bb2863 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -17,9 +17,9 @@
 from twisted.internet import defer, reactor
 
 from .logcontext import (
-    PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+    PreserveLoggingContext, make_deferred_yieldable, preserve_fn
 )
-from synapse.util import unwrapFirstError
+from synapse.util import logcontext, unwrapFirstError
 
 from contextlib import contextmanager
 
@@ -53,6 +53,11 @@ class ObservableDeferred(object):
 
     Cancelling or otherwise resolving an observer will not affect the original
     ObservableDeferred.
+
+    NB that it does not attempt to do anything with logcontexts; in general
+    you should probably make_deferred_yieldable the deferreds
+    returned by `observe`, and ensure that the original deferred runs its
+    callbacks in the sentinel logcontext.
     """
 
     __slots__ = ["_deferred", "_observers", "_result"]
@@ -68,7 +73,7 @@ class ObservableDeferred(object):
                 try:
                     # TODO: Handle errors here.
                     self._observers.pop().callback(r)
-                except:
+                except Exception:
                     pass
             return r
 
@@ -78,7 +83,7 @@ class ObservableDeferred(object):
                 try:
                     # TODO: Handle errors here.
                     self._observers.pop().errback(f)
-                except:
+                except Exception:
                     pass
 
             if consumeErrors:
@@ -155,7 +160,7 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return preserve_context_over_deferred(defer.gatherResults([
+    return logcontext.make_deferred_yieldable(defer.gatherResults([
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
     ], consumeErrors=True)).addErrback(unwrapFirstError)
@@ -200,10 +205,29 @@ class Linearizer(object):
             try:
                 with PreserveLoggingContext():
                     yield current_defer
-            except:
+            except Exception:
                 logger.exception("Unexpected exception in Linearizer")
 
-        logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+            logger.info("Acquired linearizer lock %r for key %r", self.name,
+                        key)
+
+            # if the code holding the lock completes synchronously, then it
+            # will recursively run the next claimant on the list. That can
+            # relatively rapidly lead to stack exhaustion. This is essentially
+            # the same problem as http://twistedmatrix.com/trac/ticket/9304.
+            #
+            # In order to break the cycle, we add a cheeky sleep(0) here to
+            # ensure that we fall back to the reactor between each iteration.
+            #
+            # (There's no particular need for it to happen before we return
+            # the context manager, but it needs to happen while we hold the
+            # lock, and the context manager's exit code must be synchronous,
+            # so actually this is the only sensible place.
+            yield run_on_reactor()
+
+        else:
+            logger.info("Acquired uncontended linearizer lock %r for key %r",
+                        self.name, key)
 
         @contextmanager
         def _ctx_manager():
@@ -211,7 +235,8 @@ class Linearizer(object):
                 yield
             finally:
                 logger.info("Releasing linearizer lock %r for key %r", self.name, key)
-                new_defer.callback(None)
+                with PreserveLoggingContext():
+                    new_defer.callback(None)
                 current_d = self.key_to_defer.get(key)
                 if current_d is new_defer:
                     self.key_to_defer.pop(key, None)
@@ -253,8 +278,13 @@ class Limiter(object):
         if entry[0] >= self.max_count:
             new_defer = defer.Deferred()
             entry[1].append(new_defer)
+
+            logger.info("Waiting to acquire limiter lock for key %r", key)
             with PreserveLoggingContext():
                 yield new_defer
+            logger.info("Acquired limiter lock for key %r", key)
+        else:
+            logger.info("Acquired uncontended limiter lock for key %r", key)
 
         entry[0] += 1
 
@@ -263,16 +293,21 @@ class Limiter(object):
             try:
                 yield
             finally:
+                logger.info("Releasing limiter lock for key %r", key)
+
                 # We've finished executing so check if there are any things
                 # blocked waiting to execute and start one of them
                 entry[0] -= 1
-                try:
-                    entry[1].pop(0).callback(None)
-                except IndexError:
-                    # If nothing else is executing for this key then remove it
-                    # from the map
-                    if entry[0] == 0:
-                        self.key_to_defer.pop(key, None)
+
+                if entry[1]:
+                    next_def = entry[1].pop(0)
+
+                    with PreserveLoggingContext():
+                        next_def.callback(None)
+                elif entry[0] == 0:
+                    # We were the last thing for this key: remove it from the
+                    # map.
+                    del self.key_to_defer[key]
 
         defer.returnValue(_ctx_manager())
 
@@ -316,7 +351,7 @@ class ReadWriteLock(object):
 
         # We wait for the latest writer to finish writing. We can safely ignore
         # any existing readers... as they're readers.
-        yield curr_writer
+        yield make_deferred_yieldable(curr_writer)
 
         @contextmanager
         def _ctx_manager():
@@ -345,7 +380,7 @@ class ReadWriteLock(object):
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
-        yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
+        yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
 
         @contextmanager
         def _ctx_manager():
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index e68f94ce77..734331caaa 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,32 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
-from synapse.util.logcontext import (
-    PreserveLoggingContext, preserve_context_over_fn
-)
+from twisted.internet import defer
 
 from synapse.util import unwrapFirstError
-
-import logging
-
+from synapse.util.logcontext import PreserveLoggingContext
 
 logger = logging.getLogger(__name__)
 
 
 def user_left_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_left_room", user=user, room_id=room_id
-    )
+    with PreserveLoggingContext():
+        distributor.fire("user_left_room", user=user, room_id=room_id)
 
 
 def user_joined_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_joined_room", user=user, room_id=room_id
-    )
+    with PreserveLoggingContext():
+        distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
 class Distributor(object):
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 990216145e..48c9f6802d 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -42,7 +42,7 @@ try:
 
     def get_thread_resource_usage():
         return resource.getrusage(RUSAGE_THREAD)
-except:
+except Exception:
     # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
     # won't track resource usage by returning None.
     def get_thread_resource_usage():
@@ -261,67 +261,6 @@ class PreserveLoggingContext(object):
                 )
 
 
-class _PreservingContextDeferred(defer.Deferred):
-    """A deferred that ensures that all callbacks and errbacks are called with
-    the given logging context.
-    """
-    def __init__(self, context):
-        self._log_context = context
-        defer.Deferred.__init__(self)
-
-    def addCallbacks(self, callback, errback=None,
-                     callbackArgs=None, callbackKeywords=None,
-                     errbackArgs=None, errbackKeywords=None):
-        callback = self._wrap_callback(callback)
-        errback = self._wrap_callback(errback)
-        return defer.Deferred.addCallbacks(
-            self, callback,
-            errback=errback,
-            callbackArgs=callbackArgs,
-            callbackKeywords=callbackKeywords,
-            errbackArgs=errbackArgs,
-            errbackKeywords=errbackKeywords,
-        )
-
-    def _wrap_callback(self, f):
-        def g(res, *args, **kwargs):
-            with PreserveLoggingContext(self._log_context):
-                res = f(res, *args, **kwargs)
-            return res
-        return g
-
-
-def preserve_context_over_fn(fn, *args, **kwargs):
-    """Takes a function and invokes it with the given arguments, but removes
-    and restores the current logging context while doing so.
-
-    If the result is a deferred, call preserve_context_over_deferred before
-    returning it.
-    """
-    with PreserveLoggingContext():
-        res = fn(*args, **kwargs)
-
-    if isinstance(res, defer.Deferred):
-        return preserve_context_over_deferred(res)
-    else:
-        return res
-
-
-def preserve_context_over_deferred(deferred, context=None):
-    """Given a deferred wrap it such that any callbacks added later to it will
-    be invoked with the current context.
-
-    Deprecated: this almost certainly doesn't do want you want, ie make
-    the deferred follow the synapse logcontext rules: try
-    ``make_deferred_yieldable`` instead.
-    """
-    if context is None:
-        context = LoggingContext.current_context()
-    d = _PreservingContextDeferred(context)
-    deferred.chainDeferred(d)
-    return d
-
-
 def preserve_fn(f):
     """Wraps a function, to ensure that the current context is restored after
     return from the function, and that the sentinel context is set once the
diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py
new file mode 100644
index 0000000000..cdbc4bffd7
--- /dev/null
+++ b/synapse/util/logformatter.py
@@ -0,0 +1,51 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import StringIO
+import logging
+import traceback
+
+
+class LogFormatter(logging.Formatter):
+    """Log formatter which gives more detail for exceptions
+
+    This is the same as the standard log formatter, except that when logging
+    exceptions [typically via log.foo("msg", exc_info=1)], it prints the
+    sequence that led up to the point at which the exception was caught.
+    (Normally only stack frames between the point the exception was raised and
+    where it was caught are logged).
+    """
+    def __init__(self, *args, **kwargs):
+        super(LogFormatter, self).__init__(*args, **kwargs)
+
+    def formatException(self, ei):
+        sio = StringIO.StringIO()
+        (typ, val, tb) = ei
+
+        # log the stack above the exception capture point if possible, but
+        # check that we actually have an f_back attribute to work around
+        # https://twistedmatrix.com/trac/ticket/9305
+
+        if tb and hasattr(tb.tb_frame, 'f_back'):
+            sio.write("Capture point (most recent call last):\n")
+            traceback.print_stack(tb.tb_frame.f_back, None, sio)
+
+        traceback.print_exception(typ, val, tb, None, sio)
+        s = sio.getvalue()
+        sio.close()
+        if s[-1:] == "\n":
+            s = s[:-1]
+        return s
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 4fa9d1a03c..1adedbb361 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -189,7 +189,7 @@ class RetryDestinationLimiter(object):
                 yield self.store.set_destination_retry_timings(
                     self.destination, retry_last_ts, self.retry_interval
                 )
-            except:
+            except Exception:
                 logger.exception(
                     "Failed to store set_destination_retry_timings",
                 )
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 7412fc57a4..b70f9a6b0a 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -91,7 +91,4 @@ class WheelTimer(object):
         return ret
 
     def __len__(self):
-        l = 0
-        for entry in self.entries:
-            l += len(entry.queue)
-        return l
+        return sum(len(entry.queue) for entry in self.entries)