diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index f78e66ad0a..6171aaf29f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -39,6 +39,8 @@ from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
from synapse.types import (
JsonDict,
JsonMapping,
+ ScheduledTask,
+ TaskStatus,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -70,6 +72,7 @@ class E2eKeysHandler:
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
self._worker_lock_handler = hs.get_worker_locks_handler()
+ self._task_scheduler = hs.get_task_scheduler()
federation_registry = hs.get_federation_registry()
@@ -116,6 +119,10 @@ class E2eKeysHandler:
hs.config.experimental.msc3984_appservice_key_query
)
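+        # Register the handler for the one-off "delete_old_otks" task (scheduled
+        # by a database schema delta), so that the task scheduler can run it and
+        # resume it across restarts.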
+ self._task_scheduler.register_action(
+ self._delete_old_one_time_keys_task, "delete_old_otks"
+ )
+
@trace
@cancellable
async def query_devices(
@@ -151,7 +158,37 @@ class E2eKeysHandler:
the number of in-flight queries at a time.
"""
async with self._query_devices_linearizer.queue((from_user_id, from_device_id)):
- device_keys_query: Dict[str, List[str]] = query_body.get("device_keys", {})
+
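+            # MSC4263: when the experimental flag is on, key queries are limited
+            # to users who share a room with the requester; otherwise we only
+            # strip invalid user IDs. E.g. with the flag off, a query for
+            # {"@alice:example.org": [], "not-a-user-id": []} keeps only the
+            # valid "@alice:example.org" entry.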
+ async def filter_device_key_query(
+ query: Dict[str, List[str]],
+ ) -> Dict[str, List[str]]:
+ if not self.config.experimental.msc4263_limit_key_queries_to_users_who_share_rooms:
+ # Only ignore invalid user IDs, which is the same behaviour as if
+ # the user existed but had no keys.
+ return {
+ user_id: v
+ for user_id, v in query.items()
+ if UserID.is_valid(user_id)
+ }
+
+ # Strip invalid user IDs and user IDs the requesting user does not share rooms with.
+ valid_user_ids = [
+ user_id for user_id in query.keys() if UserID.is_valid(user_id)
+ ]
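+                # Of the valid user IDs, keep only those sharing at least one
+                # room with the requesting user, counting joins and invites.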
+ allowed_user_ids = set(
+ await self.store.do_users_share_a_room_joined_or_invited(
+ from_user_id, valid_user_ids
+ )
+ )
+ return {
+ user_id: v
+ for user_id, v in query.items()
+ if user_id in allowed_user_ids
+ }
+
+ device_keys_query: Dict[str, List[str]] = await filter_device_key_query(
+ query_body.get("device_keys", {})
+ )
# separate users by domain.
# make a map from domain to user_id to device_ids
@@ -159,11 +196,6 @@ class E2eKeysHandler:
remote_queries = {}
for user_id, device_ids in device_keys_query.items():
- if not UserID.is_valid(user_id):
- # Ignore invalid user IDs, which is the same behaviour as if
- # the user existed but had no keys.
- continue
-
# we use UserID.from_string to catch invalid user ids
if self.is_mine(UserID.from_string(user_id)):
local_query[user_id] = device_ids
@@ -615,7 +647,7 @@ class E2eKeysHandler:
3. Attempt to fetch fallback keys from the database.
Args:
- local_query: An iterable of tuples of (user ID, device ID, algorithm).
+ local_query: An iterable of tuples of (user ID, device ID, algorithm, number of keys).
always_include_fallback_keys: True to always include fallback keys.
Returns:
@@ -1156,7 +1188,7 @@ class E2eKeysHandler:
devices = devices[user_id]
except SynapseError as e:
failure = _exception_to_failure(e)
- failures[user_id] = {device: failure for device in signatures.keys()}
+ failures[user_id] = dict.fromkeys(signatures.keys(), failure)
return signature_list, failures
for device_id, device in signatures.items():
@@ -1296,7 +1328,7 @@ class E2eKeysHandler:
except SynapseError as e:
failure = _exception_to_failure(e)
for user, devicemap in signatures.items():
- failures[user] = {device_id: failure for device_id in devicemap.keys()}
+ failures[user] = dict.fromkeys(devicemap.keys(), failure)
return signature_list, failures
for target_user, devicemap in signatures.items():
@@ -1337,9 +1369,7 @@ class E2eKeysHandler:
# other devices were signed -- mark those as failures
logger.debug("upload signature: too many devices specified")
failure = _exception_to_failure(NotFoundError("Unknown device"))
- failures[target_user] = {
- device: failure for device in other_devices
- }
+ failures[target_user] = dict.fromkeys(other_devices, failure)
if user_signing_key_id in master_key.get("signatures", {}).get(
user_id, {}
@@ -1360,9 +1390,7 @@ class E2eKeysHandler:
except SynapseError as e:
failure = _exception_to_failure(e)
if device_id is None:
- failures[target_user] = {
- device_id: failure for device_id in devicemap.keys()
- }
+ failures[target_user] = dict.fromkeys(devicemap.keys(), failure)
else:
failures.setdefault(target_user, {})[device_id] = failure
@@ -1574,6 +1602,45 @@ class E2eKeysHandler:
return True
return False
+ async def _delete_old_one_time_keys_task(
+ self, task: ScheduledTask
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete old one-time keys.
+
+        Until Synapse 1.119, Synapse issued one-time keys in a random order, so it may
+        still hold old OTKs that the client has since dropped. This task is scheduled
+        exactly once, by a database schema delta file, and clears out old one-time keys
+        that look like they came from libolm.
+ """
+ last_user = task.result.get("from_user", "") if task.result else ""
+ while True:
+ # We process users in batches of 100
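+            # The store helper deletes old keys for the next batch of users
+            # after `last_user` and returns the user IDs it covered along with
+            # the number of rows deleted.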
+ users, rowcount = await self.store.delete_old_otks_for_next_user_batch(
+ last_user, 100
+ )
+ if len(users) == 0:
+ # We're done!
+ return TaskStatus.COMPLETE, None, None
+
+ logger.debug(
+ "Deleted %i old one-time-keys for users '%s'..'%s'",
+ rowcount,
+ users[0],
+ users[-1],
+ )
+ last_user = users[-1]
+
+ # Store our progress
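+            # so that, if the task is interrupted (e.g. by a restart), it can
+            # resume from "from_user" rather than starting over.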
+ await self._task_scheduler.update_task(
+ task.id, result={"from_user": last_user}
+ )
+
+ # Sleep a little before doing the next user.
+ #
+ # matrix.org has about 15M users in the e2e_one_time_keys_json table
+ # (comprising 20M devices). We want this to take about a week, so we need
+ # to do about one batch of 100 users every 4 seconds.
+ await self.clock.sleep(4)
+
def _check_cross_signing_key(
key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None