diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index c63f106cf3..0fd15287e7 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,12 +15,13 @@
# limitations under the License.
import logging
-from collections import namedtuple
+from collections import defaultdict
import six
from six import raise_from
from six.moves import urllib
+import attr
from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
@@ -45,6 +46,7 @@ from synapse.api.errors import (
)
from synapse.storage.keys import FetchKeyResult
from synapse.util import logcontext, unwrapFirstError
+from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@@ -57,22 +59,36 @@ from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
-VerifyKeyRequest = namedtuple(
- "VerifyRequest", ("server_name", "key_ids", "json_object", "deferred")
-)
-"""
-A request for a verify key to verify a JSON object.
+@attr.s(slots=True, cmp=False)
+class VerifyKeyRequest(object):
+ """
+ A request for a verify key to verify a JSON object.
+
+ Attributes:
+ server_name(str): The name of the server to verify against.
+
+        key_ids(set[str]): The set of key_ids that could be used to verify the
+ JSON object
-Attributes:
- server_name(str): The name of the server to verify against.
- key_ids(set(str)): The set of key_ids to that could be used to verify the
- JSON object
- json_object(dict): The JSON object to verify.
- deferred(Deferred[str, str, nacl.signing.VerifyKey]):
- A deferred (server_name, key_id, verify_key) tuple that resolves when
- a verify key has been fetched. The deferreds' callbacks are run with no
- logcontext.
-"""
+ json_object(dict): The JSON object to verify.
+
+ minimum_valid_until_ts (int): time at which we require the signing key to
+ be valid. (0 implies we don't care)
+
+ deferred(Deferred[str, str, nacl.signing.VerifyKey]):
+ A deferred (server_name, key_id, verify_key) tuple that resolves when
+        a verify key has been fetched. The deferred's callbacks are run with no
+ logcontext.
+
+ If we are unable to find a key which satisfies the request, the deferred
+ errbacks with an M_UNAUTHORIZED SynapseError.
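+
+    Example (an illustrative sketch; the server name, key id and ``signed_json``
+    are hypothetical):
+
+        request = VerifyKeyRequest(
+            server_name="example.org",
+            key_ids={"ed25519:abc123"},
+            json_object=signed_json,
+            minimum_valid_until_ts=0,
+        )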
+ """
+
+ server_name = attr.ib()
+ key_ids = attr.ib()
+ json_object = attr.ib()
+ minimum_valid_until_ts = attr.ib()
+ deferred = attr.ib(default=attr.Factory(defer.Deferred))
class KeyLookupError(ValueError):
@@ -80,14 +96,16 @@ class KeyLookupError(ValueError):
class Keyring(object):
- def __init__(self, hs):
+ def __init__(self, hs, key_fetchers=None):
self.clock = hs.get_clock()
- self._key_fetchers = (
- StoreKeyFetcher(hs),
- PerspectivesKeyFetcher(hs),
- ServerKeyFetcher(hs),
- )
+ if key_fetchers is None:
+ key_fetchers = (
+ StoreKeyFetcher(hs),
+ PerspectivesKeyFetcher(hs),
+ ServerKeyFetcher(hs),
+ )
+ self._key_fetchers = key_fetchers
# map from server name to Deferred. Has an entry for each server with
# an ongoing key download; the Deferred completes once the download
@@ -96,9 +114,25 @@ class Keyring(object):
# These are regular, logcontext-agnostic Deferreds.
self.key_downloads = {}
- def verify_json_for_server(self, server_name, json_object):
+ def verify_json_for_server(self, server_name, json_object, validity_time):
+ """Verify that a JSON object has been signed by a given server
+
+ Args:
+ server_name (str): name of the server which must have signed this object
+
+ json_object (dict): object to be checked
+
+ validity_time (int): timestamp at which we require the signing key to
+ be valid. (0 implies we don't care)
+
+ Returns:
+            Deferred[None]: completes if the object was correctly signed, otherwise
+ errbacks with an error
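+
+        Example (illustrative; the names and timestamp are hypothetical, and the
+        call must be yielded from an inlineCallbacks function):
+
+            yield keyring.verify_json_for_server("example.org", signed_json, 0)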
+ """
+ req = server_name, json_object, validity_time
+
return logcontext.make_deferred_yieldable(
- self.verify_json_objects_for_server([(server_name, json_object)])[0]
+ self.verify_json_objects_for_server((req,))[0]
)
def verify_json_objects_for_server(self, server_and_json):
@@ -106,10 +140,12 @@ class Keyring(object):
necessary.
Args:
- server_and_json (list): List of pairs of (server_name, json_object)
+            server_and_json (iterable[Tuple[str, dict, int]]):
+ Iterable of triplets of (server_name, json_object, validity_time)
+ validity_time is a timestamp at which the signing key must be valid.
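+
+                For example (hypothetical values)::
+
+                    [("example.org", signed_json, 0)]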
Returns:
- List<Deferred>: for each input pair, a deferred indicating success
+ List<Deferred[None]>: for each input triplet, a deferred indicating success
or failure to verify each json object's signature for the given
server_name. The deferreds run their callbacks in the sentinel
logcontext.
@@ -118,12 +154,12 @@ class Keyring(object):
verify_requests = []
handle = preserve_fn(_handle_key_deferred)
- def process(server_name, json_object):
+ def process(server_name, json_object, validity_time):
"""Process an entry in the request list
- Given a (server_name, json_object) pair from the request list,
- adds a key request to verify_requests, and returns a deferred which will
- complete or fail (in the sentinel context) when verification completes.
+ Given a (server_name, json_object, validity_time) triplet from the request
+ list, adds a key request to verify_requests, and returns a deferred which
+ will complete or fail (in the sentinel context) when verification completes.
"""
key_ids = signature_ids(json_object, server_name)
@@ -134,11 +170,16 @@ class Keyring(object):
)
)
- logger.debug("Verifying for %s with key_ids %s", server_name, key_ids)
+ logger.debug(
+ "Verifying for %s with key_ids %s, min_validity %i",
+ server_name,
+ key_ids,
+ validity_time,
+ )
# add the key request to the queue, but don't start it off yet.
verify_request = VerifyKeyRequest(
- server_name, key_ids, json_object, defer.Deferred()
+ server_name, key_ids, json_object, validity_time
)
verify_requests.append(verify_request)
@@ -150,8 +191,8 @@ class Keyring(object):
return handle(verify_request)
results = [
- process(server_name, json_object)
- for server_name, json_object in server_and_json
+ process(server_name, json_object, validity_time)
+ for server_name, json_object, validity_time in server_and_json
]
if verify_requests:
@@ -270,84 +311,112 @@ class Keyring(object):
verify_requests (list[VerifyKeyRequest]): list of verify requests
"""
+ remaining_requests = set(
+ (rq for rq in verify_requests if not rq.deferred.called)
+ )
+
@defer.inlineCallbacks
def do_iterations():
with Measure(self.clock, "get_server_verify_keys"):
- # dict[str, set(str)]: keys to fetch for each server
- missing_keys = {}
- for verify_request in verify_requests:
- missing_keys.setdefault(verify_request.server_name, set()).update(
- verify_request.key_ids
- )
-
for f in self._key_fetchers:
- results = yield f.get_keys(missing_keys.items())
-
- # We now need to figure out which verify requests we have keys
- # for and which we don't
- missing_keys = {}
- requests_missing_keys = []
- for verify_request in verify_requests:
- if verify_request.deferred.called:
- # We've already called this deferred, which probably
- # means that we've already found a key for it.
- continue
-
- server_name = verify_request.server_name
-
- # see if any of the keys we got this time are sufficient to
- # complete this VerifyKeyRequest.
- result_keys = results.get(server_name, {})
- for key_id in verify_request.key_ids:
- fetch_key_result = result_keys.get(key_id)
- if fetch_key_result:
- with PreserveLoggingContext():
- verify_request.deferred.callback(
- (
- server_name,
- key_id,
- fetch_key_result.verify_key,
- )
- )
- break
- else:
- # The else block is only reached if the loop above
- # doesn't break.
- missing_keys.setdefault(server_name, set()).update(
- verify_request.key_ids
- )
- requests_missing_keys.append(verify_request)
-
- if not missing_keys:
- break
+ if not remaining_requests:
+ return
+ yield self._attempt_key_fetches_with_fetcher(f, remaining_requests)
+ # look for any requests which weren't satisfied
with PreserveLoggingContext():
- for verify_request in requests_missing_keys:
+ for verify_request in remaining_requests:
verify_request.deferred.errback(
SynapseError(
401,
- "No key for %s with id %s"
- % (verify_request.server_name, verify_request.key_ids),
+ "No key for %s with ids in %s (min_validity %i)"
+ % (
+ verify_request.server_name,
+ verify_request.key_ids,
+ verify_request.minimum_valid_until_ts,
+ ),
Codes.UNAUTHORIZED,
)
)
def on_err(err):
+ # we don't really expect to get here, because any errors should already
+ # have been caught and logged. But if we do, let's log the error and make
+ # sure that all of the deferreds are resolved.
+ logger.error("Unexpected error in _get_server_verify_keys: %s", err)
with PreserveLoggingContext():
- for verify_request in verify_requests:
+ for verify_request in remaining_requests:
if not verify_request.deferred.called:
verify_request.deferred.errback(err)
run_in_background(do_iterations).addErrback(on_err)
+ @defer.inlineCallbacks
+ def _attempt_key_fetches_with_fetcher(self, fetcher, remaining_requests):
+ """Use a key fetcher to attempt to satisfy some key requests
+
+ Args:
+ fetcher (KeyFetcher): fetcher to use to fetch the keys
+ remaining_requests (set[VerifyKeyRequest]): outstanding key requests.
+                Any successfully-completed requests will be removed from the set.
+ """
+ # dict[str, dict[str, int]]: keys to fetch.
+ # server_name -> key_id -> min_valid_ts
+ missing_keys = defaultdict(dict)
+
+ for verify_request in remaining_requests:
+ # any completed requests should already have been removed
+ assert not verify_request.deferred.called
+ keys_for_server = missing_keys[verify_request.server_name]
+
+ for key_id in verify_request.key_ids:
+ # If we have several requests for the same key, then we only need to
+ # request that key once, but we should do so with the greatest
+ # min_valid_until_ts of the requests, so that we can satisfy all of
+ # the requests.
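+                # (For example, if one request needs the key to be valid at ts
+                # 1000 and another at ts 2000, we ask the fetcher for
+                # min_valid_until_ts=2000, which satisfies both. The timestamps
+                # here are hypothetical.)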
+ keys_for_server[key_id] = max(
+ keys_for_server.get(key_id, -1),
+ verify_request.minimum_valid_until_ts
+ )
+
+ results = yield fetcher.get_keys(missing_keys)
+
+ completed = list()
+ for verify_request in remaining_requests:
+ server_name = verify_request.server_name
+
+ # see if any of the keys we got this time are sufficient to
+ # complete this VerifyKeyRequest.
+ result_keys = results.get(server_name, {})
+ for key_id in verify_request.key_ids:
+ fetch_key_result = result_keys.get(key_id)
+ if not fetch_key_result:
+ # we didn't get a result for this key
+ continue
+
+ if (
+ fetch_key_result.valid_until_ts
+ < verify_request.minimum_valid_until_ts
+ ):
+ # key was not valid at this point
+ continue
+
+ with PreserveLoggingContext():
+ verify_request.deferred.callback(
+ (server_name, key_id, fetch_key_result.verify_key)
+ )
+ completed.append(verify_request)
+ break
+
+ remaining_requests.difference_update(completed)
+
class KeyFetcher(object):
- def get_keys(self, server_name_and_key_ids):
+ def get_keys(self, keys_to_fetch):
"""
Args:
- server_name_and_key_ids (iterable[Tuple[str, iterable[str]]]):
- list of (server_name, iterable[key_id]) tuples to fetch keys for
- Note that the iterables may be iterated more than once.
+ keys_to_fetch (dict[str, dict[str, int]]):
+ the keys to be fetched. server_name -> key_id -> min_valid_ts
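+
+                For example (hypothetical server name, key id and timestamp)::
+
+                    {"example.org": {"ed25519:abc123": 1234567890000}}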
Returns:
Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
@@ -363,13 +432,15 @@ class StoreKeyFetcher(KeyFetcher):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_keys(self, server_name_and_key_ids):
+ def get_keys(self, keys_to_fetch):
"""see KeyFetcher.get_keys"""
+
keys_to_fetch = (
(server_name, key_id)
- for server_name, key_ids in server_name_and_key_ids
- for key_id in key_ids
+ for server_name, keys_for_server in keys_to_fetch.items()
+ for key_id in keys_for_server.keys()
)
+
res = yield self.store.get_server_verify_keys(keys_to_fetch)
keys = {}
for (server_name, key_id), key in res.items():
@@ -384,7 +455,7 @@ class BaseV2KeyFetcher(object):
@defer.inlineCallbacks
def process_v2_response(
- self, from_server, response_json, time_added_ms, requested_ids=[]
+ self, from_server, response_json, time_added_ms
):
"""Parse a 'Server Keys' structure from the result of a /key request
@@ -407,10 +478,6 @@ class BaseV2KeyFetcher(object):
time_added_ms (int): the timestamp to record in server_keys_json
- requested_ids (iterable[str]): a list of the key IDs that were requested.
- We will store the json for these key ids as well as any that are
- actually in the response
-
Returns:
Deferred[dict[str, FetchKeyResult]]: map from key_id to result object
"""
@@ -466,11 +533,6 @@ class BaseV2KeyFetcher(object):
signed_key_json_bytes = encode_canonical_json(signed_key_json)
- # for reasons I don't quite understand, we store this json for the key ids we
- # requested, as well as those we got.
- updated_key_ids = set(requested_ids)
- updated_key_ids.update(verify_keys)
-
yield logcontext.make_deferred_yieldable(
defer.gatherResults(
[
@@ -483,7 +545,7 @@ class BaseV2KeyFetcher(object):
ts_expires_ms=ts_valid_until_ms,
key_json_bytes=signed_key_json_bytes,
)
- for key_id in updated_key_ids
+ for key_id in verify_keys
],
consumeErrors=True,
).addErrback(unwrapFirstError)
@@ -502,14 +564,14 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
self.perspective_servers = self.config.perspectives
@defer.inlineCallbacks
- def get_keys(self, server_name_and_key_ids):
+ def get_keys(self, keys_to_fetch):
"""see KeyFetcher.get_keys"""
@defer.inlineCallbacks
def get_key(perspective_name, perspective_keys):
try:
result = yield self.get_server_verify_key_v2_indirect(
- server_name_and_key_ids, perspective_name, perspective_keys
+ keys_to_fetch, perspective_name, perspective_keys
)
defer.returnValue(result)
except KeyLookupError as e:
@@ -543,13 +605,15 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
@defer.inlineCallbacks
def get_server_verify_key_v2_indirect(
- self, server_names_and_key_ids, perspective_name, perspective_keys
+ self, keys_to_fetch, perspective_name, perspective_keys
):
"""
Args:
- server_names_and_key_ids (iterable[Tuple[str, iterable[str]]]):
- list of (server_name, iterable[key_id]) tuples to fetch keys for
+ keys_to_fetch (dict[str, dict[str, int]]):
+ the keys to be fetched. server_name -> key_id -> min_valid_ts
+
perspective_name (str): name of the notary server to query for the keys
+
perspective_keys (dict[str, VerifyKey]): map of key_id->key for the
notary server
@@ -563,12 +627,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
"""
logger.info(
"Requesting keys %s from notary server %s",
- server_names_and_key_ids,
+ keys_to_fetch.items(),
perspective_name,
)
- # TODO(mark): Set the minimum_valid_until_ts to that needed by
- # the events being validated or the current time if validating
- # an incoming request.
+
try:
query_response = yield self.client.post_json(
destination=perspective_name,
@@ -576,12 +638,12 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
data={
u"server_keys": {
server_name: {
- key_id: {u"minimum_valid_until_ts": 0} for key_id in key_ids
+ key_id: {u"minimum_valid_until_ts": min_valid_ts}
+ for key_id, min_valid_ts in server_keys.items()
}
- for server_name, key_ids in server_names_and_key_ids
+ for server_name, server_keys in keys_to_fetch.items()
}
},
- long_retries=True,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
@@ -687,34 +749,54 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
self.clock = hs.get_clock()
self.client = hs.get_http_client()
- @defer.inlineCallbacks
- def get_keys(self, server_name_and_key_ids):
- """see KeyFetcher.get_keys"""
- results = yield logcontext.make_deferred_yieldable(
- defer.gatherResults(
- [
- run_in_background(
- self.get_server_verify_key_v2_direct, server_name, key_ids
- )
- for server_name, key_ids in server_name_and_key_ids
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
- )
+ def get_keys(self, keys_to_fetch):
+ """
+ Args:
+ keys_to_fetch (dict[str, iterable[str]]):
+ the keys to be fetched. server_name -> key_ids
- merged = {}
- for result in results:
- merged.update(result)
+ Returns:
+ Deferred[dict[str, dict[str, synapse.storage.keys.FetchKeyResult|None]]]:
+ map from server_name -> key_id -> FetchKeyResult
+ """
- defer.returnValue(
- {server_name: keys for server_name, keys in merged.items() if keys}
+ results = {}
+
+ @defer.inlineCallbacks
+ def get_key(key_to_fetch_item):
+ server_name, key_ids = key_to_fetch_item
+ try:
+ keys = yield self.get_server_verify_key_v2_direct(server_name, key_ids)
+ results[server_name] = keys
+ except KeyLookupError as e:
+ logger.warning(
+ "Error looking up keys %s from %s: %s", key_ids, server_name, e
+ )
+ except Exception:
+ logger.exception("Error getting keys %s from %s", key_ids, server_name)
+
+ return yieldable_gather_results(get_key, keys_to_fetch.items()).addCallback(
+ lambda _: results
)
@defer.inlineCallbacks
def get_server_verify_key_v2_direct(self, server_name, key_ids):
+ """
+
+ Args:
+ server_name (str):
+ key_ids (iterable[str]):
+
+ Returns:
+ Deferred[dict[str, FetchKeyResult]]: map from key ID to lookup result
+
+ Raises:
+ KeyLookupError if there was a problem making the lookup
+ """
keys = {} # type: dict[str, FetchKeyResult]
for requested_key_id in key_ids:
+ # we may have found this key as a side-effect of asking for another.
if requested_key_id in keys:
continue
@@ -725,6 +807,19 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
path="/_matrix/key/v2/server/"
+ urllib.parse.quote(requested_key_id),
ignore_backoff=True,
+
+ # we only give the remote server 10s to respond. It should be an
+ # easy request to handle, so if it doesn't reply within 10s, it's
+ # probably not going to.
+ #
+ # Furthermore, when we are acting as a notary server, we cannot
+ # wait all day for all of the origin servers, as the requesting
+ # server will otherwise time out before we can respond.
+ #
+ # (Note that get_json may make 4 attempts, so this can still take
+ # almost 45 seconds to fetch the headers, plus up to another 60s to
+ # read the response).
+ timeout=10000,
)
except (NotRetryingDestination, RequestSendFailed) as e:
raise_from(KeyLookupError("Failed to connect to remote server"), e)
@@ -739,7 +834,6 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
response_keys = yield self.process_v2_response(
from_server=server_name,
- requested_ids=[requested_key_id],
response_json=response,
time_added_ms=time_now_ms,
)
@@ -750,7 +844,7 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
)
keys.update(response_keys)
- defer.returnValue({server_name: keys})
+ defer.returnValue(keys)
@defer.inlineCallbacks
@@ -767,31 +861,8 @@ def _handle_key_deferred(verify_request):
SynapseError if there was a problem performing the verification
"""
server_name = verify_request.server_name
- try:
- with PreserveLoggingContext():
- _, key_id, verify_key = yield verify_request.deferred
- except KeyLookupError as e:
- logger.warn(
- "Failed to download keys for %s: %s %s",
- server_name,
- type(e).__name__,
- str(e),
- )
- raise SynapseError(
- 502, "Error downloading keys for %s" % (server_name,), Codes.UNAUTHORIZED
- )
- except Exception as e:
- logger.exception(
- "Got Exception when downloading keys for %s: %s %s",
- server_name,
- type(e).__name__,
- str(e),
- )
- raise SynapseError(
- 401,
- "No key for %s with id %s" % (server_name, verify_request.key_ids),
- Codes.UNAUTHORIZED,
- )
+ with PreserveLoggingContext():
+ _, key_id, verify_key = yield verify_request.deferred
json_object = verify_request.json_object