summary refs log tree commit diff
path: root/synapse/util/caches
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/caches')
-rw-r--r--synapse/util/caches/__init__.py87
-rw-r--r--synapse/util/caches/descriptors.py16
-rw-r--r--synapse/util/caches/dictionary_cache.py2
-rw-r--r--synapse/util/caches/expiringcache.py4
-rw-r--r--synapse/util/caches/response_cache.py11
-rw-r--r--synapse/util/caches/stream_change_cache.py2
-rw-r--r--synapse/util/caches/treecache.py6
7 files changed, 93 insertions, 35 deletions
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4adae96681..183faf75a1 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,28 +13,77 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import synapse.metrics
+from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
+
 import os
 
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
+from six.moves import intern
+import six
 
-metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
 
 caches_by_name = {}
-# cache_counter = metrics.register_cache(
-#     "cache",
-#     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-#     labels=["name"],
-# )
-
-
-def register_cache(name, cache):
-    caches_by_name[name] = cache
-    return metrics.register_cache(
-        "cache",
-        lambda: len(cache),
-        name,
-    )
+collectors_by_name = {}
+
+cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
+cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
+cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
+cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
+
+response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
+response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
+response_cache_evicted = Gauge(
+    "synapse_util_caches_response_cache:evicted_size", "", ["name"]
+)
+response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
+
+
+def register_cache(cache_type, cache_name, cache):
+
+    # Check if the metric is already registered. Unregister it, if so.
+    # This usually happens during tests, as at runtime these caches are
+    # effectively singletons.
+    metric_name = "cache_%s_%s" % (cache_type, cache_name)
+    if metric_name in collectors_by_name.keys():
+        REGISTRY.unregister(collectors_by_name[metric_name])
+
+    class CacheMetric(object):
+
+        hits = 0
+        misses = 0
+        evicted_size = 0
+
+        def inc_hits(self):
+            self.hits += 1
+
+        def inc_misses(self):
+            self.misses += 1
+
+        def inc_evictions(self, size=1):
+            self.evicted_size += size
+
+        def describe(self):
+            return []
+
+        def collect(self):
+            if cache_type == "response_cache":
+                response_cache_size.labels(cache_name).set(len(cache))
+                response_cache_hits.labels(cache_name).set(self.hits)
+                response_cache_evicted.labels(cache_name).set(self.evicted_size)
+                response_cache_total.labels(cache_name).set(self.hits + self.misses)
+            else:
+                cache_size.labels(cache_name).set(len(cache))
+                cache_hits.labels(cache_name).set(self.hits)
+                cache_evicted.labels(cache_name).set(self.evicted_size)
+                cache_total.labels(cache_name).set(self.hits + self.misses)
+
+            yield GaugeMetricFamily("__unused", "")
+
+    metric = CacheMetric()
+    REGISTRY.register(metric)
+    caches_by_name[cache_name] = cache
+    collectors_by_name[metric_name] = metric
+    return metric
 
 
 KNOWN_KEYS = {
@@ -66,7 +115,9 @@ def intern_string(string):
         return None
 
     try:
-        string = string.encode("ascii")
+        if six.PY2:
+            string = string.encode("ascii")
+
         return intern(string)
     except UnicodeEncodeError:
         return string
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 68285a7594..fc1874b65b 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -31,6 +31,9 @@ import functools
 import inspect
 import threading
 
+from six import string_types, itervalues
+import six
+
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +83,7 @@ class Cache(object):
         self.name = name
         self.keylen = keylen
         self.thread = None
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("cache", name, self.cache)
 
     def _on_evicted(self, evicted_count):
         self.metrics.inc_evictions(evicted_count)
@@ -205,7 +208,7 @@ class Cache(object):
     def invalidate_all(self):
         self.check_thread()
         self.cache.clear()
-        for entry in self._pending_deferred_cache.itervalues():
+        for entry in itervalues(self._pending_deferred_cache):
             entry.invalidate()
         self._pending_deferred_cache.clear()
 
@@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase):
 
                 ret.addErrback(onErr)
 
-                # If our cache_key is a string, try to convert to ascii to save
-                # a bit of space in large caches
-                if isinstance(cache_key, basestring):
+                # If our cache_key is a string on py2, try to convert to ascii
+                # to save a bit of space in large caches. Py3 does this
+                # internally automatically.
+                if six.PY2 and isinstance(cache_key, string_types):
                     cache_key = to_ascii(cache_key)
 
                 result_d = ObservableDeferred(ret, consumeErrors=True)
@@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
                     return results
 
                 return logcontext.make_deferred_yieldable(defer.gatherResults(
-                    cached_defers.values(),
+                    list(cached_defers.values()),
                     consumeErrors=True,
                 ).addCallback(update_results_dict).addErrback(
                     unwrapFirstError
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 1709e8b429..bdc21e348f 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -55,7 +55,7 @@ class DictionaryCache(object):
             __slots__ = []
 
         self.sentinel = Sentinel()
-        self.metrics = register_cache(name, self.cache)
+        self.metrics = register_cache("dictionary", name, self.cache)
 
     def check_thread(self):
         expected_thread = self.thread
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 0aa103eecb..ff04c91955 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -52,12 +52,12 @@ class ExpiringCache(object):
 
         self._cache = OrderedDict()
 
-        self.metrics = register_cache(cache_name, self)
-
         self.iterable = iterable
 
         self._size_estimate = 0
 
+        self.metrics = register_cache("expiring", cache_name, self)
+
     def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 7f79333e96..a8491b42d5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -17,7 +17,7 @@ import logging
 from twisted.internet import defer
 
 from synapse.util.async import ObservableDeferred
-from synapse.util.caches import metrics as cache_metrics
+from synapse.util.caches import register_cache
 from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
@@ -38,15 +38,16 @@ class ResponseCache(object):
         self.timeout_sec = timeout_ms / 1000.
 
         self._name = name
-        self._metrics = cache_metrics.register_cache(
-            "response_cache",
-            size_callback=lambda: self.size(),
-            cache_name=name,
+        self._metrics = register_cache(
+            "response_cache", name, self
         )
 
     def size(self):
         return len(self.pending_result_cache)
 
+    def __len__(self):
+        return self.size()
+
     def get(self, key):
         """Look up the given key.
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..a7fe0397fa 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -38,7 +38,7 @@ class StreamChangeCache(object):
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        self.metrics = register_cache(self.name, self._cache)
+        self.metrics = register_cache("cache", self.name, self._cache)
 
         for entity, stream_pos in prefilled_cache.items():
             self.entity_has_changed(entity, stream_pos)
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index fcc341a6b7..dd4c9e6067 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,3 +1,5 @@
+from six import itervalues
+
 SENTINEL = object()
 
 
@@ -49,7 +51,7 @@ class TreeCache(object):
         if popped is SENTINEL:
             return default
 
-        node_and_keys = zip(nodes, key)
+        node_and_keys = list(zip(nodes, key))
         node_and_keys.reverse()
         node_and_keys.append((self.root, None))
 
@@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
     can contain dicts.
     """
     if isinstance(d, dict):
-        for value_d in d.itervalues():
+        for value_d in itervalues(d):
             for value in iterate_tree_cache_entry(value_d):
                 yield value
     else: