summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_bg_updates.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2021-06-30 15:01:24 +0100
committerGitHub <noreply@github.com>2021-06-30 15:01:24 +0100
commit859dc05b3692a3672c1a0db8deaaa9274b6aa6f5 (patch)
treed4c9c7690a7a1da20f831af5bfbffeef6e621f55 /synapse/storage/databases/main/events_bg_updates.py
parentMerge branch 'master' into develop (diff)
downloadsynapse-859dc05b3692a3672c1a0db8deaaa9274b6aa6f5.tar.xz
Rebuild other indexes using `stream_ordering` (#10282)
We need to rebuild *all* of the indexes that use the current `stream_ordering`
column.
Diffstat (limited to 'synapse/storage/databases/main/events_bg_updates.py')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py50
1 files changed, 47 insertions, 3 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index da3a7df27b..1c95c66648 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -29,13 +29,18 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
-_REPLACE_STREAM_ORDRING_SQL_COMMANDS = (
+_REPLACE_STREAM_ORDERING_SQL_COMMANDS = (
     # there should be no leftover rows without a stream_ordering2, but just in case...
     "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL",
-    # finally, we can drop the rule and switch the columns
+    # now we can drop the rule and switch the columns
     "DROP RULE populate_stream_ordering2 ON events",
     "ALTER TABLE events DROP COLUMN stream_ordering",
     "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering",
+    # ... and finally, rename the indexes into place for consistency with sqlite
+    "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index",
+    "ALTER INDEX events_order_room2 RENAME TO events_order_room",
+    "ALTER INDEX events_room_stream2 RENAME TO events_room_stream",
+    "ALTER INDEX events_ts2 RENAME TO events_ts",
 )
 
 
@@ -45,6 +50,10 @@ class _BackgroundUpdates:
     DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
     POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2"
     INDEX_STREAM_ORDERING2 = "index_stream_ordering2"
+    INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url"
+    INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order"
+    INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream"
+    INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
 
@@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._purged_chain_cover_index,
         )
 
+        ################################################################################
+
         # bg updates for replacing stream_ordering with a BIGINT
         # (these only run on postgres.)
+
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.POPULATE_STREAM_ORDERING2,
             self._background_populate_stream_ordering2,
         )
+        # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2);
         self.db_pool.updates.register_background_index_update(
             _BackgroundUpdates.INDEX_STREAM_ORDERING2,
             index_name="events_stream_ordering",
@@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             columns=["stream_ordering2"],
             unique=True,
         )
+        # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false;
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL,
+            index_name="event_contains_url_index2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+            where_clause="contains_url = true AND outlier = false",
+        )
+        # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER,
+            index_name="events_order_room2",
+            table="events",
+            columns=["room_id", "topological_ordering", "stream_ordering2"],
+        )
+        # CREATE INDEX events_room_stream ON events(room_id, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM,
+            index_name="events_room_stream2",
+            table="events",
+            columns=["room_id", "stream_ordering2"],
+        )
+        # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering);
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS,
+            index_name="events_ts2",
+            table="events",
+            columns=["origin_server_ts", "stream_ordering2"],
+        )
         self.db_pool.updates.register_background_update_handler(
             _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN,
             self._background_replace_stream_ordering_column,
         )
 
+        ################################################################################
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place."""
 
         def process(txn: Cursor) -> None:
-            for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS:
+            for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS:
                 logger.info("completing stream_ordering migration: %s", sql)
                 txn.execute(sql)