summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5030.misc1
-rw-r--r--synapse/crypto/keyring.py51
-rw-r--r--synapse/replication/slave/storage/keys.py18
-rw-r--r--synapse/storage/keys.py74
-rw-r--r--tests/storage/test_keys.py83
5 files changed, 136 insertions, 91 deletions
diff --git a/changelog.d/5030.misc b/changelog.d/5030.misc
new file mode 100644
index 0000000000..3456eb5381
--- /dev/null
+++ b/changelog.d/5030.misc
@@ -0,0 +1 @@
+Rewrite Datastore.get_server_verify_keys to reduce the number of database transactions.
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 04beededdc..834b107705 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -275,10 +275,6 @@ class Keyring(object):
         @defer.inlineCallbacks
         def do_iterations():
             with Measure(self.clock, "get_server_verify_keys"):
-                # dict[str, dict[str, VerifyKey]]: results so far.
-                # map server_name -> key_id -> VerifyKey
-                merged_results = {}
-
                 # dict[str, set(str)]: keys to fetch for each server
                 missing_keys = {}
                 for verify_request in verify_requests:
@@ -288,29 +284,29 @@ class Keyring(object):
 
                 for fn in key_fetch_fns:
                     results = yield fn(missing_keys.items())
-                    merged_results.update(results)
 
                     # We now need to figure out which verify requests we have keys
                     # for and which we don't
                     missing_keys = {}
                     requests_missing_keys = []
                     for verify_request in verify_requests:
-                        server_name = verify_request.server_name
-                        result_keys = merged_results[server_name]
-
                         if verify_request.deferred.called:
                             # We've already called this deferred, which probably
                             # means that we've already found a key for it.
                             continue
 
+                        server_name = verify_request.server_name
+
+                        # see if any of the keys we got this time are sufficient to
+                        # complete this VerifyKeyRequest.
+                        result_keys = results.get(server_name, {})
                         for key_id in verify_request.key_ids:
-                            if key_id in result_keys:
+                            key = result_keys.get(key_id)
+                            if key:
                                 with PreserveLoggingContext():
-                                    verify_request.deferred.callback((
-                                        server_name,
-                                        key_id,
-                                        result_keys[key_id],
-                                    ))
+                                    verify_request.deferred.callback(
+                                        (server_name, key_id, key)
+                                    )
                                 break
                         else:
                             # The else block is only reached if the loop above
@@ -344,27 +340,24 @@ class Keyring(object):
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
         """
-
         Args:
-            server_name_and_key_ids (list[(str, iterable[str])]):
+            server_name_and_key_ids (iterable(Tuple[str, iterable[str]]):
                 list of (server_name, iterable[key_id]) tuples to fetch keys for
 
         Returns:
-            Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
+            Deferred: resolves to dict[str, dict[str, VerifyKey|None]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
-            [
-                run_in_background(
-                    self.store.get_server_verify_keys,
-                    server_name, key_ids,
-                ).addCallback(lambda ks, server: (server, ks), server_name)
-                for server_name, key_ids in server_name_and_key_ids
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError))
-
-        defer.returnValue(dict(res))
+        keys_to_fetch = (
+            (server_name, key_id)
+            for server_name, key_ids in server_name_and_key_ids
+            for key_id in key_ids
+        )
+        res = yield self.store.get_server_verify_keys(keys_to_fetch)
+        keys = {}
+        for (server_name, key_id), key in res.items():
+            keys.setdefault(server_name, {})[key_id] = key
+        defer.returnValue(keys)
 
     @defer.inlineCallbacks
     def get_keys_from_perspectives(self, server_name_and_key_ids):
diff --git a/synapse/replication/slave/storage/keys.py b/synapse/replication/slave/storage/keys.py
index de00660c0e..cc6f7f009f 100644
--- a/synapse/replication/slave/storage/keys.py
+++ b/synapse/replication/slave/storage/keys.py
@@ -13,19 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
-from synapse.storage.keys import KeyStore
+from synapse.storage import KeyStore
 
-from ._base import BaseSlavedStore, __func__
+# KeyStore isn't really safe to use from a worker, but for now we do so and hope that
+# the races it creates aren't too bad.
 
-
-class SlavedKeyStore(BaseSlavedStore):
-    _get_server_verify_key = KeyStore.__dict__[
-        "_get_server_verify_key"
-    ]
-
-    get_server_verify_keys = __func__(DataStore.get_server_verify_keys)
-    store_server_verify_key = __func__(DataStore.store_server_verify_key)
-
-    get_server_keys_json = __func__(DataStore.get_server_keys_json)
-    store_server_keys_json = __func__(DataStore.store_server_keys_json)
+SlavedKeyStore = KeyStore
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 47a9aa784b..7036541792 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,15 +14,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 
 import six
 
 from signedjson.key import decode_verify_key_bytes
 
-from twisted.internet import defer
-
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util import batch_iter
+from synapse.util.caches.descriptors import cached, cachedList
 
 from ._base import SQLBaseStore
 
@@ -38,36 +39,50 @@ else:
 class KeyStore(SQLBaseStore):
     """Persistence for signature verification keys
     """
-    @cachedInlineCallbacks()
-    def _get_server_verify_key(self, server_name, key_id):
-        verify_key_bytes = yield self._simple_select_one_onecol(
-            table="server_signature_keys",
-            keyvalues={"server_name": server_name, "key_id": key_id},
-            retcol="verify_key",
-            desc="_get_server_verify_key",
-            allow_none=True,
-        )
 
-        if verify_key_bytes:
-            defer.returnValue(decode_verify_key_bytes(key_id, bytes(verify_key_bytes)))
+    @cached()
+    def _get_server_verify_key(self, server_name_and_key_id):
+        raise NotImplementedError()
 
-    @defer.inlineCallbacks
-    def get_server_verify_keys(self, server_name, key_ids):
-        """Retrieve the NACL verification key for a given server for the given
-        key_ids
+    @cachedList(
+        cached_method_name="_get_server_verify_key", list_name="server_name_and_key_ids"
+    )
+    def get_server_verify_keys(self, server_name_and_key_ids):
+        """
         Args:
-            server_name (str): The name of the server.
-            key_ids (iterable[str]): key_ids to try and look up.
+            server_name_and_key_ids (iterable[Tuple[str, str]]):
+                iterable of (server_name, key-id) tuples to fetch keys for
+
         Returns:
-            Deferred: resolves to dict[str, VerifyKey]: map from
-               key_id to verification key.
+            Deferred: resolves to dict[Tuple[str, str], VerifyKey|None]:
+                map from (server_name, key_id) -> VerifyKey, or None if the key is
+                unknown
         """
         keys = {}
-        for key_id in key_ids:
-            key = yield self._get_server_verify_key(server_name, key_id)
-            if key:
-                keys[key_id] = key
-        defer.returnValue(keys)
+
+        def _get_keys(txn, batch):
+            """Processes a batch of keys to fetch, and adds the result to `keys`."""
+
+            # batch_iter always returns tuples so it's safe to do len(batch)
+            sql = (
+                "SELECT server_name, key_id, verify_key FROM server_signature_keys "
+                "WHERE 1=0"
+            ) + " OR (server_name=? AND key_id=?)" * len(batch)
+
+            txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
+
+            for row in txn:
+                server_name, key_id, key_bytes = row
+                keys[(server_name, key_id)] = decode_verify_key_bytes(
+                    key_id, bytes(key_bytes)
+                )
+
+        def _txn(txn):
+            for batch in batch_iter(server_name_and_key_ids, 50):
+                _get_keys(txn, batch)
+            return keys
+
+        return self.runInteraction("get_server_verify_keys", _txn)
 
     def store_server_verify_key(
         self, server_name, from_server, time_now_ms, verify_key
@@ -93,8 +108,11 @@ class KeyStore(SQLBaseStore):
                     "verify_key": db_binary_type(verify_key.encode()),
                 },
             )
+            # invalidate takes a tuple corresponding to the params of
+            # _get_server_verify_key. _get_server_verify_key only takes one
+            # param, which is itself the 2-tuple (server_name, key_id).
             txn.call_after(
-                self._get_server_verify_key.invalidate, (server_name, key_id)
+                self._get_server_verify_key.invalidate, ((server_name, key_id),)
             )
 
         return self.runInteraction("store_server_verify_key", _txn)
diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py
index 0d2dc9f325..6bfaa00fe9 100644
--- a/tests/storage/test_keys.py
+++ b/tests/storage/test_keys.py
@@ -15,34 +15,77 @@
 
 import signedjson.key
 
-from twisted.internet import defer
+from twisted.internet.defer import Deferred
 
 import tests.unittest
-import tests.utils
 
+KEY_1 = signedjson.key.decode_verify_key_base64(
+    "ed25519", "key1", "fP5l4JzpZPq/zdbBg5xx6lQGAAOM9/3w94cqiJ5jPrw"
+)
+KEY_2 = signedjson.key.decode_verify_key_base64(
+    "ed25519", "key2", "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+)
 
-class KeyStoreTestCase(tests.unittest.TestCase):
 
-    @defer.inlineCallbacks
-    def setUp(self):
-        hs = yield tests.utils.setup_test_homeserver(self.addCleanup)
-        self.store = hs.get_datastore()
-
-    @defer.inlineCallbacks
+class KeyStoreTestCase(tests.unittest.HomeserverTestCase):
     def test_get_server_verify_keys(self):
-        key1 = signedjson.key.decode_verify_key_base64(
-            "ed25519", "key1", "fP5l4JzpZPq/zdbBg5xx6lQGAAOM9/3w94cqiJ5jPrw"
-        )
-        key2 = signedjson.key.decode_verify_key_base64(
-            "ed25519", "key2", "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw"
+        store = self.hs.get_datastore()
+
+        d = store.store_server_verify_key("server1", "from_server", 0, KEY_1)
+        self.get_success(d)
+        d = store.store_server_verify_key("server1", "from_server", 0, KEY_2)
+        self.get_success(d)
+
+        d = store.get_server_verify_keys(
+            [
+                ("server1", "ed25519:key1"),
+                ("server1", "ed25519:key2"),
+                ("server1", "ed25519:key3"),
+            ]
         )
-        yield self.store.store_server_verify_key("server1", "from_server", 0, key1)
-        yield self.store.store_server_verify_key("server1", "from_server", 0, key2)
+        res = self.get_success(d)
+
+        self.assertEqual(len(res.keys()), 3)
+        self.assertEqual(res[("server1", "ed25519:key1")].version, "key1")
+        self.assertEqual(res[("server1", "ed25519:key2")].version, "key2")
+
+        # non-existent result gives None
+        self.assertIsNone(res[("server1", "ed25519:key3")])
+
+    def test_cache(self):
+        """Check that updates correctly invalidate the cache."""
+
+        store = self.hs.get_datastore()
+
+        key_id_1 = "ed25519:key1"
+        key_id_2 = "ed25519:key2"
+
+        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_1)
+        self.get_success(d)
+        d = store.store_server_verify_key("srv1", "from_server", 0, KEY_2)
+        self.get_success(d)
+
+        d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
+        res = self.get_success(d)
+        self.assertEqual(len(res.keys()), 2)
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+        self.assertEqual(res[("srv1", key_id_2)], KEY_2)
+
+        # we should be able to look up the same thing again without a db hit
+        res = store.get_server_verify_keys([("srv1", key_id_1)])
+        if isinstance(res, Deferred):
+            res = self.successResultOf(res)
+        self.assertEqual(len(res.keys()), 1)
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
 
-        res = yield self.store.get_server_verify_keys(
-            "server1", ["ed25519:key1", "ed25519:key2", "ed25519:key3"]
+        new_key_2 = signedjson.key.get_verify_key(
+            signedjson.key.generate_signing_key("key2")
         )
+        d = store.store_server_verify_key("srv1", "from_server", 10, new_key_2)
+        self.get_success(d)
 
+        d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)])
+        res = self.get_success(d)
         self.assertEqual(len(res.keys()), 2)
-        self.assertEqual(res["ed25519:key1"].version, "key1")
-        self.assertEqual(res["ed25519:key2"].version, "key2")
+        self.assertEqual(res[("srv1", key_id_1)], KEY_1)
+        self.assertEqual(res[("srv1", key_id_2)], new_key_2)