summary refs log tree commit diff
path: root/synapse/crypto/keyring.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/crypto/keyring.py')
-rw-r--r--synapse/crypto/keyring.py339
1 files changed, 203 insertions, 136 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 1bb27edc0f..e95b9fb43e 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,33 +14,37 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.crypto.keyclient import fetch_server_key
-from synapse.api.errors import SynapseError, Codes
-from synapse.util import unwrapFirstError
-from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import (
-    preserve_context_over_deferred, preserve_context_over_fn, PreserveLoggingContext,
-    preserve_fn
-)
-from synapse.util.metrics import Measure
-
-from twisted.internet import defer
+import hashlib
+import logging
+import urllib
+from collections import namedtuple
 
-from signedjson.sign import (
-    verify_signed_json, signature_ids, sign_json, encode_canonical_json
-)
 from signedjson.key import (
-    is_signing_algorithm_supported, decode_verify_key_bytes
+    decode_verify_key_bytes,
+    encode_verify_key_base64,
+    is_signing_algorithm_supported,
+)
+from signedjson.sign import (
+    SignatureVerifyException,
+    encode_canonical_json,
+    sign_json,
+    signature_ids,
+    verify_signed_json,
 )
 from unpaddedbase64 import decode_base64, encode_base64
 
 from OpenSSL import crypto
+from twisted.internet import defer
 
-from collections import namedtuple
-import urllib
-import hashlib
-import logging
-
+from synapse.api.errors import Codes, SynapseError
+from synapse.crypto.keyclient import fetch_server_key
+from synapse.util import logcontext, unwrapFirstError
+from synapse.util.logcontext import (
+    PreserveLoggingContext,
+    preserve_fn,
+    run_in_background,
+)
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -55,9 +60,10 @@ Attributes:
     key_ids(set(str)): The set of key_ids to that could be used to verify the
         JSON object
     json_object(dict): The JSON object to verify.
-    deferred(twisted.internet.defer.Deferred):
+    deferred(Deferred[str, str, nacl.signing.VerifyKey]):
         A deferred (server_name, key_id, verify_key) tuple that resolves when
-        a verify key has been fetched
+        a verify key has been fetched. The deferreds' callbacks are run with no
+        logcontext.
 """
 
 
@@ -74,23 +80,32 @@ class Keyring(object):
         self.perspective_servers = self.config.perspectives
         self.hs = hs
 
+        # map from server name to Deferred. Has an entry for each server with
+        # an ongoing key download; the Deferred completes once the download
+        # completes.
+        #
+        # These are regular, logcontext-agnostic Deferreds.
         self.key_downloads = {}
 
     def verify_json_for_server(self, server_name, json_object):
-        return self.verify_json_objects_for_server(
-            [(server_name, json_object)]
-        )[0]
+        return logcontext.make_deferred_yieldable(
+            self.verify_json_objects_for_server(
+                [(server_name, json_object)]
+            )[0]
+        )
 
     def verify_json_objects_for_server(self, server_and_json):
-        """Bulk verfies signatures of json objects, bulk fetching keys as
+        """Bulk verifies signatures of json objects, bulk fetching keys as
         necessary.
 
         Args:
             server_and_json (list): List of pairs of (server_name, json_object)
 
         Returns:
-            list of deferreds indicating success or failure to verify each
-            json object's signature for the given server_name.
+            List<Deferred>: for each input pair, a deferred indicating success
+                or failure to verify each json object's signature for the given
+                server_name. The deferreds run their callbacks in the sentinel
+                logcontext.
         """
         verify_requests = []
 
@@ -117,73 +132,60 @@ class Keyring(object):
 
             verify_requests.append(verify_request)
 
-        @defer.inlineCallbacks
-        def handle_key_deferred(verify_request):
-            server_name = verify_request.server_name
-            try:
-                _, key_id, verify_key = yield verify_request.deferred
-            except IOError as e:
-                logger.warn(
-                    "Got IOError when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    502,
-                    "Error downloading keys for %s" % (server_name,),
-                    Codes.UNAUTHORIZED,
-                )
-            except Exception as e:
-                logger.exception(
-                    "Got Exception when downloading keys for %s: %s %s",
-                    server_name, type(e).__name__, str(e.message),
-                )
-                raise SynapseError(
-                    401,
-                    "No key for %s with id %s" % (server_name, key_ids),
-                    Codes.UNAUTHORIZED,
-                )
+        run_in_background(self._start_key_lookups, verify_requests)
 
-            json_object = verify_request.json_object
+        # Pass those keys to handle_key_deferred so that the json object
+        # signatures can be verified
+        handle = preserve_fn(_handle_key_deferred)
+        return [
+            handle(rq) for rq in verify_requests
+        ]
 
-            logger.debug("Got key %s %s:%s for server %s, verifying" % (
-                key_id, verify_key.alg, verify_key.version, server_name,
-            ))
-            try:
-                verify_signed_json(json_object, server_name, verify_key)
-            except:
-                raise SynapseError(
-                    401,
-                    "Invalid signature for server %s with key %s:%s" % (
-                        server_name, verify_key.alg, verify_key.version
-                    ),
-                    Codes.UNAUTHORIZED,
-                )
+    @defer.inlineCallbacks
+    def _start_key_lookups(self, verify_requests):
+        """Sets off the key fetches for each verify request
 
-        server_to_deferred = {
-            server_name: defer.Deferred()
-            for server_name, _ in server_and_json
-        }
+        Once each fetch completes, verify_request.deferred will be resolved.
 
-        with PreserveLoggingContext():
+        Args:
+            verify_requests (List[VerifyKeyRequest]):
+        """
+
+        try:
+            # create a deferred for each server we're going to look up the keys
+            # for; we'll resolve them once we have completed our lookups.
+            # These will be passed into wait_for_previous_lookups to block
+            # any other lookups until we have finished.
+            # The deferreds are called with no logcontext.
+            server_to_deferred = {
+                rq.server_name: defer.Deferred()
+                for rq in verify_requests
+            }
 
             # We want to wait for any previous lookups to complete before
             # proceeding.
-            wait_on_deferred = self.wait_for_previous_lookups(
-                [server_name for server_name, _ in server_and_json],
+            yield self.wait_for_previous_lookups(
+                [rq.server_name for rq in verify_requests],
                 server_to_deferred,
             )
 
             # Actually start fetching keys.
-            wait_on_deferred.addBoth(
-                lambda _: self.get_server_verify_keys(verify_requests)
-            )
+            self._get_server_verify_keys(verify_requests)
 
             # When we've finished fetching all the keys for a given server_name,
             # resolve the deferred passed to `wait_for_previous_lookups` so that
             # any lookups waiting will proceed.
+            #
+            # map from server name to a set of request ids
             server_to_request_ids = {}
 
-            def remove_deferreds(res, server_name, verify_request):
+            for verify_request in verify_requests:
+                server_name = verify_request.server_name
+                request_id = id(verify_request)
+                server_to_request_ids.setdefault(server_name, set()).add(request_id)
+
+            def remove_deferreds(res, verify_request):
+                server_name = verify_request.server_name
                 request_id = id(verify_request)
                 server_to_request_ids[server_name].discard(request_id)
                 if not server_to_request_ids[server_name]:
@@ -193,17 +195,11 @@ class Keyring(object):
                 return res
 
             for verify_request in verify_requests:
-                server_name = verify_request.server_name
-                request_id = id(verify_request)
-                server_to_request_ids.setdefault(server_name, set()).add(request_id)
-                deferred.addBoth(remove_deferreds, server_name, verify_request)
-
-        # Pass those keys to handle_key_deferred so that the json object
-        # signatures can be verified
-        return [
-            preserve_context_over_fn(handle_key_deferred, verify_request)
-            for verify_request in verify_requests
-        ]
+                verify_request.deferred.addBoth(
+                    remove_deferreds, verify_request,
+                )
+        except Exception:
+            logger.exception("Error starting key lookups")
 
     @defer.inlineCallbacks
     def wait_for_previous_lookups(self, server_names, server_to_deferred):
@@ -212,7 +208,13 @@ class Keyring(object):
         Args:
             server_names (list): list of server_names we want to lookup
             server_to_deferred (dict): server_name to deferred which gets
-                resolved once we've finished looking up keys for that server
+                resolved once we've finished looking up keys for that server.
+                The Deferreds should be regular twisted ones which call their
+                callbacks with no logcontext.
+
+        Returns: a Deferred which resolves once all key lookups for the given
+            servers have completed. Follows the synapse rules of logcontext
+            preservation.
         """
         while True:
             wait_on = [
@@ -226,17 +228,15 @@ class Keyring(object):
             else:
                 break
 
-        for server_name, deferred in server_to_deferred.items():
-            d = ObservableDeferred(preserve_context_over_deferred(deferred))
-            self.key_downloads[server_name] = d
-
-            def rm(r, server_name):
-                self.key_downloads.pop(server_name, None)
-                return r
+        def rm(r, server_name_):
+            self.key_downloads.pop(server_name_, None)
+            return r
 
-            d.addBoth(rm, server_name)
+        for server_name, deferred in server_to_deferred.items():
+            self.key_downloads[server_name] = deferred
+            deferred.addBoth(rm, server_name)
 
-    def get_server_verify_keys(self, verify_requests):
+    def _get_server_verify_keys(self, verify_requests):
         """Tries to find at least one key for each verify request
 
         For each verify_request, verify_request.deferred is called back with
@@ -305,21 +305,23 @@ class Keyring(object):
                     if not missing_keys:
                         break
 
-                for verify_request in requests_missing_keys.values():
-                    verify_request.deferred.errback(SynapseError(
-                        401,
-                        "No key for %s with id %s" % (
-                            verify_request.server_name, verify_request.key_ids,
-                        ),
-                        Codes.UNAUTHORIZED,
-                    ))
+                with PreserveLoggingContext():
+                    for verify_request in requests_missing_keys:
+                        verify_request.deferred.errback(SynapseError(
+                            401,
+                            "No key for %s with id %s" % (
+                                verify_request.server_name, verify_request.key_ids,
+                            ),
+                            Codes.UNAUTHORIZED,
+                        ))
 
         def on_err(err):
-            for verify_request in verify_requests:
-                if not verify_request.deferred.called:
-                    verify_request.deferred.errback(err)
+            with PreserveLoggingContext():
+                for verify_request in verify_requests:
+                    if not verify_request.deferred.called:
+                        verify_request.deferred.errback(err)
 
-        do_iterations().addErrback(on_err)
+        run_in_background(do_iterations).addErrback(on_err)
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
@@ -333,15 +335,16 @@ class Keyring(object):
             Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
                 server_name -> key_id -> VerifyKey
         """
-        res = yield preserve_context_over_deferred(defer.gatherResults(
+        res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.get_server_verify_keys)(
-                    server_name, key_ids
+                run_in_background(
+                    self.store.get_server_verify_keys,
+                    server_name, key_ids,
                 ).addCallback(lambda ks, server: (server, ks), server_name)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(dict(res))
 
@@ -358,17 +361,17 @@ class Keyring(object):
                 logger.exception(
                     "Unable to get key from %r: %s %s",
                     perspective_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
                 defer.returnValue({})
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(p_name, p_keys)
+                run_in_background(get_key, p_name, p_keys)
                 for p_name, p_keys in self.perspective_servers.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         union_of_keys = {}
         for result in results:
@@ -390,7 +393,7 @@ class Keyring(object):
                 logger.info(
                     "Unable to get key %r for %r directly: %s %s",
                     key_ids, server_name,
-                    type(e).__name__, str(e.message),
+                    type(e).__name__, str(e),
                 )
 
             if not keys:
@@ -402,13 +405,13 @@ class Keyring(object):
 
             defer.returnValue(keys)
 
-        results = yield preserve_context_over_deferred(defer.gatherResults(
+        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(get_key)(server_name, key_ids)
+                run_in_background(get_key, server_name, key_ids)
                 for server_name, key_ids in server_name_and_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         merged = {}
         for result in results:
@@ -485,9 +488,10 @@ class Keyring(object):
             for server_name, response_keys in processed_response.items():
                 keys.setdefault(server_name, {}).update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=server_name,
                     from_server=perspective_name,
                     verify_keys=response_keys,
@@ -495,7 +499,7 @@ class Keyring(object):
                 for server_name, response_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -543,9 +547,10 @@ class Keyring(object):
 
             keys.update(response_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store_keys)(
+                run_in_background(
+                    self.store_keys,
                     server_name=key_server_name,
                     from_server=server_name,
                     verify_keys=verify_keys,
@@ -553,7 +558,7 @@ class Keyring(object):
                 for key_server_name, verify_keys in keys.items()
             ],
             consumeErrors=True
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         defer.returnValue(keys)
 
@@ -619,9 +624,10 @@ class Keyring(object):
         response_keys.update(verify_keys)
         response_keys.update(old_verify_keys)
 
-        yield preserve_context_over_deferred(defer.gatherResults(
+        yield logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_keys_json)(
+                run_in_background(
+                    self.store.store_server_keys_json,
                     server_name=server_name,
                     key_id=key_id,
                     from_server=server_name,
@@ -632,7 +638,7 @@ class Keyring(object):
                 for key_id in updated_key_ids
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
 
         results[server_name] = response_keys
 
@@ -710,7 +716,6 @@ class Keyring(object):
 
         defer.returnValue(verify_keys)
 
-    @defer.inlineCallbacks
     def store_keys(self, server_name, from_server, verify_keys):
         """Store a collection of verify keys for a given server
         Args:
@@ -721,12 +726,74 @@ class Keyring(object):
             A deferred that completes when the keys are stored.
         """
         # TODO(markjh): Store whether the keys have expired.
-        yield preserve_context_over_deferred(defer.gatherResults(
+        return logcontext.make_deferred_yieldable(defer.gatherResults(
             [
-                preserve_fn(self.store.store_server_verify_key)(
+                run_in_background(
+                    self.store.store_server_verify_key,
                     server_name, server_name, key.time_added, key
                 )
                 for key_id, key in verify_keys.items()
             ],
             consumeErrors=True,
-        )).addErrback(unwrapFirstError)
+        ).addErrback(unwrapFirstError))
+
+
+@defer.inlineCallbacks
+def _handle_key_deferred(verify_request):
+    """Waits for the key to become available, and then performs a verification
+
+    Args:
+        verify_request (VerifyKeyRequest):
+
+    Returns:
+        Deferred[None]
+
+    Raises:
+        SynapseError if there was a problem performing the verification
+    """
+    server_name = verify_request.server_name
+    try:
+        with PreserveLoggingContext():
+            _, key_id, verify_key = yield verify_request.deferred
+    except IOError as e:
+        logger.warn(
+            "Got IOError when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e),
+        )
+        raise SynapseError(
+            502,
+            "Error downloading keys for %s" % (server_name,),
+            Codes.UNAUTHORIZED,
+        )
+    except Exception as e:
+        logger.exception(
+            "Got Exception when downloading keys for %s: %s %s",
+            server_name, type(e).__name__, str(e),
+        )
+        raise SynapseError(
+            401,
+            "No key for %s with id %s" % (server_name, verify_request.key_ids),
+            Codes.UNAUTHORIZED,
+        )
+
+    json_object = verify_request.json_object
+
+    logger.debug("Got key %s %s:%s for server %s, verifying" % (
+        key_id, verify_key.alg, verify_key.version, server_name,
+    ))
+    try:
+        verify_signed_json(json_object, server_name, verify_key)
+    except SignatureVerifyException as e:
+        logger.debug(
+            "Error verifying signature for %s:%s:%s with key %s: %s",
+            server_name, verify_key.alg, verify_key.version,
+            encode_verify_key_base64(verify_key),
+            str(e),
+        )
+        raise SynapseError(
+            401,
+            "Invalid signature for server %s with key %s:%s: %s" % (
+                server_name, verify_key.alg, verify_key.version, str(e),
+            ),
+            Codes.UNAUTHORIZED,
+        )