summary refs log tree commit diff
path: root/synapse/handlers/e2e_keys.py
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2023-01-10 11:17:59 +0000
committerGitHub <noreply@github.com>2023-01-10 11:17:59 +0000
commitba4ea7d13ffae53644b206222af95a5171faa27c (patch)
tree7867aabc7a90d7ad1b539c015db7115d50af1d8c /synapse/handlers/e2e_keys.py
parentAdd missing worker settings to shared configuration (#14748) (diff)
downloadsynapse-ba4ea7d13ffae53644b206222af95a5171faa27c.tar.xz
Batch up replication requests to request the resyncing of remote users's devices. (#14716)
Diffstat (limited to 'synapse/handlers/e2e_keys.py')
-rw-r--r--synapse/handlers/e2e_keys.py93
1 files changed, 55 insertions, 38 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5fe102e2f2..d2188ca08f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -36,8 +36,8 @@ from synapse.types import (
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
 )
-from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util import json_decoder
+from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -238,24 +238,28 @@ class E2eKeysHandler:
             # Now fetch any devices that we don't have in our cache
             # TODO It might make sense to propagate cancellations into the
             #      deferreds which are querying remote homeservers.
-            await make_deferred_yieldable(
-                delay_cancellation(
-                    defer.gatherResults(
-                        [
-                            run_in_background(
-                                self._query_devices_for_destination,
-                                results,
-                                cross_signing_keys,
-                                failures,
-                                destination,
-                                queries,
-                                timeout,
-                            )
-                            for destination, queries in remote_queries_not_in_cache.items()
-                        ],
-                        consumeErrors=True,
-                    ).addErrback(unwrapFirstError)
+            logger.debug(
+                "%d destinations to query devices for", len(remote_queries_not_in_cache)
+            )
+
+            async def _query(
+                destination_queries: Tuple[str, Dict[str, Iterable[str]]]
+            ) -> None:
+                destination, queries = destination_queries
+                return await self._query_devices_for_destination(
+                    results,
+                    cross_signing_keys,
+                    failures,
+                    destination,
+                    queries,
+                    timeout,
                 )
+
+            await concurrently_execute(
+                _query,
+                remote_queries_not_in_cache.items(),
+                10,
+                delay_cancellation=True,
             )
 
             ret = {"device_keys": results, "failures": failures}
@@ -300,28 +304,41 @@ class E2eKeysHandler:
         # queries. We use the more efficient batched query_client_keys for all
         # remaining users
         user_ids_updated = []
-        for (user_id, device_list) in destination_query.items():
-            if user_id in user_ids_updated:
-                continue
 
-            if device_list:
-                continue
+        # Perform a user device resync for each user only once and only as long as:
+        # - they have an empty device_list
+        # - they are in some rooms that this server can see
+        users_to_resync_devices = {
+            user_id
+            for (user_id, device_list) in destination_query.items()
+            if (not device_list) and (await self.store.get_rooms_for_user(user_id))
+        }
 
-            room_ids = await self.store.get_rooms_for_user(user_id)
-            if not room_ids:
-                continue
+        logger.debug(
+            "%d users to resync devices for from destination %s",
+            len(users_to_resync_devices),
+            destination,
+        )
 
-            # We've decided we're sharing a room with this user and should
-            # probably be tracking their device lists. However, we haven't
-            # done an initial sync on the device list so we do it now.
-            try:
-                resync_results = (
-                    await self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+        try:
+            user_resync_results = (
+                await self.device_handler.device_list_updater.multi_user_device_resync(
+                    list(users_to_resync_devices)
                 )
+            )
+            for user_id in users_to_resync_devices:
+                resync_results = user_resync_results[user_id]
+
                 if resync_results is None:
-                    raise ValueError("Device resync failed")
+                    # TODO: It's weird that we'll store a failure against a
+                    #       destination, yet continue processing users from that
+                    #       destination.
+                    #       We might want to consider changing this, but for now
+                    #       I'm leaving it as I found it.
+                    failures[destination] = _exception_to_failure(
+                        ValueError(f"Device resync failed for {user_id!r}")
+                    )
+                    continue
 
                 # Add the device keys to the results.
                 user_devices = resync_results["devices"]
@@ -339,8 +356,8 @@ class E2eKeysHandler:
 
                 if self_signing_key:
                     cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
-            except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+        except Exception as e:
+            failures[destination] = _exception_to_failure(e)
 
         if len(destination_query) == len(user_ids_updated):
             # We've updated all the users in the query and we do not need to