diff --git a/changelog.d/12701.feature b/changelog.d/12701.feature
new file mode 100644
index 0000000000..bb2264602c
--- /dev/null
+++ b/changelog.d/12701.feature
@@ -0,0 +1 @@
+Add a config option to allow for auto-tuning of caches.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 03a0f6314c..05a3606da0 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -784,6 +784,24 @@ caches:
#
#cache_entry_ttl: 30m
+ # This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`,
+ # `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain
+ # a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize
+ # this option, and all three of the options must be specified for this feature to work.
+ #cache_autotuning:
+ # This flag sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted.
+ # They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in
+ # the flag below, or until the `min_cache_ttl` is hit.
+ #max_cache_memory_usage: 1024M
+
+ # This flag sets a rough target for the desired memory usage of the caches.
+ #target_cache_memory_usage: 758M
+
+ # `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
+ # caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
+ # from being emptied while Synapse is evicting due to memory.
+ #min_cache_ttl: 5m
+
# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 2af1f284b1..ca44363105 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1119,7 +1119,17 @@ Caching can be configured through the following sub-options:
with intermittent connections, at the cost of higher memory usage.
By default, this is zero, which means that sync responses are not cached
at all.
-
+* `cache_autotuning` and its sub-options `max_cache_memory_usage`, `target_cache_memory_usage`, and
+ `min_cache_ttl` work in conjunction with each other to maintain a balance between cache memory
+ usage and cache entry availability. You must be using [jemalloc](https://github.com/matrix-org/synapse#help-synapse-is-slow-and-eats-all-my-ramcpu)
+ to utilize this option, and all three of the options must be specified for this feature to work.
+ * `max_cache_memory_usage` sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted.
+ They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in
+ the flag below, or until the `min_cache_ttl` is hit.
+ * `target_cache_memory_usage` sets a rough target for the desired memory usage of the caches.
+ * `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
+ caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
+ from being emptied while Synapse is evicting due to memory.
Example configuration:
```yaml
@@ -1127,8 +1137,11 @@ caches:
global_factor: 1.0
per_cache_factors:
get_users_who_share_room_with_user: 2.0
- expire_caches: false
sync_response_cache_duration: 2m
+ cache_autotuning:
+ max_cache_memory_usage: 1024M
+ target_cache_memory_usage: 758M
+ min_cache_ttl: 5m
```
### Reloading cache factors
diff --git a/synapse/config/cache.py b/synapse/config/cache.py
index 58b2fe5519..d2f55534d7 100644
--- a/synapse/config/cache.py
+++ b/synapse/config/cache.py
@@ -176,6 +176,24 @@ class CacheConfig(Config):
#
#cache_entry_ttl: 30m
+ # This flag enables cache autotuning, and is further specified by the sub-options `max_cache_memory_usage`,
+ # `target_cache_memory_usage`, `min_cache_ttl`. These flags work in conjunction with each other to maintain
+ # a balance between cache memory usage and cache entry availability. You must be using jemalloc to utilize
+ # this option, and all three of the options must be specified for this feature to work.
+ #cache_autotuning:
+ # This flag sets a ceiling on how much memory the cache can use before caches begin to be continuously evicted.
+ # They will continue to be evicted until the memory usage drops below the `target_cache_memory_usage`, set in
+ # the flag below, or until the `min_cache_ttl` is hit.
+ #max_cache_memory_usage: 1024M
+
+ # This flag sets a rough target for the desired memory usage of the caches.
+ #target_cache_memory_usage: 758M
+
+ # `min_cache_ttl` sets a limit under which newer cache entries are not evicted and is only applied when
+ # caches are actively being evicted/`max_cache_memory_usage` has been exceeded. This is to protect hot caches
+ # from being emptied while Synapse is evicting due to memory.
+ #min_cache_ttl: 5m
+
# Controls how long the results of a /sync request are cached for after
# a successful response is returned. A higher duration can help clients with
# intermittent connections, at the cost of higher memory usage.
@@ -263,6 +281,21 @@ class CacheConfig(Config):
)
self.expiry_time_msec = self.parse_duration(expiry_time)
+ self.cache_autotuning = cache_config.get("cache_autotuning")
+ if self.cache_autotuning:
+ max_memory_usage = self.cache_autotuning.get("max_cache_memory_usage")
+ self.cache_autotuning["max_cache_memory_usage"] = self.parse_size(
+ max_memory_usage
+ )
+
+ target_mem_size = self.cache_autotuning.get("target_cache_memory_usage")
+ self.cache_autotuning["target_cache_memory_usage"] = self.parse_size(
+ target_mem_size
+ )
+
+ min_cache_ttl = self.cache_autotuning.get("min_cache_ttl")
+ self.cache_autotuning["min_cache_ttl"] = self.parse_duration(min_cache_ttl)
+
self.sync_response_cache_duration = self.parse_duration(
cache_config.get("sync_response_cache_duration", 0)
)
diff --git a/synapse/metrics/jemalloc.py b/synapse/metrics/jemalloc.py
index 6bc329f04a..1fc8a0e888 100644
--- a/synapse/metrics/jemalloc.py
+++ b/synapse/metrics/jemalloc.py
@@ -18,6 +18,7 @@ import os
import re
from typing import Iterable, Optional, overload
+import attr
from prometheus_client import REGISTRY, Metric
from typing_extensions import Literal
@@ -27,52 +28,24 @@ from synapse.metrics._types import Collector
logger = logging.getLogger(__name__)
-def _setup_jemalloc_stats() -> None:
- """Checks to see if jemalloc is loaded, and hooks up a collector to record
- statistics exposed by jemalloc.
- """
-
- # Try to find the loaded jemalloc shared library, if any. We need to
- # introspect into what is loaded, rather than loading whatever is on the
- # path, as if we load a *different* jemalloc version things will seg fault.
-
- # We look in `/proc/self/maps`, which only exists on linux.
- if not os.path.exists("/proc/self/maps"):
- logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
- return
-
- # We're looking for a path at the end of the line that includes
- # "libjemalloc".
- regex = re.compile(r"/\S+/libjemalloc.*$")
-
- jemalloc_path = None
- with open("/proc/self/maps") as f:
- for line in f:
- match = regex.search(line.strip())
- if match:
- jemalloc_path = match.group()
-
- if not jemalloc_path:
- # No loaded jemalloc was found.
- logger.debug("jemalloc not found")
- return
-
- logger.debug("Found jemalloc at %s", jemalloc_path)
-
- jemalloc = ctypes.CDLL(jemalloc_path)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class JemallocStats:
+ jemalloc: ctypes.CDLL
@overload
def _mallctl(
- name: str, read: Literal[True] = True, write: Optional[int] = None
+ self, name: str, read: Literal[True] = True, write: Optional[int] = None
) -> int:
...
@overload
- def _mallctl(name: str, read: Literal[False], write: Optional[int] = None) -> None:
+ def _mallctl(
+ self, name: str, read: Literal[False], write: Optional[int] = None
+ ) -> None:
...
def _mallctl(
- name: str, read: bool = True, write: Optional[int] = None
+ self, name: str, read: bool = True, write: Optional[int] = None
) -> Optional[int]:
"""Wrapper around `mallctl` for reading and writing integers to
jemalloc.
@@ -120,7 +93,7 @@ def _setup_jemalloc_stats() -> None:
# Where oldp/oldlenp is a buffer where the old value will be written to
# (if not null), and newp/newlen is the buffer with the new value to set
# (if not null). Note that they're all references *except* newlen.
- result = jemalloc.mallctl(
+ result = self.jemalloc.mallctl(
name.encode("ascii"),
input_var_ref,
input_len_ref,
@@ -136,21 +109,80 @@ def _setup_jemalloc_stats() -> None:
return input_var.value
- def _jemalloc_refresh_stats() -> None:
+ def refresh_stats(self) -> None:
"""Request that jemalloc updates its internal statistics. This needs to
be called before querying for stats, otherwise it will return stale
values.
"""
try:
- _mallctl("epoch", read=False, write=1)
+ self._mallctl("epoch", read=False, write=1)
except Exception as e:
logger.warning("Failed to reload jemalloc stats: %s", e)
+ def get_stat(self, name: str) -> int:
+ """Request the stat of the given name at the time of the last
+ `refresh_stats` call. This may throw if we fail to read
+ the stat.
+ """
+ return self._mallctl(f"stats.{name}")
+
+
+_JEMALLOC_STATS: Optional[JemallocStats] = None
+
+
+def get_jemalloc_stats() -> Optional[JemallocStats]:
+ """Returns an interface to jemalloc, if it is being used.
+
+ Note that this will always return None until `setup_jemalloc_stats` has been
+ called.
+ """
+ return _JEMALLOC_STATS
+
+
+def _setup_jemalloc_stats() -> None:
+ """Checks to see if jemalloc is loaded, and hooks up a collector to record
+ statistics exposed by jemalloc.
+ """
+
+ global _JEMALLOC_STATS
+
+ # Try to find the loaded jemalloc shared library, if any. We need to
+ # introspect into what is loaded, rather than loading whatever is on the
+ # path, as if we load a *different* jemalloc version things will seg fault.
+
+ # We look in `/proc/self/maps`, which only exists on linux.
+ if not os.path.exists("/proc/self/maps"):
+ logger.debug("Not looking for jemalloc as no /proc/self/maps exist")
+ return
+
+ # We're looking for a path at the end of the line that includes
+ # "libjemalloc".
+ regex = re.compile(r"/\S+/libjemalloc.*$")
+
+ jemalloc_path = None
+ with open("/proc/self/maps") as f:
+ for line in f:
+ match = regex.search(line.strip())
+ if match:
+ jemalloc_path = match.group()
+
+ if not jemalloc_path:
+ # No loaded jemalloc was found.
+ logger.debug("jemalloc not found")
+ return
+
+ logger.debug("Found jemalloc at %s", jemalloc_path)
+
+ jemalloc_dll = ctypes.CDLL(jemalloc_path)
+
+ stats = JemallocStats(jemalloc_dll)
+ _JEMALLOC_STATS = stats
+
class JemallocCollector(Collector):
"""Metrics for internal jemalloc stats."""
def collect(self) -> Iterable[Metric]:
- _jemalloc_refresh_stats()
+ stats.refresh_stats()
g = GaugeMetricFamily(
"jemalloc_stats_app_memory_bytes",
@@ -184,7 +216,7 @@ def _setup_jemalloc_stats() -> None:
"metadata",
):
try:
- value = _mallctl(f"stats.{t}")
+ value = stats.get_stat(t)
except Exception as e:
# There was an error fetching the value, skip.
logger.warning("Failed to read jemalloc stats.%s: %s", t, e)
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 45ff0de638..a3b60578e3 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -13,6 +13,7 @@
# limitations under the License.
import logging
+import math
import threading
import weakref
from enum import Enum
@@ -40,6 +41,7 @@ from twisted.internet.interfaces import IReactorTime
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.metrics.jemalloc import get_jemalloc_stats
from synapse.util import Clock, caches
from synapse.util.caches import CacheMetric, EvictionReason, register_cache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -106,10 +108,16 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
@wrap_as_background_process("LruCache._expire_old_entries")
-async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
+async def _expire_old_entries(
+ clock: Clock, expiry_seconds: int, autotune_config: Optional[dict]
+) -> None:
"""Walks the global cache list to find cache entries that haven't been
- accessed in the given number of seconds.
+ accessed in the given number of seconds, or if a given memory threshold has been breached.
"""
+ if autotune_config:
+ max_cache_memory_usage = autotune_config["max_cache_memory_usage"]
+ target_cache_memory_usage = autotune_config["target_cache_memory_usage"]
+ min_cache_ttl = autotune_config["min_cache_ttl"] / 1000
now = int(clock.time())
node = GLOBAL_ROOT.prev_node
@@ -119,11 +127,36 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
logger.debug("Searching for stale caches")
+ evicting_due_to_memory = False
+
+ # determine if we're evicting due to memory
+ jemalloc_interface = get_jemalloc_stats()
+ if jemalloc_interface and autotune_config:
+ try:
+ jemalloc_interface.refresh_stats()
+ mem_usage = jemalloc_interface.get_stat("allocated")
+ if mem_usage > max_cache_memory_usage:
+ logger.info("Begin memory-based cache eviction.")
+ evicting_due_to_memory = True
+ except Exception:
+ logger.warning(
+ "Unable to read allocated memory, skipping memory-based cache eviction."
+ )
+
while node is not GLOBAL_ROOT:
# Only the root node isn't a `_TimedListNode`.
assert isinstance(node, _TimedListNode)
- if node.last_access_ts_secs > now - expiry_seconds:
+ # if node has not aged past expiry_seconds and we are not evicting due to memory usage, there's
+ # nothing to do here
+ if (
+ node.last_access_ts_secs > now - expiry_seconds
+ and not evicting_due_to_memory
+ ):
+ break
+
+ # if entry is newer than min_cache_entry_ttl then do not evict and don't evict anything newer
+ if evicting_due_to_memory and now - node.last_access_ts_secs < min_cache_ttl:
break
cache_entry = node.get_cache_entry()
@@ -136,10 +169,29 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
assert cache_entry is not None
cache_entry.drop_from_cache()
+ # Check mem allocation periodically if we are evicting a bunch of caches
+ if jemalloc_interface and evicting_due_to_memory and (i + 1) % 100 == 0:
+ try:
+ jemalloc_interface.refresh_stats()
+ mem_usage = jemalloc_interface.get_stat("allocated")
+ if mem_usage < target_cache_memory_usage:
+ evicting_due_to_memory = False
+ logger.info("Stop memory-based cache eviction.")
+ except Exception:
+ logger.warning(
+ "Unable to read allocated memory, this may affect memory-based cache eviction."
+ )
+ # If we've failed to read the current memory usage then we
+ # should stop trying to evict based on memory usage
+ evicting_due_to_memory = False
+
# If we do lots of work at once we yield to allow other stuff to happen.
if (i + 1) % 10000 == 0:
logger.debug("Waiting during drop")
- await clock.sleep(0)
+ if node.last_access_ts_secs > now - expiry_seconds:
+ await clock.sleep(0.5)
+ else:
+ await clock.sleep(0)
logger.debug("Waking during drop")
node = next_node
@@ -156,21 +208,28 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
"""Start a background job that expires all cache entries if they have not
- been accessed for the given number of seconds.
+ been accessed for the given number of seconds, or if a given memory usage threshold has been
+ breached.
"""
- if not hs.config.caches.expiry_time_msec:
+ if not hs.config.caches.expiry_time_msec and not hs.config.caches.cache_autotuning:
return
- logger.info(
- "Expiring LRU caches after %d seconds", hs.config.caches.expiry_time_msec / 1000
- )
+ if hs.config.caches.expiry_time_msec:
+ expiry_time = hs.config.caches.expiry_time_msec / 1000
+ logger.info("Expiring LRU caches after %d seconds", expiry_time)
+ else:
+ expiry_time = math.inf
global USE_GLOBAL_LIST
USE_GLOBAL_LIST = True
clock = hs.get_clock()
clock.looping_call(
- _expire_old_entries, 30 * 1000, clock, hs.config.caches.expiry_time_msec / 1000
+ _expire_old_entries,
+ 30 * 1000,
+ clock,
+ expiry_time,
+ hs.config.caches.cache_autotuning,
)
diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py
index 321fc1776f..67173a4f5b 100644
--- a/tests/util/test_lrucache.py
+++ b/tests/util/test_lrucache.py
@@ -14,8 +14,9 @@
from typing import List
-from unittest.mock import Mock
+from unittest.mock import Mock, patch
+from synapse.metrics.jemalloc import JemallocStats
from synapse.util.caches.lrucache import LruCache, setup_expire_lru_cache_entries
from synapse.util.caches.treecache import TreeCache
@@ -316,3 +317,58 @@ class TimeEvictionTestCase(unittest.HomeserverTestCase):
self.assertEqual(cache.get("key1"), None)
self.assertEqual(cache.get("key2"), 3)
+
+
+class MemoryEvictionTestCase(unittest.HomeserverTestCase):
+ @override_config(
+ {
+ "caches": {
+ "cache_autotuning": {
+ "max_cache_memory_usage": "700M",
+ "target_cache_memory_usage": "500M",
+ "min_cache_ttl": "5m",
+ }
+ }
+ }
+ )
+ @patch("synapse.util.caches.lrucache.get_jemalloc_stats")
+ def test_evict_memory(self, jemalloc_interface) -> None:
+ mock_jemalloc_class = Mock(spec=JemallocStats)
+ jemalloc_interface.return_value = mock_jemalloc_class
+
+ # set the return value of get_stat() to be greater than max_cache_memory_usage
+ mock_jemalloc_class.get_stat.return_value = 924288000
+
+ setup_expire_lru_cache_entries(self.hs)
+ cache = LruCache(4, clock=self.hs.get_clock())
+
+ cache["key1"] = 1
+ cache["key2"] = 2
+
+ # advance the reactor less than the min_cache_ttl
+ self.reactor.advance(60 * 2)
+
+ # our items should still be in the cache
+ self.assertEqual(cache.get("key1"), 1)
+ self.assertEqual(cache.get("key2"), 2)
+
+ # advance the reactor past the min_cache_ttl
+ self.reactor.advance(60 * 6)
+
+ # the items should be cleared from cache
+ self.assertEqual(cache.get("key1"), None)
+ self.assertEqual(cache.get("key2"), None)
+
+ # add more stuff to caches
+ cache["key1"] = 1
+ cache["key2"] = 2
+
+ # set the return value of get_stat() to be lower than target_cache_memory_usage
+ mock_jemalloc_class.get_stat.return_value = 10000
+
+ # advance the reactor past the min_cache_ttl
+ self.reactor.advance(60 * 6)
+
+ # the items should still be in the cache
+ self.assertEqual(cache.get("key1"), 1)
+ self.assertEqual(cache.get("key2"), 2)
|