diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/data_stores/main/end_to_end_keys.py | 98 |
1 files changed, 21 insertions, 77 deletions
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index bcf746b7ef..20698bfd16 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -25,7 +25,9 @@ from twisted.internet import defer from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import make_in_list_sql_clause from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.iterutils import batch_iter class EndToEndKeyWorkerStore(SQLBaseStore): @@ -268,53 +270,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): "count_e2e_one_time_keys", _count_e2e_one_time_keys ) - def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None): - """Returns a user's cross-signing key. - - Args: - txn (twisted.enterprise.adbapi.Connection): db connection - user_id (str): the user whose key is being requested - key_type (str): the type of key that is being requested: either 'master' - for a master key, 'self_signing' for a self-signing key, or - 'user_signing' for a user-signing key - from_user_id (str): if specified, signatures made by this user on - the key will be included in the result - - Returns: - dict of the key data or None if not found - """ - sql = ( - "SELECT keydata " - " FROM e2e_cross_signing_keys " - " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1" - ) - txn.execute(sql, (user_id, key_type)) - row = txn.fetchone() - if not row: - return None - key = json.loads(row[0]) - - device_id = None - for k in key["keys"].values(): - device_id = k - - if from_user_id is not None: - sql = ( - "SELECT key_id, signature " - " FROM e2e_cross_signing_signatures " - " WHERE user_id = ? " - " AND target_user_id = ? " - " AND target_device_id = ? " - ) - txn.execute(sql, (from_user_id, user_id, device_id)) - row = txn.fetchone() - if row: - key.setdefault("signatures", {}).setdefault(from_user_id, {})[ - row[0] - ] = row[1] - - return key - + @defer.inlineCallbacks def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): """Returns a user's cross-signing key. @@ -329,13 +285,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore): Returns: dict of the key data or None if not found """ - return self.db.runInteraction( - "get_e2e_cross_signing_key", - self._get_e2e_cross_signing_key_txn, - user_id, - key_type, - from_user_id, - ) + res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id) + user_keys = res.get(user_id) + if not user_keys: + return None + return user_keys.get(key_type) @cached(num_args=1) def _get_bare_e2e_cross_signing_keys(self, user_id): @@ -391,26 +345,24 @@ class EndToEndKeyWorkerStore(SQLBaseStore): """ result = {} - batch_size = 100 - chunks = [ - user_ids[i : i + batch_size] for i in range(0, len(user_ids), batch_size) - ] - for user_chunk in chunks: - sql = """ + for user_chunk in batch_iter(user_ids, 100): + clause, params = make_in_list_sql_clause( + txn.database_engine, "k.user_id", user_chunk + ) + sql = ( + """ SELECT k.user_id, k.keytype, k.keydata, k.stream_id FROM e2e_cross_signing_keys k INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id FROM e2e_cross_signing_keys GROUP BY user_id, keytype) s USING (user_id, stream_id, keytype) - WHERE k.user_id IN (%s) - """ % ( - ",".join("?" for u in user_chunk), + WHERE + """ + + clause ) - query_params = [] - query_params.extend(user_chunk) - txn.execute(sql, query_params) + txn.execute(sql, params) rows = self.db.cursor_to_dict(txn) for row in rows: @@ -453,15 +405,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): device_id = k devices[(user_id, device_id)] = key_type - device_list = list(devices) - - # split into batches - batch_size = 100 - chunks = [ - device_list[i : i + batch_size] - for i in range(0, len(device_list), batch_size) - ] - for user_chunk in chunks: + for batch in batch_iter(devices.keys(), size=100): sql = """ SELECT target_user_id, target_device_id, key_id, signature FROM e2e_cross_signing_signatures @@ -469,11 +413,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore): AND (%s) """ % ( " OR ".join( - "(target_user_id = ? AND target_device_id = ?)" for d in devices + "(target_user_id = ? AND target_device_id = ?)" for _ in batch ) ) query_params = [from_user_id] - for item in devices: + for item in batch: # item is a (user_id, device_id) tuple query_params.extend(item) |