diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f1265541ba..7b45db4e68 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_context_over_fn, LoggingContext
from synapse.util.lrucache import LruCache
+from synapse.util.dictionary_cache import DictionaryCache
import synapse.metrics
from util.id_generators import IdGenerator, StreamIdGenerator
@@ -58,7 +59,7 @@ cache_counter = metrics.register_cache(
class Cache(object):
- def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+ def __init__(self, name, max_entries=1000, keylen=1, lru=True):
if lru:
self.cache = LruCache(max_size=max_entries)
self.max_entries = None
@@ -72,6 +73,11 @@ class Cache(object):
self.thread = None
caches_by_name[name] = self.cache
+ class Sentinel(object):
+ __slots__ = []
+
+ self.sentinel = Sentinel()
+
def check_thread(self):
expected_thread = self.thread
if expected_thread is None:
@@ -83,22 +89,33 @@ class Cache(object):
)
def get(self, *keyargs):
- if len(keyargs) != self.keylen:
- raise ValueError("Expected a key to have %d items", self.keylen)
+ try:
+ if len(keyargs) != self.keylen:
+ raise ValueError("Expected a key to have %d items", self.keylen)
- if keyargs in self.cache:
- cache_counter.inc_hits(self.name)
- return self.cache[keyargs]
+ val = self.cache.get(keyargs, self.sentinel)
+ if val is not self.sentinel:
+ cache_counter.inc_hits(self.name)
+ return val
- cache_counter.inc_misses(self.name)
- raise KeyError()
+ cache_counter.inc_misses(self.name)
+ raise KeyError()
+ except KeyError:
+ raise
+ except:
+ logger.exception("Cache.get failed for %s" % (self.name,))
+ raise
def update(self, sequence, *args):
- self.check_thread()
- if self.sequence == sequence:
- # Only update the cache if the caches sequence number matches the
- # number that the cache had before the SELECT was started (SYN-369)
- self.prefill(*args)
+ try:
+ self.check_thread()
+ if self.sequence == sequence:
+ # Only update the cache if the caches sequence number matches the
+ # number that the cache had before the SELECT was started (SYN-369)
+ self.prefill(*args)
+ except:
+ logger.exception("Cache.update failed for %s" % (self.name,))
+ raise
def prefill(self, *args): # because I can't *keyargs, value
keyargs = args[:-1]
@@ -142,7 +159,7 @@ class CacheDescriptor(object):
which can be used to insert values into the cache specifically, without
calling the calculation function.
"""
- def __init__(self, orig, max_entries=1000, num_args=1, lru=False,
+ def __init__(self, orig, max_entries=1000, num_args=1, lru=True,
inlineCallbacks=False):
self.orig = orig
@@ -210,7 +227,7 @@ class CacheDescriptor(object):
return wrapped
-def cached(max_entries=1000, num_args=1, lru=False):
+def cached(max_entries=1000, num_args=1, lru=True):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
@@ -349,6 +366,8 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
+ self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000)
+
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
self._event_fetch_ongoing = 0
|