diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 45ff0de638..a3b60578e3 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import logging
+import math
 import threading
 import weakref
 from enum import Enum
@@ -40,6 +41,7 @@ from twisted.internet.interfaces import IReactorTime
 from synapse.config import cache as cache_config
 from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.metrics.jemalloc import get_jemalloc_stats
 from synapse.util import Clock, caches
 from synapse.util.caches import CacheMetric, EvictionReason, register_cache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -106,10 +108,16 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
 @wrap_as_background_process("LruCache._expire_old_entries")
-async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
+async def _expire_old_entries(
+    clock: Clock, expiry_seconds: int, autotune_config: Optional[dict]
+) -> None:
     """Walks the global cache list to find cache entries that haven't been
-    accessed in the given number of seconds.
+    accessed in the given number of seconds, or to evict entries when a given
+    memory usage threshold has been exceeded.
     """
+    if autotune_config:
+        max_cache_memory_usage = autotune_config["max_cache_memory_usage"]
+        target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
+        min_cache_ttl = autotune_config["min_cache_ttl"] / 1000
 
     now = int(clock.time())
     node = GLOBAL_ROOT.prev_node
@@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
logger.debug("Searching for stale caches")
+ evicting_due_to_memory = False
+
+ # determine if we're evicting due to memory
+ jemalloc_interface = get_jemalloc_stats()
+ if jemalloc_interface and autotune_config:
+ try:
+ jemalloc_interface.refresh_stats()
+ mem_usage = jemalloc_interface.get_stat("allocated")
+ if mem_usage > max_cache_memory_usage:
+ logger.info("Begin memory-based cache eviction.")
+ evicting_due_to_memory = True
+ except Exception:
+ logger.warning(
+ "Unable to read allocated memory, skipping memory-based cache eviction."
+ )
+
while node is not GLOBAL_ROOT:
# Only the root node isn't a `_TimedListNode`.
assert isinstance(node, _TimedListNode)
- if node.last_access_ts_secs > now - expiry_seconds:
+ # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
+ # nothing to do here
+ if (
+ node.last_access_ts_secs > now - expiry_seconds
+ and not evicting_due_to_memory
+ ):
+ break
+
+ # if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer
+ if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl:
break
cache_entry = node.get_cache_entry()
@@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
         assert cache_entry is not None
         cache_entry.drop_from_cache()
 
+        # Periodically re-check memory usage while we're evicting a large number
+        # of entries, so we can stop once usage drops below the target.
+        if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0:
+            try:
+                jemalloc_interface.refresh_stats()
+                mem_usage = jemalloc_interface.get_stat("allocated")
+                if mem_usage < target_cache_memory_usage:
+                    evicting_due_to_memory = False
+                    logger.info("Stop memory-based cache eviction.")
+            except Exception:
+                logger.warning(
+                    "Unable to read allocated memory, this may affect memory-based cache eviction."
+                )
+                # If we've failed to read the current memory usage then we
+                # should stop trying to evict based on memory usage.
+                evicting_due_to_memory = False
+
         # If we do lots of work at once we yield to allow other stuff to happen.
         if (i + 1) % 10000 == 0:
             logger.debug("Waiting during drop")
-            await clock.sleep(0)
+            if node.last_access_ts_secs > now - expiry_seconds:
+                await clock.sleep(0.5)
+            else:
+                await clock.sleep(0)
             logger.debug("Waking during drop")
 
         node = next_node
@@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
 def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
     """Start a background job that expires all cache entries if they have not
-    been accessed for the given number of seconds.
+    been accessed for the given number of seconds, or when a given memory usage
+    threshold has been exceeded.
     """
-    if not hs.config.caches.expiry_time_msec:
+    if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning:
         return
 
-    logger.info(
-        "Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000
-    )
+    if hs.config.caches.expiry_time_msec:
+        expiry_time = hs.config.caches.expiry_time_msec / 1000
+        logger.info("Expiring LRU caches after %d seconds", expiry_time)
+    else:
+        expiry_time = math.inf
 
     global USE_GLOBAL_LIST
     USE_GLOBAL_LIST = True
 
     clock = hs.get_clock()
     clock.looping_call(
-        _expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000
+        _expire_old_entries,
+        30 * 1000,
+        clock,
+        expiry_time,
+        hs.config.caches.cache_autotuning,
     )
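For reference, here is a minimal sketch (not part of the patch) of the autotune_config mapping that _expire_old_entries consumes above. The key names come from the diff; the values are illustrative assumptions only. Both memory thresholds are compared against jemalloc's "allocated" stat, so they are byte counts, and min_cache_ttl is divided by 1000, so it is expected in milliseconds.

# Hypothetical, hand-written example of the parsed cache_autotuning settings;
# in the code above this dict arrives via hs.config.caches.cache_autotuning.
autotune_config = {
    # Start memory-based eviction once jemalloc reports more allocated bytes than this.
    "max_cache_memory_usage": 1024 * 1024 * 1024,
    # Keep evicting until allocated memory drops back below this many bytes.
    "target_cache_memory_usage": 768 * 1024 * 1024,
    # Never evict entries accessed within the last N milliseconds
    # (converted to seconds by _expire_old_entries).
    "min_cache_ttl": 5 * 60 * 1000,
}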