summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py49
1 files changed, 34 insertions, 15 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py

index f1265541ba..7b45db4e68 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py
@@ -18,6 +18,7 @@ from synapse.api.errors import StoreError from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_context_over_fn, LoggingContext from synapse.util.lrucache import LruCache +from synapse.util.dictionary_cache import DictionaryCache import synapse.metrics from util.id_generators import IdGenerator, StreamIdGenerator @@ -58,7 +59,7 @@ cache_counter = metrics.register_cache( class Cache(object): - def __init__(self, name, max_entries=1000, keylen=1, lru=False): + def __init__(self, name, max_entries=1000, keylen=1, lru=True): if lru: self.cache = LruCache(max_size=max_entries) self.max_entries = None @@ -72,6 +73,11 @@ class Cache(object): self.thread = None caches_by_name[name] = self.cache + class Sentinel(object): + __slots__ = [] + + self.sentinel = Sentinel() + def check_thread(self): expected_thread = self.thread if expected_thread is None: @@ -83,22 +89,33 @@ class Cache(object): ) def get(self, *keyargs): - if len(keyargs) != self.keylen: - raise ValueError("Expected a key to have %d items", self.keylen) + try: + if len(keyargs) != self.keylen: + raise ValueError("Expected a key to have %d items", self.keylen) - if keyargs in self.cache: - cache_counter.inc_hits(self.name) - return self.cache[keyargs] + val = self.cache.get(keyargs, self.sentinel) + if val is not self.sentinel: + cache_counter.inc_hits(self.name) + return val - cache_counter.inc_misses(self.name) - raise KeyError() + cache_counter.inc_misses(self.name) + raise KeyError() + except KeyError: + raise + except: + logger.exception("Cache.get failed for %s" % (self.name,)) + raise def update(self, sequence, *args): - self.check_thread() - if self.sequence == sequence: - # Only update the cache if the caches sequence number matches the - # number that the cache had before the SELECT was started (SYN-369) - self.prefill(*args) + try: + self.check_thread() + if self.sequence == sequence: + # Only update the cache if the caches sequence number matches the + # number that the cache had before the SELECT was started (SYN-369) + self.prefill(*args) + except: + logger.exception("Cache.update failed for %s" % (self.name,)) + raise def prefill(self, *args): # because I can't *keyargs, value keyargs = args[:-1] @@ -142,7 +159,7 @@ class CacheDescriptor(object): which can be used to insert values into the cache specifically, without calling the calculation function. """ - def __init__(self, orig, max_entries=1000, num_args=1, lru=False, + def __init__(self, orig, max_entries=1000, num_args=1, lru=True, inlineCallbacks=False): self.orig = orig @@ -210,7 +227,7 @@ class CacheDescriptor(object): return wrapped -def cached(max_entries=1000, num_args=1, lru=False): +def cached(max_entries=1000, num_args=1, lru=True): return lambda orig: CacheDescriptor( orig, max_entries=max_entries, @@ -349,6 +366,8 @@ class SQLBaseStore(object): self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True, max_entries=hs.config.event_cache_size) + self._state_group_cache = DictionaryCache("*stateGroupCache*", 100000) + self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0