From 5e7847dc923142bc68834f9b9538ada3fdd887d5 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 23 Aug 2022 10:49:59 +0100 Subject: Cache user IDs instead of profile objects (#13573) The profile objects are never used and increase cache size significantly. --- synapse/util/caches/descriptors.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) (limited to 'synapse/util/caches/descriptors.py') diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 867f315b2a..9d4bc89edb 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -73,8 +73,10 @@ class _CacheDescriptorBase: num_args: Optional[int], uncached_args: Optional[Collection[str]] = None, cache_context: bool = False, + name: Optional[str] = None, ): self.orig = orig + self.name = name or orig.__name__ arg_spec = inspect.getfullargspec(orig) all_args = arg_spec.args @@ -211,7 +213,7 @@ class LruCacheDescriptor(_CacheDescriptorBase): def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]: cache: LruCache[CacheKey, Any] = LruCache( - cache_name=self.orig.__name__, + cache_name=self.name, max_size=self.max_entries, ) @@ -241,7 +243,7 @@ class LruCacheDescriptor(_CacheDescriptorBase): wrapped = cast(_CachedFunction, _wrapped) wrapped.cache = cache - obj.__dict__[self.orig.__name__] = wrapped + obj.__dict__[self.name] = wrapped return wrapped @@ -301,12 +303,14 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): cache_context: bool = False, iterable: bool = False, prune_unread_entries: bool = True, + name: Optional[str] = None, ): super().__init__( orig, num_args=num_args, uncached_args=uncached_args, cache_context=cache_context, + name=name, ) if tree and self.num_args < 2: @@ -321,7 +325,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]: cache: DeferredCache[CacheKey, Any] = DeferredCache( - name=self.orig.__name__, + name=self.name, max_entries=self.max_entries, tree=self.tree, iterable=self.iterable, @@ -372,7 +376,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): wrapped.cache = cache wrapped.num_args = self.num_args - obj.__dict__[self.orig.__name__] = wrapped + obj.__dict__[self.name] = wrapped return wrapped @@ -393,6 +397,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): cached_method_name: str, list_name: str, num_args: Optional[int] = None, + name: Optional[str] = None, ): """ Args: @@ -403,7 +408,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): but including list_name) to use as cache keys. Defaults to all named args of the function. """ - super().__init__(orig, num_args=num_args, uncached_args=None) + super().__init__(orig, num_args=num_args, uncached_args=None, name=name) self.list_name = list_name @@ -525,7 +530,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): else: return defer.succeed(results) - obj.__dict__[self.orig.__name__] = wrapped + obj.__dict__[self.name] = wrapped return wrapped @@ -577,6 +582,7 @@ def cached( cache_context: bool = False, iterable: bool = False, prune_unread_entries: bool = True, + name: Optional[str] = None, ) -> Callable[[F], _CachedFunction[F]]: func = lambda orig: DeferredCacheDescriptor( orig, @@ -587,13 +593,18 @@ def cached( cache_context=cache_context, iterable=iterable, prune_unread_entries=prune_unread_entries, + name=name, ) return cast(Callable[[F], _CachedFunction[F]], func) def cachedList( - *, cached_method_name: str, list_name: str, num_args: Optional[int] = None + *, + cached_method_name: str, + list_name: str, + num_args: Optional[int] = None, + name: Optional[str] = None, ) -> Callable[[F], _CachedFunction[F]]: """Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`. @@ -628,6 +639,7 @@ def cachedList( cached_method_name=cached_method_name, list_name=list_name, num_args=num_args, + name=name, ) return cast(Callable[[F], _CachedFunction[F]], func) -- cgit 1.5.1 From f7ddfe17a30a50205a23bf5ca4d7d71e691e1e48 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 23 Aug 2022 15:53:27 +0100 Subject: Speed up `@cachedList` (#13591) This speeds things up by ~2x. The vast majority of the time is now spent in `LruCache` moving things around the linked lists. We do this via two things: 1. Don't create a deferred per-key during bulk set operations in `DeferredCache`. Instead, only create them if a subsequent caller asks for the key. 2. Add a bulk lookup API to `DeferredCache` rather than use a loop. --- changelog.d/13591.misc | 1 + synapse/util/caches/deferred_cache.py | 346 +++++++++++++++++++++++++--------- synapse/util/caches/descriptors.py | 89 ++++----- synapse/util/caches/treecache.py | 3 + 4 files changed, 298 insertions(+), 141 deletions(-) create mode 100644 changelog.d/13591.misc (limited to 'synapse/util/caches/descriptors.py') diff --git a/changelog.d/13591.misc b/changelog.d/13591.misc new file mode 100644 index 0000000000..080e865e55 --- /dev/null +++ b/changelog.d/13591.misc @@ -0,0 +1 @@ +Improve performance of `@cachedList`. diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 1d6ec22191..6425f851ea 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -14,15 +14,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import enum import threading from typing import ( Callable, + Collection, + Dict, Generic, - Iterable, MutableMapping, Optional, + Set, Sized, + Tuple, TypeVar, Union, cast, @@ -31,7 +35,6 @@ from typing import ( from prometheus_client import Gauge from twisted.internet import defer -from twisted.python import failure from twisted.python.failure import Failure from synapse.util.async_helpers import ObservableDeferred @@ -94,7 +97,7 @@ class DeferredCache(Generic[KT, VT]): # _pending_deferred_cache maps from the key value to a `CacheEntry` object. self._pending_deferred_cache: Union[ - TreeCache, "MutableMapping[KT, CacheEntry]" + TreeCache, "MutableMapping[KT, CacheEntry[KT, VT]]" ] = cache_type() def metrics_cb() -> None: @@ -159,15 +162,16 @@ class DeferredCache(Generic[KT, VT]): Raises: KeyError if the key is not found in the cache """ - callbacks = [callback] if callback else [] val = self._pending_deferred_cache.get(key, _Sentinel.sentinel) if val is not _Sentinel.sentinel: - val.callbacks.update(callbacks) + val.add_invalidation_callback(key, callback) if update_metrics: m = self.cache.metrics assert m # we always have a name, so should always have metrics m.inc_hits() - return val.deferred.observe() + return val.deferred(key) + + callbacks = (callback,) if callback else () val2 = self.cache.get( key, _Sentinel.sentinel, callbacks=callbacks, update_metrics=update_metrics @@ -177,6 +181,73 @@ class DeferredCache(Generic[KT, VT]): else: return defer.succeed(val2) + def get_bulk( + self, + keys: Collection[KT], + callback: Optional[Callable[[], None]] = None, + ) -> Tuple[Dict[KT, VT], Optional["defer.Deferred[Dict[KT, VT]]"], Collection[KT]]: + """Bulk lookup of items in the cache. + + Returns: + A 3-tuple of: + 1. a dict of key/value of items already cached; + 2. a deferred that resolves to a dict of key/value of items + we're already fetching; and + 3. a collection of keys that don't appear in the previous two. + """ + + # The cached results + cached = {} + + # List of pending deferreds + pending = [] + + # Dict that gets filled out when the pending deferreds complete + pending_results = {} + + # List of keys that aren't in either cache + missing = [] + + callbacks = (callback,) if callback else () + + for key in keys: + # Check if its in the main cache. + immediate_value = self.cache.get( + key, + _Sentinel.sentinel, + callbacks=callbacks, + ) + if immediate_value is not _Sentinel.sentinel: + cached[key] = immediate_value + continue + + # Check if its in the pending cache + pending_value = self._pending_deferred_cache.get(key, _Sentinel.sentinel) + if pending_value is not _Sentinel.sentinel: + pending_value.add_invalidation_callback(key, callback) + + def completed_cb(value: VT, key: KT) -> VT: + pending_results[key] = value + return value + + # Add a callback to fill out `pending_results` when that completes + d = pending_value.deferred(key).addCallback(completed_cb, key) + pending.append(d) + continue + + # Not in either cache + missing.append(key) + + # If we've got pending deferreds, squash them into a single one that + # returns `pending_results`. + pending_deferred = None + if pending: + pending_deferred = defer.gatherResults( + pending, consumeErrors=True + ).addCallback(lambda _: pending_results) + + return (cached, pending_deferred, missing) + def get_immediate( self, key: KT, default: T, update_metrics: bool = True ) -> Union[VT, T]: @@ -218,84 +289,89 @@ class DeferredCache(Generic[KT, VT]): value: a deferred which will complete with a result to add to the cache callback: An optional callback to be called when the entry is invalidated """ - if not isinstance(value, defer.Deferred): - raise TypeError("not a Deferred") - - callbacks = [callback] if callback else [] self.check_thread() - existing_entry = self._pending_deferred_cache.pop(key, None) - if existing_entry: - existing_entry.invalidate() + self._pending_deferred_cache.pop(key, None) # XXX: why don't we invalidate the entry in `self.cache` yet? - # we can save a whole load of effort if the deferred is ready. - if value.called: - result = value.result - if not isinstance(result, failure.Failure): - self.cache.set(key, cast(VT, result), callbacks) - return value - # otherwise, we'll add an entry to the _pending_deferred_cache for now, # and add callbacks to add it to the cache properly later. + entry = CacheEntrySingle[KT, VT](value) + entry.add_invalidation_callback(key, callback) + self._pending_deferred_cache[key] = entry + deferred = entry.deferred(key).addCallbacks( + self._completed_callback, + self._error_callback, + callbackArgs=(entry, key), + errbackArgs=(entry, key), + ) - observable = ObservableDeferred(value, consumeErrors=True) - observer = observable.observe() - entry = CacheEntry(deferred=observable, callbacks=callbacks) + # we return a new Deferred which will be called before any subsequent observers. + return deferred - self._pending_deferred_cache[key] = entry + def start_bulk_input( + self, + keys: Collection[KT], + callback: Optional[Callable[[], None]] = None, + ) -> "CacheMultipleEntries[KT, VT]": + """Bulk set API for use when fetching multiple keys at once from the DB. - def compare_and_pop() -> bool: - """Check if our entry is still the one in _pending_deferred_cache, and - if so, pop it. - - Returns true if the entries matched. - """ - existing_entry = self._pending_deferred_cache.pop(key, None) - if existing_entry is entry: - return True - - # oops, the _pending_deferred_cache has been updated since - # we started our query, so we are out of date. - # - # Better put back whatever we took out. (We do it this way - # round, rather than peeking into the _pending_deferred_cache - # and then removing on a match, to make the common case faster) - if existing_entry is not None: - self._pending_deferred_cache[key] = existing_entry - - return False - - def cb(result: VT) -> None: - if compare_and_pop(): - self.cache.set(key, result, entry.callbacks) - else: - # we're not going to put this entry into the cache, so need - # to make sure that the invalidation callbacks are called. - # That was probably done when _pending_deferred_cache was - # updated, but it's possible that `set` was called without - # `invalidate` being previously called, in which case it may - # not have been. Either way, let's double-check now. - entry.invalidate() - - def eb(_fail: Failure) -> None: - compare_and_pop() - entry.invalidate() - - # once the deferred completes, we can move the entry from the - # _pending_deferred_cache to the real cache. - # - observer.addCallbacks(cb, eb) + Called *before* starting the fetch from the DB, and the caller *must* + call either `complete_bulk(..)` or `error_bulk(..)` on the return value. + """ - # we return a new Deferred which will be called before any subsequent observers. - return observable.observe() + entry = CacheMultipleEntries[KT, VT]() + entry.add_global_invalidation_callback(callback) + + for key in keys: + self._pending_deferred_cache[key] = entry + + return entry + + def _completed_callback( + self, value: VT, entry: "CacheEntry[KT, VT]", key: KT + ) -> VT: + """Called when a deferred is completed.""" + # We check if the current entry matches the entry associated with the + # deferred. If they don't match then it got invalidated. + current_entry = self._pending_deferred_cache.pop(key, None) + if current_entry is not entry: + if current_entry: + self._pending_deferred_cache[key] = current_entry + return value + + self.cache.set(key, value, entry.get_invalidation_callbacks(key)) + + return value + + def _error_callback( + self, + failure: Failure, + entry: "CacheEntry[KT, VT]", + key: KT, + ) -> Failure: + """Called when a deferred errors.""" + + # We check if the current entry matches the entry associated with the + # deferred. If they don't match then it got invalidated. + current_entry = self._pending_deferred_cache.pop(key, None) + if current_entry is not entry: + if current_entry: + self._pending_deferred_cache[key] = current_entry + return failure + + for cb in entry.get_invalidation_callbacks(key): + cb() + + return failure def prefill( self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None ) -> None: - callbacks = [callback] if callback else [] + callbacks = (callback,) if callback else () self.cache.set(key, value, callbacks=callbacks) + self._pending_deferred_cache.pop(key, None) def invalidate(self, key: KT) -> None: """Delete a key, or tree of entries @@ -311,41 +387,129 @@ class DeferredCache(Generic[KT, VT]): self.cache.del_multi(key) # if we have a pending lookup for this key, remove it from the - # _pending_deferred_cache, which will (a) stop it being returned - # for future queries and (b) stop it being persisted as a proper entry + # _pending_deferred_cache, which will (a) stop it being returned for + # future queries and (b) stop it being persisted as a proper entry # in self.cache. entry = self._pending_deferred_cache.pop(key, None) - - # run the invalidation callbacks now, rather than waiting for the - # deferred to resolve. if entry: # _pending_deferred_cache.pop should either return a CacheEntry, or, in the # case of a TreeCache, a dict of keys to cache entries. Either way calling # iterate_tree_cache_entry on it will do the right thing. for entry in iterate_tree_cache_entry(entry): - entry.invalidate() + for cb in entry.get_invalidation_callbacks(key): + cb() def invalidate_all(self) -> None: self.check_thread() self.cache.clear() - for entry in self._pending_deferred_cache.values(): - entry.invalidate() + for key, entry in self._pending_deferred_cache.items(): + for cb in entry.get_invalidation_callbacks(key): + cb() + self._pending_deferred_cache.clear() -class CacheEntry: - __slots__ = ["deferred", "callbacks", "invalidated"] +class CacheEntry(Generic[KT, VT], metaclass=abc.ABCMeta): + """Abstract class for entries in `DeferredCache[KT, VT]`""" - def __init__( - self, deferred: ObservableDeferred, callbacks: Iterable[Callable[[], None]] - ): - self.deferred = deferred - self.callbacks = set(callbacks) - self.invalidated = False - - def invalidate(self) -> None: - if not self.invalidated: - self.invalidated = True - for callback in self.callbacks: - callback() - self.callbacks.clear() + @abc.abstractmethod + def deferred(self, key: KT) -> "defer.Deferred[VT]": + """Get a deferred that a caller can wait on to get the value at the + given key""" + ... + + @abc.abstractmethod + def add_invalidation_callback( + self, key: KT, callback: Optional[Callable[[], None]] + ) -> None: + """Add an invalidation callback""" + ... + + @abc.abstractmethod + def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]: + """Get all invalidation callbacks""" + ... + + +class CacheEntrySingle(CacheEntry[KT, VT]): + """An implementation of `CacheEntry` wrapping a deferred that results in a + single cache entry. + """ + + __slots__ = ["_deferred", "_callbacks"] + + def __init__(self, deferred: "defer.Deferred[VT]") -> None: + self._deferred = ObservableDeferred(deferred, consumeErrors=True) + self._callbacks: Set[Callable[[], None]] = set() + + def deferred(self, key: KT) -> "defer.Deferred[VT]": + return self._deferred.observe() + + def add_invalidation_callback( + self, key: KT, callback: Optional[Callable[[], None]] + ) -> None: + if callback is None: + return + + self._callbacks.add(callback) + + def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]: + return self._callbacks + + +class CacheMultipleEntries(CacheEntry[KT, VT]): + """Cache entry that is used for bulk lookups and insertions.""" + + __slots__ = ["_deferred", "_callbacks", "_global_callbacks"] + + def __init__(self) -> None: + self._deferred: Optional[ObservableDeferred[Dict[KT, VT]]] = None + self._callbacks: Dict[KT, Set[Callable[[], None]]] = {} + self._global_callbacks: Set[Callable[[], None]] = set() + + def deferred(self, key: KT) -> "defer.Deferred[VT]": + if not self._deferred: + self._deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) + return self._deferred.observe().addCallback(lambda res: res.get(key)) + + def add_invalidation_callback( + self, key: KT, callback: Optional[Callable[[], None]] + ) -> None: + if callback is None: + return + + self._callbacks.setdefault(key, set()).add(callback) + + def get_invalidation_callbacks(self, key: KT) -> Collection[Callable[[], None]]: + return self._callbacks.get(key, set()) | self._global_callbacks + + def add_global_invalidation_callback( + self, callback: Optional[Callable[[], None]] + ) -> None: + """Add a callback for when any keys get invalidated.""" + if callback is None: + return + + self._global_callbacks.add(callback) + + def complete_bulk( + self, + cache: DeferredCache[KT, VT], + result: Dict[KT, VT], + ) -> None: + """Called when there is a result""" + for key, value in result.items(): + cache._completed_callback(value, self, key) + + if self._deferred: + self._deferred.callback(result) + + def error_bulk( + self, cache: DeferredCache[KT, VT], keys: Collection[KT], failure: Failure + ) -> None: + """Called when bulk lookup failed.""" + for key in keys: + cache._error_callback(failure, self, key) + + if self._deferred: + self._deferred.errback(failure) diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 9d4bc89edb..10aff4d04a 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -25,6 +25,7 @@ from typing import ( Generic, Hashable, Iterable, + List, Mapping, Optional, Sequence, @@ -440,16 +441,6 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): keyargs = [arg_dict[arg_nm] for arg_nm in self.arg_names] list_args = arg_dict[self.list_name] - results = {} - - def update_results_dict(res: Any, arg: Hashable) -> None: - results[arg] = res - - # list of deferreds to wait for - cached_defers = [] - - missing = set() - # If the cache takes a single arg then that is used as the key, # otherwise a tuple is used. if num_args == 1: @@ -457,6 +448,9 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): def arg_to_cache_key(arg: Hashable) -> Hashable: return arg + def cache_key_to_arg(key: tuple) -> Hashable: + return key + else: keylist = list(keyargs) @@ -464,58 +458,53 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): keylist[self.list_pos] = arg return tuple(keylist) - for arg in list_args: - try: - res = cache.get(arg_to_cache_key(arg), callback=invalidate_callback) - if not res.called: - res.addCallback(update_results_dict, arg) - cached_defers.append(res) - else: - results[arg] = res.result - except KeyError: - missing.add(arg) + def cache_key_to_arg(key: tuple) -> Hashable: + return key[self.list_pos] + + cache_keys = [arg_to_cache_key(arg) for arg in list_args] + immediate_results, pending_deferred, missing = cache.get_bulk( + cache_keys, callback=invalidate_callback + ) + + results = {cache_key_to_arg(key): v for key, v in immediate_results.items()} + + cached_defers: List["defer.Deferred[Any]"] = [] + if pending_deferred: + + def update_results(r: Dict) -> None: + for k, v in r.items(): + results[cache_key_to_arg(k)] = v + + pending_deferred.addCallback(update_results) + cached_defers.append(pending_deferred) if missing: - # we need a deferred for each entry in the list, - # which we put in the cache. Each deferred resolves with the - # relevant result for that key. - deferreds_map = {} - for arg in missing: - deferred: "defer.Deferred[Any]" = defer.Deferred() - deferreds_map[arg] = deferred - key = arg_to_cache_key(arg) - cached_defers.append( - cache.set(key, deferred, callback=invalidate_callback) - ) + cache_entry = cache.start_bulk_input(missing, invalidate_callback) def complete_all(res: Dict[Hashable, Any]) -> None: - # the wrapped function has completed. It returns a dict. - # We can now update our own result map, and then resolve the - # observable deferreds in the cache. - for e, d1 in deferreds_map.items(): - val = res.get(e, None) - # make sure we update the results map before running the - # deferreds, because as soon as we run the last deferred, the - # gatherResults() below will complete and return the result - # dict to our caller. - results[e] = val - d1.callback(val) + missing_results = {} + for key in missing: + arg = cache_key_to_arg(key) + val = res.get(arg, None) + + results[arg] = val + missing_results[key] = val + + cache_entry.complete_bulk(cache, missing_results) def errback_all(f: Failure) -> None: - # the wrapped function has failed. Propagate the failure into - # the cache, which will invalidate the entry, and cause the - # relevant cached_deferreds to fail, which will propagate the - # failure to our caller. - for d1 in deferreds_map.values(): - d1.errback(f) + cache_entry.error_bulk(cache, missing, f) args_to_call = dict(arg_dict) - args_to_call[self.list_name] = missing + args_to_call[self.list_name] = { + cache_key_to_arg(key) for key in missing + } # dispatch the call, and attach the two handlers - defer.maybeDeferred( + missing_d = defer.maybeDeferred( preserve_fn(self.orig), **args_to_call ).addCallbacks(complete_all, errback_all) + cached_defers.append(missing_d) if cached_defers: d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks( diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index c1b8ec0c73..fec31da2b6 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -135,6 +135,9 @@ class TreeCache: def values(self): return iterate_tree_cache_entry(self.root) + def items(self): + return iterate_tree_cache_items((), self.root) + def __len__(self) -> int: return self.size -- cgit 1.5.1 From 6bd8763804dc0987c7ecd37bcb5ebff465fffa29 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 21 Sep 2022 15:32:01 +0200 Subject: Add cache invalidation across workers to module API (#13667) Signed-off-by: Mathieu Velten --- changelog.d/13667.feature | 1 + scripts-dev/mypy_synapse_plugin.py | 4 +- synapse/module_api/__init__.py | 33 ++++++++- synapse/storage/_base.py | 23 +++++-- synapse/storage/databases/main/cache.py | 20 ++++-- synapse/util/caches/descriptors.py | 14 ++-- .../replication/test_module_cache_invalidation.py | 79 ++++++++++++++++++++++ 7 files changed, 153 insertions(+), 21 deletions(-) create mode 100644 changelog.d/13667.feature create mode 100644 tests/replication/test_module_cache_invalidation.py (limited to 'synapse/util/caches/descriptors.py') diff --git a/changelog.d/13667.feature b/changelog.d/13667.feature new file mode 100644 index 0000000000..a0b3cfe18c --- /dev/null +++ b/changelog.d/13667.feature @@ -0,0 +1 @@ +Add cache invalidation across workers to module API. diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index d08517a953..2c377533c0 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -29,7 +29,7 @@ class SynapsePlugin(Plugin): self, fullname: str ) -> Optional[Callable[[MethodSigContext], CallableType]]: if fullname.startswith( - "synapse.util.caches.descriptors._CachedFunction.__call__" + "synapse.util.caches.descriptors.CachedFunction.__call__" ) or fullname.startswith( "synapse.util.caches.descriptors._LruCachedFunction.__call__" ): @@ -38,7 +38,7 @@ class SynapsePlugin(Plugin): def cached_function_method_signature(ctx: MethodSigContext) -> CallableType: - """Fixes the `_CachedFunction.__call__` signature to be correct. + """Fixes the `CachedFunction.__call__` signature to be correct. It already has *almost* the correct signature, except: diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 9287c0fb8d..59755bff6d 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -125,7 +125,7 @@ from synapse.types import ( ) from synapse.util import Clock from synapse.util.async_helpers import maybe_awaitable -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import CachedFunction, cached from synapse.util.frozenutils import freeze if TYPE_CHECKING: @@ -836,6 +836,37 @@ class ModuleApi: self._store.db_pool.runInteraction(desc, func, *args, **kwargs) # type: ignore[arg-type] ) + def register_cached_function(self, cached_func: CachedFunction) -> None: + """Register a cached function that should be invalidated across workers. + Invalidation local to a worker can be done directly using `cached_func.invalidate`, + however invalidation that needs to go to other workers needs to call `invalidate_cache` + on the module API instead. + + Args: + cached_function: The cached function that will be registered to receive invalidation + locally and from other workers. + """ + self._store.register_external_cached_function( + f"{cached_func.__module__}.{cached_func.__name__}", cached_func + ) + + async def invalidate_cache( + self, cached_func: CachedFunction, keys: Tuple[Any, ...] + ) -> None: + """Invalidate a cache entry of a cached function across workers. The cached function + needs to be registered on all workers first with `register_cached_function`. + + Args: + cached_function: The cached function that needs an invalidation + keys: keys of the entry to invalidate, usually matching the arguments of the + cached function. + """ + cached_func.invalidate(keys) + await self._store.send_invalidation_to_replication( + f"{cached_func.__module__}.{cached_func.__name__}", + keys, + ) + async def complete_sso_login_async( self, registered_user_id: str, diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e30f9c76d4..303a5d5298 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -15,12 +15,13 @@ # limitations under the License. import logging from abc import ABCMeta -from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Collection, Dict, Iterable, Optional, Union from synapse.storage.database import make_in_list_sql_clause # noqa: F401; noqa: F401 from synapse.storage.database import DatabasePool, LoggingDatabaseConnection from synapse.types import get_domain_from_id from synapse.util import json_decoder +from synapse.util.caches.descriptors import CachedFunction if TYPE_CHECKING: from synapse.server import HomeServer @@ -47,6 +48,8 @@ class SQLBaseStore(metaclass=ABCMeta): self.database_engine = database.engine self.db_pool = database + self.external_cached_functions: Dict[str, CachedFunction] = {} + def process_replication_rows( self, stream_name: str, @@ -95,7 +98,7 @@ class SQLBaseStore(metaclass=ABCMeta): def _attempt_to_invalidate_cache( self, cache_name: str, key: Optional[Collection[Any]] - ) -> None: + ) -> bool: """Attempts to invalidate the cache of the given name, ignoring if the cache doesn't exist. Mainly used for invalidating caches on workers, where they may not have the cache. @@ -113,9 +116,12 @@ class SQLBaseStore(metaclass=ABCMeta): try: cache = getattr(self, cache_name) except AttributeError: - # We probably haven't pulled in the cache in this worker, - # which is fine. - return + # Check if an externally defined module cache has been registered + cache = self.external_cached_functions.get(cache_name) + if not cache: + # We probably haven't pulled in the cache in this worker, + # which is fine. + return False if key is None: cache.invalidate_all() @@ -125,6 +131,13 @@ class SQLBaseStore(metaclass=ABCMeta): invalidate_method = getattr(cache, "invalidate_local", cache.invalidate) invalidate_method(tuple(key)) + return True + + def register_external_cached_function( + self, cache_name: str, func: CachedFunction + ) -> None: + self.external_cached_functions[cache_name] = func + def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any: """ diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 12e9a42382..2c421151c1 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -33,7 +33,7 @@ from synapse.storage.database import ( ) from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.util.caches.descriptors import _CachedFunction +from synapse.util.caches.descriptors import CachedFunction from synapse.util.iterutils import batch_iter if TYPE_CHECKING: @@ -269,9 +269,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): return cache_func.invalidate(keys) - await self.db_pool.runInteraction( - "invalidate_cache_and_stream", - self._send_invalidation_to_replication, + await self.send_invalidation_to_replication( cache_func.__name__, keys, ) @@ -279,7 +277,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): def _invalidate_cache_and_stream( self, txn: LoggingTransaction, - cache_func: _CachedFunction, + cache_func: CachedFunction, keys: Tuple[Any, ...], ) -> None: """Invalidates the cache and adds it to the cache stream so slaves @@ -293,7 +291,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._send_invalidation_to_replication(txn, cache_func.__name__, keys) def _invalidate_all_cache_and_stream( - self, txn: LoggingTransaction, cache_func: _CachedFunction + self, txn: LoggingTransaction, cache_func: CachedFunction ) -> None: """Invalidates the entire cache and adds it to the cache stream so slaves will know to invalidate their caches. @@ -334,6 +332,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore): txn, CURRENT_STATE_CACHE_NAME, [room_id] ) + async def send_invalidation_to_replication( + self, cache_name: str, keys: Optional[Collection[Any]] + ) -> None: + await self.db_pool.runInteraction( + "send_invalidation_to_replication", + self._send_invalidation_to_replication, + cache_name, + keys, + ) + def _send_invalidation_to_replication( self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]] ) -> None: diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 10aff4d04a..3909f1caea 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -53,7 +53,7 @@ CacheKey = Union[Tuple, Any] F = TypeVar("F", bound=Callable[..., Any]) -class _CachedFunction(Generic[F]): +class CachedFunction(Generic[F]): invalidate: Any = None invalidate_all: Any = None prefill: Any = None @@ -242,7 +242,7 @@ class LruCacheDescriptor(_CacheDescriptorBase): return ret2 - wrapped = cast(_CachedFunction, _wrapped) + wrapped = cast(CachedFunction, _wrapped) wrapped.cache = cache obj.__dict__[self.name] = wrapped @@ -363,7 +363,7 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): return make_deferred_yieldable(ret) - wrapped = cast(_CachedFunction, _wrapped) + wrapped = cast(CachedFunction, _wrapped) if self.num_args == 1: assert not self.tree @@ -572,7 +572,7 @@ def cached( iterable: bool = False, prune_unread_entries: bool = True, name: Optional[str] = None, -) -> Callable[[F], _CachedFunction[F]]: +) -> Callable[[F], CachedFunction[F]]: func = lambda orig: DeferredCacheDescriptor( orig, max_entries=max_entries, @@ -585,7 +585,7 @@ def cached( name=name, ) - return cast(Callable[[F], _CachedFunction[F]], func) + return cast(Callable[[F], CachedFunction[F]], func) def cachedList( @@ -594,7 +594,7 @@ def cachedList( list_name: str, num_args: Optional[int] = None, name: Optional[str] = None, -) -> Callable[[F], _CachedFunction[F]]: +) -> Callable[[F], CachedFunction[F]]: """Creates a descriptor that wraps a function in a `DeferredCacheListDescriptor`. Used to do batch lookups for an already created cache. One of the arguments @@ -631,7 +631,7 @@ def cachedList( name=name, ) - return cast(Callable[[F], _CachedFunction[F]], func) + return cast(Callable[[F], CachedFunction[F]], func) def _get_cache_key_builder( diff --git a/tests/replication/test_module_cache_invalidation.py b/tests/replication/test_module_cache_invalidation.py new file mode 100644 index 0000000000..b93cae67d3 --- /dev/null +++ b/tests/replication/test_module_cache_invalidation.py @@ -0,0 +1,79 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +import synapse +from synapse.module_api import cached + +from tests.replication._base import BaseMultiWorkerStreamTestCase + +logger = logging.getLogger(__name__) + +FIRST_VALUE = "one" +SECOND_VALUE = "two" + +KEY = "mykey" + + +class TestCache: + current_value = FIRST_VALUE + + @cached() + async def cached_function(self, user_id: str) -> str: + return self.current_value + + +class ModuleCacheInvalidationTestCase(BaseMultiWorkerStreamTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + ] + + def test_module_cache_full_invalidation(self): + main_cache = TestCache() + self.hs.get_module_api().register_cached_function(main_cache.cached_function) + + worker_hs = self.make_worker_hs("synapse.app.generic_worker") + + worker_cache = TestCache() + worker_hs.get_module_api().register_cached_function( + worker_cache.cached_function + ) + + self.assertEqual(FIRST_VALUE, self.get_success(main_cache.cached_function(KEY))) + self.assertEqual( + FIRST_VALUE, self.get_success(worker_cache.cached_function(KEY)) + ) + + main_cache.current_value = SECOND_VALUE + worker_cache.current_value = SECOND_VALUE + # No invalidation yet, should return the cached value on both the main process and the worker + self.assertEqual(FIRST_VALUE, self.get_success(main_cache.cached_function(KEY))) + self.assertEqual( + FIRST_VALUE, self.get_success(worker_cache.cached_function(KEY)) + ) + + # Full invalidation on the main process, should be replicated on the worker that + # should returned the updated value too + self.get_success( + self.hs.get_module_api().invalidate_cache( + main_cache.cached_function, (KEY,) + ) + ) + + self.assertEqual( + SECOND_VALUE, self.get_success(main_cache.cached_function(KEY)) + ) + self.assertEqual( + SECOND_VALUE, self.get_success(worker_cache.cached_function(KEY)) + ) -- cgit 1.5.1 From 29269d9d3f3419a3d92cdd80dae4a37e2d99a395 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Tue, 27 Sep 2022 15:55:43 -0500 Subject: Fix `have_seen_event` cache not being invalidated (#13863) Fix https://github.com/matrix-org/synapse/issues/13856 Fix https://github.com/matrix-org/synapse/issues/13865 > Discovered while trying to make Synapse fast enough for [this MSC2716 test for importing many batches](https://github.com/matrix-org/complement/pull/214#discussion_r741678240). As an example, disabling the `have_seen_event` cache saves 10 seconds for each `/messages` request in that MSC2716 Complement test because we're not making as many federation requests for `/state` (speeding up `have_seen_event` itself is related to https://github.com/matrix-org/synapse/issues/13625) > > But this will also make `/messages` faster in general so we can include it in the [faster `/messages` milestone](https://github.com/matrix-org/synapse/milestone/11). > > *-- https://github.com/matrix-org/synapse/issues/13856* ### The problem `_invalidate_caches_for_event` doesn't run in monolith mode which means we never even tried to clear the `have_seen_event` and other caches. And even in worker mode, it only runs on the workers, not the master (AFAICT). Additionally there was bug with the key being wrong so `_invalidate_caches_for_event` never invalidates the `have_seen_event` cache even when it does run. Because we were using the `@cachedList` wrong, it was putting items in the cache under keys like `((room_id, event_id),)` with a `set` in a `set` (ex. `(('!TnCIJPKzdQdUlIyXdQ:test', '$Iu0eqEBN7qcyF1S9B3oNB3I91v2o5YOgRNPwi_78s-k'),)`) and we we're trying to invalidate with just `(room_id, event_id)` which did nothing. --- changelog.d/13863.bugfix | 1 + synapse/storage/databases/main/events_worker.py | 40 +++--- synapse/util/caches/descriptors.py | 6 + tests/storage/databases/main/test_events_worker.py | 152 ++++++++++++++------- tests/util/caches/test_descriptors.py | 33 ++++- 5 files changed, 165 insertions(+), 67 deletions(-) create mode 100644 changelog.d/13863.bugfix (limited to 'synapse/util/caches/descriptors.py') diff --git a/changelog.d/13863.bugfix b/changelog.d/13863.bugfix new file mode 100644 index 0000000000..74264a4fab --- /dev/null +++ b/changelog.d/13863.bugfix @@ -0,0 +1 @@ +Fix `have_seen_event` cache not being invalidated after we persist an event which causes inefficiency effects like extra `/state` federation calls. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 52914febf9..7cdc9fe98f 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1474,32 +1474,38 @@ class EventsWorkerStore(SQLBaseStore): # the batches as big as possible. results: Set[str] = set() - for chunk in batch_iter(event_ids, 500): - r = await self._have_seen_events_dict( - [(room_id, event_id) for event_id in chunk] + for event_ids_chunk in batch_iter(event_ids, 500): + events_seen_dict = await self._have_seen_events_dict( + room_id, event_ids_chunk + ) + results.update( + eid for (eid, have_event) in events_seen_dict.items() if have_event ) - results.update(eid for ((_rid, eid), have_event) in r.items() if have_event) return results - @cachedList(cached_method_name="have_seen_event", list_name="keys") + @cachedList(cached_method_name="have_seen_event", list_name="event_ids") async def _have_seen_events_dict( - self, keys: Collection[Tuple[str, str]] - ) -> Dict[Tuple[str, str], bool]: + self, + room_id: str, + event_ids: Collection[str], + ) -> Dict[str, bool]: """Helper for have_seen_events Returns: - a dict {(room_id, event_id)-> bool} + a dict {event_id -> bool} """ # if the event cache contains the event, obviously we've seen it. cache_results = { - (rid, eid) - for (rid, eid) in keys - if await self._get_event_cache.contains((eid,)) + event_id + for event_id in event_ids + if await self._get_event_cache.contains((event_id,)) } results = dict.fromkeys(cache_results, True) - remaining = [k for k in keys if k not in cache_results] + remaining = [ + event_id for event_id in event_ids if event_id not in cache_results + ] if not remaining: return results @@ -1511,23 +1517,21 @@ class EventsWorkerStore(SQLBaseStore): sql = "SELECT event_id FROM events AS e WHERE " clause, args = make_in_list_sql_clause( - txn.database_engine, "e.event_id", [eid for (_rid, eid) in remaining] + txn.database_engine, "e.event_id", remaining ) txn.execute(sql + clause, args) found_events = {eid for eid, in txn} # ... and then we can update the results for each key - results.update( - {(rid, eid): (eid in found_events) for (rid, eid) in remaining} - ) + results.update({eid: (eid in found_events) for eid in remaining}) await self.db_pool.runInteraction("have_seen_events", have_seen_events_txn) return results @cached(max_entries=100000, tree=True) async def have_seen_event(self, room_id: str, event_id: str) -> bool: - res = await self._have_seen_events_dict(((room_id, event_id),)) - return res[(room_id, event_id)] + res = await self._have_seen_events_dict(room_id, [event_id]) + return res[event_id] def _get_current_state_event_counts_txn( self, txn: LoggingTransaction, room_id: str diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 3909f1caea..0391966462 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -431,6 +431,12 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): cache: DeferredCache[CacheKey, Any] = cached_method.cache num_args = cached_method.num_args + if num_args != self.num_args: + raise Exception( + "Number of args (%s) does not match underlying cache_method_name=%s (%s)." + % (self.num_args, self.cached_method_name, num_args) + ) + @functools.wraps(self.orig) def wrapped(*args: Any, **kwargs: Any) -> "defer.Deferred[Dict]": # If we're passed a cache_context then we'll want to call its diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 67401272ac..32a798d74b 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -35,66 +35,45 @@ from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results from tests import unittest +from tests.test_utils.event_injection import create_event, inject_event class HaveSeenEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + def prepare(self, reactor, clock, hs): + self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main - # insert some test data - for rid in ("room1", "room2"): - self.get_success( - self.store.db_pool.simple_insert( - "rooms", - {"room_id": rid, "room_version": 4}, - ) - ) + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.token) self.event_ids: List[str] = [] - for idx, rid in enumerate( - ( - "room1", - "room1", - "room1", - "room2", - ) - ): - event_json = {"type": f"test {idx}", "room_id": rid} - event = make_event_from_dict(event_json, room_version=RoomVersions.V4) - event_id = event.event_id - - self.get_success( - self.store.db_pool.simple_insert( - "events", - { - "event_id": event_id, - "room_id": rid, - "topological_ordering": idx, - "stream_ordering": idx, - "type": event.type, - "processed": True, - "outlier": False, - }, + for i in range(3): + event = self.get_success( + inject_event( + hs, + room_version=RoomVersions.V7.identifier, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": f"foobarbaz{i}"}, ) ) - self.get_success( - self.store.db_pool.simple_insert( - "event_json", - { - "event_id": event_id, - "room_id": rid, - "json": json.dumps(event_json), - "internal_metadata": "{}", - "format_version": 3, - }, - ) - ) - self.event_ids.append(event_id) + + self.event_ids.append(event.event_id) def test_simple(self): with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) @@ -104,7 +83,9 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # a second lookup of the same events should cause no queries with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) @@ -116,11 +97,86 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # looking it up should now cause no db hits with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0]]) + self.store.have_seen_events(self.room_id, [self.event_ids[0]]) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) + def test_persisting_event_invalidates_cache(self): + """ + Test to make sure that the `have_seen_event` cache + is invalidated after we persist an event and returns + the updated value. + """ + event, event_context = self.get_success( + create_event( + self.hs, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": "garply"}, + ) + ) + + with LoggingContext(name="test") as ctx: + # First, check `have_seen_event` for an event we have not seen yet + # to prime the cache with a `false` value. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, set()) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + # Persist the event which should invalidate or prefill the + # `have_seen_event` cache so we don't return stale values. + persistence = self.hs.get_storage_controllers().persistence + self.get_success( + persistence.persist_event( + event, + event_context, + ) + ) + + with LoggingContext(name="test") as ctx: + # Check `have_seen_event` again and we should see the updated fact + # that we have now seen the event after persisting it. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, {event.event_id}) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + def test_invalidate_cache_by_room_id(self): + """ + Test to make sure that all events associated with the given `(room_id,)` + are invalidated in the `have_seen_event` cache. + """ + with LoggingContext(name="test") as ctx: + # Prime the cache with some values + res = self.get_success( + self.store.have_seen_events(self.room_id, self.event_ids) + ) + self.assertEqual(res, set(self.event_ids)) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + # Clear the cache with any events associated with the `room_id` + self.store.have_seen_event.invalidate((self.room_id,)) + + with LoggingContext(name="test") as ctx: + res = self.get_success( + self.store.have_seen_events(self.room_id, self.event_ids) + ) + self.assertEqual(res, set(self.event_ids)) + + # Since we cleared the cache, it should result in another db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + class EventCacheTestCase(unittest.HomeserverTestCase): """Test that the various layers of event cache works.""" diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 48e616ac74..90861fe522 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Set +from typing import Iterable, Set, Tuple from unittest import mock from twisted.internet import defer, reactor @@ -1008,3 +1008,34 @@ class CachedListDescriptorTestCase(unittest.TestCase): obj.inner_context_was_finished, "Tried to restart a finished logcontext" ) self.assertEqual(current_context(), SENTINEL_CONTEXT) + + def test_num_args_mismatch(self): + """ + Make sure someone does not accidentally use @cachedList on a method with + a mismatch in the number args to the underlying single cache method. + """ + + class Cls: + @descriptors.cached(tree=True) + def fn(self, room_id, event_id): + pass + + # This is wrong ❌. `@cachedList` expects to be given the same number + # of arguments as the underlying cached function, just with one of + # the arguments being an iterable + @descriptors.cachedList(cached_method_name="fn", list_name="keys") + def list_fn(self, keys: Iterable[Tuple[str, str]]): + pass + + # Corrected syntax ✅ + # + # @cachedList(cached_method_name="fn", list_name="event_ids") + # async def list_fn( + # self, room_id: str, event_ids: Collection[str], + # ) + + obj = Cls() + + # Make sure this raises an error about the arg mismatch + with self.assertRaises(Exception): + obj.list_fn([("foo", "bar")]) -- cgit 1.5.1 From 0b7830e457359ce651b293c8748bf636973404a9 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 19 Oct 2022 19:38:24 +0000 Subject: Bump flake8-bugbear from 21.3.2 to 22.9.23 (#14042) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Erik Johnston Co-authored-by: David Robertson --- .flake8 | 9 ++++++++- changelog.d/14042.misc | 1 + poetry.lock | 8 ++++---- synapse/storage/databases/main/roommember.py | 4 ++-- synapse/util/caches/deferred_cache.py | 4 ++-- synapse/util/caches/descriptors.py | 2 +- tests/federation/transport/test_client.py | 7 +++---- tests/util/caches/test_descriptors.py | 2 +- 8 files changed, 22 insertions(+), 15 deletions(-) create mode 100644 changelog.d/14042.misc (limited to 'synapse/util/caches/descriptors.py') diff --git a/.flake8 b/.flake8 index acb118c86e..4c6a4d5843 100644 --- a/.flake8 +++ b/.flake8 @@ -8,4 +8,11 @@ # E203: whitespace before ':' (which is contrary to pep8?) # E731: do not assign a lambda expression, use a def # E501: Line too long (black enforces this for us) -ignore=W503,W504,E203,E731,E501 +# +# flake8-bugbear runs extra checks. Its error codes are described at +# https://github.com/PyCQA/flake8-bugbear#list-of-warnings +# B019: Use of functools.lru_cache or functools.cache on methods can lead to memory leaks +# B023: Functions defined inside a loop must not use variables redefined in the loop +# B024: Abstract base class with no abstract method. + +ignore=W503,W504,E203,E731,E501,B019,B023,B024 diff --git a/changelog.d/14042.misc b/changelog.d/14042.misc new file mode 100644 index 0000000000..868d55e76a --- /dev/null +++ b/changelog.d/14042.misc @@ -0,0 +1 @@ +Bump flake8-bugbear from 21.3.2 to 22.9.23. diff --git a/poetry.lock b/poetry.lock index ed0b59fbe5..0a2f9ab69e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -260,7 +260,7 @@ pyflakes = ">=2.4.0,<2.5.0" [[package]] name = "flake8-bugbear" -version = "21.3.2" +version = "22.9.23" description = "A plugin for flake8 finding likely bugs and design problems in your program. Contains warnings that don't belong in pyflakes and pycodestyle." category = "dev" optional = false @@ -271,7 +271,7 @@ attrs = ">=19.2.0" flake8 = ">=3.0.0" [package.extras] -dev = ["black", "coverage", "hypothesis", "hypothesmith"] +dev = ["coverage", "hypothesis", "hypothesmith (>=0.2)", "pre-commit"] [[package]] name = "flake8-comprehensions" @@ -1826,8 +1826,8 @@ flake8 = [ {file = "flake8-4.0.1.tar.gz", hash = "sha256:806e034dda44114815e23c16ef92f95c91e4c71100ff52813adf7132a6ad870d"}, ] flake8-bugbear = [ - {file = "flake8-bugbear-21.3.2.tar.gz", hash = "sha256:cadce434ceef96463b45a7c3000f23527c04ea4b531d16c7ac8886051f516ca0"}, - {file = "flake8_bugbear-21.3.2-py36.py37.py38-none-any.whl", hash = "sha256:5d6ccb0c0676c738a6e066b4d50589c408dcc1c5bf1d73b464b18b73cd6c05c2"}, + {file = "flake8-bugbear-22.9.23.tar.gz", hash = "sha256:17b9623325e6e0dcdcc80ed9e4aa811287fcc81d7e03313b8736ea5733759937"}, + {file = "flake8_bugbear-22.9.23-py3-none-any.whl", hash = "sha256:cd2779b2b7ada212d7a322814a1e5651f1868ab0d3f24cc9da66169ab8fda474"}, ] flake8-comprehensions = [ {file = "flake8-comprehensions-3.8.0.tar.gz", hash = "sha256:8e108707637b1d13734f38e03435984f6b7854fa6b5a4e34f93e69534be8e521"}, diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 2ed6ad754f..32e1e983a5 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -707,8 +707,8 @@ class RoomMemberWorkerStore(EventsWorkerStore): # 250 users is pretty arbitrary but the data can be quite large if users # are in many rooms. - for user_ids in batch_iter(user_ids, 250): - all_user_rooms.update(await self._get_rooms_for_users(user_ids)) + for batch_user_ids in batch_iter(user_ids, 250): + all_user_rooms.update(await self._get_rooms_for_users(batch_user_ids)) return all_user_rooms diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 6425f851ea..bcb1cba362 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -395,8 +395,8 @@ class DeferredCache(Generic[KT, VT]): # _pending_deferred_cache.pop should either return a CacheEntry, or, in the # case of a TreeCache, a dict of keys to cache entries. Either way calling # iterate_tree_cache_entry on it will do the right thing. - for entry in iterate_tree_cache_entry(entry): - for cb in entry.get_invalidation_callbacks(key): + for iter_entry in iterate_tree_cache_entry(entry): + for cb in iter_entry.get_invalidation_callbacks(key): cb() def invalidate_all(self) -> None: diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 0391966462..b3c748ef44 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -432,7 +432,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): num_args = cached_method.num_args if num_args != self.num_args: - raise Exception( + raise TypeError( "Number of args (%s) does not match underlying cache_method_name=%s (%s)." % (self.num_args, self.cached_method_name, num_args) ) diff --git a/tests/federation/transport/test_client.py b/tests/federation/transport/test_client.py index 0926e0583d..dd4d1b56de 100644 --- a/tests/federation/transport/test_client.py +++ b/tests/federation/transport/test_client.py @@ -17,6 +17,7 @@ from unittest.mock import Mock from synapse.api.room_versions import RoomVersions from synapse.federation.transport.client import SendJoinParser +from synapse.util import ExceptionBundle from tests.unittest import TestCase @@ -121,10 +122,8 @@ class SendJoinParserTestCase(TestCase): # Send half of the data to the parser parser.write(serialisation[: len(serialisation) // 2]) - # Close the parser. There should be _some_ kind of exception, but it need not - # be that RuntimeError directly. E.g. we might want to raise a wrapper - # encompassing multiple errors from multiple coroutines. - with self.assertRaises(Exception): + # Close the parser. There should be _some_ kind of exception. + with self.assertRaises(ExceptionBundle): parser.finish() # In any case, we should have tried to close both coros. diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 90861fe522..78fd7b6961 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -1037,5 +1037,5 @@ class CachedListDescriptorTestCase(unittest.TestCase): obj = Cls() # Make sure this raises an error about the arg mismatch - with self.assertRaises(Exception): + with self.assertRaises(TypeError): obj.list_fn([("foo", "bar")]) -- cgit 1.5.1 From c9dffd5b330553c5803784be5bc0e2479fab79b0 Mon Sep 17 00:00:00 2001 From: Nick Mills-Barrett Date: Tue, 25 Oct 2022 11:39:25 +0100 Subject: Remove unused `@lru_cache` decorator (#13595) * Remove unused `@lru_cache` decorator Spotted this working on something else. Co-authored-by: David Robertson --- changelog.d/13595.misc | 1 + synapse/util/caches/descriptors.py | 104 ---------------------------------- tests/util/caches/test_descriptors.py | 40 ++----------- 3 files changed, 5 insertions(+), 140 deletions(-) create mode 100644 changelog.d/13595.misc (limited to 'synapse/util/caches/descriptors.py') diff --git a/changelog.d/13595.misc b/changelog.d/13595.misc new file mode 100644 index 0000000000..71959a6ee7 --- /dev/null +++ b/changelog.d/13595.misc @@ -0,0 +1 @@ +Remove unused `@lru_cache` decorator. diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index b3c748ef44..75428d19ba 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import enum import functools import inspect import logging @@ -146,109 +145,6 @@ class _CacheDescriptorBase: ) -class _LruCachedFunction(Generic[F]): - cache: LruCache[CacheKey, Any] - __call__: F - - -def lru_cache( - *, max_entries: int = 1000, cache_context: bool = False -) -> Callable[[F], _LruCachedFunction[F]]: - """A method decorator that applies a memoizing cache around the function. - - This is more-or-less a drop-in equivalent to functools.lru_cache, although note - that the signature is slightly different. - - The main differences with functools.lru_cache are: - (a) the size of the cache can be controlled via the cache_factor mechanism - (b) the wrapped function can request a "cache_context" which provides a - callback mechanism to indicate that the result is no longer valid - (c) prometheus metrics are exposed automatically. - - The function should take zero or more arguments, which are used as the key for the - cache. Single-argument functions use that argument as the cache key; otherwise the - arguments are built into a tuple. - - Cached functions can be "chained" (i.e. a cached function can call other cached - functions and get appropriately invalidated when they called caches are - invalidated) by adding a special "cache_context" argument to the function - and passing that as a kwarg to all caches called. For example: - - @lru_cache(cache_context=True) - def foo(self, key, cache_context): - r1 = self.bar1(key, on_invalidate=cache_context.invalidate) - r2 = self.bar2(key, on_invalidate=cache_context.invalidate) - return r1 + r2 - - The wrapped function also has a 'cache' property which offers direct access to the - underlying LruCache. - """ - - def func(orig: F) -> _LruCachedFunction[F]: - desc = LruCacheDescriptor( - orig, - max_entries=max_entries, - cache_context=cache_context, - ) - return cast(_LruCachedFunction[F], desc) - - return func - - -class LruCacheDescriptor(_CacheDescriptorBase): - """Helper for @lru_cache""" - - class _Sentinel(enum.Enum): - sentinel = object() - - def __init__( - self, - orig: Callable[..., Any], - max_entries: int = 1000, - cache_context: bool = False, - ): - super().__init__( - orig, num_args=None, uncached_args=None, cache_context=cache_context - ) - self.max_entries = max_entries - - def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]: - cache: LruCache[CacheKey, Any] = LruCache( - cache_name=self.name, - max_size=self.max_entries, - ) - - get_cache_key = self.cache_key_builder - sentinel = LruCacheDescriptor._Sentinel.sentinel - - @functools.wraps(self.orig) - def _wrapped(*args: Any, **kwargs: Any) -> Any: - invalidate_callback = kwargs.pop("on_invalidate", None) - callbacks = (invalidate_callback,) if invalidate_callback else () - - cache_key = get_cache_key(args, kwargs) - - ret = cache.get(cache_key, default=sentinel, callbacks=callbacks) - if ret != sentinel: - return ret - - # Add our own `cache_context` to argument list if the wrapped function - # has asked for one - if self.add_cache_context: - kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key) - - ret2 = self.orig(obj, *args, **kwargs) - cache.set(cache_key, ret2, callbacks=callbacks) - - return ret2 - - wrapped = cast(CachedFunction, _wrapped) - wrapped.cache = cache - obj.__dict__[self.name] = wrapped - - return wrapped - - class DeferredCacheDescriptor(_CacheDescriptorBase): """A method decorator that applies a memoizing cache around the function. diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 78fd7b6961..43475a307f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -28,7 +28,7 @@ from synapse.logging.context import ( make_deferred_yieldable, ) from synapse.util.caches import descriptors -from synapse.util.caches.descriptors import cached, cachedList, lru_cache +from synapse.util.caches.descriptors import cached, cachedList from tests import unittest from tests.test_utils import get_awaitable_result @@ -36,38 +36,6 @@ from tests.test_utils import get_awaitable_result logger = logging.getLogger(__name__) -class LruCacheDecoratorTestCase(unittest.TestCase): - def test_base(self): - class Cls: - def __init__(self): - self.mock = mock.Mock() - - @lru_cache() - def fn(self, arg1, arg2): - return self.mock(arg1, arg2) - - obj = Cls() - obj.mock.return_value = "fish" - r = obj.fn(1, 2) - self.assertEqual(r, "fish") - obj.mock.assert_called_once_with(1, 2) - obj.mock.reset_mock() - - # a call with different params should call the mock again - obj.mock.return_value = "chips" - r = obj.fn(1, 3) - self.assertEqual(r, "chips") - obj.mock.assert_called_once_with(1, 3) - obj.mock.reset_mock() - - # the two values should now be cached - r = obj.fn(1, 2) - self.assertEqual(r, "fish") - r = obj.fn(1, 3) - self.assertEqual(r, "chips") - obj.mock.assert_not_called() - - def run_on_reactor(): d = defer.Deferred() reactor.callLater(0, d.callback, 0) @@ -478,10 +446,10 @@ class DescriptorTestCase(unittest.TestCase): @cached(cache_context=True) async def func2(self, key, cache_context): - return self.func3(key, on_invalidate=cache_context.invalidate) + return await self.func3(key, on_invalidate=cache_context.invalidate) - @lru_cache(cache_context=True) - def func3(self, key, cache_context): + @cached(cache_context=True) + async def func3(self, key, cache_context): self.invalidate = cache_context.invalidate return 42 -- cgit 1.5.1 From 1799a54a545618782840a60950ef4b64da9ee24d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Nov 2022 07:26:11 -0500 Subject: Batch fetch bundled annotations (#14491) Avoid an n+1 query problem and fetch the bundled aggregations for m.annotation relations in a single query instead of a query per event. This applies similar logic for as was previously done for edits in 8b309adb436c162510ed1402f33b8741d71fc058 (#11660) and threads in b65acead428653b988351ae8d7b22127a22039cd (#11752). --- changelog.d/14491.feature | 1 + synapse/handlers/relations.py | 197 ++++++++++++++++------------ synapse/storage/databases/main/relations.py | 139 ++++++++++++-------- synapse/util/caches/descriptors.py | 2 +- tests/rest/client/test_relations.py | 4 +- 5 files changed, 202 insertions(+), 141 deletions(-) create mode 100644 changelog.d/14491.feature (limited to 'synapse/util/caches/descriptors.py') diff --git a/changelog.d/14491.feature b/changelog.d/14491.feature new file mode 100644 index 0000000000..4fca7282f7 --- /dev/null +++ b/changelog.d/14491.feature @@ -0,0 +1 @@ +Reduce database load of [Client-Server endpoints](https://spec.matrix.org/v1.4/client-server-api/#aggregations) which return bundled aggregations. diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py index 8e71dda970..ca94239f61 100644 --- a/synapse/handlers/relations.py +++ b/synapse/handlers/relations.py @@ -13,7 +13,16 @@ # limitations under the License. import enum import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Tuple, +) import attr @@ -259,48 +268,64 @@ class RelationsHandler: e.msg, ) - async def get_annotations_for_event( - self, - event_id: str, - room_id: str, - limit: int = 5, - ignored_users: FrozenSet[str] = frozenset(), - ) -> List[JsonDict]: - """Get a list of annotations on the event, grouped by event type and + async def get_annotations_for_events( + self, event_ids: Collection[str], ignored_users: FrozenSet[str] = frozenset() + ) -> Dict[str, List[JsonDict]]: + """Get a list of annotations to the given events, grouped by event type and aggregation key, sorted by count. - This is used e.g. to get the what and how many reactions have happend + This is used e.g. to get the what and how many reactions have happened on an event. Args: - event_id: Fetch events that relate to this event ID. - room_id: The room the event belongs to. - limit: Only fetch the `limit` groups. + event_ids: Fetch events that relate to these event IDs. ignored_users: The users ignored by the requesting user. Returns: - List of groups of annotations that match. Each row is a dict with - `type`, `key` and `count` fields. + A map of event IDs to a list of groups of annotations that match. + Each entry is a dict with `type`, `key` and `count` fields. """ # Get the base results for all users. - full_results = await self._main_store.get_aggregation_groups_for_event( - event_id, room_id, limit + full_results = await self._main_store.get_aggregation_groups_for_events( + event_ids ) + # Avoid additional logic if there are no ignored users. + if not ignored_users: + return { + event_id: results + for event_id, results in full_results.items() + if results + } + # Then subtract off the results for any ignored users. ignored_results = await self._main_store.get_aggregation_groups_for_users( - event_id, room_id, limit, ignored_users + [event_id for event_id, results in full_results.items() if results], + ignored_users, ) - filtered_results = [] - for result in full_results: - key = (result["type"], result["key"]) - if key in ignored_results: - result = result.copy() - result["count"] -= ignored_results[key] - if result["count"] <= 0: - continue - filtered_results.append(result) + filtered_results = {} + for event_id, results in full_results.items(): + # If no annotations, skip. + if not results: + continue + + # If there are not ignored results for this event, copy verbatim. + if event_id not in ignored_results: + filtered_results[event_id] = results + continue + + # Otherwise, subtract out the ignored results. + event_ignored_results = ignored_results[event_id] + for result in results: + key = (result["type"], result["key"]) + if key in event_ignored_results: + # Ensure to not modify the cache. + result = result.copy() + result["count"] -= event_ignored_results[key] + if result["count"] <= 0: + continue + filtered_results.setdefault(event_id, []).append(result) return filtered_results @@ -366,59 +391,62 @@ class RelationsHandler: results = {} for event_id, summary in summaries.items(): - if summary: - thread_count, latest_thread_event = summary - - # Subtract off the count of any ignored users. - for ignored_user in ignored_users: - thread_count -= ignored_results.get((event_id, ignored_user), 0) - - # This is gnarly, but if the latest event is from an ignored user, - # attempt to find one that isn't from an ignored user. - if latest_thread_event.sender in ignored_users: - room_id = latest_thread_event.room_id - - # If the root event is not found, something went wrong, do - # not include a summary of the thread. - event = await self._event_handler.get_event(user, room_id, event_id) - if event is None: - continue + # If no thread, skip. + if not summary: + continue - potential_events, _ = await self.get_relations_for_event( - event_id, - event, - room_id, - RelationTypes.THREAD, - ignored_users, - ) + thread_count, latest_thread_event = summary - # If all found events are from ignored users, do not include - # a summary of the thread. - if not potential_events: - continue + # Subtract off the count of any ignored users. + for ignored_user in ignored_users: + thread_count -= ignored_results.get((event_id, ignored_user), 0) - # The *last* event returned is the one that is cared about. - event = await self._event_handler.get_event( - user, room_id, potential_events[-1].event_id - ) - # It is unexpected that the event will not exist. - if event is None: - logger.warning( - "Unable to fetch latest event in a thread with event ID: %s", - potential_events[-1].event_id, - ) - continue - latest_thread_event = event - - results[event_id] = _ThreadAggregation( - latest_event=latest_thread_event, - count=thread_count, - # If there's a thread summary it must also exist in the - # participated dictionary. - current_user_participated=events_by_id[event_id].sender == user_id - or participated[event_id], + # This is gnarly, but if the latest event is from an ignored user, + # attempt to find one that isn't from an ignored user. + if latest_thread_event.sender in ignored_users: + room_id = latest_thread_event.room_id + + # If the root event is not found, something went wrong, do + # not include a summary of the thread. + event = await self._event_handler.get_event(user, room_id, event_id) + if event is None: + continue + + potential_events, _ = await self.get_relations_for_event( + event_id, + event, + room_id, + RelationTypes.THREAD, + ignored_users, ) + # If all found events are from ignored users, do not include + # a summary of the thread. + if not potential_events: + continue + + # The *last* event returned is the one that is cared about. + event = await self._event_handler.get_event( + user, room_id, potential_events[-1].event_id + ) + # It is unexpected that the event will not exist. + if event is None: + logger.warning( + "Unable to fetch latest event in a thread with event ID: %s", + potential_events[-1].event_id, + ) + continue + latest_thread_event = event + + results[event_id] = _ThreadAggregation( + latest_event=latest_thread_event, + count=thread_count, + # If there's a thread summary it must also exist in the + # participated dictionary. + current_user_participated=events_by_id[event_id].sender == user_id + or participated[event_id], + ) + return results @trace @@ -496,17 +524,18 @@ class RelationsHandler: # (as that is what makes it part of the thread). relations_by_id[latest_thread_event.event_id] = RelationTypes.THREAD - # Fetch other relations per event. - for event in events_by_id.values(): - # Fetch any annotations (ie, reactions) to bundle with this event. - annotations = await self.get_annotations_for_event( - event.event_id, event.room_id, ignored_users=ignored_users - ) + # Fetch any annotations (ie, reactions) to bundle with this event. + annotations_by_event_id = await self.get_annotations_for_events( + events_by_id.keys(), ignored_users=ignored_users + ) + for event_id, annotations in annotations_by_event_id.items(): if annotations: - results.setdefault( - event.event_id, BundledAggregations() - ).annotations = {"chunk": annotations} + results.setdefault(event_id, BundledAggregations()).annotations = { + "chunk": annotations + } + # Fetch other relations per event. + for event in events_by_id.values(): # Fetch any references to bundle with this event. references, next_token = await self.get_relations_for_event( event.event_id, diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index ca431002c8..f96a16956a 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -20,6 +20,7 @@ from typing import ( FrozenSet, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -394,106 +395,136 @@ class RelationsWorkerStore(SQLBaseStore): ) return result is not None - @cached(tree=True) - async def get_aggregation_groups_for_event( - self, event_id: str, room_id: str, limit: int = 5 - ) -> List[JsonDict]: - """Get a list of annotations on the event, grouped by event type and + @cached() + async def get_aggregation_groups_for_event(self, event_id: str) -> List[JsonDict]: + raise NotImplementedError() + + @cachedList( + cached_method_name="get_aggregation_groups_for_event", list_name="event_ids" + ) + async def get_aggregation_groups_for_events( + self, event_ids: Collection[str] + ) -> Mapping[str, Optional[List[JsonDict]]]: + """Get a list of annotations on the given events, grouped by event type and aggregation key, sorted by count. This is used e.g. to get the what and how many reactions have happend on an event. Args: - event_id: Fetch events that relate to this event ID. - room_id: The room the event belongs to. - limit: Only fetch the `limit` groups. + event_ids: Fetch events that relate to these event IDs. Returns: - List of groups of annotations that match. Each row is a dict with - `type`, `key` and `count` fields. + A map of event IDs to a list of groups of annotations that match. + Each entry is a dict with `type`, `key` and `count` fields. """ + # The number of entries to return per event ID. + limit = 5 - args = [ - event_id, - room_id, - RelationTypes.ANNOTATION, - limit, - ] + clause, args = make_in_list_sql_clause( + self.database_engine, "relates_to_id", event_ids + ) + args.append(RelationTypes.ANNOTATION) - sql = """ - SELECT type, aggregation_key, COUNT(DISTINCT sender) - FROM event_relations - INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? - GROUP BY relation_type, type, aggregation_key - ORDER BY COUNT(*) DESC - LIMIT ? + sql = f""" + SELECT + relates_to_id, + annotation.type, + aggregation_key, + COUNT(DISTINCT annotation.sender) + FROM events AS annotation + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = annotation.room_id + WHERE + {clause} + AND relation_type = ? + GROUP BY relates_to_id, annotation.type, aggregation_key + ORDER BY relates_to_id, COUNT(*) DESC """ - def _get_aggregation_groups_for_event_txn( + def _get_aggregation_groups_for_events_txn( txn: LoggingTransaction, - ) -> List[JsonDict]: + ) -> Mapping[str, List[JsonDict]]: txn.execute(sql, args) - return [{"type": row[0], "key": row[1], "count": row[2]} for row in txn] + result: Dict[str, List[JsonDict]] = {} + for event_id, type, key, count in cast( + List[Tuple[str, str, str, int]], txn + ): + event_results = result.setdefault(event_id, []) + + # Limit the number of results per event ID. + if len(event_results) == limit: + continue + + event_results.append({"type": type, "key": key, "count": count}) + + return result return await self.db_pool.runInteraction( - "get_aggregation_groups_for_event", _get_aggregation_groups_for_event_txn + "get_aggregation_groups_for_events", _get_aggregation_groups_for_events_txn ) async def get_aggregation_groups_for_users( - self, - event_id: str, - room_id: str, - limit: int, - users: FrozenSet[str] = frozenset(), - ) -> Dict[Tuple[str, str], int]: + self, event_ids: Collection[str], users: FrozenSet[str] + ) -> Dict[str, Dict[Tuple[str, str], int]]: """Fetch the partial aggregations for an event for specific users. This is used, in conjunction with get_aggregation_groups_for_event, to remove information from the results for ignored users. Args: - event_id: Fetch events that relate to this event ID. - room_id: The room the event belongs to. - limit: Only fetch the `limit` groups. + event_ids: Fetch events that relate to these event IDs. users: The users to fetch information for. Returns: - A map of (event type, aggregation key) to a count of users. + A map of event ID to a map of (event type, aggregation key) to a + count of users. """ if not users: return {} - args: List[Union[str, int]] = [ - event_id, - room_id, - RelationTypes.ANNOTATION, - ] + events_sql, args = make_in_list_sql_clause( + self.database_engine, "relates_to_id", event_ids + ) users_sql, users_args = make_in_list_sql_clause( - self.database_engine, "sender", users + self.database_engine, "annotation.sender", users ) args.extend(users_args) + args.append(RelationTypes.ANNOTATION) sql = f""" - SELECT type, aggregation_key, COUNT(DISTINCT sender) - FROM event_relations - INNER JOIN events USING (event_id) - WHERE relates_to_id = ? AND room_id = ? AND relation_type = ? AND {users_sql} - GROUP BY relation_type, type, aggregation_key - ORDER BY COUNT(*) DESC - LIMIT ? + SELECT + relates_to_id, + annotation.type, + aggregation_key, + COUNT(DISTINCT annotation.sender) + FROM events AS annotation + INNER JOIN event_relations USING (event_id) + INNER JOIN events AS parent ON + parent.event_id = relates_to_id + AND parent.room_id = annotation.room_id + WHERE {events_sql} AND {users_sql} AND relation_type = ? + GROUP BY relates_to_id, annotation.type, aggregation_key + ORDER BY relates_to_id, COUNT(*) DESC """ def _get_aggregation_groups_for_users_txn( txn: LoggingTransaction, - ) -> Dict[Tuple[str, str], int]: - txn.execute(sql, args + [limit]) + ) -> Dict[str, Dict[Tuple[str, str], int]]: + txn.execute(sql, args) - return {(row[0], row[1]): row[2] for row in txn} + result: Dict[str, Dict[Tuple[str, str], int]] = {} + for event_id, type, key, count in cast( + List[Tuple[str, str, str, int]], txn + ): + result.setdefault(event_id, {})[(type, key)] = count + + return result return await self.db_pool.runInteraction( "get_aggregation_groups_for_users", _get_aggregation_groups_for_users_txn diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 75428d19ba..72227359b9 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -503,7 +503,7 @@ def cachedList( is specified as a list that is iterated through to lookup keys in the original cache. A new tuple consisting of the (deduplicated) keys that weren't in the cache gets passed to the original function, which is expected to results - in a map of key to value for each passed value. THe new results are stored in the + in a map of key to value for each passed value. The new results are stored in the original cache. Note that any missing values are cached as None. Args: diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index e3d801f7a8..2d2b683548 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -1108,7 +1108,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase): # The "user" sent the root event and is making queries for the bundled # aggregations: they have participated. - self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 9) + self._test_bundled_aggregations(RelationTypes.THREAD, _gen_assert(True), 8) # The "user2" sent replies in the thread and is making queries for the # bundled aggregations: they have participated. # @@ -1170,7 +1170,7 @@ class BundledAggregationsTestCase(BaseRelationsTestCase): bundled_aggregations["latest_event"].get("unsigned"), ) - self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 9) + self._test_bundled_aggregations(RelationTypes.THREAD, assert_thread, 8) def test_nested_thread(self) -> None: """ -- cgit 1.5.1