summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/__init__.py13
-rw-r--r--synapse/storage/_base.py80
-rw-r--r--synapse/storage/appservice.py2
-rw-r--r--synapse/storage/event_federation.py12
-rw-r--r--synapse/storage/feedback.py2
-rw-r--r--synapse/storage/keys.py4
-rw-r--r--synapse/storage/push_rule.py29
-rw-r--r--synapse/storage/pusher.py5
-rw-r--r--synapse/storage/registration.py10
-rw-r--r--synapse/storage/room.py2
-rw-r--r--synapse/storage/state.py2
11 files changed, 106 insertions, 55 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a3ff995695..4b16f445d6 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -450,7 +450,7 @@ class DataStore(RoomMemberStore, RoomStore,
         else:
             args = (room_id, )
 
-        results = yield self._execute_and_decode(sql, *args)
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
 
         events = yield self._parse_events(results)
         defer.returnValue(events)
@@ -475,7 +475,7 @@ class DataStore(RoomMemberStore, RoomStore,
         sql += " OR s.type = 'm.room.aliases')"
         args = (room_id,)
 
-        results = yield self._execute_and_decode(sql, *args)
+        results = yield self._execute_and_decode("get_current_state", sql, *args)
 
         events = yield self._parse_events(results)
 
@@ -484,17 +484,18 @@ class DataStore(RoomMemberStore, RoomStore,
 
         for e in events:
             if e.type == 'm.room.name':
-                name = e.content['name']
+                if 'name' in e.content:
+                    name = e.content['name']
             elif e.type == 'm.room.aliases':
-                aliases.extend(e.content['aliases'])
+                if 'aliases' in e.content:
+                    aliases.extend(e.content['aliases'])
 
         defer.returnValue((name, aliases))
 
     @defer.inlineCallbacks
     def _get_min_token(self):
         row = yield self._execute(
-            None,
-            "SELECT MIN(stream_ordering) FROM events"
+            "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events"
         )
 
         self.min_token = row[0][0] if row and row[0] and row[0][0] else -1
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3725c9795d..9125bb1198 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -20,10 +20,12 @@ from synapse.events.utils import prune_event
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
 from synapse.util.lrucache import LruCache
+import synapse.metrics
 
 from twisted.internet import defer
 
 from collections import namedtuple, OrderedDict
+import functools
 import simplejson as json
 import sys
 import time
@@ -35,9 +37,24 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
 
 
+metrics = synapse.metrics.get_metrics_for("synapse.storage")
+
+sql_scheduling_timer = metrics.register_distribution("schedule_time")
+
+sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
+sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
+
+caches_by_name = {}
+cache_counter = metrics.register_cache(
+    "cache",
+    lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+    labels=["name"],
+)
+
+
 # TODO(paul):
 #  * more generic key management
-#  * export monitoring stats
 #  * consider other eviction strategies - LRU?
 def cached(max_entries=1000):
     """ A method decorator that applies a memoizing cache around the function.
@@ -55,6 +72,9 @@ def cached(max_entries=1000):
     """
     def wrap(orig):
         cache = OrderedDict()
+        name = orig.__name__
+
+        caches_by_name[name] = cache
 
         def prefill(key, value):
             while len(cache) > max_entries:
@@ -62,11 +82,14 @@ def cached(max_entries=1000):
 
             cache[key] = value
 
+        @functools.wraps(orig)
         @defer.inlineCallbacks
         def wrapped(self, key):
             if key in cache:
+                cache_counter.inc_hits(name)
                 defer.returnValue(cache[key])
 
+            cache_counter.inc_misses(name)
             ret = yield orig(self, key)
             prefill(key, ret)
             defer.returnValue(ret)
@@ -83,7 +106,8 @@ def cached(max_entries=1000):
 
 class LoggingTransaction(object):
     """An object that almost-transparently proxies for the 'txn' object
-    passed to the constructor. Adds logging to the .execute() method."""
+    passed to the constructor. Adds logging and metrics to the .execute()
+    method."""
     __slots__ = ["txn", "name"]
 
     def __init__(self, txn, name):
@@ -99,6 +123,7 @@ class LoggingTransaction(object):
     def execute(self, sql, *args, **kwargs):
         # TODO(paul): Maybe use 'info' and 'debug' for values?
         sql_logger.debug("[SQL] {%s} %s", self.name, sql)
+
         try:
             if args and args[0]:
                 values = args[0]
@@ -120,8 +145,9 @@ class LoggingTransaction(object):
                 logger.exception("[SQL FAIL] {%s}", self.name)
                 raise
         finally:
-            end = time.time() * 1000
-            sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
+            msecs = (time.time() * 1000) - start
+            sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
+            sql_query_timer.inc_by(msecs, sql.split()[0])
 
 
 class PerformanceCounters(object):
@@ -172,11 +198,18 @@ class SQLBaseStore(object):
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
         self._previous_loop_ts = 0
+
+        # TODO(paul): These can eventually be removed once the metrics code
+        #   is running in mainline, and we have some nice monitoring frontends
+        #   to watch it
         self._txn_perf_counters = PerformanceCounters()
         self._get_event_counters = PerformanceCounters()
 
         self._get_event_cache = LruCache(hs.config.event_cache_size)
 
+        # Pretend the getEventCache is just another named cache
+        caches_by_name["*getEvent*"] = self._get_event_cache
+
     def start_profiling(self):
         self._previous_loop_ts = self._clock.time_msec()
 
@@ -211,6 +244,8 @@ class SQLBaseStore(object):
         """Wraps the .runInteraction() method on the underlying db_pool."""
         current_context = LoggingContext.current_context()
 
+        start_time = time.time() * 1000
+
         def inner_func(txn, *args, **kwargs):
             with LoggingContext("runInteraction") as context:
                 current_context.copy_to(context)
@@ -223,6 +258,7 @@ class SQLBaseStore(object):
 
                 name = "%s-%x" % (desc, txn_id, )
 
+                sql_scheduling_timer.inc_by(time.time() * 1000 - start_time)
                 transaction_logger.debug("[TXN START] {%s}", name)
                 try:
                     return func(LoggingTransaction(txn, name), *args, **kwargs)
@@ -231,13 +267,13 @@ class SQLBaseStore(object):
                     raise
                 finally:
                     end = time.time() * 1000
-                    transaction_logger.debug(
-                        "[TXN END] {%s} %f",
-                        name, end - start
-                    )
+                    duration = end - start
+
+                    transaction_logger.debug("[TXN END] {%s} %f", name, duration)
 
-                    self._current_txn_total_time += end - start
+                    self._current_txn_total_time += duration
                     self._txn_perf_counters.update(desc, start, end)
+                    sql_txn_timer.inc_by(duration, desc)
 
         with PreserveLoggingContext():
             result = yield self._db_pool.runInteraction(
@@ -259,7 +295,7 @@ class SQLBaseStore(object):
         )
         return results
 
-    def _execute(self, decoder, query, *args):
+    def _execute(self, desc, decoder, query, *args):
         """Runs a single query for a result set.
 
         Args:
@@ -277,10 +313,10 @@ class SQLBaseStore(object):
             else:
                 return cursor.fetchall()
 
-        return self.runInteraction("_execute", interaction)
+        return self.runInteraction(desc, interaction)
 
-    def _execute_and_decode(self, query, *args):
-        return self._execute(self.cursor_to_dict, query, *args)
+    def _execute_and_decode(self, desc, query, *args):
+        return self._execute(desc, self.cursor_to_dict, query, *args)
 
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
@@ -638,14 +674,22 @@ class SQLBaseStore(object):
                        get_prev_content=False, allow_rejected=False):
 
         start_time = time.time() * 1000
-        update_counter = self._get_event_counters.update
+
+        def update_counter(desc, last_time):
+            curr_time = self._get_event_counters.update(desc, last_time)
+            sql_getevents_timer.inc_by(curr_time - last_time, desc)
+            return curr_time
 
         cache = self._get_event_cache.setdefault(event_id, {})
 
         try:
             # Separate cache entries for each way to invoke _get_event_txn
-            return cache[(check_redacted, get_prev_content, allow_rejected)]
+            ret = cache[(check_redacted, get_prev_content, allow_rejected)]
+
+            cache_counter.inc_hits("*getEvent*")
+            return ret
         except KeyError:
+            cache_counter.inc_misses("*getEvent*")
             pass
         finally:
             start_time = update_counter("event_cache", start_time)
@@ -685,7 +729,11 @@ class SQLBaseStore(object):
                                 check_redacted=True, get_prev_content=False):
 
         start_time = time.time() * 1000
-        update_counter = self._get_event_counters.update
+
+        def update_counter(desc, last_time):
+            curr_time = self._get_event_counters.update(desc, last_time)
+            sql_getevents_timer.inc_by(curr_time - last_time, desc)
+            return curr_time
 
         d = json.loads(js)
         start_time = update_counter("decode_json", start_time)
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index e30265750a..850676ce6c 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -296,7 +296,7 @@ class ApplicationServiceStore(SQLBaseStore):
         #   }
         # ]
         services = {}
-        results = yield self._execute_and_decode(sql)
+        results = yield self._execute_and_decode("_populate_cache", sql)
         for res in results:
             as_token = res["token"]
             if as_token not in services:
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 2deda8ac50..032334bfd6 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -429,3 +429,15 @@ class EventFederationStore(SQLBaseStore):
         )
 
         return events[:limit]
+
+    def clean_room_for_join(self, room_id):
+        return self.runInteraction(
+            "clean_room_for_join",
+            self._clean_room_for_join_txn,
+            room_id,
+        )
+
+    def _clean_room_for_join_txn(self, txn, room_id):
+        query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
+
+        txn.execute(query, (room_id,))
diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py
index fcf011b234..8eab769b71 100644
--- a/synapse/storage/feedback.py
+++ b/synapse/storage/feedback.py
@@ -37,7 +37,7 @@ class FeedbackStore(SQLBaseStore):
             "WHERE feedback.target_event_id = ? "
         )
 
-        rows = yield self._execute_and_decode(sql, event_id)
+        rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id)
 
         defer.returnValue(
             [
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 1f244019fc..09d1e63657 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -85,7 +85,9 @@ class KeyStore(SQLBaseStore):
             " AND key_id in (" + ",".join("?" for key_id in key_ids) + ")"
         )
 
-        rows = yield self._execute_and_decode(sql, server_name, *key_ids)
+        rows = yield self._execute_and_decode(
+            "get_server_verify_keys", sql, server_name, *key_ids
+        )
 
         keys = []
         for row in rows:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index bbf322cc84..d769db2c78 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -34,7 +34,7 @@ class PushRuleStore(SQLBaseStore):
             "WHERE user_name = ? "
             "ORDER BY priority_class DESC, priority DESC"
         )
-        rows = yield self._execute(None, sql, user_name)
+        rows = yield self._execute("get_push_rules_for_user", None, sql, user_name)
 
         dicts = []
         for r in rows:
@@ -57,17 +57,6 @@ class PushRuleStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id):
-        results = yield self._simple_select_list(
-            PushRuleEnableTable.table_name,
-            {'user_name': user_name, 'rule_id': rule_id},
-            ['enabled']
-        )
-        if not results:
-            defer.returnValue(True)
-        defer.returnValue(results[0])
-
-    @defer.inlineCallbacks
     def add_push_rule(self, before, after, **kwargs):
         vals = copy.copy(kwargs)
         if 'conditions' in vals:
@@ -217,17 +206,11 @@ class PushRuleStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def set_push_rule_enabled(self, user_name, rule_id, enabled):
-        if enabled:
-            yield self._simple_delete_one(
-                PushRuleEnableTable.table_name,
-                {'user_name': user_name, 'rule_id': rule_id}
-            )
-        else:
-            yield self._simple_upsert(
-                PushRuleEnableTable.table_name,
-                {'user_name': user_name, 'rule_id': rule_id},
-                {'enabled': False}
-            )
+        yield self._simple_upsert(
+            PushRuleEnableTable.table_name,
+            {'user_name': user_name, 'rule_id': rule_id},
+            {'enabled': enabled}
+        )
 
 
 class RuleNotFoundException(Exception):
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 6622b4d18a..587dada68f 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -37,7 +37,8 @@ class PusherStore(SQLBaseStore):
         )
 
         rows = yield self._execute(
-            None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1]
+            "get_pushers_by_app_id_and_pushkey", None, sql,
+            app_id_and_pushkey[0], app_id_and_pushkey[1]
         )
 
         ret = [
@@ -70,7 +71,7 @@ class PusherStore(SQLBaseStore):
             "FROM pushers"
         )
 
-        rows = yield self._execute(None, sql)
+        rows = yield self._execute("get_all_pushers", None, sql)
 
         ret = [
             {
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 029b07cc66..3c2f1d6a15 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -19,7 +19,7 @@ from sqlite3 import IntegrityError
 
 from synapse.api.errors import StoreError, Codes
 
-from ._base import SQLBaseStore
+from ._base import SQLBaseStore, cached
 
 
 class RegistrationStore(SQLBaseStore):
@@ -88,10 +88,14 @@ class RegistrationStore(SQLBaseStore):
         query = ("SELECT users.name, users.password_hash FROM users"
                  " WHERE users.name = ?")
         return self._execute(
-            self.cursor_to_dict,
-            query, user_id
+            "get_user_by_id", self.cursor_to_dict, query, user_id
         )
 
+    @cached()
+    # TODO(paul): Currently there's no code to invalidate this cache. That
+    #   means if/when we ever add internal ways to invalidate access tokens or
+    #   change whether a user is a server admin, those will need to invoke
+    #      store.get_user_by_token.invalidate(token)
     def get_user_by_token(self, token):
         """Get a user from the given access token.
 
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 750b17a45f..549c9af393 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -68,7 +68,7 @@ class RoomStore(SQLBaseStore):
         """
         query = RoomsTable.select_statement("room_id=?")
         return self._execute(
-            RoomsTable.decode_single_result, query, room_id,
+            "get_room", RoomsTable.decode_single_result, query, room_id,
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 71db16d0e5..456e4bd45d 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -82,7 +82,7 @@ class StateStore(SQLBaseStore):
         if context.current_state is None:
             return
 
-        state_events = context.current_state
+        state_events = dict(context.current_state)
 
         if event.is_state():
             state_events[(event.type, event.state_key)] = event