diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 60a1acc6f4..fd11935b40 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,6 +22,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, CodeMessageException
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
logger = logging.getLogger(__name__)
@@ -88,18 +89,28 @@ class E2eKeysHandler(object):
def do_remote_query(destination):
destination_query = remote_queries[destination]
try:
- remote_result = yield self.federation.query_client_keys(
- destination,
- {"device_keys": destination_query},
- timeout=timeout
+ limiter = yield get_retry_limiter(
+ destination, self.clock, self.store
)
+ with limiter:
+ remote_result = yield self.federation.query_client_keys(
+ destination,
+ {"device_keys": destination_query},
+ timeout=timeout
+ )
+
for user_id, keys in remote_result["device_keys"].items():
if user_id in destination_query:
results[user_id] = keys
+
except CodeMessageException as e:
failures[destination] = {
"status": e.code, "message": e.message
}
+ except NotRetryingDestination as e:
+ failures[destination] = {
+ "status": 503, "message": "Not ready for retry",
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
@@ -191,18 +202,26 @@ class E2eKeysHandler(object):
def claim_client_keys(destination):
device_keys = remote_queries[destination]
try:
- remote_result = yield self.federation.claim_client_keys(
- destination,
- {"one_time_keys": device_keys},
- timeout=timeout
+ limiter = yield get_retry_limiter(
+ destination, self.clock, self.store
)
- for user_id, keys in remote_result["one_time_keys"].items():
- if user_id in device_keys:
- json_result[user_id] = keys
+ with limiter:
+ remote_result = yield self.federation.claim_client_keys(
+ destination,
+ {"one_time_keys": device_keys},
+ timeout=timeout
+ )
+ for user_id, keys in remote_result["one_time_keys"].items():
+ if user_id in device_keys:
+ json_result[user_id] = keys
except CodeMessageException as e:
failures[destination] = {
"status": e.code, "message": e.message
}
+ except NotRetryingDestination as e:
+ failures[destination] = {
+ "status": 503, "message": "Not ready for retry",
+ }
yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
|