diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 4812fb4496..51ee0e79df 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -388,6 +388,62 @@ class ApplicationServiceApi(SimpleHttpClient):
failed_transactions_counter.labels(service.id).inc()
return False
+ async def claim_client_keys(
+ self, service: "ApplicationService", query: List[Tuple[str, str, str]]
+ ) -> Tuple[Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str]]]:
+ """Claim one time keys from an application service.
+
+ Args:
+ query: An iterable of tuples of (user ID, device ID, algorithm).
+
+ Returns:
+ A tuple of:
+ A map of user ID -> a map device ID -> a map of key ID -> JSON dict.
+
+ A copy of the input which has not been fulfilled because the
+ appservice doesn't support this endpoint or has not returned
+ data for that tuple.
+ """
+ if service.url is None:
+ return {}, query
+
+ # This is required by the configuration.
+ assert service.hs_token is not None
+
+ # Create the expected payload shape.
+ body: Dict[str, Dict[str, List[str]]] = {}
+ for user_id, device, algorithm in query:
+ body.setdefault(user_id, {}).setdefault(device, []).append(algorithm)
+
+ uri = f"{service.url}/_matrix/app/unstable/org.matrix.msc3983/keys/claim"
+ try:
+ response = await self.post_json_get_json(
+ uri,
+ body,
+ headers={"Authorization": [f"Bearer {service.hs_token}"]},
+ )
+ except CodeMessageException as e:
+ # The appservice doesn't support this endpoint.
+ if e.code == 404 or e.code == 405:
+ return {}, query
+ logger.warning("claim_keys to %s received %s", uri, e.code)
+ return {}, query
+ except Exception as ex:
+ logger.warning("claim_keys to %s threw exception %s", uri, ex)
+ return {}, query
+
+ # Check if the appservice fulfilled all of the queried user/device/algorithms
+ # or if some are still missing.
+ #
+ # TODO This places a lot of faith in the response shape being correct.
+ missing = [
+ (user_id, device, algorithm)
+ for user_id, device, algorithm in query
+ if algorithm not in response.get(user_id, {}).get(device, [])
+ ]
+
+ return response, missing
+
def _serialize(
self, service: "ApplicationService", events: Iterable[EventBase]
) -> List[JsonDict]:
|