summary refs log tree commit diff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py253
1 files changed, 231 insertions, 22 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ce63f12008..3725c9795d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,11 +19,12 @@ from synapse.events import FrozenEvent
 from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
+from synapse.util.lrucache import LruCache
 
 from twisted.internet import defer
 
-import collections
-import json
+from collections import namedtuple, OrderedDict
+import simplejson as json
 import sys
 import time
 
@@ -34,6 +35,52 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 
 
+# TODO(paul):
+#  * more generic key management
+#  * export monitoring stats
+#  * consider other eviction strategies - LRU?
+def cached(max_entries=1000):
+    """ A method decorator that applies a memoizing cache around the function.
+
+    The function is presumed to take one additional argument, which is used as
+    the key for the cache. Cache hits are served directly from the cache;
+    misses use the function body to generate the value.
+
+    The wrapped function has an additional member, a callable called
+    "invalidate". This can be used to remove individual entries from the cache.
+
+    The wrapped function has another additional callable, called "prefill",
+    which can be used to insert values into the cache specifically, without
+    calling the calculation function.
+    """
+    def wrap(orig):
+        cache = OrderedDict()
+
+        def prefill(key, value):
+            while len(cache) > max_entries:
+                cache.popitem(last=False)
+
+            cache[key] = value
+
+        @defer.inlineCallbacks
+        def wrapped(self, key):
+            if key in cache:
+                defer.returnValue(cache[key])
+
+            ret = yield orig(self, key)
+            prefill(key, ret)
+            defer.returnValue(ret)
+
+        def invalidate(key):
+            cache.pop(key, None)
+
+        wrapped.invalidate = invalidate
+        wrapped.prefill = prefill
+        return wrapped
+
+    return wrap
+
+
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
     passed to the constructor. Adds logging to the .execute() method."""
@@ -77,6 +124,43 @@ class LoggingTransaction(object):
             sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
 
 
+class PerformanceCounters(object):
+    def __init__(self):
+        self.current_counters = {}
+        self.previous_counters = {}
+
+    def update(self, key, start_time, end_time=None):
+        if end_time is None:
+            end_time = time.time() * 1000
+        duration = end_time - start_time
+        count, cum_time = self.current_counters.get(key, (0, 0))
+        count += 1
+        cum_time += duration
+        self.current_counters[key] = (count, cum_time)
+        return end_time
+
+    def interval(self, interval_duration, limit=3):
+        counters = []
+        for name, (count, cum_time) in self.current_counters.items():
+            prev_count, prev_time = self.previous_counters.get(name, (0, 0))
+            counters.append((
+                (cum_time - prev_time) / interval_duration,
+                count - prev_count,
+                name
+            ))
+
+        self.previous_counters = dict(self.current_counters)
+
+        counters.sort(reverse=True)
+
+        top_n_counters = ", ".join(
+            "%s(%d): %.3f%%" % (name, count, 100 * ratio)
+            for ratio, count, name in counters[:limit]
+        )
+
+        return top_n_counters
+
+
 class SQLBaseStore(object):
     _TXN_ID = 0
 
@@ -85,6 +169,43 @@ class SQLBaseStore(object):
         self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
 
+        self._previous_txn_total_time = 0
+        self._current_txn_total_time = 0
+        self._previous_loop_ts = 0
+        self._txn_perf_counters = PerformanceCounters()
+        self._get_event_counters = PerformanceCounters()
+
+        self._get_event_cache = LruCache(hs.config.event_cache_size)
+
+    def start_profiling(self):
+        self._previous_loop_ts = self._clock.time_msec()
+
+        def loop():
+            curr = self._current_txn_total_time
+            prev = self._previous_txn_total_time
+            self._previous_txn_total_time = curr
+
+            time_now = self._clock.time_msec()
+            time_then = self._previous_loop_ts
+            self._previous_loop_ts = time_now
+
+            ratio = (curr - prev)/(time_now - time_then)
+
+            top_three_counters = self._txn_perf_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            top_3_event_counters = self._get_event_counters.interval(
+                time_now - time_then, limit=3
+            )
+
+            logger.info(
+                "Total database time: %.3f%% {%s} {%s}",
+                ratio * 100, top_three_counters, top_3_event_counters
+            )
+
+        self._clock.looping_call(loop, 10000)
+
     @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         """Wraps the .runInteraction() method on the underlying db_pool."""
@@ -94,8 +215,7 @@ class SQLBaseStore(object):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
                 start = time.time() * 1000
-                txn_id = SQLBaseStore._TXN_ID
-                SQLBaseStore._TXN_ID += 1
+                txn_id = self._TXN_ID
 
                 # We don't really need these to be unique, so lets stop it from
                 # growing really large.
@@ -115,6 +235,10 @@ class SQLBaseStore(object):
                         "[TXN END] {%s} %f",
                         name, end - start
                     )
+
+                    self._current_txn_total_time += end - start
+                    self._txn_perf_counters.update(desc, start, end)
+
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
                 inner_func, *args, **kwargs
@@ -194,6 +318,50 @@ class SQLBaseStore(object):
         txn.execute(sql, values.values())
         return txn.lastrowid
 
+    def _simple_upsert(self, table, keyvalues, values):
+        """
+        Args:
+            table (str): The table to upsert into
+            keyvalues (dict): The unique key tables and their new values
+            values (dict): The nonunique columns and their new values
+        Returns: A deferred
+        """
+        return self.runInteraction(
+            "_simple_upsert",
+            self._simple_upsert_txn, table, keyvalues, values
+        )
+
+    def _simple_upsert_txn(self, txn, table, keyvalues, values):
+        # Try to update
+        sql = "UPDATE %s SET %s WHERE %s" % (
+            table,
+            ", ".join("%s = ?" % (k,) for k in values),
+            " AND ".join("%s = ?" % (k,) for k in keyvalues)
+        )
+        sqlargs = values.values() + keyvalues.values()
+        logger.debug(
+            "[SQL] %s Args=%s",
+            sql, sqlargs,
+        )
+
+        txn.execute(sql, sqlargs)
+        if txn.rowcount == 0:
+            # We didn't update and rows so insert a new one
+            allvalues = {}
+            allvalues.update(keyvalues)
+            allvalues.update(values)
+
+            sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+                table,
+                ", ".join(k for k in allvalues),
+                ", ".join("?" for _ in allvalues)
+            )
+            logger.debug(
+                "[SQL] %s Args=%s",
+                sql, keyvalues.values(),
+            )
+            txn.execute(sql, allvalues.values())
+
     def _simple_select_one(self, table, keyvalues, retcols,
                            allow_none=False):
         """Executes a SELECT query on the named table, which is expected to
@@ -282,7 +450,8 @@ class SQLBaseStore(object):
 
         Args:
             table : string giving the table name
-            keyvalues : dict of column names and values to select the rows with
+            keyvalues : dict of column names and values to select the rows with,
+            or None to not apply a WHERE clause.
             retcols : list of strings giving the names of the columns to return
         """
         return self.runInteraction(
@@ -301,13 +470,20 @@ class SQLBaseStore(object):
             keyvalues : dict of column names and values to select the rows with
             retcols : list of strings giving the names of the columns to return
         """
-        sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
-            ", ".join(retcols),
-            table,
-            " AND ".join("%s = ?" % (k, ) for k in keyvalues)
-        )
+        if keyvalues:
+            sql = "SELECT %s FROM %s WHERE %s ORDER BY rowid asc" % (
+                ", ".join(retcols),
+                table,
+                " AND ".join("%s = ?" % (k, ) for k in keyvalues)
+            )
+            txn.execute(sql, keyvalues.values())
+        else:
+            sql = "SELECT %s FROM %s ORDER BY rowid asc" % (
+                ", ".join(retcols),
+                table
+            )
+            txn.execute(sql)
 
-        txn.execute(sql, keyvalues.values())
         return self.cursor_to_dict(txn)
 
     def _simple_update_one(self, table, keyvalues, updatevalues,
@@ -345,8 +521,8 @@ class SQLBaseStore(object):
         if updatevalues:
             update_sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
-                ", ".join("%s = ?" % (k) for k in updatevalues),
-                " AND ".join("%s = ?" % (k) for k in keyvalues)
+                ", ".join("%s = ?" % (k,) for k in updatevalues),
+                " AND ".join("%s = ?" % (k,) for k in keyvalues)
             )
 
         def func(txn):
@@ -459,10 +635,26 @@ class SQLBaseStore(object):
         return [e for e in events if e]
 
     def _get_event_txn(self, txn, event_id, check_redacted=True,
-                       get_prev_content=False):
+                       get_prev_content=False, allow_rejected=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
+        cache = self._get_event_cache.setdefault(event_id, {})
+
+        try:
+            # Separate cache entries for each way to invoke _get_event_txn
+            return cache[(check_redacted, get_prev_content, allow_rejected)]
+        except KeyError:
+            pass
+        finally:
+            start_time = update_counter("event_cache", start_time)
+
         sql = (
-            "SELECT internal_metadata, json, r.event_id FROM event_json as e "
+            "SELECT e.internal_metadata, e.json, r.event_id, rej.reason "
+            "FROM event_json as e "
             "LEFT JOIN redactions as r ON e.event_id = r.redacts "
+            "LEFT JOIN rejections as rej on rej.event_id = e.event_id  "
             "WHERE e.event_id = ? "
             "LIMIT 1 "
         )
@@ -474,20 +666,35 @@ class SQLBaseStore(object):
         if not res:
             return None
 
-        internal_metadata, js, redacted = res
+        internal_metadata, js, redacted, rejected_reason = res
 
-        return self._get_event_from_row_txn(
-            txn, internal_metadata, js, redacted,
-            check_redacted=check_redacted,
-            get_prev_content=get_prev_content,
-        )
+        start_time = update_counter("select_event", start_time)
+
+        if allow_rejected or not rejected_reason:
+            result = self._get_event_from_row_txn(
+                txn, internal_metadata, js, redacted,
+                check_redacted=check_redacted,
+                get_prev_content=get_prev_content,
+            )
+            cache[(check_redacted, get_prev_content, allow_rejected)] = result
+            return result
+        else:
+            return None
 
     def _get_event_from_row_txn(self, txn, internal_metadata, js, redacted,
                                 check_redacted=True, get_prev_content=False):
+
+        start_time = time.time() * 1000
+        update_counter = self._get_event_counters.update
+
         d = json.loads(js)
+        start_time = update_counter("decode_json", start_time)
+
         internal_metadata = json.loads(internal_metadata)
+        start_time = update_counter("decode_internal", start_time)
 
         ev = FrozenEvent(d, internal_metadata_dict=internal_metadata)
+        start_time = update_counter("build_frozen_event", start_time)
 
         if check_redacted and redacted:
             ev = prune_event(ev)
@@ -503,6 +710,7 @@ class SQLBaseStore(object):
 
             if because:
                 ev.unsigned["redacted_because"] = because
+            start_time = update_counter("redact_event", start_time)
 
         if get_prev_content and "replaces_state" in ev.unsigned:
             prev = self._get_event_txn(
@@ -512,6 +720,7 @@ class SQLBaseStore(object):
             )
             if prev:
                 ev.unsigned["prev_content"] = prev.get_dict()["content"]
+            start_time = update_counter("get_prev_content", start_time)
 
         return ev
 
@@ -632,7 +841,7 @@ class JoinHelper(object):
         for table in self.tables:
             res += [f for f in table.fields if f not in res]
 
-        self.EntryType = collections.namedtuple("JoinHelperEntry", res)
+        self.EntryType = namedtuple("JoinHelperEntry", res)
 
     def get_fields(self, **prefixes):
         """Get a string representing a list of fields for use in SELECT