summary refs log tree commit diff
path: root/synapse/handlers/e2e_keys.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/e2e_keys.py')
-rw-r--r--synapse/handlers/e2e_keys.py174
1 files changed, 152 insertions, 22 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5ea54f60be..f09a0b73c8 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -36,6 +36,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
@@ -49,10 +51,19 @@ class E2eKeysHandler(object):
         self.is_mine = hs.is_mine
         self.clock = hs.get_clock()
 
+        self._edu_updater = SigningKeyEduUpdater(hs, self)
+
+        federation_registry = hs.get_federation_registry()
+
+        # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
+        federation_registry.register_edu_handler(
+            "org.matrix.signing_key_update",
+            self._edu_updater.incoming_signing_key_update,
+        )
         # doesn't really work as part of the generic query API, because the
         # query request requires an object POST, but we abuse the
         # "query handler" interface.
-        hs.get_federation_registry().register_query_handler(
+        federation_registry.register_query_handler(
             "client_keys", self.on_federation_query_client_keys
         )
 
@@ -119,9 +130,10 @@ class E2eKeysHandler(object):
                 else:
                     query_list.append((user_id, None))
 
-            user_ids_not_in_cache, remote_results = (
-                yield self.store.get_user_devices_from_cache(query_list)
-            )
+            (
+                user_ids_not_in_cache,
+                remote_results,
+            ) = yield self.store.get_user_devices_from_cache(query_list)
             for user_id, devices in iteritems(remote_results):
                 user_devices = results.setdefault(user_id, {})
                 for device_id, device in iteritems(devices):
@@ -207,13 +219,15 @@ class E2eKeysHandler(object):
                     if user_id in destination_query:
                         results[user_id] = keys
 
-                for user_id, key in remote_result["master_keys"].items():
-                    if user_id in destination_query:
-                        cross_signing_keys["master_keys"][user_id] = key
+                if "master_keys" in remote_result:
+                    for user_id, key in remote_result["master_keys"].items():
+                        if user_id in destination_query:
+                            cross_signing_keys["master_keys"][user_id] = key
 
-                for user_id, key in remote_result["self_signing_keys"].items():
-                    if user_id in destination_query:
-                        cross_signing_keys["self_signing_keys"][user_id] = key
+                if "self_signing_keys" in remote_result:
+                    for user_id, key in remote_result["self_signing_keys"].items():
+                        if user_id in destination_query:
+                            cross_signing_keys["self_signing_keys"][user_id] = key
 
             except Exception as e:
                 failure = _exception_to_failure(e)
@@ -251,7 +265,7 @@ class E2eKeysHandler(object):
 
         Returns:
             defer.Deferred[dict[str, dict[str, dict]]]: map from
-                (master|self_signing|user_signing) -> user_id -> key
+                (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
         """
         master_keys = {}
         self_signing_keys = {}
@@ -343,7 +357,16 @@ class E2eKeysHandler(object):
         """
         device_keys_query = query_body.get("device_keys", {})
         res = yield self.query_local_devices(device_keys_query)
-        return {"device_keys": res}
+        ret = {"device_keys": res}
+
+        # add in the cross-signing keys
+        cross_signing_keys = yield self.get_cross_signing_keys_from_cache(
+            device_keys_query, None
+        )
+
+        ret.update(cross_signing_keys)
+
+        return ret
 
     @trace
     @defer.inlineCallbacks
@@ -688,17 +711,21 @@ class E2eKeysHandler(object):
 
         try:
             # get our self-signing key to verify the signatures
-            _, self_signing_key_id, self_signing_verify_key = yield self._get_e2e_cross_signing_verify_key(
-                user_id, "self_signing"
-            )
+            (
+                _,
+                self_signing_key_id,
+                self_signing_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "self_signing")
 
             # get our master key, since we may have received a signature of it.
             # We need to fetch it here so that we know what its key ID is, so
             # that we can check if a signature that was sent is a signature of
             # the master key or of a device
-            master_key, _, master_verify_key = yield self._get_e2e_cross_signing_verify_key(
-                user_id, "master"
-            )
+            (
+                master_key,
+                _,
+                master_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "master")
 
             # fetch our stored devices.  This is used to 1. verify
             # signatures on the master key, and 2. to compare with what
@@ -838,9 +865,11 @@ class E2eKeysHandler(object):
 
         try:
             # get our user-signing key to verify the signatures
-            user_signing_key, user_signing_key_id, user_signing_verify_key = yield self._get_e2e_cross_signing_verify_key(
-                user_id, "user_signing"
-            )
+            (
+                user_signing_key,
+                user_signing_key_id,
+                user_signing_verify_key,
+            ) = yield self._get_e2e_cross_signing_verify_key(user_id, "user_signing")
         except SynapseError as e:
             failure = _exception_to_failure(e)
             for user, devicemap in signatures.items():
@@ -859,7 +888,11 @@ class E2eKeysHandler(object):
             try:
                 # get the target user's master key, to make sure it matches
                 # what was sent
-                master_key, master_key_id, _ = yield self._get_e2e_cross_signing_verify_key(
+                (
+                    master_key,
+                    master_key_id,
+                    _,
+                ) = yield self._get_e2e_cross_signing_verify_key(
                     target_user, "master", user_id
                 )
 
@@ -1047,3 +1080,100 @@ class SignatureListItem:
     target_user_id = attr.ib()
     target_device_id = attr.ib()
     signature = attr.ib()
+
+
+class SigningKeyEduUpdater(object):
+    """Handles incoming signing key updates from federation and updates the DB"""
+
+    def __init__(self, hs, e2e_keys_handler):
+        self.store = hs.get_datastore()
+        self.federation = hs.get_federation_client()
+        self.clock = hs.get_clock()
+        self.e2e_keys_handler = e2e_keys_handler
+
+        self._remote_edu_linearizer = Linearizer(name="remote_signing_key")
+
+        # user_id -> list of updates waiting to be handled.
+        self._pending_updates = {}
+
+        # Recently seen stream ids. We don't bother keeping these in the DB,
+        # but they're useful to have them about to reduce the number of spurious
+        # resyncs.
+        self._seen_updates = ExpiringCache(
+            cache_name="signing_key_update_edu",
+            clock=self.clock,
+            max_len=10000,
+            expiry_ms=30 * 60 * 1000,
+            iterable=True,
+        )
+
+    @defer.inlineCallbacks
+    def incoming_signing_key_update(self, origin, edu_content):
+        """Called on incoming signing key update from federation. Responsible for
+        parsing the EDU and adding to pending updates list.
+
+        Args:
+            origin (string): the server that sent the EDU
+            edu_content (dict): the contents of the EDU
+        """
+
+        user_id = edu_content.pop("user_id")
+        master_key = edu_content.pop("master_key", None)
+        self_signing_key = edu_content.pop("self_signing_key", None)
+
+        if get_domain_from_id(user_id) != origin:
+            logger.warning("Got signing key update edu for %r from %r", user_id, origin)
+            return
+
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
+            # We don't share any rooms with this user. Ignore update, as we
+            # probably won't get any further updates.
+            return
+
+        self._pending_updates.setdefault(user_id, []).append(
+            (master_key, self_signing_key)
+        )
+
+        yield self._handle_signing_key_updates(user_id)
+
+    @defer.inlineCallbacks
+    def _handle_signing_key_updates(self, user_id):
+        """Actually handle pending updates.
+
+        Args:
+            user_id (string): the user whose updates we are processing
+        """
+
+        device_handler = self.e2e_keys_handler.device_handler
+
+        with (yield self._remote_edu_linearizer.queue(user_id)):
+            pending_updates = self._pending_updates.pop(user_id, [])
+            if not pending_updates:
+                # This can happen since we batch updates
+                return
+
+            device_ids = []
+
+            logger.info("pending updates: %r", pending_updates)
+
+            for master_key, self_signing_key in pending_updates:
+                if master_key:
+                    yield self.store.set_e2e_cross_signing_key(
+                        user_id, "master", master_key
+                    )
+                    _, verify_key = get_verify_key_from_cross_signing_key(master_key)
+                    # verify_key is a VerifyKey from signedjson, which uses
+                    # .version to denote the portion of the key ID after the
+                    # algorithm and colon, which is the device ID
+                    device_ids.append(verify_key.version)
+                if self_signing_key:
+                    yield self.store.set_e2e_cross_signing_key(
+                        user_id, "self_signing", self_signing_key
+                    )
+                    _, verify_key = get_verify_key_from_cross_signing_key(
+                        self_signing_key
+                    )
+                    device_ids.append(verify_key.version)
+
+            yield device_handler.notify_device_update(user_id, device_ids)