diff options
author | Mark Haines <mark.haines@matrix.org> | 2016-09-13 11:35:35 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2016-09-13 11:35:35 +0100 |
commit | 18ab019a4a4f046e82abb822dcf6556613f88a03 (patch) | |
tree | 61adf5e9a59240a1ccb743d7900f30aff3b891fa /synapse/handlers | |
parent | Merge pull request #1110 from matrix-org/markjh/e2e_timeout (diff) | |
download | synapse-18ab019a4a4f046e82abb822dcf6556613f88a03.tar.xz |
Move the E2E key handling into the e2e handler
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/e2e_keys.py | 105 |
1 files changed, 102 insertions, 3 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 5bfd700931..60a1acc6f4 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json +import ujson as json import logging +from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.api.errors import SynapseError, CodeMessageException @@ -29,7 +30,9 @@ class E2eKeysHandler(object): def __init__(self, hs): self.store = hs.get_datastore() self.federation = hs.get_replication_layer() + self.device_handler = hs.get_device_handler() self.is_mine_id = hs.is_mine_id + self.clock = hs.get_clock() # doesn't really work as part of the generic query API, because the # query request requires an object POST, but we abuse the @@ -103,9 +106,9 @@ class E2eKeysHandler(object): for destination in remote_queries ])) - defer.returnValue((200, { + defer.returnValue({ "device_keys": results, "failures": failures, - })) + }) @defer.inlineCallbacks def query_local_devices(self, query): @@ -159,3 +162,99 @@ class E2eKeysHandler(object): device_keys_query = query_body.get("device_keys", {}) res = yield self.query_local_devices(device_keys_query) defer.returnValue({"device_keys": res}) + + @defer.inlineCallbacks + def claim_one_time_keys(self, query, timeout): + local_query = [] + remote_queries = {} + + for user_id, device_keys in query.get("one_time_keys", {}).items(): + if self.is_mine_id(user_id): + for device_id, algorithm in device_keys.items(): + local_query.append((user_id, device_id, algorithm)) + else: + domain = get_domain_from_id(user_id) + remote_queries.setdefault(domain, {})[user_id] = device_keys + + results = yield self.store.claim_e2e_one_time_keys(local_query) + + json_result = {} + failures = {} + for user_id, device_keys in results.items(): + for device_id, keys in device_keys.items(): + for key_id, json_bytes in keys.items(): + json_result.setdefault(user_id, {})[device_id] = { + key_id: json.loads(json_bytes) + } + + @defer.inlineCallbacks + def claim_client_keys(destination): + device_keys = remote_queries[destination] + try: + remote_result = yield self.federation.claim_client_keys( + destination, + {"one_time_keys": device_keys}, + timeout=timeout + ) + for user_id, keys in remote_result["one_time_keys"].items(): + if user_id in device_keys: + json_result[user_id] = keys + except CodeMessageException as e: + failures[destination] = { + "status": e.code, "message": e.message + } + + yield preserve_context_over_deferred(defer.gatherResults([ + preserve_fn(claim_client_keys)(destination) + for destination in remote_queries + ])) + + defer.returnValue({ + "one_time_keys": json_result, + "failures": failures + }) + + @defer.inlineCallbacks + def upload_keys_for_user(self, user_id, device_id, keys): + time_now = self.clock.time_msec() + + # TODO: Validate the JSON to make sure it has the right keys. + device_keys = keys.get("device_keys", None) + if device_keys: + logger.info( + "Updating device_keys for device %r for user %s at %d", + device_id, user_id, time_now + ) + # TODO: Sign the JSON with the server key + yield self.store.set_e2e_device_keys( + user_id, device_id, time_now, + encode_canonical_json(device_keys) + ) + + one_time_keys = keys.get("one_time_keys", None) + if one_time_keys: + logger.info( + "Adding %d one_time_keys for device %r for user %r at %d", + len(one_time_keys), device_id, user_id, time_now + ) + key_list = [] + for key_id, key_json in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append(( + algorithm, key_id, encode_canonical_json(key_json) + )) + + yield self.store.add_e2e_one_time_keys( + user_id, device_id, time_now, key_list + ) + + # the device should have been registered already, but it may have been + # deleted due to a race with a DELETE request. Or we may be using an + # old access_token without an associated device_id. Either way, we + # need to double-check the device is registered to avoid ending up with + # keys without a corresponding device. + self.device_handler.check_device_registered(user_id, device_id) + + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + + defer.returnValue({"one_time_key_counts": result}) |