summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py25
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/logcontext.py120
3 files changed, 97 insertions, 52 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9a8fae0497..0ae7e2ef3b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+import re
 from itertools import islice
 
 import attr
@@ -138,3 +139,27 @@ def log_failure(failure, msg, consumeErrors=True):
 
     if not consumeErrors:
         return failure
+
+
+def glob_to_regex(glob):
+    """Converts a glob to a compiled regex object.
+
+    The regex is anchored at the beginning and end of the string.
+
+    Args:
+        glob (str)
+
+    Returns:
+        re.RegexObject
+    """
+    res = ''
+    for c in glob:
+        if c == '*':
+            res = res + '.*'
+        elif c == '?':
+            res = res + '.'
+        else:
+            res = res + re.escape(c)
+
+    # \A anchors at start of string, \Z at end of string
+    return re.compile(r"\A" + res + r"\Z", re.IGNORECASE)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
 
 import logging
 
+from six import integer_types
+
 from sortedcontainers import SortedDict
 
 from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int or type(stream_pos) is long
+        assert type(stream_pos) in integer_types
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
 import logging
 import threading
 
-from twisted.internet import defer
+from twisted.internet import defer, threads
 
 logger = logging.getLogger(__name__)
 
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
     return result
 
 
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
-    "synapse.util.logcontext",
-    "synapse.http.server",
-    "synapse.storage._base",
-    "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+    """
+    Calls the function `f` using a thread from the reactor's default threadpool and
+    returns the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked, and whose threadpool we should use for the
+            function.
+
+            Normally this will be hs.get_reactor().
+
+        f (callable): The function to call.
 
+        args: positional arguments to pass to f.
 
-def logcontext_tracer(frame, event, arg):
-    """A tracer that logs whenever a logcontext "unexpectedly" changes within
-    a function. Probably inaccurate.
+        kwargs: keyword arguments to pass to f.
 
-    Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
     """
-    if event == 'call':
-        name = frame.f_globals["__name__"]
-        if name.startswith("synapse"):
-            if name == "synapse.util.logcontext":
-                if frame.f_code.co_name in ["__enter__", "__exit__"]:
-                    tracer = frame.f_back.f_trace
-                    if tracer:
-                        tracer.just_changed = True
-
-            tracer = frame.f_trace
-            if tracer:
-                return tracer
-
-            if not any(name.startswith(ig) for ig in _to_ignore):
-                return LineTracer()
-
-
-class LineTracer(object):
-    __slots__ = ["context", "just_changed"]
-
-    def __init__(self):
-        self.context = LoggingContext.current_context()
-        self.just_changed = False
-
-    def __call__(self, frame, event, arg):
-        if event in 'line':
-            if self.just_changed:
-                self.context = LoggingContext.current_context()
-                self.just_changed = False
-            else:
-                c = LoggingContext.current_context()
-                if c != self.context:
-                    logger.info(
-                        "Context changed! %s -> %s, %s, %s",
-                        self.context, c,
-                        frame.f_code.co_filename, frame.f_lineno
-                    )
-                    self.context = c
+    return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
 
-        return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+    """
+    A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+    logcontexts correctly.
+
+    Calls the function `f` using a thread from the given threadpool and returns
+    the result as a Deferred.
+
+    Creates a new logcontext for `f`, which is created as a child of the current
+    logcontext (so its CPU usage metrics will get attributed to the current
+    logcontext). `f` should preserve the logcontext it is given.
+
+    The result deferred follows the Synapse logcontext rules: you should `yield`
+    on it.
+
+    Args:
+        reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+            the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+        threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+            running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+        f (callable): The function to call.
+
+        args: positional arguments to pass to f.
+
+        kwargs: keyword arguments to pass to f.
+
+    Returns:
+        Deferred: A Deferred which fires a callback with the result of `f`, or an
+            errback if `f` throws an exception.
+    """
+    logcontext = LoggingContext.current_context()
+
+    def g():
+        with LoggingContext(parent_context=logcontext):
+            return f(*args, **kwargs)
+
+    return make_deferred_yieldable(
+        threads.deferToThreadPool(reactor, threadpool, g)
+    )