diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 774a252619..d629c7c16c 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -16,12 +16,11 @@
# limitations under the License.
import logging
-
-from six import iteritems
+from typing import Dict, List, Optional, Tuple
import attr
-from canonicaljson import encode_canonical_json, json
-from signedjson.key import decode_verify_key_bytes
+from canonicaljson import encode_canonical_json
+from signedjson.key import VerifyKey, decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
@@ -36,7 +35,7 @@ from synapse.types import (
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
-from synapse.util import unwrapFirstError
+from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -44,7 +43,7 @@ from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
-class E2eKeysHandler(object):
+class E2eKeysHandler:
def __init__(self, hs):
self.store = hs.get_datastore()
self.federation = hs.get_federation_client()
@@ -79,8 +78,7 @@ class E2eKeysHandler(object):
)
@trace
- @defer.inlineCallbacks
- def query_devices(self, query_body, timeout, from_user_id):
+ async def query_devices(self, query_body, timeout, from_user_id):
""" Handle a device key query from a client
{
@@ -126,7 +124,7 @@ class E2eKeysHandler(object):
failures = {}
results = {}
if local_query:
- local_result = yield self.query_local_devices(local_query)
+ local_result = await self.query_local_devices(local_query)
for user_id, keys in local_result.items():
if user_id in local_query:
results[user_id] = keys
@@ -135,7 +133,7 @@ class E2eKeysHandler(object):
remote_queries_not_in_cache = {}
if remote_queries:
query_list = []
- for user_id, device_ids in iteritems(remote_queries):
+ for user_id, device_ids in remote_queries.items():
if device_ids:
query_list.extend((user_id, device_id) for device_id in device_ids)
else:
@@ -144,10 +142,10 @@ class E2eKeysHandler(object):
(
user_ids_not_in_cache,
remote_results,
- ) = yield self.store.get_user_devices_from_cache(query_list)
- for user_id, devices in iteritems(remote_results):
+ ) = await self.store.get_user_devices_from_cache(query_list)
+ for user_id, devices in remote_results.items():
user_devices = results.setdefault(user_id, {})
- for device_id, device in iteritems(devices):
+ for device_id, device in devices.items():
keys = device.get("keys", None)
device_display_name = device.get("device_display_name", None)
if keys:
@@ -163,14 +161,13 @@ class E2eKeysHandler(object):
r[user_id] = remote_queries[user_id]
# Get cached cross-signing keys
- cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+ cross_signing_keys = await self.get_cross_signing_keys_from_cache(
device_keys_query, from_user_id
)
# Now fetch any devices that we don't have in our cache
@trace
- @defer.inlineCallbacks
- def do_remote_query(destination):
+ async def do_remote_query(destination):
"""This is called when we are querying the device list of a user on
a remote homeserver and their device list is not in the device list
cache. If we share a room with this user and we're not querying for
@@ -194,7 +191,7 @@ class E2eKeysHandler(object):
if device_list:
continue
- room_ids = yield self.store.get_rooms_for_user(user_id)
+ room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
continue
@@ -203,11 +200,11 @@ class E2eKeysHandler(object):
# done an initial sync on the device list so we do it now.
try:
if self._is_master:
- user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+ user_devices = await self.device_handler.device_list_updater.user_device_resync(
user_id
)
else:
- user_devices = yield self._user_device_resync_client(
+ user_devices = await self._user_device_resync_client(
user_id=user_id
)
@@ -229,7 +226,7 @@ class E2eKeysHandler(object):
destination_query.pop(user_id)
try:
- remote_result = yield self.federation.query_client_keys(
+ remote_result = await self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
)
@@ -253,7 +250,7 @@ class E2eKeysHandler(object):
set_tag("error", True)
set_tag("reason", failure)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(do_remote_query, destination)
@@ -269,8 +266,9 @@ class E2eKeysHandler(object):
return ret
- @defer.inlineCallbacks
- def get_cross_signing_keys_from_cache(self, query, from_user_id):
+ async def get_cross_signing_keys_from_cache(
+ self, query, from_user_id
+ ) -> Dict[str, Dict[str, dict]]:
"""Get cross-signing keys for users from the database
Args:
@@ -282,8 +280,7 @@ class E2eKeysHandler(object):
can see.
Returns:
- defer.Deferred[dict[str, dict[str, dict]]]: map from
- (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
+ A map from (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
"""
master_keys = {}
self_signing_keys = {}
@@ -291,7 +288,7 @@ class E2eKeysHandler(object):
user_ids = list(query)
- keys = yield self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
+ keys = await self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
for user_id, user_info in keys.items():
if user_info is None:
@@ -317,17 +314,17 @@ class E2eKeysHandler(object):
}
@trace
- @defer.inlineCallbacks
- def query_local_devices(self, query):
+ async def query_local_devices(
+ self, query: Dict[str, Optional[List[str]]]
+ ) -> Dict[str, Dict[str, dict]]:
"""Get E2E device keys for local users
Args:
- query (dict[string, list[string]|None): map from user_id to a list
+ query: map from user_id to a list
of devices to query (None for all devices)
Returns:
- defer.Deferred: (resolves to dict[string, dict[string, dict]]):
- map from user_id -> device_id -> device details
+ A map from user_id -> device_id -> device details
"""
set_tag("local_query", query)
local_query = []
@@ -356,7 +353,7 @@ class E2eKeysHandler(object):
# make sure that each queried user appears in the result dict
result_dict[user_id] = {}
- results = yield self.store.get_e2e_device_keys(local_query)
+ results = await self.store.get_e2e_device_keys_for_cs_api(local_query)
# Build the result structure
for user_id, device_keys in results.items():
@@ -366,16 +363,15 @@ class E2eKeysHandler(object):
log_kv(results)
return result_dict
- @defer.inlineCallbacks
- def on_federation_query_client_keys(self, query_body):
+ async def on_federation_query_client_keys(self, query_body):
""" Handle a device key query from a federated server
"""
device_keys_query = query_body.get("device_keys", {})
- res = yield self.query_local_devices(device_keys_query)
+ res = await self.query_local_devices(device_keys_query)
ret = {"device_keys": res}
# add in the cross-signing keys
- cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+ cross_signing_keys = await self.get_cross_signing_keys_from_cache(
device_keys_query, None
)
@@ -384,8 +380,7 @@ class E2eKeysHandler(object):
return ret
@trace
- @defer.inlineCallbacks
- def claim_one_time_keys(self, query, timeout):
+ async def claim_one_time_keys(self, query, timeout):
local_query = []
remote_queries = {}
@@ -401,7 +396,7 @@ class E2eKeysHandler(object):
set_tag("local_key_query", local_query)
set_tag("remote_key_query", remote_queries)
- results = yield self.store.claim_e2e_one_time_keys(local_query)
+ results = await self.store.claim_e2e_one_time_keys(local_query)
json_result = {}
failures = {}
@@ -409,16 +404,15 @@ class E2eKeysHandler(object):
for device_id, keys in device_keys.items():
for key_id, json_bytes in keys.items():
json_result.setdefault(user_id, {})[device_id] = {
- key_id: json.loads(json_bytes)
+ key_id: json_decoder.decode(json_bytes)
}
@trace
- @defer.inlineCallbacks
- def claim_client_keys(destination):
+ async def claim_client_keys(destination):
set_tag("destination", destination)
device_keys = remote_queries[destination]
try:
- remote_result = yield self.federation.claim_client_keys(
+ remote_result = await self.federation.claim_client_keys(
destination, {"one_time_keys": device_keys}, timeout=timeout
)
for user_id, keys in remote_result["one_time_keys"].items():
@@ -431,7 +425,7 @@ class E2eKeysHandler(object):
set_tag("error", True)
set_tag("reason", failure)
- yield make_deferred_yieldable(
+ await make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(claim_client_keys, destination)
@@ -446,9 +440,9 @@ class E2eKeysHandler(object):
",".join(
(
"%s for %s:%s" % (key_id, user_id, device_id)
- for user_id, user_keys in iteritems(json_result)
- for device_id, device_keys in iteritems(user_keys)
- for key_id, _ in iteritems(device_keys)
+ for user_id, user_keys in json_result.items()
+ for device_id, device_keys in user_keys.items()
+ for key_id, _ in device_keys.items()
)
),
)
@@ -456,9 +450,8 @@ class E2eKeysHandler(object):
log_kv({"one_time_keys": json_result, "failures": failures})
return {"one_time_keys": json_result, "failures": failures}
- @defer.inlineCallbacks
@tag_args
- def upload_keys_for_user(self, user_id, device_id, keys):
+ async def upload_keys_for_user(self, user_id, device_id, keys):
time_now = self.clock.time_msec()
@@ -479,12 +472,12 @@ class E2eKeysHandler(object):
}
)
# TODO: Sign the JSON with the server key
- changed = yield self.store.set_e2e_device_keys(
+ changed = await self.store.set_e2e_device_keys(
user_id, device_id, time_now, device_keys
)
if changed:
# Only notify about device updates *if* the keys actually changed
- yield self.device_handler.notify_device_update(user_id, [device_id])
+ await self.device_handler.notify_device_update(user_id, [device_id])
else:
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
one_time_keys = keys.get("one_time_keys", None)
@@ -496,7 +489,7 @@ class E2eKeysHandler(object):
"device_id": device_id,
}
)
- yield self._upload_one_time_keys_for_user(
+ await self._upload_one_time_keys_for_user(
user_id, device_id, time_now, one_time_keys
)
else:
@@ -509,15 +502,14 @@ class E2eKeysHandler(object):
# old access_token without an associated device_id. Either way, we
# need to double-check the device is registered to avoid ending up with
# keys without a corresponding device.
- yield self.device_handler.check_device_registered(user_id, device_id)
+ await self.device_handler.check_device_registered(user_id, device_id)
- result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ result = await self.store.count_e2e_one_time_keys(user_id, device_id)
set_tag("one_time_key_counts", result)
return {"one_time_key_counts": result}
- @defer.inlineCallbacks
- def _upload_one_time_keys_for_user(
+ async def _upload_one_time_keys_for_user(
self, user_id, device_id, time_now, one_time_keys
):
logger.info(
@@ -535,7 +527,7 @@ class E2eKeysHandler(object):
key_list.append((algorithm, key_id, key_obj))
# First we check if we have already persisted any of the keys.
- existing_key_map = yield self.store.get_e2e_one_time_keys(
+ existing_key_map = await self.store.get_e2e_one_time_keys(
user_id, device_id, [k_id for _, k_id, _ in key_list]
)
@@ -558,10 +550,9 @@ class E2eKeysHandler(object):
)
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
- yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
+ await self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
- @defer.inlineCallbacks
- def upload_signing_keys_for_user(self, user_id, keys):
+ async def upload_signing_keys_for_user(self, user_id, keys):
"""Upload signing keys for cross-signing
Args:
@@ -576,7 +567,7 @@ class E2eKeysHandler(object):
_check_cross_signing_key(master_key, user_id, "master")
else:
- master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+ master_key = await self.store.get_e2e_cross_signing_key(user_id, "master")
# if there is no master key, then we can't do anything, because all the
# other cross-signing keys need to be signed by the master key
@@ -615,10 +606,10 @@ class E2eKeysHandler(object):
# if everything checks out, then store the keys and send notifications
deviceids = []
if "master_key" in keys:
- yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
+ await self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
deviceids.append(master_verify_key.version)
if "self_signing_key" in keys:
- yield self.store.set_e2e_cross_signing_key(
+ await self.store.set_e2e_cross_signing_key(
user_id, "self_signing", self_signing_key
)
try:
@@ -628,23 +619,22 @@ class E2eKeysHandler(object):
except ValueError:
raise SynapseError(400, "Invalid self-signing key", Codes.INVALID_PARAM)
if "user_signing_key" in keys:
- yield self.store.set_e2e_cross_signing_key(
+ await self.store.set_e2e_cross_signing_key(
user_id, "user_signing", user_signing_key
)
# the signature stream matches the semantics that we want for
# user-signing key updates: only the user themselves is notified of
# their own user-signing key updates
- yield self.device_handler.notify_user_signature_update(user_id, [user_id])
+ await self.device_handler.notify_user_signature_update(user_id, [user_id])
# master key and self-signing key updates match the semantics of device
# list updates: all users who share an encrypted room are notified
if len(deviceids):
- yield self.device_handler.notify_device_update(user_id, deviceids)
+ await self.device_handler.notify_device_update(user_id, deviceids)
return {}
- @defer.inlineCallbacks
- def upload_signatures_for_device_keys(self, user_id, signatures):
+ async def upload_signatures_for_device_keys(self, user_id, signatures):
"""Upload device signatures for cross-signing
Args:
@@ -669,13 +659,13 @@ class E2eKeysHandler(object):
self_signatures = signatures.get(user_id, {})
other_signatures = {k: v for k, v in signatures.items() if k != user_id}
- self_signature_list, self_failures = yield self._process_self_signatures(
+ self_signature_list, self_failures = await self._process_self_signatures(
user_id, self_signatures
)
signature_list.extend(self_signature_list)
failures.update(self_failures)
- other_signature_list, other_failures = yield self._process_other_signatures(
+ other_signature_list, other_failures = await self._process_other_signatures(
user_id, other_signatures
)
signature_list.extend(other_signature_list)
@@ -683,21 +673,20 @@ class E2eKeysHandler(object):
# store the signature, and send the appropriate notifications for sync
logger.debug("upload signature failures: %r", failures)
- yield self.store.store_e2e_cross_signing_signatures(user_id, signature_list)
+ await self.store.store_e2e_cross_signing_signatures(user_id, signature_list)
self_device_ids = [item.target_device_id for item in self_signature_list]
if self_device_ids:
- yield self.device_handler.notify_device_update(user_id, self_device_ids)
+ await self.device_handler.notify_device_update(user_id, self_device_ids)
signed_users = [item.target_user_id for item in other_signature_list]
if signed_users:
- yield self.device_handler.notify_user_signature_update(
+ await self.device_handler.notify_user_signature_update(
user_id, signed_users
)
return {"failures": failures}
- @defer.inlineCallbacks
- def _process_self_signatures(self, user_id, signatures):
+ async def _process_self_signatures(self, user_id, signatures):
"""Process uploaded signatures of the user's own keys.
Signatures of the user's own keys from this API come in two forms:
@@ -730,7 +719,7 @@ class E2eKeysHandler(object):
_,
self_signing_key_id,
self_signing_verify_key,
- ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
+ ) = await self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
# get our master key, since we may have received a signature of it.
# We need to fetch it here so that we know what its key ID is, so
@@ -740,12 +729,12 @@ class E2eKeysHandler(object):
master_key,
_,
master_verify_key,
- ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master")
+ ) = await self._get_e2e_cross_signing_verify_key(user_id, "master")
# fetch our stored devices. This is used to 1. verify
# signatures on the master key, and 2. to compare with what
# was sent if the device was signed
- devices = yield self.store.get_e2e_device_keys([(user_id, None)])
+ devices = await self.store.get_e2e_device_keys_for_cs_api([(user_id, None)])
if user_id not in devices:
raise NotFoundError("No device keys found")
@@ -855,8 +844,7 @@ class E2eKeysHandler(object):
return master_key_signature_list
- @defer.inlineCallbacks
- def _process_other_signatures(self, user_id, signatures):
+ async def _process_other_signatures(self, user_id, signatures):
"""Process uploaded signatures of other users' keys. These will be the
target user's master keys, signed by the uploading user's user-signing
key.
@@ -884,7 +872,7 @@ class E2eKeysHandler(object):
user_signing_key,
user_signing_key_id,
user_signing_verify_key,
- ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
+ ) = await self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
except SynapseError as e:
failure = _exception_to_failure(e)
for user, devicemap in signatures.items():
@@ -907,7 +895,7 @@ class E2eKeysHandler(object):
master_key,
master_key_id,
_,
- ) = yield self._get_e2e_cross_signing_verify_key(
+ ) = await self._get_e2e_cross_signing_verify_key(
target_user, "master", user_id
)
@@ -960,8 +948,7 @@ class E2eKeysHandler(object):
return signature_list, failures
- @defer.inlineCallbacks
- def _get_e2e_cross_signing_verify_key(
+ async def _get_e2e_cross_signing_verify_key(
self, user_id: str, key_type: str, from_user_id: str = None
):
"""Fetch locally or remotely query for a cross-signing public key.
@@ -985,7 +972,7 @@ class E2eKeysHandler(object):
SynapseError: if `user_id` is invalid
"""
user = UserID.from_string(user_id)
- key = yield self.store.get_e2e_cross_signing_key(
+ key = await self.store.get_e2e_cross_signing_key(
user_id, key_type, from_user_id
)
@@ -1011,17 +998,16 @@ class E2eKeysHandler(object):
key,
key_id,
verify_key,
- ) = yield self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
+ ) = await self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
if key is None:
raise NotFoundError("No %s key found for %s" % (key_type, user_id))
return key, key_id, verify_key
- @defer.inlineCallbacks
- def _retrieve_cross_signing_keys_for_remote_user(
+ async def _retrieve_cross_signing_keys_for_remote_user(
self, user: UserID, desired_key_type: str,
- ):
+ ) -> Tuple[Optional[dict], Optional[str], Optional[VerifyKey]]:
"""Queries cross-signing keys for a remote user and saves them to the database
Only the key specified by `key_type` will be returned, while all retrieved keys
@@ -1032,12 +1018,11 @@ class E2eKeysHandler(object):
desired_key_type: The type of key to receive. One of "master", "self_signing"
Returns:
- Deferred[Tuple[Optional[Dict], Optional[str], Optional[VerifyKey]]]: A tuple
- of the retrieved key content, the key's ID and the matching VerifyKey.
+ A tuple of the retrieved key content, the key's ID and the matching VerifyKey.
If the key cannot be retrieved, all values in the tuple will instead be None.
"""
try:
- remote_result = yield self.federation.query_user_devices(
+ remote_result = await self.federation.query_user_devices(
user.domain, user.to_string()
)
except Exception as e:
@@ -1103,14 +1088,14 @@ class E2eKeysHandler(object):
desired_key_id = key_id
# At the same time, store this key in the db for subsequent queries
- yield self.store.set_e2e_cross_signing_key(
+ await self.store.set_e2e_cross_signing_key(
user.to_string(), key_type, key_content
)
# Notify clients that new devices for this user have been discovered
if retrieved_device_ids:
# XXX is this necessary?
- yield self.device_handler.notify_device_update(
+ await self.device_handler.notify_device_update(
user.to_string(), retrieved_device_ids
)
@@ -1201,7 +1186,7 @@ def _exception_to_failure(e):
def _one_time_keys_match(old_key_json, new_key):
- old_key = json.loads(old_key_json)
+ old_key = json_decoder.decode(old_key_json)
# if either is a string rather than an object, they must match exactly
if not isinstance(old_key, dict) or not isinstance(new_key, dict):
@@ -1227,7 +1212,7 @@ class SignatureListItem:
signature = attr.ib()
-class SigningKeyEduUpdater(object):
+class SigningKeyEduUpdater:
"""Handles incoming signing key updates from federation and updates the DB"""
def __init__(self, hs, e2e_keys_handler):
@@ -1252,8 +1237,7 @@ class SigningKeyEduUpdater(object):
iterable=True,
)
- @defer.inlineCallbacks
- def incoming_signing_key_update(self, origin, edu_content):
+ async def incoming_signing_key_update(self, origin, edu_content):
"""Called on incoming signing key update from federation. Responsible for
parsing the EDU and adding to pending updates list.
@@ -1270,7 +1254,7 @@ class SigningKeyEduUpdater(object):
logger.warning("Got signing key update edu for %r from %r", user_id, origin)
return
- room_ids = yield self.store.get_rooms_for_user(user_id)
+ room_ids = await self.store.get_rooms_for_user(user_id)
if not room_ids:
# We don't share any rooms with this user. Ignore update, as we
# probably won't get any further updates.
@@ -1280,10 +1264,9 @@ class SigningKeyEduUpdater(object):
(master_key, self_signing_key)
)
- yield self._handle_signing_key_updates(user_id)
+ await self._handle_signing_key_updates(user_id)
- @defer.inlineCallbacks
- def _handle_signing_key_updates(self, user_id):
+ async def _handle_signing_key_updates(self, user_id):
"""Actually handle pending updates.
Args:
@@ -1293,7 +1276,7 @@ class SigningKeyEduUpdater(object):
device_handler = self.e2e_keys_handler.device_handler
device_list_updater = device_handler.device_list_updater
- with (yield self._remote_edu_linearizer.queue(user_id)):
+ with (await self._remote_edu_linearizer.queue(user_id)):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
@@ -1304,9 +1287,9 @@ class SigningKeyEduUpdater(object):
logger.info("pending updates: %r", pending_updates)
for master_key, self_signing_key in pending_updates:
- new_device_ids = yield device_list_updater.process_cross_signing_key_update(
+ new_device_ids = await device_list_updater.process_cross_signing_key_update(
user_id, master_key, self_signing_key,
)
device_ids = device_ids + new_device_ids
- yield device_handler.notify_device_update(user_id, device_ids)
+ await device_handler.notify_device_update(user_id, device_ids)
|