summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/receipts.py11
-rw-r--r--synapse/util/caches/__init__.py2
-rw-r--r--synapse/util/caches/descriptors.py39
3 files changed, 28 insertions, 24 deletions
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 6b0f8c2787..efb90c3c91 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -47,10 +47,13 @@ class ReceiptsStore(SQLBaseStore):
         # Returns an ObservableDeferred
         res = self.get_users_with_read_receipts_in_room.cache.get((room_id,), None)
 
-        if res and res.called and user_id in res.result:
-            # We'd only be adding to the set, so no point invalidating if the
-            # user is already there
-            return
+        if res:
+            if isinstance(res, defer.Deferred) and res.called:
+                res = res.result
+            if user_id in res:
+                # We'd only be adding to the set, so no point invalidating if the
+                # user is already there
+                return
 
         self.get_users_with_read_receipts_in_room.invalidate((room_id,))
 
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 9fd35a8134..4a83c46d98 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -18,8 +18,6 @@ import os
 
 CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
 
-DEBUG_CACHES = False
-
 metrics = synapse.metrics.get_metrics_for("synapse.util.caches")
 
 caches_by_name = {}
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9d0d0be1f9..807e147657 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -19,7 +19,7 @@ from synapse.util import unwrapFirstError, logcontext
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
 
-from . import DEBUG_CACHES, register_cache
+from . import register_cache
 
 from twisted.internet import defer
 from collections import namedtuple
@@ -76,7 +76,7 @@ class Cache(object):
 
         self.cache = LruCache(
             max_size=max_entries, keylen=keylen, cache_type=cache_type,
-            size_callback=(lambda d: len(d.result)) if iterable else None,
+            size_callback=(lambda d: len(d)) if iterable else None,
         )
 
         self.name = name
@@ -96,6 +96,17 @@ class Cache(object):
                 )
 
     def get(self, key, default=_CacheSentinel, callback=None):
+        """Looks the key up in the caches.
+
+        Args:
+            key(tuple)
+            default: What is returned if key is not in the caches. If not
+                specified then function throws KeyError instead
+            callback(fn): Gets called when the entry in the cache is invalidated
+
+        Returns:
+            Either a Deferred or the raw result
+        """
         callbacks = [callback] if callback else []
         val = self._pending_deferred_cache.get(key, _CacheSentinel)
         if val is not _CacheSentinel:
@@ -137,7 +148,7 @@ class Cache(object):
             if self.sequence == entry.sequence:
                 existing_entry = self._pending_deferred_cache.pop(key, None)
                 if existing_entry is entry:
-                    self.cache.set(key, entry.deferred, entry.callbacks)
+                    self.cache.set(key, result, entry.callbacks)
                 else:
                     entry.invalidate()
             else:
@@ -335,20 +346,10 @@ class CacheDescriptor(_CacheDescriptorBase):
             try:
                 cached_result_d = cache.get(cache_key, callback=invalidate_callback)
 
-                observer = cached_result_d.observe()
-                if DEBUG_CACHES:
-                    @defer.inlineCallbacks
-                    def check_result(cached_result):
-                        actual_result = yield self.function_to_call(obj, *args, **kwargs)
-                        if actual_result != cached_result:
-                            logger.error(
-                                "Stale cache entry %s%r: cached: %r, actual %r",
-                                self.orig.__name__, cache_key,
-                                cached_result, actual_result,
-                            )
-                            raise ValueError("Stale cache entry")
-                        defer.returnValue(cached_result)
-                    observer.addCallback(check_result)
+                if isinstance(cached_result_d, ObservableDeferred):
+                    observer = cached_result_d.observe()
+                else:
+                    observer = cached_result_d
 
             except KeyError:
                 ret = defer.maybeDeferred(
@@ -447,7 +448,9 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
                 try:
                     res = cache.get(tuple(key), callback=invalidate_callback)
-                    if not res.has_succeeded():
+                    if not isinstance(res, ObservableDeferred):
+                        results[arg] = res
+                    elif not res.has_succeeded():
                         res = res.observe()
                         res.addCallback(lambda r, arg: (arg, r), arg)
                         cached_defers[arg] = res