diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2262776ab2..22d6257a9f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
-import synapse.metrics
+from prometheus_client import Histogram
from twisted.internet import defer
@@ -27,20 +27,25 @@ import sys
import time
import threading
+from six import itervalues, iterkeys, iteritems
+from six.moves import intern, range
logger = logging.getLogger(__name__)
+try:
+ MAX_TXN_ID = sys.maxint - 1
+except AttributeError:
+ # python 3 does not have a maximum int value
+ MAX_TXN_ID = 2**63 - 1
+
sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")
+sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
-metrics = synapse.metrics.get_metrics_for("synapse.storage")
-
-sql_scheduling_timer = metrics.register_distribution("schedule_time")
-
-sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
-sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
+sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
class LoggingTransaction(object):
@@ -105,7 +110,7 @@ class LoggingTransaction(object):
# Don't let logging failures stop SQL from working
pass
- start = time.time() * 1000
+ start = time.time()
try:
return func(
@@ -115,9 +120,9 @@ class LoggingTransaction(object):
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
finally:
- msecs = (time.time() * 1000) - start
- sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
- sql_query_timer.inc_by(msecs, sql.split()[0])
+ secs = time.time() - start
+ sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
+ sql_query_timer.labels(sql.split()[0]).observe(secs)
class PerformanceCounters(object):
@@ -127,7 +132,7 @@ class PerformanceCounters(object):
def update(self, key, start_time, end_time=None):
if end_time is None:
- end_time = time.time() * 1000
+ end_time = time.time()
duration = end_time - start_time
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
@@ -137,7 +142,7 @@ class PerformanceCounters(object):
def interval(self, interval_duration, limit=3):
counters = []
- for name, (count, cum_time) in self.current_counters.iteritems():
+ for name, (count, cum_time) in iteritems(self.current_counters):
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append((
(cum_time - prev_time) / interval_duration,
@@ -217,12 +222,12 @@ class SQLBaseStore(object):
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
logging_context, func, *args, **kwargs):
- start = time.time() * 1000
+ start = time.time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so lets stop it from
# growing really large.
- self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+ self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
name = "%s-%x" % (desc, txn_id, )
@@ -277,17 +282,17 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
- end = time.time() * 1000
+ end = time.time()
duration = end - start
if logging_context is not None:
logging_context.add_database_transaction(duration)
- transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+ transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
- sql_txn_timer.inc_by(duration, desc)
+ sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
@@ -344,13 +349,13 @@ class SQLBaseStore(object):
"""
current_context = LoggingContext.current_context()
- start_time = time.time() * 1000
+ start_time = time.time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
- sched_duration_ms = time.time() * 1000 - start_time
- sql_scheduling_timer.inc_by(sched_duration_ms)
- current_context.add_database_scheduled(sched_duration_ms)
+ sched_duration_sec = time.time() - start_time
+ sql_scheduling_timer.observe(sched_duration_sec)
+ current_context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
@@ -543,7 +548,7 @@ class SQLBaseStore(object):
", ".join("%s = ?" % (k,) for k in values),
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
- sqlargs = values.values() + keyvalues.values()
+ sqlargs = list(values.values()) + list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.rowcount > 0:
@@ -561,7 +566,7 @@ class SQLBaseStore(object):
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
- txn.execute(sql, allvalues.values())
+ txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True
@@ -629,8 +634,8 @@ class SQLBaseStore(object):
}
if keyvalues:
- sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
- txn.execute(sql, keyvalues.values())
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ txn.execute(sql, list(keyvalues.values()))
else:
txn.execute(sql)
@@ -694,7 +699,7 @@ class SQLBaseStore(object):
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- txn.execute(sql, keyvalues.values())
+ txn.execute(sql, list(keyvalues.values()))
else:
sql = "SELECT %s FROM %s" % (
", ".join(retcols),
@@ -725,9 +730,12 @@ class SQLBaseStore(object):
if not iterable:
defer.returnValue(results)
+ # iterables can not be sliced, so convert it to a list first
+ it_list = list(iterable)
+
chunks = [
- iterable[i:i + batch_size]
- for i in xrange(0, len(iterable), batch_size)
+ it_list[i:i + batch_size]
+ for i in range(0, len(it_list), batch_size)
]
for chunk in chunks:
rows = yield self.runInteraction(
@@ -767,7 +775,7 @@ class SQLBaseStore(object):
)
values.extend(iterable)
- for key, value in keyvalues.iteritems():
+ for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -790,7 +798,7 @@ class SQLBaseStore(object):
@staticmethod
def _simple_update_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
else:
where = ""
@@ -802,7 +810,7 @@ class SQLBaseStore(object):
txn.execute(
update_sql,
- updatevalues.values() + keyvalues.values()
+ list(updatevalues.values()) + list(keyvalues.values())
)
return txn.rowcount
@@ -850,7 +858,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
- txn.execute(select_sql, keyvalues.values())
+ txn.execute(select_sql, list(keyvalues.values()))
row = txn.fetchone()
if not row:
@@ -888,7 +896,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- txn.execute(sql, keyvalues.values())
+ txn.execute(sql, list(keyvalues.values()))
if txn.rowcount == 0:
raise StoreError(404, "No row found")
if txn.rowcount > 1:
@@ -906,7 +914,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- return txn.execute(sql, keyvalues.values())
+ return txn.execute(sql, list(keyvalues.values()))
def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
return self.runInteraction(
@@ -938,7 +946,7 @@ class SQLBaseStore(object):
)
values.extend(iterable)
- for key, value in keyvalues.iteritems():
+ for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -978,7 +986,7 @@ class SQLBaseStore(object):
txn.close()
if cache:
- min_val = min(cache.itervalues())
+ min_val = min(itervalues(cache))
else:
min_val = max_value
@@ -1093,7 +1101,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues),
" ? ASC LIMIT ? OFFSET ?"
)
- txn.execute(sql, keyvalues.values() + pagevalues)
+ txn.execute(sql, list(keyvalues.values()) + list(pagevalues))
else:
sql = "SELECT %s FROM %s ORDER BY %s" % (
", ".join(retcols),
|