summary refs log tree commit diff
path: root/synapse/handlers/e2e_keys.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-07-15 08:48:58 -0400
committerGitHub <noreply@github.com>2020-07-15 08:48:58 -0400
commitb11450dedc59b117ad23426b47f2465c459ea62a (patch)
tree901bc38bc7ec0982faf838348d5471fa5e94c1e1 /synapse/handlers/e2e_keys.py
parentReturn the proper 403 Forbidden error during errors with JWT logins. (#7844) (diff)
downloadsynapse-b11450dedc59b117ad23426b47f2465c459ea62a.tar.xz
Convert E2E key and room key handlers to async/await. (#7851)
Diffstat (limited to 'synapse/handlers/e2e_keys.py')
-rw-r--r--synapse/handlers/e2e_keys.py147
1 files changed, 65 insertions, 82 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index a7e60cbc26..361dd64cd2 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -77,8 +77,7 @@ class E2eKeysHandler(object):
         )
 
     @trace
-    @defer.inlineCallbacks
-    def query_devices(self, query_body, timeout, from_user_id):
+    async def query_devices(self, query_body, timeout, from_user_id):
         """ Handle a device key query from a client
 
         {
@@ -124,7 +123,7 @@ class E2eKeysHandler(object):
         failures = {}
         results = {}
         if local_query:
-            local_result = yield self.query_local_devices(local_query)
+            local_result = await self.query_local_devices(local_query)
             for user_id, keys in local_result.items():
                 if user_id in local_query:
                     results[user_id] = keys
@@ -142,7 +141,7 @@ class E2eKeysHandler(object):
             (
                 user_ids_not_in_cache,
                 remote_results,
-            ) = yield self.store.get_user_devices_from_cache(query_list)
+            ) = await self.store.get_user_devices_from_cache(query_list)
             for user_id, devices in remote_results.items():
                 user_devices = results.setdefault(user_id, {})
                 for device_id, device in devices.items():
@@ -161,14 +160,13 @@ class E2eKeysHandler(object):
                 r[user_id] = remote_queries[user_id]
 
         # Get cached cross-signing keys
-        cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+        cross_signing_keys = await self.get_cross_signing_keys_from_cache(
             device_keys_query, from_user_id
         )
 
         # Now fetch any devices that we don't have in our cache
         @trace
-        @defer.inlineCallbacks
-        def do_remote_query(destination):
+        async def do_remote_query(destination):
             """This is called when we are querying the device list of a user on
             a remote homeserver and their device list is not in the device list
             cache. If we share a room with this user and we're not querying for
@@ -192,7 +190,7 @@ class E2eKeysHandler(object):
                 if device_list:
                     continue
 
-                room_ids = yield self.store.get_rooms_for_user(user_id)
+                room_ids = await self.store.get_rooms_for_user(user_id)
                 if not room_ids:
                     continue
 
@@ -201,11 +199,11 @@ class E2eKeysHandler(object):
                 # done an initial sync on the device list so we do it now.
                 try:
                     if self._is_master:
-                        user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+                        user_devices = await self.device_handler.device_list_updater.user_device_resync(
                             user_id
                         )
                     else:
-                        user_devices = yield self._user_device_resync_client(
+                        user_devices = await self._user_device_resync_client(
                             user_id=user_id
                         )
 
@@ -227,7 +225,7 @@ class E2eKeysHandler(object):
                 destination_query.pop(user_id)
 
             try:
-                remote_result = yield self.federation.query_client_keys(
+                remote_result = await self.federation.query_client_keys(
                     destination, {"device_keys": destination_query}, timeout=timeout
                 )
 
@@ -251,7 +249,7 @@ class E2eKeysHandler(object):
                 set_tag("error", True)
                 set_tag("reason", failure)
 
-        yield make_deferred_yieldable(
+        await make_deferred_yieldable(
             defer.gatherResults(
                 [
                     run_in_background(do_remote_query, destination)
@@ -267,8 +265,7 @@ class E2eKeysHandler(object):
 
         return ret
 
-    @defer.inlineCallbacks
-    def get_cross_signing_keys_from_cache(self, query, from_user_id):
+    async def get_cross_signing_keys_from_cache(self, query, from_user_id):
         """Get cross-signing keys for users from the database
 
         Args:
@@ -289,7 +286,7 @@ class E2eKeysHandler(object):
 
         user_ids = list(query)
 
-        keys = yield self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
+        keys = await self.store.get_e2e_cross_signing_keys_bulk(user_ids, from_user_id)
 
         for user_id, user_info in keys.items():
             if user_info is None:
@@ -315,8 +312,7 @@ class E2eKeysHandler(object):
         }
 
     @trace
-    @defer.inlineCallbacks
-    def query_local_devices(self, query):
+    async def query_local_devices(self, query):
         """Get E2E device keys for local users
 
         Args:
@@ -354,7 +350,7 @@ class E2eKeysHandler(object):
             # make sure that each queried user appears in the result dict
             result_dict[user_id] = {}
 
-        results = yield self.store.get_e2e_device_keys(local_query)
+        results = await self.store.get_e2e_device_keys(local_query)
 
         # Build the result structure
         for user_id, device_keys in results.items():
@@ -364,16 +360,15 @@ class E2eKeysHandler(object):
         log_kv(results)
         return result_dict
 
-    @defer.inlineCallbacks
-    def on_federation_query_client_keys(self, query_body):
+    async def on_federation_query_client_keys(self, query_body):
         """ Handle a device key query from a federated server
         """
         device_keys_query = query_body.get("device_keys", {})
-        res = yield self.query_local_devices(device_keys_query)
+        res = await self.query_local_devices(device_keys_query)
         ret = {"device_keys": res}
 
         # add in the cross-signing keys
-        cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+        cross_signing_keys = await self.get_cross_signing_keys_from_cache(
             device_keys_query, None
         )
 
@@ -382,8 +377,7 @@ class E2eKeysHandler(object):
         return ret
 
     @trace
-    @defer.inlineCallbacks
-    def claim_one_time_keys(self, query, timeout):
+    async def claim_one_time_keys(self, query, timeout):
         local_query = []
         remote_queries = {}
 
@@ -399,7 +393,7 @@ class E2eKeysHandler(object):
         set_tag("local_key_query", local_query)
         set_tag("remote_key_query", remote_queries)
 
-        results = yield self.store.claim_e2e_one_time_keys(local_query)
+        results = await self.store.claim_e2e_one_time_keys(local_query)
 
         json_result = {}
         failures = {}
@@ -411,12 +405,11 @@ class E2eKeysHandler(object):
                     }
 
         @trace
-        @defer.inlineCallbacks
-        def claim_client_keys(destination):
+        async def claim_client_keys(destination):
             set_tag("destination", destination)
             device_keys = remote_queries[destination]
             try:
-                remote_result = yield self.federation.claim_client_keys(
+                remote_result = await self.federation.claim_client_keys(
                     destination, {"one_time_keys": device_keys}, timeout=timeout
                 )
                 for user_id, keys in remote_result["one_time_keys"].items():
@@ -429,7 +422,7 @@ class E2eKeysHandler(object):
                 set_tag("error", True)
                 set_tag("reason", failure)
 
-        yield make_deferred_yieldable(
+        await make_deferred_yieldable(
             defer.gatherResults(
                 [
                     run_in_background(claim_client_keys, destination)
@@ -454,9 +447,8 @@ class E2eKeysHandler(object):
         log_kv({"one_time_keys": json_result, "failures": failures})
         return {"one_time_keys": json_result, "failures": failures}
 
-    @defer.inlineCallbacks
     @tag_args
-    def upload_keys_for_user(self, user_id, device_id, keys):
+    async def upload_keys_for_user(self, user_id, device_id, keys):
 
         time_now = self.clock.time_msec()
 
@@ -477,12 +469,12 @@ class E2eKeysHandler(object):
                 }
             )
             # TODO: Sign the JSON with the server key
-            changed = yield self.store.set_e2e_device_keys(
+            changed = await self.store.set_e2e_device_keys(
                 user_id, device_id, time_now, device_keys
             )
             if changed:
                 # Only notify about device updates *if* the keys actually changed
-                yield self.device_handler.notify_device_update(user_id, [device_id])
+                await self.device_handler.notify_device_update(user_id, [device_id])
         else:
             log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
         one_time_keys = keys.get("one_time_keys", None)
@@ -494,7 +486,7 @@ class E2eKeysHandler(object):
                     "device_id": device_id,
                 }
             )
-            yield self._upload_one_time_keys_for_user(
+            await self._upload_one_time_keys_for_user(
                 user_id, device_id, time_now, one_time_keys
             )
         else:
@@ -507,15 +499,14 @@ class E2eKeysHandler(object):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        yield self.device_handler.check_device_registered(user_id, device_id)
+        await self.device_handler.check_device_registered(user_id, device_id)
 
-        result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+        result = await self.store.count_e2e_one_time_keys(user_id, device_id)
 
         set_tag("one_time_key_counts", result)
         return {"one_time_key_counts": result}
 
-    @defer.inlineCallbacks
-    def _upload_one_time_keys_for_user(
+    async def _upload_one_time_keys_for_user(
         self, user_id, device_id, time_now, one_time_keys
     ):
         logger.info(
@@ -533,7 +524,7 @@ class E2eKeysHandler(object):
             key_list.append((algorithm, key_id, key_obj))
 
         # First we check if we have already persisted any of the keys.
-        existing_key_map = yield self.store.get_e2e_one_time_keys(
+        existing_key_map = await self.store.get_e2e_one_time_keys(
             user_id, device_id, [k_id for _, k_id, _ in key_list]
         )
 
@@ -556,10 +547,9 @@ class E2eKeysHandler(object):
                 )
 
         log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
-        yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
+        await self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
 
-    @defer.inlineCallbacks
-    def upload_signing_keys_for_user(self, user_id, keys):
+    async def upload_signing_keys_for_user(self, user_id, keys):
         """Upload signing keys for cross-signing
 
         Args:
@@ -574,7 +564,7 @@ class E2eKeysHandler(object):
 
             _check_cross_signing_key(master_key, user_id, "master")
         else:
-            master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+            master_key = await self.store.get_e2e_cross_signing_key(user_id, "master")
 
         # if there is no master key, then we can't do anything, because all the
         # other cross-signing keys need to be signed by the master key
@@ -613,10 +603,10 @@ class E2eKeysHandler(object):
         # if everything checks out, then store the keys and send notifications
         deviceids = []
         if "master_key" in keys:
-            yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
+            await self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
             deviceids.append(master_verify_key.version)
         if "self_signing_key" in keys:
-            yield self.store.set_e2e_cross_signing_key(
+            await self.store.set_e2e_cross_signing_key(
                 user_id, "self_signing", self_signing_key
             )
             try:
@@ -626,23 +616,22 @@ class E2eKeysHandler(object):
             except ValueError:
                 raise SynapseError(400, "Invalid self-signing key", Codes.INVALID_PARAM)
         if "user_signing_key" in keys:
-            yield self.store.set_e2e_cross_signing_key(
+            await self.store.set_e2e_cross_signing_key(
                 user_id, "user_signing", user_signing_key
             )
             # the signature stream matches the semantics that we want for
             # user-signing key updates: only the user themselves is notified of
             # their own user-signing key updates
-            yield self.device_handler.notify_user_signature_update(user_id, [user_id])
+            await self.device_handler.notify_user_signature_update(user_id, [user_id])
 
         # master key and self-signing key updates match the semantics of device
         # list updates: all users who share an encrypted room are notified
         if len(deviceids):
-            yield self.device_handler.notify_device_update(user_id, deviceids)
+            await self.device_handler.notify_device_update(user_id, deviceids)
 
         return {}
 
-    @defer.inlineCallbacks
-    def upload_signatures_for_device_keys(self, user_id, signatures):
+    async def upload_signatures_for_device_keys(self, user_id, signatures):
         """Upload device signatures for cross-signing
 
         Args:
@@ -667,13 +656,13 @@ class E2eKeysHandler(object):
         self_signatures = signatures.get(user_id, {})
         other_signatures = {k: v for k, v in signatures.items() if k != user_id}
 
-        self_signature_list, self_failures = yield self._process_self_signatures(
+        self_signature_list, self_failures = await self._process_self_signatures(
             user_id, self_signatures
         )
         signature_list.extend(self_signature_list)
         failures.update(self_failures)
 
-        other_signature_list, other_failures = yield self._process_other_signatures(
+        other_signature_list, other_failures = await self._process_other_signatures(
             user_id, other_signatures
         )
         signature_list.extend(other_signature_list)
@@ -681,21 +670,20 @@ class E2eKeysHandler(object):
 
         # store the signature, and send the appropriate notifications for sync
         logger.debug("upload signature failures: %r", failures)
-        yield self.store.store_e2e_cross_signing_signatures(user_id, signature_list)
+        await self.store.store_e2e_cross_signing_signatures(user_id, signature_list)
 
         self_device_ids = [item.target_device_id for item in self_signature_list]
         if self_device_ids:
-            yield self.device_handler.notify_device_update(user_id, self_device_ids)
+            await self.device_handler.notify_device_update(user_id, self_device_ids)
         signed_users = [item.target_user_id for item in other_signature_list]
         if signed_users:
-            yield self.device_handler.notify_user_signature_update(
+            await self.device_handler.notify_user_signature_update(
                 user_id, signed_users
             )
 
         return {"failures": failures}
 
-    @defer.inlineCallbacks
-    def _process_self_signatures(self, user_id, signatures):
+    async def _process_self_signatures(self, user_id, signatures):
         """Process uploaded signatures of the user's own keys.
 
         Signatures of the user's own keys from this API come in two forms:
@@ -728,7 +716,7 @@ class E2eKeysHandler(object):
                 _,
                 self_signing_key_id,
                 self_signing_verify_key,
-            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
+            ) = await self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
 
             # get our master key, since we may have received a signature of it.
             # We need to fetch it here so that we know what its key ID is, so
@@ -738,12 +726,12 @@ class E2eKeysHandler(object):
                 master_key,
                 _,
                 master_verify_key,
-            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master")
+            ) = await self._get_e2e_cross_signing_verify_key(user_id, "master")
 
             # fetch our stored devices.  This is used to 1. verify
             # signatures on the master key, and 2. to compare with what
             # was sent if the device was signed
-            devices = yield self.store.get_e2e_device_keys([(user_id, None)])
+            devices = await self.store.get_e2e_device_keys([(user_id, None)])
 
             if user_id not in devices:
                 raise NotFoundError("No device keys found")
@@ -853,8 +841,7 @@ class E2eKeysHandler(object):
 
         return master_key_signature_list
 
-    @defer.inlineCallbacks
-    def _process_other_signatures(self, user_id, signatures):
+    async def _process_other_signatures(self, user_id, signatures):
         """Process uploaded signatures of other users' keys.  These will be the
         target user's master keys, signed by the uploading user's user-signing
         key.
@@ -882,7 +869,7 @@ class E2eKeysHandler(object):
                 user_signing_key,
                 user_signing_key_id,
                 user_signing_verify_key,
-            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
+            ) = await self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
         except SynapseError as e:
             failure = _exception_to_failure(e)
             for user, devicemap in signatures.items():
@@ -905,7 +892,7 @@ class E2eKeysHandler(object):
                     master_key,
                     master_key_id,
                     _,
-                ) = yield self._get_e2e_cross_signing_verify_key(
+                ) = await self._get_e2e_cross_signing_verify_key(
                     target_user, "master", user_id
                 )
 
@@ -958,8 +945,7 @@ class E2eKeysHandler(object):
 
         return signature_list, failures
 
-    @defer.inlineCallbacks
-    def _get_e2e_cross_signing_verify_key(
+    async def _get_e2e_cross_signing_verify_key(
         self, user_id: str, key_type: str, from_user_id: str = None
     ):
         """Fetch locally or remotely query for a cross-signing public key.
@@ -983,7 +969,7 @@ class E2eKeysHandler(object):
             SynapseError: if `user_id` is invalid
         """
         user = UserID.from_string(user_id)
-        key = yield self.store.get_e2e_cross_signing_key(
+        key = await self.store.get_e2e_cross_signing_key(
             user_id, key_type, from_user_id
         )
 
@@ -1009,15 +995,14 @@ class E2eKeysHandler(object):
             key,
             key_id,
             verify_key,
-        ) = yield self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
+        ) = await self._retrieve_cross_signing_keys_for_remote_user(user, key_type)
 
         if key is None:
             raise NotFoundError("No %s key found for %s" % (key_type, user_id))
 
         return key, key_id, verify_key
 
-    @defer.inlineCallbacks
-    def _retrieve_cross_signing_keys_for_remote_user(
+    async def _retrieve_cross_signing_keys_for_remote_user(
         self, user: UserID, desired_key_type: str,
     ):
         """Queries cross-signing keys for a remote user and saves them to the database
@@ -1035,7 +1020,7 @@ class E2eKeysHandler(object):
             If the key cannot be retrieved, all values in the tuple will instead be None.
         """
         try:
-            remote_result = yield self.federation.query_user_devices(
+            remote_result = await self.federation.query_user_devices(
                 user.domain, user.to_string()
             )
         except Exception as e:
@@ -1101,14 +1086,14 @@ class E2eKeysHandler(object):
                 desired_key_id = key_id
 
             # At the same time, store this key in the db for subsequent queries
-            yield self.store.set_e2e_cross_signing_key(
+            await self.store.set_e2e_cross_signing_key(
                 user.to_string(), key_type, key_content
             )
 
         # Notify clients that new devices for this user have been discovered
         if retrieved_device_ids:
             # XXX is this necessary?
-            yield self.device_handler.notify_device_update(
+            await self.device_handler.notify_device_update(
                 user.to_string(), retrieved_device_ids
             )
 
@@ -1250,8 +1235,7 @@ class SigningKeyEduUpdater(object):
             iterable=True,
         )
 
-    @defer.inlineCallbacks
-    def incoming_signing_key_update(self, origin, edu_content):
+    async def incoming_signing_key_update(self, origin, edu_content):
         """Called on incoming signing key update from federation. Responsible for
         parsing the EDU and adding to pending updates list.
 
@@ -1268,7 +1252,7 @@ class SigningKeyEduUpdater(object):
             logger.warning("Got signing key update edu for %r from %r", user_id, origin)
             return
 
-        room_ids = yield self.store.get_rooms_for_user(user_id)
+        room_ids = await self.store.get_rooms_for_user(user_id)
         if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
@@ -1278,10 +1262,9 @@ class SigningKeyEduUpdater(object):
             (master_key, self_signing_key)
         )
 
-        yield self._handle_signing_key_updates(user_id)
+        await self._handle_signing_key_updates(user_id)
 
-    @defer.inlineCallbacks
-    def _handle_signing_key_updates(self, user_id):
+    async def _handle_signing_key_updates(self, user_id):
         """Actually handle pending updates.
 
         Args:
@@ -1291,7 +1274,7 @@ class SigningKeyEduUpdater(object):
         device_handler = self.e2e_keys_handler.device_handler
         device_list_updater = device_handler.device_list_updater
 
-        with (yield self._remote_edu_linearizer.queue(user_id)):
+        with (await self._remote_edu_linearizer.queue(user_id)):
             pending_updates = self._pending_updates.pop(user_id, [])
             if not pending_updates:
                 # This can happen since we batch updates
@@ -1302,9 +1285,9 @@ class SigningKeyEduUpdater(object):
             logger.info("pending updates: %r", pending_updates)
 
             for master_key, self_signing_key in pending_updates:
-                new_device_ids = yield device_list_updater.process_cross_signing_key_update(
+                new_device_ids = await device_list_updater.process_cross_signing_key_update(
                     user_id, master_key, self_signing_key,
                 )
                 device_ids = device_ids + new_device_ids
 
-            yield device_handler.notify_device_update(user_id, device_ids)
+            await device_handler.notify_device_update(user_id, device_ids)