diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9fced4b997..5d65faed16 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -102,44 +102,34 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
) -> int:
# Get all the rooms that we want to process.
def _make_staging_area(txn: LoggingTransaction) -> None:
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
- )
- txn.execute(sql)
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
-
- # Get rooms we want to process from the database
- sql = """
- SELECT room_id, count(*) FROM current_state_events
+ sql = f"""
+ CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS
+ SELECT room_id, count(*) AS events
+ FROM current_state_events
GROUP BY room_id
"""
txn.execute(sql)
- rooms = list(txn.fetchall())
- self.db_pool.simple_insert_many_txn(
- txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
+ txn.execute(
+ f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)"
)
- del rooms
-
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_users(user_id TEXT NOT NULL)"
+ txn.execute(
+ f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)"
)
- txn.execute(sql)
- txn.execute("SELECT name FROM users")
- users = list(txn.fetchall())
+ sql = f"""
+ CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position (
+ position TEXT NOT NULL
+ )
+ """
+ txn.execute(sql)
- self.db_pool.simple_insert_many_txn(
- txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
+ sql = f"""
+ CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS
+ SELECT name AS user_id FROM users
+ """
+ txn.execute(sql)
+ txn.execute(
+ f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)"
)
new_pos = await self.get_max_stream_id_in_current_state_deltas()
@@ -222,12 +212,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if not rooms_to_work_on:
return None
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
- result = txn.fetchone()
- assert result is not None
- progress["remaining"] = result[0]
+ if "remaining" not in progress:
+ # Count the rooms still to process only when `progress` has no
+ # "remaining" entry yet, so we can give status on how far along we are
+ txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ result = txn.fetchone()
+ assert result is not None
+ progress["remaining"] = result[0]
return rooms_to_work_on
@@ -332,7 +323,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
- return processed_event_count
+ break
+
+ await self.db_pool.runInteraction(
+ "populate_user_directory",
+ self.db_pool.updates._background_update_progress_txn,
+ "populate_user_directory_process_rooms",
+ progress,
+ )
return processed_event_count
@@ -356,13 +354,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
users_to_work_on = [x[0] for x in user_result]
- # Get how many are left to process, so we can give status on how
- # far we are in processing
- sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
- txn.execute(sql)
- count_result = txn.fetchone()
- assert count_result is not None
- progress["remaining"] = count_result[0]
+ if "remaining" not in progress:
+ # Count the users still to process only when `progress` has no
+ # "remaining" entry yet, so we can give status on how far along we are
+ sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+ txn.execute(sql)
+ count_result = txn.fetchone()
+ assert count_result is not None
+ progress["remaining"] = count_result[0]
return users_to_work_on
|