diff options
author | Dirk Klimpel <5740567+dklimpel@users.noreply.github.com> | 2021-11-02 14:18:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-02 13:18:30 +0000 |
commit | 4535532526581834ab798996ffe73f6d19c25123 (patch) | |
tree | d9851bcf1f77d2e10509b803960685f6b7b3a1ab /synapse/storage | |
parent | Fix providing a `RoomStreamToken` instance to `_notify_app_services_ephemeral... (diff) | |
download | synapse-4535532526581834ab798996ffe73f6d19c25123.tar.xz |
Delete messages for hidden devices from `device_inbox` (#11199)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/deviceinbox.py | 89 | ||||
-rw-r--r-- | synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql | 22 |
2 files changed, 111 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 25e9c1efe1..264e625bd7 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -561,6 +561,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" + REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"): super().__init__(database, db_conn, hs) @@ -581,6 +582,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self._remove_deleted_devices_from_device_inbox, ) + self.db_pool.updates.register_background_update_handler( + self.REMOVE_HIDDEN_DEVICES, + self._remove_hidden_devices_from_device_inbox, + ) + async def _background_drop_index_device_inbox(self, progress, batch_size): def reindex_txn(conn): txn = conn.cursor() @@ -676,6 +682,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): return number_deleted + async def _remove_hidden_devices_from_device_inbox( + self, progress: JsonDict, batch_size: int + ) -> int: + """A background update that deletes all device_inboxes for hidden devices. + + This should only need to be run once (when users upgrade to v1.47.0) + + Args: + progress: JsonDict used to store progress of this background update + batch_size: the maximum number of rows to retrieve in a single select query + + Returns: + The number of deleted rows + """ + + def _remove_hidden_devices_from_device_inbox_txn( + txn: LoggingTransaction, + ) -> int: + """stream_id is not unique + we need to use an inclusive `stream_id >= ?` clause, + since we might not have deleted all hidden device messages for the stream_id + returned from the previous query + + Then delete only rows matching the `(user_id, device_id, stream_id)` tuple, + to avoid problems of deleting a large number of rows all at once + due to a single device having lots of device messages. + """ + + last_stream_id = progress.get("stream_id", 0) + + sql = """ + SELECT device_id, user_id, stream_id + FROM device_inbox + WHERE + stream_id >= ? + AND (device_id, user_id) IN ( + SELECT device_id, user_id FROM devices WHERE hidden = ? + ) + ORDER BY stream_id + LIMIT ? + """ + + txn.execute(sql, (last_stream_id, True, batch_size)) + rows = txn.fetchall() + + num_deleted = 0 + for row in rows: + num_deleted += self.db_pool.simple_delete_txn( + txn, + "device_inbox", + {"device_id": row[0], "user_id": row[1], "stream_id": row[2]}, + ) + + if rows: + # We don't just save the `stream_id` in progress as + # otherwise it can happen in large deployments that + # no change of status is visible in the log file, as + # it may be that the stream_id does not change in several runs + self.db_pool.updates._background_update_progress_txn( + txn, + self.REMOVE_HIDDEN_DEVICES, + { + "device_id": rows[-1][0], + "user_id": rows[-1][1], + "stream_id": rows[-1][2], + }, + ) + + return num_deleted + + number_deleted = await self.db_pool.runInteraction( + "_remove_hidden_devices_from_device_inbox", + _remove_hidden_devices_from_device_inbox_txn, + ) + + # The task is finished when no more lines are deleted. + if not number_deleted: + await self.db_pool.updates._end_background_update( + self.REMOVE_HIDDEN_DEVICES + ) + + return number_deleted + class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore): pass diff --git a/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql new file mode 100644 index 0000000000..7b3592dcf0 --- /dev/null +++ b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql @@ -0,0 +1,22 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- Remove messages from the device_inbox table which were orphaned +-- because a device was hidden using Synapse earlier than 1.47.0. +-- This runs as background task, but may take a bit to finish. + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (6503, 'remove_hidden_devices_from_device_inbox', '{}'); |