summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/async.py9
-rw-r--r--synapse/util/caches/descriptors.py117
-rw-r--r--synapse/util/caches/lrucache.py39
-rw-r--r--synapse/util/caches/treecache.py3
-rw-r--r--synapse/util/logcontext.py16
-rw-r--r--synapse/util/metrics.py19
6 files changed, 148 insertions, 55 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index c84b23ff46..347fb1e380 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit):
         except StopIteration:
             pass
 
-    return defer.gatherResults([
+    return preserve_context_over_deferred(defer.gatherResults([
         preserve_fn(_concurrently_execute_inner)()
         for _ in xrange(limit)
-    ], consumeErrors=True).addErrback(unwrapFirstError)
+    ], consumeErrors=True)).addErrback(unwrapFirstError)
 
 
 class Linearizer(object):
@@ -181,7 +181,8 @@ class Linearizer(object):
         self.key_to_defer[key] = new_defer
 
         if current_defer:
-            yield preserve_context_over_deferred(current_defer)
+            with PreserveLoggingContext():
+                yield current_defer
 
         @contextmanager
         def _ctx_manager():
@@ -264,7 +265,7 @@ class ReadWriteLock(object):
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
-        yield defer.gatherResults(to_wait_on)
+        yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
 
         @contextmanager
         def _ctx_manager():
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index f31dfb22b7..8dba61d49f 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -25,8 +25,7 @@ from synapse.util.logcontext import (
 from . import DEBUG_CACHES, register_cache
 
 from twisted.internet import defer
-
-from collections import OrderedDict
+from collections import namedtuple
 
 import os
 import functools
@@ -54,16 +53,11 @@ class Cache(object):
         "metrics",
     )
 
-    def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
-        if lru:
-            cache_type = TreeCache if tree else dict
-            self.cache = LruCache(
-                max_size=max_entries, keylen=keylen, cache_type=cache_type
-            )
-            self.max_entries = None
-        else:
-            self.cache = OrderedDict()
-            self.max_entries = max_entries
+    def __init__(self, name, max_entries=1000, keylen=1, tree=False):
+        cache_type = TreeCache if tree else dict
+        self.cache = LruCache(
+            max_size=max_entries, keylen=keylen, cache_type=cache_type
+        )
 
         self.name = name
         self.keylen = keylen
@@ -81,8 +75,8 @@ class Cache(object):
                     "Cache objects can only be accessed from the main thread"
                 )
 
-    def get(self, key, default=_CacheSentinel):
-        val = self.cache.get(key, _CacheSentinel)
+    def get(self, key, default=_CacheSentinel, callback=None):
+        val = self.cache.get(key, _CacheSentinel, callback=callback)
         if val is not _CacheSentinel:
             self.metrics.inc_hits()
             return val
@@ -94,19 +88,15 @@ class Cache(object):
         else:
             return default
 
-    def update(self, sequence, key, value):
+    def update(self, sequence, key, value, callback=None):
         self.check_thread()
         if self.sequence == sequence:
             # Only update the cache if the caches sequence number matches the
             # number that the cache had before the SELECT was started (SYN-369)
-            self.prefill(key, value)
-
-    def prefill(self, key, value):
-        if self.max_entries is not None:
-            while len(self.cache) >= self.max_entries:
-                self.cache.popitem(last=False)
+            self.prefill(key, value, callback=callback)
 
-        self.cache[key] = value
+    def prefill(self, key, value, callback=None):
+        self.cache.set(key, value, callback=callback)
 
     def invalidate(self, key):
         self.check_thread()
@@ -151,9 +141,21 @@ class CacheDescriptor(object):
     The wrapped function has another additional callable, called "prefill",
     which can be used to insert values into the cache specifically, without
     calling the calculation function.
+
+    Cached functions can be "chained" (i.e. a cached function can call other cached
+    functions and get appropriately invalidated when they called caches are
+    invalidated) by adding a special "cache_context" argument to the function
+    and passing that as a kwarg to all caches called. For example::
+
+        @cachedInlineCallbacks(cache_context=True)
+        def foo(self, key, cache_context):
+            r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate)
+            r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate)
+            defer.returnValue(r1 + r2)
+
     """
-    def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
-                 inlineCallbacks=False):
+    def __init__(self, orig, max_entries=1000, num_args=1, tree=False,
+                 inlineCallbacks=False, cache_context=False):
         max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
         self.orig = orig
@@ -165,15 +167,33 @@ class CacheDescriptor(object):
 
         self.max_entries = max_entries
         self.num_args = num_args
-        self.lru = lru
         self.tree = tree
 
-        self.arg_names = inspect.getargspec(orig).args[1:num_args + 1]
+        all_args = inspect.getargspec(orig)
+        self.arg_names = all_args.args[1:num_args + 1]
+
+        if "cache_context" in all_args.args:
+            if not cache_context:
+                raise ValueError(
+                    "Cannot have a 'cache_context' arg without setting"
+                    " cache_context=True"
+                )
+            try:
+                self.arg_names.remove("cache_context")
+            except ValueError:
+                pass
+        elif cache_context:
+            raise ValueError(
+                "Cannot have cache_context=True without having an arg"
+                " named `cache_context`"
+            )
+
+        self.add_cache_context = cache_context
 
         if len(self.arg_names) < self.num_args:
             raise Exception(
                 "Not enough explicit positional arguments to key off of for %r."
-                " (@cached cannot key off of *args or **kwars)"
+                " (@cached cannot key off of *args or **kwargs)"
                 % (orig.__name__,)
             )
 
@@ -182,16 +202,29 @@ class CacheDescriptor(object):
             name=self.orig.__name__,
             max_entries=self.max_entries,
             keylen=self.num_args,
-            lru=self.lru,
             tree=self.tree,
         )
 
         @functools.wraps(self.orig)
         def wrapped(*args, **kwargs):
+            # If we're passed a cache_context then we'll want to call its invalidate()
+            # whenever we are invalidated
+            invalidate_callback = kwargs.pop("on_invalidate", None)
+
+            # Add temp cache_context so inspect.getcallargs doesn't explode
+            if self.add_cache_context:
+                kwargs["cache_context"] = None
+
             arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
             cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names)
+
+            # Add our own `cache_context` to argument list if the wrapped function
+            # has asked for one
+            if self.add_cache_context:
+                kwargs["cache_context"] = _CacheContext(cache, cache_key)
+
             try:
-                cached_result_d = cache.get(cache_key)
+                cached_result_d = cache.get(cache_key, callback=invalidate_callback)
 
                 observer = cached_result_d.observe()
                 if DEBUG_CACHES:
@@ -228,7 +261,7 @@ class CacheDescriptor(object):
                 ret.addErrback(onErr)
 
                 ret = ObservableDeferred(ret, consumeErrors=True)
-                cache.update(sequence, cache_key, ret)
+                cache.update(sequence, cache_key, ret, callback=invalidate_callback)
 
                 return preserve_context_over_deferred(ret.observe())
 
@@ -297,6 +330,10 @@ class CacheListDescriptor(object):
 
         @functools.wraps(self.orig)
         def wrapped(*args, **kwargs):
+            # If we're passed a cache_context then we'll want to call its invalidate()
+            # whenever we are invalidated
+            invalidate_callback = kwargs.pop("on_invalidate", None)
+
             arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs)
             keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
             list_args = arg_dict[self.list_name]
@@ -311,7 +348,7 @@ class CacheListDescriptor(object):
                 key[self.list_pos] = arg
 
                 try:
-                    res = cache.get(tuple(key))
+                    res = cache.get(tuple(key), callback=invalidate_callback)
                     if not res.has_succeeded():
                         res = res.observe()
                         res.addCallback(lambda r, arg: (arg, r), arg)
@@ -345,7 +382,10 @@ class CacheListDescriptor(object):
 
                     key = list(keyargs)
                     key[self.list_pos] = arg
-                    cache.update(sequence, tuple(key), observer)
+                    cache.update(
+                        sequence, tuple(key), observer,
+                        callback=invalidate_callback
+                    )
 
                     def invalidate(f, key):
                         cache.invalidate(key)
@@ -376,24 +416,29 @@ class CacheListDescriptor(object):
         return wrapped
 
 
-def cached(max_entries=1000, num_args=1, lru=True, tree=False):
+class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+    def invalidate(self):
+        self.cache.invalidate(self.key)
+
+
+def cached(max_entries=1000, num_args=1, tree=False, cache_context=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
-        lru=lru,
         tree=tree,
+        cache_context=cache_context,
     )
 
 
-def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
-        lru=lru,
         tree=tree,
         inlineCallbacks=True,
+        cache_context=cache_context,
     )
 
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index f9df445a8d..9c4c679175 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -30,13 +30,14 @@ def enumerate_leaves(node, depth):
 
 
 class _Node(object):
-    __slots__ = ["prev_node", "next_node", "key", "value"]
+    __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
 
-    def __init__(self, prev_node, next_node, key, value):
+    def __init__(self, prev_node, next_node, key, value, callbacks=set()):
         self.prev_node = prev_node
         self.next_node = next_node
         self.key = key
         self.value = value
+        self.callbacks = callbacks
 
 
 class LruCache(object):
@@ -44,6 +45,9 @@ class LruCache(object):
     Least-recently-used cache.
     Supports del_multi only if cache_type=TreeCache
     If cache_type=TreeCache, all keys must be tuples.
+
+    Can also set callbacks on objects when getting/setting which are fired
+    when that key gets invalidated/evicted.
     """
     def __init__(self, max_size, keylen=1, cache_type=dict):
         cache = cache_type()
@@ -62,10 +66,10 @@ class LruCache(object):
 
             return inner
 
-        def add_node(key, value):
+        def add_node(key, value, callbacks=set()):
             prev_node = list_root
             next_node = prev_node.next_node
-            node = _Node(prev_node, next_node, key, value)
+            node = _Node(prev_node, next_node, key, value, callbacks)
             prev_node.next_node = node
             next_node.prev_node = node
             cache[key] = node
@@ -88,23 +92,41 @@ class LruCache(object):
             prev_node.next_node = next_node
             next_node.prev_node = prev_node
 
+            for cb in node.callbacks:
+                cb()
+            node.callbacks.clear()
+
         @synchronized
-        def cache_get(key, default=None):
+        def cache_get(key, default=None, callback=None):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
+                if callback:
+                    node.callbacks.add(callback)
                 return node.value
             else:
                 return default
 
         @synchronized
-        def cache_set(key, value):
+        def cache_set(key, value, callback=None):
             node = cache.get(key, None)
             if node is not None:
+                if value != node.value:
+                    for cb in node.callbacks:
+                        cb()
+                    node.callbacks.clear()
+
+                if callback:
+                    node.callbacks.add(callback)
+
                 move_node_to_front(node)
                 node.value = value
             else:
-                add_node(key, value)
+                if callback:
+                    callbacks = set([callback])
+                else:
+                    callbacks = set()
+                add_node(key, value, callbacks)
                 if len(cache) > max_size:
                     todelete = list_root.prev_node
                     delete_node(todelete)
@@ -148,6 +170,9 @@ class LruCache(object):
         def cache_clear():
             list_root.next_node = list_root
             list_root.prev_node = list_root
+            for node in cache.values():
+                for cb in node.callbacks:
+                    cb()
             cache.clear()
 
         @synchronized
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 03bc1401b7..c31585aea3 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -64,6 +64,9 @@ class TreeCache(object):
         self.size -= cnt
         return popped
 
+    def values(self):
+        return [e.value for e in self.root.values()]
+
     def __len__(self):
         return self.size
 
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 5316259d15..6c83eb213d 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs):
         return res
 
 
-def preserve_context_over_deferred(deferred):
+def preserve_context_over_deferred(deferred, context=None):
     """Given a deferred wrap it such that any callbacks added later to it will
     be invoked with the current context.
     """
-    current_context = LoggingContext.current_context()
-    d = _PreservingContextDeferred(current_context)
+    if context is None:
+        context = LoggingContext.current_context()
+    d = _PreservingContextDeferred(context)
     deferred.chainDeferred(d)
     return d
 
@@ -316,8 +317,13 @@ def preserve_fn(f):
 
     def g(*args, **kwargs):
         with PreserveLoggingContext(current):
-            return f(*args, **kwargs)
-
+            res = f(*args, **kwargs)
+            if isinstance(res, defer.Deferred):
+                return preserve_context_over_deferred(
+                    res, context=LoggingContext.sentinel
+                )
+            else:
+                return res
     return g
 
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 0b944d3e63..4ea930d3e8 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,10 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer
 
 from synapse.util.logcontext import LoggingContext
 import synapse.metrics
 
+from functools import wraps
 import logging
 
 
@@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution(
 )
 
 
+def measure_func(name):
+    def wrapper(func):
+        @wraps(func)
+        @defer.inlineCallbacks
+        def measured_func(self, *args, **kwargs):
+            with Measure(self.clock, name):
+                r = yield func(self, *args, **kwargs)
+            defer.returnValue(r)
+        return measured_func
+    return wrapper
+
+
 class Measure(object):
     __slots__ = [
         "clock", "name", "start_context", "start", "new_context", "ru_utime",
@@ -64,7 +78,6 @@ class Measure(object):
         self.start = self.clock.time_msec()
         self.start_context = LoggingContext.current_context()
         if not self.start_context:
-            logger.warn("Entered Measure without log context: %s", self.name)
             self.start_context = LoggingContext("Measure")
             self.start_context.__enter__()
             self.created_context = True
@@ -74,7 +87,7 @@ class Measure(object):
         self.db_txn_duration = self.start_context.db_txn_duration
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        if exc_type is not None or not self.start_context:
+        if isinstance(exc_type, Exception) or not self.start_context:
             return
 
         duration = self.clock.time_msec() - self.start
@@ -85,7 +98,7 @@ class Measure(object):
         if context != self.start_context:
             logger.warn(
                 "Context has unexpectedly changed from '%s' to '%s'. (%r)",
-                context, self.start_context, self.name
+                self.start_context, context, self.name
             )
             return