From 654902a7583d20d7e0b57dc4634fbe573ff99993 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 24 Jul 2023 13:43:43 +0100 Subject: Resync stale devices in background (#15975) This is so we don't block responding to federation transaction while we try and fetch the device lists. --- synapse/handlers/device.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/device.py') diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5d12a39e26..d73d9dca08 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1124,7 +1124,14 @@ class DeviceListUpdater(DeviceListWorkerUpdater): ) if resync: - await self.multi_user_device_resync([user_id]) + # We mark as stale up front in case we get restarted. + await self.store.mark_remote_users_device_caches_as_stale([user_id]) + run_as_background_process( + "_maybe_retry_device_resync", + self.multi_user_device_resync, + [user_id], + False, + ) else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) -- cgit 1.5.1 From 641ff9ef7eaa7f1a632b983f4d36bb28dc23484d Mon Sep 17 00:00:00 2001 From: Shay Date: Mon, 24 Jul 2023 08:23:19 -0700 Subject: Support MSC3814: Dehydrated Devices (#15929) Signed-off-by: Nicolas Werner Co-authored-by: Nicolas Werner Co-authored-by: Nicolas Werner <89468146+nico-famedly@users.noreply.github.com> Co-authored-by: Hubert Chathi --- changelog.d/15929.feature | 1 + synapse/config/experimental.py | 21 ++++ synapse/handlers/device.py | 4 +- synapse/handlers/devicemessage.py | 108 +++++++++++++++++- synapse/rest/client/devices.py | 232 +++++++++++++++++++++++++++++++++++++- tests/handlers/test_device.py | 99 +++++++++++++++- tests/rest/client/test_devices.py | 150 +++++++++++++++++++++++- 7 files changed, 603 insertions(+), 12 deletions(-) create mode 100644 changelog.d/15929.feature (limited to 'synapse/handlers/device.py') diff --git 
a/changelog.d/15929.feature b/changelog.d/15929.feature new file mode 100644 index 0000000000..c3aaeae66e --- /dev/null +++ b/changelog.d/15929.feature @@ -0,0 +1 @@ +Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814), dehydrated devices v2/shrivelled sessions and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697) behind a config flag. Contributed by Nico from Famedly and H-Shay. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 0970f22a75..1695ed8ca3 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -247,6 +247,27 @@ class ExperimentalConfig(Config): # MSC3026 (busy presence state) self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False) + # MSC2697 (device dehydration) + # Enabled by default since this option was added after adding the feature. + # It is not recommended that both MSC2697 and MSC3814 both be enabled at + # once. + self.msc2697_enabled: bool = experimental.get("msc2697_enabled", True) + + # MSC3814 (dehydrated devices with SSSS) + # This is an alternative method to achieve the same goals as MSC2697. + # It is not recommended that both MSC2697 and MSC3814 both be enabled at + # once. 
+ self.msc3814_enabled: bool = experimental.get("msc3814_enabled", False) + + if self.msc2697_enabled and self.msc3814_enabled: + raise ConfigError( + "MSC2697 and MSC3814 should not both be enabled.", + ( + "experimental_features", + "msc3814_enabled", + ), + ) + # MSC3244 (room version capabilities) self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d73d9dca08..f3a713f5fa 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -653,6 +653,7 @@ class DeviceHandler(DeviceWorkerHandler): async def store_dehydrated_device( self, user_id: str, + device_id: Optional[str], device_data: JsonDict, initial_device_display_name: Optional[str] = None, ) -> str: @@ -661,6 +662,7 @@ class DeviceHandler(DeviceWorkerHandler): Args: user_id: the user that we are storing the device for + device_id: device id supplied by client device_data: the dehydrated device information initial_device_display_name: The display name to use for the device Returns: @@ -668,7 +670,7 @@ class DeviceHandler(DeviceWorkerHandler): """ device_id = await self.check_device_registered( user_id, - None, + device_id, initial_device_display_name, ) old_device_id = await self.store.store_dehydrated_device( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 3caf9b31cc..15e94a03cb 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -13,10 +13,11 @@ # limitations under the License. 
import logging -from typing import TYPE_CHECKING, Any, Dict +from http import HTTPStatus +from typing import TYPE_CHECKING, Any, Dict, Optional from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes -from synapse.api.errors import SynapseError +from synapse.api.errors import Codes, SynapseError from synapse.api.ratelimiting import Ratelimiter from synapse.logging.context import run_in_background from synapse.logging.opentracing import ( @@ -48,6 +49,9 @@ class DeviceMessageHandler: self.store = hs.get_datastores().main self.notifier = hs.get_notifier() self.is_mine = hs.is_mine + if hs.config.experimental.msc3814_enabled: + self.event_sources = hs.get_event_sources() + self.device_handler = hs.get_device_handler() # We only need to poke the federation sender explicitly if its on the # same instance. Other federation sender instances will get notified by @@ -303,3 +307,103 @@ class DeviceMessageHandler: # Enqueue a new federation transaction to send the new # device messages to each remote destination. self.federation_sender.send_device_messages(destination) + + async def get_events_for_dehydrated_device( + self, + requester: Requester, + device_id: str, + since_token: Optional[str], + limit: int, + ) -> JsonDict: + """Fetches up to `limit` events sent to `device_id` starting from `since_token` + and returns the new since token. If there are no more messages, returns an empty + array. 
+ + Args: + requester: the user requesting the messages + device_id: ID of the dehydrated device + since_token: stream id to start from when fetching messages + limit: the number of messages to fetch + Returns: + A dict containing the to-device messages, as well as a token that the client + can provide in the next call to fetch the next batch of messages + """ + + user_id = requester.user.to_string() + + # only allow fetching messages for the dehydrated device id currently associated + # with the user + dehydrated_device = await self.device_handler.get_dehydrated_device(user_id) + if dehydrated_device is None: + raise SynapseError( + HTTPStatus.FORBIDDEN, + "No dehydrated device exists", + Codes.FORBIDDEN, + ) + + dehydrated_device_id, _ = dehydrated_device + if device_id != dehydrated_device_id: + raise SynapseError( + HTTPStatus.FORBIDDEN, + "You may only fetch messages for your dehydrated device", + Codes.FORBIDDEN, + ) + + since_stream_id = 0 + if since_token: + if not since_token.startswith("d"): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "from parameter %r has an invalid format" % (since_token,), + errcode=Codes.INVALID_PARAM, + ) + + try: + since_stream_id = int(since_token[1:]) + except Exception: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "from parameter %r has an invalid format" % (since_token,), + errcode=Codes.INVALID_PARAM, + ) + + # if we have a since token, delete any to-device messages before that token + # (since we now know that the device has received them) + deleted = await self.store.delete_messages_for_device( + user_id, device_id, since_stream_id + ) + logger.debug( + "Deleted %d to-device messages up to %d for user_id %s device_id %s", + deleted, + since_stream_id, + user_id, + device_id, + ) + + to_token = self.event_sources.get_current_token().to_device_key + + messages, stream_id = await self.store.get_messages_for_device( + user_id, device_id, since_stream_id, to_token, limit + ) + + for message in messages: + # Remove the 
message id before sending to client + message_id = message.pop("message_id", None) + if message_id: + set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id) + + logger.debug( + "Returning %d to-device messages between %d and %d (current token: %d) for " + "dehydrated device %s, user_id %s", + len(messages), + since_stream_id, + stream_id, + to_token, + device_id, + user_id, + ) + + return { + "events": messages, + "next_batch": f"d{stream_id}", + } diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index 38dff9703f..690d2ec406 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -14,19 +14,22 @@ # limitations under the License. import logging +from http import HTTPStatus from typing import TYPE_CHECKING, List, Optional, Tuple from pydantic import Extra, StrictStr from synapse.api import errors -from synapse.api.errors import NotFoundError, UnrecognizedRequestError +from synapse.api.errors import NotFoundError, SynapseError, UnrecognizedRequestError from synapse.handlers.device import DeviceHandler from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, parse_and_validate_json_object_from_request, + parse_integer, ) from synapse.http.site import SynapseRequest +from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet from synapse.rest.client._base import client_patterns, interactive_auth_handler from synapse.rest.client.models import AuthenticationData from synapse.rest.models import RequestBodyModel @@ -229,6 +232,8 @@ class DehydratedDeviceDataModel(RequestBodyModel): class DehydratedDeviceServlet(RestServlet): """Retrieve or store a dehydrated device. + Implements either MSC2697 or MSC3814. 
+ + GET /org.matrix.msc2697.v2/dehydrated_device HTTP/1.1 200 OK @@ -261,9 +266,7 @@ class DehydratedDeviceServlet(RestServlet): """ - PATTERNS = client_patterns("/org.matrix.msc2697.v2/dehydrated_device$", releases=()) - - def __init__(self, hs: "HomeServer"): + def __init__(self, hs: "HomeServer", msc2697: bool = True): super().__init__() self.hs = hs self.auth = hs.get_auth() @@ -271,6 +274,13 @@ class DehydratedDeviceServlet(RestServlet): assert isinstance(handler, DeviceHandler) self.device_handler = handler + self.PATTERNS = client_patterns( + "/org.matrix.msc2697.v2/dehydrated_device$" + if msc2697 + else "/org.matrix.msc3814.v1/dehydrated_device$", + releases=(), + ) + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) dehydrated_device = await self.device_handler.get_dehydrated_device( @@ -293,6 +303,7 @@ class DehydratedDeviceServlet(RestServlet): device_id = await self.device_handler.store_dehydrated_device( requester.user.to_string(), + None, submission.device_data.dict(), submission.initial_device_display_name, ) @@ -347,6 +358,210 @@ class ClaimDehydratedDeviceServlet(RestServlet): return 200, result +class DehydratedDeviceEventsServlet(RestServlet): + PATTERNS = client_patterns( + "/org.matrix.msc3814.v1/dehydrated_device/(?P<device_id>[^/]*)/events$", + releases=(), + ) + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.message_handler = hs.get_device_message_handler() + self.auth = hs.get_auth() + self.store = hs.get_datastores().main + + class PostBody(RequestBodyModel): + next_batch: Optional[StrictStr] + + async def on_POST( + self, request: SynapseRequest, device_id: str + ) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + + next_batch = parse_and_validate_json_object_from_request( + request, self.PostBody + ).next_batch + limit = parse_integer(request, "limit", 100) + + msgs = await 
self.message_handler.get_events_for_dehydrated_device( + requester=requester, + device_id=device_id, + since_token=next_batch, + limit=limit, + ) + + return 200, msgs + + +class DehydratedDeviceV2Servlet(RestServlet): + """Upload, retrieve, or delete a dehydrated device. + + GET /org.matrix.msc3814.v1/dehydrated_device + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "device_id": "dehydrated_device_id", + "device_data": { + "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm", + "account": "dehydrated_device" + } + } + + PUT /org.matrix.msc3814.v1/dehydrated_device + Content-Type: application/json + + { + "device_id": "dehydrated_device_id", + "device_data": { + "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm", + "account": "dehydrated_device" + }, + "device_keys": { + "user_id": "", + "device_id": "", + "valid_until_ts": , + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ] + "keys": { + ":": "", + }, + "signatures:" { + "" { + ":": "" + } + } + }, + "fallback_keys": { + ":": "", + "signed_:": { + "fallback": true, + "key": "", + "signatures": { + "": { + ":": "" + } + } + } + } + "one_time_keys": { + ":": "" + }, + + } + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "device_id": "dehydrated_device_id" + } + + DELETE /org.matrix.msc3814.v1/dehydrated_device + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "device_id": "dehydrated_device_id", + } + """ + + PATTERNS = [ + *client_patterns("/org.matrix.msc3814.v1/dehydrated_device$", releases=()), + ] + + def __init__(self, hs: "HomeServer"): + super().__init__() + self.hs = hs + self.auth = hs.get_auth() + handler = hs.get_device_handler() + assert isinstance(handler, DeviceHandler) + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.device_handler = handler + + if hs.config.worker.worker_app is None: + # if main process + self.key_uploader = self.e2e_keys_handler.upload_keys_for_user + else: + # then a worker + self.key_uploader = 
ReplicationUploadKeysForUserRestServlet.make_client(hs) + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + + dehydrated_device = await self.device_handler.get_dehydrated_device( + requester.user.to_string() + ) + + if dehydrated_device is not None: + (device_id, device_data) = dehydrated_device + result = {"device_id": device_id, "device_data": device_data} + return 200, result + else: + raise errors.NotFoundError("No dehydrated device available") + + async def on_DELETE(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + + dehydrated_device = await self.device_handler.get_dehydrated_device( + requester.user.to_string() + ) + + if dehydrated_device is not None: + (device_id, device_data) = dehydrated_device + + result = await self.device_handler.rehydrate_device( + requester.user.to_string(), + self.auth.get_access_token_from_request(request), + device_id, + ) + + result = {"device_id": device_id} + + return 200, result + else: + raise errors.NotFoundError("No dehydrated device available") + + class PutBody(RequestBodyModel): + device_data: DehydratedDeviceDataModel + device_id: StrictStr + initial_device_display_name: Optional[StrictStr] + + class Config: + extra = Extra.allow + + async def on_PUT(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + submission = parse_and_validate_json_object_from_request(request, self.PutBody) + requester = await self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + + device_info = submission.dict() + if "device_keys" not in device_info.keys(): + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Device key(s) not found, these must be provided.", + ) + + # TODO: Those two operations, creating a device and storing the + # device's keys should be atomic. 
+ device_id = await self.device_handler.store_dehydrated_device( + requester.user.to_string(), + submission.device_id, + submission.device_data.dict(), + submission.initial_device_display_name, + ) + + # TODO: Do we need to do something with the result here? + await self.key_uploader( + user_id=user_id, device_id=submission.device_id, keys=submission.dict() + ) + + return 200, {"device_id": device_id} + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: if ( hs.config.worker.worker_app is None @@ -354,7 +569,12 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ): DeleteDevicesRestServlet(hs).register(http_server) DevicesRestServlet(hs).register(http_server) + if hs.config.worker.worker_app is None: DeviceRestServlet(hs).register(http_server) - DehydratedDeviceServlet(hs).register(http_server) - ClaimDehydratedDeviceServlet(hs).register(http_server) + if hs.config.experimental.msc2697_enabled: + DehydratedDeviceServlet(hs, msc2697=True).register(http_server) + ClaimDehydratedDeviceServlet(hs).register(http_server) + if hs.config.experimental.msc3814_enabled: + DehydratedDeviceV2Servlet(hs).register(http_server) + DehydratedDeviceEventsServlet(hs).register(http_server) diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 66215af2b8..647ee09279 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -17,15 +17,18 @@ from typing import Optional from unittest import mock +from twisted.internet.defer import ensureDeferred from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import RoomEncryptionAlgorithms from synapse.api.errors import NotFoundError, SynapseError from synapse.appservice import ApplicationService from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN, DeviceHandler +from synapse.rest import admin +from synapse.rest.client import devices, login, register from synapse.server import HomeServer from 
synapse.storage.databases.main.appservice import _make_exclusive_regex -from synapse.types import JsonDict +from synapse.types import JsonDict, create_requester from synapse.util import Clock from tests import unittest @@ -399,11 +402,19 @@ class DeviceTestCase(unittest.HomeserverTestCase): class DehydrationTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets_for_client_rest_resource, + login.register_servlets, + register.register_servlets, + devices.register_servlets, + ] + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: hs = self.setup_test_homeserver("server") handler = hs.get_device_handler() assert isinstance(handler, DeviceHandler) self.handler = handler + self.message_handler = hs.get_device_message_handler() self.registration = hs.get_registration_handler() self.auth = hs.get_auth() self.store = hs.get_datastores().main @@ -418,6 +429,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase): stored_dehydrated_device_id = self.get_success( self.handler.store_dehydrated_device( user_id=user_id, + device_id=None, device_data={"device_data": {"foo": "bar"}}, initial_device_display_name="dehydrated device", ) @@ -481,3 +493,88 @@ class DehydrationTestCase(unittest.HomeserverTestCase): ret = self.get_success(self.handler.get_dehydrated_device(user_id=user_id)) self.assertIsNone(ret) + + @unittest.override_config( + {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}} + ) + def test_dehydrate_v2_and_fetch_events(self) -> None: + user_id = "@boris:server" + + self.get_success(self.store.register_user(user_id, "foobar")) + + # First check if we can store and fetch a dehydrated device + stored_dehydrated_device_id = self.get_success( + self.handler.store_dehydrated_device( + user_id=user_id, + device_id=None, + device_data={"device_data": {"foo": "bar"}}, + initial_device_display_name="dehydrated device", + ) + ) + + device_info = self.get_success( + 
self.handler.get_dehydrated_device(user_id=user_id) + ) + assert device_info is not None + retrieved_device_id, device_data = device_info + self.assertEqual(retrieved_device_id, stored_dehydrated_device_id) + self.assertEqual(device_data, {"device_data": {"foo": "bar"}}) + + # Create a new login for the user + device_id, access_token, _expiration_time, _refresh_token = self.get_success( + self.registration.register_device( + user_id=user_id, + device_id=None, + initial_display_name="new device", + ) + ) + + requester = create_requester(user_id, device_id=device_id) + + # Fetching messages for a non-existing device should return an error + self.get_failure( + self.message_handler.get_events_for_dehydrated_device( + requester=requester, + device_id="not the right device ID", + since_token=None, + limit=10, + ), + SynapseError, + ) + + # Send a message to the dehydrated device + ensureDeferred( + self.message_handler.send_device_message( + requester=requester, + message_type="test.message", + messages={user_id: {stored_dehydrated_device_id: {"body": "foo"}}}, + ) + ) + self.pump() + + # Fetch the message of the dehydrated device + res = self.get_success( + self.message_handler.get_events_for_dehydrated_device( + requester=requester, + device_id=stored_dehydrated_device_id, + since_token=None, + limit=10, + ) + ) + + self.assertTrue(len(res["next_batch"]) > 1) + self.assertEqual(len(res["events"]), 1) + self.assertEqual(res["events"][0]["content"]["body"], "foo") + + # Fetch the message of the dehydrated device again, which should return nothing + # and delete the old messages + res = self.get_success( + self.message_handler.get_events_for_dehydrated_device( + requester=requester, + device_id=stored_dehydrated_device_id, + since_token=res["next_batch"], + limit=10, + ) + ) + self.assertTrue(len(res["next_batch"]) > 1) + self.assertEqual(len(res["events"]), 0) diff --git a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py index d80eea17d3..b7d420cfec 
100644 --- a/tests/rest/client/test_devices.py +++ b/tests/rest/client/test_devices.py @@ -13,12 +13,14 @@ # limitations under the License. from http import HTTPStatus +from twisted.internet.defer import ensureDeferred from twisted.test.proto_helpers import MemoryReactor from synapse.api.errors import NotFoundError from synapse.rest import admin, devices, room, sync -from synapse.rest.client import account, login, register +from synapse.rest.client import account, keys, login, register from synapse.server import HomeServer +from synapse.types import JsonDict, create_requester from synapse.util import Clock from tests import unittest @@ -208,8 +210,13 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): login.register_servlets, register.register_servlets, devices.register_servlets, + keys.register_servlets, ] + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.registration = hs.get_registration_handler() + self.message_handler = hs.get_device_message_handler() + def test_PUT(self) -> None: """Sanity-check that we can PUT a dehydrated device. 
@@ -226,7 +233,21 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): "device_data": { "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm", "account": "dehydrated_device", - } + }, + "device_keys": { + "user_id": "@alice:test", + "device_id": "device1", + "valid_until_ts": "80", + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ], + "keys": { + ":": "", + }, + "signatures": { + "": {":": ""} + }, + }, }, access_token=token, shorthand=False, @@ -234,3 +255,128 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) device_id = channel.json_body.get("device_id") self.assertIsInstance(device_id, str) + + @unittest.override_config( + {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}} + ) + def test_dehydrate_msc3814(self) -> None: + user = self.register_user("mikey", "pass") + token = self.login(user, "pass", device_id="device1") + content: JsonDict = { + "device_data": { + "algorithm": "m.dehydration.v1.olm", + }, + "device_id": "device1", + "initial_device_display_name": "foo bar", + "device_keys": { + "user_id": "@mikey:test", + "device_id": "device1", + "valid_until_ts": "80", + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ], + "keys": { + ":": "", + }, + "signatures": { + "": {":": ""} + }, + }, + } + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device1", device_id) + + # test that we can now GET the dehydrated device info + channel = self.make_request( + "GET", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + returned_device_id = 
channel.json_body.get("device_id") + self.assertEqual(returned_device_id, device_id) + device_data = channel.json_body.get("device_data") + expected_device_data = { + "algorithm": "m.dehydration.v1.olm", + } + self.assertEqual(device_data, expected_device_data) + + # create another device for the user + ( + new_device_id, + _, + _, + _, + ) = self.get_success( + self.registration.register_device( + user_id=user, + device_id=None, + initial_display_name="new device", + ) + ) + requester = create_requester(user, device_id=new_device_id) + + # Send a message to the dehydrated device + ensureDeferred( + self.message_handler.send_device_message( + requester=requester, + message_type="test.message", + messages={user: {device_id: {"body": "test_message"}}}, + ) + ) + self.pump() + + # make sure we can fetch the message with our dehydrated device id + channel = self.make_request( + "POST", + f"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device/{device_id}/events", + content={}, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + expected_content = {"body": "test_message"} + self.assertEqual(channel.json_body["events"][0]["content"], expected_content) + next_batch_token = channel.json_body.get("next_batch") + + # fetch messages again and make sure that the message was deleted and we are returned an + # empty array + content = {"next_batch": next_batch_token} + channel = self.make_request( + "POST", + f"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device/{device_id}/events", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + self.assertEqual(channel.json_body["events"], []) + + # make sure we can delete the dehydrated device + channel = self.make_request( + "DELETE", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + + # ...and after deleting it is no longer 
available + channel = self.make_request( + "GET", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 404) -- cgit 1.5.1 From 84ae2e3f6fb86115df767bb2f1fb16ac2fbaa7c3 Mon Sep 17 00:00:00 2001 From: Shay Date: Fri, 4 Aug 2023 10:49:54 -0700 Subject: Fix deletion for Dehydrated Devices (#16046) --- changelog.d/16046.bugfix | 1 + synapse/handlers/device.py | 16 +++++ synapse/rest/client/devices.py | 14 ++-- tests/rest/client/test_devices.py | 139 +++++++++++++++++++++++++++++++++++++- 4 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 changelog.d/16046.bugfix (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16046.bugfix b/changelog.d/16046.bugfix new file mode 100644 index 0000000000..ce5a9ae4b5 --- /dev/null +++ b/changelog.d/16046.bugfix @@ -0,0 +1 @@ +Fix deletion in dehydrated devices v2. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f3a713f5fa..b7bf70a72d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -722,6 +722,22 @@ class DeviceHandler(DeviceWorkerHandler): return {"success": True} + async def delete_dehydrated_device(self, user_id: str, device_id: str) -> None: + """ + Delete a stored dehydrated device. 
+ + Args: + user_id: the user_id to delete the device from + device_id: id of the dehydrated device to delete + """ + success = await self.store.remove_dehydrated_device(user_id, device_id) + + if not success: + raise errors.NotFoundError() + + await self.delete_devices(user_id, [device_id]) + await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id) + @wrap_as_background_process("_handle_new_device_update_async") async def _handle_new_device_update_async(self) -> None: """Called when we have a new local device list update that we need to diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index 690d2ec406..dd3f7fd666 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -513,10 +513,8 @@ class DehydratedDeviceV2Servlet(RestServlet): if dehydrated_device is not None: (device_id, device_data) = dehydrated_device - result = await self.device_handler.rehydrate_device( - requester.user.to_string(), - self.auth.get_access_token_from_request(request), - device_id, + await self.device_handler.delete_dehydrated_device( + requester.user.to_string(), device_id ) result = {"device_id": device_id} @@ -538,6 +536,14 @@ class DehydratedDeviceV2Servlet(RestServlet): requester = await self.auth.get_user_by_req(request) user_id = requester.user.to_string() + old_dehydrated_device = await self.device_handler.get_dehydrated_device(user_id) + + # if an old device exists, delete it before creating a new one + if old_dehydrated_device: + await self.device_handler.delete_dehydrated_device( + user_id, old_dehydrated_device[0] + ) + device_info = submission.dict() if "device_keys" not in device_info.keys(): raise SynapseError( diff --git a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py index b7d420cfec..3cf29c10ea 100644 --- a/tests/rest/client/test_devices.py +++ b/tests/rest/client/test_devices.py @@ -379,4 +379,141 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): 
access_token=token, shorthand=False, ) - self.assertEqual(channel.code, 404) + self.assertEqual(channel.code, 401) + + @unittest.override_config( + {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}} + ) + def test_msc3814_dehydrated_device_delete_works(self) -> None: + user = self.register_user("mikey", "pass") + token = self.login(user, "pass", device_id="device1") + content: JsonDict = { + "device_data": { + "algorithm": "m.dehydration.v1.olm", + }, + "device_id": "device2", + "initial_device_display_name": "foo bar", + "device_keys": { + "user_id": "@mikey:test", + "device_id": "device2", + "valid_until_ts": "80", + "algorithms": [ + "m.olm.curve25519-aes-sha2", + ], + "keys": { + ":": "", + }, + "signatures": { + "": {":": ""} + }, + }, + } + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device2", device_id) + + # ensure that keys were uploaded and available + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device2"], + }, + }, + token, + ) + self.assertEqual( + channel.json_body["device_keys"][user]["device2"]["keys"], + { + ":": "", + }, + ) + + # delete the dehydrated device + channel = self.make_request( + "DELETE", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + + # ensure that keys are no longer available for deleted device + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device2"], + }, + }, + token, + ) + self.assertEqual(channel.json_body["device_keys"], {"@mikey:test": {}}) + + # check that an old device 
is deleted when user PUTs a new device + # First, create a device + content["device_id"] = "device3" + content["device_keys"]["device_id"] = "device3" + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device3", device_id) + + # create a second device without deleting first device + content["device_id"] = "device4" + content["device_keys"]["device_id"] = "device4" + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device4", device_id) + + # check that the second device that was created is what is returned when we GET + channel = self.make_request( + "GET", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + returned_device_id = channel.json_body["device_id"] + self.assertEqual(returned_device_id, "device4") + + # and that if we query the keys for the first device they are not there + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device3"], + }, + }, + token, + ) + self.assertEqual(channel.json_body["device_keys"], {"@mikey:test": {}}) -- cgit 1.5.1 From 0328b56468fe12c4d86ef636b60964527a510160 Mon Sep 17 00:00:00 2001 From: Shay Date: Tue, 8 Aug 2023 12:04:46 -0700 Subject: Support MSC3814: Dehydrated Devices Part 2 (#16010) --- changelog.d/16010.misc | 1 + synapse/handlers/device.py | 14 +- 
synapse/handlers/devicemessage.py | 13 -- synapse/rest/client/devices.py | 16 +- synapse/storage/databases/main/devices.py | 51 ++++++- synapse/storage/databases/main/end_to_end_keys.py | 170 ++++++++++++++-------- tests/handlers/test_device.py | 9 +- tests/rest/client/test_devices.py | 77 +++++++++- 8 files changed, 254 insertions(+), 97 deletions(-) create mode 100644 changelog.d/16010.misc (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16010.misc b/changelog.d/16010.misc new file mode 100644 index 0000000000..1e1a148069 --- /dev/null +++ b/changelog.d/16010.misc @@ -0,0 +1 @@ +Update dehydrated devices implementation. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index b7bf70a72d..5ae427d52c 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -385,6 +385,7 @@ class DeviceHandler(DeviceWorkerHandler): self.federation_sender = hs.get_federation_sender() self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() + self.db_pool = hs.get_datastores().main.db_pool self.device_list_updater = DeviceListUpdater(hs, self) @@ -656,15 +657,17 @@ class DeviceHandler(DeviceWorkerHandler): device_id: Optional[str], device_data: JsonDict, initial_device_display_name: Optional[str] = None, + keys_for_device: Optional[JsonDict] = None, ) -> str: - """Store a dehydrated device for a user. If the user had a previous - dehydrated device, it is removed. + """Store a dehydrated device for a user, optionally storing the keys associated with + it as well. If the user had a previous dehydrated device, it is removed. 
Args: user_id: the user that we are storing the device for device_id: device id supplied by client device_data: the dehydrated device information initial_device_display_name: The display name to use for the device + keys_for_device: keys for the dehydrated device Returns: device id of the dehydrated device """ @@ -673,11 +676,16 @@ class DeviceHandler(DeviceWorkerHandler): device_id, initial_device_display_name, ) + + time_now = self.clock.time_msec() + old_device_id = await self.store.store_dehydrated_device( - user_id, device_id, device_data + user_id, device_id, device_data, time_now, keys_for_device ) + if old_device_id is not None: await self.delete_devices(user_id, [old_device_id]) + return device_id async def rehydrate_device( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 15e94a03cb..17ff8821d9 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -367,19 +367,6 @@ class DeviceMessageHandler: errcode=Codes.INVALID_PARAM, ) - # if we have a since token, delete any to-device messages before that token - # (since we now know that the device has received them) - deleted = await self.store.delete_messages_for_device( - user_id, device_id, since_stream_id - ) - logger.debug( - "Deleted %d to-device messages up to %d for user_id %s device_id %s", - deleted, - since_stream_id, - user_id, - device_id, - ) - to_token = self.event_sources.get_current_token().to_device_key messages, stream_id = await self.store.get_messages_for_device( diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py index 51f17f80da..925f037743 100644 --- a/synapse/rest/client/devices.py +++ b/synapse/rest/client/devices.py @@ -29,7 +29,6 @@ from synapse.http.servlet import ( parse_integer, ) from synapse.http.site import SynapseRequest -from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet from synapse.rest.client._base import client_patterns, 
interactive_auth_handler from synapse.rest.client.models import AuthenticationData from synapse.rest.models import RequestBodyModel @@ -480,13 +479,6 @@ class DehydratedDeviceV2Servlet(RestServlet): self.e2e_keys_handler = hs.get_e2e_keys_handler() self.device_handler = handler - if hs.config.worker.worker_app is None: - # if main process - self.key_uploader = self.e2e_keys_handler.upload_keys_for_user - else: - # then a worker - self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs) - async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) @@ -549,18 +541,12 @@ class DehydratedDeviceV2Servlet(RestServlet): "Device key(s) not found, these must be provided.", ) - # TODO: Those two operations, creating a device and storing the - # device's keys should be atomic. device_id = await self.device_handler.store_dehydrated_device( requester.user.to_string(), submission.device_id, submission.device_data.dict(), submission.initial_device_display_name, - ) - - # TODO: Do we need to do something with the result here? 
- await self.key_uploader( - user_id=user_id, device_id=submission.device_id, keys=submission.dict() + device_info, ) return 200, {"device_id": device_id} diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d9df437e51..e4162f846b 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -28,6 +28,7 @@ from typing import ( cast, ) +from canonicaljson import encode_canonical_json from typing_extensions import Literal from synapse.api.constants import EduTypes @@ -1188,8 +1189,42 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) def _store_dehydrated_device_txn( - self, txn: LoggingTransaction, user_id: str, device_id: str, device_data: str + self, + txn: LoggingTransaction, + user_id: str, + device_id: str, + device_data: str, + time: int, + keys: Optional[JsonDict] = None, ) -> Optional[str]: + # TODO: make keys non-optional once support for msc2697 is dropped + if keys: + device_keys = keys.get("device_keys", None) + if device_keys: + # Type ignore - this function is defined on EndToEndKeyStore which we do + # have access to due to hs.get_datastore() "magic" + self._set_e2e_device_keys_txn( # type: ignore[attr-defined] + txn, user_id, device_id, time, device_keys + ) + + one_time_keys = keys.get("one_time_keys", None) + if one_time_keys: + key_list = [] + for key_id, key_obj in one_time_keys.items(): + algorithm, key_id = key_id.split(":") + key_list.append( + ( + algorithm, + key_id, + encode_canonical_json(key_obj).decode("ascii"), + ) + ) + self._add_e2e_one_time_keys_txn(txn, user_id, device_id, time, key_list) + + fallback_keys = keys.get("fallback_keys", None) + if fallback_keys: + self._set_e2e_fallback_keys_txn(txn, user_id, device_id, fallback_keys) + old_device_id = self.db_pool.simple_select_one_onecol_txn( txn, table="dehydrated_devices", @@ -1203,10 +1238,16 @@ class DeviceWorkerStore(RoomMemberWorkerStore, 
EndToEndKeyWorkerStore): keyvalues={"user_id": user_id}, values={"device_id": device_id, "device_data": device_data}, ) + return old_device_id async def store_dehydrated_device( - self, user_id: str, device_id: str, device_data: JsonDict + self, + user_id: str, + device_id: str, + device_data: JsonDict, + time_now: int, + keys: Optional[dict] = None, ) -> Optional[str]: """Store a dehydrated device for a user. @@ -1214,15 +1255,21 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): user_id: the user that we are storing the device for device_id: the ID of the dehydrated device device_data: the dehydrated device information + time_now: current time at the request in milliseconds + keys: keys for the dehydrated device + Returns: device id of the user's previous dehydrated device, if any """ + return await self.db_pool.runInteraction( "store_dehydrated_device_txn", self._store_dehydrated_device_txn, user_id, device_id, json_encoder.encode(device_data), + time_now, + keys, ) async def remove_dehydrated_device(self, user_id: str, device_id: str) -> bool: diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 91ae9c457d..b49dea577c 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -522,36 +522,57 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker new_keys: keys to add - each a tuple of (algorithm, key_id, key json) """ - def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None: - set_tag("user_id", user_id) - set_tag("device_id", device_id) - set_tag("new_keys", str(new_keys)) - # We are protected from race between lookup and insertion due to - # a unique constraint. If there is a race of two calls to - # `add_e2e_one_time_keys` then they'll conflict and we will only - # insert one set. 
- self.db_pool.simple_insert_many_txn( - txn, - table="e2e_one_time_keys_json", - keys=( - "user_id", - "device_id", - "algorithm", - "key_id", - "ts_added_ms", - "key_json", - ), - values=[ - (user_id, device_id, algorithm, key_id, time_now, json_bytes) - for algorithm, key_id, json_bytes in new_keys - ], - ) - self._invalidate_cache_and_stream( - txn, self.count_e2e_one_time_keys, (user_id, device_id) - ) - await self.db_pool.runInteraction( - "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys + "add_e2e_one_time_keys_insert", + self._add_e2e_one_time_keys_txn, + user_id, + device_id, + time_now, + new_keys, + ) + + def _add_e2e_one_time_keys_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_id: str, + time_now: int, + new_keys: Iterable[Tuple[str, str, str]], + ) -> None: + """Insert some new one time keys for a device. Errors if any of the keys already exist. + + Args: + user_id: id of user to get keys for + device_id: id of device to get keys for + time_now: insertion time to record (ms since epoch) + new_keys: keys to add - each a tuple of (algorithm, key_id, key json) - note + that the key JSON must be in canonical JSON form + """ + set_tag("user_id", user_id) + set_tag("device_id", device_id) + set_tag("new_keys", str(new_keys)) + # We are protected from race between lookup and insertion due to + # a unique constraint. If there is a race of two calls to + # `add_e2e_one_time_keys` then they'll conflict and we will only + # insert one set. 
+ self.db_pool.simple_insert_many_txn( + txn, + table="e2e_one_time_keys_json", + keys=( + "user_id", + "device_id", + "algorithm", + "key_id", + "ts_added_ms", + "key_json", + ), + values=[ + (user_id, device_id, algorithm, key_id, time_now, json_bytes) + for algorithm, key_id, json_bytes in new_keys + ], + ) + self._invalidate_cache_and_stream( + txn, self.count_e2e_one_time_keys, (user_id, device_id) ) @cached(max_entries=10000) @@ -723,6 +744,14 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker device_id: str, fallback_keys: JsonDict, ) -> None: + """Set the user's e2e fallback keys. + + Args: + user_id: the user whose keys are being set + device_id: the device whose keys are being set + fallback_keys: the keys to set. This is a map from key ID (which is + of the form "algorithm:id") to key data. + """ # fallback_keys will usually only have one item in it, so using a for # loop (as opposed to calling simple_upsert_many_txn) won't be too bad # FIXME: make sure that only one key per algorithm is uploaded @@ -1304,42 +1333,69 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): ) -> bool: """Stores device keys for a device. Returns whether there was a change or the keys were already in the database. 
+ + Args: + user_id: user_id of the user to store keys for + device_id: device_id of the device to store keys for + time_now: time at the request to store the keys + device_keys: the keys to store """ - def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool: - set_tag("user_id", user_id) - set_tag("device_id", device_id) - set_tag("time_now", time_now) - set_tag("device_keys", str(device_keys)) + return await self.db_pool.runInteraction( + "set_e2e_device_keys", + self._set_e2e_device_keys_txn, + user_id, + device_id, + time_now, + device_keys, + ) - old_key_json = self.db_pool.simple_select_one_onecol_txn( - txn, - table="e2e_device_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - retcol="key_json", - allow_none=True, - ) + def _set_e2e_device_keys_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_id: str, + time_now: int, + device_keys: JsonDict, + ) -> bool: + """Stores device keys for a device. Returns whether there was a change + or the keys were already in the database. - # In py3 we need old_key_json to match new_key_json type. The DB - # returns unicode while encode_canonical_json returns bytes. - new_key_json = encode_canonical_json(device_keys).decode("utf-8") + Args: + user_id: user_id of the user to store keys for + device_id: device_id of the device to store keys for + time_now: time at the request to store the keys + device_keys: the keys to store + """ + set_tag("user_id", user_id) + set_tag("device_id", device_id) + set_tag("time_now", time_now) + set_tag("device_keys", str(device_keys)) + + old_key_json = self.db_pool.simple_select_one_onecol_txn( + txn, + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + retcol="key_json", + allow_none=True, + ) - if old_key_json == new_key_json: - log_kv({"Message": "Device key already stored."}) - return False + # In py3 we need old_key_json to match new_key_json type. 
The DB + # returns unicode while encode_canonical_json returns bytes. + new_key_json = encode_canonical_json(device_keys).decode("utf-8") - self.db_pool.simple_upsert_txn( - txn, - table="e2e_device_keys_json", - keyvalues={"user_id": user_id, "device_id": device_id}, - values={"ts_added_ms": time_now, "key_json": new_key_json}, - ) - log_kv({"message": "Device keys stored."}) - return True + if old_key_json == new_key_json: + log_kv({"Message": "Device key already stored."}) + return False - return await self.db_pool.runInteraction( - "set_e2e_device_keys", _set_e2e_device_keys_txn + self.db_pool.simple_upsert_txn( + txn, + table="e2e_device_keys_json", + keyvalues={"user_id": user_id, "device_id": device_id}, + values={"ts_added_ms": time_now, "key_json": new_key_json}, ) + log_kv({"message": "Device keys stored."}) + return True async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None: def delete_e2e_keys_by_device_txn(txn: LoggingTransaction) -> None: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 647ee09279..e1e58fa6e6 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -566,15 +566,16 @@ class DehydrationTestCase(unittest.HomeserverTestCase): self.assertEqual(len(res["events"]), 1) self.assertEqual(res["events"][0]["content"]["body"], "foo") - # Fetch the message of the dehydrated device again, which should return nothing - # and delete the old messages + # Fetch the message of the dehydrated device again, which should return + # the same message as it has not been deleted res = self.get_success( self.message_handler.get_events_for_dehydrated_device( requester=requester, device_id=stored_dehydrated_device_id, - since_token=res["next_batch"], + since_token=None, limit=10, ) ) self.assertTrue(len(res["next_batch"]) > 1) - self.assertEqual(len(res["events"]), 0) + self.assertEqual(len(res["events"]), 1) + self.assertEqual(res["events"][0]["content"]["body"], "foo") diff --git 
a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py index 3cf29c10ea..60099f8c59 100644 --- a/tests/rest/client/test_devices.py +++ b/tests/rest/client/test_devices.py @@ -20,7 +20,7 @@ from synapse.api.errors import NotFoundError from synapse.rest import admin, devices, room, sync from synapse.rest.client import account, keys, login, register from synapse.server import HomeServer -from synapse.types import JsonDict, create_requester +from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock from tests import unittest @@ -282,6 +282,17 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): "": {":": ""} }, }, + "fallback_keys": { + "alg1:device1": "f4llb4ckk3y", + "signed_:": { + "fallback": "true", + "key": "f4llb4ckk3y", + "signatures": { + "": {":": ""} + }, + }, + }, + "one_time_keys": {"alg1:k1": "0net1m3k3y"}, } channel = self.make_request( "PUT", @@ -312,6 +323,55 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): } self.assertEqual(device_data, expected_device_data) + # test that the keys are correctly uploaded + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device1"], + }, + }, + token, + ) + self.assertEqual(channel.code, 200) + self.assertEqual( + channel.json_body["device_keys"][user][device_id]["keys"], + content["device_keys"]["keys"], + ) + # first claim should return the onetime key we uploaded + res = self.get_success( + self.hs.get_e2e_keys_handler().claim_one_time_keys( + {user: {device_id: {"alg1": 1}}}, + UserID.from_string(user), + timeout=None, + always_include_fallback_keys=False, + ) + ) + self.assertEqual( + res, + { + "failures": {}, + "one_time_keys": {user: {device_id: {"alg1:k1": "0net1m3k3y"}}}, + }, + ) + # second claim should return fallback key + res2 = self.get_success( + self.hs.get_e2e_keys_handler().claim_one_time_keys( + {user: {device_id: {"alg1": 1}}}, + UserID.from_string(user), + 
timeout=None, + always_include_fallback_keys=False, + ) + ) + self.assertEqual( + res2, + { + "failures": {}, + "one_time_keys": {user: {device_id: {"alg1:device1": "f4llb4ckk3y"}}}, + }, + ) + # create another device for the user ( new_device_id, @@ -348,10 +408,21 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) expected_content = {"body": "test_message"} self.assertEqual(channel.json_body["events"][0]["content"], expected_content) + + # fetch messages again and make sure that the message was not deleted + channel = self.make_request( + "POST", + f"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device/{device_id}/events", + content={}, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + self.assertEqual(channel.json_body["events"][0]["content"], expected_content) next_batch_token = channel.json_body.get("next_batch") - # fetch messages again and make sure that the message was deleted and we are returned an - # empty array + # make sure fetching messages with next batch token works - there are no unfetched + # messages so we should receive an empty array content = {"next_batch": next_batch_token} channel = self.make_request( "POST", -- cgit 1.5.1 From d35bed8369514fe727b4fe1afb68f48cc8b2655a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Sep 2023 17:14:09 +0100 Subject: Don't wake up destination transaction queue if they're not due for retry. 
(#16223) --- changelog.d/16223.feature | 1 + synapse/federation/send_queue.py | 12 +-- synapse/federation/sender/__init__.py | 86 +++++++++++++++------- synapse/federation/sender/per_destination_queue.py | 6 +- synapse/handlers/device.py | 26 +++---- synapse/handlers/devicemessage.py | 7 +- synapse/handlers/presence.py | 16 ++-- synapse/handlers/typing.py | 14 +++- synapse/module_api/__init__.py | 2 +- synapse/replication/tcp/client.py | 8 +- synapse/storage/databases/main/transactions.py | 26 ++++++- synapse/util/retryutils.py | 25 +++++++ tests/federation/test_federation_sender.py | 27 ++++--- tests/handlers/test_presence.py | 60 ++++++++++++--- tests/handlers/test_typing.py | 2 - 15 files changed, 228 insertions(+), 90 deletions(-) create mode 100644 changelog.d/16223.feature (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16223.feature b/changelog.d/16223.feature new file mode 100644 index 0000000000..a52d66658b --- /dev/null +++ b/changelog.d/16223.feature @@ -0,0 +1 @@ +Improve resource usage when sending data to a large number of remote hosts that are marked as "down". diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index fb448f2155..6520795635 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -49,7 +49,7 @@ from synapse.api.presence import UserPresenceState from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import LaterGauge from synapse.replication.tcp.streams.federation import FederationStream -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util.metrics import Measure from .units import Edu @@ -229,7 +229,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): """ # nothing to do here: the replication listener will handle it. 
- def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """As per FederationSender @@ -245,7 +245,9 @@ class FederationRemoteSendQueue(AbstractFederationSender): self.notifier.on_new_replication_data() - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. @@ -463,7 +465,7 @@ class ParsedFederationStreamData: edus: Dict[str, List[Edu]] -def process_rows_for_federation( +async def process_rows_for_federation( transaction_queue: FederationSender, rows: List[FederationStream.FederationStreamRow], ) -> None: @@ -496,7 +498,7 @@ def process_rows_for_federation( parsed_row.add_to_buffer(buff) for state, destinations in buff.presence_destinations: - transaction_queue.send_presence_to_destinations( + await transaction_queue.send_presence_to_destinations( states=[state], destinations=destinations ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 97abbdee18..fb20fd8a10 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -147,7 +147,10 @@ from twisted.internet import defer import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase -from synapse.federation.sender.per_destination_queue import PerDestinationQueue +from synapse.federation.sender.per_destination_queue import ( + CATCHUP_RETRY_INTERVAL, + PerDestinationQueue, +) from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics 
import ( run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util import Clock from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """Tells the sender that a new device message is ready to be sent to the - destination. The `immediate` flag specifies whether the messages should + destinations. The `immediate` flag specifies whether the messages should be tried to be sent immediately, or whether it can be delayed for a short while (to aid performance). 
""" @@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender): pdu.internal_metadata.stream_ordering, ) + destinations = await filter_destinations_by_retry_limiter( + destinations, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) @@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender): domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) - domains = [ + domains: StrCollection = [ d for d in domains_set if not self.is_mine_server_name(d) and self._federation_shard_config.should_handle(self._instance_name, d) ] + + domains = await filter_destinations_by_retry_limiter( + domains, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + if not domains: return @@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender): for queue in queues: queue.flush_read_receipts_for_room(room_id) - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. 
@@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender): for state in states: assert self.is_mine_id(state.user_id) + destinations = await filter_destinations_by_retry_limiter( + [ + d + for d in destinations + if self._federation_shard_config.should_handle(self._instance_name, d) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: if self.is_mine_server_name(destination): continue - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue self._get_per_destination_queue(destination).send_presence( states, start_loop=False @@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender): else: queue.send_edu(edu) - def send_device_messages(self, destination: str, immediate: bool = True) -> None: - if self.is_mine_server_name(destination): - logger.warning("Not sending device update to ourselves") - return - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - return + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: + destinations = await filter_destinations_by_retry_limiter( + [ + destination + for destination in destinations + if self._federation_shard_config.should_handle( + self._instance_name, destination + ) + and not self.is_mine_server_name(destination) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) - if immediate: - self._get_per_destination_queue(destination).attempt_new_transaction() - else: - self._get_per_destination_queue(destination).mark_new_data() - self._destination_wakeup_queue.add_to_queue(destination) + for destination in destinations: + if immediate: + self._get_per_destination_queue(destination).attempt_new_transaction() + else: + self._get_per_destination_queue(destination).mark_new_data() + self._destination_wakeup_queue.add_to_queue(destination) def 
wake_destination(self, destination: str) -> None: """Called when we want to retry sending transactions to a remote. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 31c5c2b7de..9105ba664c 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -59,6 +59,10 @@ sent_edus_by_type = Counter( ) +# If the retry interval is larger than this then we enter "catchup" mode +CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000 + + class PerDestinationQueue: """ Manages the per-destination transmission queues. @@ -370,7 +374,7 @@ class PerDestinationQueue: ), ) - if e.retry_interval > 60 * 60 * 1000: + if e.retry_interval > CATCHUP_RETRY_INTERVAL: # we won't retry for another hour! # (this suggests a significant outage) # We drop pending EDUs because otherwise they will diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5ae427d52c..763f56dfc1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler): user_id, hosts, ) - for host in hosts: - self.federation_sender.send_device_messages( - host, immediate=False - ) - # TODO: when called, this isn't in a logging context. - # This leads to log spam, sentry event spam, and massive - # memory usage. - # See https://github.com/matrix-org/synapse/issues/12552. - # log_kv( - # {"message": "sent device update to host", "host": host} - # ) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) + # TODO: when called, this isn't in a logging context. + # This leads to log spam, sentry event spam, and massive + # memory usage. + # See https://github.com/matrix-org/synapse/issues/12552. 
+ # log_kv( + # {"message": "sent device update to host", "host": host} + # ) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're @@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler): # Notify things that device lists need to be sent out. self.notifier.notify_replication() - for host in potentially_changed_hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages( + potentially_changed_hosts, immediate=False + ) def _update_device_from_client_ips( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 798c7039f9..1c79f7a61e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -302,10 +302,9 @@ class DeviceMessageHandler: ) if self.federation_sender: - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation_sender.send_device_messages(destination) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. 
+ await self.federation_sender.send_device_messages(remote_messages.keys()) async def get_events_for_dehydrated_device( self, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2f841863ae..f31e18328b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC): ) for destination, host_states in hosts_to_states.items(): - self._federation.send_presence_to_destinations(host_states, [destination]) + await self._federation.send_presence_to_destinations( + host_states, [destination] + ) async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: """ @@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler): ) for destination, states in hosts_to_states.items(): - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( states, [destination] ) @@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler): or state.status_msg is not None ] - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=newly_joined_remote_hosts, states=states, ) @@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler): prev_remote_hosts or newly_joined_remote_hosts ): local_states = await self.current_state_for_users(newly_joined_local_users) - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=prev_remote_hosts | newly_joined_remote_hosts, states=list(local_states.values()), ) @@ -2182,7 +2184,7 @@ class PresenceFederationQueue: index = bisect(self._queue, (clear_before,)) self._queue = self._queue[index:] - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Collection[UserPresenceState], destinations: StrCollection ) -> None: """Send the presence states to the given destinations. 
@@ -2202,7 +2204,7 @@ class PresenceFederationQueue: return if self._federation: - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states, destinations=destinations, ) @@ -2325,7 +2327,7 @@ class PresenceFederationQueue: for host, user_ids in hosts_to_users.items(): states = await self._presence_handler.current_state_for_users(user_ids) - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states.values(), destinations=[host], ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 7aeae5319c..4b4227003d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, StreamKeyType, UserID +from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.wheel_timer import WheelTimer if TYPE_CHECKING: @@ -150,8 +151,15 @@ class FollowerTypingHandler: now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - hosts = await self._storage_controllers.state.get_current_hosts_in_room( - member.room_id + hosts: StrCollection = ( + await self._storage_controllers.state.get_current_hosts_in_room( + member.room_id + ) + ) + hosts = await filter_destinations_by_retry_limiter( + hosts, + clock=self.clock, + store=self.store, ) for domain in hosts: if not self.is_mine_server_name(domain): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 9ad8e038ae..2f00a7ba20 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1180,7 +1180,7 
@@ class ModuleApi: # Send to remote destinations. destination = UserID.from_string(user).domain - presence_handler.get_federation_queue().send_presence_to_destinations( + await presence_handler.get_federation_queue().send_presence_to_destinations( presence_events, [destination] ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 3b88dc68ea..51285e6d33 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -422,7 +422,7 @@ class FederationSenderHandler: # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - send_queue.process_rows_for_federation(self.federation_sender, rows) + await send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) # ... and when new receipts happen @@ -439,16 +439,14 @@ class FederationSenderHandler: for row in rows if not row.entity.startswith("@") and not row.is_signature } - for host in hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages(hosts, immediate=False) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local # clients and remote servers, so we ignore entities that start with # '@' (since they'll be local users rather than destinations). 
hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) + await self.federation_sender.send_device_messages(hosts) async def _on_new_receipts( self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow] diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 860bbf7c0f..efd21b5bfc 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast import attr from canonicaljson import encode_canonical_json @@ -28,8 +28,8 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.types import JsonDict -from synapse.util.caches.descriptors import cached +from synapse.types import JsonDict, StrCollection +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer @@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): else: return None + @cachedList( + cached_method_name="get_destination_retry_timings", list_name="destinations" + ) + async def get_destination_retry_timings_batch( + self, destinations: StrCollection + ) -> Dict[str, Optional[DestinationRetryTimings]]: + rows = await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), + desc="get_destination_retry_timings_batch", + ) + + return { + row.pop("destination"): DestinationRetryTimings(**row) + for row in rows + if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + } + async def 
set_destination_retry_timings( self, destination: str, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 9d2065372c..0e1f907667 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Optional, Type from synapse.api.errors import CodeMessageException from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import DataStore +from synapse.types import StrCollection from synapse.util import Clock if TYPE_CHECKING: @@ -116,6 +117,30 @@ async def get_retry_limiter( ) +async def filter_destinations_by_retry_limiter( + destinations: StrCollection, + clock: Clock, + store: DataStore, + retry_due_within_ms: int = 0, +) -> StrCollection: + """Filter down the list of destinations to only those that will are either + alive or due for a retry (within `retry_due_within_ms`) + """ + if not destinations: + return destinations + + retry_timings = await store.get_destination_retry_timings_batch(destinations) + + now = int(clock.time_msec()) + + return [ + destination + for destination, timings in retry_timings.items() + if timings is None + or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms + ] + + class RetryDestinationLimiter: def __init__( self, diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 7bd3d06859..caf04b54cb 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -75,7 +75,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -111,6 +111,9 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): # * The same room / user on multiple threads. # * A different user in the same room. 
sender = self.hs.get_federation_sender() + # Hack so that we have a txn in-flight so we batch up read receipts + # below + sender.wake_destination("host2") for user, thread in ( ("alice", None), ("alice", "thread"), @@ -125,9 +128,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=thread, data={"ts": 1234}, ) - self.successResultOf( - defer.ensureDeferred(sender.send_read_receipt(receipt)) - ) + defer.ensureDeferred(sender.send_read_receipt(receipt)) self.pump() @@ -191,7 +192,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -342,7 +343,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): self.reactor.advance(1) # a second call should produce no new device EDUs - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) self.assertEqual(self.edus, []) # a second device @@ -550,7 +553,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. @@ -601,7 +606,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. 
@@ -656,7 +663,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a987267308..88a16193a3 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -909,8 +909,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -946,11 +952,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) now_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) rows, upto_token, limited = self.get_success( self.queue.get_replication_rows("master", prev_token, now_token, 10) @@ -989,8 +1001,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - 
self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(10 * 60 * 1000) @@ -1005,8 +1023,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -1033,11 +1057,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) self.reactor.advance(2 * 60 * 1000) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(4 * 60 * 1000) @@ -1053,8 +1083,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + 
self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 43c513b157..95106ec8f3 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -120,8 +120,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.datastore = hs.get_datastores().main - self.datastore.get_destination_retry_timings = AsyncMock(return_value=None) - self.datastore.get_device_updates_by_remote = AsyncMock( # type: ignore[method-assign] return_value=(0, []) ) -- cgit 1.5.1 From 4f1840a88ad3a93244fc23149c56245704eab824 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 6 Sep 2023 09:30:53 +0200 Subject: Delete device messages asynchronously and in staged batches (#16240) --- changelog.d/16240.misc | 1 + synapse/handlers/device.py | 48 ++++++++++++++++++++++ synapse/handlers/presence.py | 4 +- synapse/handlers/sync.py | 16 ++++++-- synapse/storage/databases/main/deviceinbox.py | 26 +++++++++--- synapse/storage/databases/main/devices.py | 8 ---- synapse/storage/databases/main/receipts.py | 6 +-- synapse/storage/engines/_base.py | 6 +++ synapse/storage/engines/postgres.py | 4 ++ synapse/storage/engines/sqlite.py | 4 ++ .../schema/main/delta/48/group_unique_indexes.py | 4 +- synapse/util/task_scheduler.py | 17 ++++---- tests/handlers/test_device.py | 47 +++++++++++++++++++++ 13 files changed, 154 insertions(+), 37 deletions(-) create mode 100644 changelog.d/16240.misc (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16240.misc b/changelog.d/16240.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16240.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 763f56dfc1..9e52af5f13 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + JsonMapping, + ScheduledTask, StrCollection, StreamKeyType, StreamToken, + TaskStatus, UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -62,6 +65,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 @@ -78,6 +82,7 @@ class DeviceWorkerHandler: self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() + self._event_sources = hs.get_event_sources() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled self._query_appservices_for_keys = ( @@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler): self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler): self._delete_stale_devices, ) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id: The user to delete devices from. 
device_ids: The list of device IDs to delete """ + to_device_stream_id = self._event_sources.get_current_token().to_device_key try: await self.store.delete_devices(user_id, device_ids) @@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler): f"org.matrix.msc3890.local_notification_settings.{device_id}", ) + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=device_id, + params={ + "user_id": user_id, + "device_id": device_id, + "up_to_stream_id": to_device_stream_id, + }, + ) + # Pushers are deleted after `delete_access_tokens_for_user` is called so that # modules using `on_logged_out` hook can use them if needed. await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) await self.notify_device_update(user_id, device_ids) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). 
+ return TaskStatus.ACTIVE, None, None + async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a4b05b72e7..375c7d0901 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,6 +183,7 @@ class BasePresenceHandler(abc.ABC): writer""" def __init__(self, hs: "HomeServer"): + self.hs = hs self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -473,8 +474,6 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs - self._presence_writer_instance = hs.config.worker.writers.presence[0] # Route presence EDUs to the right worker @@ -738,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler): class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 60a9f341b5..0ccd7d250c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase +from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME from synapse.handlers.relations import BundledAggregations from synapse.logging import issue9533_logger from synapse.logging.context import current_context @@ -268,6 +269,7 @@ class SyncHandler: self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state self._device_handler = hs.get_device_handler() + self._task_scheduler = 
hs.get_task_scheduler() self.should_calculate_push_rules = hs.config.push.enable_push @@ -360,11 +362,19 @@ class SyncHandler: # (since we now know that the device has received them) if since_token is not None: since_stream_id = since_token.to_device_key - deleted = await self.store.delete_messages_for_device( - sync_config.user.to_string(), sync_config.device_id, since_stream_id + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=sync_config.device_id, + params={ + "user_id": sync_config.user.to_string(), + "device_id": sync_config.device_id, + "up_to_stream_id": since_stream_id, + }, ) logger.debug( - "Deleted %d to-device messages up to %d", deleted, since_stream_id + "Deletion of to-device messages up to %d scheduled", + since_stream_id, ) if timeout == 0 or since_token is None or full_state: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 271cdf923c..744e98c6d0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: Optional[str], up_to_stream_id: int + self, + user_id: str, + device_id: Optional[str], + up_to_stream_id: int, + limit: int, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. + limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 + ROW_ID_NAME = self.database_engine.row_id_name + def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? 
AND device_id = ?" - " AND stream_id <= ?" - ) + sql = f""" + DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( + SELECT {ROW_ID_NAME} FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + LIMIT {limit} + ) + """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) return txn.rowcount @@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) + # In this case we don't know if we hit the limit or the delete is complete + # so let's not update the cache. + if count == limit: + return count + # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fa69a4a298..7208fc8b33 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): keyvalues={"user_id": user_id, "hidden": False}, ) - self.db_pool.simple_delete_many_txn( - txn, - table="device_inbox", - column="device_id", - values=device_ids, - keyvalues={"user_id": user_id}, - ) - self.db_pool.simple_delete_many_txn( txn, table="device_auth_providers", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f..e4d10ff250 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - + ROW_ID_NAME = self.database_engine.row_id_name # Identify any duplicate receipts arising from # 
https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e..b1a2418cbd 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc554..6309363217 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ class PostgresEngine( else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c..802069e1e1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): """Gets a string giving the server version. 
For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af..622686d28f 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute( diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 9e89aeb748..9b2581e51a 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -77,6 +77,7 @@ class TaskScheduler: LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): + self._hs = hs self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() @@ -97,8 +98,6 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) - else: - self.replication_client = hs.get_replication_command_handler() def register_action( self, @@ -133,7 +132,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. 
A function matching the specified - `action` should have been previously registered with `register_action`. + `action` should have be registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -149,11 +148,6 @@ class TaskScheduler: Returns: The id of the scheduled task """ - if action not in self._actions: - raise Exception( - f"No function associated with action {action} of the scheduled task" - ) - status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() @@ -175,7 +169,7 @@ class TaskScheduler: if self._run_background_tasks: await self._launch_task(task) else: - self.replication_client.send_new_active_task(task.id) + self._hs.get_replication_command_handler().send_new_active_task(task.id) return task.id @@ -315,7 +309,10 @@ class TaskScheduler: """ assert self._run_background_tasks - assert task.action in self._actions + if task.action not in self._actions: + raise Exception( + f"No function associated with action {task.action} of the scheduled task {task.id}" + ) function = self._actions[task.action] async def wrapper() -> None: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 55a4f95ef3..9659a4a355 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.storage.databases.main.appservice import _make_exclusive_regex from synapse.types import JsonDict, create_requester from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler from tests import unittest from tests.unittest import override_config @@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): assert isinstance(handler, DeviceHandler) self.handler = handler self.store = hs.get_datastores().main + self.device_message_handler = hs.get_device_message_handler() return hs def prepare(self, reactor: MemoryReactor, clock: 
Clock, hs: HomeServer) -> None: @@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase): ) self.assertIsNone(res) + def test_delete_device_and_big_device_inbox(self) -> None: + """Check that deleting a big device inbox is staged and batched asynchronously.""" + DEVICE_ID = "abc" + sender = "@sender:" + self.hs.hostname + receiver = "@receiver:" + self.hs.hostname + self._record_user(sender, DEVICE_ID, DEVICE_ID) + self._record_user(receiver, DEVICE_ID, DEVICE_ID) + + # queue a bunch of messages in the inbox + requester = create_requester(sender, device_id=DEVICE_ID) + for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): + self.get_success( + self.device_message_handler.send_device_message( + requester, "message_type", {receiver: {"*": {"val": i}}} + ) + ) + + # delete the device + self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID])) + + # messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(10, len(res)) + + # wait for the task scheduler to do a second delete pass + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + + # remaining messages should now be deleted + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(0, len(res)) + def test_update_device(self) -> None: self._record_users() -- cgit 1.5.1 From 1cd410a7833984ef69a7dcecf8997f4c45d609cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Sep 2023 13:45:43 +0100 Subject: Recheck if remote device is cached before requesting it (#16252) This fixes a bug where we could get stuck re-requesting 
the device over replication again and again. --- changelog.d/16252.bugfix | 1 + synapse/handlers/device.py | 21 +++++++++++++++------ synapse/replication/http/devices.py | 4 ++-- synapse/storage/databases/main/devices.py | 26 +++++++++++++++++--------- 4 files changed, 35 insertions(+), 17 deletions(-) create mode 100644 changelog.d/16252.bugfix (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16252.bugfix b/changelog.d/16252.bugfix new file mode 100644 index 0000000000..881bc00e61 --- /dev/null +++ b/changelog.d/16252.bugfix @@ -0,0 +1 @@ +Fix bug when using workers where Synapse could end up re-requesting the same remote device repeatedly. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e52af5f13..9356ae998e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1030,7 +1030,7 @@ class DeviceListWorkerUpdater: async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. @@ -1059,6 +1059,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") + self._resync_linearizer = Linearizer(name="remote_device_resync") # user_id -> list of updates waiting to be handled. self._pending_updates: Dict[ @@ -1301,7 +1302,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. 
@@ -1321,9 +1322,11 @@ class DeviceListUpdater(DeviceListWorkerUpdater): failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: - user_result, user_failed = await self._user_device_resync_returning_failed( - user_id - ) + async with self._resync_linearizer.queue(user_id): + ( + user_result, + user_failed, + ) = await self._user_device_resync_returning_failed(user_id) result[user_id] = user_result if user_failed: failed.add(user_id) @@ -1335,7 +1338,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): async def _user_device_resync_returning_failed( self, user_id: str - ) -> Tuple[Optional[JsonDict], bool]: + ) -> Tuple[Optional[JsonMapping], bool]: """Fetches all devices for a user and updates the device cache with them. Args: @@ -1348,6 +1351,12 @@ class DeviceListUpdater(DeviceListWorkerUpdater): e.g. due to a connection problem. - True iff the resync failed and the device list should be marked as stale. """ + # Check that we haven't gone and fetched the devices since we last + # checked if we needed to resync these device lists. + if await self.store.get_users_whose_devices_are_cached([user_id]): + cached = await self.store.get_cached_devices_for_user(user_id) + return cached, False + logger.debug("Attempting to resync the device list for %s", user_id) log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. 
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 209833d287..b8198e059c 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -20,7 +20,7 @@ from twisted.web.server import Request from synapse.http.server import HttpServer from synapse.logging.opentracing import active_span from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import JsonDict +from synapse.types import JsonDict, JsonMapping if TYPE_CHECKING: from synapse.server import HomeServer @@ -82,7 +82,7 @@ class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict - ) -> Tuple[int, Dict[str, Optional[JsonDict]]]: + ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]: user_ids: List[str] = content["user_ids"] logger.info("Resync for %r", user_ids) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 324fdfa892..70faf4b1ec 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): mapping of user_id -> device_id -> device_info. """ unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids} - user_map = await self.get_device_list_last_stream_id_for_remotes( - list(unique_user_ids) - ) - # We go and check if any of the users need to have their device lists - # resynced. If they do then we remove them from the cached list. 
- users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids_in_cache = await self.get_users_whose_devices_are_cached( unique_user_ids ) - user_ids_in_cache = { - user_id for user_id, stream_id in user_map.items() if stream_id - } - users_needing_resync user_ids_not_in_cache = unique_user_ids - user_ids_in_cache # First fetch all the users which all devices are to be returned. @@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return user_ids_not_in_cache, results + async def get_users_whose_devices_are_cached( + self, user_ids: StrCollection + ) -> Set[str]: + """Checks which of the given users we have cached the devices for.""" + user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids) + + # We go and check if any of the users need to have their device lists + # resynced. If they do then we remove them from the cached list. + users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids + ) + user_ids_in_cache = { + user_id for user_id, stream_id in user_map.items() if stream_id + } - users_needing_resync + return user_ids_in_cache + @cached(num_args=2, tree=True) async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict: content = await self.db_pool.simple_select_one_onecol( -- cgit 1.5.1 From 151e4bbc45dbf7b767b1a6a74ffb4cd7889ccf78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Sep 2023 13:11:02 +0100 Subject: Filter out down hosts when retrying fetching device lists (#16298) --- changelog.d/16298.misc | 1 + synapse/handlers/device.py | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16298.misc (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16298.misc b/changelog.d/16298.misc new file mode 100644 index 0000000000..75b546d424 --- /dev/null +++ b/changelog.d/16298.misc @@ -0,0 +1 @@ +Don't try refetching device lists for users on remote hosts 
that are marked as "down". diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9356ae998e..9d240ad4ee 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -58,7 +58,10 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.cancellation import cancellable from synapse.util.metrics import measure_func -from synapse.util.retryutils import NotRetryingDestination +from synapse.util.retryutils import ( + NotRetryingDestination, + filter_destinations_by_retry_limiter, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -1269,8 +1272,18 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self._resync_retry_in_progress = True # Get all of the users that need resyncing. need_resync = await self.store.get_user_ids_requiring_device_list_resync() + + # Filter out users whose host is marked as "down" up front. + hosts = await filter_destinations_by_retry_limiter( + {get_domain_from_id(u) for u in need_resync}, self.clock, self.store + ) + hosts = set(hosts) + # Iterate over the set of user IDs. for user_id in need_resync: + if get_domain_from_id(user_id) not in hosts: + continue + try: # Try to resync the current user's devices list. result = (await self.multi_user_device_resync([user_id], False))[ -- cgit 1.5.1 From be3c7b08a3e6888e60497a80ebd143bd4df9a719 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 11:54:16 +0100 Subject: Fix deleting device inbox when using background worker (#16311) Introduced in #16240 The action for the task was only defined on the "master" handler, rather than the base worker one. 
--- changelog.d/16311.misc | 1 + synapse/handlers/device.py | 62 +++++++++++++++++++++++----------------------- 2 files changed, 32 insertions(+), 31 deletions(-) create mode 100644 changelog.d/16311.misc (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16311.misc b/changelog.d/16311.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16311.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9d240ad4ee..e2ae3da67e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -91,9 +91,14 @@ class DeviceWorkerHandler: self._query_appservices_for_keys = ( hs.config.experimental.msc3984_appservice_key_query ) + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListWorkerUpdater(hs) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + @trace async def get_devices_by_user(self, user_id: str) -> List[JsonDict]: """ @@ -383,6 +388,32 @@ class DeviceWorkerHandler: "Trying handling device list state for partial join: not supported on workers." 
) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). + return TaskStatus.ACTIVE, None, None + class DeviceHandler(DeviceWorkerHandler): device_list_updater: "DeviceListUpdater" @@ -394,7 +425,6 @@ class DeviceHandler(DeviceWorkerHandler): self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool - self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -428,10 +458,6 @@ class DeviceHandler(DeviceWorkerHandler): self._delete_stale_devices, ) - self._task_scheduler.register_action( - self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME - ) - def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. 
@@ -590,32 +616,6 @@ class DeviceHandler(DeviceWorkerHandler): await self.notify_device_update(user_id, device_ids) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 - - async def _delete_device_messages( - self, - task: ScheduledTask, - ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: - """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" - assert task.params is not None - user_id = task.params["user_id"] - device_id = task.params["device_id"] - up_to_stream_id = task.params["up_to_stream_id"] - - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) - - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None - async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device -- cgit 1.5.1 From 7afb5e041004bab8b0aaf7909ce3c7a9ef80077f Mon Sep 17 00:00:00 2001 From: Hanadi Date: Wed, 13 Sep 2023 14:33:39 +0200 Subject: Fix using dehydrated devices (MSC2697) & refresh tokens (#16288) Refresh tokens were not correctly moved to the rehydrated device (similar to how the access token is currently handled). This resulted in invalid refresh tokens after rehydration. 
--- changelog.d/16288.bugfix | 1 + synapse/handlers/device.py | 7 ++++--- synapse/storage/databases/main/registration.py | 20 ++++++++++++++++++++ tests/handlers/test_device.py | 10 +++++++++- 4 files changed, 34 insertions(+), 4 deletions(-) create mode 100644 changelog.d/16288.bugfix (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16288.bugfix b/changelog.d/16288.bugfix new file mode 100644 index 0000000000..f08d10d1f3 --- /dev/null +++ b/changelog.d/16288.bugfix @@ -0,0 +1 @@ +Fix bug introduced in Synapse 1.49.0 when using dehydrated devices ([MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697)) and refresh tokens. Contributed by Hanadi. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index e2ae3da67e..0d3d5ebc86 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -758,12 +758,13 @@ class DeviceHandler(DeviceWorkerHandler): # If the dehydrated device was successfully deleted (the device ID # matched the stored dehydrated device), then modify the access - # token to use the dehydrated device's ID and copy the old device - # display name to the dehydrated device, and destroy the old device - # ID + # token and refresh token to use the dehydrated device's ID and + # copy the old device display name to the dehydrated device, + # and destroy the old device ID old_device_id = await self.store.set_device_for_access_token( access_token, device_id ) + await self.store.set_device_for_refresh_token(user_id, old_device_id, device_id) old_device = await self.store.get_device(user_id, old_device_id) if old_device is None: raise errors.NotFoundError() diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 7e85b73e8e..e34156dc55 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -2312,6 +2312,26 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): 
return next_id + async def set_device_for_refresh_token( + self, user_id: str, old_device_id: str, device_id: str + ) -> None: + """Moves refresh tokens from old device to current device + + Args: + user_id: The user of the devices. + old_device_id: The old device. + device_id: The new device ID. + Returns: + None + """ + + await self.db_pool.simple_update( + "refresh_tokens", + keyvalues={"user_id": user_id, "device_id": old_device_id}, + updatevalues={"device_id": device_id}, + desc="set_device_for_refresh_token", + ) + def _set_device_for_access_token_txn( self, txn: LoggingTransaction, token: str, device_id: str ) -> str: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 79d327499b..d4ed068357 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -461,6 +461,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase): self.message_handler = hs.get_device_message_handler() self.registration = hs.get_registration_handler() self.auth = hs.get_auth() + self.auth_handler = hs.get_auth_handler() self.store = hs.get_datastores().main return hs @@ -487,11 +488,12 @@ class DehydrationTestCase(unittest.HomeserverTestCase): self.assertEqual(device_data, {"device_data": {"foo": "bar"}}) # Create a new login for the user and dehydrated the device - device_id, access_token, _expiration_time, _refresh_token = self.get_success( + device_id, access_token, _expiration_time, refresh_token = self.get_success( self.registration.register_device( user_id=user_id, device_id=None, initial_display_name="new device", + should_issue_refresh_token=True, ) ) @@ -522,6 +524,12 @@ class DehydrationTestCase(unittest.HomeserverTestCase): self.assertEqual(user_info.device_id, retrieved_device_id) + # make sure the user device has the refresh token + assert refresh_token is not None + self.get_success( + self.auth_handler.refresh_token(refresh_token, 5 * 60 * 1000, 5 * 60 * 1000) + ) + # make sure the device has the display name that was 
set from the login res = self.get_success(self.handler.get_device(user_id, retrieved_device_id)) -- cgit 1.5.1 From e9e2904eb2c0b73eb4154faf41bd360e6168cc92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 14:56:07 +0100 Subject: Speed up deleting to-device messages task (#16318) --- changelog.d/16318.misc | 1 + synapse/handlers/device.py | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) create mode 100644 changelog.d/16318.misc (limited to 'synapse/handlers/device.py') diff --git a/changelog.d/16318.misc b/changelog.d/16318.misc new file mode 100644 index 0000000000..1433a2f246 --- /dev/null +++ b/changelog.d/16318.misc @@ -0,0 +1 @@ +Speed up task to delete to-device messages. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0d3d5ebc86..86ad96d030 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -388,7 +388,8 @@ class DeviceWorkerHandler: "Trying handling device list state for partial join: not supported on workers." ) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 1000 async def _delete_device_messages( self, @@ -400,19 +401,19 @@ class DeviceWorkerHandler: device_id = task.params["device_id"] up_to_stream_id = task.params["up_to_stream_id"] - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) + # Delete the messages in batches to avoid too much DB load. 
+ while True: + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + + await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) class DeviceHandler(DeviceWorkerHandler): -- cgit 1.5.1