diff options
-rw-r--r-- | Cargo.lock | 21 | ||||
-rw-r--r-- | rust/src/lru_cache.rs | 16 | ||||
-rw-r--r-- | stubs/synapse/synapse_rust/lru_cache.pyi | 48 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 173 | ||||
-rw-r--r-- | synapse/util/linked_list.py | 150 |
5 files changed, 105 insertions, 303 deletions
diff --git a/Cargo.lock b/Cargo.lock index ea9aa18a5c..30464a41f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -103,6 +103,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "adab1eaa3408fb7f0c777a73e7465fd5656136fc93b670eb6df3c88c2c1344e3" [[package]] +name = "intrusive-collections" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b694dc9f70c3bda874626d2aed13b780f137aab435f4e9814121955cf706122e" +dependencies = [ + "memoffset 0.9.0", +] + +[[package]] name = "itoa" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -152,6 +161,15 @@ dependencies = [ ] [[package]] +name = "memoffset" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" +dependencies = [ + "autocfg", +] + +[[package]] name = "once_cell" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -199,7 +217,7 @@ dependencies = [ "cfg-if", "indoc", "libc", - "memoffset", + "memoffset 0.6.5", "parking_lot", "pyo3-build-config", "pyo3-ffi", @@ -402,6 +420,7 @@ dependencies = [ "anyhow", "blake2", "hex", + "intrusive-collections", "lazy_static", "log", "pyo3", diff --git a/rust/src/lru_cache.rs b/rust/src/lru_cache.rs index 847f5d84be..efdcef8622 100644 --- a/rust/src/lru_cache.rs +++ b/rust/src/lru_cache.rs @@ -51,6 +51,13 @@ struct LruCacheNodeInner { value: Arc<Mutex<PyObject>>, callbacks: Py<PySet>, memory: usize, + last_access_ts_secs: usize, +} + +impl LruCacheNodeInner { + fn update_last_access(&mut self, ts_secs: usize) { + self.last_access_ts_secs = ts_secs; + } } #[pyclass] @@ -66,6 +73,7 @@ impl LruCacheNode { value: PyObject, callbacks: Py<PySet>, memory: usize, + ts_secs: usize, ) -> Self { let node = Arc::new(LruCacheNodeInner { per_cache_link: Default::default(), @@ -76,6 +84,7 @@ impl LruCacheNode { value: Arc::new(Mutex::new(value)), callbacks, memory, + last_access_ts_secs: ts_secs, }); GLOBAL_LIST @@ -159,7 +168,7 @@ impl LruCacheNode { } } - fn move_to_front(&self) { + fn move_to_front(&self, ts_secs: usize) { if self.0.global_list_link.is_linked() { let mut global_list = GLOBAL_LIST.lock().expect("poisoned"); @@ -171,6 +180,8 @@ impl LruCacheNode { curor_mut.remove(); global_list.push_front(self.0.clone()); + + // TODO Update self.0.last_access_ts_secs } if self.0.per_cache_link.is_linked() { @@ -207,6 +218,9 @@ impl LruCacheNode { fn memory(&self) -> usize { self.0.memory } + + #[getter] + fn last_access_ts_secs(&self) -> usize { self.0.last_access_ts_secs } } #[pyfunction] diff --git a/stubs/synapse/synapse_rust/lru_cache.pyi b/stubs/synapse/synapse_rust/lru_cache.pyi new file mode 100644 index 0000000000..8ca07bc4dd --- /dev/null +++ b/stubs/synapse/synapse_rust/lru_cache.pyi @@ -0,0 +1,48 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Callable, Generic, List, Optional, Set, TypeVar, Collection + +from synapse.util.caches.lrucache import LruCache + +# Key and Value type for the cache +KT = TypeVar("KT") +VT = TypeVar("VT") + +class LruCacheNode(Generic[KT, VT]): + key: KT + value: VT + memory: int + last_access_ts_secs: int + + def __init__( + self, + cache: LruCache, + cache_list: "PerCacheLinkedList", + key: object, + value: object, + callbacks: Set[Callable[[], None]], + memory: int, + ts_secs: int, + ) -> None: ... + def add_callbacks(self, new_callbacks: Collection[Callable[[], None]]) -> None: ... + def run_and_clear_callbacks(self) -> None: ... + def drop_from_cache(self) -> None: ... + def drop_from_lists(self) -> None: ... + def move_to_front(self, ts_secs: int) -> None: ... + +class PerCacheLinkedList(Generic[KT, VT]): + def __init__(self) -> None: ... + def get_back(self) -> Optional[LruCacheNode[KT, VT]]: ... + +def get_global_list() -> List[LruCacheNode]: ... diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 7d1e405457..2df4b1004d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -44,7 +44,11 @@ from twisted.internet.interfaces import IReactorTime from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import get_jemalloc_stats -from synapse.synapse_rust.lru_cache import LruCacheNode, PerCacheLinkedList +from synapse.synapse_rust.lru_cache import ( + LruCacheNode, + PerCacheLinkedList, + get_global_list, +) from synapse.util import Clock, caches from synapse.util.caches import CacheMetric, EvictionReason, register_cache from synapse.util.caches.treecache import ( @@ -52,7 +56,6 @@ from synapse.util.caches.treecache import ( iterate_tree_cache_entry, iterate_tree_cache_items, ) -from synapse.util.linked_list import ListNode if TYPE_CHECKING: from synapse.server import HomeServer @@ -95,22 +98,10 @@ VT = TypeVar("VT") T = TypeVar("T") -class _TimedListNode(ListNode[T]): - """A `ListNode` that tracks last access time.""" - - __slots__ = ["last_access_ts_secs"] - - def update_last_access(self, clock: Clock) -> None: - self.last_access_ts_secs = int(clock.time()) - - # Whether to insert new cache entries to the global list. We only add to it if # time based eviction is enabled. USE_GLOBAL_LIST = False -# A linked list of all cache entries, allowing efficient time based eviction. -GLOBAL_ROOT = ListNode["_Node"].create_root_node() - @wrap_as_background_process("LruCache._expire_old_entries") async def _expire_old_entries( @@ -124,9 +115,12 @@ async def _expire_old_entries( target_cache_memory_usage = autotune_config["target_cache_memory_usage"] min_cache_ttl = autotune_config["min_cache_ttl"] / 1000 + # A linked list of all cache entries, allowing efficient time based eviction. + global_root = get_global_list() + now = int(clock.time()) - node = GLOBAL_ROOT.prev_node - assert node is not None + assert len(global_root) > 0 + node = global_root[0] i = 0 @@ -148,10 +142,7 @@ async def _expire_old_entries( "Unable to read allocated memory, skipping memory-based cache eviction." ) - while node is not GLOBAL_ROOT: - # Only the root node isn't a `_TimedListNode`. - assert isinstance(node, _TimedListNode) - + for node in global_root[1:]: # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's # nothing to do here if ( @@ -238,125 +229,6 @@ def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: ) -class _Node(Generic[KT, VT]): - __slots__ = [ - "_list_node", - "_global_list_node", - "_cache", - "key", - "value", - "callbacks", - "memory", - ] - - def __init__( - self, - root: "ListNode[_Node]", - key: KT, - value: VT, - cache: "weakref.ReferenceType[LruCache[KT, VT]]", - clock: Clock, - callbacks: Collection[Callable[[], None]] = (), - prune_unread_entries: bool = True, - ): - self._list_node = ListNode.insert_after(self, root) - self._global_list_node: Optional[_TimedListNode] = None - if USE_GLOBAL_LIST and prune_unread_entries: - self._global_list_node = _TimedListNode.insert_after(self, GLOBAL_ROOT) - self._global_list_node.update_last_access(clock) - - # We store a weak reference to the cache object so that this _Node can - # remove itself from the cache. If the cache is dropped we ensure we - # remove our entries in the lists. - self._cache = cache - - self.key = key - self.value = value - - # Set of callbacks to run when the node gets deleted. We store as a list - # rather than a set to keep memory usage down (and since we expect few - # entries per node, the performance of checking for duplication in a - # list vs using a set is negligible). - # - # Note that we store this as an optional list to keep the memory - # footprint down. Storing `None` is free as its a singleton, while empty - # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive - # thing and used sets). - self.callbacks: Optional[List[Callable[[], None]]] = None - - self.add_callbacks(callbacks) - - self.memory = 0 - if caches.TRACK_MEMORY_USAGE: - self.memory = ( - _get_size_of(key) - + _get_size_of(value) - + _get_size_of(self._list_node, recurse=False) - + _get_size_of(self.callbacks, recurse=False) - + _get_size_of(self, recurse=False) - ) - self.memory += _get_size_of(self.memory, recurse=False) - - if self._global_list_node: - self.memory += _get_size_of(self._global_list_node, recurse=False) - self.memory += _get_size_of(self._global_list_node.last_access_ts_secs) - - def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None: - """Add to stored list of callbacks, removing duplicates.""" - - if not callbacks: - return - - if not self.callbacks: - self.callbacks = [] - - for callback in callbacks: - if callback not in self.callbacks: - self.callbacks.append(callback) - - def run_and_clear_callbacks(self) -> None: - """Run all callbacks and clear the stored list of callbacks. Used when - the node is being deleted. - """ - - if not self.callbacks: - return - - for callback in self.callbacks: - callback() - - self.callbacks = None - - def drop_from_cache(self) -> None: - """Drop this node from the cache. - - Ensures that the entry gets removed from the cache and that we get - removed from all lists. - """ - cache = self._cache() - if ( - cache is None - or cache.pop(self.key, _Sentinel.sentinel) is _Sentinel.sentinel - ): - # `cache.pop` should call `drop_from_lists()`, unless this Node had - # already been removed from the cache. - self.drop_from_lists() - - def drop_from_lists(self) -> None: - """Remove this node from the cache lists.""" - self._list_node.remove_from_list() - - if self._global_list_node: - self._global_list_node.remove_from_list() - - def move_to_front(self, clock: Clock, cache_list_root: ListNode) -> None: - """Moves this node to the front of all the lists its in.""" - self._list_node.move_after(cache_list_root) - if self._global_list_node: - self._global_list_node.move_after(GLOBAL_ROOT) - self._global_list_node.update_last_access(clock) - - class _Sentinel(Enum): # defining a sentinel in this way allows mypy to correctly handle the # type of a dictionary lookup. @@ -418,7 +290,7 @@ class LruCache(Generic[KT, VT]): else: real_clock = clock - cache: Union[Dict[KT, _Node[KT, VT]], TreeCache] = cache_type() + cache: "Union[Dict[KT, LruCacheNode[KT, VT]], TreeCache]" = cache_type() self.cache = cache # Used for introspection. self.apply_cache_factor_from_config = apply_cache_factor_from_config @@ -450,12 +322,10 @@ class LruCache(Generic[KT, VT]): self.metrics = metrics # We create a single weakref to self here so that we don't need to keep - # creating more each time we create a `_Node`. + # creating more each time we create a `LruCacheNode`. weak_ref_to_self = weakref.ref(self) - list_root = ListNode[_Node[KT, VT]].create_root_node() - - rust_linked_list = PerCacheLinkedList() + rust_linked_list: "PerCacheLinkedList[KT, VT]" = PerCacheLinkedList() lock = threading.Lock() @@ -497,13 +367,14 @@ class LruCache(Generic[KT, VT]): def add_node( key: KT, value: VT, callbacks: Collection[Callable[[], None]] = () ) -> None: - node: _Node[KT, VT] = LruCacheNode( + node: "LruCacheNode[KT, VT]" = LruCacheNode( self, rust_linked_list, key, value, set(callbacks), 0, + int(real_clock.time()), ) cache[key] = node @@ -513,10 +384,10 @@ class LruCache(Generic[KT, VT]): if caches.TRACK_MEMORY_USAGE and metrics: metrics.inc_memory_usage(node.memory) - def move_node_to_front(node: _Node[KT, VT]) -> None: - node.move_to_front() + def move_node_to_front(node: "LruCacheNode[KT, VT]") -> None: + node.move_to_front(int(real_clock.time())) - def delete_node(node: _Node[KT, VT]) -> int: + def delete_node(node: "LruCacheNode[KT, VT]") -> int: node.drop_from_lists() deleted_len = 1 @@ -635,7 +506,7 @@ class LruCache(Generic[KT, VT]): if update_metrics and metrics: metrics.inc_hits() - # We store entries in the `TreeCache` with values of type `_Node`, + # We store entries in the `TreeCache` with values of type `LruCacheNode`, # which we need to unwrap. return ( (full_key, lru_node.value) @@ -730,8 +601,8 @@ class LruCache(Generic[KT, VT]): node.run_and_clear_callbacks() node.drop_from_lists() - assert list_root.next_node == list_root - assert list_root.prev_node == list_root + # assert list_root.next_node == list_root + # assert list_root.prev_node == list_root cache.clear() if size_callback: diff --git a/synapse/util/linked_list.py b/synapse/util/linked_list.py deleted file mode 100644 index 8efbf061aa..0000000000 --- a/synapse/util/linked_list.py +++ /dev/null @@ -1,150 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""A circular doubly linked list implementation. -""" - -import threading -from typing import Generic, Optional, Type, TypeVar - -P = TypeVar("P") -LN = TypeVar("LN", bound="ListNode") - - -class ListNode(Generic[P]): - """A node in a circular doubly linked list, with an (optional) reference to - a cache entry. - - The reference should only be `None` for the root node or if the node has - been removed from the list. - """ - - # A lock to protect mutating the list prev/next pointers. - _LOCK = threading.Lock() - - # We don't use attrs here as in py3.6 you can't have `attr.s(slots=True)` - # and inherit from `Generic` for some reason - __slots__ = [ - "cache_entry", - "prev_node", - "next_node", - ] - - def __init__(self, cache_entry: Optional[P] = None) -> None: - self.cache_entry = cache_entry - self.prev_node: Optional[ListNode[P]] = None - self.next_node: Optional[ListNode[P]] = None - - @classmethod - def create_root_node(cls: Type["ListNode[P]"]) -> "ListNode[P]": - """Create a new linked list by creating a "root" node, which is a node - that has prev_node/next_node pointing to itself and no associated cache - entry. - """ - root = cls() - root.prev_node = root - root.next_node = root - return root - - @classmethod - def insert_after( - cls: Type[LN], - cache_entry: P, - node: "ListNode[P]", - ) -> LN: - """Create a new list node that is placed after the given node. - - Args: - cache_entry: The associated cache entry. - node: The existing node in the list to insert the new entry after. - """ - new_node = cls(cache_entry) - with cls._LOCK: - new_node._refs_insert_after(node) - return new_node - - def remove_from_list(self) -> None: - """Remove this node from the list.""" - with self._LOCK: - self._refs_remove_node_from_list() - - # We drop the reference to the cache entry to break the reference cycle - # between the list node and cache entry, allowing the two to be dropped - # immediately rather than at the next GC. - self.cache_entry = None - - def move_after(self, node: "ListNode[P]") -> None: - """Move this node from its current location in the list to after the - given node. - """ - with self._LOCK: - # We assert that both this node and the target node is still "alive". - assert self.prev_node - assert self.next_node - assert node.prev_node - assert node.next_node - - assert self is not node - - # Remove self from the list - self._refs_remove_node_from_list() - - # Insert self back into the list, after target node - self._refs_insert_after(node) - - def _refs_remove_node_from_list(self) -> None: - """Internal method to *just* remove the node from the list, without - e.g. clearing out the cache entry. - """ - if self.prev_node is None or self.next_node is None: - # We've already been removed from the list. - return - - prev_node = self.prev_node - next_node = self.next_node - - prev_node.next_node = next_node - next_node.prev_node = prev_node - - # We set these to None so that we don't get circular references, - # allowing us to be dropped without having to go via the GC. - self.prev_node = None - self.next_node = None - - def _refs_insert_after(self, node: "ListNode[P]") -> None: - """Internal method to insert the node after the given node.""" - - # This method should only be called when we're not already in the list. - assert self.prev_node is None - assert self.next_node is None - - # We expect the given node to be in the list and thus have valid - # prev/next refs. - assert node.next_node - assert node.prev_node - - prev_node = node - next_node = node.next_node - - self.prev_node = prev_node - self.next_node = next_node - - prev_node.next_node = self - next_node.prev_node = self - - def get_cache_entry(self) -> Optional[P]: - """Get the cache entry, returns None if this is the root node (i.e. - cache_entry is None) or if the entry has been dropped. - """ - return self.cache_entry |