diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/caches/lrucache.py | 79 |
1 files changed, 69 insertions, 10 deletions
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 45ff0de638..a3b60578e3 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +import math import threading import weakref from enum import Enum @@ -40,6 +41,7 @@ from twisted.internet.interfaces import IReactorTime from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.metrics.jemalloc import get_jemalloc_stats from synapse.util import Clock, caches from synapse.util.caches import CacheMetric, EvictionReason, register_cache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry @@ -106,10 +108,16 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node() @wrap_as_background_process("LruCache._expire_old_entries") -async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: +async def _expire_old_entries( + clock: Clock, expiry_seconds: int, autotune_config: Optional[dict] +) -> None: """Walks the global cache list to find cache entries that haven't been - accessed in the given number of seconds. + accessed in the given number of seconds, or if a given memory threshold has been breached. """ + if autotune_config: + max_cache_memory_usage = autotune_config["max_cache_memory_usage"] + target_cache_memory_usage = autotune_config["target_cache_memory_usage"] + min_cache_ttl = autotune_config["min_cache_ttl"] / 1000 now = int(clock.time()) node = GLOBAL_ROOT.prev_node @@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: logger.debug("Searching for stale caches") + evicting_due_to_memory = False + + # determine if we're evicting due to memory + jemalloc_interface = get_jemalloc_stats() + if jemalloc_interface and autotune_config: + try: + jemalloc_interface.refresh_stats() + mem_usage = jemalloc_interface.get_stat("allocated") + if mem_usage > max_cache_memory_usage: + logger.info("Begin memory-based cache eviction.") + evicting_due_to_memory = True + except Exception: + logger.warning( + "Unable to read allocated memory, skipping memory-based cache eviction." + ) + while node is not GLOBAL_ROOT: # Only the root node isn't a `_TimedListNode`. assert isinstance(node, _TimedListNode) - if node.last_access_ts_secs > now - expiry_seconds: + # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's + # nothing to do here + if ( + node.last_access_ts_secs > now - expiry_seconds + and not evicting_due_to_memory + ): + break + + # if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer + if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl: break cache_entry = node.get_cache_entry() @@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: assert cache_entry is not None cache_entry.drop_from_cache() + # Check mem allocation periodically if we are evicting a bunch of caches + if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0: + try: + jemalloc_interface.refresh_stats() + mem_usage = jemalloc_interface.get_stat("allocated") + if mem_usage < target_cache_memory_usage: + evicting_due_to_memory = False + logger.info("Stop memory-based cache eviction.") + except Exception: + logger.warning( + "Unable to read allocated memory, this may affect memory-based cache eviction." + ) + # If we've failed to read the current memory usage then we + # should stop trying to evict based on memory usage + evicting_due_to_memory = False + # If we do lots of work at once we yield to allow other stuff to happen. if (i + 1) % 10000 == 0: logger.debug("Waiting during drop") - await clock.sleep(0) + if node.last_access_ts_secs > now - expiry_seconds: + await clock.sleep(0.5) + else: + await clock.sleep(0) logger.debug("Waking during drop") node = next_node @@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: """Start a background job that expires all cache entries if they have not - been accessed for the given number of seconds. + been accessed for the given number of seconds, or if a given memory usage threshold has been + breached. """ - if not hs.config.caches.expiry_time_msec: + if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning: return - logger.info( - "Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000 - ) + if hs.config.caches.expiry_time_msec: + expiry_time = hs.config.caches.expiry_time_msec / 1000 + logger.info("Expiring LRU caches after %d seconds", expiry_time) + else: + expiry_time = math.inf global USE_GLOBAL_LIST USE_GLOBAL_LIST = True clock = hs.get_clock() clock.looping_call( - _expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000 + _expire_old_entries, + 30 * 1000, + clock, + expiry_time, + hs.config.caches.cache_autotuning, ) |