summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-02-13 13:24:11 +0000
committerGitHub <noreply@github.com>2024-02-13 13:24:11 +0000
commit7b4d7429f8e655fd5a1c3a65e6347577e8b49784 (patch)
tree3f56dc82682d97a9506e34b779ecd4e6d1edea20 /synapse/util
parentAdd a config to not send out device list updates for specific users (#16909) (diff)
downloadsynapse-7b4d7429f8e655fd5a1c3a65e6347577e8b49784.tar.xz
Don't invalidate the entire event cache when we purge history (#16905)
We do this by adding support to the LRU cache for "extra indices" based
on the cached value. This allows us to efficiently map from room ID to
the cached events and only invalidate those.
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/caches/lrucache.py59
1 files changed, 59 insertions, 0 deletions
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 6e8c1e84ac..a1b4f5b6a7 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -35,6 +35,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     Type,
     TypeVar,
@@ -386,6 +387,7 @@ class LruCache(Generic[KT, VT]):
         apply_cache_factor_from_config: bool = True,
         clock: Optional[Clock] = None,
         prune_unread_entries: bool = True,
+        extra_index_cb: Optional[Callable[[KT, VT], KT]] = None,
     ):
         """
         Args:
@@ -416,6 +418,20 @@ class LruCache(Generic[KT, VT]):
             prune_unread_entries: If True, cache entries that haven't been read recently
                 will be evicted from the cache in the background. Set to False to
                 opt-out of this behaviour.
+
+            extra_index_cb: If provided, the cache keeps a second index from a
+                (different) key to a cache entry based on the return value of
+                the callback. This can then be used to invalidate entries based
+                on the second type of key.
+
+                For example, for the event cache this would be a callback that
+                maps an event to its room ID, allowing invalidation of all
+                events in a given room.
+
+                Note: Though the two types of key have the same type, they are
+                in different namespaces.
+
+                Note: The new key does not have to be unique.
         """
         # Default `clock` to something sensible. Note that we rename it to
         # `real_clock` so that mypy doesn't think its still `Optional`.
@@ -463,6 +479,8 @@ class LruCache(Generic[KT, VT]):
 
         lock = threading.Lock()
 
+        extra_index: Dict[KT, Set[KT]] = {}
+
         def evict() -> None:
             while cache_len() > self.max_size:
                 # Get the last node in the list (i.e. the oldest node).
@@ -521,6 +539,11 @@ class LruCache(Generic[KT, VT]):
             if size_callback:
                 cached_cache_len[0] += size_callback(node.value)
 
+            if extra_index_cb:
+                index_key = extra_index_cb(node.key, node.value)
+                mapped_keys = extra_index.setdefault(index_key, set())
+                mapped_keys.add(node.key)
+
             if caches.TRACK_MEMORY_USAGE and metrics:
                 metrics.inc_memory_usage(node.memory)
 
@@ -537,6 +560,14 @@ class LruCache(Generic[KT, VT]):
 
             node.run_and_clear_callbacks()
 
+            if extra_index_cb:
+                index_key = extra_index_cb(node.key, node.value)
+                mapped_keys = extra_index.get(index_key)
+                if mapped_keys is not None:
+                    mapped_keys.discard(node.key)
+                    if not mapped_keys:
+                        extra_index.pop(index_key, None)
+
             if caches.TRACK_MEMORY_USAGE and metrics:
                 metrics.dec_memory_usage(node.memory)
 
@@ -748,6 +779,8 @@ class LruCache(Generic[KT, VT]):
             if size_callback:
                 cached_cache_len[0] = 0
 
+            extra_index.clear()
+
             if caches.TRACK_MEMORY_USAGE and metrics:
                 metrics.clear_memory_usage()
 
@@ -755,6 +788,28 @@ class LruCache(Generic[KT, VT]):
         def cache_contains(key: KT) -> bool:
             return key in cache
 
+        @synchronized
+        def cache_invalidate_on_extra_index(index_key: KT) -> None:
+            """Invalidates all entries that match the given extra index key.
+
+            This can only be called when `extra_index_cb` was specified.
+            """
+
+            assert extra_index_cb is not None
+
+            keys = extra_index.pop(index_key, None)
+            if not keys:
+                return
+
+            for key in keys:
+                node = cache.pop(key, None)
+                if not node:
+                    continue
+
+                evicted_len = delete_node(node)
+                if metrics:
+                    metrics.inc_evictions(EvictionReason.invalidation, evicted_len)
+
         # make sure that we clear out any excess entries after we get resized.
         self._on_resize = evict
 
@@ -771,6 +826,7 @@ class LruCache(Generic[KT, VT]):
         self.len = synchronized(cache_len)
         self.contains = cache_contains
         self.clear = cache_clear
+        self.invalidate_on_extra_index = cache_invalidate_on_extra_index
 
     def __getitem__(self, key: KT) -> VT:
         result = self.get(key, _Sentinel.sentinel)
@@ -864,6 +920,9 @@ class AsyncLruCache(Generic[KT, VT]):
         # This method should invalidate any external cache and then invalidate the LruCache.
         return self._lru_cache.invalidate(key)
 
+    def invalidate_on_extra_index_local(self, index_key: KT) -> None:
+        self._lru_cache.invalidate_on_extra_index(index_key)
+
     def invalidate_local(self, key: KT) -> None:
         """Remove an entry from the local cache