summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py76
1 files changed, 76 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 25023b5e7a..07333efff8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -58,6 +58,7 @@ from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -964,6 +965,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
     REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
+    CLEANUP_DEVICE_FEDERATION_OUTBOX = "cleanup_device_federation_outbox"
 
     def __init__(
         self,
@@ -989,6 +991,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
             self._remove_dead_devices_from_device_inbox,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+            self._cleanup_device_federation_outbox,
+        )
+
     async def _background_drop_index_device_inbox(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1080,6 +1087,75 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
 
         return batch_size
 
+    async def _cleanup_device_federation_outbox(
+        self,
+        progress: JsonDict,
+        batch_size: int,
+    ) -> int:
+        def _cleanup_device_federation_outbox_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM device_federation_outbox")
+                res = cast(Tuple[Optional[int]], txn.fetchone())
+                if res[0] is None:
+                    # this can only happen if the `device_inbox` table is empty, in which
+                    # case we have no work to do.
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                SELECT destination FROM device_federation_outbox
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+
+            txn.execute(sql, (start, stop))
+
+            destinations = {d for d, in txn}
+            to_remove = set()
+            for d in destinations:
+                try:
+                    parse_and_validate_server_name(d)
+                except ValueError:
+                    to_remove.add(d)
+
+            self.db_pool.simple_delete_many_txn(
+                txn,
+                table="device_federation_outbox",
+                column="destination",
+                values=to_remove,
+                keyvalues={},
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop >= max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_cleanup_device_federation_outbox",
+            _cleanup_device_federation_outbox_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.CLEANUP_DEVICE_FEDERATION_OUTBOX,
+            )
+
+        return batch_size
+
 
 class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
     pass