diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index 0422c43fab..8fb116ae18 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -18,7 +18,7 @@
import collections.abc
import hashlib
import logging
-from typing import Dict
+from typing import Any, Callable, Dict, Tuple
from canonicaljson import encode_canonical_json
from signedjson.sign import sign_json
@@ -27,13 +27,18 @@ from unpaddedbase64 import decode_base64, encode_base64
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
+from synapse.events import EventBase
from synapse.events.utils import prune_event, prune_event_dict
from synapse.types import JsonDict
logger = logging.getLogger(__name__)
+Hasher = Callable[[bytes], "hashlib._Hash"]
-def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
+
+def check_event_content_hash(
+ event: EventBase, hash_algorithm: Hasher = hashlib.sha256
+) -> bool:
"""Check whether the hash for this PDU matches the contents"""
name, expected_hash = compute_content_hash(event.get_pdu_json(), hash_algorithm)
logger.debug(
@@ -67,18 +72,19 @@ def check_event_content_hash(event, hash_algorithm=hashlib.sha256):
return message_hash_bytes == expected_hash
-def compute_content_hash(event_dict, hash_algorithm):
+def compute_content_hash(
+ event_dict: Dict[str, Any], hash_algorithm: Hasher
+) -> Tuple[str, bytes]:
"""Compute the content hash of an event, which is the hash of the
unredacted event.
Args:
- event_dict (dict): The unredacted event as a dict
+ event_dict: The unredacted event as a dict
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
Returns:
- tuple[str, bytes]: A tuple of the name of hash and the hash as raw
- bytes.
+ A tuple of the name of hash and the hash as raw bytes.
"""
event_dict = dict(event_dict)
event_dict.pop("age_ts", None)
@@ -94,18 +100,19 @@ def compute_content_hash(event_dict, hash_algorithm):
return hashed.name, hashed.digest()
-def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
+def compute_event_reference_hash(
+ event, hash_algorithm: Hasher = hashlib.sha256
+) -> Tuple[str, bytes]:
"""Computes the event reference hash. This is the hash of the redacted
event.
Args:
- event (FrozenEvent)
+ event
hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
to hash the event
Returns:
- tuple[str, bytes]: A tuple of the name of hash and the hash as raw
- bytes.
+ A tuple of the name of hash and the hash as raw bytes.
"""
tmp_event = prune_event(event)
event_dict = tmp_event.get_pdu_json()
@@ -156,7 +163,7 @@ def add_hashes_and_signatures(
event_dict: JsonDict,
signature_name: str,
signing_key: SigningKey,
-):
+) -> None:
"""Add content hash and sign the event
Args:
|