diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b0f8c2787..efb90c3c91 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
# Returns an ObservableDeferred
res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
- if res and res.called and user_id in res.result:
- # We'd only be adding to the set, so no point invalidating if the
- # user is already there
- return
+ if res:
+ if isinstance(res, defer.Deferred) and res.called:
+ res = res.result
+ if user_id in res:
+ # We'd only be adding to the set, so no point invalidating if the
+ # user is already there
+ return
self.get_users_with_read_receipts_in_room.invalidate((room_id,))
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9d0d0be1f9..807e147657 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
-from . import DEBUG_CACHES, register_cache
+from . import register_cache
from twisted.internet import defer
from collections import namedtuple
@@ -76,7 +76,7 @@ class Cache(object):
self.cache = LruCache(
max_size=max_entries, keylen=keylen, cache_type=cache_type,
- size_callback=(lambda d: len(d.result)) if iterable else None,
+ size_callback=(lambda d: len(d)) if iterable else None,
)
self.name = name
@@ -96,6 +96,17 @@ class Cache(object):
)
def get(self, key, default=_CacheSentinel, callback=None):
+ """Looks the key up in the caches.
+
+ Args:
+ key(tuple)
+ default: What is returned if key is not in the caches. If not
+ specified then function throws KeyError instead
+ callback(fn): Gets called when the entry in the cache is invalidated
+
+ Returns:
+ Either a Deferred or the raw result
+ """
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
@@ -137,7 +148,7 @@ class Cache(object):
if self.sequence == entry.sequence:
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry is entry:
- self.cache.set(key, entry.deferred, entry.callbacks)
+ self.cache.set(key, result, entry.callbacks)
else:
entry.invalidate()
else:
@@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
try:
cached_result_d = cache.get(cache_key, callback=invalidate_callback)
- observer = cached_result_d.observe()
- if DEBUG_CACHES:
- @defer.inlineCallbacks
- def check_result(cached_result):
- actual_result = yield self.function_to_call(obj, *args, **kwargs)
- if actual_result != cached_result:
- logger.error(
- "Stale cache entry %s%r: cached: %r, actual %r",
- self.orig.__name__, cache_key,
- cached_result, actual_result,
- )
- raise ValueError("Stale cache entry")
- defer.returnValue(cached_result)
- observer.addCallback(check_result)
+ if isinstance(cached_result_d, ObservableDeferred):
+ observer = cached_result_d.observe()
+ else:
+ observer = cached_result_d
except KeyError:
ret = defer.maybeDeferred(
@@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
try:
res = cache.get(tuple(key), callback=invalidate_callback)
- if not res.has_succeeded():
+ if not isinstance(res, ObservableDeferred):
+ results[arg] = res
+ elif not res.has_succeeded():
res = res.observe()
res.addCallback(lambda r, arg: (arg, r), arg)
cached_defers[arg] = res
|