summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py65
1 files changed, 53 insertions, 12 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3ea7382760..40f2fc6d76 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -20,6 +20,7 @@ from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
+import synapse.metrics
 
 from twisted.internet import defer
 
@@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 
 
+metrics = synapse.metrics.get_metrics_for("synapse.storage")
+
+sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
+sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
+
+caches_by_name = {}
+cache_counter = metrics.register_cache(
+    "cache",
+    lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+    labels=["name"],
+)
+
+
 # TODO(paul):
 #  * more generic key management
-#  * export monitoring stats
 #  * consider other eviction strategies - LRU?
 def cached(max_entries=1000):
     """ A method decorator that applies a memoizing cache around the function.
@@ -55,6 +69,9 @@ def cached(max_entries=1000):
     """
     def wrap(orig):
         cache = OrderedDict()
+        name = orig.__name__
+
+        caches_by_name[name] = cache
 
         def prefill(key, value):
             while len(cache) > max_entries:
@@ -65,8 +82,10 @@ def cached(max_entries=1000):
         @defer.inlineCallbacks
         def wrapped(self, key):
             if key in cache:
+                cache_counter.inc_hits(name)
                 defer.returnValue(cache[key])
 
+            cache_counter.inc_misses(name)
             ret = yield orig(self, key)
             prefill(key, ret)
             defer.returnValue(ret)
@@ -83,7 +102,8 @@ def cached(max_entries=1000):
 
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
-    passed to the constructor. Adds logging to the .execute() method."""
+    passed to the constructor. Adds logging and metrics to the .execute()
+    method."""
     __slots__ = ["txn", "name"]
 
     def __init__(self, txn, name):
@@ -99,6 +119,7 @@ class LoggingTransaction(object):
     def execute(self, sql, *args, **kwargs):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
+
         try:
             if args and args[0]:
                 values = args[0]
@@ -120,8 +141,9 @@ class LoggingTransaction(object):
                 logger.exception("[SQL FAIL] {%s}", self.name)
                 raise
         finally:
-            end = time.time() * 1000
-            sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
+            msecs = (time.time() * 1000) - start
+            sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
+            sql_query_timer.inc_by(msecs, sql.split()[0])
 
 
 class PerformanceCounters(object):
@@ -172,11 +194,18 @@ class SQLBaseStore(object):
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
         self._previous_loop_ts = 0
+
+        # TODO(paul): These can eventually be removed once the metrics code
+        #   is running in mainline, and we have some nice monitoring frontends
+        #   to watch it
         self._txn_perf_counters = PerformanceCounters()
         self._get_event_counters = PerformanceCounters()
 
         self._get_event_cache = LruCache(hs.config.event_cache_size)
 
+        # Pretend the getEventCache is just another named cache
+        caches_by_name["*getEvent*"] = self._get_event_cache
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -231,13 +260,13 @@ class SQLBaseStore(object):
                     raise
                 finally:
                     end = time.time() * 1000
-                    transaction_logger.debug(
-                        "[TXN END] {%s} %f",
-                        name, end - start
-                    )
+                    duration = end - start
+
+                    transaction_logger.debug("[TXN END] {%s} %f", name, duration)
 
-                    self._current_txn_total_time += end - start
+                    self._current_txn_total_time += duration
                     self._txn_perf_counters.update(desc, start, end)
+                    sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
@@ -638,14 +667,22 @@ class SQLBaseStore(object):
                        get_prev_content=False, allow_rejected=False):
 
         start_time = time.time() * 1000
-        update_counter = self._get_event_counters.update
+
+        def update_counter(desc, last_time):
+            curr_time = self._get_event_counters.update(desc, last_time)
+            sql_getevents_timer.inc_by(curr_time - last_time, desc)
+            return curr_time
 
         cache = self._get_event_cache.setdefault(event_id, {})
 
         try:
             # Separate cache entries for each way to invoke _get_event_txn
-            return cache[(check_redacted, get_prev_content, allow_rejected)]
+            ret = cache[(check_redacted, get_prev_content, allow_rejected)]
+
+            cache_counter.inc_hits("*getEvent*")
+            return ret
         except KeyError:
+            cache_counter.inc_misses("*getEvent*")
             pass
         finally:
             start_time = update_counter("event_cache", start_time)
@@ -685,7 +722,11 @@ class SQLBaseStore(object):
                                 check_redacted=True, get_prev_content=False):
 
         start_time = time.time() * 1000
-        update_counter = self._get_event_counters.update
+
+        def update_counter(desc, last_time):
+            curr_time = self._get_event_counters.update(desc, last_time)
+            sql_getevents_timer.inc_by(curr_time - last_time, desc)
+            return curr_time
 
         d = json.loads(js)
         start_time = update_counter("decode_json", start_time)