diff --git a/synapse/config/key.py b/synapse/config/key.py
index 035ee2416b..ee83c6c06b 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -16,12 +16,14 @@
import hashlib
import logging
import os
-from typing import Any, Dict
+from typing import Any, Dict, Iterator, List, Optional
import attr
import jsonschema
from signedjson.key import (
NACL_ED25519,
+ SigningKey,
+ VerifyKey,
decode_signing_key_base64,
decode_verify_key_bytes,
generate_signing_key,
@@ -31,6 +33,7 @@ from signedjson.key import (
)
from unpaddedbase64 import decode_base64
+from synapse.types import JsonDict
from synapse.util.stringutils import random_string, random_string_with_symbols
from ._base import Config, ConfigError
@@ -81,14 +84,13 @@ To suppress this warning and continue using 'matrix.org', admins should set
logger = logging.getLogger(__name__)
-@attr.s
+@attr.s(slots=True, auto_attribs=True)
class TrustedKeyServer:
- # string: name of the server.
- server_name = attr.ib()
+ # name of the server.
+ server_name: str
- # dict[str,VerifyKey]|None: map from key id to key object, or None to disable
- # signature verification.
- verify_keys = attr.ib(default=None)
+ # map from key id to key object, or None to disable signature verification.
+ verify_keys: Optional[Dict[str, VerifyKey]] = None
class KeyConfig(Config):
@@ -279,15 +281,15 @@ class KeyConfig(Config):
% locals()
)
- def read_signing_keys(self, signing_key_path, name):
+ def read_signing_keys(self, signing_key_path: str, name: str) -> List[SigningKey]:
"""Read the signing keys in the given path.
Args:
- signing_key_path (str)
- name (str): Associated config key name
+ signing_key_path
+ name: Associated config key name
Returns:
- list[SigningKey]
+ The signing keys read from the given path.
"""
signing_keys = self.read_file(signing_key_path, name)
@@ -296,7 +298,9 @@ class KeyConfig(Config):
except Exception as e:
raise ConfigError("Error reading %s: %s" % (name, str(e)))
- def read_old_signing_keys(self, old_signing_keys):
+ def read_old_signing_keys(
+ self, old_signing_keys: Optional[JsonDict]
+ ) -> Dict[str, VerifyKey]:
if old_signing_keys is None:
return {}
keys = {}
@@ -340,7 +344,7 @@ class KeyConfig(Config):
write_signing_keys(signing_key_file, (key,))
-def _perspectives_to_key_servers(config):
+def _perspectives_to_key_servers(config: JsonDict) -> Iterator[JsonDict]:
"""Convert old-style 'perspectives' configs into new-style 'trusted_key_servers'
Returns an iterable of entries to add to trusted_key_servers.
@@ -402,7 +406,9 @@ TRUSTED_KEY_SERVERS_SCHEMA = {
}
-def _parse_key_servers(key_servers, federation_verify_certificates):
+def _parse_key_servers(
+ key_servers: List[Any], federation_verify_certificates: bool
+) -> Iterator[TrustedKeyServer]:
try:
jsonschema.validate(key_servers, TRUSTED_KEY_SERVERS_SCHEMA)
except jsonschema.ValidationError as e:
@@ -444,7 +450,7 @@ def _parse_key_servers(key_servers, federation_verify_certificates):
yield result
-def _assert_keyserver_has_verify_keys(trusted_key_server):
+def _assert_keyserver_has_verify_keys(trusted_key_server: TrustedKeyServer) -> None:
if not trusted_key_server.verify_keys:
raise ConfigError(INSECURE_NOTARY_ERROR)
|