summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py98
1 files changed, 21 insertions, 77 deletions
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index bcf746b7ef..20698bfd16 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -25,7 +25,9 @@ from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage.database import make_in_list_sql_clause
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.iterutils import batch_iter
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
@@ -268,53 +270,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             "count_e2e_one_time_keys", _count_e2e_one_time_keys
         )
 
-    def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None):
-        """Returns a user's cross-signing key.
-
-        Args:
-            txn (twisted.enterprise.adbapi.Connection): db connection
-            user_id (str): the user whose key is being requested
-            key_type (str): the type of key that is being requested: either 'master'
-                for a master key, 'self_signing' for a self-signing key, or
-                'user_signing' for a user-signing key
-            from_user_id (str): if specified, signatures made by this user on
-                the key will be included in the result
-
-        Returns:
-            dict of the key data or None if not found
-        """
-        sql = (
-            "SELECT keydata "
-            "  FROM e2e_cross_signing_keys "
-            " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1"
-        )
-        txn.execute(sql, (user_id, key_type))
-        row = txn.fetchone()
-        if not row:
-            return None
-        key = json.loads(row[0])
-
-        device_id = None
-        for k in key["keys"].values():
-            device_id = k
-
-        if from_user_id is not None:
-            sql = (
-                "SELECT key_id, signature "
-                "  FROM e2e_cross_signing_signatures "
-                " WHERE user_id = ? "
-                "   AND target_user_id = ? "
-                "   AND target_device_id = ? "
-            )
-            txn.execute(sql, (from_user_id, user_id, device_id))
-            row = txn.fetchone()
-            if row:
-                key.setdefault("signatures", {}).setdefault(from_user_id, {})[
-                    row[0]
-                ] = row[1]
-
-        return key
-
+    @defer.inlineCallbacks
     def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
         """Returns a user's cross-signing key.
 
@@ -329,13 +285,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         Returns:
             dict of the key data or None if not found
         """
-        return self.db.runInteraction(
-            "get_e2e_cross_signing_key",
-            self._get_e2e_cross_signing_key_txn,
-            user_id,
-            key_type,
-            from_user_id,
-        )
+        res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
+        user_keys = res.get(user_id)
+        if not user_keys:
+            return None
+        return user_keys.get(key_type)
 
     @cached(num_args=1)
     def _get_bare_e2e_cross_signing_keys(self, user_id):
@@ -391,26 +345,24 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         """
         result = {}
 
-        batch_size = 100
-        chunks = [
-            user_ids[i : i + batch_size] for i in range(0, len(user_ids), batch_size)
-        ]
-        for user_chunk in chunks:
-            sql = """
+        for user_chunk in batch_iter(user_ids, 100):
+            clause, params = make_in_list_sql_clause(
+                txn.database_engine, "k.user_id", user_chunk
+            )
+            sql = (
+                """
                 SELECT k.user_id, k.keytype, k.keydata, k.stream_id
                   FROM e2e_cross_signing_keys k
                   INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
                                 FROM e2e_cross_signing_keys
                                GROUP BY user_id, keytype) s
                  USING (user_id, stream_id, keytype)
-                 WHERE k.user_id IN (%s)
-            """ % (
-                ",".join("?" for u in user_chunk),
+                 WHERE
+            """
+                + clause
             )
-            query_params = []
-            query_params.extend(user_chunk)
 
-            txn.execute(sql, query_params)
+            txn.execute(sql, params)
             rows = self.db.cursor_to_dict(txn)
 
             for row in rows:
@@ -453,15 +405,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                     device_id = k
                 devices[(user_id, device_id)] = key_type
 
-        device_list = list(devices)
-
-        # split into batches
-        batch_size = 100
-        chunks = [
-            device_list[i : i + batch_size]
-            for i in range(0, len(device_list), batch_size)
-        ]
-        for user_chunk in chunks:
+        for batch in batch_iter(devices.keys(), size=100):
             sql = """
                 SELECT target_user_id, target_device_id, key_id, signature
                   FROM e2e_cross_signing_signatures
@@ -469,11 +413,11 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                    AND (%s)
             """ % (
                 " OR ".join(
-                    "(target_user_id = ? AND target_device_id = ?)" for d in devices
+                    "(target_user_id = ? AND target_device_id = ?)" for _ in batch
                 )
             )
             query_params = [from_user_id]
-            for item in devices:
+            for item in batch:
                 # item is a (user_id, device_id) tuple
                 query_params.extend(item)