diff options
author | Erik Johnston <erik@matrix.org> | 2023-04-14 16:10:32 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-04-14 16:10:32 +0100 |
commit | b5192355f6ac11eec4781d73a59b14cfc8732d1f (patch) | |
tree | 3c4f2e0d40e254dbb8de2d187b2e3e1d49559012 /synapse/storage/databases/main | |
parent | Delete pushers after calling on_logged_out module hook on device delete (#15410) (diff) | |
download | synapse-b5192355f6ac11eec4781d73a59b14cfc8732d1f.tar.xz |
User directory background update speedup (#15435)
c.f. #15264 The two changes are: 1. Add indexes so that the select / deletes don't do sequential scans 2. Don't repeatedly call `SELECT count(*)` each iteration, as that's slow
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/user_directory.py | 89 |
1 files changed, 44 insertions, 45 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 9fced4b997..5d65faed16 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -102,44 +102,34 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) -> int: # Get all the rooms that we want to process. def _make_staging_area(txn: LoggingTransaction) -> None: - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" - ) - txn.execute(sql) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database - sql = """ - SELECT room_id, count(*) FROM current_state_events + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS + SELECT room_id, count(*) AS events + FROM current_state_events GROUP BY room_id """ txn.execute(sql) - rooms = list(txn.fetchall()) - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)" ) - del rooms - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_users(user_id TEXT NOT NULL)" + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)" ) - txn.execute(sql) - txn.execute("SELECT name FROM users") - users = list(txn.fetchall()) + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position ( + position TEXT NOT NULL + ) + """ + txn.execute(sql) - self.db_pool.simple_insert_many_txn( - txn, TEMP_TABLE + "_users", keys=("user_id",), values=users + sql = f""" + CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS + SELECT name AS user_id FROM users + """ + txn.execute(sql) + txn.execute( + f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)" ) new_pos = await self.get_max_stream_id_in_current_state_deltas() @@ -222,12 +212,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): if not rooms_to_work_on: return None - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - result = txn.fetchone() - assert result is not None - progress["remaining"] = result[0] + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + result = txn.fetchone() + assert result is not None + progress["remaining"] = result[0] return rooms_to_work_on @@ -332,7 +323,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): if processed_event_count > batch_size: # Don't process any more rooms, we've hit our batch size. - return processed_event_count + break + + await self.db_pool.runInteraction( + "populate_user_directory", + self.db_pool.updates._background_update_progress_txn, + "populate_user_directory_process_rooms", + progress, + ) return processed_event_count @@ -356,13 +354,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): users_to_work_on = [x[0] for x in user_result] - # Get how many are left to process, so we can give status on how - # far we are in processing - sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" - txn.execute(sql) - count_result = txn.fetchone() - assert count_result is not None - progress["remaining"] = count_result[0] + if "remaining" not in progress: + # Get how many are left to process, so we can give status on how + # far we are in processing + sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" + txn.execute(sql) + count_result = txn.fetchone() + assert count_result is not None + progress["remaining"] = count_result[0] return users_to_work_on |