diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 0970f22a75..1695ed8ca3 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -247,6 +247,27 @@ class ExperimentalConfig(Config):
# MSC3026 (busy presence state)
self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
+ # MSC2697 (device dehydration)
+ # Enabled by default since this option was added after adding the feature.
+ # It is not recommended that both MSC2697 and MSC3814 both be enabled at
+ # once.
+ self.msc2697_enabled: bool = experimental.get("msc2697_enabled", True)
+
+ # MSC3814 (dehydrated devices with SSSS)
+ # This is an alternative method to achieve the same goals as MSC2697.
+ # It is not recommended that both MSC2697 and MSC3814 both be enabled at
+ # once.
+ self.msc3814_enabled: bool = experimental.get("msc3814_enabled", False)
+
+ if self.msc2697_enabled and self.msc3814_enabled:
+ raise ConfigError(
+ "MSC2697 and MSC3814 should not both be enabled.",
+ (
+ "experimental_features",
+ "msc3814_enabled",
+ ),
+ )
+
# MSC3244 (room version capabilities)
self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d73d9dca08..f3a713f5fa 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -653,6 +653,7 @@ class DeviceHandler(DeviceWorkerHandler):
async def store_dehydrated_device(
self,
user_id: str,
+ device_id: Optional[str],
device_data: JsonDict,
initial_device_display_name: Optional[str] = None,
) -> str:
@@ -661,6 +662,7 @@ class DeviceHandler(DeviceWorkerHandler):
Args:
user_id: the user that we are storing the device for
+ device_id: device id supplied by client
device_data: the dehydrated device information
initial_device_display_name: The display name to use for the device
Returns:
@@ -668,7 +670,7 @@ class DeviceHandler(DeviceWorkerHandler):
"""
device_id = await self.check_device_registered(
user_id,
- None,
+ device_id,
initial_device_display_name,
)
old_device_id = await self.store.store_dehydrated_device(
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 3caf9b31cc..15e94a03cb 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -13,10 +13,11 @@
# limitations under the License.
import logging
-from typing import TYPE_CHECKING, Any, Dict
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Any, Dict, Optional
from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
-from synapse.api.errors import SynapseError
+from synapse.api.errors import Codes, SynapseError
from synapse.api.ratelimiting import Ratelimiter
from synapse.logging.context import run_in_background
from synapse.logging.opentracing import (
@@ -48,6 +49,9 @@ class DeviceMessageHandler:
self.store = hs.get_datastores().main
self.notifier = hs.get_notifier()
self.is_mine = hs.is_mine
+ if hs.config.experimental.msc3814_enabled:
+ self.event_sources = hs.get_event_sources()
+ self.device_handler = hs.get_device_handler()
# We only need to poke the federation sender explicitly if its on the
# same instance. Other federation sender instances will get notified by
@@ -303,3 +307,103 @@ class DeviceMessageHandler:
# Enqueue a new federation transaction to send the new
# device messages to each remote destination.
self.federation_sender.send_device_messages(destination)
+
+ async def get_events_for_dehydrated_device(
+ self,
+ requester: Requester,
+ device_id: str,
+ since_token: Optional[str],
+ limit: int,
+ ) -> JsonDict:
+ """Fetches up to `limit` events sent to `device_id` starting from `since_token`
+ and returns the new since token. If there are no more messages, returns an empty
+ array.
+
+ Args:
+ requester: the user requesting the messages
+ device_id: ID of the dehydrated device
+ since_token: stream id to start from when fetching messages
+ limit: the number of messages to fetch
+ Returns:
+ A dict containing the to-device messages, as well as a token that the client
+ can provide in the next call to fetch the next batch of messages
+ """
+
+ user_id = requester.user.to_string()
+
+ # only allow fetching messages for the dehydrated device id currently associated
+ # with the user
+ dehydrated_device = await self.device_handler.get_dehydrated_device(user_id)
+ if dehydrated_device is None:
+ raise SynapseError(
+ HTTPStatus.FORBIDDEN,
+ "No dehydrated device exists",
+ Codes.FORBIDDEN,
+ )
+
+ dehydrated_device_id, _ = dehydrated_device
+ if device_id != dehydrated_device_id:
+ raise SynapseError(
+ HTTPStatus.FORBIDDEN,
+ "You may only fetch messages for your dehydrated device",
+ Codes.FORBIDDEN,
+ )
+
+ since_stream_id = 0
+ if since_token:
+ if not since_token.startswith("d"):
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "from parameter %r has an invalid format" % (since_token,),
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ try:
+ since_stream_id = int(since_token[1:])
+ except Exception:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "from parameter %r has an invalid format" % (since_token,),
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ # if we have a since token, delete any to-device messages before that token
+ # (since we now know that the device has received them)
+ deleted = await self.store.delete_messages_for_device(
+ user_id, device_id, since_stream_id
+ )
+ logger.debug(
+ "Deleted %d to-device messages up to %d for user_id %s device_id %s",
+ deleted,
+ since_stream_id,
+ user_id,
+ device_id,
+ )
+
+ to_token = self.event_sources.get_current_token().to_device_key
+
+ messages, stream_id = await self.store.get_messages_for_device(
+ user_id, device_id, since_stream_id, to_token, limit
+ )
+
+ for message in messages:
+ # Remove the message id before sending to client
+ message_id = message.pop("message_id", None)
+ if message_id:
+ set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+
+ logger.debug(
+ "Returning %d to-device messages between %d and %d (current token: %d) for "
+ "dehydrated device %s, user_id %s",
+ len(messages),
+ since_stream_id,
+ stream_id,
+ to_token,
+ device_id,
+ user_id,
+ )
+
+ return {
+ "events": messages,
+ "next_batch": f"d{stream_id}",
+ }
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 38dff9703f..690d2ec406 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -14,19 +14,22 @@
# limitations under the License.
import logging
+from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional, Tuple
from pydantic import Extra, StrictStr
from synapse.api import errors
-from synapse.api.errors import NotFoundError, UnrecognizedRequestError
+from synapse.api.errors import NotFoundError, SynapseError, UnrecognizedRequestError
from synapse.handlers.device import DeviceHandler
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
parse_and_validate_json_object_from_request,
+ parse_integer,
)
from synapse.http.site import SynapseRequest
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
from synapse.rest.client._base import client_patterns, interactive_auth_handler
from synapse.rest.client.models import AuthenticationData
from synapse.rest.models import RequestBodyModel
@@ -229,6 +232,8 @@ class DehydratedDeviceDataModel(RequestBodyModel):
class DehydratedDeviceServlet(RestServlet):
"""Retrieve or store a dehydrated device.
+ Implements either MSC2697 or MSC3814.
+
GET /org.matrix.msc2697.v2/dehydrated_device
HTTP/1.1 200 OK
@@ -261,9 +266,7 @@ class DehydratedDeviceServlet(RestServlet):
"""
- PATTERNS = client_patterns("/org.matrix.msc2697.v2/dehydrated_device$", releases=())
-
- def __init__(self, hs: "HomeServer"):
+ def __init__(self, hs: "HomeServer", msc2697: bool = True):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
@@ -271,6 +274,13 @@ class DehydratedDeviceServlet(RestServlet):
assert isinstance(handler, DeviceHandler)
self.device_handler = handler
+ self.PATTERNS = client_patterns(
+ "/org.matrix.msc2697.v2/dehydrated_device$"
+ if msc2697
+ else "/org.matrix.msc3814.v1/dehydrated_device$",
+ releases=(),
+ )
+
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
dehydrated_device = await self.device_handler.get_dehydrated_device(
@@ -293,6 +303,7 @@ class DehydratedDeviceServlet(RestServlet):
device_id = await self.device_handler.store_dehydrated_device(
requester.user.to_string(),
+ None,
submission.device_data.dict(),
submission.initial_device_display_name,
)
@@ -347,6 +358,210 @@ class ClaimDehydratedDeviceServlet(RestServlet):
return 200, result
+class DehydratedDeviceEventsServlet(RestServlet):
+ PATTERNS = client_patterns(
+ "/org.matrix.msc3814.v1/dehydrated_device/(?P<device_id>[^/]*)/events$",
+ releases=(),
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self.message_handler = hs.get_device_message_handler()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+
+ class PostBody(RequestBodyModel):
+ next_batch: Optional[StrictStr]
+
+ async def on_POST(
+ self, request: SynapseRequest, device_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request)
+
+ next_batch = parse_and_validate_json_object_from_request(
+ request, self.PostBody
+ ).next_batch
+ limit = parse_integer(request, "limit", 100)
+
+ msgs = await self.message_handler.get_events_for_dehydrated_device(
+ requester=requester,
+ device_id=device_id,
+ since_token=next_batch,
+ limit=limit,
+ )
+
+ return 200, msgs
+
+
+class DehydratedDeviceV2Servlet(RestServlet):
+ """Upload, retrieve, or delete a dehydrated device.
+
+ GET /org.matrix.msc3814.v1/dehydrated_device
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "device_id": "dehydrated_device_id",
+ "device_data": {
+ "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm",
+ "account": "dehydrated_device"
+ }
+ }
+
+ PUT /org.matrix.msc3814.v1/dehydrated_device
+ Content-Type: application/json
+
+ {
+ "device_id": "dehydrated_device_id",
+ "device_data": {
+ "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm",
+ "account": "dehydrated_device"
+ },
+ "device_keys": {
+ "user_id": "<user_id>",
+ "device_id": "<device_id>",
+ "valid_until_ts": <millisecond_timestamp>,
+ "algorithms": [
+ "m.olm.curve25519-aes-sha2",
+ ]
+ "keys": {
+ "<algorithm>:<device_id>": "<key_base64>",
+ },
+ "signatures:" {
+ "<user_id>" {
+ "<algorithm>:<device_id>": "<signature_base64>"
+ }
+ }
+ },
+ "fallback_keys": {
+ "<algorithm>:<device_id>": "<key_base64>",
+ "signed_<algorithm>:<device_id>": {
+ "fallback": true,
+ "key": "<key_base64>",
+ "signatures": {
+ "<user_id>": {
+ "<algorithm>:<device_id>": "<key_base64>"
+ }
+ }
+ }
+ }
+ "one_time_keys": {
+ "<algorithm>:<key_id>": "<key_base64>"
+ },
+
+ }
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "device_id": "dehydrated_device_id"
+ }
+
+ DELETE /org.matrix.msc3814.v1/dehydrated_device
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+
+ {
+ "device_id": "dehydrated_device_id",
+ }
+ """
+
+ PATTERNS = [
+ *client_patterns("/org.matrix.msc3814.v1/dehydrated_device$", releases=()),
+ ]
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self.hs = hs
+ self.auth = hs.get_auth()
+ handler = hs.get_device_handler()
+ assert isinstance(handler, DeviceHandler)
+ self.e2e_keys_handler = hs.get_e2e_keys_handler()
+ self.device_handler = handler
+
+ if hs.config.worker.worker_app is None:
+ # if main process
+ self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
+ else:
+ # then a worker
+ self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request)
+
+ dehydrated_device = await self.device_handler.get_dehydrated_device(
+ requester.user.to_string()
+ )
+
+ if dehydrated_device is not None:
+ (device_id, device_data) = dehydrated_device
+ result = {"device_id": device_id, "device_data": device_data}
+ return 200, result
+ else:
+ raise errors.NotFoundError("No dehydrated device available")
+
+ async def on_DELETE(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request)
+
+ dehydrated_device = await self.device_handler.get_dehydrated_device(
+ requester.user.to_string()
+ )
+
+ if dehydrated_device is not None:
+ (device_id, device_data) = dehydrated_device
+
+ result = await self.device_handler.rehydrate_device(
+ requester.user.to_string(),
+ self.auth.get_access_token_from_request(request),
+ device_id,
+ )
+
+ result = {"device_id": device_id}
+
+ return 200, result
+ else:
+ raise errors.NotFoundError("No dehydrated device available")
+
+ class PutBody(RequestBodyModel):
+ device_data: DehydratedDeviceDataModel
+ device_id: StrictStr
+ initial_device_display_name: Optional[StrictStr]
+
+ class Config:
+ extra = Extra.allow
+
+ async def on_PUT(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ submission = parse_and_validate_json_object_from_request(request, self.PutBody)
+ requester = await self.auth.get_user_by_req(request)
+ user_id = requester.user.to_string()
+
+ device_info = submission.dict()
+ if "device_keys" not in device_info.keys():
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Device key(s) not found, these must be provided.",
+ )
+
+ # TODO: Those two operations, creating a device and storing the
+ # device's keys should be atomic.
+ device_id = await self.device_handler.store_dehydrated_device(
+ requester.user.to_string(),
+ submission.device_id,
+ submission.device_data.dict(),
+ submission.initial_device_display_name,
+ )
+
+ # TODO: Do we need to do something with the result here?
+ await self.key_uploader(
+ user_id=user_id, device_id=submission.device_id, keys=submission.dict()
+ )
+
+ return 200, {"device_id": device_id}
+
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
if (
hs.config.worker.worker_app is None
@@ -354,7 +569,12 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
):
DeleteDevicesRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
+
if hs.config.worker.worker_app is None:
DeviceRestServlet(hs).register(http_server)
- DehydratedDeviceServlet(hs).register(http_server)
- ClaimDehydratedDeviceServlet(hs).register(http_server)
+ if hs.config.experimental.msc2697_enabled:
+ DehydratedDeviceServlet(hs, msc2697=True).register(http_server)
+ ClaimDehydratedDeviceServlet(hs).register(http_server)
+ if hs.config.experimental.msc3814_enabled:
+ DehydratedDeviceV2Servlet(hs).register(http_server)
+ DehydratedDeviceEventsServlet(hs).register(http_server)
|