summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-04-14 16:10:32 +0100
committerGitHub <noreply@github.com>2023-04-14 16:10:32 +0100
commitb5192355f6ac11eec4781d73a59b14cfc8732d1f (patch)
tree3c4f2e0d40e254dbb8de2d187b2e3e1d49559012 /synapse/storage
parentDelete pushers after calling on_logged_out module hook on device delete (#15410) (diff)
downloadsynapse-b5192355f6ac11eec4781d73a59b14cfc8732d1f.tar.xz
User directory background update speedup (#15435)
c.f. #15264

The two changes are:
1. Add indexes so that the select / deletes don't do sequential scans
2. Don't repeatedly call `SELECT count(*)` each iteration, as that's slow
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/user_directory.py89
1 files changed, 44 insertions, 45 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9fced4b997..5d65faed16 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -102,44 +102,34 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
     ) -> int:
         # Get all the rooms that we want to process.
         def _make_staging_area(txn: LoggingTransaction) -> None:
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)"
-            )
-            txn.execute(sql)
-
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
-            )
-            txn.execute(sql)
-
-            # Get rooms we want to process from the database
-            sql = """
-                SELECT room_id, count(*) FROM current_state_events
+            sql = f"""
+                CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_rooms AS
+                SELECT room_id, count(*) AS events
+                FROM current_state_events
                 GROUP BY room_id
             """
             txn.execute(sql)
-            rooms = list(txn.fetchall())
-            self.db_pool.simple_insert_many_txn(
-                txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms
+            txn.execute(
+                f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_rm ON {TEMP_TABLE}_rooms (room_id)"
             )
-            del rooms
-
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_users(user_id TEXT NOT NULL)"
+            txn.execute(
+                f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_rooms_evs ON {TEMP_TABLE}_rooms (events)"
             )
-            txn.execute(sql)
 
-            txn.execute("SELECT name FROM users")
-            users = list(txn.fetchall())
+            sql = f"""
+                CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_position (
+                    position TEXT NOT NULL
+                )
+            """
+            txn.execute(sql)
 
-            self.db_pool.simple_insert_many_txn(
-                txn, TEMP_TABLE + "_users", keys=("user_id",), values=users
+            sql = f"""
+                CREATE TABLE IF NOT EXISTS {TEMP_TABLE}_users AS
+                SELECT name AS user_id FROM users
+            """
+            txn.execute(sql)
+            txn.execute(
+                f"CREATE INDEX IF NOT EXISTS {TEMP_TABLE}_users_idx ON {TEMP_TABLE}_users (user_id)"
             )
 
         new_pos = await self.get_max_stream_id_in_current_state_deltas()
@@ -222,12 +212,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             if not rooms_to_work_on:
                 return None
 
-            # Get how many are left to process, so we can give status on how
-            # far we are in processing
-            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
-            result = txn.fetchone()
-            assert result is not None
-            progress["remaining"] = result[0]
+            if "remaining" not in progress:
+                # Get how many are left to process, so we can give status on how
+                # far we are in processing
+                txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+                result = txn.fetchone()
+                assert result is not None
+                progress["remaining"] = result[0]
 
             return rooms_to_work_on
 
@@ -332,7 +323,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
             if processed_event_count > batch_size:
                 # Don't process any more rooms, we've hit our batch size.
-                return processed_event_count
+                break
+
+        await self.db_pool.runInteraction(
+            "populate_user_directory",
+            self.db_pool.updates._background_update_progress_txn,
+            "populate_user_directory_process_rooms",
+            progress,
+        )
 
         return processed_event_count
 
@@ -356,13 +354,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
             users_to_work_on = [x[0] for x in user_result]
 
-            # Get how many are left to process, so we can give status on how
-            # far we are in processing
-            sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
-            txn.execute(sql)
-            count_result = txn.fetchone()
-            assert count_result is not None
-            progress["remaining"] = count_result[0]
+            if "remaining" not in progress:
+                # Get how many are left to process, so we can give status on how
+                # far we are in processing
+                sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users"
+                txn.execute(sql)
+                count_result = txn.fetchone()
+                assert count_result is not None
+                progress["remaining"] = count_result[0]
 
             return users_to_work_on