diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9a8fae0497..0ae7e2ef3b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+import re
from itertools import islice
import attr
@@ -138,3 +139,27 @@ def log_failure(failure, msg, consumeErrors=True):
if not consumeErrors:
return failure
+
+
+def glob_to_regex(glob):
+ """Converts a glob to a compiled regex object.
+
+ The regex is anchored at the beginning and end of the string.
+
+ Args:
+ glob (str)
+
+ Returns:
+ re.RegexObject
+ """
+ res = ''
+ for c in glob:
+ if c == '*':
+ res = res + '.*'
+ elif c == '?':
+ res = res + '.'
+ else:
+ res = res + re.escape(c)
+
+ # \A anchors at start of string, \Z at end of string
+ return re.compile(r"\A" + res + r"\Z", re.IGNORECASE)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index f2bde74dc5..625aedc940 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -15,6 +15,8 @@
import logging
+from six import integer_types
+
from sortedcontainers import SortedDict
from synapse.util import caches
@@ -47,7 +49,7 @@ class StreamChangeCache(object):
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) is int or type(stream_pos) is long
+ assert type(stream_pos) in integer_types
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 89224b26cc..4c6e92beb8 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
-from twisted.internet import defer
+from twisted.internet import defer, threads
logger = logging.getLogger(__name__)
@@ -562,58 +562,76 @@ def _set_context_cb(result, context):
return result
-# modules to ignore in `logcontext_tracer`
-_to_ignore = [
- "synapse.util.logcontext",
- "synapse.http.server",
- "synapse.storage._base",
- "synapse.util.async_helpers",
-]
+def defer_to_thread(reactor, f, *args, **kwargs):
+ """
+ Calls the function `f` using a thread from the reactor's default threadpool and
+ returns the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked, and whose threadpool we should use for the
+ function.
+
+ Normally this will be hs.get_reactor().
+
+ f (callable): The function to call.
+ args: positional arguments to pass to f.
-def logcontext_tracer(frame, event, arg):
- """A tracer that logs whenever a logcontext "unexpectedly" changes within
- a function. Probably inaccurate.
+ kwargs: keyword arguments to pass to f.
- Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
"""
- if event == 'call':
- name = frame.f_globals["__name__"]
- if name.startswith("synapse"):
- if name == "synapse.util.logcontext":
- if frame.f_code.co_name in ["__enter__", "__exit__"]:
- tracer = frame.f_back.f_trace
- if tracer:
- tracer.just_changed = True
-
- tracer = frame.f_trace
- if tracer:
- return tracer
-
- if not any(name.startswith(ig) for ig in _to_ignore):
- return LineTracer()
-
-
-class LineTracer(object):
- __slots__ = ["context", "just_changed"]
-
- def __init__(self):
- self.context = LoggingContext.current_context()
- self.just_changed = False
-
- def __call__(self, frame, event, arg):
- if event in 'line':
- if self.just_changed:
- self.context = LoggingContext.current_context()
- self.just_changed = False
- else:
- c = LoggingContext.current_context()
- if c != self.context:
- logger.info(
- "Context changed! %s -> %s, %s, %s",
- self.context, c,
- frame.f_code.co_filename, frame.f_lineno
- )
- self.context = c
+ return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
- return self
+
+def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
+ """
+ A wrapper for twisted.internet.threads.deferToThreadpool, which handles
+ logcontexts correctly.
+
+ Calls the function `f` using a thread from the given threadpool and returns
+ the result as a Deferred.
+
+ Creates a new logcontext for `f`, which is created as a child of the current
+ logcontext (so its CPU usage metrics will get attributed to the current
+ logcontext). `f` should preserve the logcontext it is given.
+
+ The result deferred follows the Synapse logcontext rules: you should `yield`
+ on it.
+
+ Args:
+ reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
+ the Deferred will be invoked. Normally this will be hs.get_reactor().
+
+ threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
+ running `f`. Normally this will be hs.get_reactor().getThreadPool().
+
+ f (callable): The function to call.
+
+ args: positional arguments to pass to f.
+
+ kwargs: keyword arguments to pass to f.
+
+ Returns:
+ Deferred: A Deferred which fires a callback with the result of `f`, or an
+ errback if `f` throws an exception.
+ """
+ logcontext = LoggingContext.current_context()
+
+ def g():
+ with LoggingContext(parent_context=logcontext):
+ return f(*args, **kwargs)
+
+ return make_deferred_yieldable(
+ threads.deferToThreadPool(reactor, threadpool, g)
+ )
|