summary refs log tree commit diff
path: root/synapse/storage/data_stores
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/data_stores')
-rw-r--r--synapse/storage/data_stores/main/e2e_room_keys.py226
-rw-r--r--synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql17
2 files changed, 177 insertions, 66 deletions
diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py
index 1cbbae5b63..113224fd7c 100644
--- a/synapse/storage/data_stores/main/e2e_room_keys.py
+++ b/synapse/storage/data_stores/main/e2e_room_keys.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019 Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -24,49 +25,8 @@ from synapse.storage._base import SQLBaseStore
 
 class EndToEndRoomKeyStore(SQLBaseStore):
     @defer.inlineCallbacks
-    def get_e2e_room_key(self, user_id, version, room_id, session_id):
-        """Get the encrypted E2E room key for a given session from a given
-        backup version of room_keys.  We only store the 'best' room key for a given
-        session at a given time, as determined by the handler.
-
-        Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): the ID of the room whose keys we're querying.
-                This is a bit redundant as it's implied by the session_id, but
-                we include for consistency with the rest of the API.
-            session_id(str): the session whose room_key we're querying.
-
-        Returns:
-            A deferred dict giving the session_data and message metadata for
-            this room key.
-        """
-
-        row = yield self._simple_select_one(
-            table="e2e_room_keys",
-            keyvalues={
-                "user_id": user_id,
-                "version": version,
-                "room_id": room_id,
-                "session_id": session_id,
-            },
-            retcols=(
-                "first_message_index",
-                "forwarded_count",
-                "is_verified",
-                "session_data",
-            ),
-            desc="get_e2e_room_key",
-        )
-
-        row["session_data"] = json.loads(row["session_data"])
-
-        return row
-
-    @defer.inlineCallbacks
-    def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
-        """Replaces or inserts the encrypted E2E room key for a given session in
-        a given backup
+    def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+        """Replaces the encrypted E2E room key for a given session in a given backup
 
         Args:
             user_id(str): the user whose backup we're setting
@@ -78,7 +38,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             StoreError
         """
 
-        yield self._simple_upsert(
+        yield self._simple_update_one(
             table="e2e_room_keys",
             keyvalues={
                 "user_id": user_id,
@@ -86,21 +46,51 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "room_id": room_id,
                 "session_id": session_id,
             },
-            values={
+            updatevalues={
                 "first_message_index": room_key["first_message_index"],
                 "forwarded_count": room_key["forwarded_count"],
                 "is_verified": room_key["is_verified"],
                 "session_data": json.dumps(room_key["session_data"]),
             },
-            lock=False,
+            desc="update_e2e_room_key",
         )
-        log_kv(
-            {
-                "message": "Set room key",
-                "room_id": room_id,
-                "session_id": session_id,
-                "room_key": room_key,
-            }
+
+    @defer.inlineCallbacks
+    def add_e2e_room_keys(self, user_id, version, room_keys):
+        """Bulk add room keys to a given backup.
+
+        Args:
+            user_id (str): the user whose backup we're adding to
+            version (str): the version ID of the backup for the set of keys we're adding to
+            room_keys (iterable[(str, str, dict)]): the keys to add, in the form
+                (roomID, sessionID, keyData)
+        """
+
+        values = []
+        for (room_id, session_id, room_key) in room_keys:
+            values.append(
+                {
+                    "user_id": user_id,
+                    "version": version,
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "first_message_index": room_key["first_message_index"],
+                    "forwarded_count": room_key["forwarded_count"],
+                    "is_verified": room_key["is_verified"],
+                    "session_data": json.dumps(room_key["session_data"]),
+                }
+            )
+            log_kv(
+                {
+                    "message": "Set room key",
+                    "room_id": room_id,
+                    "session_id": session_id,
+                    "room_key": room_key,
+                }
+            )
+
+        yield self._simple_insert_many(
+            table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
         )
 
     @trace
@@ -110,11 +100,11 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         room, or a given session.
 
         Args:
-            user_id(str): the user whose backup we're querying
-            version(str): the version ID of the backup for the set of keys we're querying
-            room_id(str): Optional. the ID of the room whose keys we're querying, if any.
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup for the set of keys we're querying
+            room_id (str): Optional. the ID of the room whose keys we're querying, if any.
                 If not specified, we return the keys for all the rooms in the backup.
-            session_id(str): Optional. the session whose room_key we're querying, if any.
+            session_id (str): Optional. the session whose room_key we're querying, if any.
                 If specified, we also require the room_id to be specified.
                 If not specified, we return all the keys in this version of
                 the backup (or for the specified room)
@@ -162,6 +152,95 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
+    def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+        """Get multiple room keys at a time.  The difference between this function and
+        get_e2e_room_keys is that this function can be used to retrieve
+        multiple specific keys at a time, whereas get_e2e_room_keys is used for
+        getting all the keys in a backup version, all the keys for a room, or a
+        specific key.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+            room_keys (dict[str, dict[str, iterable[str]]]): a map from
+                room ID -> {"session": [session ids]} indicating the session IDs
+                that we want to query
+
+        Returns:
+           Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+        """
+
+        return self.runInteraction(
+            "get_e2e_room_keys_multi",
+            self._get_e2e_room_keys_multi_txn,
+            user_id,
+            version,
+            room_keys,
+        )
+
+    @staticmethod
+    def _get_e2e_room_keys_multi_txn(txn, user_id, version, room_keys):
+        if not room_keys:
+            return {}
+
+        where_clauses = []
+        params = [user_id, version]
+        for room_id, room in room_keys.items():
+            sessions = list(room["sessions"])
+            if not sessions:
+                continue
+            params.append(room_id)
+            params.extend(sessions)
+            where_clauses.append(
+                "(room_id = ? AND session_id IN (%s))"
+                % (",".join(["?" for _ in sessions]),)
+            )
+
+        # check if we're actually querying something
+        if not where_clauses:
+            return {}
+
+        sql = """
+        SELECT room_id, session_id, first_message_index, forwarded_count,
+               is_verified, session_data
+        FROM e2e_room_keys
+        WHERE user_id = ? AND version = ? AND (%s)
+        """ % (
+            " OR ".join(where_clauses)
+        )
+
+        txn.execute(sql, params)
+
+        ret = {}
+
+        for row in txn:
+            room_id = row[0]
+            session_id = row[1]
+            ret.setdefault(room_id, {})
+            ret[room_id][session_id] = {
+                "first_message_index": row[2],
+                "forwarded_count": row[3],
+                "is_verified": row[4],
+                "session_data": json.loads(row[5]),
+            }
+
+        return ret
+
+    def count_e2e_room_keys(self, user_id, version):
+        """Get the number of keys in a backup version.
+
+        Args:
+            user_id (str): the user whose backup we're querying
+            version (str): the version ID of the backup we're querying about
+        """
+
+        return self._simple_select_one_onecol(
+            table="e2e_room_keys",
+            keyvalues={"user_id": user_id, "version": version},
+            retcol="COUNT(*)",
+            desc="count_e2e_room_keys",
+        )
+
     @trace
     @defer.inlineCallbacks
     def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
@@ -219,6 +298,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 version(str)
                 algorithm(str)
                 auth_data(object): opaque dict supplied by the client
+                etag(int): tag of the keys in the backup
         """
 
         def _get_e2e_room_keys_version_info_txn(txn):
@@ -236,10 +316,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 txn,
                 table="e2e_room_keys_versions",
                 keyvalues={"user_id": user_id, "version": this_version, "deleted": 0},
-                retcols=("version", "algorithm", "auth_data"),
+                retcols=("version", "algorithm", "auth_data", "etag"),
             )
             result["auth_data"] = json.loads(result["auth_data"])
             result["version"] = str(result["version"])
+            if result["etag"] is None:
+                result["etag"] = 0
             return result
 
         return self.runInteraction(
@@ -288,21 +370,33 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
 
     @trace
-    def update_e2e_room_keys_version(self, user_id, version, info):
+    def update_e2e_room_keys_version(
+        self, user_id, version, info=None, version_etag=None
+    ):
         """Update a given backup version
 
         Args:
             user_id(str): the user whose backup version we're updating
             version(str): the version ID of the backup version we're updating
-            info(dict): the new backup version info to store
+            info (dict): the new backup version info to store.  If None, then
+                the backup version info is not updated
+            version_etag (Optional[int]): etag of the keys in the backup.  If
+                None, then the etag is not updated
         """
+        updatevalues = {}
 
-        return self._simple_update(
-            table="e2e_room_keys_versions",
-            keyvalues={"user_id": user_id, "version": version},
-            updatevalues={"auth_data": json.dumps(info["auth_data"])},
-            desc="update_e2e_room_keys_version",
-        )
+        if info is not None and "auth_data" in info:
+            updatevalues["auth_data"] = json.dumps(info["auth_data"])
+        if version_etag is not None:
+            updatevalues["etag"] = version_etag
+
+        if updatevalues:
+            return self._simple_update(
+                table="e2e_room_keys_versions",
+                keyvalues={"user_id": user_id, "version": version},
+                updatevalues=updatevalues,
+                desc="update_e2e_room_keys_version",
+            )
 
     @trace
     def delete_e2e_room_keys_version(self, user_id, version=None):
diff --git a/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
new file mode 100644
index 0000000000..7d70dd071e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/room_key_etag.sql
@@ -0,0 +1,17 @@
+/* Copyright 2019 Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- store the current etag of backup version
+ALTER TABLE e2e_room_keys_versions ADD COLUMN etag BIGINT;