diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index f728cd2cf2..601305487c 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -17,14 +17,23 @@
import enum
import threading
-from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, cast
+from typing import (
+ Callable,
+ Generic,
+ Iterable,
+ MutableMapping,
+ Optional,
+ TypeVar,
+ Union,
+ cast,
+)
from prometheus_client import Gauge
from twisted.internet import defer
+from twisted.python import failure
from synapse.util.async_helpers import ObservableDeferred
-from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
@@ -34,7 +43,7 @@ cache_pending_metric = Gauge(
["name"],
)
-
+T = TypeVar("T")
KT = TypeVar("KT")
VT = TypeVar("VT")
@@ -49,15 +58,12 @@ class DeferredCache(Generic[KT, VT]):
"""Wraps an LruCache, adding support for Deferred results.
It expects that each entry added with set() will be a Deferred; likewise get()
- may return an ObservableDeferred.
+ will return a Deferred.
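+
+ A rough usage sketch (the names here are illustrative, not part of this
+ module; `make_deferred_yieldable` is from synapse.logging.context):
+
+     cache = DeferredCache("my_cache")  # type: DeferredCache[str, int]
+     cache.set(key, deferred_computation)
+     value = await make_deferred_yieldable(cache.get(key))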
"""
__slots__ = (
"cache",
- "name",
- "keylen",
"thread",
- "metrics",
"_pending_deferred_cache",
)
@@ -89,37 +95,27 @@ class DeferredCache(Generic[KT, VT]):
cache_type()
) # type: MutableMapping[KT, CacheEntry]
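+ # report the number of in-flight lookups (i.e., the size of the
+ # pending-deferred cache) to prometheus on each metrics collection.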
+ def metrics_cb():
+ cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
+
# cache is used for completed results and maps to the result itself, rather than
# a Deferred.
self.cache = LruCache(
max_size=max_entries,
keylen=keylen,
+ cache_name=name,
cache_type=cache_type,
size_callback=(lambda d: len(d)) if iterable else None,
- evicted_callback=self._on_evicted,
+ metrics_collection_callback=metrics_cb,
apply_cache_factor_from_config=apply_cache_factor_from_config,
- )
+ ) # type: LruCache[KT, VT]
- self.name = name
- self.keylen = keylen
self.thread = None # type: Optional[threading.Thread]
- self.metrics = register_cache(
- "cache",
- name,
- self.cache,
- collect_callback=self._metrics_collection_callback,
- )
@property
def max_entries(self):
return self.cache.max_size
- def _on_evicted(self, evicted_count):
- self.metrics.inc_evictions(evicted_count)
-
- def _metrics_collection_callback(self):
- cache_pending_metric.labels(self.name).set(len(self._pending_deferred_cache))
-
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
@@ -133,62 +129,113 @@ class DeferredCache(Generic[KT, VT]):
def get(
self,
key: KT,
- default=_Sentinel.sentinel,
callback: Optional[Callable[[], None]] = None,
update_metrics: bool = True,
- ):
+ ) -> defer.Deferred:
"""Looks the key up in the caches.
+ For symmetry with set(), this method does *not* follow the synapse logcontext
+ rules: the logcontext will not be cleared on return, and the Deferred will run
+ its callbacks in the sentinel context. In other words: wrap the result with
+ make_deferred_yieldable() before `await`ing it.
+
Args:
- key(tuple)
- default: What is returned if key is not in the caches. If not
- specified then function throws KeyError instead
- callback(fn): Gets called when the entry in the cache is invalidated
+ key: the key to look up in the cache
+ callback: Gets called when the entry in the cache is invalidated
update_metrics (bool): whether to update the cache hit rate metrics
Returns:
- Either an ObservableDeferred or the result itself
+ A Deferred which completes with the result. Note that this may later fail
+ if an ongoing set() operation for this key eventually completes with a failure.
+
+ Raises:
+ KeyError if the key is not found in the cache
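+
+ Example (a sketch only; assumes a populated `cache`, and a hypothetical
+ `compute_and_cache` fallback):
+
+     try:
+         result = await make_deferred_yieldable(cache.get(key))
+     except KeyError:
+         result = await compute_and_cache(key)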
"""
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _Sentinel.sentinel)
if val is not _Sentinel.sentinel:
val.callbacks.update(callbacks)
if update_metrics:
- self.metrics.inc_hits()
- return val.deferred
-
- val = self.cache.get(key, _Sentinel.sentinel, callbacks=callbacks)
- if val is not _Sentinel.sentinel:
- self.metrics.inc_hits()
- return val
+ m = self.cache.metrics
+ assert m # we always have a name, so should always have metrics
+ m.inc_hits()
+ return val.deferred.observe()
- if update_metrics:
- self.metrics.inc_misses()
-
- if default is _Sentinel.sentinel:
+ val2 = self.cache.get(
+ key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics
+ )
+ if val2 is _Sentinel.sentinel:
raise KeyError()
else:
- return default
+ return defer.succeed(val2)
+
+ def get_immediate(
+ self, key: KT, default: T, update_metrics: bool = True
+ ) -> Union[VT, T]:
+ """If we have a *completed* cached value, return it."""
+ return self.cache.get(key, default, update_metrics=update_metrics)
def set(
self,
key: KT,
value: defer.Deferred,
callback: Optional[Callable[[], None]] = None,
- ) -> ObservableDeferred:
+ ) -> defer.Deferred:
+ """Adds a new entry to the cache (or updates an existing one).
+
+ The given `value` *must* be a Deferred.
+
+ First any existing entry for the same key is invalidated. Then a new entry
+ is added to the cache for the given key.
+
+ Until the `value` completes, calls to `get()` for the key will also result in an
+ incomplete Deferred, which will ultimately complete with the same result as
+ `value`.
+
+ If `value` completes successfully, subsequent calls to `get()` will then return
+ a completed deferred with the same result. If it *fails*, the cache is
+ invalidated and subsequent calls to `get()` will raise a KeyError.
+
+ If another call to `set()` happens before `value` completes, then (a) any
+ invalidation callbacks registered in the interim will be called, (b) any
+ `get()`s in the interim will continue to complete with the result from the
+ *original* `value`, (c) any future calls to `get()` will complete with the
+ result from the *new* `value`.
+
+ It is expected that `value` does *not* follow the synapse logcontext rules; i.e.,
+ if it is incomplete, it runs its callbacks in the sentinel context.
+
+ Args:
+ key: Key to be set
+ value: a deferred which will complete with a result to add to the cache
+ callback: An optional callback to be called when the entry is invalidated
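+
+ Example (illustrative only; `lookup_remotely` stands in for any function
+ returning a Deferred):
+
+     d = cache.set(key, lookup_remotely(key))
+     # `d` completes with the same result as `lookup_remotely(key)`, and
+     # the value is cached once it arrives.
+     result = await make_deferred_yieldable(d)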
+ """
if not isinstance(value, defer.Deferred):
raise TypeError("not a Deferred")
callbacks = [callback] if callback else []
self.check_thread()
- observable = ObservableDeferred(value, consumeErrors=True)
- observer = observable.observe()
- entry = CacheEntry(deferred=observable, callbacks=callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry:
existing_entry.invalidate()
+ # XXX: why don't we invalidate the entry in `self.cache` yet?
+
+ # we can save a whole load of effort if the deferred is ready.
+ if value.called:
+ result = value.result
+ if not isinstance(result, failure.Failure):
+ self.cache.set(key, result, callbacks)
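+ # (if it completed with a failure, we cache nothing: the caller will
+ # see the failure on the returned deferred)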
+ return value
+
+ # otherwise, we'll add an entry to the _pending_deferred_cache for now,
+ # and add callbacks to add it to the cache properly later.
+
+ observable = ObservableDeferred(value, consumeErrors=True)
+ observer = observable.observe()
+ entry = CacheEntry(deferred=observable, callbacks=callbacks)
+
self._pending_deferred_cache[key] = entry
def compare_and_pop():
@@ -232,7 +279,9 @@ class DeferredCache(Generic[KT, VT]):
# _pending_deferred_cache to the real cache.
#
observer.addCallbacks(cb, eb)
- return observable
+
+ # we return a new Deferred which will be called before any subsequent observers.
+ return observable.observe()
def prefill(self, key: KT, value: VT, callback: Callable[[], None] = None):
callbacks = [callback] if callback else []
@@ -257,11 +306,12 @@ class DeferredCache(Generic[KT, VT]):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError("The cache key must be a tuple not %r" % (type(key),))
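+ # help mypy out: we have just checked that `key` is a tuple, so treat
+ # it as a KT for both uses below.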
+ key = cast(KT, key)
self.cache.del_multi(key)
# if we have a pending lookup for this key, remove it from the
# _pending_deferred_cache, as above
- entry_dict = self._pending_deferred_cache.pop(cast(KT, key), None)
+ entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
entry.invalidate()
|