summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/__init__.py31
-rw-r--r--synapse/util/caches/deferred_cache.py2
-rw-r--r--synapse/util/caches/descriptors.py5
-rw-r--r--synapse/util/caches/expiringcache.py10
-rw-r--r--synapse/util/caches/lrucache.py20
-rw-r--r--synapse/util/iterutils.py19
6 files changed, 67 insertions, 20 deletions
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py

index cab1bf0c15..df4d61e4b6 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py
@@ -12,8 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections import logging +import typing +from enum import Enum, auto from sys import intern from typing import Callable, Dict, Optional, Sized @@ -34,7 +36,7 @@ collectors_by_name: Dict[str, "CacheMetric"] = {} cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) -cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) +cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name", "reason"]) cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"]) cache_memory_usage = Gauge( @@ -46,11 +48,16 @@ cache_memory_usage = Gauge( response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) response_cache_evicted = Gauge( - "synapse_util_caches_response_cache:evicted_size", "", ["name"] + "synapse_util_caches_response_cache:evicted_size", "", ["name", "reason"] ) response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) +class EvictionReason(Enum): + size = auto() + time = auto() + + @attr.s(slots=True) class CacheMetric: @@ -61,7 +68,9 @@ class CacheMetric: hits = attr.ib(default=0) misses = attr.ib(default=0) - evicted_size = attr.ib(default=0) + eviction_size_by_reason: typing.Counter[EvictionReason] = attr.ib( + factory=collections.Counter + ) memory_usage = attr.ib(default=None) def inc_hits(self) -> None: @@ -70,8 +79,8 @@ class CacheMetric: def inc_misses(self) -> None: self.misses += 1 - def inc_evictions(self, size: int = 1) -> None: - self.evicted_size += size + def inc_evictions(self, reason: EvictionReason, size: int = 1) -> None: + self.eviction_size_by_reason[reason] += size def inc_memory_usage(self, memory: int) -> None: if self.memory_usage is None: @@ -94,14 +103,20 @@ class CacheMetric: if self._cache_type == "response_cache": response_cache_size.labels(self._cache_name).set(len(self._cache)) response_cache_hits.labels(self._cache_name).set(self.hits) - response_cache_evicted.labels(self._cache_name).set(self.evicted_size) + for reason in EvictionReason: + response_cache_evicted.labels(self._cache_name, reason.name).set( + self.eviction_size_by_reason[reason] + ) response_cache_total.labels(self._cache_name).set( self.hits + self.misses ) else: cache_size.labels(self._cache_name).set(len(self._cache)) cache_hits.labels(self._cache_name).set(self.hits) - cache_evicted.labels(self._cache_name).set(self.evicted_size) + for reason in EvictionReason: + cache_evicted.labels(self._cache_name, reason.name).set( + self.eviction_size_by_reason[reason] + ) cache_total.labels(self._cache_name).set(self.hits + self.misses) if getattr(self._cache, "max_size", None): cache_max_size.labels(self._cache_name).set(self._cache.max_size) diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index f05590da0d..6262efe072 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py
@@ -73,6 +73,7 @@ class DeferredCache(Generic[KT, VT]): tree: bool = False, iterable: bool = False, apply_cache_factor_from_config: bool = True, + prune_unread_entries: bool = True, ): """ Args: @@ -105,6 +106,7 @@ class DeferredCache(Generic[KT, VT]): size_callback=(lambda d: len(d) or 1) if iterable else None, metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, + prune_unread_entries=prune_unread_entries, ) self.thread: Optional[threading.Thread] = None diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 1ca31e41ac..b9dcca17f1 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py
@@ -258,6 +258,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): tree=False, cache_context=False, iterable=False, + prune_unread_entries: bool = True, ): super().__init__(orig, num_args=num_args, cache_context=cache_context) @@ -269,6 +270,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): self.max_entries = max_entries self.tree = tree self.iterable = iterable + self.prune_unread_entries = prune_unread_entries def __get__(self, obj, owner): cache: DeferredCache[CacheKey, Any] = DeferredCache( @@ -276,6 +278,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): max_entries=self.max_entries, tree=self.tree, iterable=self.iterable, + prune_unread_entries=self.prune_unread_entries, ) get_cache_key = self.cache_key_builder @@ -507,6 +510,7 @@ def cached( tree: bool = False, cache_context: bool = False, iterable: bool = False, + prune_unread_entries: bool = True, ) -> Callable[[F], _CachedFunction[F]]: func = lambda orig: DeferredCacheDescriptor( orig, @@ -515,6 +519,7 @@ def cached( tree=tree, cache_context=cache_context, iterable=iterable, + prune_unread_entries=prune_unread_entries, ) return cast(Callable[[F], _CachedFunction[F]], func) diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index bde16b8577..c3f72aa06d 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py
@@ -22,7 +22,7 @@ from typing_extensions import Literal from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util import Clock -from synapse.util.caches import register_cache +from synapse.util.caches import EvictionReason, register_cache logger = logging.getLogger(__name__) @@ -98,9 +98,9 @@ class ExpiringCache(Generic[KT, VT]): while self._max_size and len(self) > self._max_size: _key, value = self._cache.popitem(last=False) if self.iterable: - self.metrics.inc_evictions(len(value.value)) + self.metrics.inc_evictions(EvictionReason.size, len(value.value)) else: - self.metrics.inc_evictions() + self.metrics.inc_evictions(EvictionReason.size) def __getitem__(self, key: KT) -> VT: try: @@ -175,9 +175,9 @@ class ExpiringCache(Generic[KT, VT]): for k in keys_to_delete: value = self._cache.pop(k) if self.iterable: - self.metrics.inc_evictions(len(value.value)) + self.metrics.inc_evictions(EvictionReason.time, len(value.value)) else: - self.metrics.inc_evictions() + self.metrics.inc_evictions(EvictionReason.time) logger.debug( "[%s] _prune_cache before: %d, after len: %d", diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 39dce9dd41..4ff62b403f 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py
@@ -40,7 +40,7 @@ from twisted.internet.interfaces import IReactorTime from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.util import Clock, caches -from synapse.util.caches import CacheMetric, register_cache +from synapse.util.caches import CacheMetric, EvictionReason, register_cache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry from synapse.util.linked_list import ListNode @@ -202,10 +202,11 @@ class _Node: cache: "weakref.ReferenceType[LruCache]", clock: Clock, callbacks: Collection[Callable[[], None]] = (), + prune_unread_entries: bool = True, ): self._list_node = ListNode.insert_after(self, root) - self._global_list_node = None - if USE_GLOBAL_LIST: + self._global_list_node: Optional[_TimedListNode] = None + if USE_GLOBAL_LIST and prune_unread_entries: self._global_list_node = _TimedListNode.insert_after(self, GLOBAL_ROOT) self._global_list_node.update_last_access(clock) @@ -314,6 +315,7 @@ class LruCache(Generic[KT, VT]): metrics_collection_callback: Optional[Callable[[], None]] = None, apply_cache_factor_from_config: bool = True, clock: Optional[Clock] = None, + prune_unread_entries: bool = True, ): """ Args: @@ -403,7 +405,7 @@ class LruCache(Generic[KT, VT]): evicted_len = delete_node(node) cache.pop(node.key, None) if metrics: - metrics.inc_evictions(evicted_len) + metrics.inc_evictions(EvictionReason.size, evicted_len) def synchronized(f: FT) -> FT: @wraps(f) @@ -427,7 +429,15 @@ class LruCache(Generic[KT, VT]): self.len = synchronized(cache_len) def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()): - node = _Node(list_root, key, value, weak_ref_to_self, real_clock, callbacks) + node = _Node( + list_root, + key, + value, + weak_ref_to_self, + real_clock, + callbacks, + prune_unread_entries, + ) cache[key] = node if size_callback: diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 8ac3eab2f5..4938ddf703 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py
@@ -21,13 +21,28 @@ from typing import ( Iterable, Iterator, Mapping, - Sequence, Set, + Sized, Tuple, TypeVar, ) +from typing_extensions import Protocol + T = TypeVar("T") +S = TypeVar("S", bound="_SelfSlice") + + +class _SelfSlice(Sized, Protocol): + """A helper protocol that matches types where taking a slice results in the + same type being returned. + + This is more specific than `Sequence`, which allows another `Sequence` to be + returned. + """ + + def __getitem__(self: S, i: slice) -> S: + ... def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: return iter(lambda: tuple(islice(sourceiter, size)), ()) -def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]: +def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: """Split the given sequence into chunks of the given size The last chunk may be shorter than the given size.