summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py179
1 files changed, 167 insertions, 12 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index f660fc6eaf..be9934c66f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,11 +19,12 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
+from synapse.util.lrucache import LruCache
 
 from twisted.internet import defer
 
 import collections
-import json
+import simplejson as json
 import sys
 import time
 
@@ -77,6 +78,43 @@ class LoggingTransaction(object):
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
 
 
+class PerformanceCounters(object):
+    def __init__(self):
+        self.current_counters = {}
+        self.previous_counters = {}
+
+    def update(self, key, start_time, end_time=None):
+        if end_time is None:
+            end_time = time.time() * 1000
+        duration = end_time - start_time
+        count, cum_time = self.current_counters.get(key, (0, 0))
+        count += 1
+        cum_time += duration
+        self.current_counters[key] = (count, cum_time)
+        return end_time
+
+    def interval(self, interval_duration, limit=3):
+        counters = []
+        for name, (count, cum_time) in self.current_counters.items():
+            prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+            counters.append((
+                (cum_time - prev_time) / interval_duration,
+                count - prev_count,
+                name
+            ))
+
+        self.previous_counters = dict(self.current_counters)
+
+        counters.sort(reverse=True)
+
+        top_n_counters = ", ".join(
+            "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+            for ratio, count, name in counters[:limit]
+        )
+
+        return top_n_counters
+
+
 class SQLBaseStore(object):
     _TXN_ID = 0
 
@@ -85,6 +123,43 @@ class SQLBaseStore(object):
         self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
 
+        self._previous_txn_total_time = 0
+        self._current_txn_total_time = 0
+        self._previous_loop_ts = 0
+        self._txn_perf_counters = PerformanceCounters()
+        self._get_event_counters = PerformanceCounters()
+
+        self._get_event_cache = LruCache(hs.config.event_cache_size)
+
+    def start_profiling(self):
+        self._previous_loop_ts = self._clock.time_msec()
+
+        def loop():
+            curr = self._current_txn_total_time
+            prev = self._previous_txn_total_time
+            self._previous_txn_total_time = curr
+
+            time_now = self._clock.time_msec()
+            time_then = self._previous_loop_ts
+            self._previous_loop_ts = time_now
+
+            ratio = (curr - prev)/(time_now - time_then)
+
+            top_three_counters = self._txn_perf_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            top_3_event_counters = self._get_event_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            logger.info(
+                "Total database time: %.3f%% {%s} {%s}",
+                ratio * 100, top_three_counters, top_3_event_counters
+            )
+
+        self._clock.looping_call(loop, 10000)
+
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
@@ -94,7 +169,7 @@ class SQLBaseStore(object):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
                 start = time.time() * 1000
-                txn_id = SQLBaseStore._TXN_ID
+                txn_id = self._TXN_ID
 
                 # We don't really need these to be unique, so lets stop it from
                 # growing really large.
@@ -114,6 +189,10 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
+
+                    self._current_txn_total_time += end - start
+                    self._txn_perf_counters.update(desc, start, end)
+
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
@@ -193,6 +272,50 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
+    def _simple_upsert(self, table, keyvalues, values):
+        """
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+        Returns: A deferred
+        """
+        return self.runInteraction(
+            "_simple_upsert",
+            self._simple_upsert_txn, table, keyvalues, values
+        )
+
+    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+        # Try to update
+        sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in values),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+        )
+        sqlargs = values.values() + keyvalues.values()
+        logger.debug(
+            "[SQL] %s Args=%s",
+            sql, sqlargs,
+        )
+
+        txn.execute(sql, sqlargs)
+        if txn.rowcount == 0:
+            # We didn't update and rows so insert a new one
+            allvalues = {}
+            allvalues.update(keyvalues)
+            allvalues.update(values)
+
+            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+                table,
+                ", ".join(k for k in allvalues),
+                ", ".join("?" for _ in allvalues)
+            )
+            logger.debug(
+                "[SQL] %s Args=%s",
+                sql, keyvalues.values(),
+            )
+            txn.execute(sql, allvalues.values())
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
@@ -344,8 +467,8 @@ class SQLBaseStore(object):
         if updatevalues:
             update_sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
-                ", ".join("%s = ?" % (k) for k in updatevalues),
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
+                ", ".join("%s = ?" % (k,) for k in updatevalues),
+                " AND ".join("%s = ?" % (k,) for k in keyvalues)
             )
 
         def func(txn):
@@ -458,10 +581,25 @@ class SQLBaseStore(object):
         return [e for e in events if e]
 
     def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False):
+                       get_prev_content=False, allow_rejected=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
+        try:
+            cache = self._get_event_cache.setdefault(event_id, {})
+            # Separate cache entries for each way to invoke _get_event_txn
+            return cache[(check_redacted, get_prev_content, allow_rejected)]
+        except KeyError:
+            pass
+        finally:
+            start_time = update_counter("event_cache", start_time)
+
         sql = (
-            "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+            "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+            "FROM event_json as e "
             "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+            "LEFT JOIN rejections as rej on rej.event_id = e.event_id  "
             "WHERE e.event_id = ? "
             "LIMIT 1 "
         )
@@ -473,20 +611,35 @@ class SQLBaseStore(object):
         if not res:
             return None
 
-        internal_metadata, js, redacted = res
+        internal_metadata, js, redacted, rejected_reason = res
 
-        return self._get_event_from_row_txn(
-            txn, internal_metadata, js, redacted,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-        )
+        start_time = update_counter("select_event", start_time)
+
+        if allow_rejected or not rejected_reason:
+            result = self._get_event_from_row_txn(
+                txn, internal_metadata, js, redacted,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+            )
+            cache[(check_redacted, get_prev_content, allow_rejected)] = result
+            return result
+        else:
+            return None
 
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                                 check_redacted=True, get_prev_content=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
         d = json.loads(js)
+        start_time = update_counter("decode_json", start_time)
+
         internal_metadata = json.loads(internal_metadata)
+        start_time = update_counter("decode_internal", start_time)
 
         ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        start_time = update_counter("build_frozen_event", start_time)
 
         if check_redacted and redacted:
             ev = prune_event(ev)
@@ -502,6 +655,7 @@ class SQLBaseStore(object):
 
             if because:
                 ev.unsigned["redacted_because"] = because
+            start_time = update_counter("redact_event", start_time)
 
         if get_prev_content and "replaces_state" in ev.unsigned:
             prev = self._get_event_txn(
@@ -511,6 +665,7 @@ class SQLBaseStore(object):
             )
             if prev:
                 ev.unsigned["prev_content"] = prev.get_dict()["content"]
+            start_time = update_counter("get_prev_content", start_time)
 
         return ev