summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py31
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/logcontext.py120
-rw-r--r--synapse/util/manhole.py6
4 files changed, 106 insertions, 55 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 680ea928c7..9a8fae0497 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -68,7 +68,10 @@ class Clock(object):
         """
         call = task.LoopingCall(f)
         call.clock = self._reactor
-        call.start(msec / 1000.0, now=False)
+        d = call.start(msec / 1000.0, now=False)
+        d.addErrback(
+            log_failure, "Looping call died", consumeErrors=False,
+        )
         return call
 
     def call_later(self, delay, callback, *args, **kwargs):
@@ -109,3 +112,29 @@ def batch_iter(iterable, size):
     sourceiter = iter(iterable)
     # call islice until it returns an empty tuple
     return iter(lambda: tuple(islice(sourceiter, size)), ())
+
+
+def log_failure(failure, msg, consumeErrors=True):
+    """Creates a function suitable for passing to `Deferred.addErrback` that
+    logs any failures that occur.
+
+    Args:
+        msg (str): Message to log
+        consumeErrors (bool): If true consumes the failure, otherwise passes
+            on down the callback chain
+
+    Returns:
+        func(Failure)
+    """
+
+    logger.error(
+        msg,
+        exc_info=(
+            failure.type,
+            failure.value,
+            failure.getTracebackObject()
+        )
+    )
+
+    if not consumeErrors:
+        return failure
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from six import integer_types
+
 from sortedcontainers import SortedDict
 
 from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int or type(stream_pos) is long
+        assert type(stream_pos) in integer_types
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 logger = logging.getLogger(__name__)
 
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
     return result
 
 
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
-    "synapse.util.logcontext",
-    "synapse.http.server",
-    "synapse.storage._base",
-    "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+    """
+    Calls the function `f` using a thread from the reactor's default threadpool and
+    returns the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked, and whose threadpool we should use for the
+            function.
+
+            Normally this will be hs.get_reactor().
+
+        f (callable): The function to call.
 
+        args: positional arguments to pass to f.
 
-def logcontext_tracer(frame, event, arg):
-    """A tracer that logs whenever a logcontext "unexpectedly" changes within
-    a function. Probably inaccurate.
+        kwargs: keyword arguments to pass to f.
 
-    Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
     """
-    if event == 'call':
-        name = frame.f_globals["__name__"]
-        if name.startswith("synapse"):
-            if name == "synapse.util.logcontext":
-                if frame.f_code.co_name in ["__enter__", "__exit__"]:
-                    tracer = frame.f_back.f_trace
-                    if tracer:
-                        tracer.just_changed = True
-
-            tracer = frame.f_trace
-            if tracer:
-                return tracer
-
-            if not any(name.startswith(ig) for ig in _to_ignore):
-                return LineTracer()
-
-
-class LineTracer(object):
-    __slots__ = ["context", "just_changed"]
-
-    def __init__(self):
-        self.context = LoggingContext.current_context()
-        self.just_changed = False
-
-    def __call__(self, frame, event, arg):
-        if event in 'line':
-            if self.just_changed:
-                self.context = LoggingContext.current_context()
-                self.just_changed = False
-            else:
-                c = LoggingContext.current_context()
-                if c != self.context:
-                    logger.info(
-                        "Context changed! %s -> %s, %s, %s",
-                        self.context, c,
-                        frame.f_code.co_filename, frame.f_lineno
-                    )
-                    self.context = c
+    return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
 
-        return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+    """
+    A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+    logcontexts correctly.
+
+    Calls the function `f` using a thread from the given threadpool and returns
+    the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    logcontext = LoggingContext.current_context()
+
+    def g():
+        with LoggingContext(parent_context=logcontext):
+            return f(*args, **kwargs)
+
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, g)
+    )
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 8d0f2a8918..9cb7e9c9ab 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -70,6 +70,8 @@ def manhole(username, password, globals):
     Returns:
         twisted.internet.protocol.Factory: A factory to pass to ``listenTCP``
     """
+    if not isinstance(password, bytes):
+        password = password.encode('ascii')
 
     checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
         **{username: password}
@@ -82,7 +84,7 @@ def manhole(username, password, globals):
     )
 
     factory = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
-    factory.publicKeys['ssh-rsa'] = Key.fromString(PUBLIC_KEY)
-    factory.privateKeys['ssh-rsa'] = Key.fromString(PRIVATE_KEY)
+    factory.publicKeys[b'ssh-rsa'] = Key.fromString(PUBLIC_KEY)
+    factory.privateKeys[b'ssh-rsa'] = Key.fromString(PRIVATE_KEY)
 
     return factory