diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/errors.py | 15 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 4 | ||||
-rw-r--r-- | synapse/handlers/e2e_room_keys.py | 289 | ||||
-rw-r--r-- | synapse/rest/__init__.py | 2 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/room_keys.py | 372 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/e2e_room_keys.py | 320 | ||||
-rw-r--r-- | synapse/storage/schema/delta/51/e2e_room_keys.sql | 39 | ||||
-rw-r--r-- | synapse/visibility.py | 2 |
10 files changed, 1047 insertions, 3 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 2e7f98404d..48b903374d 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -59,6 +59,7 @@ class Codes(object): RESOURCE_LIMIT_EXCEEDED = "M_RESOURCE_LIMIT_EXCEEDED" UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION" INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" + WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION" class CodeMessageException(RuntimeError): @@ -312,6 +313,20 @@ class LimitExceededError(SynapseError): ) +class RoomKeysVersionError(SynapseError): + """A client has tried to upload to a non-current version of the room_keys store + """ + def __init__(self, current_version): + """ + Args: + current_version (str): the current version of the store they should have used + """ + super(RoomKeysVersionError, self).__init__( + 403, "Wrong room_keys version", Codes.WRONG_ROOM_KEYS_VERSION + ) + self.current_version = current_version + + class IncompatibleRoomVersionError(SynapseError): """A server is trying to join a room whose version it does not support.""" diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index fc4b25de1c..f5c61dec5b 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -68,7 +68,7 @@ class PresenceStatusStubServlet(ClientV1RestServlet): "Authorization": auth_headers, } result = yield self.http_client.get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), headers=headers, ) defer.returnValue((200, result)) @@ -125,7 +125,7 @@ class KeyUploadServlet(RestServlet): "Authorization": auth_headers, } result = yield self.http_client.post_json_get_json( - self.main_uri + request.uri, + self.main_uri + request.uri.decode('ascii'), body, headers=headers, ) diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py new file mode 100644 index 0000000000..5edb3cfe04 --- /dev/null +++ b/synapse/handlers/e2e_room_keys.py @@ -0,0 +1,289 @@ +# -*- coding: utf-8 -*- +# Copyright 2017, 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from six import iteritems + +from twisted.internet import defer + +from synapse.api.errors import RoomKeysVersionError, StoreError, SynapseError +from synapse.util.async_helpers import Linearizer + +logger = logging.getLogger(__name__) + + +class E2eRoomKeysHandler(object): + """ + Implements an optional realtime backup mechanism for encrypted E2E megolm room keys. + This gives a way for users to store and recover their megolm keys if they lose all + their clients. It should also extend easily to future room key mechanisms. + The actual payload of the encrypted keys is completely opaque to the handler. + """ + + def __init__(self, hs): + self.store = hs.get_datastore() + + # Used to lock whenever a client is uploading key data. This prevents collisions + # between clients trying to upload the details of a new session, given all + # clients belonging to a user will receive and try to upload a new session at + # roughly the same time. Also used to lock out uploads when the key is being + # changed. + self._upload_linearizer = Linearizer("upload_room_keys_lock") + + @defer.inlineCallbacks + def get_room_keys(self, user_id, version, room_id=None, session_id=None): + """Bulk get the E2E room keys for a given backup, optionally filtered to a given + room, or a given session. + See EndToEndRoomKeyStore.get_e2e_room_keys for full details. + + Args: + user_id(str): the user whose keys we're getting + version(str): the version ID of the backup we're getting keys from + room_id(string): room ID to get keys for, for None to get keys for all rooms + session_id(string): session ID to get keys for, for None to get keys for all + sessions + Returns: + A deferred list of dicts giving the session_data and message metadata for + these room keys. + """ + + # we deliberately take the lock to get keys so that changing the version + # works atomically + with (yield self._upload_linearizer.queue(user_id)): + results = yield self.store.get_e2e_room_keys( + user_id, version, room_id, session_id + ) + + if results['rooms'] == {}: + raise SynapseError(404, "No room_keys found") + + defer.returnValue(results) + + @defer.inlineCallbacks + def delete_room_keys(self, user_id, version, room_id=None, session_id=None): + """Bulk delete the E2E room keys for a given backup, optionally filtered to a given + room or a given session. + See EndToEndRoomKeyStore.delete_e2e_room_keys for full details. + + Args: + user_id(str): the user whose backup we're deleting + version(str): the version ID of the backup we're deleting + room_id(string): room ID to delete keys for, for None to delete keys for all + rooms + session_id(string): session ID to delete keys for, for None to delete keys + for all sessions + Returns: + A deferred of the deletion transaction + """ + + # lock for consistency with uploading + with (yield self._upload_linearizer.queue(user_id)): + yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id) + + @defer.inlineCallbacks + def upload_room_keys(self, user_id, version, room_keys): + """Bulk upload a list of room keys into a given backup version, asserting + that the given version is the current backup version. room_keys are merged + into the current backup as described in RoomKeysServlet.on_PUT(). + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_keys(dict): a nested dict describing the room_keys we're setting: + + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + + Raises: + SynapseError: with code 404 if there are no versions defined + RoomKeysVersionError: if the uploaded version is not the current version + """ + + # TODO: Validate the JSON to make sure it has the right keys. + + # XXX: perhaps we should use a finer grained lock here? + with (yield self._upload_linearizer.queue(user_id)): + + # Check that the version we're trying to upload is the current version + try: + version_info = yield self.store.get_e2e_room_keys_version_info(user_id) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Version '%s' not found" % (version,)) + else: + raise + + if version_info['version'] != version: + # Check that the version we're trying to upload actually exists + try: + version_info = yield self.store.get_e2e_room_keys_version_info( + user_id, version, + ) + # if we get this far, the version must exist + raise RoomKeysVersionError(current_version=version_info['version']) + except StoreError as e: + if e.code == 404: + raise SynapseError(404, "Version '%s' not found" % (version,)) + else: + raise + + # go through the room_keys. + # XXX: this should/could be done concurrently, given we're in a lock. + for room_id, room in iteritems(room_keys['rooms']): + for session_id, session in iteritems(room['sessions']): + yield self._upload_room_key( + user_id, version, room_id, session_id, session + ) + + @defer.inlineCallbacks + def _upload_room_key(self, user_id, version, room_id, session_id, room_key): + """Upload a given room_key for a given room and session into a given + version of the backup. Merges the key with any which might already exist. + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_id(str): the ID of the room whose keys we're setting + session_id(str): the session whose room_key we're setting + room_key(dict): the room_key being set + """ + + # get the room_key for this particular row + current_room_key = None + try: + current_room_key = yield self.store.get_e2e_room_key( + user_id, version, room_id, session_id + ) + except StoreError as e: + if e.code == 404: + pass + else: + raise + + if self._should_replace_room_key(current_room_key, room_key): + yield self.store.set_e2e_room_key( + user_id, version, room_id, session_id, room_key + ) + + @staticmethod + def _should_replace_room_key(current_room_key, room_key): + """ + Determine whether to replace a given current_room_key (if any) + with a newly uploaded room_key backup + + Args: + current_room_key (dict): Optional, the current room_key dict if any + room_key (dict): The new room_key dict which may or may not be fit to + replace the current_room_key + + Returns: + True if current_room_key should be replaced by room_key in the backup + """ + + if current_room_key: + # spelt out with if/elifs rather than nested boolean expressions + # purely for legibility. + + if room_key['is_verified'] and not current_room_key['is_verified']: + return True + elif ( + room_key['first_message_index'] < + current_room_key['first_message_index'] + ): + return True + elif room_key['forwarded_count'] < current_room_key['forwarded_count']: + return True + else: + return False + return True + + @defer.inlineCallbacks + def create_version(self, user_id, version_info): + """Create a new backup version. This automatically becomes the new + backup version for the user's keys; previous backups will no longer be + writeable to. + + Args: + user_id(str): the user whose backup version we're creating + version_info(dict): metadata about the new version being created + + { + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + + Returns: + A deferred of a string that gives the new version number. + """ + + # TODO: Validate the JSON to make sure it has the right keys. + + # lock everyone out until we've switched version + with (yield self._upload_linearizer.queue(user_id)): + new_version = yield self.store.create_e2e_room_keys_version( + user_id, version_info + ) + defer.returnValue(new_version) + + @defer.inlineCallbacks + def get_version_info(self, user_id, version=None): + """Get the info about a given version of the user's backup + + Args: + user_id(str): the user whose current backup version we're querying + version(str): Optional; if None gives the most recent version + otherwise a historical one. + Raises: + StoreError: code 404 if the requested backup version doesn't exist + Returns: + A deferred of a info dict that gives the info about the new version. + + { + "version": "1234", + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + """ + + with (yield self._upload_linearizer.queue(user_id)): + res = yield self.store.get_e2e_room_keys_version_info(user_id, version) + defer.returnValue(res) + + @defer.inlineCallbacks + def delete_version(self, user_id, version=None): + """Deletes a given version of the user's e2e_room_keys backup + + Args: + user_id(str): the user whose current backup version we're deleting + version(str): the version id of the backup being deleted + Raises: + StoreError: code 404 if this backup version doesn't exist + """ + + with (yield self._upload_linearizer.queue(user_id)): + yield self.store.delete_e2e_room_keys_version(user_id, version) diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 3418f06fd6..4856822a5d 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -46,6 +46,7 @@ from synapse.rest.client.v2_alpha import ( receipts, register, report_event, + room_keys, sendtodevice, sync, tags, @@ -102,6 +103,7 @@ class ClientRestResource(JsonResource): auth.register_servlets(hs, client_resource) receipts.register_servlets(hs, client_resource) read_marker.register_servlets(hs, client_resource) + room_keys.register_servlets(hs, client_resource) keys.register_servlets(hs, client_resource) tokenrefresh.register_servlets(hs, client_resource) tags.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/v2_alpha/room_keys.py b/synapse/rest/client/v2_alpha/room_keys.py new file mode 100644 index 0000000000..45b5817d8b --- /dev/null +++ b/synapse/rest/client/v2_alpha/room_keys.py @@ -0,0 +1,372 @@ +# -*- coding: utf-8 -*- +# Copyright 2017, 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.api.errors import Codes, SynapseError +from synapse.http.servlet import ( + RestServlet, + parse_json_object_from_request, + parse_string, +) + +from ._base import client_v2_patterns + +logger = logging.getLogger(__name__) + + +class RoomKeysServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/keys(/(?P<room_id>[^/]+))?(/(?P<session_id>[^/]+))?$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_PUT(self, request, room_id, session_id): + """ + Uploads one or more encrypted E2E room keys for backup purposes. + room_id: the ID of the room the keys are for (optional) + session_id: the ID for the E2E room keys for the room (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /room_keys/version API. + + Each session has: + * first_message_index: a numeric index indicating the oldest message + encrypted by this session. + * forwarded_count: how many times the uploading client claims this key + has been shared (forwarded) + * is_verified: whether the client that uploaded the keys claims they + were sent by a device which they've verified + * session_data: base64-encrypted data describing the session. + + Returns 200 OK on success with body {} + Returns 403 Forbidden if the version in question is not the most recently + created version (i.e. if this is an old client trying to write to a stale backup) + Returns 404 Not Found if the version in question doesn't exist + + The API is designed to be otherwise agnostic to the room_key encryption + algorithm being used. Sessions are merged with existing ones in the + backup using the heuristics: + * is_verified sessions always win over unverified sessions + * older first_message_index always win over newer sessions + * lower forwarded_count always wins over higher forwarded_count + + We trust the clients not to lie and corrupt their own backups. + It also means that if your access_token is stolen, the attacker could + delete your backup. + + POST /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1 + Content-Type: application/json + + { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + + Or... + + POST /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1 + Content-Type: application/json + + { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + + Or... + + POST /room_keys/keys?version=1 HTTP/1.1 + Content-Type: application/json + + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + version = parse_string(request, "version") + + if session_id: + body = { + "sessions": { + session_id: body + } + } + + if room_id: + body = { + "rooms": { + room_id: body + } + } + + yield self.e2e_room_keys_handler.upload_room_keys( + user_id, version, body + ) + defer.returnValue((200, {})) + + @defer.inlineCallbacks + def on_GET(self, request, room_id, session_id): + """ + Retrieves one or more encrypted E2E room keys for backup purposes. + Symmetric with the PUT version of the API. + + room_id: the ID of the room to retrieve the keys for (optional) + session_id: the ID for the E2E room keys to retrieve the keys for (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /change_secret API. + + Returns as follows: + + GET /room_keys/keys/!abc:matrix.org/c0ff33?version=1 HTTP/1.1 + { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + + Or... + + GET /room_keys/keys/!abc:matrix.org?version=1 HTTP/1.1 + { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + + Or... + + GET /room_keys/keys?version=1 HTTP/1.1 + { + "rooms": { + "!abc:matrix.org": { + "sessions": { + "c0ff33": { + "first_message_index": 1, + "forwarded_count": 1, + "is_verified": false, + "session_data": "SSBBTSBBIEZJU0gK" + } + } + } + } + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + version = parse_string(request, "version") + + room_keys = yield self.e2e_room_keys_handler.get_room_keys( + user_id, version, room_id, session_id + ) + + if session_id: + room_keys = room_keys['rooms'][room_id]['sessions'][session_id] + elif room_id: + room_keys = room_keys['rooms'][room_id] + + defer.returnValue((200, room_keys)) + + @defer.inlineCallbacks + def on_DELETE(self, request, room_id, session_id): + """ + Deletes one or more encrypted E2E room keys for a user for backup purposes. + + DELETE /room_keys/keys/!abc:matrix.org/c0ff33?version=1 + HTTP/1.1 200 OK + {} + + room_id: the ID of the room whose keys to delete (optional) + session_id: the ID for the E2E session to delete (optional) + version: the version of the user's backup which this data is for. + the version must already have been created via the /change_secret API. + """ + + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + version = parse_string(request, "version") + + yield self.e2e_room_keys_handler.delete_room_keys( + user_id, version, room_id, session_id + ) + defer.returnValue((200, {})) + + +class RoomKeysNewVersionServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/version$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysNewVersionServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_POST(self, request): + """ + Create a new backup version for this user's room_keys with the given + info. The version is allocated by the server and returned to the user + in the response. This API is intended to be used whenever the user + changes the encryption key for their backups, ensuring that backups + encrypted with different keys don't collide. + + It takes out an exclusive lock on this user's room_key backups, to ensure + clients only upload to the current backup. + + The algorithm passed in the version info is a reverse-DNS namespaced + identifier to describe the format of the encrypted backupped keys. + + The auth_data is { user_id: "user_id", nonce: <random string> } + encrypted using the algorithm and current encryption key described above. + + POST /room_keys/version + Content-Type: application/json + { + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + + HTTP/1.1 200 OK + Content-Type: application/json + { + "version": 12345 + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + info = parse_json_object_from_request(request) + + new_version = yield self.e2e_room_keys_handler.create_version( + user_id, info + ) + defer.returnValue((200, {"version": new_version})) + + # we deliberately don't have a PUT /version, as these things really should + # be immutable to avoid people footgunning + + +class RoomKeysVersionServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/room_keys/version(/(?P<version>[^/]+))?$" + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(RoomKeysVersionServlet, self).__init__() + self.auth = hs.get_auth() + self.e2e_room_keys_handler = hs.get_e2e_room_keys_handler() + + @defer.inlineCallbacks + def on_GET(self, request, version): + """ + Retrieve the version information about a given version of the user's + room_keys backup. If the version part is missing, returns info about the + most current backup version (if any) + + It takes out an exclusive lock on this user's room_key backups, to ensure + clients only upload to the current backup. + + Returns 404 if the given version does not exist. + + GET /room_keys/version/12345 HTTP/1.1 + { + "version": "12345", + "algorithm": "m.megolm_backup.v1", + "auth_data": "dGhpcyBzaG91bGQgYWN0dWFsbHkgYmUgZW5jcnlwdGVkIGpzb24K" + } + """ + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + + try: + info = yield self.e2e_room_keys_handler.get_version_info( + user_id, version + ) + except SynapseError as e: + if e.code == 404: + raise SynapseError(404, "No backup found", Codes.NOT_FOUND) + defer.returnValue((200, info)) + + @defer.inlineCallbacks + def on_DELETE(self, request, version): + """ + Delete the information about a given version of the user's + room_keys backup. If the version part is missing, deletes the most + current backup version (if any). Doesn't delete the actual room data. + + DELETE /room_keys/version/12345 HTTP/1.1 + HTTP/1.1 200 OK + {} + """ + if version is None: + raise SynapseError(400, "No version specified to delete", Codes.NOT_FOUND) + + requester = yield self.auth.get_user_by_req(request, allow_guest=False) + user_id = requester.user.to_string() + + yield self.e2e_room_keys_handler.delete_version( + user_id, version + ) + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + RoomKeysServlet(hs).register(http_server) + RoomKeysVersionServlet(hs).register(http_server) + RoomKeysNewVersionServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index 938a05f9dc..3e9d3d8256 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -51,6 +51,7 @@ from synapse.handlers.deactivate_account import DeactivateAccountHandler from synapse.handlers.device import DeviceHandler from synapse.handlers.devicemessage import DeviceMessageHandler from synapse.handlers.e2e_keys import E2eKeysHandler +from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.groups_local import GroupsLocalHandler from synapse.handlers.initial_sync import InitialSyncHandler @@ -130,6 +131,7 @@ class HomeServer(object): 'auth_handler', 'device_handler', 'e2e_keys_handler', + 'e2e_room_keys_handler', 'event_handler', 'event_stream_handler', 'initial_sync_handler', @@ -299,6 +301,9 @@ class HomeServer(object): def build_e2e_keys_handler(self): return E2eKeysHandler(self) + def build_e2e_room_keys_handler(self): + return E2eRoomKeysHandler(self) + def build_application_service_api(self): return ApplicationServiceApi(self) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 23b4a8d76d..53c685c173 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -30,6 +30,7 @@ from .appservice import ApplicationServiceStore, ApplicationServiceTransactionSt from .client_ips import ClientIpStore from .deviceinbox import DeviceInboxStore from .directory import DirectoryStore +from .e2e_room_keys import EndToEndRoomKeyStore from .end_to_end_keys import EndToEndKeyStore from .engines import PostgresEngine from .event_federation import EventFederationStore @@ -77,6 +78,7 @@ class DataStore(RoomMemberStore, RoomStore, ApplicationServiceTransactionStore, ReceiptsStore, EndToEndKeyStore, + EndToEndRoomKeyStore, SearchStore, TagsStore, AccountDataStore, diff --git a/synapse/storage/e2e_room_keys.py b/synapse/storage/e2e_room_keys.py new file mode 100644 index 0000000000..f25ded2295 --- /dev/null +++ b/synapse/storage/e2e_room_keys.py @@ -0,0 +1,320 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from twisted.internet import defer + +from synapse.api.errors import StoreError + +from ._base import SQLBaseStore + + +class EndToEndRoomKeyStore(SQLBaseStore): + + @defer.inlineCallbacks + def get_e2e_room_key(self, user_id, version, room_id, session_id): + """Get the encrypted E2E room key for a given session from a given + backup version of room_keys. We only store the 'best' room key for a given + session at a given time, as determined by the handler. + + Args: + user_id(str): the user whose backup we're querying + version(str): the version ID of the backup for the set of keys we're querying + room_id(str): the ID of the room whose keys we're querying. + This is a bit redundant as it's implied by the session_id, but + we include for consistency with the rest of the API. + session_id(str): the session whose room_key we're querying. + + Returns: + A deferred dict giving the session_data and message metadata for + this room key. + """ + + row = yield self._simple_select_one( + table="e2e_room_keys", + keyvalues={ + "user_id": user_id, + "version": version, + "room_id": room_id, + "session_id": session_id, + }, + retcols=( + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + desc="get_e2e_room_key", + ) + + row["session_data"] = json.loads(row["session_data"]) + + defer.returnValue(row) + + @defer.inlineCallbacks + def set_e2e_room_key(self, user_id, version, room_id, session_id, room_key): + """Replaces or inserts the encrypted E2E room key for a given session in + a given backup + + Args: + user_id(str): the user whose backup we're setting + version(str): the version ID of the backup we're updating + room_id(str): the ID of the room whose keys we're setting + session_id(str): the session whose room_key we're setting + room_key(dict): the room_key being set + Raises: + StoreError + """ + + yield self._simple_upsert( + table="e2e_room_keys", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "session_id": session_id, + }, + values={ + "version": version, + "first_message_index": room_key['first_message_index'], + "forwarded_count": room_key['forwarded_count'], + "is_verified": room_key['is_verified'], + "session_data": json.dumps(room_key['session_data']), + }, + lock=False, + ) + + @defer.inlineCallbacks + def get_e2e_room_keys( + self, user_id, version, room_id=None, session_id=None + ): + """Bulk get the E2E room keys for a given backup, optionally filtered to a given + room, or a given session. + + Args: + user_id(str): the user whose backup we're querying + version(str): the version ID of the backup for the set of keys we're querying + room_id(str): Optional. the ID of the room whose keys we're querying, if any. + If not specified, we return the keys for all the rooms in the backup. + session_id(str): Optional. the session whose room_key we're querying, if any. + If specified, we also require the room_id to be specified. + If not specified, we return all the keys in this version of + the backup (or for the specified room) + + Returns: + A deferred list of dicts giving the session_data and message metadata for + these room keys. + """ + + keyvalues = { + "user_id": user_id, + "version": version, + } + if room_id: + keyvalues['room_id'] = room_id + if session_id: + keyvalues['session_id'] = session_id + + rows = yield self._simple_select_list( + table="e2e_room_keys", + keyvalues=keyvalues, + retcols=( + "user_id", + "room_id", + "session_id", + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + desc="get_e2e_room_keys", + ) + + sessions = {'rooms': {}} + for row in rows: + room_entry = sessions['rooms'].setdefault(row['room_id'], {"sessions": {}}) + room_entry['sessions'][row['session_id']] = { + "first_message_index": row["first_message_index"], + "forwarded_count": row["forwarded_count"], + "is_verified": row["is_verified"], + "session_data": json.loads(row["session_data"]), + } + + defer.returnValue(sessions) + + @defer.inlineCallbacks + def delete_e2e_room_keys( + self, user_id, version, room_id=None, session_id=None + ): + """Bulk delete the E2E room keys for a given backup, optionally filtered to a given + room or a given session. + + Args: + user_id(str): the user whose backup we're deleting from + version(str): the version ID of the backup for the set of keys we're deleting + room_id(str): Optional. the ID of the room whose keys we're deleting, if any. + If not specified, we delete the keys for all the rooms in the backup. + session_id(str): Optional. the session whose room_key we're querying, if any. + If specified, we also require the room_id to be specified. + If not specified, we delete all the keys in this version of + the backup (or for the specified room) + + Returns: + A deferred of the deletion transaction + """ + + keyvalues = { + "user_id": user_id, + "version": version, + } + if room_id: + keyvalues['room_id'] = room_id + if session_id: + keyvalues['session_id'] = session_id + + yield self._simple_delete( + table="e2e_room_keys", + keyvalues=keyvalues, + desc="delete_e2e_room_keys", + ) + + @staticmethod + def _get_current_version(txn, user_id): + txn.execute( + "SELECT MAX(version) FROM e2e_room_keys_versions " + "WHERE user_id=? AND deleted=0", + (user_id,) + ) + row = txn.fetchone() + if not row: + raise StoreError(404, 'No current backup version') + return row[0] + + def get_e2e_room_keys_version_info(self, user_id, version=None): + """Get info metadata about a version of our room_keys backup. + + Args: + user_id(str): the user whose backup we're querying + version(str): Optional. the version ID of the backup we're querying about + If missing, we return the information about the current version. + Raises: + StoreError: with code 404 if there are no e2e_room_keys_versions present + Returns: + A deferred dict giving the info metadata for this backup version + """ + + def _get_e2e_room_keys_version_info_txn(txn): + if version is None: + this_version = self._get_current_version(txn, user_id) + else: + this_version = version + + result = self._simple_select_one_txn( + txn, + table="e2e_room_keys_versions", + keyvalues={ + "user_id": user_id, + "version": this_version, + "deleted": 0, + }, + retcols=( + "version", + "algorithm", + "auth_data", + ), + ) + result["auth_data"] = json.loads(result["auth_data"]) + return result + + return self.runInteraction( + "get_e2e_room_keys_version_info", + _get_e2e_room_keys_version_info_txn + ) + + def create_e2e_room_keys_version(self, user_id, info): + """Atomically creates a new version of this user's e2e_room_keys store + with the given version info. + + Args: + user_id(str): the user whose backup we're creating a version + info(dict): the info about the backup version to be created + + Returns: + A deferred string for the newly created version ID + """ + + def _create_e2e_room_keys_version_txn(txn): + txn.execute( + "SELECT MAX(version) FROM e2e_room_keys_versions WHERE user_id=?", + (user_id,) + ) + current_version = txn.fetchone()[0] + if current_version is None: + current_version = '0' + + new_version = str(int(current_version) + 1) + + self._simple_insert_txn( + txn, + table="e2e_room_keys_versions", + values={ + "user_id": user_id, + "version": new_version, + "algorithm": info["algorithm"], + "auth_data": json.dumps(info["auth_data"]), + }, + ) + + return new_version + + return self.runInteraction( + "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn + ) + + def delete_e2e_room_keys_version(self, user_id, version=None): + """Delete a given backup version of the user's room keys. + Doesn't delete their actual key data. + + Args: + user_id(str): the user whose backup version we're deleting + version(str): Optional. the version ID of the backup version we're deleting + If missing, we delete the current backup version info. + Raises: + StoreError: with code 404 if there are no e2e_room_keys_versions present, + or if the version requested doesn't exist. + """ + + def _delete_e2e_room_keys_version_txn(txn): + if version is None: + this_version = self._get_current_version(txn, user_id) + else: + this_version = version + + return self._simple_update_one_txn( + txn, + table="e2e_room_keys_versions", + keyvalues={ + "user_id": user_id, + "version": this_version, + }, + updatevalues={ + "deleted": 1, + } + ) + + return self.runInteraction( + "delete_e2e_room_keys_version", + _delete_e2e_room_keys_version_txn + ) diff --git a/synapse/storage/schema/delta/51/e2e_room_keys.sql b/synapse/storage/schema/delta/51/e2e_room_keys.sql new file mode 100644 index 0000000000..c0e66a697d --- /dev/null +++ b/synapse/storage/schema/delta/51/e2e_room_keys.sql @@ -0,0 +1,39 @@ +/* Copyright 2017 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- users' optionally backed up encrypted e2e sessions +CREATE TABLE e2e_room_keys ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + session_id TEXT NOT NULL, + version TEXT NOT NULL, + first_message_index INT, + forwarded_count INT, + is_verified BOOLEAN, + session_data TEXT NOT NULL +); + +CREATE UNIQUE INDEX e2e_room_keys_idx ON e2e_room_keys(user_id, room_id, session_id); + +-- the metadata for each generation of encrypted e2e session backups +CREATE TABLE e2e_room_keys_versions ( + user_id TEXT NOT NULL, + version TEXT NOT NULL, + algorithm TEXT NOT NULL, + auth_data TEXT NOT NULL, + deleted SMALLINT DEFAULT 0 NOT NULL +); + +CREATE UNIQUE INDEX e2e_room_keys_versions_idx ON e2e_room_keys_versions(user_id, version); diff --git a/synapse/visibility.py b/synapse/visibility.py index c64ad2144c..43f48196be 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -219,7 +219,7 @@ def filter_events_for_server(store, server_name, events): # Whatever else we do, we need to check for senders which have requested # erasure of their data. erased_senders = yield store.are_users_erased( - e.sender for e in events, + (e.sender for e in events), ) def redact_disallowed(event, state): |