summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/lrucache.py79
1 files changed, 69 insertions, 10 deletions
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 45ff0de638..a3b60578e3 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
+import math
 import threading
 import weakref
 from enum import Enum
@@ -40,6 +41,7 @@ from twisted.internet.interfaces import IReactorTime
 
 from synapse.config import cache as cache_config
 from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.metrics.jemalloc import get_jemalloc_stats
 from synapse.util import Clock, caches
 from synapse.util.caches import CacheMetric, EvictionReason, register_cache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -106,10 +108,16 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
 
 
 @wrap_as_background_process("LruCache._expire_old_entries")
-async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
+async def _expire_old_entries(
+    clock: Clock, expiry_seconds: int, autotune_config: Optional[dict]
+) -> None:
     """Walks the global cache list to find cache entries that haven't been
-    accessed in the given number of seconds.
+    accessed in the given number of seconds, or if a given memory threshold has been breached.
     """
+    if autotune_config:
+        max_cache_memory_usage = autotune_config["max_cache_memory_usage"]
+        target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
+        min_cache_ttl = autotune_config["min_cache_ttl"] / 1000
 
     now = int(clock.time())
     node = GLOBAL_ROOT.prev_node
@@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
 
     logger.debug("Searching for stale caches")
 
+    evicting_due_to_memory = False
+
+    # determine if we're evicting due to memory
+    jemalloc_interface = get_jemalloc_stats()
+    if jemalloc_interface and autotune_config:
+        try:
+            jemalloc_interface.refresh_stats()
+            mem_usage = jemalloc_interface.get_stat("allocated")
+            if mem_usage > max_cache_memory_usage:
+                logger.info("Begin memory-based cache eviction.")
+                evicting_due_to_memory = True
+        except Exception:
+            logger.warning(
+                "Unable to read allocated memory, skipping memory-based cache eviction."
+            )
+
     while node is not GLOBAL_ROOT:
         # Only the root node isn't a `_TimedListNode`.
         assert isinstance(node, _TimedListNode)
 
-        if node.last_access_ts_secs > now - expiry_seconds:
+        # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
+        # nothing to do here
+        if (
+            node.last_access_ts_secs > now - expiry_seconds
+            and not evicting_due_to_memory
+        ):
+            break
+
+        # if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer
+        if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl:
             break
 
         cache_entry = node.get_cache_entry()
@@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
         assert cache_entry is not None
         cache_entry.drop_from_cache()
 
+        # Check mem allocation periodically if we are evicting a bunch of caches
+        if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0:
+            try:
+                jemalloc_interface.refresh_stats()
+                mem_usage = jemalloc_interface.get_stat("allocated")
+                if mem_usage < target_cache_memory_usage:
+                    evicting_due_to_memory = False
+                    logger.info("Stop memory-based cache eviction.")
+            except Exception:
+                logger.warning(
+                    "Unable to read allocated memory, this may affect memory-based cache eviction."
+                )
+                # If we've failed to read the current memory usage then we
+                # should stop trying to evict based on memory usage
+                evicting_due_to_memory = False
+
         # If we do lots of work at once we yield to allow other stuff to happen.
         if (i + 1) % 10000 == 0:
             logger.debug("Waiting during drop")
-            await clock.sleep(0)
+            if node.last_access_ts_secs > now - expiry_seconds:
+                await clock.sleep(0.5)
+            else:
+                await clock.sleep(0)
             logger.debug("Waking during drop")
 
         node = next_node
@@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
 
 def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
     """Start a background job that expires all cache entries if they have not
-    been accessed for the given number of seconds.
+    been accessed for the given number of seconds, or if a given memory usage threshold has been
+    breached.
     """
-    if not hs.config.caches.expiry_time_msec:
+    if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning:
         return
 
-    logger.info(
-        "Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000
-    )
+    if hs.config.caches.expiry_time_msec:
+        expiry_time = hs.config.caches.expiry_time_msec / 1000
+        logger.info("Expiring LRU caches after %d seconds", expiry_time)
+    else:
+        expiry_time = math.inf
 
     global USE_GLOBAL_LIST
     USE_GLOBAL_LIST = True
 
     clock = hs.get_clock()
     clock.looping_call(
-        _expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000
+        _expire_old_entries,
+        30 * 1000,
+        clock,
+        expiry_time,
+        hs.config.caches.cache_autotuning,
     )