diff options
author | reivilibre <oliverw@matrix.org> | 2023-01-10 11:17:59 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-10 11:17:59 +0000 |
commit | ba4ea7d13ffae53644b206222af95a5171faa27c (patch) | |
tree | 7867aabc7a90d7ad1b539c015db7115d50af1d8c /synapse/handlers/e2e_keys.py | |
parent | Add missing worker settings to shared configuration (#14748) (diff) | |
download | synapse-ba4ea7d13ffae53644b206222af95a5171faa27c.tar.xz |
Batch up replication requests to request the resyncing of remote users's devices. (#14716)
Diffstat (limited to 'synapse/handlers/e2e_keys.py')
-rw-r--r-- | synapse/handlers/e2e_keys.py | 93 |
1 files changed, 55 insertions, 38 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5fe102e2f2..d2188ca08f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -36,8 +36,8 @@ from synapse.types import ( get_domain_from_id, get_verify_key_from_cross_signing_key, ) -from synapse.util import json_decoder, unwrapFirstError -from synapse.util.async_helpers import Linearizer, delay_cancellation +from synapse.util import json_decoder +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.cancellation import cancellable from synapse.util.retryutils import NotRetryingDestination @@ -238,24 +238,28 @@ class E2eKeysHandler: # Now fetch any devices that we don't have in our cache # TODO It might make sense to propagate cancellations into the # deferreds which are querying remote homeservers. - await make_deferred_yieldable( - delay_cancellation( - defer.gatherResults( - [ - run_in_background( - self._query_devices_for_destination, - results, - cross_signing_keys, - failures, - destination, - queries, - timeout, - ) - for destination, queries in remote_queries_not_in_cache.items() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + logger.debug( + "%d destinations to query devices for", len(remote_queries_not_in_cache) + ) + + async def _query( + destination_queries: Tuple[str, Dict[str, Iterable[str]]] + ) -> None: + destination, queries = destination_queries + return await self._query_devices_for_destination( + results, + cross_signing_keys, + failures, + destination, + queries, + timeout, ) + + await concurrently_execute( + _query, + remote_queries_not_in_cache.items(), + 10, + delay_cancellation=True, ) ret = {"device_keys": results, "failures": failures} @@ -300,28 +304,41 @@ class E2eKeysHandler: # queries. We use the more efficient batched query_client_keys for all # remaining users user_ids_updated = [] - for (user_id, device_list) in destination_query.items(): - if user_id in user_ids_updated: - continue - if device_list: - continue + # Perform a user device resync for each user only once and only as long as: + # - they have an empty device_list + # - they are in some rooms that this server can see + users_to_resync_devices = { + user_id + for (user_id, device_list) in destination_query.items() + if (not device_list) and (await self.store.get_rooms_for_user(user_id)) + } - room_ids = await self.store.get_rooms_for_user(user_id) - if not room_ids: - continue + logger.debug( + "%d users to resync devices for from destination %s", + len(users_to_resync_devices), + destination, + ) - # We've decided we're sharing a room with this user and should - # probably be tracking their device lists. However, we haven't - # done an initial sync on the device list so we do it now. - try: - resync_results = ( - await self.device_handler.device_list_updater.user_device_resync( - user_id - ) + try: + user_resync_results = ( + await self.device_handler.device_list_updater.multi_user_device_resync( + list(users_to_resync_devices) ) + ) + for user_id in users_to_resync_devices: + resync_results = user_resync_results[user_id] + if resync_results is None: - raise ValueError("Device resync failed") + # TODO: It's weird that we'll store a failure against a + # destination, yet continue processing users from that + # destination. + # We might want to consider changing this, but for now + # I'm leaving it as I found it. + failures[destination] = _exception_to_failure( + ValueError(f"Device resync failed for {user_id!r}") + ) + continue # Add the device keys to the results. user_devices = resync_results["devices"] @@ -339,8 +356,8 @@ class E2eKeysHandler: if self_signing_key: cross_signing_keys["self_signing_keys"][user_id] = self_signing_key - except Exception as e: - failures[destination] = _exception_to_failure(e) + except Exception as e: + failures[destination] = _exception_to_failure(e) if len(destination_query) == len(user_ids_updated): # We've updated all the users in the query and we do not need to |