diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 4adae96681..900575eb3c 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,28 +13,87 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.metrics
+from prometheus_client.core import Gauge, REGISTRY, GaugeMetricFamily
+
import os
+from six.moves import intern
+import six
+
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
-metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
+
+def get_cache_factor_for(cache_name):
+ env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
+ factor = os.environ.get(env_var)
+ if factor:
+ return float(factor)
+
+ return CACHE_SIZE_FACTOR
+
caches_by_name = {}
-# cache_counter = metrics.register_cache(
-# "cache",
-# lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-# labels=["name"],
-# )
-
-
-def register_cache(name, cache):
- caches_by_name[name] = cache
- return metrics.register_cache(
- "cache",
- lambda: len(cache),
- name,
- )
+collectors_by_name = {}
+
+cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
+cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
+cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
+cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
+
+response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
+response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
+response_cache_evicted = Gauge(
+ "synapse_util_caches_response_cache:evicted_size", "", ["name"]
+)
+response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
+
+
+def register_cache(cache_type, cache_name, cache):
+
+    # Check if the metric is already registered. If so, unregister it.
+ # This usually happens during tests, as at runtime these caches are
+ # effectively singletons.
+ metric_name = "cache_%s_%s" % (cache_type, cache_name)
+ if metric_name in collectors_by_name.keys():
+ REGISTRY.unregister(collectors_by_name[metric_name])
+
+ class CacheMetric(object):
+
+ hits = 0
+ misses = 0
+ evicted_size = 0
+
+ def inc_hits(self):
+ self.hits += 1
+
+ def inc_misses(self):
+ self.misses += 1
+
+ def inc_evictions(self, size=1):
+ self.evicted_size += size
+
+ def describe(self):
+ return []
+
+ def collect(self):
+ if cache_type == "response_cache":
+ response_cache_size.labels(cache_name).set(len(cache))
+ response_cache_hits.labels(cache_name).set(self.hits)
+ response_cache_evicted.labels(cache_name).set(self.evicted_size)
+ response_cache_total.labels(cache_name).set(self.hits + self.misses)
+ else:
+ cache_size.labels(cache_name).set(len(cache))
+ cache_hits.labels(cache_name).set(self.hits)
+ cache_evicted.labels(cache_name).set(self.evicted_size)
+ cache_total.labels(cache_name).set(self.hits + self.misses)
+
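+            # collect() must return an iterable of metric families, so we
+            # yield a dummy one; the real values are exported through the
+            # module-level Gauges set above.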
+ yield GaugeMetricFamily("__unused", "")
+
+ metric = CacheMetric()
+ REGISTRY.register(metric)
+ caches_by_name[cache_name] = cache
+ collectors_by_name[metric_name] = metric
+ return metric
KNOWN_KEYS = {
@@ -66,7 +125,9 @@ def intern_string(string):
return None
try:
- string = string.encode("ascii")
+ if six.PY2:
+ string = string.encode("ascii")
+
return intern(string)
except UnicodeEncodeError:
return string
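The new `register_cache` takes the cache type, the cache name, and the cache object itself, which only needs to support `len()`. A minimal sketch of a consumer follows; `MyCache` is a hypothetical name, the real call sites are updated in the hunks below.

```python
from synapse.util.caches import register_cache


class MyCache(object):
    """Hypothetical cache showing the new registration API."""

    def __init__(self, name):
        self._data = {}
        # The returned metric exposes inc_hits/inc_misses/inc_evictions;
        # its collect() pushes len(self._data) into the module-level Gauges.
        self.metrics = register_cache("cache", name, self._data)

    def get(self, key):
        try:
            value = self._data[key]
            self.metrics.inc_hits()
            return value
        except KeyError:
            self.metrics.inc_misses()
            raise
```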
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 68285a7594..65a1042de1 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError, logcontext
-from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.stringutils import to_ascii
@@ -31,6 +31,9 @@ import functools
import inspect
import threading
+from six import string_types, itervalues
+import six
+
logger = logging.getLogger(__name__)
@@ -80,7 +83,7 @@ class Cache(object):
self.name = name
self.keylen = keylen
self.thread = None
- self.metrics = register_cache(name, self.cache)
+ self.metrics = register_cache("cache", name, self.cache)
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)
@@ -205,7 +208,7 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.cache.clear()
- for entry in self._pending_deferred_cache.itervalues():
+ for entry in itervalues(self._pending_deferred_cache):
entry.invalidate()
self._pending_deferred_cache.clear()
@@ -310,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
cache_context=cache_context)
- max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+ max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
self.max_entries = max_entries
self.tree = tree
@@ -392,9 +395,10 @@ class CacheDescriptor(_CacheDescriptorBase):
ret.addErrback(onErr)
- # If our cache_key is a string, try to convert to ascii to save
- # a bit of space in large caches
- if isinstance(cache_key, basestring):
+ # If our cache_key is a string on py2, try to convert to ascii
+            # to save a bit of space in large caches. Py3 does this
+            # internally.
+ if six.PY2 and isinstance(cache_key, string_types):
cache_key = to_ascii(cache_key)
result_d = ObservableDeferred(ret, consumeErrors=True)
@@ -565,7 +569,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
return results
return logcontext.make_deferred_yieldable(defer.gatherResults(
- cached_defers.values(),
+ list(cached_defers.values()),
consumeErrors=True,
).addCallback(update_results_dict).addErrback(
unwrapFirstError
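With `get_cache_factor_for(orig.__name__)`, each `@cached` function can now be sized individually via a `SYNAPSE_CACHE_FACTOR_<NAME>` environment variable, falling back to the global `SYNAPSE_CACHE_FACTOR`. A quick sketch, using `get_users_in_room` purely as an example name:

```python
import os

# Override for one cache only; everything else keeps the global factor.
os.environ["SYNAPSE_CACHE_FACTOR_GET_USERS_IN_ROOM"] = "2.0"

from synapse.util.caches import get_cache_factor_for

# A descriptor declared as @cached(max_entries=10000) on a function named
# get_users_in_room would now be sized as:
assert int(10000 * get_cache_factor_for("get_users_in_room")) == 20000
```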
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 1709e8b429..bdc21e348f 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -55,7 +55,7 @@ class DictionaryCache(object):
__slots__ = []
self.sentinel = Sentinel()
- self.metrics = register_cache(name, self.cache)
+ self.metrics = register_cache("dictionary", name, self.cache)
def check_thread(self):
expected_thread = self.thread
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 0aa103eecb..ff04c91955 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -52,12 +52,12 @@ class ExpiringCache(object):
self._cache = OrderedDict()
- self.metrics = register_cache(cache_name, self)
-
self.iterable = iterable
self._size_estimate = 0
+ self.metrics = register_cache("expiring", cache_name, self)
+
def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
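The `register_cache` call moves below the `iterable` and `_size_estimate` assignments, presumably so the cache is fully initialised before the collector, which sizes it via `len(cache)`, can ever run. A rough sketch of the dependency, with the `__len__` body approximated rather than quoted:

```python
class ExpiringCacheSketch(object):
    """Approximation of ExpiringCache's sizing, for illustration only."""

    def __init__(self, iterable=False):
        self._cache = {}
        self.iterable = iterable
        self._size_estimate = 0
        # Registering with the metrics collector before the two attributes
        # above exist would let a scrape call len() on a half-built object.

    def __len__(self):
        if self.iterable:
            # Incrementally-tracked total size of the cached values.
            return self._size_estimate
        return len(self._cache)
```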
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 7f79333e96..a8491b42d5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
from synapse.util.async import ObservableDeferred
-from synapse.util.caches import metrics as cache_metrics
+from synapse.util.caches import register_cache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@@ -38,15 +38,16 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.
self._name = name
- self._metrics = cache_metrics.register_cache(
- "response_cache",
- size_callback=lambda: self.size(),
- cache_name=name,
+ self._metrics = register_cache(
+ "response_cache", name, self
)
def size(self):
return len(self.pending_result_cache)
+ def __len__(self):
+ return self.size()
+
def get(self, key):
"""Look up the given key.
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 941d873ab8..817118e30f 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,10 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
+from synapse.util import caches
-from blist import sorteddict
+from sortedcontainers import SortedDict
import logging
@@ -32,16 +32,18 @@ class StreamChangeCache(object):
entities that may have changed since that position. If position key is too
old then the cache will simply return all given entities.
"""
- def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
- self._max_size = int(max_size * CACHE_SIZE_FACTOR)
+
+ def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
+ self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
self._entity_to_key = {}
- self._cache = sorteddict()
+ self._cache = SortedDict()
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- self.metrics = register_cache(self.name, self._cache)
+ self.metrics = caches.register_cache("cache", self.name, self._cache)
- for entity, stream_pos in prefilled_cache.items():
- self.entity_has_changed(entity, stream_pos)
+ if prefilled_cache:
+ for entity, stream_pos in prefilled_cache.items():
+ self.entity_has_changed(entity, stream_pos)
def has_entity_changed(self, entity, stream_pos):
"""Returns True if the entity may have been updated since stream_pos
@@ -65,22 +67,25 @@ class StreamChangeCache(object):
return False
def get_entities_changed(self, entities, stream_pos):
- """Returns subset of entities that have had new things since the
- given position. If the position is too old it will just return the given list.
+ """
+        Returns the subset of entities that may have changed since the given
+        position. Entities unknown to the cache will also be returned. If
+        the position is too old it will just return the given list.
"""
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
+ not_known_entities = set(entities) - set(self._entity_to_key)
- result = set(
- self._cache[k] for k in keys[i:]
- ).intersection(entities)
+ result = (
+ set(self._cache.values()[self._cache.bisect_right(stream_pos) :])
+ .intersection(entities)
+ .union(not_known_entities)
+ )
self.metrics.inc_hits()
else:
- result = entities
+ result = set(entities)
self.metrics.inc_misses()
return result
@@ -90,12 +95,13 @@ class StreamChangeCache(object):
"""
assert type(stream_pos) is int
+ if not self._cache:
+ # If we have no cache, nothing can have changed.
+ return False
+
if stream_pos >= self._earliest_known_stream_pos:
self.metrics.inc_hits()
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return i < len(keys)
+ return self._cache.bisect_right(stream_pos) < len(self._cache)
else:
self.metrics.inc_misses()
return True
@@ -107,10 +113,7 @@ class StreamChangeCache(object):
assert type(stream_pos) is int
if stream_pos >= self._earliest_known_stream_pos:
- keys = self._cache.keys()
- i = keys.bisect_right(stream_pos)
-
- return [self._cache[k] for k in keys[i:]]
+ return self._cache.values()[self._cache.bisect_right(stream_pos) :]
else:
return None
@@ -129,8 +132,10 @@ class StreamChangeCache(object):
self._entity_to_key[entity] = stream_pos
while len(self._cache) > self._max_size:
- k, r = self._cache.popitem()
- self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+ k, r = self._cache.popitem(0)
+ self._earliest_known_stream_pos = max(
+ k, self._earliest_known_stream_pos,
+ )
self._entity_to_key.pop(r, None)
def get_max_pos_of_last_change(self, entity):
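The switch from the unmaintained `blist` to `sortedcontainers` keeps the data model but changes the API slightly: `SortedDict` exposes `bisect_right` directly, its `values()` view supports slicing, and `popitem(0)` removes the entry with the smallest key. A small demonstration of the idioms used above:

```python
from sortedcontainers import SortedDict

cache = SortedDict({1: "user_a", 3: "user_b", 5: "user_c"})

# Entries strictly after stream_pos=3: bisect_right gives the cut index,
# and the values() view can be sliced directly, as in the patch.
assert cache.values()[cache.bisect_right(3):] == ["user_c"]

# Eviction: popitem(0) drops the entry with the *smallest* key, i.e. the
# oldest stream position, matching the while-loop above.
k, r = cache.popitem(0)
assert (k, r) == (1, "user_a")
```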
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index fcc341a6b7..dd4c9e6067 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,3 +1,5 @@
+from six import itervalues
+
SENTINEL = object()
@@ -49,7 +51,7 @@ class TreeCache(object):
if popped is SENTINEL:
return default
- node_and_keys = zip(nodes, key)
+ node_and_keys = list(zip(nodes, key))
node_and_keys.reverse()
node_and_keys.append((self.root, None))
@@ -76,7 +78,7 @@ def iterate_tree_cache_entry(d):
can contain dicts.
"""
if isinstance(d, dict):
- for value_d in d.itervalues():
+ for value_d in itervalues(d):
for value in iterate_tree_cache_entry(value_d):
yield value
else:
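The treecache changes are pure py2/py3 hygiene: on Python 3 `zip()` returns a lazy iterator with no `reverse()` or `append()`, hence the `list()` wrapper, and `six.itervalues()` picks the appropriate lazy value iterator on either version. For example:

```python
from six import itervalues

# Python 3's zip() yields an iterator; it must be materialised with list()
# before it can be reversed or appended to.
node_and_keys = list(zip(["node_a", "node_b"], ["key_a", "key_b"]))
node_and_keys.reverse()
node_and_keys.append(("root", None))

# six.itervalues maps to dict.itervalues() on py2 and dict.values() on py3.
d = {"a": {"b": 1}}
assert list(itervalues(d)) == [{"b": 1}]
```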
|