diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-03-16 10:09:15 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-03-16 10:09:15 +0000 |
commit | f9232c7917322de392921336b4bab5d85c213c4e (patch) | |
tree | ab55547f1c895da508e6ac37c8056fef5b3b754e /synapse/storage/_base.py | |
parent | Fix remaining scheduler bugs. Add more informative logging. (diff) | |
parent | add ToC and fix typoe (diff) | |
download | synapse-f9232c7917322de392921336b4bab5d85c213c4e.tar.xz |
Merge branch 'develop' into application-services-txn-reliability
Conflicts: synapse/storage/appservice.py
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 73 |
1 files changed, 57 insertions, 16 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3725c9795d..40f2fc6d76 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -20,6 +20,7 @@ from synapse.events.utils import prune_event from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from synapse.util.lrucache import LruCache +import synapse.metrics from twisted.internet import defer @@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") +metrics = synapse.metrics.get_metrics_for("synapse.storage") + +sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) +sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"]) + +caches_by_name = {} +cache_counter = metrics.register_cache( + "cache", + lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()}, + labels=["name"], +) + + # TODO(paul): # * more generic key management -# * export monitoring stats # * consider other eviction strategies - LRU? def cached(max_entries=1000): """ A method decorator that applies a memoizing cache around the function. @@ -55,6 +69,9 @@ def cached(max_entries=1000): """ def wrap(orig): cache = OrderedDict() + name = orig.__name__ + + caches_by_name[name] = cache def prefill(key, value): while len(cache) > max_entries: @@ -65,8 +82,10 @@ def cached(max_entries=1000): @defer.inlineCallbacks def wrapped(self, key): if key in cache: + cache_counter.inc_hits(name) defer.returnValue(cache[key]) + cache_counter.inc_misses(name) ret = yield orig(self, key) prefill(key, ret) defer.returnValue(ret) @@ -83,7 +102,8 @@ def cached(max_entries=1000): class LoggingTransaction(object): """An object that almost-transparently proxies for the 'txn' object - passed to the constructor. Adds logging to the .execute() method.""" + passed to the constructor. Adds logging and metrics to the .execute() + method.""" __slots__ = ["txn", "name"] def __init__(self, txn, name): @@ -99,6 +119,7 @@ class LoggingTransaction(object): def execute(self, sql, *args, **kwargs): # TODO(paul): Maybe use 'info' and 'debug' for values? sql_logger.debug("[SQL] {%s} %s", self.name, sql) + try: if args and args[0]: values = args[0] @@ -120,8 +141,9 @@ class LoggingTransaction(object): logger.exception("[SQL FAIL] {%s}", self.name) raise finally: - end = time.time() * 1000 - sql_logger.debug("[SQL time] {%s} %f", self.name, end - start) + msecs = (time.time() * 1000) - start + sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) + sql_query_timer.inc_by(msecs, sql.split()[0]) class PerformanceCounters(object): @@ -172,11 +194,18 @@ class SQLBaseStore(object): self._previous_txn_total_time = 0 self._current_txn_total_time = 0 self._previous_loop_ts = 0 + + # TODO(paul): These can eventually be removed once the metrics code + # is running in mainline, and we have some nice monitoring frontends + # to watch it self._txn_perf_counters = PerformanceCounters() self._get_event_counters = PerformanceCounters() self._get_event_cache = LruCache(hs.config.event_cache_size) + # Pretend the getEventCache is just another named cache + caches_by_name["*getEvent*"] = self._get_event_cache + def start_profiling(self): self._previous_loop_ts = self._clock.time_msec() @@ -231,13 +260,13 @@ class SQLBaseStore(object): raise finally: end = time.time() * 1000 - transaction_logger.debug( - "[TXN END] {%s} %f", - name, end - start - ) + duration = end - start + + transaction_logger.debug("[TXN END] {%s} %f", name, duration) - self._current_txn_total_time += end - start + self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) + sql_txn_timer.inc_by(duration, desc) with PreserveLoggingContext(): result = yield self._db_pool.runInteraction( @@ -259,7 +288,7 @@ class SQLBaseStore(object): ) return results - def _execute(self, decoder, query, *args): + def _execute(self, desc, decoder, query, *args): """Runs a single query for a result set. Args: @@ -277,10 +306,10 @@ class SQLBaseStore(object): else: return cursor.fetchall() - return self.runInteraction("_execute", interaction) + return self.runInteraction(desc, interaction) - def _execute_and_decode(self, query, *args): - return self._execute(self.cursor_to_dict, query, *args) + def _execute_and_decode(self, desc, query, *args): + return self._execute(desc, self.cursor_to_dict, query, *args) # "Simple" SQL API methods that operate on a single table with no JOINs, # no complex WHERE clauses, just a dict of values for columns. @@ -638,14 +667,22 @@ class SQLBaseStore(object): get_prev_content=False, allow_rejected=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time cache = self._get_event_cache.setdefault(event_id, {}) try: # Separate cache entries for each way to invoke _get_event_txn - return cache[(check_redacted, get_prev_content, allow_rejected)] + ret = cache[(check_redacted, get_prev_content, allow_rejected)] + + cache_counter.inc_hits("*getEvent*") + return ret except KeyError: + cache_counter.inc_misses("*getEvent*") pass finally: start_time = update_counter("event_cache", start_time) @@ -685,7 +722,11 @@ class SQLBaseStore(object): check_redacted=True, get_prev_content=False): start_time = time.time() * 1000 - update_counter = self._get_event_counters.update + + def update_counter(desc, last_time): + curr_time = self._get_event_counters.update(desc, last_time) + sql_getevents_timer.inc_by(curr_time - last_time, desc) + return curr_time d = json.loads(js) start_time = update_counter("decode_json", start_time) |