diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index da5077b471..dd356bf156 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,28 +15,16 @@
# limitations under the License.
import logging
-import os
-from typing import Dict
+from sys import intern
+from typing import Callable, Dict, Optional
-import six
-from six.moves import intern
+import attr
+from prometheus_client.core import Gauge
-from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
+from synapse.config.cache import add_resizable_cache
logger = logging.getLogger(__name__)
-CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
-
-
-def get_cache_factor_for(cache_name):
- env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper()
- factor = os.environ.get(env_var)
- if factor:
- return float(factor)
-
- return CACHE_SIZE_FACTOR
-
-
caches_by_name = {}
collectors_by_name = {} # type: Dict
@@ -44,6 +32,7 @@ cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"])
cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"])
cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"])
cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"])
+cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"])
response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"])
@@ -53,67 +42,82 @@ response_cache_evicted = Gauge(
response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"])
-def register_cache(cache_type, cache_name, cache, collect_callback=None):
- """Register a cache object for metric collection.
+@attr.s
+class CacheMetric(object):
+
+ _cache = attr.ib()
+ _cache_type = attr.ib(type=str)
+ _cache_name = attr.ib(type=str)
+ _collect_callback = attr.ib(type=Optional[Callable])
+
+ hits = attr.ib(default=0)
+ misses = attr.ib(default=0)
+ evicted_size = attr.ib(default=0)
+
+ def inc_hits(self):
+ self.hits += 1
+
+ def inc_misses(self):
+ self.misses += 1
+
+ def inc_evictions(self, size=1):
+ self.evicted_size += size
+
+ def describe(self):
+ return []
+
+ def collect(self):
+ try:
+ if self._cache_type == "response_cache":
+ response_cache_size.labels(self._cache_name).set(len(self._cache))
+ response_cache_hits.labels(self._cache_name).set(self.hits)
+ response_cache_evicted.labels(self._cache_name).set(self.evicted_size)
+ response_cache_total.labels(self._cache_name).set(
+ self.hits + self.misses
+ )
+ else:
+ cache_size.labels(self._cache_name).set(len(self._cache))
+ cache_hits.labels(self._cache_name).set(self.hits)
+ cache_evicted.labels(self._cache_name).set(self.evicted_size)
+ cache_total.labels(self._cache_name).set(self.hits + self.misses)
+ if getattr(self._cache, "max_size", None):
+ cache_max_size.labels(self._cache_name).set(self._cache.max_size)
+ if self._collect_callback:
+ self._collect_callback()
+ except Exception as e:
+ logger.warning("Error calculating metrics for %s: %s", self._cache_name, e)
+ raise
+
+
+def register_cache(
+ cache_type: str,
+ cache_name: str,
+ cache,
+ collect_callback: Optional[Callable] = None,
+ resizable: bool = True,
+ resize_callback: Optional[Callable] = None,
+) -> CacheMetric:
+ """Register a cache object for metric collection and resizing.
Args:
- cache_type (str):
- cache_name (str): name of the cache
- cache (object): cache itself
- collect_callback (callable|None): if not None, a function which is called during
- metric collection to update additional metrics.
+ cache_type
+ cache_name: name of the cache
+ cache: cache itself
+ collect_callback: If given, a function which is called during metric
+ collection to update additional metrics.
+ resizable: Whether this cache supports being resized.
+ resize_callback: A function which can be called to resize the cache.
Returns:
CacheMetric: an object which provides inc_{hits,misses,evictions} methods
"""
+ if resizable:
+ if not resize_callback:
+            resize_callback = cache.set_cache_factor
+ add_resizable_cache(cache_name, resize_callback)
- # Check if the metric is already registered. Unregister it, if so.
- # This usually happens during tests, as at runtime these caches are
- # effectively singletons.
+ metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
metric_name = "cache_%s_%s" % (cache_type, cache_name)
- if metric_name in collectors_by_name.keys():
- REGISTRY.unregister(collectors_by_name[metric_name])
-
- class CacheMetric(object):
-
- hits = 0
- misses = 0
- evicted_size = 0
-
- def inc_hits(self):
- self.hits += 1
-
- def inc_misses(self):
- self.misses += 1
-
- def inc_evictions(self, size=1):
- self.evicted_size += size
-
- def describe(self):
- return []
-
- def collect(self):
- try:
- if cache_type == "response_cache":
- response_cache_size.labels(cache_name).set(len(cache))
- response_cache_hits.labels(cache_name).set(self.hits)
- response_cache_evicted.labels(cache_name).set(self.evicted_size)
- response_cache_total.labels(cache_name).set(self.hits + self.misses)
- else:
- cache_size.labels(cache_name).set(len(cache))
- cache_hits.labels(cache_name).set(self.hits)
- cache_evicted.labels(cache_name).set(self.evicted_size)
- cache_total.labels(cache_name).set(self.hits + self.misses)
- if collect_callback:
- collect_callback()
- except Exception as e:
- logger.warning("Error calculating metrics for %s: %s", cache_name, e)
- raise
-
- yield GaugeMetricFamily("__unused", "")
-
- metric = CacheMetric()
- REGISTRY.register(metric)
caches_by_name[cache_name] = cache
collectors_by_name[metric_name] = metric
return metric
@@ -148,9 +152,6 @@ def intern_string(string):
return None
try:
- if six.PY2:
- string = string.encode("ascii")
-
return intern(string)
except UnicodeEncodeError:
return string
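
Note: `register_cache` now delegates resizing to `add_resizable_cache` from `synapse.config.cache`, which is outside this diff. Below is a minimal sketch of how such a registry could be wired up, assuming a module-level callback dict; `_CACHES` and `CacheProperties` are illustrative names, and only `properties.default_factor_size` is actually referenced elsewhere in this diff.

```python
from typing import Callable, Dict


class CacheProperties(object):
    def __init__(self):
        # Global multiplier applied to every resizable cache's max size.
        self.default_factor_size = 0.5


properties = CacheProperties()

# Hypothetical registry of resize callbacks, keyed by cache name.
_CACHES = {}  # type: Dict[str, Callable[[float], bool]]


def add_resizable_cache(cache_name, cache_resize_callback):
    """Store the callback and apply the current factor immediately."""
    _CACHES[cache_name] = cache_resize_callback
    cache_resize_callback(properties.default_factor_size)
```
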
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 2e8f6543e5..cd48262420 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import functools
import inspect
import logging
@@ -30,7 +31,6 @@ from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred
-from synapse.util.caches import get_cache_factor_for
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -81,7 +81,6 @@ class CacheEntry(object):
class Cache(object):
__slots__ = (
"cache",
- "max_entries",
"name",
"keylen",
"thread",
@@ -89,7 +88,29 @@ class Cache(object):
"_pending_deferred_cache",
)
- def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False):
+ def __init__(
+ self,
+ name: str,
+ max_entries: int = 1000,
+ keylen: int = 1,
+ tree: bool = False,
+ iterable: bool = False,
+ apply_cache_factor_from_config: bool = True,
+ ):
+ """
+ Args:
+ name: The name of the cache
+            max_entries: The maximum number of entries the cache will hold
+ keylen: The length of the tuple used as the cache key
+ tree: Use a TreeCache instead of a dict as the underlying cache type
+ iterable: If True, count each item in the cached object as an entry,
+ rather than each cached object
+ apply_cache_factor_from_config: Whether cache factors specified in the
+ config file affect `max_entries`
+        """
cache_type = TreeCache if tree else dict
self._pending_deferred_cache = cache_type()
@@ -99,6 +120,7 @@ class Cache(object):
cache_type=cache_type,
size_callback=(lambda d: len(d)) if iterable else None,
evicted_callback=self._on_evicted,
+ apply_cache_factor_from_config=apply_cache_factor_from_config,
)
self.name = name
@@ -111,6 +133,10 @@ class Cache(object):
collect_callback=self._metrics_collection_callback,
)
+ @property
+ def max_entries(self):
+ return self.cache.max_size
+
def _on_evicted(self, evicted_count):
self.metrics.inc_evictions(evicted_count)
@@ -370,13 +396,11 @@ class CacheDescriptor(_CacheDescriptorBase):
cache_context=cache_context,
)
- max_entries = int(max_entries * get_cache_factor_for(orig.__name__))
-
self.max_entries = max_entries
self.tree = tree
self.iterable = iterable
- def __get__(self, obj, objtype=None):
+ def __get__(self, obj, owner):
cache = Cache(
name=self.orig.__name__,
max_entries=self.max_entries,
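
A hypothetical call site for the extended constructor: a cache that must keep an exact limit opts out of the config-driven factor, and `max_entries` (now a property over the underlying `LruCache`) always reflects the current limit. The cache name below is made up.

```python
from synapse.util.caches.descriptors import Cache

cache = Cache(
    name="get_users_in_room",  # illustrative name
    max_entries=10000,
    keylen=1,
    iterable=True,
    apply_cache_factor_from_config=False,
)

# With the factor disabled, the configured limit is used verbatim.
assert cache.max_entries == 10000
```
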
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index cddf1ed515..2726b67b6d 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -18,6 +18,7 @@ from collections import OrderedDict
from six import iteritems, itervalues
+from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -51,15 +52,16 @@ class ExpiringCache(object):
an item on access. Defaults to False.
iterable (bool): If true, the size is calculated by summing the
sizes of all entries, rather than the number of entries.
-
"""
self._cache_name = cache_name
+ self._original_max_size = max_len
+
+ self._max_size = int(max_len * cache_config.properties.default_factor_size)
+
self._clock = clock
- self._max_len = max_len
self._expiry_ms = expiry_ms
-
self._reset_expiry_on_get = reset_expiry_on_get
self._cache = OrderedDict()
@@ -82,9 +84,11 @@ class ExpiringCache(object):
def __setitem__(self, key, value):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
+ self.evict()
+ def evict(self):
# Evict if there are now too many items
- while self._max_len and len(self) > self._max_len:
+ while self._max_size and len(self) > self._max_size:
_key, value = self._cache.popitem(last=False)
if self.iterable:
self.metrics.inc_evictions(len(value.value))
@@ -170,6 +174,23 @@ class ExpiringCache(object):
else:
return len(self._cache)
+ def set_cache_factor(self, factor: float) -> bool:
+ """
+ Set the cache factor for this individual cache.
+
+ This will trigger a resize if it changes, which may require evicting
+ items from the cache.
+
+ Returns:
+ bool: Whether the cache changed size or not.
+ """
+ new_size = int(self._original_max_size * factor)
+ if new_size != self._max_size:
+ self._max_size = new_size
+ self.evict()
+ return True
+ return False
+
class _CacheEntry(object):
__slots__ = ["time", "value"]
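
The resize contract added here recomputes the limit from the original `max_len`, so calling `set_cache_factor` twice with the same factor is a no-op. A hedged usage sketch; the clock wiring and cache name are illustrative:

```python
from twisted.internet import reactor

from synapse.util import Clock
from synapse.util.caches.expiringcache import ExpiringCache

cache = ExpiringCache("client_ips", Clock(reactor), max_len=1000)

cache.set_cache_factor(2.0)  # limit becomes int(1000 * 2.0) == 2000
cache.set_cache_factor(2.0)  # same limit again, so this returns False
cache.set_cache_factor(0.1)  # limit drops to 100 and evict() trims entries
```
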
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 1536cb64f3..df4ea5901d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-
import threading
from functools import wraps
+from typing import Callable, Optional, Type, Union
+from synapse.config import cache as cache_config
from synapse.util.caches.treecache import TreeCache
@@ -52,17 +53,18 @@ class LruCache(object):
def __init__(
self,
- max_size,
- keylen=1,
- cache_type=dict,
- size_callback=None,
- evicted_callback=None,
+ max_size: int,
+ keylen: int = 1,
+ cache_type: Type[Union[dict, TreeCache]] = dict,
+ size_callback: Optional[Callable] = None,
+ evicted_callback: Optional[Callable] = None,
+ apply_cache_factor_from_config: bool = True,
):
"""
Args:
- max_size (int):
+            max_size: The maximum number of entries the cache can hold
- keylen (int):
+ keylen: The length of the tuple used as the cache key
cache_type (type):
type of underlying cache to be used. Typically one of dict
@@ -73,9 +75,24 @@ class LruCache(object):
evicted_callback (func(int)|None):
if not None, called on eviction with the size of the evicted
entry
+
+ apply_cache_factor_from_config (bool): If true, `max_size` will be
+ multiplied by a cache factor derived from the homeserver config
"""
cache = cache_type()
self.cache = cache # Used for introspection.
+ self.apply_cache_factor_from_config = apply_cache_factor_from_config
+
+ # Save the original max size, and apply the default size factor.
+ self._original_max_size = max_size
+        # Historically, some caches were created without applying the cache
+        # factor, and so were unaffected by the global setting. This option
+        # lets those call sites keep their exact size when the cache is created.
+ if apply_cache_factor_from_config:
+ self.max_size = int(max_size * cache_config.properties.default_factor_size)
+ else:
+ self.max_size = int(max_size)
+
list_root = _Node(None, None, None, None)
list_root.next_node = list_root
list_root.prev_node = list_root
@@ -83,7 +100,7 @@ class LruCache(object):
lock = threading.Lock()
def evict():
- while cache_len() > max_size:
+ while cache_len() > self.max_size:
todelete = list_root.prev_node
evicted_len = delete_node(todelete)
cache.pop(todelete.key, None)
@@ -236,6 +253,7 @@ class LruCache(object):
return key in cache
self.sentinel = object()
+ self._on_resize = evict
self.get = cache_get
self.set = cache_set
self.setdefault = cache_set_default
@@ -266,3 +284,23 @@ class LruCache(object):
def __contains__(self, key):
return self.contains(key)
+
+ def set_cache_factor(self, factor: float) -> bool:
+ """
+ Set the cache factor for this individual cache.
+
+ This will trigger a resize if it changes, which may require evicting
+ items from the cache.
+
+ Returns:
+ bool: Whether the cache changed size or not.
+ """
+ if not self.apply_cache_factor_from_config:
+ return False
+
+ new_size = int(self._original_max_size * factor)
+ if new_size != self.max_size:
+ self.max_size = new_size
+ self._on_resize()
+ return True
+ return False
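
A short sketch contrasting the two paths above: opted-out caches refuse later resizes, while default caches start at `max_size * default_factor_size` (assuming a loaded config) and can be rescaled, evicting through the stored `_on_resize` hook when they shrink.

```python
from synapse.util.caches.lrucache import LruCache

# Opted-out: max_size is used verbatim and resizing is refused.
pinned = LruCache(max_size=100, apply_cache_factor_from_config=False)
assert pinned.set_cache_factor(0.5) is False

# Default: the effective limit scales with the configured factor, and
# shrinking it evicts least-recently-used entries immediately.
scalable = LruCache(max_size=100)
scalable.set_cache_factor(0.5)  # limit becomes int(100 * 0.5) == 50
```
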
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index b68f9fe0d4..a6c60888e5 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -38,7 +38,7 @@ class ResponseCache(object):
self.timeout_sec = timeout_ms / 1000.0
self._name = name
- self._metrics = register_cache("response_cache", name, self)
+ self._metrics = register_cache("response_cache", name, self, resizable=False)
def size(self):
return len(self.pending_result_cache)
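
Since `ResponseCache` entries are bounded by `timeout_ms` rather than a size limit, it registers as non-resizable. A hedged sketch of what that flag means at the `register_cache` level; the backing dict and cache name here are illustrative:

```python
from synapse.util.caches import register_cache

pending = {}  # any sized container works for the metrics collector

# resizable=False skips add_resizable_cache entirely, so the backing
# store does not need a set_cache_factor method.
metrics = register_cache("response_cache", "example", pending, resizable=False)
metrics.inc_misses()
```
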
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 235f64049c..2a161bf244 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -14,17 +14,23 @@
# limitations under the License.
import logging
+import math
+from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
from six import integer_types
from sortedcontainers import SortedDict
+from synapse.types import Collection
from synapse.util import caches
logger = logging.getLogger(__name__)
+# for now, assume all entities in the cache are strings
+EntityType = str
-class StreamChangeCache(object):
+
+class StreamChangeCache:
"""Keeps track of the stream positions of the latest change in a set of entities.
Typically the entity will be a room or user id.
@@ -34,19 +40,52 @@ class StreamChangeCache(object):
old then the cache will simply return all given entities.
"""
- def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None):
- self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR)
- self._entity_to_key = {}
- self._cache = SortedDict()
+ def __init__(
+ self,
+ name: str,
+ current_stream_pos: int,
+ max_size=10000,
+ prefilled_cache: Optional[Mapping[EntityType, int]] = None,
+ ):
+ self._original_max_size = max_size
+ self._max_size = math.floor(max_size)
+ self._entity_to_key = {} # type: Dict[EntityType, int]
+
+        # map from stream id to the set of entities which changed at that stream id.
+ self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]
+
+ # the earliest stream_pos for which we can reliably answer
+ # get_all_entities_changed. In other words, one less than the earliest
+ # stream_pos for which we know _cache is valid.
+ #
self._earliest_known_stream_pos = current_stream_pos
self.name = name
- self.metrics = caches.register_cache("cache", self.name, self._cache)
+ self.metrics = caches.register_cache(
+ "cache", self.name, self._cache, resize_callback=self.set_cache_factor
+ )
if prefilled_cache:
for entity, stream_pos in prefilled_cache.items():
self.entity_has_changed(entity, stream_pos)
- def has_entity_changed(self, entity, stream_pos):
+ def set_cache_factor(self, factor: float) -> bool:
+ """
+ Set the cache factor for this individual cache.
+
+ This will trigger a resize if it changes, which may require evicting
+ items from the cache.
+
+ Returns:
+ bool: Whether the cache changed size or not.
+ """
+ new_size = math.floor(self._original_max_size * factor)
+ if new_size != self._max_size:
+            self._max_size = new_size
+ self._evict()
+ return True
+ return False
+
+ def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos
"""
assert type(stream_pos) in integer_types
@@ -67,22 +106,27 @@ class StreamChangeCache(object):
self.metrics.inc_hits()
return False
- def get_entities_changed(self, entities, stream_pos):
+ def get_entities_changed(
+ self, entities: Collection[EntityType], stream_pos: int
+ ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
"""
Returns subset of entities that have had new things since the given
position. Entities unknown to the cache will be returned. If the
position is too old it will just return the given list.
"""
- assert type(stream_pos) is int
-
- if stream_pos >= self._earliest_known_stream_pos:
- changed_entities = {
- self._cache[k]
- for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
- }
-
- result = changed_entities.intersection(entities)
-
+ changed_entities = self.get_all_entities_changed(stream_pos)
+ if changed_entities is not None:
+ # We now do an intersection, trying to do so in the most efficient
+            # way possible (some of these sets are *large*). First, check if the
+            # given iterable is already a set that we can reuse; otherwise we
+ # create a set of the *smallest* of the two iterables and call
+ # `intersection(..)` on it (this can be twice as fast as the reverse).
+ if isinstance(entities, (set, frozenset)):
+ result = entities.intersection(changed_entities)
+ elif len(changed_entities) < len(entities):
+ result = set(changed_entities).intersection(entities)
+ else:
+ result = set(entities).intersection(changed_entities)
self.metrics.inc_hits()
else:
result = set(entities)
@@ -90,13 +134,13 @@ class StreamChangeCache(object):
return result
- def has_any_entity_changed(self, stream_pos):
+ def has_any_entity_changed(self, stream_pos: int) -> bool:
"""Returns if any entity has changed
"""
assert type(stream_pos) is int
if not self._cache:
- # If we have no cache, nothing can have changed.
+ # If the cache is empty, nothing can have changed.
return False
if stream_pos >= self._earliest_known_stream_pos:
@@ -106,42 +150,66 @@ class StreamChangeCache(object):
self.metrics.inc_misses()
return True
- def get_all_entities_changed(self, stream_pos):
- """Returns all entites that have had new things since the given
+ def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
+ """Returns all entities that have had new things since the given
position. If the position is too old it will return None.
+
+ Returns the entities in the order that they were changed.
"""
assert type(stream_pos) is int
- if stream_pos >= self._earliest_known_stream_pos:
- return [
- self._cache[k]
- for k in self._cache.islice(start=self._cache.bisect_right(stream_pos))
- ]
- else:
+ if stream_pos < self._earliest_known_stream_pos:
return None
- def entity_has_changed(self, entity, stream_pos):
+ changed_entities = [] # type: List[EntityType]
+
+ for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
+ changed_entities.extend(self._cache[k])
+ return changed_entities
+
+ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
"""Informs the cache that the entity has been changed at the given
position.
"""
assert type(stream_pos) is int
- if stream_pos > self._earliest_known_stream_pos:
- old_pos = self._entity_to_key.get(entity, None)
- if old_pos is not None:
- stream_pos = max(stream_pos, old_pos)
- self._cache.pop(old_pos, None)
- self._cache[stream_pos] = entity
- self._entity_to_key[entity] = stream_pos
-
- while len(self._cache) > self._max_size:
- k, r = self._cache.popitem(0)
- self._earliest_known_stream_pos = max(
- k, self._earliest_known_stream_pos
- )
- self._entity_to_key.pop(r, None)
-
- def get_max_pos_of_last_change(self, entity):
+ if stream_pos <= self._earliest_known_stream_pos:
+ return
+
+ old_pos = self._entity_to_key.get(entity, None)
+ if old_pos is not None:
+ if old_pos >= stream_pos:
+ # nothing to do
+ return
+ e = self._cache[old_pos]
+ e.remove(entity)
+ if not e:
+                # no other entities changed at old_pos, so remove the empty set
+ del self._cache[old_pos]
+
+ e1 = self._cache.get(stream_pos)
+ if e1 is None:
+ e1 = self._cache[stream_pos] = set()
+ e1.add(entity)
+ self._entity_to_key[entity] = stream_pos
+ self._evict()
+
+ def _evict(self):
+ while len(self._cache) > self._max_size:
+ k, r = self._cache.popitem(0)
+ self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
+ for entity in r:
+ self._entity_to_key.pop(entity, None)
+
+ def get_max_pos_of_last_change(self, entity: EntityType) -> int:
"""Returns an upper bound of the stream id of the last change to an
entity.
"""
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
index 99646c7cf0..6437aa907e 100644
--- a/synapse/util/caches/ttlcache.py
+++ b/synapse/util/caches/ttlcache.py
@@ -38,7 +38,7 @@ class TTLCache(object):
self._timer = timer
- self._metrics = register_cache("ttl", cache_name, self)
+ self._metrics = register_cache("ttl", cache_name, self, resizable=False)
def set(self, key, value, ttl):
"""Add/update an entry in the cache
|