Diffstat:
 synapse/handlers/e2e_keys.py | 97
 1 file changed, 82 insertions(+), 15 deletions(-)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py

index f78e66ad0a..6171aaf29f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -39,6 +39,8 @@ from synapse.replication.http.devices import ReplicationUploadKeysForUserRestSer
 from synapse.types import (
     JsonDict,
     JsonMapping,
+    ScheduledTask,
+    TaskStatus,
     UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -70,6 +72,7 @@ class E2eKeysHandler:
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
         self._worker_lock_handler = hs.get_worker_locks_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         federation_registry = hs.get_federation_registry()
 
@@ -116,6 +119,10 @@ class E2eKeysHandler:
             hs.config.experimental.msc3984_appservice_key_query
         )
 
+        self._task_scheduler.register_action(
+            self._delete_old_one_time_keys_task, "delete_old_otks"
+        )
+
     @trace
     @cancellable
     async def query_devices(
@@ -151,7 +158,37 @@
         the number of in-flight queries at a time.
         """
         async with self._query_devices_linearizer.queue((from_user_id, from_device_id)):
-            device_keys_query: Dict[str, List[str]] = query_body.get("device_keys", {})
+
+            async def filter_device_key_query(
+                query: Dict[str, List[str]],
+            ) -> Dict[str, List[str]]:
+                if not self.config.experimental.msc4263_limit_key_queries_to_users_who_share_rooms:
+                    # Only ignore invalid user IDs, which is the same behaviour as if
+                    # the user existed but had no keys.
+                    return {
+                        user_id: v
+                        for user_id, v in query.items()
+                        if UserID.is_valid(user_id)
+                    }
+
+                # Strip invalid user IDs and user IDs the requesting user does not share rooms with.
+                valid_user_ids = [
+                    user_id for user_id in query.keys() if UserID.is_valid(user_id)
+                ]
+                allowed_user_ids = set(
+                    await self.store.do_users_share_a_room_joined_or_invited(
+                        from_user_id, valid_user_ids
+                    )
+                )
+                return {
+                    user_id: v
+                    for user_id, v in query.items()
+                    if user_id in allowed_user_ids
+                }
+
+            device_keys_query: Dict[str, List[str]] = await filter_device_key_query(
+                query_body.get("device_keys", {})
+            )
 
             # separate users by domain.
             # make a map from domain to user_id to device_ids
@@ -159,11 +196,6 @@
             remote_queries = {}
 
             for user_id, device_ids in device_keys_query.items():
-                if not UserID.is_valid(user_id):
-                    # Ignore invalid user IDs, which is the same behaviour as if
-                    # the user existed but had no keys.
-                    continue
-
                 # we use UserID.from_string to catch invalid user ids
                 if self.is_mine(UserID.from_string(user_id)):
                     local_query[user_id] = device_ids
@@ -615,7 +647,7 @@
         3. Attempt to fetch fallback keys from the database.
 
         Args:
-            local_query: An iterable of tuples of (user ID, device ID, algorithm).
+            local_query: An iterable of tuples of (user ID, device ID, algorithm, number of keys).
             always_include_fallback_keys: True to always include fallback keys.
 
         Returns:
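For context on the filter_device_key_query change above: when the experimental MSC4263 option is enabled, the query is restricted to users the requester shares a room with (joined or invited); otherwise only invalid user IDs are dropped, matching the previous behaviour. Below is a minimal standalone sketch of that filtering logic, not the handler itself; is_valid_user_id, shared_room_users, and the sample user IDs are hypothetical stand-ins for Synapse's UserID.is_valid and the do_users_share_a_room_joined_or_invited store call.

    import asyncio
    from typing import Awaitable, Callable, Dict, Iterable, List, Set

    # Hypothetical stand-in for Synapse's UserID.is_valid(): a Matrix user ID
    # looks like "@localpart:domain".
    def is_valid_user_id(user_id: str) -> bool:
        return user_id.startswith("@") and ":" in user_id

    async def filter_device_key_query(
        query: Dict[str, List[str]],
        limit_to_shared_rooms: bool,
        shared_room_users: Callable[[Iterable[str]], Awaitable[Set[str]]],
    ) -> Dict[str, List[str]]:
        """Drop invalid user IDs and, optionally, users we share no room with."""
        valid = {u: v for u, v in query.items() if is_valid_user_id(u)}
        if not limit_to_shared_rooms:
            # Same behaviour as if the user existed but had no keys.
            return valid
        allowed = await shared_room_users(valid.keys())
        return {u: v for u, v in valid.items() if u in allowed}

    async def _demo() -> None:
        # Pretend only @bob:example.org shares a room with the requester.
        async def shared_room_users(user_ids: Iterable[str]) -> Set[str]:
            return {"@bob:example.org"}

        query = {"@bob:example.org": [], "@eve:example.org": [], "not a user id": []}
        print(await filter_device_key_query(query, True, shared_room_users))
        # -> {'@bob:example.org': []}

    if __name__ == "__main__":
        asyncio.run(_demo())
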
@@ -1156,7 +1188,7 @@
             devices = devices[user_id]
         except SynapseError as e:
             failure = _exception_to_failure(e)
-            failures[user_id] = {device: failure for device in signatures.keys()}
+            failures[user_id] = dict.fromkeys(signatures.keys(), failure)
             return signature_list, failures
 
         for device_id, device in signatures.items():
@@ -1296,7 +1328,7 @@
         except SynapseError as e:
             failure = _exception_to_failure(e)
             for user, devicemap in signatures.items():
-                failures[user] = {device_id: failure for device_id in devicemap.keys()}
+                failures[user] = dict.fromkeys(devicemap.keys(), failure)
             return signature_list, failures
 
         for target_user, devicemap in signatures.items():
@@ -1337,9 +1369,7 @@
                     # other devices were signed -- mark those as failures
                     logger.debug("upload signature: too many devices specified")
                     failure = _exception_to_failure(NotFoundError("Unknown device"))
-                    failures[target_user] = {
-                        device: failure for device in other_devices
-                    }
+                    failures[target_user] = dict.fromkeys(other_devices, failure)
 
                 if user_signing_key_id in master_key.get("signatures", {}).get(
                     user_id, {}
@@ -1360,9 +1390,7 @@
             except SynapseError as e:
                 failure = _exception_to_failure(e)
                 if device_id is None:
-                    failures[target_user] = {
-                        device_id: failure for device_id in devicemap.keys()
-                    }
+                    failures[target_user] = dict.fromkeys(devicemap.keys(), failure)
                 else:
                     failures.setdefault(target_user, {})[device_id] = failure
 
@@ -1574,6 +1602,45 @@
                 return True
         return False
 
+    async def _delete_old_one_time_keys_task(
+        self, task: ScheduledTask
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete old one time keys.
+
+        Until Synapse 1.119, Synapse used to issue one-time-keys in a random order, leading to the possibility
+        that it could still have old OTKs that the client has dropped. This task is scheduled exactly once
+        by a database schema delta file, and it clears out old one-time-keys that look like they came from libolm.
+        """
+        last_user = task.result.get("from_user", "") if task.result else ""
+        while True:
+            # We process users in batches of 100
+            users, rowcount = await self.store.delete_old_otks_for_next_user_batch(
+                last_user, 100
+            )
+            if len(users) == 0:
+                # We're done!
+                return TaskStatus.COMPLETE, None, None
+
+            logger.debug(
+                "Deleted %i old one-time-keys for users '%s'..'%s'",
+                rowcount,
+                users[0],
+                users[-1],
+            )
+            last_user = users[-1]
+
+            # Store our progress
+            await self._task_scheduler.update_task(
+                task.id, result={"from_user": last_user}
+            )
+
+            # Sleep a little before doing the next user.
+            #
+            # matrix.org has about 15M users in the e2e_one_time_keys_json table
+            # (comprising 20M devices). We want this to take about a week, so we need
+            # to do about one batch of 100 users every 4 seconds.
+            await self.clock.sleep(4)
+
 
 def _check_cross_signing_key(
     key: JsonDict, user_id: str, key_type: str, signing_key: Optional[VerifyKey] = None
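Two notes on the diff above. The dict.fromkeys(keys, failure) rewrites are behaviour-preserving: dict.fromkeys(ks, v) builds the same mapping as {k: v for k in ks}, with every key referencing the same failure object. More substantially, _delete_old_one_time_keys_task follows a resumable batch pattern: delete one batch of users at a time, persist a cursor ("from_user") after each batch so a restart resumes where it left off, and sleep between batches to throttle database load; at 100 users per 4-second batch, the quoted 15M users take roughly 15,000,000 / 100 * 4 s = 600,000 s, about a week, matching the comment in the code. Below is a minimal sketch of that pattern only, under the assumption of two hypothetical callables: delete_batch standing in for delete_old_otks_for_next_user_batch, and save_progress standing in for the task scheduler's update_task call.

    import asyncio
    from typing import Awaitable, Callable, List, Tuple

    async def delete_in_batches(
        # Given a cursor and batch size, deletes rows for the next batch of
        # users and returns (user IDs processed, number of rows deleted).
        delete_batch: Callable[[str, int], Awaitable[Tuple[List[str], int]]],
        # Persists the cursor so the task can resume after a restart.
        save_progress: Callable[[str], Awaitable[None]],
        start_after: str = "",
        batch_size: int = 100,
        pause_seconds: float = 4.0,
    ) -> None:
        # Resume from the stored cursor; "" means start from the beginning.
        cursor = start_after
        while True:
            users, deleted = await delete_batch(cursor, batch_size)
            if not users:
                return  # nothing left to process
            cursor = users[-1]
            # Persist progress so a restart continues after the last batch.
            await save_progress(cursor)
            # Throttle the work instead of hammering the database in one go.
            await asyncio.sleep(pause_seconds)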