From 225f378ffa4893fdba8eeb4a22bff7daade180bd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 30 May 2024 11:25:24 +0100 Subject: Clean out invalid destinations from outbox (#17242) We started ensuring we only insert valid destinations: https://github.com/element-hq/synapse/pull/17240 --- changelog.d/17242.misc | 1 + synapse/storage/databases/main/deviceinbox.py | 76 ++++++++++++++++++++++ .../85/04_cleanup_device_federation_outbox.sql | 15 +++++ 3 files changed, 92 insertions(+) create mode 100644 changelog.d/17242.misc create mode 100644 synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql diff --git a/changelog.d/17242.misc b/changelog.d/17242.misc new file mode 100644 index 0000000000..5bd627da57 --- /dev/null +++ b/changelog.d/17242.misc @@ -0,0 +1 @@ +Clean out invalid destinations from `device_federation_outbox` table. diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 25023b5e7a..07333efff8 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -58,6 +58,7 @@ from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.stream_change_cache import StreamChangeCache +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.server import HomeServer @@ -964,6 +965,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" + CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox" def __init__( self, @@ -989,6 +991,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self._remove_dead_devices_from_device_inbox, ) + self.db_pool.updates.register_background_update_handler( + self.CLEANUP_DEVICE_FEDERATION_OUTBOX, + self._cleanup_device_federation_outbox, + ) + async def _background_drop_index_device_inbox( self, progress: JsonDict, batch_size: int ) -> int: @@ -1080,6 +1087,75 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return batch_size + async def _cleanup_device_federation_outbox( + self, + progress: JsonDict, + batch_size: int, + ) -> int: + def _cleanup_device_federation_outbox_txn( + txn: LoggingTransaction, + ) -> bool: + if "max_stream_id" in progress: + max_stream_id = progress["max_stream_id"] + else: + txn.execute("SELECT max(stream_id) FROM device_federation_outbox") + res = cast(Tuple[Optional[int]], txn.fetchone()) + if res[0] is None: + # this can only happen if the `device_inbox` table is empty, in which + # case we have no work to do. + return True + else: + max_stream_id = res[0] + + start = progress.get("stream_id", 0) + stop = start + batch_size + + sql = """ + SELECT destination FROM device_federation_outbox + WHERE ? < stream_id AND stream_id <= ? + """ + + txn.execute(sql, (start, stop)) + + destinations = {d for d, in txn} + to_remove = set() + for d in destinations: + try: + parse_and_validate_server_name(d) + except ValueError: + to_remove.add(d) + + self.db_pool.simple_delete_many_txn( + txn, + table="device_federation_outbox", + column="destination", + values=to_remove, + keyvalues={}, + ) + + self.db_pool.updates._background_update_progress_txn( + txn, + self.CLEANUP_DEVICE_FEDERATION_OUTBOX, + { + "stream_id": stop, + "max_stream_id": max_stream_id, + }, + ) + + return stop >= max_stream_id + + finished = await self.db_pool.runInteraction( + "_cleanup_device_federation_outbox", + _cleanup_device_federation_outbox_txn, + ) + + if finished: + await self.db_pool.updates._end_background_update( + self.CLEANUP_DEVICE_FEDERATION_OUTBOX, + ) + + return batch_size + class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): pass diff --git a/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql new file mode 100644 index 0000000000..041b17b0ee --- /dev/null +++ b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql @@ -0,0 +1,15 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2024 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8504, 'cleanup_device_federation_outbox', '{}'); -- cgit 1.4.1