summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-04-08 14:51:07 +0100
committerRichard van der Hoff <richard@matrix.org>2019-04-09 00:00:10 +0100
commit18b69be00f9fa79cf2b237992ef1f0094d1dc453 (patch)
treea71af9dc0dcee3c322a704754fa8d610014b74d6
parentRewrite test_keys as a HomeserverTestCase (diff)
downloadsynapse-18b69be00f9fa79cf2b237992ef1f0094d1dc453.tar.xz
Rewrite Datastore.get_server_verify_keys
Rewrite this so that it doesn't hammer the database.
-rw-r--r--synapse/crypto/keyring.py38
-rw-r--r--synapse/storage/keys.py74
-rw-r--r--tests/storage/test_keys.py53
3 files changed, 113 insertions, 52 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ede120b2a6..834b107705 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -301,13 +301,12 @@ class Keyring(object):
                         # complete this VerifyKeyRequest.
                         result_keys = results.get(server_name, {})
                         for key_id in verify_request.key_ids:
-                            if key_id in result_keys:
+                            key = result_keys.get(key_id)
+                            if key:
                                 with PreserveLoggingContext():
-                                    verify_request.deferred.callback((
-                                        server_name,
-                                        key_id,
-                                        result_keys[key_id],
-                                    ))
+                                    verify_request.deferred.callback(
+                                        (server_name, key_id, key)
+                                    )
                                 break
                         else:
                             # The else block is only reached if the loop above
@@ -341,27 +340,24 @@ class Keyring(object):
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
         """
-
         Args:
-            server_name_and_key_ids (list[(str, iterable[str])]):
+            server_name_and_key_ids (iterable(Tuple[str, iterable[str]]):
                 list of (server_name, iterable[key_id]) tuples to fetch keys for
 
         Returns:
-            Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
+            Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.store.get_server_verify_keys,
-                    server_name, key_ids,
-                ).addCallback(lambda ks, server: (server, ks), server_name)
-                for server_name, key_ids in server_name_and_key_ids
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
-
-        defer.returnValue(dict(res))
+        keys_to_fetch = (
+            (server_name, key_id)
+            for server_name, key_ids in server_name_and_key_ids
+            for key_id in key_ids
+        )
+        res = yield self.store.get_server_verify_keys(keys_to_fetch)
+        keys = {}
+        for (server_name, key_id), key in res.items():
+            keys.setdefault(server_name, {})[key_id] = key
+        defer.returnValue(keys)
 
     @defer.inlineCallbacks
     def get_keys_from_perspectives(self, server_name_and_key_ids):
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 47a9aa784b..7036541792 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,15 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 
 import six
 
 from signedjson.key import decode_verify_key_bytes
 
-from twisted.internet import defer
-
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util import batch_iter
+from synapse.util.caches.descriptors import cached, cachedList
 
 from ._base import SQLBaseStore
 
@@ -38,36 +39,50 @@ else:
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys
     """
-    @cachedInlineCallbacks()
-    def _get_server_verify_key(self, server_name, key_id):
-        verify_key_bytes = yield self._simple_select_one_onecol(
-            table="server_signature_keys",
-            keyvalues={"server_name": server_name, "key_id": key_id},
-            retcol="verify_key",
-            desc="_get_server_verify_key",
-            allow_none=True,
-        )
 
-        if verify_key_bytes:
-            defer.returnValue(decode_verify_key_bytes(key_id, bytes(verify_key_bytes)))
+    @cached()
+    def _get_server_verify_key(self, server_name_and_key_id):
+        raise NotImplementedError()
 
-    @defer.inlineCallbacks
-    def get_server_verify_keys(self, server_name, key_ids):
-        """Retrieve the NACL verification key for a given server for the given
-        key_ids
+    @cachedList(
+        cached_method_name="_get_server_verify_key", list_name="server_name_and_key_ids"
+    )
+    def get_server_verify_keys(self, server_name_and_key_ids):
+        """
         Args:
-            server_name (str): The name of the server.
-            key_ids (iterable[str]): key_ids to try and look up.
+            server_name_and_key_ids (iterable[Tuple[str, str]]):
+                iterable of (server_name, key-id) tuples to fetch keys for
+
         Returns:
-            Deferred: resolves to dict[str, VerifyKey]: map from
-               key_id to verification key.
+            Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]:
+                map from (server_name, key_id) -> VerifyKey, or None if the key is
+                unknown
         """
         keys = {}
-        for key_id in key_ids:
-            key = yield self._get_server_verify_key(server_name, key_id)
-            if key:
-                keys[key_id] = key
-        defer.returnValue(keys)
+
+        def _get_keys(txn, batch):
+            """Processes a batch of keys to fetch, and adds the result to `keys`."""
+
+            # batch_iter always returns tuples so it's safe to do len(batch)
+            sql = (
+                "SELECT server_name, key_id, verify_key FROM server_signature_keys "
+                "WHERE 1=0"
+            ) + " OR (server_name=? AND key_id=?)" * len(batch)
+
+            txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
+
+            for row in txn:
+                server_name, key_id, key_bytes = row
+                keys[(server_name, key_id)] = decode_verify_key_bytes(
+                    key_id, bytes(key_bytes)
+                )
+
+        def _txn(txn):
+            for batch in batch_iter(server_name_and_key_ids, 50):
+                _get_keys(txn, batch)
+            return keys
+
+        return self.runInteraction("get_server_verify_keys", _txn)
 
     def store_server_verify_key(
         self, server_name, from_server, time_now_ms, verify_key
@@ -93,8 +108,11 @@ class KeyStore(SQLBaseStore):
                     "verify_key": db_binary_type(verify_key.encode()),
                 },
             )
+            # invalidate takes a tuple corresponding to the params of
+            # _get_server_verify_key. _get_server_verify_key only takes one
+            # param, which is itself the 2-tuple (server_name, key_id).
             txn.call_after(
-                self._get_server_verify_key.invalidate, (server_name, key_id)
+                self._get_server_verify_key.invalidate, ((server_name, key_id),)
             )
 
         return self.runInteraction("store_server_verify_key", _txn)
diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py
index 7170ae76c7..6bfaa00fe9 100644
--- a/tests/storage/test_keys.py
+++ b/tests/storage/test_keys.py
@@ -15,6 +15,8 @@
 
 import signedjson.key
 
+from twisted.internet.defer import Deferred
+
 import tests.unittest
 
 KEY_1 = signedjson.key.decode_verify_key_base64(
@@ -35,10 +37,55 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase):
         self.get_success(d)
 
         d = store.get_server_verify_keys(
-            "server1", ["ed25519:key1", "ed25519:key2", "ed25519:key3"]
+            [
+                ("server1", "ed25519:key1"),
+                ("server1", "ed25519:key2"),
+                ("server1", "ed25519:key3"),
+            ]
         )
         res = self.get_success(d)
 
+        self.assertEqual(len(res.keys()), 3)
+        self.assertEqual(res[("server1", "ed25519:key1")].version, "key1")
+        self.assertEqual(res[("server1", "ed25519:key2")].version, "key2")
+
+        # non-existent result gives None
+        self.assertIsNone(res[("server1", "ed25519:key3")])
+
+    def test_cache(self):
+        """Check that updates correctly invalidate the cache."""
+
+        store = self.hs.get_datastore()
+
+        key_id_1 = "ed25519:key1"
+        key_id_2 = "ed25519:key2"
+
+        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_1)
+        self.get_success(d)
+        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_2)
+        self.get_success(d)
+
+        d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
+        res = self.get_success(d)
+        self.assertEqual(len(res.keys()), 2)
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+        self.assertEqual(res[("srv1", key_id_2)], KEY_2)
+
+        # we should be able to look up the same thing again without a db hit
+        res = store.get_server_verify_keys([("srv1", key_id_1)])
+        if isinstance(res, Deferred):
+            res = self.successResultOf(res)
+        self.assertEqual(len(res.keys()), 1)
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+
+        new_key_2 = signedjson.key.get_verify_key(
+            signedjson.key.generate_signing_key("key2")
+        )
+        d = store.store_server_verify_key("srv1", "from_server", 10, new_key_2)
+        self.get_success(d)
+
+        d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
+        res = self.get_success(d)
         self.assertEqual(len(res.keys()), 2)
-        self.assertEqual(res["ed25519:key1"].version, "key1")
-        self.assertEqual(res["ed25519:key2"].version, "key2")
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+        self.assertEqual(res[("srv1", key_id_2)], new_key_2)