summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2022-09-07 11:03:32 +0000
committerGitHub <noreply@github.com>2022-09-07 12:03:32 +0100
commitd3d9ca156e323fe194b1bcb1af1628f65a2f3c1c (patch)
tree74e19b799192c8be69179e27ceadda7a8beac18c /synapse/handlers
parentRename the `EventFormatVersions` enum values so that they line up with room v... (diff)
downloadsynapse-d3d9ca156e323fe194b1bcb1af1628f65a2f3c1c.tar.xz
Cancel the processing of key query requests when they time out. (#13680)
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/device.py3
-rw-r--r--synapse/handlers/e2e_keys.py40
2 files changed, 27 insertions, 16 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9c2c3a0e68..c5ac169644 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -52,6 +52,7 @@ from synapse.types import (
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.cancellation import cancellable
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -124,6 +125,7 @@ class DeviceWorkerHandler:
 
         return device
 
+    @cancellable
     async def get_device_changes_in_shared_rooms(
         self, user_id: str, room_ids: Collection[str], from_token: StreamToken
     ) -> Collection[str]:
@@ -163,6 +165,7 @@ class DeviceWorkerHandler:
 
     @trace
     @measure_func("device.get_user_ids_changed")
+    @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
     ) -> JsonDict:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c938339ddd..ec81639c78 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -37,7 +37,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -91,6 +92,7 @@ class E2eKeysHandler:
         )
 
     @trace
+    @cancellable
     async def query_devices(
         self,
         query_body: JsonDict,
@@ -208,22 +210,26 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
+            # TODO It might make sense to propagate cancellations into the
+            #      deferreds which are querying remote homeservers.
             await make_deferred_yieldable(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
+                delay_cancellation(
+                    defer.gatherResults(
+                        [
+                            run_in_background(
+                                self._query_devices_for_destination,
+                                results,
+                                cross_signing_keys,
+                                failures,
+                                destination,
+                                queries,
+                                timeout,
+                            )
+                            for destination, queries in remote_queries_not_in_cache.items()
+                        ],
+                        consumeErrors=True,
+                    ).addErrback(unwrapFirstError)
+                )
             )
 
             ret = {"device_keys": results, "failures": failures}
@@ -347,6 +353,7 @@ class E2eKeysHandler:
 
         return
 
+    @cancellable
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
@@ -393,6 +400,7 @@ class E2eKeysHandler:
         }
 
     @trace
+    @cancellable
     async def query_local_devices(
         self, query: Mapping[str, Optional[List[str]]]
     ) -> Dict[str, Dict[str, dict]]: