diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 2262776ab2..22d6257a9f 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,8 +18,8 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.descriptors import Cache
from synapse.storage.engines import PostgresEngine
-import synapse.metrics
+from prometheus_client import Histogram
from twisted.internet import defer
@@ -27,20 +27,25 @@ import sys
import time
import threading
+from six import itervalues, iterkeys, iteritems
+from six.moves import intern, range
logger = logging.getLogger(__name__)
+try:
+ MAX_TXN_ID = sys.maxint - 1
+except AttributeError:
+ # Python 3 does not have a maximum int value
+ MAX_TXN_ID = 2**63 - 1
+
sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
perf_logger = logging.getLogger("synapse.storage.TIME")
+sql_scheduling_timer = Histogram("synapse_storage_schedule_time", "sec")
-metrics = synapse.metrics.get_metrics_for("synapse.storage")
-
-sql_scheduling_timer = metrics.register_distribution("schedule_time")
-
-sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
-sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_query_timer = Histogram("synapse_storage_query_time", "sec", ["verb"])
+sql_txn_timer = Histogram("synapse_storage_transaction_time", "sec", ["desc"])
class LoggingTransaction(object):
@@ -105,7 +110,7 @@ class LoggingTransaction(object):
# Don't let logging failures stop SQL from working
pass
- start = time.time() * 1000
+ start = time.time()
try:
return func(
@@ -115,9 +120,9 @@ class LoggingTransaction(object):
logger.debug("[SQL FAIL] {%s} %s", self.name, e)
raise
finally:
- msecs = (time.time() * 1000) - start
- sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
- sql_query_timer.inc_by(msecs, sql.split()[0])
+ secs = time.time() - start
+ sql_logger.debug("[SQL time] {%s} %f sec", self.name, secs)
+ sql_query_timer.labels(sql.split()[0]).observe(secs)
class PerformanceCounters(object):
@@ -127,7 +132,7 @@ class PerformanceCounters(object):
def update(self, key, start_time, end_time=None):
if end_time is None:
- end_time = time.time() * 1000
+ end_time = time.time()
duration = end_time - start_time
count, cum_time = self.current_counters.get(key, (0, 0))
count += 1
@@ -137,7 +142,7 @@ class PerformanceCounters(object):
def interval(self, interval_duration, limit=3):
counters = []
- for name, (count, cum_time) in self.current_counters.iteritems():
+ for name, (count, cum_time) in iteritems(self.current_counters):
prev_count, prev_time = self.previous_counters.get(name, (0, 0))
counters.append((
(cum_time - prev_time) / interval_duration,
@@ -217,12 +222,12 @@ class SQLBaseStore(object):
def _new_transaction(self, conn, desc, after_callbacks, exception_callbacks,
logging_context, func, *args, **kwargs):
- start = time.time() * 1000
+ start = time.time()
txn_id = self._TXN_ID
# We don't really need these to be unique, so let's stop it from
# growing really large.
- self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1)
+ self._TXN_ID = (self._TXN_ID + 1) % (MAX_TXN_ID)
name = "%s-%x" % (desc, txn_id, )
@@ -277,17 +282,17 @@ class SQLBaseStore(object):
logger.debug("[TXN FAIL] {%s} %s", name, e)
raise
finally:
- end = time.time() * 1000
+ end = time.time()
duration = end - start
if logging_context is not None:
logging_context.add_database_transaction(duration)
- transaction_logger.debug("[TXN END] {%s} %f", name, duration)
+ transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
- sql_txn_timer.inc_by(duration, desc)
+ sql_txn_timer.labels(desc).observe(duration)
@defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
@@ -344,13 +349,13 @@ class SQLBaseStore(object):
"""
current_context = LoggingContext.current_context()
- start_time = time.time() * 1000
+ start_time = time.time()
def inner_func(conn, *args, **kwargs):
with LoggingContext("runWithConnection") as context:
- sched_duration_ms = time.time() * 1000 - start_time
- sql_scheduling_timer.inc_by(sched_duration_ms)
- current_context.add_database_scheduled(sched_duration_ms)
+ sched_duration_sec = time.time() - start_time
+ sql_scheduling_timer.observe(sched_duration_sec)
+ current_context.add_database_scheduled(sched_duration_sec)
if self.database_engine.is_connection_closed(conn):
logger.debug("Reconnecting closed database connection")
@@ -543,7 +548,7 @@ class SQLBaseStore(object):
", ".join("%s = ?" % (k,) for k in values),
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
- sqlargs = values.values() + keyvalues.values()
+ sqlargs = list(values.values()) + list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.rowcount > 0:
@@ -561,7 +566,7 @@ class SQLBaseStore(object):
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
- txn.execute(sql, allvalues.values())
+ txn.execute(sql, list(allvalues.values()))
# successfully inserted
return True
@@ -629,8 +634,8 @@ class SQLBaseStore(object):
}
if keyvalues:
- sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
- txn.execute(sql, keyvalues.values())
+ sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+ txn.execute(sql, list(keyvalues.values()))
else:
txn.execute(sql)
@@ -694,7 +699,7 @@ class SQLBaseStore(object):
table,
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- txn.execute(sql, keyvalues.values())
+ txn.execute(sql, list(keyvalues.values()))
else:
sql = "SELECT %s FROM %s" % (
", ".join(retcols),
@@ -725,9 +730,12 @@ class SQLBaseStore(object):
if not iterable:
defer.returnValue(results)
+ # Iterables cannot be sliced, so convert to a list first
+ it_list = list(iterable)
+
chunks = [
- iterable[i:i + batch_size]
- for i in xrange(0, len(iterable), batch_size)
+ it_list[i:i + batch_size]
+ for i in range(0, len(it_list), batch_size)
]
for chunk in chunks:
rows = yield self.runInteraction(
@@ -767,7 +775,7 @@ class SQLBaseStore(object):
)
values.extend(iterable)
- for key, value in keyvalues.iteritems():
+ for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -790,7 +798,7 @@ class SQLBaseStore(object):
@staticmethod
def _simple_update_txn(txn, table, keyvalues, updatevalues):
if keyvalues:
- where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.iterkeys())
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
else:
where = ""
@@ -802,7 +810,7 @@ class SQLBaseStore(object):
txn.execute(
update_sql,
- updatevalues.values() + keyvalues.values()
+ list(updatevalues.values()) + list(keyvalues.values())
)
return txn.rowcount
@@ -850,7 +858,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
- txn.execute(select_sql, keyvalues.values())
+ txn.execute(select_sql, list(keyvalues.values()))
row = txn.fetchone()
if not row:
@@ -888,7 +896,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- txn.execute(sql, keyvalues.values())
+ txn.execute(sql, list(keyvalues.values()))
if txn.rowcount == 0:
raise StoreError(404, "No row found")
if txn.rowcount > 1:
@@ -906,7 +914,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k, ) for k in keyvalues)
)
- return txn.execute(sql, keyvalues.values())
+ return txn.execute(sql, list(keyvalues.values()))
def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
return self.runInteraction(
@@ -938,7 +946,7 @@ class SQLBaseStore(object):
)
values.extend(iterable)
- for key, value in keyvalues.iteritems():
+ for key, value in iteritems(keyvalues):
clauses.append("%s = ?" % (key,))
values.append(value)
@@ -978,7 +986,7 @@ class SQLBaseStore(object):
txn.close()
if cache:
- min_val = min(cache.itervalues())
+ min_val = min(itervalues(cache))
else:
min_val = max_value
@@ -1093,7 +1101,7 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues),
" ? ASC LIMIT ? OFFSET ?"
)
- txn.execute(sql, keyvalues.values() + pagevalues)
+ txn.execute(sql, list(keyvalues.values()) + list(pagevalues))
else:
sql = "SELECT %s FROM %s ORDER BY %s" % (
", ".join(retcols),
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index ba46907737..ce338514e8 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -22,6 +22,8 @@ from . import background_updates
from synapse.util.caches import CACHE_SIZE_FACTOR
+from six import iteritems
+
logger = logging.getLogger(__name__)
@@ -99,7 +101,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
def _update_client_ips_batch_txn(self, txn, to_update):
self.database_engine.lock_table(txn, "user_ips")
- for entry in to_update.iteritems():
+ for entry in iteritems(to_update):
(user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
self._simple_upsert_txn(
@@ -231,5 +233,5 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
"user_agent": user_agent,
"last_seen": last_seen,
}
- for (access_token, ip), (user_agent, last_seen) in results.iteritems()
+ for (access_token, ip), (user_agent, last_seen) in iteritems(results)
))
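
The iteritems() rewrites in this file and throughout the diff follow one pattern: dict.iteritems() only exists on Python 2, while six.iteritems(d) picks the lazy iterator on both interpreters (d.iteritems() on py2, d.items() on py3). A tiny illustration with a made-up results dict:

from six import iteritems

results = {("token_a", "1.2.3.4"): ("Mozilla/5.0", 1234567890)}

# Works identically on py2 and py3, without building an intermediate
# list on py2 the way items() would.
for (access_token, ip), (user_agent, last_seen) in iteritems(results):
    print(access_token, ip, user_agent, last_seen)
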
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 712106b83a..d149d8392e 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore, Cache
from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
+from six import itervalues, iteritems
logger = logging.getLogger(__name__)
@@ -360,7 +361,7 @@ class DeviceStore(SQLBaseStore):
return (now_stream_id, [])
if len(query_map) >= 20:
- now_stream_id = max(stream_id for stream_id in query_map.itervalues())
+ now_stream_id = max(stream_id for stream_id in itervalues(query_map))
devices = self._get_e2e_device_keys_txn(
txn, query_map.keys(), include_all_devices=True
@@ -373,13 +374,13 @@ class DeviceStore(SQLBaseStore):
"""
results = []
- for user_id, user_devices in devices.iteritems():
+ for user_id, user_devices in iteritems(devices):
# The prev_id for the first row is always the last row before
# `from_stream_id`
txn.execute(prev_sent_id_sql, (destination, user_id, from_stream_id))
rows = txn.fetchall()
prev_id = rows[0][0]
- for device_id, device in user_devices.iteritems():
+ for device_id, device in iteritems(user_devices):
stream_id = query_map[(user_id, device_id)]
result = {
"user_id": user_id,
@@ -483,7 +484,7 @@ class DeviceStore(SQLBaseStore):
if devices:
user_devices = devices[user_id]
results = []
- for device_id, device in user_devices.iteritems():
+ for device_id, device in iteritems(user_devices):
result = {
"device_id": device_id,
}
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index ff8538ddf8..b146487943 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -21,6 +21,8 @@ import simplejson as json
from ._base import SQLBaseStore
+from six import iteritems
+
class EndToEndKeyStore(SQLBaseStore):
def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
@@ -81,8 +83,8 @@ class EndToEndKeyStore(SQLBaseStore):
query_list, include_all_devices,
)
- for user_id, device_keys in results.iteritems():
- for device_id, device_info in device_keys.iteritems():
+ for user_id, device_keys in iteritems(results):
+ for device_id, device_info in iteritems(device_keys):
device_info["keys"] = json.loads(device_info.pop("key_json"))
defer.returnValue(results)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index f084a5f54b..d0350ee5fe 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,8 @@ from synapse.util.caches.descriptors import cachedInlineCallbacks
import logging
import simplejson as json
+from six import iteritems
+
logger = logging.getLogger(__name__)
@@ -420,7 +422,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
txn.executemany(sql, (
_gen_entry(user_id, actions)
- for user_id, actions in user_id_actions.iteritems()
+ for user_id, actions in iteritems(user_id_actions)
))
return self.runInteraction(
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5ebef98c4f..cb1082e864 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -40,30 +40,30 @@ import synapse.metrics
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
-logger = logging.getLogger(__name__)
+from six.moves import range
+from six import itervalues, iteritems
+from prometheus_client import Counter
-metrics = synapse.metrics.get_metrics_for(__name__)
-persist_event_counter = metrics.register_counter("persisted_events")
-event_counter = metrics.register_counter(
- "persisted_events_sep", labels=["type", "origin_type", "origin_entity"]
-)
+logger = logging.getLogger(__name__)
+
+persist_event_counter = Counter("synapse_storage_events_persisted_events", "")
+event_counter = Counter("synapse_storage_events_persisted_events_sep", "",
+ ["type", "origin_type", "origin_entity"])
# The number of times we are recalculating the current state
-state_delta_counter = metrics.register_counter(
- "state_delta",
-)
+state_delta_counter = Counter("synapse_storage_events_state_delta", "")
+
# The number of times we are recalculating state when there is only a
# single forward extremity
-state_delta_single_event_counter = metrics.register_counter(
- "state_delta_single_event",
-)
+state_delta_single_event_counter = Counter(
+ "synapse_storage_events_state_delta_single_event", "")
+
# The number of times we are recalculating state when we could have reasonably
# calculated the delta when we calculated the state for an event we were
# persisting.
-state_delta_reuse_delta_counter = metrics.register_counter(
- "state_delta_reuse_delta",
-)
+state_delta_reuse_delta_counter = Counter(
+ "synapse_storage_events_state_delta_reuse_delta", "")
def encode_json(json_object):
@@ -248,7 +248,7 @@ class EventsStore(EventsWorkerStore):
partitioned.setdefault(event.room_id, []).append((event, ctx))
deferreds = []
- for room_id, evs_ctxs in partitioned.iteritems():
+ for room_id, evs_ctxs in iteritems(partitioned):
d = self._event_persist_queue.add_to_queue(
room_id, evs_ctxs,
backfilled=backfilled,
@@ -333,7 +333,7 @@ class EventsStore(EventsWorkerStore):
chunks = [
events_and_contexts[x:x + 100]
- for x in xrange(0, len(events_and_contexts), 100)
+ for x in range(0, len(events_and_contexts), 100)
]
for chunk in chunks:
@@ -367,7 +367,7 @@ class EventsStore(EventsWorkerStore):
(event, context)
)
- for room_id, ev_ctx_rm in events_by_room.iteritems():
+ for room_id, ev_ctx_rm in iteritems(events_by_room):
# Work out new extremities by recursively adding and removing
# the new events.
latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -445,7 +445,7 @@ class EventsStore(EventsWorkerStore):
state_delta_for_room=state_delta_for_room,
new_forward_extremeties=new_forward_extremeties,
)
- persist_event_counter.inc_by(len(chunk))
+ persist_event_counter.inc(len(chunk))
synapse.metrics.event_persisted_position.set(
chunk[-1][0].internal_metadata.stream_ordering,
)
@@ -460,14 +460,14 @@ class EventsStore(EventsWorkerStore):
origin_type = "remote"
origin_entity = get_domain_from_id(event.sender)
- event_counter.inc(event.type, origin_type, origin_entity)
+ event_counter.labels(event.type, origin_type, origin_entity).inc()
- for room_id, new_state in current_state_for_room.iteritems():
+ for room_id, new_state in iteritems(current_state_for_room):
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
- for room_id, latest_event_ids in new_forward_extremeties.iteritems():
+ for room_id, latest_event_ids in iteritems(new_forward_extremeties):
self.get_latest_event_ids_in_room.prefill(
(room_id,), list(latest_event_ids)
)
@@ -644,20 +644,20 @@ class EventsStore(EventsWorkerStore):
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(existing_state.itervalues())
- new_events = set(ev_id for ev_id in current_state.itervalues())
+ existing_events = set(itervalues(existing_state))
+ new_events = set(ev_id for ev_id in itervalues(current_state))
changed_events = existing_events ^ new_events
if not changed_events:
return
to_delete = {
- key: ev_id for key, ev_id in existing_state.iteritems()
+ key: ev_id for key, ev_id in iteritems(existing_state)
if ev_id in changed_events
}
events_to_insert = (new_events - existing_events)
to_insert = {
- key: ev_id for key, ev_id in current_state.iteritems()
+ key: ev_id for key, ev_id in iteritems(current_state)
if ev_id in events_to_insert
}
@@ -760,11 +760,11 @@ class EventsStore(EventsWorkerStore):
)
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
- for room_id, current_state_tuple in state_delta_by_room.iteritems():
+ for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in to_delete.itervalues()],
+ [(ev_id,) for ev_id in itervalues(to_delete)],
)
self._simple_insert_many_txn(
@@ -777,7 +777,7 @@ class EventsStore(EventsWorkerStore):
"type": key[0],
"state_key": key[1],
}
- for key, ev_id in to_insert.iteritems()
+ for key, ev_id in iteritems(to_insert)
],
)
@@ -796,7 +796,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id,
"prev_event_id": to_delete.get(key, None),
}
- for key, ev_id in state_deltas.iteritems()
+ for key, ev_id in iteritems(state_deltas)
]
)
@@ -839,7 +839,7 @@ class EventsStore(EventsWorkerStore):
def _update_forward_extremities_txn(self, txn, new_forward_extremities,
max_stream_order):
- for room_id, new_extrem in new_forward_extremities.iteritems():
+ for room_id, new_extrem in iteritems(new_forward_extremities):
self._simple_delete_txn(
txn,
table="event_forward_extremities",
@@ -857,7 +857,7 @@ class EventsStore(EventsWorkerStore):
"event_id": ev_id,
"room_id": room_id,
}
- for room_id, new_extrem in new_forward_extremities.iteritems()
+ for room_id, new_extrem in iteritems(new_forward_extremities)
for ev_id in new_extrem
],
)
@@ -874,7 +874,7 @@ class EventsStore(EventsWorkerStore):
"event_id": event_id,
"stream_ordering": max_stream_order,
}
- for room_id, new_extrem in new_forward_extremities.iteritems()
+ for room_id, new_extrem in iteritems(new_forward_extremities)
for event_id in new_extrem
]
)
@@ -902,7 +902,7 @@ class EventsStore(EventsWorkerStore):
new_events_and_contexts[event.event_id] = (event, context)
else:
new_events_and_contexts[event.event_id] = (event, context)
- return new_events_and_contexts.values()
+ return list(new_events_and_contexts.values())
def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
"""Update min_depth for each room
@@ -928,7 +928,7 @@ class EventsStore(EventsWorkerStore):
event.depth, depth_updates.get(event.room_id, event.depth)
)
- for room_id, depth in depth_updates.iteritems():
+ for room_id, depth in iteritems(depth_updates):
self._update_min_depth_for_room_txn(txn, room_id, depth)
def _update_outliers_txn(self, txn, events_and_contexts):
@@ -1312,7 +1312,7 @@ class EventsStore(EventsWorkerStore):
" WHERE e.event_id IN (%s)"
) % (",".join(["?"] * len(ev_map)),)
- txn.execute(sql, ev_map.keys())
+ txn.execute(sql, list(ev_map))
rows = self.cursor_to_dict(txn)
for row in rows:
event = ev_map[row["event_id"]]
@@ -1575,7 +1575,7 @@ class EventsStore(EventsWorkerStore):
chunks = [
event_ids[i:i + 100]
- for i in xrange(0, len(event_ids), 100)
+ for i in range(0, len(event_ids), 100)
]
for chunk in chunks:
ev_rows = self._simple_select_many_txn(
@@ -1989,7 +1989,7 @@ class EventsStore(EventsWorkerStore):
logger.info("[purge] finding state groups which depend on redundant"
" state groups")
remaining_state_groups = []
- for i in xrange(0, len(state_rows), 100):
+ for i in range(0, len(state_rows), 100):
chunk = [sg for sg, in state_rows[i:i + 100]]
# look for state groups whose prev_state_group is one we are about
# to delete
@@ -2045,7 +2045,7 @@ class EventsStore(EventsWorkerStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in curr_state.iteritems()
+ for key, state_id in iteritems(curr_state)
],
)
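
The Counter changes in events.py mirror the Histogram migration in _base.py: a bulk increment becomes inc(n), and labelled increments move from positional arguments on inc() to a labels(...).inc() chain. A standalone sketch (the metric name and label values are illustrative):

from prometheus_client import Counter

event_counter = Counter("example_persisted_events_sep", "",
                        ["type", "origin_type", "origin_entity"])

# Old API: event_counter.inc(event.type, origin_type, origin_entity)
# New API: bind the label values first, then increment.
event_counter.labels("m.room.message", "local", "@alice:example.com").inc()
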
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index ba834854e1..32d9d00ffb 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -337,7 +337,7 @@ class EventsWorkerStore(SQLBaseStore):
def _fetch_event_rows(self, txn, events):
rows = []
N = 200
- for i in range(1 + len(events) / N):
+ for i in range(1 + len(events) // N):
evs = events[i * N:(i + 1) * N]
if not evs:
break
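
The / to // change above is load-bearing on Python 3, where dividing two ints yields a float and range() rejects non-integers; floor division keeps the py2 behaviour on both interpreters. A worked illustration (the event IDs are invented):

events = ["$ev1", "$ev2", "$ev3"]
N = 200

# On py3, 1 + len(events) / N == 1.015, and range(1.015) raises TypeError.
for i in range(1 + len(events) // N):
    evs = events[i * N:(i + 1) * N]
    if not evs:
        break
    print(evs)
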
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 78b1e30945..2e2763126d 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -44,7 +44,7 @@ class FilteringStore(SQLBaseStore):
desc="get_user_filter",
)
- defer.returnValue(json.loads(str(def_json).decode("utf-8")))
+ defer.returnValue(json.loads(bytes(def_json).decode("utf-8")))
def add_user_filter(self, user_localpart, user_filter):
def_json = encode_canonical_json(user_filter)
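
str() does the wrong thing on the py3 code path here: calling str() on a memoryview yields a repr like '<memory at 0x...>' rather than the stored JSON, whereas bytes() extracts the payload from both a py2 buffer and a py3 memoryview. For example:

def_json = memoryview(b'{"room": {"timeline": {"limit": 10}}}')

# str(def_json) on py3 would give "<memory at 0x...>", not decodable JSON.
assert bytes(def_json).decode("utf-8") == '{"room": {"timeline": {"limit": 10}}}'
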
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 87aeaf71d6..0f13b61da8 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
from twisted.internet import defer
+import six
import OpenSSL
from signedjson.key import decode_verify_key_bytes
@@ -26,6 +27,13 @@ import logging
logger = logging.getLogger(__name__)
+# py2 sqlite has buffer hardcoded as its only binary type, so we must use it,
+# despite buffer being deprecated and removed in py3 in favor of memoryview
+if six.PY2:
+ db_binary_type = buffer
+else:
+ db_binary_type = memoryview
+
class KeyStore(SQLBaseStore):
"""Persistence for signature verification keys and tls X.509 certificates
@@ -72,7 +80,7 @@ class KeyStore(SQLBaseStore):
values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
- "tls_certificate": buffer(tls_certificate_bytes),
+ "tls_certificate": db_binary_type(tls_certificate_bytes),
},
desc="store_server_certificate",
)
@@ -92,7 +100,7 @@ class KeyStore(SQLBaseStore):
if verify_key_bytes:
defer.returnValue(decode_verify_key_bytes(
- key_id, str(verify_key_bytes)
+ key_id, bytes(verify_key_bytes)
))
@defer.inlineCallbacks
@@ -135,7 +143,7 @@ class KeyStore(SQLBaseStore):
values={
"from_server": from_server,
"ts_added_ms": time_now_ms,
- "verify_key": buffer(verify_key.encode()),
+ "verify_key": db_binary_type(verify_key.encode()),
},
)
txn.call_after(
@@ -172,7 +180,7 @@ class KeyStore(SQLBaseStore):
"from_server": from_server,
"ts_added_ms": ts_now_ms,
"ts_valid_until_ms": ts_expires_ms,
- "key_json": buffer(key_json_bytes),
+ "key_json": db_binary_type(key_json_bytes),
},
desc="store_server_keys_json",
)
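
The db_binary_type alias exists because py2's sqlite3 driver only accepts the old buffer type for BLOB parameters, while py3 removed buffer and takes memoryview. A self-contained sketch of the pattern against an in-memory database (the table and value are illustrative):

import sqlite3

import six

if six.PY2:
    db_binary_type = buffer  # noqa: F821 -- py2-only builtin
else:
    db_binary_type = memoryview

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE server_keys (key_json BLOB)")
# Wrapping the bytes tells the driver to bind them as a BLOB.
conn.execute("INSERT INTO server_keys VALUES (?)", (db_binary_type(b"{}"),))
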
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c08e9cd65a..cf2aae0468 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 49
+SCHEMA_VERSION = 50
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 9e9d3c2591..f05d91cc58 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -16,6 +16,7 @@
from ._base import SQLBaseStore
from synapse.api.constants import PresenceState
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
+from synapse.util import batch_iter
from collections import namedtuple
from twisted.internet import defer
@@ -115,11 +116,7 @@ class PresenceStore(SQLBaseStore):
" AND user_id IN (%s)"
)
- batches = (
- presence_states[i:i + 50]
- for i in xrange(0, len(presence_states), 50)
- )
- for states in batches:
+ for states in batch_iter(presence_states, 50):
args = [stream_id]
args.extend(s.user_id for s in states)
txn.execute(
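
batch_iter replaces the hand-rolled slicing with a shared helper from synapse.util. Its implementation isn't part of this diff; a plausible equivalent matching how it is used here (yield successive batches of at most `size` items from any iterable) would be:

from itertools import islice

def batch_iter(iterable, size):
    """Yield tuples of up to `size` items from `iterable`."""
    iterator = iter(iterable)
    # iter(callable, sentinel) keeps calling until an empty batch appears.
    return iter(lambda: tuple(islice(iterator, size)), ())

assert list(batch_iter(range(5), 2)) == [(0, 1), (2, 3), (4,)]
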
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 709c69a926..c93c228f6e 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
user_id, event_id, data, stream_id):
+ res = self._simple_select_one_txn(
+ txn,
+ table="events",
+ retcols=["topological_ordering", "stream_ordering"],
+ keyvalues={"event_id": event_id},
+ allow_none=True
+ )
+
+ stream_ordering = int(res["stream_ordering"]) if res else None
+
+ # We don't want to clobber receipts for more recent events, so we
+ # have to compare orderings of existing receipts
+ if stream_ordering is not None:
+ sql = (
+ "SELECT stream_ordering, event_id FROM events"
+ " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
+ " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
+ )
+ txn.execute(sql, (room_id, receipt_type, user_id))
+
+ for so, eid in txn:
+ if int(so) >= stream_ordering:
+ logger.debug(
+ "Ignoring new receipt for %s in favour of existing "
+ "one for later event %s",
+ event_id, eid,
+ )
+ return False
+
txn.call_after(
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
)
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
(user_id, room_id, receipt_type)
)
- res = self._simple_select_one_txn(
- txn,
- table="events",
- retcols=["topological_ordering", "stream_ordering"],
- keyvalues={"event_id": event_id},
- allow_none=True
- )
-
- topological_ordering = int(res["topological_ordering"]) if res else None
- stream_ordering = int(res["stream_ordering"]) if res else None
-
- # We don't want to clobber receipts for more recent events, so we
- # have to compare orderings of existing receipts
- sql = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
- )
-
- txn.execute(sql, (room_id, receipt_type, user_id))
-
- if topological_ordering:
- for to, so, _ in txn:
- if int(to) > topological_ordering:
- return False
- elif int(to) == topological_ordering and int(so) >= stream_ordering:
- return False
-
self._simple_delete_txn(
txn,
table="receipts_linearized",
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
}
)
- if receipt_type == "m.read" and topological_ordering:
+ if receipt_type == "m.read" and stream_ordering is not None:
self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
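
The receipt-clobbering guard now runs before any caches are invalidated and compares stream_ordering alone, dropping the old topological_ordering tie-break. The decision it makes reduces to this (orderings invented):

def keep_existing_receipt(existing_stream_ordering, new_stream_ordering):
    # An existing receipt at the same or a later position wins, and the
    # incoming receipt is ignored (the txn returns False).
    return existing_stream_ordering >= new_stream_ordering

assert keep_existing_receipt(10, 10)
assert not keep_existing_receipt(9, 10)
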
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index a530e29f43..c241167fbe 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -36,6 +36,7 @@ class RegistrationWorkerStore(SQLBaseStore):
retcols=[
"name", "password_hash", "is_guest",
"consent_version", "consent_server_notice_sent",
+ "appservice_id",
],
allow_none=True,
desc="get_user_by_id",
@@ -101,6 +102,13 @@ class RegistrationStore(RegistrationWorkerStore,
columns=["user_id", "device_id"],
)
+ self.register_background_index_update(
+ "users_creation_ts",
+ index_name="users_creation_ts",
+ table="users",
+ columns=["creation_ts"],
+ )
+
# we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just
# clear the background update.
@@ -485,6 +493,35 @@ class RegistrationStore(RegistrationWorkerStore,
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
+ def count_daily_user_type(self):
+ """
+ Counts 1) native non-guest users
+ 2) native guest users
+ 3) bridged users
+ who registered on the homeserver in the past 24 hours
+ """
+ def _count_daily_user_type(txn):
+ yesterday = int(self._clock.time()) - (60 * 60 * 24)
+
+ sql = """
+ SELECT user_type, COALESCE(count(*), 0) AS count FROM (
+ SELECT
+ CASE
+ WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
+ WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
+ WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
+ END AS user_type
+ FROM users
+ WHERE creation_ts > ?
+ ) AS t GROUP BY user_type
+ """
+ results = {'native': 0, 'guest': 0, 'bridged': 0}
+ txn.execute(sql, (yesterday,))
+ for row in txn:
+ results[row[0]] = row[1]
+ return results
+ return self.runInteraction("count_daily_user_type", _count_daily_user_type)
+
@defer.inlineCallbacks
def count_nonbridged_users(self):
def _count_users(txn):
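
Because the CASE expression assigns each freshly-registered account to exactly one bucket and the result dict is pre-seeded with zeros, callers of count_daily_user_type always see all three keys even when a bucket is empty. A sketch of that merge (the row values are invented):

results = {'native': 0, 'guest': 0, 'bridged': 0}

# Rows come back as (user_type, count); buckets with no rows keep their zero.
for user_type, count in [("native", 12), ("bridged", 7)]:
    results[user_type] = count

assert results == {"native": 12, "guest": 0, "bridged": 7}
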
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 6a861943a2..7bfc3d91b5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -30,6 +30,8 @@ from synapse.types import get_domain_from_id
import logging
import simplejson as json
+from six import itervalues, iteritems
+
logger = logging.getLogger(__name__)
@@ -272,7 +274,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = {}
member_event_ids = [
e_id
- for key, e_id in current_state_ids.iteritems()
+ for key, e_id in iteritems(current_state_ids)
if key[0] == EventTypes.Member
]
@@ -289,7 +291,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
users_in_room = dict(prev_res)
member_event_ids = [
e_id
- for key, e_id in context.delta_ids.iteritems()
+ for key, e_id in iteritems(context.delta_ids)
if key[0] == EventTypes.Member
]
for etype, state_key in context.delta_ids:
@@ -741,7 +743,7 @@ class _JoinedHostsCache(object):
if state_entry.state_group == self.state_group:
pass
elif state_entry.prev_group == self.state_group:
- for (typ, state_key), event_id in state_entry.delta_ids.iteritems():
+ for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
if typ != EventTypes.Member:
continue
@@ -771,7 +773,7 @@ class _JoinedHostsCache(object):
self.state_group = state_entry.state_group
else:
self.state_group = object()
- self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues())
+ self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
defer.returnValue(frozenset(self.hosts_to_joined_users))
def __len__(self):
diff --git a/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
new file mode 100644
index 0000000000..c93ae47532
--- /dev/null
+++ b/synapse/storage/schema/delta/50/add_creation_ts_users_index.sql
@@ -0,0 +1,19 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('users_creation_ts', '{}');
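
The delta file only queues the 'users_creation_ts' job; the index itself is built later by the background updater driven by the register_background_index_update call added to registration.py above. The end state is roughly equivalent to the following, sketched against an in-memory SQLite database (the updater chooses the real SQL per engine):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, creation_ts BIGINT)")
# What the queued background update ultimately achieves:
conn.execute("CREATE INDEX users_creation_ts ON users (creation_ts)")
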
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 6ba3e59889..f0fa5d7631 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -18,13 +18,14 @@ import logging
import re
import simplejson as json
+from six import string_types
+
from twisted.internet import defer
from .background_updates import BackgroundUpdateStore
from synapse.api.errors import SynapseError
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-
logger = logging.getLogger(__name__)
SearchEntry = namedtuple('SearchEntry', [
@@ -126,7 +127,7 @@ class SearchStore(BackgroundUpdateStore):
# skip over it.
continue
- if not isinstance(value, basestring):
+ if not isinstance(value, string_types):
# If the event body, name or topic isn't a string
# then skip over it
continue
@@ -447,7 +448,7 @@ class SearchStore(BackgroundUpdateStore):
"search_msgs", self.cursor_to_dict, sql, *args
)
- results = filter(lambda row: row["room_id"] in room_ids, results)
+ results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
@@ -602,7 +603,7 @@ class SearchStore(BackgroundUpdateStore):
"search_rooms", self.cursor_to_dict, sql, *args
)
- results = filter(lambda row: row["room_id"] in room_ids, results)
+ results = list(filter(lambda row: row["room_id"] in room_ids, results))
events = yield self._get_events([r["event_id"] for r in results])
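
Wrapping filter() in list() matters on Python 3, where filter returns a one-shot iterator: the comprehension below would exhaust it, leaving nothing for any later use of results. A tiny illustration:

rows = [{"room_id": "!a:hs", "event_id": "$1"},
        {"room_id": "!b:hs", "event_id": "$2"}]
room_ids = {"!a:hs"}

lazy = filter(lambda row: row["room_id"] in room_ids, rows)
results = list(lazy)

assert [r["event_id"] for r in results] == ["$1"]
assert list(lazy) == []  # on py3 the bare iterator is already exhausted
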
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index 9e6eaaa532..25922e5a9c 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -14,6 +14,7 @@
# limitations under the License.
from twisted.internet import defer
+import six
from ._base import SQLBaseStore
@@ -21,6 +22,13 @@ from unpaddedbase64 import encode_base64
from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.util.caches.descriptors import cached, cachedList
+# py2 sqlite has buffer hardcoded as its only binary type, so we must use it,
+# despite buffer being deprecated and removed in py3 in favor of memoryview
+if six.PY2:
+ db_binary_type = buffer
+else:
+ db_binary_type = memoryview
+
class SignatureWorkerStore(SQLBaseStore):
@cached()
@@ -56,7 +64,7 @@ class SignatureWorkerStore(SQLBaseStore):
for e_id, h in hashes.items()
}
- defer.returnValue(hashes.items())
+ defer.returnValue(list(hashes.items()))
def _get_event_reference_hashes_txn(self, txn, event_id):
"""Get all the hashes for a given PDU.
@@ -91,7 +99,7 @@ class SignatureStore(SignatureWorkerStore):
vals.append({
"event_id": event.event_id,
"algorithm": ref_alg,
- "hash": buffer(ref_hash_bytes),
+ "hash": db_binary_type(ref_hash_bytes),
})
self._simple_insert_many_txn(
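
On Python 3, hashes.items() is a live view over the dict rather than a list, so list() takes the stable snapshot that defer.returnValue should hand back. For instance:

hashes = {"$event1": b"hash1"}
view = hashes.items()
snapshot = list(hashes.items())

hashes["$event2"] = b"hash2"

assert len(view) == 2      # the view tracks later mutations (py3)
assert len(snapshot) == 1  # the list is a fixed copy
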
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index ffa4246031..bdee14a8eb 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,6 +16,9 @@
from collections import namedtuple
import logging
+from six import iteritems, itervalues
+from six.moves import range
+
from twisted.internet import defer
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -134,7 +137,7 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
- groups = set(event_to_groups.itervalues())
+ groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups)
defer.returnValue(group_to_state)
@@ -166,18 +169,18 @@ class StateGroupWorkerStore(SQLBaseStore):
state_event_map = yield self.get_events(
[
- ev_id for group_ids in group_to_ids.itervalues()
- for ev_id in group_ids.itervalues()
+ ev_id for group_ids in itervalues(group_to_ids)
+ for ev_id in itervalues(group_ids)
],
get_prev_content=False
)
defer.returnValue({
group: [
- state_event_map[v] for v in event_id_map.itervalues()
+ state_event_map[v] for v in itervalues(event_id_map)
if v in state_event_map
]
- for group, event_id_map in group_to_ids.iteritems()
+ for group, event_id_map in iteritems(group_to_ids)
})
@defer.inlineCallbacks
@@ -186,7 +189,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"""
results = {}
- chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
+ chunks = [groups[i:i + 100] for i in range(0, len(groups), 100)]
for chunk in chunks:
res = yield self.runInteraction(
"_get_state_groups_from_groups",
@@ -347,21 +350,21 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
- groups = set(event_to_groups.itervalues())
+ groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types)
state_event_map = yield self.get_events(
- [ev_id for sd in group_to_state.itervalues() for ev_id in sd.itervalues()],
+ [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
get_prev_content=False
)
event_to_state = {
event_id: {
k: state_event_map[v]
- for k, v in group_to_state[group].iteritems()
+ for k, v in iteritems(group_to_state[group])
if v in state_event_map
}
- for event_id, group in event_to_groups.iteritems()
+ for event_id, group in iteritems(event_to_groups)
}
defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -384,12 +387,12 @@ class StateGroupWorkerStore(SQLBaseStore):
event_ids,
)
- groups = set(event_to_groups.itervalues())
+ groups = set(itervalues(event_to_groups))
group_to_state = yield self._get_state_for_groups(groups, types)
event_to_state = {
event_id: group_to_state[group]
- for event_id, group in event_to_groups.iteritems()
+ for event_id, group in iteritems(event_to_groups)
}
defer.returnValue({event: event_to_state[event] for event in event_ids})
@@ -503,7 +506,7 @@ class StateGroupWorkerStore(SQLBaseStore):
got_all = is_all or not missing_types
return {
- k: v for k, v in state_dict_ids.iteritems()
+ k: v for k, v in iteritems(state_dict_ids)
if include(k[0], k[1])
}, missing_types, got_all
@@ -562,12 +565,12 @@ class StateGroupWorkerStore(SQLBaseStore):
# Now we want to update the cache with all the things we fetched
# from the database.
- for group, group_state_dict in group_to_state_dict.iteritems():
+ for group, group_state_dict in iteritems(group_to_state_dict):
state_dict = results[group]
state_dict.update(
((intern_string(k[0]), intern_string(k[1])), to_ascii(v))
- for k, v in group_state_dict.iteritems()
+ for k, v in iteritems(group_state_dict)
)
self._state_group_cache.update(
@@ -654,7 +657,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in delta_ids.iteritems()
+ for key, state_id in iteritems(delta_ids)
],
)
else:
@@ -669,7 +672,7 @@ class StateGroupWorkerStore(SQLBaseStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in current_state_ids.iteritems()
+ for key, state_id in iteritems(current_state_ids)
],
)
@@ -794,11 +797,11 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_group": state_group_id,
"event_id": event_id,
}
- for event_id, state_group_id in state_groups.iteritems()
+ for event_id, state_group_id in iteritems(state_groups)
],
)
- for event_id, state_group_id in state_groups.iteritems():
+ for event_id, state_group_id in iteritems(state_groups):
txn.call_after(
self._get_state_group_for_event.prefill,
(event_id,), state_group_id
@@ -826,7 +829,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
def reindex_txn(txn):
new_last_state_group = last_state_group
- for count in xrange(batch_size):
+ for count in range(batch_size):
txn.execute(
"SELECT id, room_id FROM state_groups"
" WHERE ? < id AND id <= ?"
@@ -884,7 +887,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
# of keys
delta_state = {
- key: value for key, value in curr_state.iteritems()
+ key: value for key, value in iteritems(curr_state)
if prev_state.get(key, None) != value
}
@@ -924,7 +927,7 @@ class StateStore(StateGroupWorkerStore, BackgroundUpdateStore):
"state_key": key[1],
"event_id": state_id,
}
- for key, state_id in delta_state.iteritems()
+ for key, state_id in iteritems(delta_state)
],
)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index f825264ea9..e485d19b84 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -17,6 +17,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from twisted.internet import defer
+import six
from canonicaljson import encode_canonical_json
@@ -25,6 +26,13 @@ from collections import namedtuple
import logging
import simplejson as json
+# py2 sqlite has buffer hardcoded as its only binary type, so we must use it,
+# despite buffer being deprecated and removed in py3 in favor of memoryview
+if six.PY2:
+ db_binary_type = buffer
+else:
+ db_binary_type = memoryview
+
logger = logging.getLogger(__name__)
@@ -110,7 +118,7 @@ class TransactionStore(SQLBaseStore):
"transaction_id": transaction_id,
"origin": origin,
"response_code": code,
- "response_json": buffer(encode_canonical_json(response_dict)),
+ "response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(),
},
or_ignore=True,
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index d6e289ffbe..275c299998 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -22,6 +22,8 @@ from synapse.api.constants import EventTypes, JoinRules
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
+from six import iteritems
+
import re
import logging
@@ -100,7 +102,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id, get_localpart_from_id(user_id), get_domain_from_id(user_id),
profile.display_name,
)
- for user_id, profile in users_with_profile.iteritems()
+ for user_id, profile in iteritems(users_with_profile)
)
elif isinstance(self.database_engine, Sqlite3Engine):
sql = """
@@ -112,7 +114,7 @@ class UserDirectoryStore(SQLBaseStore):
user_id,
"%s %s" % (user_id, p.display_name,) if p.display_name else user_id
)
- for user_id, p in users_with_profile.iteritems()
+ for user_id, p in iteritems(users_with_profile)
)
else:
# This should be unreachable.
@@ -130,7 +132,7 @@ class UserDirectoryStore(SQLBaseStore):
"display_name": profile.display_name,
"avatar_url": profile.avatar_url,
}
- for user_id, profile in users_with_profile.iteritems()
+ for user_id, profile in iteritems(users_with_profile)
]
)
for user_id in users_with_profile: