summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/data_stores/main/devices.py34
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql28
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py80
-rw-r--r--synapse/storage/database.py1
-rw-r--r--synapse/storage/prepare_database.py13
5 files changed, 94 insertions, 62 deletions
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index 417ac8dc7c..fb9f798e29 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -55,10 +55,6 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
 
 BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
 
-BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX = (
-    "drop_device_lists_outbound_last_success_non_unique_idx"
-)
-
 
 class DeviceWorkerStore(SQLBaseStore):
     def get_device(self, user_id, device_id):
@@ -749,19 +745,13 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
             BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes,
         )
 
-        # create a unique index on device_lists_outbound_last_success
-        self.db.updates.register_background_index_update(
+        # a pair of background updates that were added during the 1.14 release cycle,
+        # but replaced with 58/06dlols_unique_idx.py
+        self.db.updates.register_noop_background_update(
             "device_lists_outbound_last_success_unique_idx",
-            index_name="device_lists_outbound_last_success_unique_idx",
-            table="device_lists_outbound_last_success",
-            columns=["destination", "user_id"],
-            unique=True,
         )
-
-        # once that completes, we can remove the old non-unique index.
-        self.db.updates.register_background_update_handler(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX,
-            self._drop_device_lists_outbound_last_success_non_unique_idx,
+        self.db.updates.register_noop_background_update(
+            "drop_device_lists_outbound_last_success_non_unique_idx",
         )
 
     @defer.inlineCallbacks
@@ -838,20 +828,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
 
         return rows
 
-    async def _drop_device_lists_outbound_last_success_non_unique_idx(
-        self, progress, batch_size
-    ):
-        def f(txn):
-            txn.execute("DROP INDEX IF EXISTS device_lists_outbound_last_success_idx")
-
-        await self.db.runInteraction(
-            "drop_device_lists_outbound_last_success_non_unique_idx", f,
-        )
-        await self.db.updates._end_background_update(
-            BG_UPDATE_DROP_DEVICE_LISTS_OUTBOUND_LAST_SUCCESS_NON_UNIQUE_IDX
-        )
-        return 1
-
 
 class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
     def __init__(self, database: Database, db_conn, hs):
diff --git a/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql b/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql
deleted file mode 100644
index d5e6deb878..0000000000
--- a/synapse/storage/data_stores/main/schema/delta/58/04device_lists_outbound_last_success_unique_idx.sql
+++ /dev/null
@@ -1,28 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- register a background update which will create a unique index on
--- device_lists_outbound_last_success
-INSERT into background_updates (ordering, update_name, progress_json)
-    VALUES (5804, 'device_lists_outbound_last_success_unique_idx', '{}');
-
--- once that completes, we can drop the old index.
-INSERT into background_updates (ordering, update_name, progress_json, depends_on)
-    VALUES (
-        5804,
-        'drop_device_lists_outbound_last_success_non_unique_idx',
-        '{}',
-        'device_lists_outbound_last_success_unique_idx'
-    );
diff --git a/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py b/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py
new file mode 100644
index 0000000000..d353f2bcb3
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/06dlols_unique_idx.py
@@ -0,0 +1,80 @@
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This migration rebuilds the device_lists_outbound_last_success table without duplicate
+entries, and with a UNIQUE index.
+"""
+
+import logging
+from io import StringIO
+
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.prepare_database import execute_statements_from_stream
+from synapse.storage.types import Cursor
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(*args, **kwargs):
+    pass
+
+
+def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+    # some instances might already have this index, in which case we can skip this
+    if isinstance(database_engine, PostgresEngine):
+        cur.execute(
+            """
+            SELECT 1 FROM pg_class WHERE relkind = 'i'
+            AND relname = 'device_lists_outbound_last_success_unique_idx'
+            """
+        )
+
+        if cur.rowcount:
+            logger.info(
+                "Unique index exists on device_lists_outbound_last_success: "
+                "skipping rebuild"
+            )
+            return
+
+    logger.info("Rebuilding device_lists_outbound_last_success with unique index")
+    execute_statements_from_stream(cur, StringIO(_rebuild_commands))
+
+
+# there might be duplicates, so the easiest way to achieve this is to create a new
+# table with the right data, and renaming it into place
+
+_rebuild_commands = """
+DROP TABLE IF EXISTS device_lists_outbound_last_success_new;
+
+CREATE TABLE device_lists_outbound_last_success_new (
+    destination TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    stream_id BIGINT NOT NULL
+);
+
+-- this took about 30 seconds on matrix.org's 16 million rows.
+INSERT INTO device_lists_outbound_last_success_new
+    SELECT destination, user_id, MAX(stream_id) FROM device_lists_outbound_last_success
+    GROUP BY destination, user_id;
+
+-- and this another 30 seconds.
+CREATE UNIQUE INDEX device_lists_outbound_last_success_unique_idx
+    ON device_lists_outbound_last_success_new (destination, user_id);
+
+DROP TABLE device_lists_outbound_last_success;
+
+ALTER TABLE device_lists_outbound_last_success_new
+    RENAME TO device_lists_outbound_last_success;
+"""
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 9947dbce77..b112ff3df2 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -78,7 +78,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
     "device_lists_remote_extremeties": "device_lists_remote_extremeties_unique_idx",
     "device_lists_remote_cache": "device_lists_remote_cache_unique_idx",
     "event_search": "event_search_event_id_idx",
-    "device_lists_outbound_last_success": "device_lists_outbound_last_success_unique_idx",
 }
 
 
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 640f242584..9afc145340 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -19,10 +19,12 @@ import logging
 import os
 import re
 from collections import Counter
+from typing import TextIO
 
 import attr
 
 from synapse.storage.engines.postgres import PostgresEngine
+from synapse.storage.types import Cursor
 
 logger = logging.getLogger(__name__)
 
@@ -479,8 +481,7 @@ def _apply_module_schema_files(cur, database_engine, modname, names_and_streams)
             )
 
         logger.info("applying schema %s for %s", name, modname)
-        for statement in get_statements(stream):
-            cur.execute(statement)
+        execute_statements_from_stream(cur, stream)
 
         # Mark as done.
         cur.execute(
@@ -538,8 +539,12 @@ def get_statements(f):
 
 def executescript(txn, schema_path):
     with open(schema_path, "r") as f:
-        for statement in get_statements(f):
-            txn.execute(statement)
+        execute_statements_from_stream(txn, f)
+
+
+def execute_statements_from_stream(cur: Cursor, f: TextIO):
+    for statement in get_statements(f):
+        cur.execute(statement)
 
 
 def _get_or_create_schema_state(txn, database_engine):