diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
+ Raises:
+ NotFoundError: if the backup version does not exist
Returns:
- A deferred of the deletion transaction
+ A dict containing the count and etag for the backup version
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
+ # make sure the backup version exists
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(
+ user_id, version
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+ version_etag = version_info["etag"] + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
+ )
+
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
+
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
}
}
+ Returns:
+ A dict containing the count and etag for the backup version
+
Raises:
NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
else:
raise
- # go through the room_keys.
- # XXX: this should/could be done concurrently, given we're in a lock.
+ # Fetch any existing room keys for the sessions that have been
+ # submitted. Then compare them with the submitted keys. If the
+ # key is new, insert it; if the key should be updated, then update
+ # it; otherwise, drop it.
+ existing_keys = yield self.store.get_e2e_room_keys_multi(
+ user_id, version, room_keys["rooms"]
+ )
+ to_insert = [] # batch the inserts together
+ changed = False # if anything has changed, we need to update the etag
for room_id, room in iteritems(room_keys["rooms"]):
- for session_id, session in iteritems(room["sessions"]):
- yield self._upload_room_key(
- user_id, version, room_id, session_id, session
+ for session_id, room_key in iteritems(room["sessions"]):
+ log_kv(
+ {
+ "message": "Trying to upload room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "user_id": user_id,
+ }
)
-
- @defer.inlineCallbacks
- def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
- """Upload a given room_key for a given room and session into a given
- version of the backup. Merges the key with any which might already exist.
-
- Args:
- user_id(str): the user whose backup we're setting
- version(str): the version ID of the backup we're updating
- room_id(str): the ID of the room whose keys we're setting
- session_id(str): the session whose room_key we're setting
- room_key(dict): the room_key being set
- """
- log_kv(
- {
- "message": "Trying to upload room key",
- "room_id": room_id,
- "session_id": session_id,
- "user_id": user_id,
- }
- )
- # get the room_key for this particular row
- current_room_key = None
- try:
- current_room_key = yield self.store.get_e2e_room_key(
- user_id, version, room_id, session_id
- )
- except StoreError as e:
- if e.code == 404:
- log_kv(
- {
- "message": "Room key not found.",
- "room_id": room_id,
- "user_id": user_id,
- }
+ current_room_key = existing_keys.get(room_id, {}).get(session_id)
+ if current_room_key:
+ if self._should_replace_room_key(current_room_key, room_key):
+ log_kv({"message": "Replacing room key."})
+ # updates are done one at a time in the DB, so send
+ # updates right away rather than batching them up,
+ # like we do with the inserts
+ yield self.store.update_e2e_room_key(
+ user_id, version, room_id, session_id, room_key
+ )
+ changed = True
+ else:
+ log_kv({"message": "Not replacing room_key."})
+ else:
+ log_kv(
+ {
+ "message": "Room key not found.",
+ "room_id": room_id,
+ "user_id": user_id,
+ }
+ )
+ log_kv({"message": "Replacing room key."})
+ to_insert.append((room_id, session_id, room_key))
+ changed = True
+
+ if len(to_insert):
+ yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+ version_etag = version_info["etag"]
+ if changed:
+ version_etag = version_etag + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
)
- else:
- raise
- if self._should_replace_room_key(current_room_key, room_key):
- log_kv({"message": "Replacing room key."})
- yield self.store.set_e2e_room_key(
- user_id, version, room_id, session_id, room_key
- )
- else:
- log_kv({"message": "Not replacing room_key."})
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version")
else:
raise
+
+ res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res
@trace
|