diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2023-03-28 14:26:27 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-03-28 18:26:27 +0000 |
commit | 5282ba1e2bbff2635dc09aec45fd42a56c1a4545 (patch) | |
tree | 94377879ae342e639bb05c2257765c7f94bc048e /synapse/handlers | |
parent | Speed up generate sample config CI lint (#15340) (diff) | |
download | synapse-5282ba1e2bbff2635dc09aec45fd42a56c1a4545.tar.xz |
Implement MSC3983 to proxy /keys/claim queries to appservices. (#15314)
Experimental support for MSC3983 is behind a configuration flag. If enabled, for users which are exclusively owned by an application service then the appservice will be queried for one-time keys *if* there are none uploaded to Synapse.
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 74 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 57 |
2 files changed, 122 insertions, 9 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ec3ab968e9..953df4d9cd 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -12,7 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Union +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, +) from prometheus_client import Counter @@ -829,3 +838,66 @@ class ApplicationServicesHandler: if unknown_user: return await self.query_user_exists(user_id) return True + + async def claim_e2e_one_time_keys( + self, query: Iterable[Tuple[str, str, str]] + ) -> Tuple[ + Iterable[Dict[str, Dict[str, Dict[str, JsonDict]]]], List[Tuple[str, str, str]] + ]: + """Claim one time keys from application services. + + Args: + query: An iterable of tuples of (user ID, device ID, algorithm). + + Returns: + A tuple of: + An iterable of maps of user ID -> a map device ID -> a map of key ID -> JSON bytes. + + A copy of the input which has not been fulfilled (either because + they are not appservice users or the appservice does not support + providing OTKs). + """ + services = self.store.get_app_services() + + # Partition the users by appservice. + query_by_appservice: Dict[str, List[Tuple[str, str, str]]] = {} + missing = [] + for user_id, device, algorithm in query: + if not self.store.get_if_app_services_interested_in_user(user_id): + missing.append((user_id, device, algorithm)) + continue + + # Find the associated appservice. + for service in services: + if service.is_exclusive_user(user_id): + query_by_appservice.setdefault(service.id, []).append( + (user_id, device, algorithm) + ) + continue + + # Query each service in parallel. + results = await make_deferred_yieldable( + defer.DeferredList( + [ + run_in_background( + self.appservice_api.claim_client_keys, + # We know this must be an app service. + self.store.get_app_service_by_id(service_id), # type: ignore[arg-type] + service_query, + ) + for service_id, service_query in query_by_appservice.items() + ], + consumeErrors=True, + ) + ) + + # Patch together the results -- they are all independent (since they + # require exclusive control over the users). They get returned as a list + # and the caller combines them. + claimed_keys: List[Dict[str, Dict[str, Dict[str, JsonDict]]]] = [] + for success, result in results: + if success: + claimed_keys.append(result[0]) + missing.extend(result[1]) + + return claimed_keys, missing diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 4e9c8d8db0..9e7c2c45b5 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,7 +13,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Tuple @@ -53,6 +52,7 @@ class E2eKeysHandler: self.store = hs.get_datastores().main self.federation = hs.get_federation_client() self.device_handler = hs.get_device_handler() + self._appservice_handler = hs.get_application_service_handler() self.is_mine = hs.is_mine self.clock = hs.get_clock() @@ -88,6 +88,10 @@ class E2eKeysHandler: max_count=10, ) + self._query_appservices_for_otks = ( + hs.config.experimental.msc3983_appservice_otk_claims + ) + @trace @cancellable async def query_devices( @@ -542,6 +546,42 @@ class E2eKeysHandler: return ret + async def claim_local_one_time_keys( + self, local_query: List[Tuple[str, str, str]] + ) -> Iterable[Dict[str, Dict[str, Dict[str, JsonDict]]]]: + """Claim one time keys for local users. + + 1. Attempt to claim OTKs from the database. + 2. Ask application services if they provide OTKs. + 3. Attempt to fetch fallback keys from the database. + + Args: + local_query: An iterable of tuples of (user ID, device ID, algorithm). + + Returns: + An iterable of maps of user ID -> a map device ID -> a map of key ID -> JSON bytes. + """ + + otk_results, not_found = await self.store.claim_e2e_one_time_keys(local_query) + + # If the application services have not provided any keys via the C-S + # API, query it directly for one-time keys. + if self._query_appservices_for_otks: + ( + appservice_results, + not_found, + ) = await self._appservice_handler.claim_e2e_one_time_keys(not_found) + else: + appservice_results = [] + + # For each user that does not have a one-time keys available, see if + # there is a fallback key. + fallback_results = await self.store.claim_e2e_fallback_keys(not_found) + + # Return the results in order, each item from the input query should + # only appear once in the combined list. + return (otk_results, *appservice_results, fallback_results) + @trace async def claim_one_time_keys( self, query: Dict[str, Dict[str, Dict[str, str]]], timeout: Optional[int] @@ -561,17 +601,18 @@ class E2eKeysHandler: set_tag("local_key_query", str(local_query)) set_tag("remote_key_query", str(remote_queries)) - results = await self.store.claim_e2e_one_time_keys(local_query) + results = await self.claim_local_one_time_keys(local_query) # A map of user ID -> device ID -> key ID -> key. json_result: Dict[str, Dict[str, Dict[str, JsonDict]]] = {} + for result in results: + for user_id, device_keys in result.items(): + for device_id, keys in device_keys.items(): + for key_id, key in keys.items(): + json_result.setdefault(user_id, {})[device_id] = {key_id: key} + + # Remote failures. failures: Dict[str, JsonDict] = {} - for user_id, device_keys in results.items(): - for device_id, keys in device_keys.items(): - for key_id, json_str in keys.items(): - json_result.setdefault(user_id, {})[device_id] = { - key_id: json_decoder.decode(json_str) - } @trace async def claim_client_keys(destination: str) -> None: |