summary refs log tree commit diff
path: root/synapse/util/caches
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/caches')
-rw-r--r--synapse/util/caches/__init__.py20
-rw-r--r--synapse/util/caches/descriptors.py52
-rw-r--r--synapse/util/caches/dictionary_cache.py8
-rw-r--r--synapse/util/caches/expiringcache.py8
-rw-r--r--synapse/util/caches/response_cache.py13
-rw-r--r--synapse/util/caches/stream_change_cache.py16
6 files changed, 80 insertions, 37 deletions
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index d53569ca49..ebd715c5dc 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -24,11 +24,21 @@ DEBUG_CACHES = False
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
 
 caches_by_name = {}
-cache_counter = metrics.register_cache(
-    "cache",
-    lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
-    labels=["name"],
-)
+# cache_counter = metrics.register_cache(
+#     "cache",
+#     lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+#     labels=["name"],
+# )
+
+
+def register_cache(name, cache):
+    caches_by_name[name] = cache
+    return metrics.register_cache(
+        "cache",
+        lambda: len(cache),
+        name,
+    )
+
 
 _string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
 caches_by_name["string_cache"] = _string_cache
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 758f5982b0..f31dfb22b7 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -22,7 +22,7 @@ from synapse.util.logcontext import (
     PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
 )
 
-from . import caches_by_name, DEBUG_CACHES, cache_counter
+from . import DEBUG_CACHES, register_cache
 
 from twisted.internet import defer
 
@@ -33,6 +33,7 @@ import functools
 import inspect
 import threading
 
+
 logger = logging.getLogger(__name__)
 
 
@@ -43,6 +44,15 @@ CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
 
 class Cache(object):
+    __slots__ = (
+        "cache",
+        "max_entries",
+        "name",
+        "keylen",
+        "sequence",
+        "thread",
+        "metrics",
+    )
 
     def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
         if lru:
@@ -59,7 +69,7 @@ class Cache(object):
         self.keylen = keylen
         self.sequence = 0
         self.thread = None
-        caches_by_name[name] = self.cache
+        self.metrics = register_cache(name, self.cache)
 
     def check_thread(self):
         expected_thread = self.thread
@@ -74,10 +84,10 @@ class Cache(object):
     def get(self, key, default=_CacheSentinel):
         val = self.cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
             return val
 
-        cache_counter.inc_misses(self.name)
+        self.metrics.inc_misses()
 
         if default is _CacheSentinel:
             raise KeyError()
@@ -293,16 +303,21 @@ class CacheListDescriptor(object):
 
             # cached is a dict arg -> deferred, where deferred results in a
             # 2-tuple (`arg`, `result`)
-            cached = {}
+            results = {}
+            cached_defers = {}
             missing = []
             for arg in list_args:
                 key = list(keyargs)
                 key[self.list_pos] = arg
 
                 try:
-                    res = cache.get(tuple(key)).observe()
-                    res.addCallback(lambda r, arg: (arg, r), arg)
-                    cached[arg] = res
+                    res = cache.get(tuple(key))
+                    if not res.has_succeeded():
+                        res = res.observe()
+                        res.addCallback(lambda r, arg: (arg, r), arg)
+                        cached_defers[arg] = res
+                    else:
+                        results[arg] = res.get_result()
                 except KeyError:
                     missing.append(arg)
 
@@ -340,12 +355,21 @@ class CacheListDescriptor(object):
                     res = observer.observe()
                     res.addCallback(lambda r, arg: (arg, r), arg)
 
-                    cached[arg] = res
-
-            return preserve_context_over_deferred(defer.gatherResults(
-                cached.values(),
-                consumeErrors=True,
-            ).addErrback(unwrapFirstError).addCallback(lambda res: dict(res)))
+                    cached_defers[arg] = res
+
+            if cached_defers:
+                def update_results_dict(res):
+                    results.update(res)
+                    return results
+
+                return preserve_context_over_deferred(defer.gatherResults(
+                    cached_defers.values(),
+                    consumeErrors=True,
+                ).addCallback(update_results_dict).addErrback(
+                    unwrapFirstError
+                ))
+            else:
+                return results
 
         obj.__dict__[self.orig.__name__] = wrapped
 
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index f92d80542b..b0ca1bb79d 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -15,7 +15,7 @@
 
 from synapse.util.caches.lrucache import LruCache
 from collections import namedtuple
-from . import caches_by_name, cache_counter
+from . import register_cache
 import threading
 import logging
 
@@ -43,7 +43,7 @@ class DictionaryCache(object):
             __slots__ = []
 
         self.sentinel = Sentinel()
-        caches_by_name[name] = self.cache
+        self.metrics = register_cache(name, self.cache)
 
     def check_thread(self):
         expected_thread = self.thread
@@ -58,7 +58,7 @@ class DictionaryCache(object):
     def get(self, key, dict_keys=None):
         entry = self.cache.get(key, self.sentinel)
         if entry is not self.sentinel:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
 
             if dict_keys is None:
                 return DictionaryEntry(entry.full, dict(entry.value))
@@ -69,7 +69,7 @@ class DictionaryCache(object):
                     if k in entry.value
                 })
 
-        cache_counter.inc_misses(self.name)
+        self.metrics.inc_misses()
         return DictionaryEntry(False, {})
 
     def invalidate(self, key):
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2b68c1ac93..080388958f 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
 
 import logging
 
@@ -49,7 +49,7 @@ class ExpiringCache(object):
 
         self._cache = {}
 
-        caches_by_name[cache_name] = self._cache
+        self.metrics = register_cache(cache_name, self._cache)
 
     def start(self):
         if not self._expiry_ms:
@@ -78,9 +78,9 @@ class ExpiringCache(object):
     def __getitem__(self, key):
         try:
             entry = self._cache[key]
-            cache_counter.inc_hits(self._cache_name)
+            self.metrics.inc_hits()
         except KeyError:
-            cache_counter.inc_misses(self._cache_name)
+            self.metrics.inc_misses()
             raise
 
         if self._reset_expiry_on_get:
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 36686b479e..00af539880 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -24,9 +24,12 @@ class ResponseCache(object):
     used rather than trying to compute a new response.
     """
 
-    def __init__(self):
+    def __init__(self, hs, timeout_ms=0):
         self.pending_result_cache = {}  # Requests that haven't finished yet.
 
+        self.clock = hs.get_clock()
+        self.timeout_sec = timeout_ms / 1000.
+
     def get(self, key):
         result = self.pending_result_cache.get(key)
         if result is not None:
@@ -39,7 +42,13 @@ class ResponseCache(object):
         self.pending_result_cache[key] = result
 
         def remove(r):
-            self.pending_result_cache.pop(key, None)
+            if self.timeout_sec:
+                self.clock.call_later(
+                    self.timeout_sec,
+                    self.pending_result_cache.pop, key, None,
+                )
+            else:
+                self.pending_result_cache.pop(key, None)
             return r
 
         result.addBoth(remove)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index ea8a74ca69..3c051dabc4 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.util.caches import cache_counter, caches_by_name
+from synapse.util.caches import register_cache
 
 
 from blist import sorteddict
@@ -42,7 +42,7 @@ class StreamChangeCache(object):
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
         self.name = name
-        caches_by_name[self.name] = self._cache
+        self.metrics = register_cache(self.name, self._cache)
 
         for entity, stream_pos in prefilled_cache.items():
             self.entity_has_changed(entity, stream_pos)
@@ -53,19 +53,19 @@ class StreamChangeCache(object):
         assert type(stream_pos) is int
 
         if stream_pos < self._earliest_known_stream_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
             return True
 
         latest_entity_change_pos = self._entity_to_key.get(entity, None)
         if latest_entity_change_pos is None:
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
             return False
 
         if stream_pos < latest_entity_change_pos:
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
             return True
 
-        cache_counter.inc_hits(self.name)
+        self.metrics.inc_hits()
         return False
 
     def get_entities_changed(self, entities, stream_pos):
@@ -82,10 +82,10 @@ class StreamChangeCache(object):
                 self._cache[k] for k in keys[i:]
             ).intersection(entities)
 
-            cache_counter.inc_hits(self.name)
+            self.metrics.inc_hits()
         else:
             result = entities
-            cache_counter.inc_misses(self.name)
+            self.metrics.inc_misses()
 
         return result