summary refs log tree commit diff
path: root/synapse/storage/data_stores/main/devices.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/data_stores/main/devices.py')
-rw-r--r--synapse/storage/data_stores/main/devices.py60
1 files changed, 42 insertions, 18 deletions
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py

index ee3a2ab031..536cef3abd 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py
@@ -55,6 +55,10 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" +BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = ( + "drop_device_lists_outbound_last_success_non_unique_idx" +) + class DeviceWorkerStore(SQLBaseStore): def get_device(self, user_id, device_id): @@ -342,32 +346,23 @@ class DeviceWorkerStore(SQLBaseStore): def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id): # We update the device_lists_outbound_last_success with the successfully - # poked users. We do the join to see which users need to be inserted and - # which updated. + # poked users. sql = """ - SELECT user_id, coalesce(max(o.stream_id), 0), (max(s.stream_id) IS NOT NULL) + SELECT user_id, coalesce(max(o.stream_id), 0) FROM device_lists_outbound_pokes as o - LEFT JOIN device_lists_outbound_last_success as s - USING (destination, user_id) WHERE destination = ? AND o.stream_id <= ? GROUP BY user_id """ txn.execute(sql, (destination, stream_id)) rows = txn.fetchall() - sql = """ - UPDATE device_lists_outbound_last_success - SET stream_id = ? - WHERE destination = ? AND user_id = ? - """ - txn.executemany(sql, ((row[1], destination, row[0]) for row in rows if row[2])) - - sql = """ - INSERT INTO device_lists_outbound_last_success - (destination, user_id, stream_id) VALUES (?, ?, ?) - """ - txn.executemany( - sql, ((destination, row[0], row[1]) for row in rows if not row[2]) + self.db.simple_upsert_many_txn( + txn=txn, + table="device_lists_outbound_last_success", + key_names=("destination", "user_id"), + key_values=((destination, user_id) for user_id, _ in rows), + value_names=("stream_id",), + value_values=((stream_id,) for _, stream_id in rows), ) # Delete all sent outbound pokes @@ -725,6 +720,21 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes, ) + # create a unique index on device_lists_outbound_last_success + self.db.updates.register_background_index_update( + "device_lists_outbound_last_success_unique_idx", + index_name="device_lists_outbound_last_success_unique_idx", + table="device_lists_outbound_last_success", + columns=["destination", "user_id"], + unique=True, + ) + + # once that completes, we can remove the old non-unique index. + self.db.updates.register_background_update_handler( + BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX, + self._drop_device_lists_outbound_last_success_non_unique_idx, + ) + @defer.inlineCallbacks def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): def f(conn): @@ -799,6 +809,20 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): return rows + async def _drop_device_lists_outbound_last_success_non_unique_idx( + self, progress, batch_size + ): + def f(txn): + txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx") + + await self.db.runInteraction( + "drop_device_lists_outbound_last_success_non_unique_idx", f, + ) + await self.db.updates._end_background_update( + BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX + ) + return 1 + class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs):