diff --git a/changelog.d/17242.misc b/changelog.d/17242.misc
new file mode 100644
index 0000000000..5bd627da57
--- /dev/null
+++ b/changelog.d/17242.misc
@@ -0,0 +1 @@
+Clean out invalid destinations from `device_federation_outbox` table.
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 25023b5e7a..07333efff8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -58,6 +58,7 @@ from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.stringutils import parse_and_validate_server_name
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -964,6 +965,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
+ CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"
def __init__(
self,
@@ -989,6 +991,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
self._remove_dead_devices_from_device_inbox,
)
+ self.db_pool.updates.register_background_update_handler(
+ self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+ self._cleanup_device_federation_outbox,
+ )
+
async def _background_drop_index_device_inbox(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1080,6 +1087,75 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
return batch_size
+ async def _cleanup_device_federation_outbox(
+ self,
+ progress: JsonDict,
+ batch_size: int,
+ ) -> int:
+ def _cleanup_device_federation_outbox_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ if "max_stream_id" in progress:
+ max_stream_id = progress["max_stream_id"]
+ else:
+ txn.execute("SELECT max(stream_id) FROM device_federation_outbox")
+ res = cast(Tuple[Optional[int]], txn.fetchone())
+ if res[0] is None:
+ # this can only happen if the `device_inbox` table is empty, in which
+ # case we have no work to do.
+ return True
+ else:
+ max_stream_id = res[0]
+
+ start = progress.get("stream_id", 0)
+ stop = start + batch_size
+
+ sql = """
+ SELECT destination FROM device_federation_outbox
+ WHERE ? < stream_id AND stream_id <= ?
+ """
+
+ txn.execute(sql, (start, stop))
+
+ destinations = {d for d, in txn}
+ to_remove = set()
+ for d in destinations:
+ try:
+ parse_and_validate_server_name(d)
+ except ValueError:
+ to_remove.add(d)
+
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table="device_federation_outbox",
+ column="destination",
+ values=to_remove,
+ keyvalues={},
+ )
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+ {
+ "stream_id": stop,
+ "max_stream_id": max_stream_id,
+ },
+ )
+
+ return stop >= max_stream_id
+
+ finished = await self.db_pool.runInteraction(
+ "_cleanup_device_federation_outbox",
+ _cleanup_device_federation_outbox_txn,
+ )
+
+ if finished:
+ await self.db_pool.updates._end_background_update(
+ self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+ )
+
+ return batch_size
+
class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
pass
diff --git a/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql
new file mode 100644
index 0000000000..041b17b0ee
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (8504, 'cleanup_device_federation_outbox', '{}');
|