summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py209
1 files changed, 63 insertions, 146 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 7c0f953365..ab8766c75b 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -599,6 +599,7 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
     REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
     REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
+    REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
 
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
@@ -614,14 +615,18 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
             self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
         )
 
-        self.db_pool.updates.register_background_update_handler(
-            self.REMOVE_DELETED_DEVICES,
-            self._remove_deleted_devices_from_device_inbox,
+        # Used to be a background update that deletes all device_inboxes for deleted
+        # devices.
+        self.db_pool.updates.register_noop_background_update(
+            self.REMOVE_DELETED_DEVICES
         )
+        # Used to be a background update that deletes all device_inboxes for hidden
+        # devices.
+        self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES)
 
         self.db_pool.updates.register_background_update_handler(
-            self.REMOVE_HIDDEN_DEVICES,
-            self._remove_hidden_devices_from_device_inbox,
+            self.REMOVE_DEAD_DEVICES_FROM_INBOX,
+            self._remove_dead_devices_from_device_inbox,
         )
 
     async def _background_drop_index_device_inbox(self, progress, batch_size):
@@ -636,171 +641,83 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
 
         return 1
 
-    async def _remove_deleted_devices_from_device_inbox(
-        self, progress: JsonDict, batch_size: int
+    async def _remove_dead_devices_from_device_inbox(
+        self,
+        progress: JsonDict,
+        batch_size: int,
     ) -> int:
-        """A background update that deletes all device_inboxes for deleted devices.
-
-        This should only need to be run once (when users upgrade to v1.47.0)
+        """A background update to remove devices that were either deleted or hidden from
+        the device_inbox table.
 
         Args:
-            progress: JsonDict used to store progress of this background update
-            batch_size: the maximum number of rows to retrieve in a single select query
+            progress: The update's progress dict.
+            batch_size: The batch size for this update.
 
         Returns:
-            The number of deleted rows
+            The number of rows deleted.
         """
 
-        def _remove_deleted_devices_from_device_inbox_txn(
+        def _remove_dead_devices_from_device_inbox_txn(
             txn: LoggingTransaction,
-        ) -> int:
-            """stream_id is not unique
-            we need to use an inclusive `stream_id >= ?` clause,
-            since we might not have deleted all dead device messages for the stream_id
-            returned from the previous query
+        ) -> Tuple[int, bool]:
 
-            Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
-            to avoid problems of deleting a large number of rows all at once
-            due to a single device having lots of device messages.
-            """
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM device_inbox")
+                # There's a type mismatch here between how we want to type the row and
+                # what fetchone says it returns, but we silence it because we know that
+                # res can't be None.
+                res: Tuple[Optional[int]] = txn.fetchone()  # type: ignore[assignment]
+                if res[0] is None:
+                    # this can only happen if the `device_inbox` table is empty, in which
+                    # case we have no work to do.
+                    return 0, True
+                else:
+                    max_stream_id = res[0]
 
-            last_stream_id = progress.get("stream_id", 0)
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
 
+            # delete rows in `device_inbox` which do *not* correspond to a known,
+            # unhidden device.
             sql = """
-                SELECT device_id, user_id, stream_id
-                FROM device_inbox
+                DELETE FROM device_inbox
                 WHERE
-                    stream_id >= ?
-                    AND (device_id, user_id) NOT IN (
-                        SELECT device_id, user_id FROM devices
+                    stream_id >= ? AND stream_id < ?
+                    AND NOT EXISTS (
+                        SELECT * FROM devices d
+                        WHERE
+                            d.device_id=device_inbox.device_id
+                            AND d.user_id=device_inbox.user_id
+                            AND NOT hidden
                     )
-                ORDER BY stream_id
-                LIMIT ?
-            """
-
-            txn.execute(sql, (last_stream_id, batch_size))
-            rows = txn.fetchall()
+                """
 
-            num_deleted = 0
-            for row in rows:
-                num_deleted += self.db_pool.simple_delete_txn(
-                    txn,
-                    "device_inbox",
-                    {"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
-                )
+            txn.execute(sql, (start, stop))
 
-            if rows:
-                # send more than stream_id to progress
-                # otherwise it can happen in large deployments that
-                # no change of status is visible in the log file
-                # it may be that the stream_id does not change in several runs
-                self.db_pool.updates._background_update_progress_txn(
-                    txn,
-                    self.REMOVE_DELETED_DEVICES,
-                    {
-                        "device_id": rows[-1][0],
-                        "user_id": rows[-1][1],
-                        "stream_id": rows[-1][2],
-                    },
-                )
-
-            return num_deleted
-
-        number_deleted = await self.db_pool.runInteraction(
-            "_remove_deleted_devices_from_device_inbox",
-            _remove_deleted_devices_from_device_inbox_txn,
-        )
-
-        # The task is finished when no more lines are deleted.
-        if not number_deleted:
-            await self.db_pool.updates._end_background_update(
-                self.REMOVE_DELETED_DEVICES
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.REMOVE_DEAD_DEVICES_FROM_INBOX,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
             )
 
-        return number_deleted
-
-    async def _remove_hidden_devices_from_device_inbox(
-        self, progress: JsonDict, batch_size: int
-    ) -> int:
-        """A background update that deletes all device_inboxes for hidden devices.
-
-        This should only need to be run once (when users upgrade to v1.47.0)
-
-        Args:
-            progress: JsonDict used to store progress of this background update
-            batch_size: the maximum number of rows to retrieve in a single select query
-
-        Returns:
-            The number of deleted rows
-        """
-
-        def _remove_hidden_devices_from_device_inbox_txn(
-            txn: LoggingTransaction,
-        ) -> int:
-            """stream_id is not unique
-            we need to use an inclusive `stream_id >= ?` clause,
-            since we might not have deleted all hidden device messages for the stream_id
-            returned from the previous query
-
-            Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
-            to avoid problems of deleting a large number of rows all at once
-            due to a single device having lots of device messages.
-            """
-
-            last_stream_id = progress.get("stream_id", 0)
-
-            sql = """
-                SELECT device_id, user_id, stream_id
-                FROM device_inbox
-                WHERE
-                    stream_id >= ?
-                    AND (device_id, user_id) IN (
-                        SELECT device_id, user_id FROM devices WHERE hidden = ?
-                    )
-                ORDER BY stream_id
-                LIMIT ?
-            """
-
-            txn.execute(sql, (last_stream_id, True, batch_size))
-            rows = txn.fetchall()
-
-            num_deleted = 0
-            for row in rows:
-                num_deleted += self.db_pool.simple_delete_txn(
-                    txn,
-                    "device_inbox",
-                    {"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
-                )
-
-            if rows:
-                # We don't just save the `stream_id` in progress as
-                # otherwise it can happen in large deployments that
-                # no change of status is visible in the log file, as
-                # it may be that the stream_id does not change in several runs
-                self.db_pool.updates._background_update_progress_txn(
-                    txn,
-                    self.REMOVE_HIDDEN_DEVICES,
-                    {
-                        "device_id": rows[-1][0],
-                        "user_id": rows[-1][1],
-                        "stream_id": rows[-1][2],
-                    },
-                )
-
-            return num_deleted
+            return stop > max_stream_id
 
-        number_deleted = await self.db_pool.runInteraction(
-            "_remove_hidden_devices_from_device_inbox",
-            _remove_hidden_devices_from_device_inbox_txn,
+        finished = await self.db_pool.runInteraction(
+            "_remove_devices_from_device_inbox_txn",
+            _remove_dead_devices_from_device_inbox_txn,
         )
 
-        # The task is finished when no more lines are deleted.
-        if not number_deleted:
+        if finished:
             await self.db_pool.updates._end_background_update(
-                self.REMOVE_HIDDEN_DEVICES
+                self.REMOVE_DEAD_DEVICES_FROM_INBOX,
             )
 
-        return number_deleted
+        return batch_size
 
 
 class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):