diff options
author | Matthew Hodgson <matthew@matrix.org> | 2016-06-03 12:14:18 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2016-06-03 12:14:18 +0100 |
commit | 8d740132f499b2c232716533ea786dfffacc1ec8 (patch) | |
tree | 5034a5aba0c6e604683080357ae6481803a4dea1 /synapse/util | |
parent | brand the email from header (diff) | |
parent | Merge branch 'erikj/cache_perf' of github.com:matrix-org/synapse into develop (diff) | |
download | synapse-8d740132f499b2c232716533ea786dfffacc1ec8.tar.xz |
Merge branch 'develop' into matthew/brand-from-header
Diffstat (limited to '')
-rw-r--r-- | synapse/util/async.py | 9 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 20 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 52 | ||||
-rw-r--r-- | synapse/util/caches/dictionary_cache.py | 8 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 8 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 16 |
6 files changed, 78 insertions, 35 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index 0d6f48e2d8..40be7fe7e3 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -102,6 +102,15 @@ class ObservableDeferred(object): def observers(self): return self._observers + def has_called(self): + return self._result is not None + + def has_succeeded(self): + return self._result is not None and self._result[0] is True + + def get_result(self): + return self._result[1] + def __getattr__(self, name): return getattr(self._deferred, name) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index d53569ca49..ebd715c5dc 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -24,11 +24,21 @@ DEBUG_CACHES = False metrics = synapse.metrics.get_metrics_for("synapse.util.caches") caches_by_name = {} -cache_counter = metrics.register_cache( - "cache", - lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, - labels=["name"], -) +# cache_counter = metrics.register_cache( +# "cache", +# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, +# labels=["name"], +# ) + + +def register_cache(name, cache): + caches_by_name[name] = cache + return metrics.register_cache( + "cache", + lambda: len(cache), + name, + ) + _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR)) caches_by_name["string_cache"] = _string_cache diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 758f5982b0..f31dfb22b7 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -22,7 +22,7 @@ from synapse.util.logcontext import ( PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn ) -from . import caches_by_name, DEBUG_CACHES, cache_counter +from . import DEBUG_CACHES, register_cache from twisted.internet import defer @@ -33,6 +33,7 @@ import functools import inspect import threading + logger = logging.getLogger(__name__) @@ -43,6 +44,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1)) class Cache(object): + __slots__ = ( + "cache", + "max_entries", + "name", + "keylen", + "sequence", + "thread", + "metrics", + ) def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False): if lru: @@ -59,7 +69,7 @@ class Cache(object): self.keylen = keylen self.sequence = 0 self.thread = None - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -74,10 +84,10 @@ class Cache(object): def get(self, key, default=_CacheSentinel): val = self.cache.get(key, _CacheSentinel) if val is not _CacheSentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return val - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() if default is _CacheSentinel: raise KeyError() @@ -293,16 +303,21 @@ class CacheListDescriptor(object): # cached is a dict arg -> deferred, where deferred results in a # 2-tuple (`arg`, `result`) - cached = {} + results = {} + cached_defers = {} missing = [] for arg in list_args: key = list(keyargs) key[self.list_pos] = arg try: - res = cache.get(tuple(key)).observe() - res.addCallback(lambda r, arg: (arg, r), arg) - cached[arg] = res + res = cache.get(tuple(key)) + if not res.has_succeeded(): + res = res.observe() + res.addCallback(lambda r, arg: (arg, r), arg) + cached_defers[arg] = res + else: + results[arg] = res.get_result() except KeyError: missing.append(arg) @@ -340,12 +355,21 @@ class CacheListDescriptor(object): res = observer.observe() res.addCallback(lambda r, arg: (arg, r), arg) - cached[arg] = res - - return preserve_context_over_deferred(defer.gatherResults( - cached.values(), - consumeErrors=True, - ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res))) + cached_defers[arg] = res + + if cached_defers: + def update_results_dict(res): + results.update(res) + return results + + return preserve_context_over_deferred(defer.gatherResults( + cached_defers.values(), + consumeErrors=True, + ).addCallback(update_results_dict).addErrback( + unwrapFirstError + )) + else: + return results obj.__dict__[self.orig.__name__] = wrapped diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index f92d80542b..b0ca1bb79d 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -15,7 +15,7 @@ from synapse.util.caches.lrucache import LruCache from collections import namedtuple -from . import caches_by_name, cache_counter +from . import register_cache import threading import logging @@ -43,7 +43,7 @@ class DictionaryCache(object): __slots__ = [] self.sentinel = Sentinel() - caches_by_name[name] = self.cache + self.metrics = register_cache(name, self.cache) def check_thread(self): expected_thread = self.thread @@ -58,7 +58,7 @@ class DictionaryCache(object): def get(self, key, dict_keys=None): entry = self.cache.get(key, self.sentinel) if entry is not self.sentinel: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() if dict_keys is None: return DictionaryEntry(entry.full, dict(entry.value)) @@ -69,7 +69,7 @@ class DictionaryCache(object): if k in entry.value }) - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return DictionaryEntry(False, {}) def invalidate(self, key): diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 2b68c1ac93..080388958f 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache import logging @@ -49,7 +49,7 @@ class ExpiringCache(object): self._cache = {} - caches_by_name[cache_name] = self._cache + self.metrics = register_cache(cache_name, self._cache) def start(self): if not self._expiry_ms: @@ -78,9 +78,9 @@ class ExpiringCache(object): def __getitem__(self, key): try: entry = self._cache[key] - cache_counter.inc_hits(self._cache_name) + self.metrics.inc_hits() except KeyError: - cache_counter.inc_misses(self._cache_name) + self.metrics.inc_misses() raise if self._reset_expiry_on_get: diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index ea8a74ca69..3c051dabc4 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.caches import cache_counter, caches_by_name +from synapse.util.caches import register_cache from blist import sorteddict @@ -42,7 +42,7 @@ class StreamChangeCache(object): self._cache = sorteddict() self._earliest_known_stream_pos = current_stream_pos self.name = name - caches_by_name[self.name] = self._cache + self.metrics = register_cache(self.name, self._cache) for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) @@ -53,19 +53,19 @@ class StreamChangeCache(object): assert type(stream_pos) is int if stream_pos < self._earliest_known_stream_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True latest_entity_change_pos = self._entity_to_key.get(entity, None) if latest_entity_change_pos is None: - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False if stream_pos < latest_entity_change_pos: - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return True - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() return False def get_entities_changed(self, entities, stream_pos): @@ -82,10 +82,10 @@ class StreamChangeCache(object): self._cache[k] for k in keys[i:] ).intersection(entities) - cache_counter.inc_hits(self.name) + self.metrics.inc_hits() else: result = entities - cache_counter.inc_misses(self.name) + self.metrics.inc_misses() return result |