diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 7d1e405457..2df4b1004d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -44,7 +44,11 @@ from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import get_jemalloc_stats
-from synapse.synapse_rust.lru_cache import LruCacheNode, PerCacheLinkedList
+from synapse.synapse_rust.lru_cache import (
+ LruCacheNode,
+ PerCacheLinkedList,
+ get_global_list,
+)
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import (
@@ -52,7 +56,6 @@ from synapse.util.caches.treecache import (
iterate_tree_cache_entry,
iterate_tree_cache_items,
)
-from synapse.util.linked_list import ListNode
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -95,22 +98,10 @@ VT = TypeVar("VT")
T = TypeVar("T")
-class _TimedListNode(ListNode[T]):
- """A `ListNode` that tracks last access time."""
-
- __slots__ = ["last_access_ts_secs"]
-
- def update_last_access(self, clock: Clock) -> None:
- self.last_access_ts_secs = int(clock.time())
-
-
# Whether to insert new cache entries to the global list. We only add to it if
# time based eviction is enabled.
USE_GLOBAL_LIST = False
-# A linked list of all cache entries, allowing efficient time based eviction.
-GLOBAL_ROOT = ListNode["_Node"].create_root_node()
-
@wrap_as_background_process("LruCache._expire_old_entries")
async def _expire_old_entries(
@@ -124,9 +115,12 @@ async def _expire_old_entries(
target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
min_cache_ttl = autotune_config["min_cache_ttl"] / 1000
+ # A linked list of all cache entries, allowing efficient time based eviction.
+ global_root = get_global_list()
+
now = int(clock.time())
- node = GLOBAL_ROOT.prev_node
- assert node is not None
+ assert len(global_root) > 0
+ node = global_root[0]
i = 0
@@ -148,10 +142,7 @@ async def _expire_old_entries(
"Unable to read allocated memory, skipping memory-based cache eviction."
)
- while node is not GLOBAL_ROOT:
- # Only the root node isn't a `_TimedListNode`.
- assert isinstance(node, _TimedListNode)
-
+ for node in global_root[1:]:
# if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
# nothing to do here
if (
@@ -238,125 +229,6 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
)
-class _Node(Generic[KT, VT]):
- __slots__ = [
- "_list_node",
- "_global_list_node",
- "_cache",
- "key",
- "value",
- "callbacks",
- "memory",
- ]
-
- def __init__(
- self,
- root: "ListNode[_Node]",
- key: KT,
- value: VT,
- cache: "weakref.ReferenceType[LruCache[KT, VT]]",
- clock: Clock,
- callbacks: Collection[Callable[[], None]] = (),
- prune_unread_entries: bool = True,
- ):
- self._list_node = ListNode.insert_after(self, root)
- self._global_list_node: Optional[_TimedListNode] = None
- if USE_GLOBAL_LIST and prune_unread_entries:
- self._global_list_node = _TimedListNode.insert_after(self, GLOBAL_ROOT)
- self._global_list_node.update_last_access(clock)
-
- # We store a weak reference to the cache object so that this _Node can
- # remove itself from the cache. If the cache is dropped we ensure we
- # remove our entries in the lists.
- self._cache = cache
-
- self.key = key
- self.value = value
-
- # Set of callbacks to run when the node gets deleted. We store as a list
- # rather than a set to keep memory usage down (and since we expect few
- # entries per node, the performance of checking for duplication in a
- # list vs using a set is negligible).
- #
- # Note that we store this as an optional list to keep the memory
- # footprint down. Storing `None` is free as its a singleton, while empty
- # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive
- # thing and used sets).
- self.callbacks: Optional[List[Callable[[], None]]] = None
-
- self.add_callbacks(callbacks)
-
- self.memory = 0
- if caches.TRACK_MEMORY_USAGE:
- self.memory = (
- _get_size_of(key)
- + _get_size_of(value)
- + _get_size_of(self._list_node, recurse=False)
- + _get_size_of(self.callbacks, recurse=False)
- + _get_size_of(self, recurse=False)
- )
- self.memory += _get_size_of(self.memory, recurse=False)
-
- if self._global_list_node:
- self.memory += _get_size_of(self._global_list_node, recurse=False)
- self.memory += _get_size_of(self._global_list_node.last_access_ts_secs)
-
- def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
- """Add to stored list of callbacks, removing duplicates."""
-
- if not callbacks:
- return
-
- if not self.callbacks:
- self.callbacks = []
-
- for callback in callbacks:
- if callback not in self.callbacks:
- self.callbacks.append(callback)
-
- def run_and_clear_callbacks(self) -> None:
- """Run all callbacks and clear the stored list of callbacks. Used when
- the node is being deleted.
- """
-
- if not self.callbacks:
- return
-
- for callback in self.callbacks:
- callback()
-
- self.callbacks = None
-
- def drop_from_cache(self) -> None:
- """Drop this node from the cache.
-
- Ensures that the entry gets removed from the cache and that we get
- removed from all lists.
- """
- cache = self._cache()
- if (
- cache is None
- or cache.pop(self.key, _Sentinel.sentinel) is _Sentinel.sentinel
- ):
- # `cache.pop` should call `drop_from_lists()`, unless this Node had
- # already been removed from the cache.
- self.drop_from_lists()
-
- def drop_from_lists(self) -> None:
- """Remove this node from the cache lists."""
- self._list_node.remove_from_list()
-
- if self._global_list_node:
- self._global_list_node.remove_from_list()
-
- def move_to_front(self, clock: Clock, cache_list_root: ListNode) -> None:
- """Moves this node to the front of all the lists its in."""
- self._list_node.move_after(cache_list_root)
- if self._global_list_node:
- self._global_list_node.move_after(GLOBAL_ROOT)
- self._global_list_node.update_last_access(clock)
-
-
class _Sentinel(Enum):
# defining a sentinel in this way allows mypy to correctly handle the
# type of a dictionary lookup.
@@ -418,7 +290,7 @@ class LruCache(Generic[KT, VT]):
else:
real_clock = clock
- cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type()
+ cache: "Union[Dict[KT, LruCacheNode[KT, VT]], TreeCache]" = cache_type()
self.cache = cache # Used for introspection.
self.apply_cache_factor_from_config = apply_cache_factor_from_config
@@ -450,12 +322,10 @@ class LruCache(Generic[KT, VT]):
self.metrics = metrics
# We create a single weakref to self here so that we don't need to keep
- # creating more each time we create a `_Node`.
+ # creating more each time we create a `LruCacheNode`.
weak_ref_to_self = weakref.ref(self)
- list_root = ListNode[_Node[KT, VT]].create_root_node()
-
- rust_linked_list = PerCacheLinkedList()
+ rust_linked_list: "PerCacheLinkedList[KT, VT]" = PerCacheLinkedList()
lock = threading.Lock()
@@ -497,13 +367,14 @@ class LruCache(Generic[KT, VT]):
def add_node(
key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
) -> None:
- node: _Node[KT, VT] = LruCacheNode(
+ node: "LruCacheNode[KT, VT]" = LruCacheNode(
self,
rust_linked_list,
key,
value,
set(callbacks),
0,
+ int(real_clock.time()),
)
cache[key] = node
@@ -513,10 +384,10 @@ class LruCache(Generic[KT, VT]):
if caches.TRACK_MEMORY_USAGE and metrics:
metrics.inc_memory_usage(node.memory)
- def move_node_to_front(node: _Node[KT, VT]) -> None:
- node.move_to_front()
+ def move_node_to_front(node: "LruCacheNode[KT, VT]") -> None:
+ node.move_to_front(int(real_clock.time()))
- def delete_node(node: _Node[KT, VT]) -> int:
+ def delete_node(node: "LruCacheNode[KT, VT]") -> int:
node.drop_from_lists()
deleted_len = 1
@@ -635,7 +506,7 @@ class LruCache(Generic[KT, VT]):
if update_metrics and metrics:
metrics.inc_hits()
- # We store entries in the `TreeCache` with values of type `_Node`,
+ # We store entries in the `TreeCache` with values of type `LruCacheNode`,
# which we need to unwrap.
return (
(full_key, lru_node.value)
@@ -730,8 +601,8 @@ class LruCache(Generic[KT, VT]):
node.run_and_clear_callbacks()
node.drop_from_lists()
- assert list_root.next_node == list_root
- assert list_root.prev_node == list_root
+ # assert list_root.next_node == list_root
+ # assert list_root.prev_node == list_root
cache.clear()
if size_callback:
|