diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 953df4d9cd..da887647d4 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -18,6 +18,7 @@ from typing import (
Dict,
Iterable,
List,
+ Mapping,
Optional,
Tuple,
Union,
@@ -846,6 +847,10 @@ class ApplicationServicesHandler:
]:
"""Claim one time keys from application services.
+ Users who are exclusively owned by an application service are sent a
+ key claim request, to check whether the application service provides
+ keys directly.
+
Args:
query: An iterable of tuples of (user ID, device ID, algorithm).
@@ -901,3 +906,65 @@
missing.extend(result[1])
return claimed_keys, missing
+
+ async def query_keys(
+ self, query: Mapping[str, Optional[List[str]]]
+ ) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
+ """Query application services for device keys.
+
+ Users who are exclusively owned by an application service are queried
+ for keys, to check whether the application service provides keys directly.
+
+ Args:
+ query: map from user_id to a list of devices to query
+
+ Returns:
+ The merged appservice responses: a map with a top-level "device_keys"
+ key, mapping user_id -> device_id -> device details.
+ """
+ services = self.store.get_app_services()
+
+ # Partition the users by appservice.
+ query_by_appservice: Dict[str, Dict[str, List[str]]] = {}
+ for user_id, device_ids in query.items():
+ if not self.store.get_if_app_services_interested_in_user(user_id):
+ continue
+
+ # Find the associated appservice.
+ for service in services:
+ if service.is_exclusive_user(user_id):
+ query_by_appservice.setdefault(service.id, {})[user_id] = (
+ device_ids or []
+ )
+ # A user is exclusive to at most one service, so stop looking.
+ break
+
+ # Query each service in parallel.
+ results = await make_deferred_yieldable(
+ defer.DeferredList(
+ [
+ run_in_background(
+ self.appservice_api.query_keys,
+ # We know this must be an app service.
+ self.store.get_app_service_by_id(service_id), # type: ignore[arg-type]
+ service_query,
+ )
+ for service_id, service_query in query_by_appservice.items()
+ ],
+ consumeErrors=True,
+ )
+ )
+
+ # Patch together the results. The responses are independent at the user
+ # level (each appservice has exclusive control over its users), but they
+ # all share the top-level "device_keys" key, so merge per user rather
+ # than letting each response clobber the previous one's "device_keys".
+ key_queries: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
+ for success, result in results:
+     if success:
+         for user_id, devices in result.get("device_keys", {}).items():
+             key_queries.setdefault("device_keys", {}).setdefault(
+                 user_id, {}
+             ).update(devices)
+
+ return key_queries
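
Since the per-user merge above is easy to get wrong, here is a minimal, self-contained sketch (hypothetical responses and illustrative names only, not Synapse APIs) of why a plain `dict.update` is insufficient: every appservice response nests its users under the same top-level `"device_keys"` key, so the last response would clobber the rest.

```python
from typing import Dict

# Hypothetical responses from two appservices with disjoint exclusive users.
# Both nest their users under the same top-level "device_keys" key.
response_a = {"device_keys": {"@bot1:example.org": {"DEV1": {"keys": {}}}}}
response_b = {"device_keys": {"@bot2:example.org": {"DEV2": {"keys": {}}}}}

# Naive merge: the second response replaces the whole "device_keys" dict.
naive: Dict[str, dict] = {}
for response in (response_a, response_b):
    naive.update(response)
assert "@bot1:example.org" not in naive["device_keys"]  # bot1 was lost!

# Per-user merge: both users survive, as in the loop above.
merged: Dict[str, dict] = {}
for response in (response_a, response_b):
    for user_id, devices in response.get("device_keys", {}).items():
        merged.setdefault("device_keys", {}).setdefault(user_id, {}).update(devices)
assert set(merged["device_keys"]) == {"@bot1:example.org", "@bot2:example.org"}
```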
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 9e7c2c45b5..0073667470 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -91,6 +91,9 @@ class E2eKeysHandler:
self._query_appservices_for_otks = (
hs.config.experimental.msc3983_appservice_otk_claims
)
+ self._query_appservices_for_keys = (
+ hs.config.experimental.msc3984_appservice_key_query
+ )
@trace
@cancellable
@@ -497,6 +500,19 @@ class E2eKeysHandler:
local_query, include_displaynames
)
+ # Check if the application services have any additional results.
+ if self._query_appservices_for_keys:
+ # Query the appservices for any keys.
+ appservice_results = await self._appservice_handler.query_keys(query)
+
+ # Merge results, overriding with what the appservice returned.
+ for user_id, devices in appservice_results.get("device_keys", {}).items():
+ # Copy the appservice device info over the homeserver device info, but
+ # don't completely overwrite it.
+ results.setdefault(user_id, {}).update(devices)
+
+ # TODO Handle cross-signing keys.
+
# Build the result structure
for user_id, device_keys in results.items():
for device_id, device_info in device_keys.items():
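
The `setdefault(...).update(...)` call above merges per device rather than per user. A short sketch with made-up data (illustrative IDs, not real key material) showing that an appservice answer overrides a device the homeserver already knew about while leaving its other devices intact:

```python
# What the homeserver already has for one user (hypothetical data).
results = {"@bot:example.org": {"DEV1": {"origin": "hs"}, "DEV2": {"origin": "hs"}}}
# What the appservice returned: fresher info for DEV1 only.
appservice_devices = {"@bot:example.org": {"DEV1": {"origin": "as"}}}

for user_id, devices in appservice_devices.items():
    # Per-device merge: DEV1 is overridden, DEV2 is preserved.
    results.setdefault(user_id, {}).update(devices)

assert results["@bot:example.org"]["DEV1"] == {"origin": "as"}
assert results["@bot:example.org"]["DEV2"] == {"origin": "hs"}
```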
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 80156ef343..65461a0787 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1949,27 +1949,25 @@ class FederationHandler:
)
for event in events:
for attempt in itertools.count():
+ # We try a new destination on every iteration.
try:
- await self._federation_event_handler.update_state_for_partial_state_event(
- destination, event
- )
+ while True:
+ try:
+ await self._federation_event_handler.update_state_for_partial_state_event(
+ destination, event
+ )
+ break
+ except FederationPullAttemptBackoffError as e:
+ # We are in the backoff period for one of the event's
+ # prev_events. Wait out the backoff, then try again.
+ logger.warning(
+ "%s; waiting for %d ms...", e, e.retry_after_ms
+ )
+ await self.clock.sleep(e.retry_after_ms / 1000)
+
+ # Success, no need to try the rest of the destinations.
break
- except FederationPullAttemptBackoffError as exc:
- # Log a warning about why we failed to process the event (the error message
- # for `FederationPullAttemptBackoffError` is pretty good)
- logger.warning("_sync_partial_state_room: %s", exc)
- # We do not record a failed pull attempt when we backoff fetching a missing
- # `prev_event` because not being able to fetch the `prev_events` just means
- # we won't be able to de-outlier the pulled event. But we can still use an
- # `outlier` in the state/auth chain for another event. So we shouldn't stop
- # a downstream event from trying to pull it.
- #
- # This avoids a cascade of backoff for all events in the DAG downstream from
- # one event backoff upstream.
except FederationError as e:
- # TODO: We should `record_event_failed_pull_attempt` here,
- # see https://github.com/matrix-org/synapse/issues/13700
-
if attempt == len(destinations) - 1:
# We have tried every remote server for this event. Give up.
# TODO(faster_joins) giving up isn't the right thing to do
@@ -1986,6 +1984,8 @@ class FederationHandler:
destination,
e,
)
+ # TODO: We should `record_event_failed_pull_attempt` here,
+ # see https://github.com/matrix-org/synapse/issues/13700
raise
# Try the next remote server.
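
The rewritten control flow is a loop within a loop: the inner `while` keeps retrying the same destination after sleeping out a backoff, and only a genuine federation error advances the outer loop to the next destination. A stripped-down, synchronous sketch of that shape (all names hypothetical, not Synapse APIs):

```python
import itertools
import time

class BackoffError(Exception):
    """Hypothetical stand-in for FederationPullAttemptBackoffError."""
    def __init__(self, retry_after_ms: int) -> None:
        super().__init__(f"backing off for {retry_after_ms} ms")
        self.retry_after_ms = retry_after_ms

class RemoteError(Exception):
    """Hypothetical stand-in for FederationError."""

def process_event(destinations, process) -> None:
    for attempt in itertools.count():
        destination = destinations[attempt % len(destinations)]
        try:
            while True:
                try:
                    process(destination)
                    break  # Processed successfully.
                except BackoffError as e:
                    # A backoff is not the destination's fault: wait it out
                    # and retry the same destination.
                    time.sleep(e.retry_after_ms / 1000)
            return  # Success; no need to try further destinations.
        except RemoteError:
            if attempt == len(destinations) - 1:
                raise  # Every destination has been tried; give up.
            # Otherwise fall through and try the next destination.
```

Keeping the backoff handling inside the inner loop means a backoff never counts against the per-destination attempt budget, which is the point of the change.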
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 648843cdbe..982c8d3b2f 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -140,6 +140,7 @@ class FederationEventHandler:
"""
def __init__(self, hs: "HomeServer"):
+ self._clock = hs.get_clock()
self._store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
@@ -1038,8 +1039,8 @@ class FederationEventHandler:
Raises:
FederationPullAttemptBackoffError if we are deliberately not attempting
- to pull the given event over federation because we've already done so
- recently and are backing off.
+ to pull one of the given event's `prev_event`s over federation because
+ we've already done so recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
@@ -1053,13 +1054,22 @@ class FederationEventHandler:
# If we've already recently attempted to pull this missing event, don't
# try it again so soon. Since we have to fetch all of the prev_events, we can
# bail early here if we find any to ignore.
- prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
- room_id, missing_prevs
+ prevs_with_pull_backoff = (
+ await self._store.get_event_ids_to_not_pull_from_backoff(
+ room_id, missing_prevs
+ )
)
- if len(prevs_to_ignore) > 0:
+ if len(prevs_with_pull_backoff) > 0:
raise FederationPullAttemptBackoffError(
- event_ids=prevs_to_ignore,
- message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+ event_ids=prevs_with_pull_backoff.keys(),
+ message=(
+ f"While computing context for event={event_id}, not attempting to "
+ f"pull missing prev_events={list(prevs_with_pull_backoff.keys())} "
+ "because we already tried to pull recently (backing off)."
+ ),
+ retry_after_ms=(
+ max(prevs_with_pull_backoff.values()) - self._clock.time_msec()
+ ),
)
if not missing_prevs:
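
The new `retry_after_ms` field is derived from the latest backoff expiry among the blocked `prev_event`s, so a single sleep of that length clears every backoff window at once. A worked sketch of the arithmetic (timestamps made up):

```python
# Hypothetical store result: prev_event ID -> wall-clock time (ms) at which
# its pull backoff expires.
prevs_with_pull_backoff = {"$prev1": 1_000_500, "$prev2": 1_002_000}
now_ms = 1_000_000  # what self._clock.time_msec() would return

# Sleeping past the *maximum* expiry makes every prev_event retryable.
retry_after_ms = max(prevs_with_pull_backoff.values()) - now_ms
assert retry_after_ms == 2_000
```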
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 0fc829acf7..e7e0b5e049 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -1239,6 +1239,7 @@ class OidcProvider:
grandfather_existing_users,
extra_attributes,
auth_provider_session_id=sid,
+ registration_enabled=self._config.enable_registration,
)
def _remote_id_from_userinfo(self, userinfo: UserInfo) -> str:
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 4a27c0f051..c28325323c 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -383,6 +383,7 @@ class SsoHandler:
grandfather_existing_users: Callable[[], Awaitable[Optional[str]]],
extra_login_attributes: Optional[JsonDict] = None,
auth_provider_session_id: Optional[str] = None,
+ registration_enabled: bool = True,
) -> None:
"""
Given an SSO ID, retrieve the user ID for it and possibly register the user.
@@ -435,6 +436,10 @@ class SsoHandler:
auth_provider_session_id: An optional session ID from the IdP.
+ registration_enabled: An optional boolean to enable/disable automatic
+ registration of new users. If false and the user does not exist, the
+ flow is aborted. Defaults to true.
+
Raises:
MappingException if there was a problem mapping the response to a user.
RedirectException: if the mapping provider needs to redirect the user
@@ -462,8 +467,16 @@ class SsoHandler:
auth_provider_id, remote_user_id, user_id
)
- # Otherwise, generate a new user.
- if not user_id:
+ if not user_id and not registration_enabled:
+ logger.info(
+ "User does not exist and registration are disabled for IdP '%s' and remote_user_id '%s'",
+ auth_provider_id,
+ remote_user_id,
+ )
+ raise MappingException(
+ "User does not exist and registrations are disabled"
+ )
+ elif not user_id: # Otherwise, generate a new user.
attributes = await self._call_attribute_mapper(sso_to_matrix_id_mapper)
next_step_url = self._get_url_for_next_new_user_step(
|