diff options
author | Neil Johnson <neil@matrix.org> | 2018-06-06 12:27:33 +0100 |
---|---|---|
committer | Neil Johnson <neil@matrix.org> | 2018-06-06 12:27:33 +0100 |
commit | 752b7b32ed1c33651c0c64fbbb4289c3b62ac89b (patch) | |
tree | e120f6fffa36a2d4f6eae37f555be078940b7854 /synapse/storage/_base.py | |
parent | Merge pull request #3290 from rubo77/patch-7 (diff) | |
parent | 7 char sha in changelog (diff) | |
download | synapse-752b7b32ed1c33651c0c64fbbb4289c3b62ac89b.tar.xz |
Merge tag 'v0.31.0'
Changes in synapse v0.31.0 (2018-06-06) ====================================== Most notable change from v0.30.0 is to switch to python prometheus library to improve system stats reporting. WARNING this changes a number of prometheus metrics in a backwards-incompatible manner. For more details, see `docs/metrics-howto.rst <docs/metrics-howto.rst#removal-of-deprecated-metrics--time-based-counters-becoming-histograms-in-0310>`_. Bug Fixes: * Fix metric documentation tables (PR #3341) * Fix LaterGuage error handling (694968f) * Fix replication metrics (b7e7fd2) Changes in synapse v0.31.0-rc1 (2018-06-04) ========================================== Features: * Switch to the Python Prometheus library (PR #3256, #3274) * Let users leave the server notice room after joining (PR #3287) Changes: * daily user type phone home stats (PR #3264) * Use iter* methods for _filter_events_for_server (PR #3267) * Docs on consent bits (PR #3268) * Remove users from user directory on deactivate (PR #3277) * Avoid sending consent notice to guest users (PR #3288) * disable CPUMetrics if no /proc/self/stat (PR #3299) * Add local and loopback IPv6 addresses to url_preview_ip_range_blacklist (PR #3312) Thanks to @thegcat! * Consistently use six's iteritems and wrap lazy keys/values in list() if they're not meant to be lazy (PR #3307) * Add private IPv6 addresses to example config for url preview blacklist (PR #3317) Thanks to @thegcat! * Reduce stuck read-receipts: ignore depth when updating (PR #3318) * Put python's logs into Trial when running unit tests (PR #3319) Changes, python 3 migration: * Replace some more comparisons with six (PR #3243) Thanks to @NotAFile! * replace some iteritems with six (PR #3244) Thanks to @NotAFile! * Add batch_iter to utils (PR #3245) Thanks to @NotAFile! * use repr, not str (PR #3246) Thanks to @NotAFile! * Misc Python3 fixes (PR #3247) Thanks to @NotAFile! * Py3 storage/_base.py (PR #3278) Thanks to @NotAFile! * more six iteritems (PR #3279) Thanks to @NotAFile! * More Misc. py3 fixes (PR #3280) Thanks to @NotAFile! * remaining isintance fixes (PR #3281) Thanks to @NotAFile! * py3-ize state.py (PR #3283) Thanks to @NotAFile! * extend tox testing for py3 to avoid regressions (PR #3302) Thanks to @krombel! * use memoryview in py3 (PR #3303) Thanks to @NotAFile! Bugs: * Fix federation backfill bugs (PR #3261) * federation: fix LaterGauge usage (PR #3328) Thanks to @intelfx!
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 84 |
1 files changed, 46 insertions, 38 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 2262776ab2..22d6257a9f 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -18,8 +18,8 @@ from synapse.api.errors import StoreError from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches.descriptors import Cache from synapse.storage.engines import PostgresEngine -import synapse.metrics +from prometheus_client import Histogram from twisted.internet import defer @@ -27,20 +27,25 @@ import sys import time import threading +from six import itervalues, iterkeys, iteritems +from six.moves import intern, range logger = logging.getLogger(__name__) +try: + MAX_TXN_ID = sys.maxint - 1 +except AttributeError: + # python 3 does not have a maximum int value + MAX_TXN_ID = 2**63 - 1 + sql_logger = logging.getLogger("synapse.storage.SQL") transaction_logger = logging.getLogger("synapse.storage.txn") perf_logger = logging.getLogger("synapse.storage.TIME") +sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec") -metrics = synapse.metrics.get_metrics_for("synapse.storage") - -sql_scheduling_timer = metrics.register_distribution("schedule_time") - -sql_query_timer = metrics.register_distribution("query_time", labels=["verb"]) -sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"]) +sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"]) +sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"]) class LoggingTransaction(object): @@ -105,7 +110,7 @@ class LoggingTransaction(object): # Don't let logging failures stop SQL from working pass - start = time.time() * 1000 + start = time.time() try: return func( @@ -115,9 +120,9 @@ class LoggingTransaction(object): logger.debug("[SQL FAIL] {%s} %s", self.name, e) raise finally: - msecs = (time.time() * 1000) - start - sql_logger.debug("[SQL time] {%s} %f", self.name, msecs) - sql_query_timer.inc_by(msecs, sql.split()[0]) + secs = time.time() - start + sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs) + sql_query_timer.labels(sql.split()[0]).observe(secs) class PerformanceCounters(object): @@ -127,7 +132,7 @@ class PerformanceCounters(object): def update(self, key, start_time, end_time=None): if end_time is None: - end_time = time.time() * 1000 + end_time = time.time() duration = end_time - start_time count, cum_time = self.current_counters.get(key, (0, 0)) count += 1 @@ -137,7 +142,7 @@ class PerformanceCounters(object): def interval(self, interval_duration, limit=3): counters = [] - for name, (count, cum_time) in self.current_counters.iteritems(): + for name, (count, cum_time) in iteritems(self.current_counters): prev_count, prev_time = self.previous_counters.get(name, (0, 0)) counters.append(( (cum_time - prev_time) / interval_duration, @@ -217,12 +222,12 @@ class SQLBaseStore(object): def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks, logging_context, func, *args, **kwargs): - start = time.time() * 1000 + start = time.time() txn_id = self._TXN_ID # We don't really need these to be unique, so lets stop it from # growing really large. - self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID) name = "%s-%x" % (desc, txn_id, ) @@ -277,17 +282,17 @@ class SQLBaseStore(object): logger.debug("[TXN FAIL] {%s} %s", name, e) raise finally: - end = time.time() * 1000 + end = time.time() duration = end - start if logging_context is not None: logging_context.add_database_transaction(duration) - transaction_logger.debug("[TXN END] {%s} %f", name, duration) + transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) self._current_txn_total_time += duration self._txn_perf_counters.update(desc, start, end) - sql_txn_timer.inc_by(duration, desc) + sql_txn_timer.labels(desc).observe(duration) @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): @@ -344,13 +349,13 @@ class SQLBaseStore(object): """ current_context = LoggingContext.current_context() - start_time = time.time() * 1000 + start_time = time.time() def inner_func(conn, *args, **kwargs): with LoggingContext("runWithConnection") as context: - sched_duration_ms = time.time() * 1000 - start_time - sql_scheduling_timer.inc_by(sched_duration_ms) - current_context.add_database_scheduled(sched_duration_ms) + sched_duration_sec = time.time() - start_time + sql_scheduling_timer.observe(sched_duration_sec) + current_context.add_database_scheduled(sched_duration_sec) if self.database_engine.is_connection_closed(conn): logger.debug("Reconnecting closed database connection") @@ -543,7 +548,7 @@ class SQLBaseStore(object): ", ".join("%s = ?" % (k,) for k in values), " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - sqlargs = values.values() + keyvalues.values() + sqlargs = list(values.values()) + list(keyvalues.values()) txn.execute(sql, sqlargs) if txn.rowcount > 0: @@ -561,7 +566,7 @@ class SQLBaseStore(object): ", ".join(k for k in allvalues), ", ".join("?" for _ in allvalues) ) - txn.execute(sql, allvalues.values()) + txn.execute(sql, list(allvalues.values())) # successfully inserted return True @@ -629,8 +634,8 @@ class SQLBaseStore(object): } if keyvalues: - sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) - txn.execute(sql, keyvalues.values()) + sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) + txn.execute(sql, list(keyvalues.values())) else: txn.execute(sql) @@ -694,7 +699,7 @@ class SQLBaseStore(object): table, " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) else: sql = "SELECT %s FROM %s" % ( ", ".join(retcols), @@ -725,9 +730,12 @@ class SQLBaseStore(object): if not iterable: defer.returnValue(results) + # iterables can not be sliced, so convert it to a list first + it_list = list(iterable) + chunks = [ - iterable[i:i + batch_size] - for i in xrange(0, len(iterable), batch_size) + it_list[i:i + batch_size] + for i in range(0, len(it_list), batch_size) ] for chunk in chunks: rows = yield self.runInteraction( @@ -767,7 +775,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -790,7 +798,7 @@ class SQLBaseStore(object): @staticmethod def _simple_update_txn(txn, table, keyvalues, updatevalues): if keyvalues: - where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys()) + where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)) else: where = "" @@ -802,7 +810,7 @@ class SQLBaseStore(object): txn.execute( update_sql, - updatevalues.values() + keyvalues.values() + list(updatevalues.values()) + list(keyvalues.values()) ) return txn.rowcount @@ -850,7 +858,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues) ) - txn.execute(select_sql, keyvalues.values()) + txn.execute(select_sql, list(keyvalues.values())) row = txn.fetchone() if not row: @@ -888,7 +896,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - txn.execute(sql, keyvalues.values()) + txn.execute(sql, list(keyvalues.values())) if txn.rowcount == 0: raise StoreError(404, "No row found") if txn.rowcount > 1: @@ -906,7 +914,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k, ) for k in keyvalues) ) - return txn.execute(sql, keyvalues.values()) + return txn.execute(sql, list(keyvalues.values())) def _simple_delete_many(self, table, column, iterable, keyvalues, desc): return self.runInteraction( @@ -938,7 +946,7 @@ class SQLBaseStore(object): ) values.extend(iterable) - for key, value in keyvalues.iteritems(): + for key, value in iteritems(keyvalues): clauses.append("%s = ?" % (key,)) values.append(value) @@ -978,7 +986,7 @@ class SQLBaseStore(object): txn.close() if cache: - min_val = min(cache.itervalues()) + min_val = min(itervalues(cache)) else: min_val = max_value @@ -1093,7 +1101,7 @@ class SQLBaseStore(object): " AND ".join("%s = ?" % (k,) for k in keyvalues), " ? ASC LIMIT ? OFFSET ?" ) - txn.execute(sql, keyvalues.values() + pagevalues) + txn.execute(sql, list(keyvalues.values()) + list(pagevalues)) else: sql = "SELECT %s FROM %s ORDER BY %s" % ( ", ".join(retcols), |