diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 25 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 4 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 120 |
3 files changed, 97 insertions, 52 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 9a8fae0497..0ae7e2ef3b 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +import re from itertools import islice import attr @@ -138,3 +139,27 @@ def log_failure(failure, msg, consumeErrors=True): if not consumeErrors: return failure + + +def glob_to_regex(glob): + """Converts a glob to a compiled regex object. + + The regex is anchored at the beginning and end of the string. + + Args: + glob (str) + + Returns: + re.RegexObject + """ + res = '' + for c in glob: + if c == '*': + res = res + '.*' + elif c == '?': + res = res + '.' + else: + res = res + re.escape(c) + + # \A anchors at start of string, \Z at end of string + return re.compile(r"\A" + res + r"\Z", re.IGNORECASE) diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index f2bde74dc5..625aedc940 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -15,6 +15,8 @@ import logging +from six import integer_types + from sortedcontainers import SortedDict from synapse.util import caches @@ -47,7 +49,7 @@ class StreamChangeCache(object): def has_entity_changed(self, entity, stream_pos): """Returns True if the entity may have been updated since stream_pos """ - assert type(stream_pos) is int or type(stream_pos) is long + assert type(stream_pos) in integer_types if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 89224b26cc..4c6e92beb8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -25,7 +25,7 @@ See doc/log_contexts.rst for details on how this works. import logging import threading -from twisted.internet import defer +from twisted.internet import defer, threads logger = logging.getLogger(__name__) @@ -562,58 +562,76 @@ def _set_context_cb(result, context): return result -# modules to ignore in `logcontext_tracer` -_to_ignore = [ - "synapse.util.logcontext", - "synapse.http.server", - "synapse.storage._base", - "synapse.util.async_helpers", -] +def defer_to_thread(reactor, f, *args, **kwargs): + """ + Calls the function `f` using a thread from the reactor's default threadpool and + returns the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked, and whose threadpool we should use for the + function. + + Normally this will be hs.get_reactor(). + + f (callable): The function to call. + args: positional arguments to pass to f. -def logcontext_tracer(frame, event, arg): - """A tracer that logs whenever a logcontext "unexpectedly" changes within - a function. Probably inaccurate. + kwargs: keyword arguments to pass to f. - Use by calling `sys.settrace(logcontext_tracer)` in the main thread. + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. """ - if event == 'call': - name = frame.f_globals["__name__"] - if name.startswith("synapse"): - if name == "synapse.util.logcontext": - if frame.f_code.co_name in ["__enter__", "__exit__"]: - tracer = frame.f_back.f_trace - if tracer: - tracer.just_changed = True - - tracer = frame.f_trace - if tracer: - return tracer - - if not any(name.startswith(ig) for ig in _to_ignore): - return LineTracer() - - -class LineTracer(object): - __slots__ = ["context", "just_changed"] - - def __init__(self): - self.context = LoggingContext.current_context() - self.just_changed = False - - def __call__(self, frame, event, arg): - if event in 'line': - if self.just_changed: - self.context = LoggingContext.current_context() - self.just_changed = False - else: - c = LoggingContext.current_context() - if c != self.context: - logger.info( - "Context changed! %s -> %s, %s, %s", - self.context, c, - frame.f_code.co_filename, frame.f_lineno - ) - self.context = c + return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) - return self + +def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): + """ + A wrapper for twisted.internet.threads.deferToThreadpool, which handles + logcontexts correctly. + + Calls the function `f` using a thread from the given threadpool and returns + the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked. Normally this will be hs.get_reactor(). + + threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for + running `f`. Normally this will be hs.get_reactor().getThreadPool(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + logcontext = LoggingContext.current_context() + + def g(): + with LoggingContext(parent_context=logcontext): + return f(*args, **kwargs) + + return make_deferred_yieldable( + threads.deferToThreadPool(reactor, threadpool, g) + ) |