diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 867f315b2a..72227359b9 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import enum
import functools
import inspect
import logging
@@ -25,6 +24,7 @@ from typing import (
Generic,
Hashable,
Iterable,
+ List,
Mapping,
Optional,
Sequence,
@@ -52,7 +52,7 @@ CacheKey = Union[Tuple, Any]
F = TypeVar("F", bound=Callable[..., Any])
-class _CachedFunction(Generic[F]):
+class CachedFunction(Generic[F]):
invalidate: Any = None
invalidate_all: Any = None
prefill: Any = None
@@ -73,8 +73,10 @@ class _CacheDescriptorBase:
num_args: Optional[int],
uncached_args: Optional[Collection[str]] = None,
cache_context: bool = False,
+ name: Optional[str] = None,
):
self.orig = orig
+ self.name = name or orig.__name__
arg_spec = inspect.getfullargspec(orig)
all_args = arg_spec.args
@@ -143,109 +145,6 @@ class _CacheDescriptorBase:
)
-class _LruCachedFunction(Generic[F]):
- cache: LruCache[CacheKey, Any]
- __call__: F
-
-
-def lru_cache(
- *, max_entries: int = 1000, cache_context: bool = False
-) -> Callable[[F], _LruCachedFunction[F]]:
- """A method decorator that applies a memoizing cache around the function.
-
- This is more-or-less a drop-in equivalent to functools.lru_cache, although note
- that the signature is slightly different.
-
- The main differences with functools.lru_cache are:
- (a) the size of the cache can be controlled via the cache_factor mechanism
- (b) the wrapped function can request a "cache_context" which provides a
- callback mechanism to indicate that the result is no longer valid
- (c) prometheus metrics are exposed automatically.
-
- The function should take zero or more arguments, which are used as the key for the
- cache. Single-argument functions use that argument as the cache key; otherwise the
- arguments are built into a tuple.
-
- Cached functions can be "chained" (i.e. a cached function can call other cached
- functions and get appropriately invalidated when they called caches are
- invalidated) by adding a special "cache_context" argument to the function
- and passing that as a kwarg to all caches called. For example:
-
- @lru_cache(cache_context=True)
- def foo(self, key, cache_context):
- r1 = self.bar1(key, on_invalidate=cache_context.invalidate)
- r2 = self.bar2(key, on_invalidate=cache_context.invalidate)
- return r1 + r2
-
- The wrapped function also has a 'cache' property which offers direct access to the
- underlying LruCache.
- """
-
- def func(orig: F) -> _LruCachedFunction[F]:
- desc = LruCacheDescriptor(
- orig,
- max_entries=max_entries,
- cache_context=cache_context,
- )
- return cast(_LruCachedFunction[F], desc)
-
- return func
-
-
-class LruCacheDescriptor(_CacheDescriptorBase):
- """Helper for @lru_cache"""
-
- class _Sentinel(enum.Enum):
- sentinel = object()
-
- def __init__(
- self,
- orig: Callable[..., Any],
- max_entries: int = 1000,
- cache_context: bool = False,
- ):
- super().__init__(
- orig, num_args=None, uncached_args=None, cache_context=cache_context
- )
- self.max_entries = max_entries
-
- def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
- cache: LruCache[CacheKey, Any] = LruCache(
- cache_name=self.orig.__name__,
- max_size=self.max_entries,
- )
-
- get_cache_key = self.cache_key_builder
- sentinel = LruCacheDescriptor._Sentinel.sentinel
-
- @functools.wraps(self.orig)
- def _wrapped(*args: Any, **kwargs: Any) -> Any:
- invalidate_callback = kwargs.pop("on_invalidate", None)
- callbacks = (invalidate_callback,) if invalidate_callback else ()
-
- cache_key = get_cache_key(args, kwargs)
-
- ret = cache.get(cache_key, default=sentinel, callbacks=callbacks)
- if ret != sentinel:
- return ret
-
- # Add our own `cache_context` to argument list if the wrapped function
- # has asked for one
- if self.add_cache_context:
- kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)
-
- ret2 = self.orig(obj, *args, **kwargs)
- cache.set(cache_key, ret2, callbacks=callbacks)
-
- return ret2
-
- wrapped = cast(_CachedFunction, _wrapped)
- wrapped.cache = cache
- obj.__dict__[self.orig.__name__] = wrapped
-
- return wrapped
-
-
class DeferredCacheDescriptor(_CacheDescriptorBase):
"""A method decorator that applies a memoizing cache around the function.
@@ -301,12 +200,14 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
+ name: Optional[str] = None,
):
super().__init__(
orig,
num_args=num_args,
uncached_args=uncached_args,
cache_context=cache_context,
+ name=name,
)
if tree and self.num_args < 2:
@@ -321,7 +222,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
cache: DeferredCache[CacheKey, Any] = DeferredCache(
- name=self.orig.__name__,
+ name=self.name,
max_entries=self.max_entries,
tree=self.tree,
iterable=self.iterable,
@@ -358,7 +259,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
return make_deferred_yieldable(ret)
- wrapped = cast(_CachedFunction, _wrapped)
+ wrapped = cast(CachedFunction, _wrapped)
if self.num_args == 1:
assert not self.tree
@@ -372,7 +273,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
wrapped.cache = cache
wrapped.num_args = self.num_args
- obj.__dict__[self.orig.__name__] = wrapped
+ obj.__dict__[self.name] = wrapped
return wrapped
@@ -393,6 +294,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
cached_method_name: str,
list_name: str,
num_args: Optional[int] = None,
+ name: Optional[str] = None,
):
"""
Args:
@@ -403,7 +305,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
but including list_name) to use as cache keys. Defaults to all
named args of the function.
"""
- super().__init__(orig, num_args=num_args, uncached_args=None)
+ super().__init__(orig, num_args=num_args, uncached_args=None, name=name)
self.list_name = list_name
@@ -425,6 +327,12 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
cache: DeferredCache[CacheKey, Any] = cached_method.cache
num_args = cached_method.num_args
+ if num_args != self.num_args:
+ raise TypeError(
+ "Number of args (%s) does not match underlying cache_method_name=%s (%s)."
+ % (self.num_args, self.cached_method_name, num_args)
+ )
+
@functools.wraps(self.orig)
def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]":
# If we're passed a cache_context then we'll want to call its
@@ -435,16 +343,6 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names]
list_args = arg_dict[self.list_name]
- results = {}
-
- def update_results_dict(res: Any, arg: Hashable) -> None:
- results[arg] = res
-
- # list of deferreds to wait for
- cached_defers = []
-
- missing = set()
-
# If the cache takes a single arg then that is used as the key,
# otherwise a tuple is used.
if num_args == 1:
@@ -452,6 +350,9 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
def arg_to_cache_key(arg: Hashable) -> Hashable:
return arg
+ def cache_key_to_arg(key: tuple) -> Hashable:
+ return key
+
else:
keylist = list(keyargs)
@@ -459,58 +360,53 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
keylist[self.list_pos] = arg
return tuple(keylist)
- for arg in list_args:
- try:
- res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback)
- if not res.called:
- res.addCallback(update_results_dict, arg)
- cached_defers.append(res)
- else:
- results[arg] = res.result
- except KeyError:
- missing.add(arg)
+ def cache_key_to_arg(key: tuple) -> Hashable:
+ return key[self.list_pos]
+
+ cache_keys = [arg_to_cache_key(arg) for arg in list_args]
+ immediate_results, pending_deferred, missing = cache.get_bulk(
+ cache_keys, callback=invalidate_callback
+ )
+
+ results = {cache_key_to_arg(key): v for key, v in immediate_results.items()}
+
+ cached_defers: List["defer.Deferred[Any]"] = []
+ if pending_deferred:
+
+ def update_results(r: Dict) -> None:
+ for k, v in r.items():
+ results[cache_key_to_arg(k)] = v
+
+ pending_deferred.addCallback(update_results)
+ cached_defers.append(pending_deferred)
if missing:
- # we need a deferred for each entry in the list,
- # which we put in the cache. Each deferred resolves with the
- # relevant result for that key.
- deferreds_map = {}
- for arg in missing:
- deferred: "defer.Deferred[Any]" = defer.Deferred()
- deferreds_map[arg] = deferred
- key = arg_to_cache_key(arg)
- cached_defers.append(
- cache.set(key, deferred, callback=invalidate_callback)
- )
+ cache_entry = cache.start_bulk_input(missing, invalidate_callback)
def complete_all(res: Dict[Hashable, Any]) -> None:
- # the wrapped function has completed. It returns a dict.
- # We can now update our own result map, and then resolve the
- # observable deferreds in the cache.
- for e, d1 in deferreds_map.items():
- val = res.get(e, None)
- # make sure we update the results map before running the
- # deferreds, because as soon as we run the last deferred, the
- # gatherResults() below will complete and return the result
- # dict to our caller.
- results[e] = val
- d1.callback(val)
+ missing_results = {}
+ for key in missing:
+ arg = cache_key_to_arg(key)
+ val = res.get(arg, None)
+
+ results[arg] = val
+ missing_results[key] = val
+
+ cache_entry.complete_bulk(cache, missing_results)
def errback_all(f: Failure) -> None:
- # the wrapped function has failed. Propagate the failure into
- # the cache, which will invalidate the entry, and cause the
- # relevant cached_deferreds to fail, which will propagate the
- # failure to our caller.
- for d1 in deferreds_map.values():
- d1.errback(f)
+ cache_entry.error_bulk(cache, missing, f)
args_to_call = dict(arg_dict)
- args_to_call[self.list_name] = missing
+ args_to_call[self.list_name] = {
+ cache_key_to_arg(key) for key in missing
+ }
# dispatch the call, and attach the two handlers
- defer.maybeDeferred(
+ missing_d = defer.maybeDeferred(
preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback_all)
+ cached_defers.append(missing_d)
if cached_defers:
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
@@ -525,7 +421,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
else:
return defer.succeed(results)
- obj.__dict__[self.orig.__name__] = wrapped
+ obj.__dict__[self.name] = wrapped
return wrapped
@@ -577,7 +473,8 @@ def cached(
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
-) -> Callable[[F], _CachedFunction[F]]:
+ name: Optional[str] = None,
+) -> Callable[[F], CachedFunction[F]]:
func = lambda orig: DeferredCacheDescriptor(
orig,
max_entries=max_entries,
@@ -587,21 +484,26 @@ def cached(
cache_context=cache_context,
iterable=iterable,
prune_unread_entries=prune_unread_entries,
+ name=name,
)
- return cast(Callable[[F], _CachedFunction[F]], func)
+ return cast(Callable[[F], CachedFunction[F]], func)
def cachedList(
- *, cached_method_name: str, list_name: str, num_args: Optional[int] = None
-) -> Callable[[F], _CachedFunction[F]]:
+ *,
+ cached_method_name: str,
+ list_name: str,
+ num_args: Optional[int] = None,
+ name: Optional[str] = None,
+) -> Callable[[F], CachedFunction[F]]:
"""Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`.
Used to do batch lookups for an already created cache. One of the arguments
is specified as a list that is iterated through to lookup keys in the
original cache. A new tuple consisting of the (deduplicated) keys that weren't in
the cache gets passed to the original function, which is expected to results
- in a map of key to value for each passed value. THe new results are stored in the
+ in a map of key to value for each passed value. The new results are stored in the
original cache. Note that any missing values are cached as None.
Args:
@@ -628,9 +530,10 @@ def cachedList(
cached_method_name=cached_method_name,
list_name=list_name,
num_args=num_args,
+ name=name,
)
- return cast(Callable[[F], _CachedFunction[F]], func)
+ return cast(Callable[[F], CachedFunction[F]], func)
def _get_cache_key_builder(
|