diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 0cea445f0d..f1b4424a02 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017, 2018 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -103,14 +104,35 @@ class E2eRoomKeysHandler(object):
rooms
session_id(string): session ID to delete keys for, for None to delete keys
for all sessions
+ Raises:
+ NotFoundError: if the backup version does not exist
Returns:
- A deferred of the deletion transaction
+ A dict containing the count and etag for the backup version
"""
# lock for consistency with uploading
with (yield self._upload_linearizer.queue(user_id)):
+ # make sure the backup version exists
+ try:
+ version_info = yield self.store.get_e2e_room_keys_version_info(
+ user_id, version
+ )
+ except StoreError as e:
+ if e.code == 404:
+ raise NotFoundError("Unknown backup version")
+ else:
+ raise
+
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
+ version_etag = version_info["etag"] + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
+ )
+
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
+
@trace
@defer.inlineCallbacks
def upload_room_keys(self, user_id, version, room_keys):
@@ -138,6 +160,9 @@ class E2eRoomKeysHandler(object):
}
}
+ Returns:
+ A dict containing the count and etag for the backup version
+
Raises:
NotFoundError: if there are no versions defined
RoomKeysVersionError: if the uploaded version is not the current version
@@ -171,59 +196,62 @@ class E2eRoomKeysHandler(object):
else:
raise
- # go through the room_keys.
- # XXX: this should/could be done concurrently, given we're in a lock.
+ # Fetch any existing room keys for the sessions that have been
+ # submitted. Then compare them with the submitted keys. If the
+ # key is new, insert it; if the key should be updated, then update
+ # it; otherwise, drop it.
+ existing_keys = yield self.store.get_e2e_room_keys_multi(
+ user_id, version, room_keys["rooms"]
+ )
+ to_insert = [] # batch the inserts together
+ changed = False # if anything has changed, we need to update the etag
for room_id, room in iteritems(room_keys["rooms"]):
- for session_id, session in iteritems(room["sessions"]):
- yield self._upload_room_key(
- user_id, version, room_id, session_id, session
+ for session_id, room_key in iteritems(room["sessions"]):
+ log_kv(
+ {
+ "message": "Trying to upload room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "user_id": user_id,
+ }
)
-
- @defer.inlineCallbacks
- def _upload_room_key(self, user_id, version, room_id, session_id, room_key):
- """Upload a given room_key for a given room and session into a given
- version of the backup. Merges the key with any which might already exist.
-
- Args:
- user_id(str): the user whose backup we're setting
- version(str): the version ID of the backup we're updating
- room_id(str): the ID of the room whose keys we're setting
- session_id(str): the session whose room_key we're setting
- room_key(dict): the room_key being set
- """
- log_kv(
- {
- "message": "Trying to upload room key",
- "room_id": room_id,
- "session_id": session_id,
- "user_id": user_id,
- }
- )
- # get the room_key for this particular row
- current_room_key = None
- try:
- current_room_key = yield self.store.get_e2e_room_key(
- user_id, version, room_id, session_id
- )
- except StoreError as e:
- if e.code == 404:
- log_kv(
- {
- "message": "Room key not found.",
- "room_id": room_id,
- "user_id": user_id,
- }
+ current_room_key = existing_keys.get(room_id, {}).get(session_id)
+ if current_room_key:
+ if self._should_replace_room_key(current_room_key, room_key):
+ log_kv({"message": "Replacing room key."})
+ # updates are done one at a time in the DB, so send
+ # updates right away rather than batching them up,
+ # like we do with the inserts
+ yield self.store.update_e2e_room_key(
+ user_id, version, room_id, session_id, room_key
+ )
+ changed = True
+ else:
+ log_kv({"message": "Not replacing room_key."})
+ else:
+ log_kv(
+ {
+ "message": "Room key not found.",
+ "room_id": room_id,
+ "user_id": user_id,
+ }
+ )
+ log_kv({"message": "Replacing room key."})
+ to_insert.append((room_id, session_id, room_key))
+ changed = True
+
+ if len(to_insert):
+ yield self.store.add_e2e_room_keys(user_id, version, to_insert)
+
+ version_etag = version_info["etag"]
+ if changed:
+ version_etag = version_etag + 1
+ yield self.store.update_e2e_room_keys_version(
+ user_id, version, None, version_etag
)
- else:
- raise
- if self._should_replace_room_key(current_room_key, room_key):
- log_kv({"message": "Replacing room key."})
- yield self.store.set_e2e_room_key(
- user_id, version, room_id, session_id, room_key
- )
- else:
- log_kv({"message": "Not replacing room_key."})
+ count = yield self.store.count_e2e_room_keys(user_id, version)
+ return {"etag": str(version_etag), "count": count}
@staticmethod
def _should_replace_room_key(current_room_key, room_key):
@@ -314,6 +342,8 @@ class E2eRoomKeysHandler(object):
raise NotFoundError("Unknown backup version")
else:
raise
+
+ res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
return res
@trace
diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py
index d596786430..d83ac8e3c5 100644
--- a/synapse/rest/client/v2_alpha/room_keys.py
+++ b/synapse/rest/client/v2_alpha/room_keys.py
@@ -134,8 +134,8 @@ class RoomKeysServlet(RestServlet):
if room_id:
body = {"rooms": {room_id: body}}
- yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
- return 200, {}
+ ret = yield self.e2e_room_keys_handler.upload_room_keys(user_id, version, body)
+ return 200, ret
@defer.inlineCallbacks
def on_GET(self, request, room_id, session_id):
@@ -239,10 +239,10 @@ class RoomKeysServlet(RestServlet):
user_id = requester.user.to_string()
version = parse_string(request, "version")
- yield self.e2e_room_keys_handler.delete_room_keys(
+ ret = yield self.e2e_room_keys_handler.delete_room_keys(
user_id, version, room_id, session_id
)
- return 200, {}
+ return 200, ret
class RoomKeysNewVersionServlet(RestServlet):
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 1cbbae5b63..113224fd7c 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
class EndToEndRoomKeyStore(SQLBaseStore):
@defer.inlineCallbacks
- def get_e2e_room_key(self, user_id, version, room_id, session_id):
- """Get the encrypted E2E room key for a given session from a given
- backup version of room_keys. We only store the 'best' room key for a given
- session at a given time, as determined by the handler.
-
- Args:
- user_id(str): the user whose backup we're querying
- version(str): the version ID of the backup for the set of keys we're querying
- room_id(str): the ID of the room whose keys we're querying.
- This is a bit redundant as it's implied by the session_id, but
- we include for consistency with the rest of the API.
- session_id(str): the session whose room_key we're querying.
-
- Returns:
- A deferred dict giving the session_data and message metadata for
- this room key.
- """
-
- row = yield self._simple_select_one(
- table="e2e_room_keys",
- keyvalues={
- "user_id": user_id,
- "version": version,
- "room_id": room_id,
- "session_id": session_id,
- },
- retcols=(
- "first_message_index",
- "forwarded_count",
- "is_verified",
- "session_data",
- ),
- desc="get_e2e_room_key",
- )
-
- row["session_data"] = json.loads(row["session_data"])
-
- return row
-
- @defer.inlineCallbacks
- def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
- """Replaces or inserts the encrypted E2E room key for a given session in
- a given backup
+ def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+ """Replaces the encrypted E2E room key for a given session in a given backup
Args:
user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
StoreError
"""
- yield self._simple_upsert(
+ yield self._simple_update_one(
table="e2e_room_keys",
keyvalues={
"user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
"room_id": room_id,
"session_id": session_id,
},
- values={
+ updatevalues={
"first_message_index": room_key["first_message_index"],
"forwarded_count": room_key["forwarded_count"],
"is_verified": room_key["is_verified"],
"session_data": json.dumps(room_key["session_data"]),
},
- lock=False,
+ desc="update_e2e_room_key",
)
- log_kv(
- {
- "message": "Set room key",
- "room_id": room_id,
- "session_id": session_id,
- "room_key": room_key,
- }
+
+ @defer.inlineCallbacks
+ def add_e2e_room_keys(self, user_id, version, room_keys):
+ """Bulk add room keys to a given backup.
+
+ Args:
+ user_id (str): the user whose backup we're adding to
+ version (str): the version ID of the backup for the set of keys we're adding to
+ room_keys (iterable[(str, str, dict)]): the keys to add, in the form
+ (roomID, sessionID, keyData)
+ """
+
+ values = []
+ for (room_id, session_id, room_key) in room_keys:
+ values.append(
+ {
+ "user_id": user_id,
+ "version": version,
+ "room_id": room_id,
+ "session_id": session_id,
+ "first_message_index": room_key["first_message_index"],
+ "forwarded_count": room_key["forwarded_count"],
+ "is_verified": room_key["is_verified"],
+ "session_data": json.dumps(room_key["session_data"]),
+ }
+ )
+ log_kv(
+ {
+ "message": "Set room key",
+ "room_id": room_id,
+ "session_id": session_id,
+ "room_key": room_key,
+ }
+ )
+
+ yield self._simple_insert_many(
+ table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
)
@trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
room, or a given session.
Args:
- user_id(str): the user whose backup we're querying
- version(str): the version ID of the backup for the set of keys we're querying
- room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup for the set of keys we're querying
+ room_id (str): Optional. the ID of the room whose keys we're querying, if any.
If not specified, we return the keys for all the rooms in the backup.
- session_id(str): Optional. the session whose room_key we're querying, if any.
+ session_id (str): Optional. the session whose room_key we're querying, if any.
If specified, we also require the room_id to be specified.
If not specified, we return all the keys in this version of
the backup (or for the specified room)
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
return sessions
+ def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+ """Get multiple room keys at a time. The difference between this function and
+ get_e2e_room_keys is that this function can be used to retrieve
+ multiple specific keys at a time, whereas get_e2e_room_keys is used for
+ getting all the keys in a backup version, all the keys for a room, or a
+ specific key.
+
+ Args:
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup we're querying about
+ room_keys (dict[str, dict[str, iterable[str]]]): a map from
+ room ID -> {"session": [session ids]} indicating the session IDs
+ that we want to query
+
+ Returns:
+ Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+ """
+
+ return self.runInteraction(
+ "get_e2e_room_keys_multi",
+ self._get_e2e_room_keys_multi_txn,
+ user_id,
+ version,
+ room_keys,
+ )
+
+ @staticmethod
+ def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
+ if not room_keys:
+ return {}
+
+ where_clauses = []
+ params = [user_id, version]
+ for room_id, room in room_keys.items():
+ sessions = list(room["sessions"])
+ if not sessions:
+ continue
+ params.append(room_id)
+ params.extend(sessions)
+ where_clauses.append(
+ "(room_id = ? AND session_id IN (%s))"
+ % (",".join(["?" for _ in sessions]),)
+ )
+
+ # check if we're actually querying something
+ if not where_clauses:
+ return {}
+
+ sql = """
+ SELECT room_id, session_id, first_message_index, forwarded_count,
+ is_verified, session_data
+ FROM e2e_room_keys
+ WHERE user_id = ? AND version = ? AND (%s)
+ """ % (
+ " OR ".join(where_clauses)
+ )
+
+ txn.execute(sql, params)
+
+ ret = {}
+
+ for row in txn:
+ room_id = row[0]
+ session_id = row[1]
+ ret.setdefault(room_id, {})
+ ret[room_id][session_id] = {
+ "first_message_index": row[2],
+ "forwarded_count": row[3],
+ "is_verified": row[4],
+ "session_data": json.loads(row[5]),
+ }
+
+ return ret
+
+ def count_e2e_room_keys(self, user_id, version):
+ """Get the number of keys in a backup version.
+
+ Args:
+ user_id (str): the user whose backup we're querying
+ version (str): the version ID of the backup we're querying about
+ """
+
+ return self._simple_select_one_onecol(
+ table="e2e_room_keys",
+ keyvalues={"user_id": user_id, "version": version},
+ retcol="COUNT(*)",
+ desc="count_e2e_room_keys",
+ )
+
@trace
@defer.inlineCallbacks
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
version(str)
algorithm(str)
auth_data(object): opaque dict supplied by the client
+ etag(int): tag of the keys in the backup
"""
def _get_e2e_room_keys_version_info_txn(txn):
@@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
txn,
table="e2e_room_keys_versions",
keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
- retcols=("version", "algorithm", "auth_data"),
+ retcols=("version", "algorithm", "auth_data", "etag"),
)
result["auth_data"] = json.loads(result["auth_data"])
result["version"] = str(result["version"])
+ if result["etag"] is None:
+ result["etag"] = 0
return result
return self.runInteraction(
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
)
@trace
- def update_e2e_room_keys_version(self, user_id, version, info):
+ def update_e2e_room_keys_version(
+ self, user_id, version, info=None, version_etag=None
+ ):
"""Update a given backup version
Args:
user_id(str): the user whose backup version we're updating
version(str): the version ID of the backup version we're updating
- info(dict): the new backup version info to store
+ info (dict): the new backup version info to store. If None, then
+ the backup version info is not updated
+ version_etag (Optional[int]): etag of the keys in the backup. If
+ None, then the etag is not updated
"""
+ updatevalues = {}
- return self._simple_update(
- table="e2e_room_keys_versions",
- keyvalues={"user_id": user_id, "version": version},
- updatevalues={"auth_data": json.dumps(info["auth_data"])},
- desc="update_e2e_room_keys_version",
- )
+ if info is not None and "auth_data" in info:
+ updatevalues["auth_data"] = json.dumps(info["auth_data"])
+ if version_etag is not None:
+ updatevalues["etag"] = version_etag
+
+ if updatevalues:
+ return self._simple_update(
+ table="e2e_room_keys_versions",
+ keyvalues={"user_id": user_id, "version": version},
+ updatevalues=updatevalues,
+ desc="update_e2e_room_keys_version",
+ )
@trace
def delete_e2e_room_keys_version(self, user_id, version=None):
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
new file mode 100644
index 0000000000..7d70dd071e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- store the current etag of backup version
+ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;
|