summary refs log tree commit diff
path: root/synapse/util/caches/lrucache.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/caches/lrucache.py')
-rw-r--r--synapse/util/caches/lrucache.py173
1 files changed, 22 insertions, 151 deletions
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py

index 7d1e405457..2df4b1004d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py
@@ -44,7 +44,11 @@ from twisted.internet.interfaces import IReactorTime from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import get_jemalloc_stats -from synapse.synapse_rust.lru_cache import LruCacheNode, PerCacheLinkedList +from synapse.synapse_rust.lru_cache import ( + LruCacheNode, + PerCacheLinkedList, + get_global_list, +) from synapse.util import Clock, caches from synapse.util.caches import CacheMetric, EvictionReason, register_cache from synapse.util.caches.treecache import ( @@ -52,7 +56,6 @@ from synapse.util.caches.treecache import ( iterate_tree_cache_entry, iterate_tree_cache_items, ) -from synapse.util.linked_list import ListNode if TYPE_CHECKING: from synapse.server import HomeServer @@ -95,22 +98,10 @@ VT = TypeVar("VT") T = TypeVar("T") -class _TimedListNode(ListNode[T]): - """A `ListNode` that tracks last access time.""" - - __slots__ = ["last_access_ts_secs"] - - def update_last_access(self, clock: Clock) -> None: - self.last_access_ts_secs = int(clock.time()) - - # Whether to insert new cache entries to the global list. We only add to it if # time based eviction is enabled. USE_GLOBAL_LIST = False -# A linked list of all cache entries, allowing efficient time based eviction. -GLOBAL_ROOT = ListNode["_Node"].create_root_node() - @wrap_as_background_process("LruCache._expire_old_entries") async def _expire_old_entries( @@ -124,9 +115,12 @@ async def _expire_old_entries( target_cache_memory_usage = autotune_config["target_cache_memory_usage"] min_cache_ttl = autotune_config["min_cache_ttl"] / 1000 + # A linked list of all cache entries, allowing efficient time based eviction. + global_root = get_global_list() + now = int(clock.time()) - node = GLOBAL_ROOT.prev_node - assert node is not None + assert len(global_root) > 0 + node = global_root[0] i = 0 @@ -148,10 +142,7 @@ async def _expire_old_entries( "Unable to read allocated memory, skipping memory-based cache eviction." ) - while node is not GLOBAL_ROOT: - # Only the root node isn't a `_TimedListNode`. - assert isinstance(node, _TimedListNode) - + for node in global_root[1:]: # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's # nothing to do here if ( @@ -238,125 +229,6 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: ) -class _Node(Generic[KT, VT]): - __slots__ = [ - "_list_node", - "_global_list_node", - "_cache", - "key", - "value", - "callbacks", - "memory", - ] - - def __init__( - self, - root: "ListNode[_Node]", - key: KT, - value: VT, - cache: "weakref.ReferenceType[LruCache[KT, VT]]", - clock: Clock, - callbacks: Collection[Callable[[], None]] = (), - prune_unread_entries: bool = True, - ): - self._list_node = ListNode.insert_after(self, root) - self._global_list_node: Optional[_TimedListNode] = None - if USE_GLOBAL_LIST and prune_unread_entries: - self._global_list_node = _TimedListNode.insert_after(self, GLOBAL_ROOT) - self._global_list_node.update_last_access(clock) - - # We store a weak reference to the cache object so that this _Node can - # remove itself from the cache. If the cache is dropped we ensure we - # remove our entries in the lists. - self._cache = cache - - self.key = key - self.value = value - - # Set of callbacks to run when the node gets deleted. We store as a list - # rather than a set to keep memory usage down (and since we expect few - # entries per node, the performance of checking for duplication in a - # list vs using a set is negligible). - # - # Note that we store this as an optional list to keep the memory - # footprint down. Storing `None` is free as its a singleton, while empty - # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive - # thing and used sets). - self.callbacks: Optional[List[Callable[[], None]]] = None - - self.add_callbacks(callbacks) - - self.memory = 0 - if caches.TRACK_MEMORY_USAGE: - self.memory = ( - _get_size_of(key) - + _get_size_of(value) - + _get_size_of(self._list_node, recurse=False) - + _get_size_of(self.callbacks, recurse=False) - + _get_size_of(self, recurse=False) - ) - self.memory += _get_size_of(self.memory, recurse=False) - - if self._global_list_node: - self.memory += _get_size_of(self._global_list_node, recurse=False) - self.memory += _get_size_of(self._global_list_node.last_access_ts_secs) - - def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None: - """Add to stored list of callbacks, removing duplicates.""" - - if not callbacks: - return - - if not self.callbacks: - self.callbacks = [] - - for callback in callbacks: - if callback not in self.callbacks: - self.callbacks.append(callback) - - def run_and_clear_callbacks(self) -> None: - """Run all callbacks and clear the stored list of callbacks. Used when - the node is being deleted. - """ - - if not self.callbacks: - return - - for callback in self.callbacks: - callback() - - self.callbacks = None - - def drop_from_cache(self) -> None: - """Drop this node from the cache. - - Ensures that the entry gets removed from the cache and that we get - removed from all lists. - """ - cache = self._cache() - if ( - cache is None - or cache.pop(self.key, _Sentinel.sentinel) is _Sentinel.sentinel - ): - # `cache.pop` should call `drop_from_lists()`, unless this Node had - # already been removed from the cache. - self.drop_from_lists() - - def drop_from_lists(self) -> None: - """Remove this node from the cache lists.""" - self._list_node.remove_from_list() - - if self._global_list_node: - self._global_list_node.remove_from_list() - - def move_to_front(self, clock: Clock, cache_list_root: ListNode) -> None: - """Moves this node to the front of all the lists its in.""" - self._list_node.move_after(cache_list_root) - if self._global_list_node: - self._global_list_node.move_after(GLOBAL_ROOT) - self._global_list_node.update_last_access(clock) - - class _Sentinel(Enum): # defining a sentinel in this way allows mypy to correctly handle the # type of a dictionary lookup. @@ -418,7 +290,7 @@ class LruCache(Generic[KT, VT]): else: real_clock = clock - cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type() + cache: "Union[Dict[KT, LruCacheNode[KT, VT]], TreeCache]" = cache_type() self.cache = cache # Used for introspection. self.apply_cache_factor_from_config = apply_cache_factor_from_config @@ -450,12 +322,10 @@ class LruCache(Generic[KT, VT]): self.metrics = metrics # We create a single weakref to self here so that we don't need to keep - # creating more each time we create a `_Node`. + # creating more each time we create a `LruCacheNode`. weak_ref_to_self = weakref.ref(self) - list_root = ListNode[_Node[KT, VT]].create_root_node() - - rust_linked_list = PerCacheLinkedList() + rust_linked_list: "PerCacheLinkedList[KT, VT]" = PerCacheLinkedList() lock = threading.Lock() @@ -497,13 +367,14 @@ class LruCache(Generic[KT, VT]): def add_node( key: KT, value: VT, callbacks: Collection[Callable[[], None]] = () ) -> None: - node: _Node[KT, VT] = LruCacheNode( + node: "LruCacheNode[KT, VT]" = LruCacheNode( self, rust_linked_list, key, value, set(callbacks), 0, + int(real_clock.time()), ) cache[key] = node @@ -513,10 +384,10 @@ class LruCache(Generic[KT, VT]): if caches.TRACK_MEMORY_USAGE and metrics: metrics.inc_memory_usage(node.memory) - def move_node_to_front(node: _Node[KT, VT]) -> None: - node.move_to_front() + def move_node_to_front(node: "LruCacheNode[KT, VT]") -> None: + node.move_to_front(int(real_clock.time())) - def delete_node(node: _Node[KT, VT]) -> int: + def delete_node(node: "LruCacheNode[KT, VT]") -> int: node.drop_from_lists() deleted_len = 1 @@ -635,7 +506,7 @@ class LruCache(Generic[KT, VT]): if update_metrics and metrics: metrics.inc_hits() - # We store entries in the `TreeCache` with values of type `_Node`, + # We store entries in the `TreeCache` with values of type `LruCacheNode`, # which we need to unwrap. return ( (full_key, lru_node.value) @@ -730,8 +601,8 @@ class LruCache(Generic[KT, VT]): node.run_and_clear_callbacks() node.drop_from_lists() - assert list_root.next_node == list_root - assert list_root.prev_node == list_root + # assert list_root.next_node == list_root + # assert list_root.prev_node == list_root cache.clear() if size_callback: