summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-30 11:25:24 +0100
committerGitHub <noreply@github.com>2024-05-30 11:25:24 +0100
commit225f378ffa4893fdba8eeb4a22bff7daade180bd (patch)
treeb8a6ac39008cc9a887409a5876bdad296af6a275 /synapse/storage
parentEnsure we delete media if we reject due to spam check (#17246) (diff)
downloadsynapse-225f378ffa4893fdba8eeb4a22bff7daade180bd.tar.xz
Clean out invalid destinations from outbox (#17242)
We started ensuring we only insert valid destinations:
https://github.com/element-hq/synapse/pull/17240
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py76
-rw-r--r--synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql15
2 files changed, 91 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 25023b5e7a..07333efff8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -58,6 +58,7 @@ from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -964,6 +965,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
     REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
+    CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"
 
     def __init__(
         self,
@@ -989,6 +991,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
             self._remove_dead_devices_from_device_inbox,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+            self._cleanup_device_federation_outbox,
+        )
+
     async def _background_drop_index_device_inbox(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1080,6 +1087,75 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
 
         return batch_size
 
+    async def _cleanup_device_federation_outbox(
+        self,
+        progress: JsonDict,
+        batch_size: int,
+    ) -> int:
+        def _cleanup_device_federation_outbox_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM device_federation_outbox")
+                res = cast(Tuple[Optional[int]], txn.fetchone())
+                if res[0] is None:
+                    # this can only happen if the `device_inbox` table is empty, in which
+                    # case we have no work to do.
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                SELECT destination FROM device_federation_outbox
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+
+            txn.execute(sql, (start, stop))
+
+            destinations = {d for d, in txn}
+            to_remove = set()
+            for d in destinations:
+                try:
+                    parse_and_validate_server_name(d)
+                except ValueError:
+                    to_remove.add(d)
+
+            self.db_pool.simple_delete_many_txn(
+                txn,
+                table="device_federation_outbox",
+                column="destination",
+                values=to_remove,
+                keyvalues={},
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop >= max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_cleanup_device_federation_outbox",
+            _cleanup_device_federation_outbox_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+            )
+
+        return batch_size
+
 
 class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
     pass
diff --git a/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql
new file mode 100644
index 0000000000..041b17b0ee
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/04_cleanup_device_federation_outbox.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8504, 'cleanup_device_federation_outbox', '{}');