summary refs log tree commit diff
path: root/synapse/util/caches
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/util/caches/dictionary_cache.py13
-rw-r--r--synapse/util/caches/expiringcache.py3
-rw-r--r--synapse/util/caches/lrucache.py3
-rw-r--r--synapse/util/caches/response_cache.py33
-rw-r--r--synapse/util/caches/stream_change_cache.py23
5 files changed, 52 insertions, 23 deletions
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py

index 1e6696332f..14bd3ba3b0 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py
@@ -21,10 +21,19 @@ import enum import logging import threading -from typing import Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union +from typing import ( + Dict, + Generic, + Iterable, + Literal, + Optional, + Set, + Tuple, + TypeVar, + Union, +) import attr -from typing_extensions import Literal from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 8017c031ee..3198fdd2ed 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py
@@ -21,10 +21,9 @@ import logging from collections import OrderedDict -from typing import Any, Generic, Iterable, Optional, TypeVar, Union, overload +from typing import Any, Generic, Iterable, Literal, Optional, TypeVar, Union, overload import attr -from typing_extensions import Literal from twisted.internet import defer diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 481a1a621e..2e5efa3a52 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py
@@ -34,6 +34,7 @@ from typing import ( Generic, Iterable, List, + Literal, Optional, Set, Tuple, @@ -44,8 +45,6 @@ from typing import ( overload, ) -from typing_extensions import Literal - from twisted.internet import reactor from twisted.internet.interfaces import IReactorTime diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 96b7ca83dc..54b99134b9 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py
@@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]): used rather than trying to compute a new response. """ - def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): + def __init__( + self, + clock: Clock, + name: str, + timeout_ms: float = 0, + enable_logging: bool = True, + ): self._result_cache: Dict[KV, ResponseCacheEntry] = {} self.clock = clock @@ -109,6 +115,7 @@ class ResponseCache(Generic[KV]): self._name = name self._metrics = register_cache("response_cache", name, self, resizable=False) + self._enable_logging = enable_logging def size(self) -> int: return len(self._result_cache) @@ -246,9 +253,12 @@ class ResponseCache(Generic[KV]): """ entry = self._get(key) if not entry: - logger.debug( - "[%s]: no cached result for [%s], calculating new one", self._name, key - ) + if self._enable_logging: + logger.debug( + "[%s]: no cached result for [%s], calculating new one", + self._name, + key, + ) context = ResponseCacheContext(cache_key=key) if cache_context: kwargs["cache_context"] = context @@ -269,12 +279,15 @@ class ResponseCache(Generic[KV]): return await make_deferred_yieldable(entry.result.observe()) result = entry.result.observe() - if result.called: - logger.info("[%s]: using completed cached result for [%s]", self._name, key) - else: - logger.info( - "[%s]: using incomplete cached result for [%s]", self._name, key - ) + if self._enable_logging: + if result.called: + logger.info( + "[%s]: using completed cached result for [%s]", self._name, key + ) + else: + logger.info( + "[%s]: using incomplete cached result for [%s]", self._name, key + ) span_context = entry.opentracing_span_context with start_active_span_follows_from( diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 16fcb00206..5ac8643eef 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py
@@ -142,9 +142,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return that the entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return True @@ -186,7 +186,7 @@ class StreamChangeCache: This will be all entities if the given stream position is at or earlier than the earliest known stream position. """ - if not self._cache or stream_pos <= self._earliest_known_stream_pos: + if not self._cache or stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return set(entities) @@ -238,9 +238,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return that an entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: self.metrics.inc_misses() return True @@ -270,9 +270,9 @@ class StreamChangeCache: """ assert isinstance(stream_pos, int) - # _cache is not valid at or before the earliest known stream position, so + # _cache is not valid before the earliest known stream position, so # return None to mark that it is unknown if an entity has changed. - if stream_pos <= self._earliest_known_stream_pos: + if stream_pos < self._earliest_known_stream_pos: return AllEntitiesChangedResult(None) changed_entities: List[EntityType] = [] @@ -314,6 +314,15 @@ class StreamChangeCache: self._entity_to_key[entity] = stream_pos self._evict() + def all_entities_changed(self, stream_pos: int) -> None: + """ + Mark all entities as changed. This is useful when the cache is invalidated and + there may be some potential change for all of the entities. + """ + self._cache.clear() + self._entity_to_key.clear() + self._earliest_known_stream_pos = stream_pos + def _evict(self) -> None: """ Ensure the cache has not exceeded the maximum size.