Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--  synapse/storage/_base.py | 70 ++++++++++++++++++++++++++++++++++++++++++++++----------------------------
 1 file changed, 42 insertions(+), 28 deletions(-)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 27ea65a0f6..e3e67d8e0d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
 
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
+perf_logger = logging.getLogger("synapse.storage.TIME")
 
 
 metrics = synapse.metrics.get_metrics_for("synapse.storage")
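The hunk above introduces a dedicated logger, synapse.storage.TIME, for the periodic database-time summary that is switched over to it further down. A dedicated name lets deployments route or silence those summaries independently of the rest of synapse.storage; a minimal stdlib sketch of that kind of routing (the handler and level here are illustrative, not part of the patch):

    import logging

    # Route the performance summaries to their own handler, independently
    # of everything else under "synapse.storage".
    perf_logger = logging.getLogger("synapse.storage.TIME")
    perf_logger.setLevel(logging.INFO)
    perf_logger.addHandler(logging.FileHandler("storage-perf.log"))
    perf_logger.propagate = False  # keep the summaries out of parent handlers
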
@@ -55,10 +56,14 @@ cache_counter = metrics.register_cache(
 
 class Cache(object):
 
-    def __init__(self, name, max_entries=1000, keylen=1):
-        self.cache = OrderedDict()
+    def __init__(self, name, max_entries=1000, keylen=1, lru=False):
+        if lru:
+            self.cache = LruCache(max_size=max_entries)
+            self.max_entries = None
+        else:
+            self.cache = OrderedDict()
+            self.max_entries = max_entries
 
-        self.max_entries = max_entries
         self.name = name
         self.keylen = keylen
 
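Cache now picks its backing store at construction time: an LruCache bounded by max_size, or the original insertion-ordered OrderedDict, whose bound is enforced by prefill() in the next hunk. Setting max_entries to None in the LRU branch is the signal that the backend polices its own size. A standalone sketch of the pattern; SimpleLru is a stand-in for Synapse's LruCache, not its real implementation:

    from collections import OrderedDict

    class SimpleLru:
        # Stand-in for synapse's LruCache: it evicts the least recently
        # used entry itself, so the owner needs no max_entries check.
        def __init__(self, max_size):
            self.max_size = max_size
            self.data = OrderedDict()

        def __setitem__(self, key, value):
            self.data[key] = value
            self.data.move_to_end(key)
            if len(self.data) > self.max_size:
                self.data.popitem(last=False)  # drop least recently used

        def __getitem__(self, key):
            value = self.data[key]
            self.data.move_to_end(key)  # a hit refreshes recency
            return value

    class Cache:
        def __init__(self, name, max_entries=1000, lru=False):
            if lru:
                # the backend bounds itself; no caller-side eviction
                self.cache = SimpleLru(max_size=max_entries)
                self.max_entries = None
            else:
                # plain dict in insertion order; prefill() enforces the bound
                self.cache = OrderedDict()
                self.max_entries = max_entries
            self.name = name
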
@@ -82,8 +87,9 @@ class Cache(object):
         if len(keyargs) != self.keylen:
             raise ValueError("Expected a key to have %d items", self.keylen)
 
-        while len(self.cache) > self.max_entries:
-            self.cache.popitem(last=False)
+        if self.max_entries is not None:
+            while len(self.cache) >= self.max_entries:
+                self.cache.popitem(last=False)
 
         self.cache[keyargs] = value
 
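Two things change in prefill(): eviction is skipped entirely when the LRU backend is in charge, and the comparison tightens from > to >=, so room is made before the insert rather than after and the dict never holds more than max_entries items. Note that popitem(last=False) on an OrderedDict drops the oldest insertion (FIFO), not the least recently read. A short demonstration with arbitrary values:

    from collections import OrderedDict

    cache, max_entries = OrderedDict(a=1, b=2), 2
    # make room *before* inserting, so the size never exceeds the bound
    while len(cache) >= max_entries:
        cache.popitem(last=False)   # evicts 'a', the oldest insertion
    cache["c"] = 3
    print(list(cache))              # ['b', 'c']
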
@@ -94,9 +100,7 @@ class Cache(object):
         self.cache.pop(keyargs, None)
 
 
-# TODO(paul):
-#  * consider other eviction strategies - LRU?
-def cached(max_entries=1000, num_args=1):
+def cached(max_entries=1000, num_args=1, lru=False):
     """ A method decorator that applies a memoizing cache around the function.
 
     The function is presumed to take zero or more arguments, which are used in
@@ -115,6 +119,7 @@ def cached(max_entries=1000, num_args=1):
             name=orig.__name__,
             max_entries=max_entries,
             keylen=num_args,
+            lru=lru,
         )
 
         @functools.wraps(orig)
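With the TODO about LRU eviction resolved, cached() simply threads the new flag through to the Cache it builds for each decorated method. A minimal synchronous sketch of the decorator's shape; the real version is deferred-aware and supports multi-argument keys via keylen, so everything below is illustrative:

    import functools
    from collections import OrderedDict

    def cached(max_entries=1000, lru=False):
        def wrap(orig):
            cache = OrderedDict()  # shared by all instances, keyed on args

            @functools.wraps(orig)
            def wrapped(self, key):
                if key in cache:
                    if lru:
                        cache.move_to_end(key)  # refresh recency on a hit
                    return cache[key]
                value = orig(self, key)
                while len(cache) >= max_entries:
                    cache.popitem(last=False)
                cache[key] = value
                return value

            wrapped.invalidate = lambda key: cache.pop(key, None)
            return wrapped
        return wrap

    class Store:
        @cached(max_entries=2, lru=True)
        def get_room(self, room_id):  # room_id is the single cache key
            print("database hit for", room_id)
            return {"room_id": room_id}
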
@@ -237,10 +242,8 @@ class SQLBaseStore(object):
         self._txn_perf_counters = PerformanceCounters()
         self._get_event_counters = PerformanceCounters()
 
-        self._get_event_cache = LruCache(hs.config.event_cache_size)
-
-        # Pretend the getEventCache is just another named cache
-        caches_by_name["*getEvent*"] = self._get_event_cache
+        self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
+                                      max_entries=hs.config.event_cache_size)
 
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
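The event cache stops being a special case here: instead of a bare LruCache registered by hand under caches_by_name, it becomes an ordinary named Cache with a three-part key, which gets it the same hit/miss accounting as every other cache for free. The keylen of 3 matters below: the key is the full (event_id, check_redacted, get_prev_content) tuple, not just the event ID.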
@@ -264,7 +267,7 @@ class SQLBaseStore(object):
                 time_now - time_then, limit=3
             )
 
-            logger.info(
+            perf_logger.info(
                 "Total database time: %.3f%% {%s} {%s}",
                 ratio * 100, top_three_counters, top_3_event_counters
             )
@@ -728,6 +731,12 @@ class SQLBaseStore(object):
 
         return [e for e in events if e]
 
+    def _invalidate_get_event_cache(self, event_id):
+        for check_redacted in (False, True):
+            for get_prev_content in (False, True):
+                self._get_event_cache.invalidate(event_id, check_redacted,
+                                                 get_prev_content)
+
     def _get_event_txn(self, txn, event_id, check_redacted=True,
                        get_prev_content=False, allow_rejected=False):
 
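_invalidate_get_event_cache has to sweep all four flag combinations precisely because of that three-part key: one event can be cached under up to four distinct tuples. A runnable illustration of the same sweep against a plain dict standing in for the cache (names and values are made up):

    from itertools import product

    cache = {("$ev1", False, True): "event", ("$ev1", True, True): "event"}

    def invalidate_event(cache, event_id):
        # an event may sit under any of the four flag combinations;
        # all of them have to go at once
        for check_redacted, get_prev_content in product((False, True), repeat=2):
            cache.pop((event_id, check_redacted, get_prev_content), None)

    invalidate_event(cache, "$ev1")
    print(cache)  # {}
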
@@ -738,16 +747,14 @@ class SQLBaseStore(object):
             sql_getevents_timer.inc_by(curr_time - last_time, desc)
             return curr_time
 
-        cache = self._get_event_cache.setdefault(event_id, {})
-
         try:
-            # Separate cache entries for each way to invoke _get_event_txn
-            ret = cache[(check_redacted, get_prev_content, allow_rejected)]
+            ret = self._get_event_cache.get(event_id, check_redacted, get_prev_content)
 
-            cache_counter.inc_hits("*getEvent*")
-            return ret
+            if allow_rejected or not ret.rejected_reason:
+                return ret
+            else:
+                return None
         except KeyError:
-            cache_counter.inc_misses("*getEvent*")
             pass
         finally:
             start_time = update_counter("event_cache", start_time)
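Note what the cache key deliberately leaves out: allow_rejected. The event is cached once, carrying its rejected_reason, and the flag is applied on the way out of the cache, so callers that do and do not accept rejected events share a single cached copy. The explicit hit/miss counting also disappears from this path; the Cache class now does its own accounting. A standalone sketch of the read-time filter, with a dataclass standing in for FrozenEvent:

    from dataclasses import dataclass

    @dataclass
    class Event:                       # stand-in for FrozenEvent
        event_id: str
        rejected_reason: str = None    # set when the event failed auth checks

    def filter_rejected(ev, allow_rejected=False):
        # applied on cache hits, so allow_rejected need not be in the key
        if allow_rejected or not ev.rejected_reason:
            return ev
        return None

    good = Event("$a")
    bad = Event("$b", rejected_reason="auth_error")
    print(filter_rejected(good))                      # Event(event_id='$a', ...)
    print(filter_rejected(bad))                       # None
    print(filter_rejected(bad, allow_rejected=True))  # Event(event_id='$b', ...)
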
@@ -772,19 +779,22 @@ class SQLBaseStore(object):
 
         start_time = update_counter("select_event", start_time)
 
+        result = self._get_event_from_row_txn(
+            txn, internal_metadata, js, redacted,
+            check_redacted=check_redacted,
+            get_prev_content=get_prev_content,
+            rejected_reason=rejected_reason,
+        )
+        self._get_event_cache.prefill(event_id, check_redacted, get_prev_content, result)
+
         if allow_rejected or not rejected_reason:
-            result = self._get_event_from_row_txn(
-                txn, internal_metadata, js, redacted,
-                check_redacted=check_redacted,
-                get_prev_content=get_prev_content,
-            )
-            cache[(check_redacted, get_prev_content, allow_rejected)] = result
             return result
         else:
             return None
 
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
-                                check_redacted=True, get_prev_content=False):
+                                check_redacted=True, get_prev_content=False,
+                                rejected_reason=None):
 
         start_time = time.time() * 1000
 
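The database path now mirrors the cache-hit path: the row is always decoded and prefill()ed, rejected or not, before the allow_rejected filter decides what this particular caller sees. A rejected event therefore costs one database fetch no matter how it is asked for, where previously it was re-fetched every time because only accepted results were cached. Threading rejected_reason into _get_event_from_row_txn, and from there into FrozenEvent below, is what makes that read-time decision possible.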
@@ -799,7 +809,11 @@ class SQLBaseStore(object):
         internal_metadata = json.loads(internal_metadata)
         start_time = update_counter("decode_internal", start_time)
 
-        ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        ev = FrozenEvent(
+            d,
+            internal_metadata_dict=internal_metadata,
+            rejected_reason=rejected_reason,
+        )
         start_time = update_counter("build_frozen_event", start_time)
 
         if check_redacted and redacted: