diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 13 | ||||
-rw-r--r-- | synapse/storage/_base.py | 80 | ||||
-rw-r--r-- | synapse/storage/appservice.py | 2 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 12 | ||||
-rw-r--r-- | synapse/storage/feedback.py | 2 | ||||
-rw-r--r-- | synapse/storage/keys.py | 4 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 29 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 5 | ||||
-rw-r--r-- | synapse/storage/registration.py | 10 | ||||
-rw-r--r-- | synapse/storage/room.py | 2 | ||||
-rw-r--r-- | synapse/storage/state.py | 2 |
11 files changed, 106 insertions, 55 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a3ff995695..4b16f445d6 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -450,7 +450,7 @@ class DataStore(RoomMemberStore, RoomStore, else: args = (room_id, ) - results = yield self._execute_and_decode(sql, *args) + results = yield self._execute_and_decode("get_current_state", sql, *args) events = yield self._parse_events(results) defer.returnValue(events) @@ -475,7 +475,7 @@ class DataStore(RoomMemberStore, RoomStore, sql += " OR s.type = 'm.room.aliases')" args = (room_id,) - results = yield self._execute_and_decode(sql, *args) + results = yield self._execute_and_decode("get_current_state", sql, *args) events = yield self._parse_events(results) @@ -484,17 +484,18 @@ class DataStore(RoomMemberStore, RoomStore, for e in events: if e.type == 'm.room.name': - name = e.content['name'] + if 'name' in e.content: + name = e.content['name'] elif e.type == 'm.room.aliases': - aliases.extend(e.content['aliases']) + if 'aliases' in e.content: + aliases.extend(e.content['aliases']) defer.returnValue((name, aliases)) @defer.inlineCallbacks def _get_min_token(self): row = yield self._execute( - None, - "SELECT MIN(stream_ordering) FROM events" + "_get_min_token", None, "SELECT MIN(stream_ordering) FROM events" ) self.min_token = row[0][0] if row and row[0] and row[0][0] else -1 diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3725c9795d..9125bb1198 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -20,10 +20,12 @@ from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache +import synapse.metrics from twisted.internet import defer from collections import namedtuple, OrderedDict +import functools import simplejson as json import sys import time @@ -35,9 +37,24 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +metrics = synapse.metrics.get_metrics_for("synapse.storage") + +sql_scheduling_timer = metrics.register_distribution("schedule_time") + +sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) +sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) + +caches_by_name = {} +cache_counter = metrics.register_cache( + "cache", + lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, + labels=["name"], +) + + # TODO(paul): # * more generic key management -# * export monitoring stats # * consider other eviction strategies - LRU? def cached(max_entries=1000): """ A method decorator that applies a memoizing cache around the function. @@ -55,6 +72,9 @@ def cached(max_entries=1000): """ def wrap(orig): cache = OrderedDict() + name = orig.__name__ + + caches_by_name[name] = cache def prefill(key, value): while len(cache) > max_entries: @@ -62,11 +82,14 @@ def cached(max_entries=1000): cache[key] = value + @functools.wraps(orig) @defer.inlineCallbacks def wrapped(self, key): if key in cache: + cache_counter.inc_hits(name) defer.returnValue(cache[key]) + cache_counter.inc_misses(name) ret = yield orig(self, key) prefill(key, ret) defer.returnValue(ret) @@ -83,7 +106,8 @@ def cached(max_entries=1000): class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object - passed to the constructor. Adds logging to the .execute() method.""" + passed to the constructor. Adds logging and metrics to the .execute() + method.""" __slots__ = ["txn", "name"] def __init__(self, txn, name): @@ -99,6 +123,7 @@ class LoggingTransaction(object): def execute(self, sql, *args, **kwargs): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) + try: if args and args[0]: values = args[0] @@ -120,8 +145,9 @@ class LoggingTransaction(object): logger.exception("[SQL FAIL] {%s}", self.name) raise finally: - end = time.time() * 1000 - sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) + msecs = (time.time() * 1000) - start + sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) + sql_query_timer.inc_by(msecs, sql.split()[0]) class PerformanceCounters(object): @@ -172,11 +198,18 @@ class SQLBaseStore(object): self._previous_txn_total_time = 0 self._current_txn_total_time = 0 self._previous_loop_ts = 0 + + # TODO(paul): These can eventually be removed once the metrics code + # is running in mainline, and we have some nice monitoring frontends + # to watch it self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() self._get_event_cache = LruCache(hs.config.event_cache_size) + # Pretend the getEventCache is just another named cache + caches_by_name["*getEvent*"] = self._get_event_cache + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -211,6 +244,8 @@ class SQLBaseStore(object): """Wraps the .runInteraction() method on the underlying db_pool.""" current_context = LoggingContext.current_context() + start_time = time.time() * 1000 + def inner_func(txn, *args, **kwargs): with LoggingContext("runInteraction") as context: current_context.copy_to(context) @@ -223,6 +258,7 @@ class SQLBaseStore(object): name = "%s-%x" % (desc, txn_id, ) + sql_scheduling_timer.inc_by(time.time() * 1000 - start_time) transaction_logger.debug("[TXN START] {%s}", name) try: return func(LoggingTransaction(txn, name), *args, **kwargs) @@ -231,13 +267,13 @@ class SQLBaseStore(object): raise finally: end = time.time() * 1000 - transaction_logger.debug( - "[TXN END] {%s} %f", - name, end - start - ) + duration = end - start + + transaction_logger.debug("[TXN END] {%s} %f", name, duration) - self._current_txn_total_time += end - start + self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) + sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( @@ -259,7 +295,7 @@ class SQLBaseStore(object): ) return results - def _execute(self, decoder, query, *args): + def _execute(self, desc, decoder, query, *args): """Runs a single query for a result set. Args: @@ -277,10 +313,10 @@ class SQLBaseStore(object): else: return cursor.fetchall() - return self.runInteraction("_execute", interaction) + return self.runInteraction(desc, interaction) - def _execute_and_decode(self, query, *args): - return self._execute(self.cursor_to_dict, query, *args) + def _execute_and_decode(self, desc, query, *args): + return self._execute(desc, self.cursor_to_dict, query, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -638,14 +674,22 @@ class SQLBaseStore(object): get_prev_content=False, allow_rejected=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time cache = self._get_event_cache.setdefault(event_id, {}) try: # Separate cache entries for each way to invoke _get_event_txn - return cache[(check_redacted, get_prev_content, allow_rejected)] + ret = cache[(check_redacted, get_prev_content, allow_rejected)] + + cache_counter.inc_hits("*getEvent*") + return ret except KeyError: + cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -685,7 +729,11 @@ class SQLBaseStore(object): check_redacted=True, get_prev_content=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time d = json.loads(js) start_time = update_counter("decode_json", start_time) diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py index e30265750a..850676ce6c 100644 --- a/synapse/storage/appservice.py +++ b/synapse/storage/appservice.py @@ -296,7 +296,7 @@ class ApplicationServiceStore(SQLBaseStore): # } # ] services = {} - results = yield self._execute_and_decode(sql) + results = yield self._execute_and_decode("_populate_cache", sql) for res in results: as_token = res["token"] if as_token not in services: diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 2deda8ac50..032334bfd6 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -429,3 +429,15 @@ class EventFederationStore(SQLBaseStore): ) return events[:limit] + + def clean_room_for_join(self, room_id): + return self.runInteraction( + "clean_room_for_join", + self._clean_room_for_join_txn, + room_id, + ) + + def _clean_room_for_join_txn(self, txn, room_id): + query = "DELETE FROM event_forward_extremities WHERE room_id = ?" + + txn.execute(query, (room_id,)) diff --git a/synapse/storage/feedback.py b/synapse/storage/feedback.py index fcf011b234..8eab769b71 100644 --- a/synapse/storage/feedback.py +++ b/synapse/storage/feedback.py @@ -37,7 +37,7 @@ class FeedbackStore(SQLBaseStore): "WHERE feedback.target_event_id = ? " ) - rows = yield self._execute_and_decode(sql, event_id) + rows = yield self._execute_and_decode("get_feedback_for_event", sql, event_id) defer.returnValue( [ diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 1f244019fc..09d1e63657 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -85,7 +85,9 @@ class KeyStore(SQLBaseStore): " AND key_id in (" + ",".join("?" for key_id in key_ids) + ")" ) - rows = yield self._execute_and_decode(sql, server_name, *key_ids) + rows = yield self._execute_and_decode( + "get_server_verify_keys", sql, server_name, *key_ids + ) keys = [] for row in rows: diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index bbf322cc84..d769db2c78 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -34,7 +34,7 @@ class PushRuleStore(SQLBaseStore): "WHERE user_name = ? " "ORDER BY priority_class DESC, priority DESC" ) - rows = yield self._execute(None, sql, user_name) + rows = yield self._execute("get_push_rules_for_user", None, sql, user_name) dicts = [] for r in rows: @@ -57,17 +57,6 @@ class PushRuleStore(SQLBaseStore): ) @defer.inlineCallbacks - def get_push_rule_enabled_by_user_rule_id(self, user_name, rule_id): - results = yield self._simple_select_list( - PushRuleEnableTable.table_name, - {'user_name': user_name, 'rule_id': rule_id}, - ['enabled'] - ) - if not results: - defer.returnValue(True) - defer.returnValue(results[0]) - - @defer.inlineCallbacks def add_push_rule(self, before, after, **kwargs): vals = copy.copy(kwargs) if 'conditions' in vals: @@ -217,17 +206,11 @@ class PushRuleStore(SQLBaseStore): @defer.inlineCallbacks def set_push_rule_enabled(self, user_name, rule_id, enabled): - if enabled: - yield self._simple_delete_one( - PushRuleEnableTable.table_name, - {'user_name': user_name, 'rule_id': rule_id} - ) - else: - yield self._simple_upsert( - PushRuleEnableTable.table_name, - {'user_name': user_name, 'rule_id': rule_id}, - {'enabled': False} - ) + yield self._simple_upsert( + PushRuleEnableTable.table_name, + {'user_name': user_name, 'rule_id': rule_id}, + {'enabled': enabled} + ) class RuleNotFoundException(Exception): diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 6622b4d18a..587dada68f 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -37,7 +37,8 @@ class PusherStore(SQLBaseStore): ) rows = yield self._execute( - None, sql, app_id_and_pushkey[0], app_id_and_pushkey[1] + "get_pushers_by_app_id_and_pushkey", None, sql, + app_id_and_pushkey[0], app_id_and_pushkey[1] ) ret = [ @@ -70,7 +71,7 @@ class PusherStore(SQLBaseStore): "FROM pushers" ) - rows = yield self._execute(None, sql) + rows = yield self._execute("get_all_pushers", None, sql) ret = [ { diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 029b07cc66..3c2f1d6a15 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -19,7 +19,7 @@ from sqlite3 import IntegrityError from synapse.api.errors import StoreError, Codes -from ._base import SQLBaseStore +from ._base import SQLBaseStore, cached class RegistrationStore(SQLBaseStore): @@ -88,10 +88,14 @@ class RegistrationStore(SQLBaseStore): query = ("SELECT users.name, users.password_hash FROM users" " WHERE users.name = ?") return self._execute( - self.cursor_to_dict, - query, user_id + "get_user_by_id", self.cursor_to_dict, query, user_id ) + @cached() + # TODO(paul): Currently there's no code to invalidate this cache. That + # means if/when we ever add internal ways to invalidate access tokens or + # change whether a user is a server admin, those will need to invoke + # store.get_user_by_token.invalidate(token) def get_user_by_token(self, token): """Get a user from the given access token. diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 750b17a45f..549c9af393 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -68,7 +68,7 @@ class RoomStore(SQLBaseStore): """ query = RoomsTable.select_statement("room_id=?") return self._execute( - RoomsTable.decode_single_result, query, room_id, + "get_room", RoomsTable.decode_single_result, query, room_id, ) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 71db16d0e5..456e4bd45d 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -82,7 +82,7 @@ class StateStore(SQLBaseStore): if context.current_state is None: return - state_events = context.current_state + state_events = dict(context.current_state) if event.is_state(): state_events[(event.type, event.state_key)] = event |