diff options
author | Erik Johnston <erik@matrix.org> | 2016-08-24 14:39:35 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-08-24 14:39:35 +0100 |
commit | 37638c06c59bbf7327d5c1edc4b9e346716e7374 (patch) | |
tree | 1c843a49d3d5168ff998a54f50d30cdc3814f104 /synapse/util | |
parent | Merge branch 'release-v0.17.0' of github.com:matrix-org/synapse (diff) | |
parent | Bump changelog and version (diff) | |
download | synapse-37638c06c59bbf7327d5c1edc4b9e346716e7374.tar.xz |
Merge branch 'release-v0.17.1' of github.com:matrix-org/synapse v0.17.1
Diffstat (limited to '')
-rw-r--r-- | synapse/util/async.py | 9 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 117 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 39 | ||||
-rw-r--r-- | synapse/util/caches/treecache.py | 3 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 16 | ||||
-rw-r--r-- | synapse/util/metrics.py | 19 |
6 files changed, 148 insertions, 55 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index c84b23ff46..347fb1e380 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return defer.gatherResults([ + return preserve_context_over_deferred(defer.gatherResults([ preserve_fn(_concurrently_execute_inner)() for _ in xrange(limit) - ], consumeErrors=True).addErrback(unwrapFirstError) + ], consumeErrors=True)).addErrback(unwrapFirstError) class Linearizer(object): @@ -181,7 +181,8 @@ class Linearizer(object): self.key_to_defer[key] = new_defer if current_defer: - yield preserve_context_over_deferred(current_defer) + with PreserveLoggingContext(): + yield current_defer @contextmanager def _ctx_manager(): @@ -264,7 +265,7 @@ class ReadWriteLock(object): curr_readers.clear() self.key_to_current_writer[key] = new_defer - yield defer.gatherResults(to_wait_on) + yield preserve_context_over_deferred(defer.gatherResults(to_wait_on)) @contextmanager def _ctx_manager(): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index f31dfb22b7..8dba61d49f 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -25,8 +25,7 @@ from synapse.util.logcontext import ( from . import DEBUG_CACHES, register_cache from twisted.internet import defer - -from collections import OrderedDict +from collections import namedtuple import os import functools @@ -54,16 +53,11 @@ class Cache(object): "metrics", ) - def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): - if lru: - cache_type = TreeCache if tree else dict - self.cache = LruCache( - max_size=max_entries, keylen=keylen, cache_type=cache_type - ) - self.max_entries = None - else: - self.cache = OrderedDict() - self.max_entries = max_entries + def __init__(self, name, max_entries=1000, keylen=1, tree=False): + cache_type = TreeCache if tree else dict + self.cache = LruCache( + max_size=max_entries, keylen=keylen, cache_type=cache_type + ) self.name = name self.keylen = keylen @@ -81,8 +75,8 @@ class Cache(object): "Cache objects can only be accessed from the main thread" ) - def get(self, key, default=_CacheSentinel): - val = self.cache.get(key, _CacheSentinel) + def get(self, key, default=_CacheSentinel, callback=None): + val = self.cache.get(key, _CacheSentinel, callback=callback) if val is not _CacheSentinel: self.metrics.inc_hits() return val @@ -94,19 +88,15 @@ class Cache(object): else: return default - def update(self, sequence, key, value): + def update(self, sequence, key, value, callback=None): self.check_thread() if self.sequence == sequence: # Only update the cache if the caches sequence number matches the # number that the cache had before the SELECT was started (SYN-369) - self.prefill(key, value) - - def prefill(self, key, value): - if self.max_entries is not None: - while len(self.cache) >= self.max_entries: - self.cache.popitem(last=False) + self.prefill(key, value, callback=callback) - self.cache[key] = value + def prefill(self, key, value, callback=None): + self.cache.set(key, value, callback=callback) def invalidate(self, key): self.check_thread() @@ -151,9 +141,21 @@ class CacheDescriptor(object): The wrapped function has another additional callable, called "prefill", which can be used to insert values into the cache specifically, without calling the calculation function. + + Cached functions can be "chained" (i.e. a cached function can call other cached + functions and get appropriately invalidated when they called caches are + invalidated) by adding a special "cache_context" argument to the function + and passing that as a kwarg to all caches called. For example:: + + @cachedInlineCallbacks(cache_context=True) + def foo(self, key, cache_context): + r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate) + r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate) + defer.returnValue(r1 + r2) + """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False, - inlineCallbacks=False): + def __init__(self, orig, max_entries=1000, num_args=1, tree=False, + inlineCallbacks=False, cache_context=False): max_entries = int(max_entries * CACHE_SIZE_FACTOR) self.orig = orig @@ -165,15 +167,33 @@ class CacheDescriptor(object): self.max_entries = max_entries self.num_args = num_args - self.lru = lru self.tree = tree - self.arg_names = inspect.getargspec(orig).args[1:num_args + 1] + all_args = inspect.getargspec(orig) + self.arg_names = all_args.args[1:num_args + 1] + + if "cache_context" in all_args.args: + if not cache_context: + raise ValueError( + "Cannot have a 'cache_context' arg without setting" + " cache_context=True" + ) + try: + self.arg_names.remove("cache_context") + except ValueError: + pass + elif cache_context: + raise ValueError( + "Cannot have cache_context=True without having an arg" + " named `cache_context`" + ) + + self.add_cache_context = cache_context if len(self.arg_names) < self.num_args: raise Exception( "Not enough explicit positional arguments to key off of for %r." - " (@cached cannot key off of *args or **kwars)" + " (@cached cannot key off of *args or **kwargs)" % (orig.__name__,) ) @@ -182,16 +202,29 @@ class CacheDescriptor(object): name=self.orig.__name__, max_entries=self.max_entries, keylen=self.num_args, - lru=self.lru, tree=self.tree, ) @functools.wraps(self.orig) def wrapped(*args, **kwargs): + # If we're passed a cache_context then we'll want to call its invalidate() + # whenever we are invalidated + invalidate_callback = kwargs.pop("on_invalidate", None) + + # Add temp cache_context so inspect.getcallargs doesn't explode + if self.add_cache_context: + kwargs["cache_context"] = None + arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) cache_key = tuple(arg_dict[arg_nm] for arg_nm in self.arg_names) + + # Add our own `cache_context` to argument list if the wrapped function + # has asked for one + if self.add_cache_context: + kwargs["cache_context"] = _CacheContext(cache, cache_key) + try: - cached_result_d = cache.get(cache_key) + cached_result_d = cache.get(cache_key, callback=invalidate_callback) observer = cached_result_d.observe() if DEBUG_CACHES: @@ -228,7 +261,7 @@ class CacheDescriptor(object): ret.addErrback(onErr) ret = ObservableDeferred(ret, consumeErrors=True) - cache.update(sequence, cache_key, ret) + cache.update(sequence, cache_key, ret, callback=invalidate_callback) return preserve_context_over_deferred(ret.observe()) @@ -297,6 +330,10 @@ class CacheListDescriptor(object): @functools.wraps(self.orig) def wrapped(*args, **kwargs): + # If we're passed a cache_context then we'll want to call its invalidate() + # whenever we are invalidated + invalidate_callback = kwargs.pop("on_invalidate", None) + arg_dict = inspect.getcallargs(self.orig, obj, *args, **kwargs) keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] list_args = arg_dict[self.list_name] @@ -311,7 +348,7 @@ class CacheListDescriptor(object): key[self.list_pos] = arg try: - res = cache.get(tuple(key)) + res = cache.get(tuple(key), callback=invalidate_callback) if not res.has_succeeded(): res = res.observe() res.addCallback(lambda r, arg: (arg, r), arg) @@ -345,7 +382,10 @@ class CacheListDescriptor(object): key = list(keyargs) key[self.list_pos] = arg - cache.update(sequence, tuple(key), observer) + cache.update( + sequence, tuple(key), observer, + callback=invalidate_callback + ) def invalidate(f, key): cache.invalidate(key) @@ -376,24 +416,29 @@ class CacheListDescriptor(object): return wrapped -def cached(max_entries=1000, num_args=1, lru=True, tree=False): +class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): + def invalidate(self): + self.cache.invalidate(self.key) + + +def cached(max_entries=1000, num_args=1, tree=False, cache_context=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, - lru=lru, tree=tree, + cache_context=cache_context, ) -def cachedInlineCallbacks(max_entries=1000, num_args=1, lru=False, tree=False): +def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, num_args=num_args, - lru=lru, tree=tree, inlineCallbacks=True, + cache_context=cache_context, ) diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index f9df445a8d..9c4c679175 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -30,13 +30,14 @@ def enumerate_leaves(node, depth): class _Node(object): - __slots__ = ["prev_node", "next_node", "key", "value"] + __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"] - def __init__(self, prev_node, next_node, key, value): + def __init__(self, prev_node, next_node, key, value, callbacks=set()): self.prev_node = prev_node self.next_node = next_node self.key = key self.value = value + self.callbacks = callbacks class LruCache(object): @@ -44,6 +45,9 @@ class LruCache(object): Least-recently-used cache. Supports del_multi only if cache_type=TreeCache If cache_type=TreeCache, all keys must be tuples. + + Can also set callbacks on objects when getting/setting which are fired + when that key gets invalidated/evicted. """ def __init__(self, max_size, keylen=1, cache_type=dict): cache = cache_type() @@ -62,10 +66,10 @@ class LruCache(object): return inner - def add_node(key, value): + def add_node(key, value, callbacks=set()): prev_node = list_root next_node = prev_node.next_node - node = _Node(prev_node, next_node, key, value) + node = _Node(prev_node, next_node, key, value, callbacks) prev_node.next_node = node next_node.prev_node = node cache[key] = node @@ -88,23 +92,41 @@ class LruCache(object): prev_node.next_node = next_node next_node.prev_node = prev_node + for cb in node.callbacks: + cb() + node.callbacks.clear() + @synchronized - def cache_get(key, default=None): + def cache_get(key, default=None, callback=None): node = cache.get(key, None) if node is not None: move_node_to_front(node) + if callback: + node.callbacks.add(callback) return node.value else: return default @synchronized - def cache_set(key, value): + def cache_set(key, value, callback=None): node = cache.get(key, None) if node is not None: + if value != node.value: + for cb in node.callbacks: + cb() + node.callbacks.clear() + + if callback: + node.callbacks.add(callback) + move_node_to_front(node) node.value = value else: - add_node(key, value) + if callback: + callbacks = set([callback]) + else: + callbacks = set() + add_node(key, value, callbacks) if len(cache) > max_size: todelete = list_root.prev_node delete_node(todelete) @@ -148,6 +170,9 @@ class LruCache(object): def cache_clear(): list_root.next_node = list_root list_root.prev_node = list_root + for node in cache.values(): + for cb in node.callbacks: + cb() cache.clear() @synchronized diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 03bc1401b7..c31585aea3 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -64,6 +64,9 @@ class TreeCache(object): self.size -= cnt return popped + def values(self): + return [e.value for e in self.root.values()] + def __len__(self): return self.size diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 5316259d15..6c83eb213d 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs): return res -def preserve_context_over_deferred(deferred): +def preserve_context_over_deferred(deferred, context=None): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - current_context = LoggingContext.current_context() - d = _PreservingContextDeferred(current_context) + if context is None: + context = LoggingContext.current_context() + d = _PreservingContextDeferred(context) deferred.chainDeferred(d) return d @@ -316,8 +317,13 @@ def preserve_fn(f): def g(*args, **kwargs): with PreserveLoggingContext(current): - return f(*args, **kwargs) - + res = f(*args, **kwargs) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred( + res, context=LoggingContext.sentinel + ) + else: + return res return g diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0b944d3e63..4ea930d3e8 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from synapse.util.logcontext import LoggingContext import synapse.metrics +from functools import wraps import logging @@ -47,6 +49,18 @@ block_db_txn_duration = metrics.register_distribution( ) +def measure_func(name): + def wrapper(func): + @wraps(func) + @defer.inlineCallbacks + def measured_func(self, *args, **kwargs): + with Measure(self.clock, name): + r = yield func(self, *args, **kwargs) + defer.returnValue(r) + return measured_func + return wrapper + + class Measure(object): __slots__ = [ "clock", "name", "start_context", "start", "new_context", "ru_utime", @@ -64,7 +78,6 @@ class Measure(object): self.start = self.clock.time_msec() self.start_context = LoggingContext.current_context() if not self.start_context: - logger.warn("Entered Measure without log context: %s", self.name) self.start_context = LoggingContext("Measure") self.start_context.__enter__() self.created_context = True @@ -74,7 +87,7 @@ class Measure(object): self.db_txn_duration = self.start_context.db_txn_duration def __exit__(self, exc_type, exc_val, exc_tb): - if exc_type is not None or not self.start_context: + if isinstance(exc_type, Exception) or not self.start_context: return duration = self.clock.time_msec() - self.start @@ -85,7 +98,7 @@ class Measure(object): if context != self.start_context: logger.warn( "Context has unexpectedly changed from '%s' to '%s'. (%r)", - context, self.start_context, self.name + self.start_context, context, self.name ) return |