summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/background_updates.py44
-rw-r--r--synapse/storage/databases/main/event_push_actions.py244
-rw-r--r--synapse/storage/schema/__init__.py3
-rw-r--r--synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql28
-rw-r--r--synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres37
-rw-r--r--synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite102
6 files changed, 234 insertions, 224 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index ca085ef800..a99aea8926 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -561,50 +561,6 @@ class BackgroundUpdater:
             updater, oneshot=True
         )
 
-    def register_background_validate_constraint(
-        self, update_name: str, constraint_name: str, table: str
-    ) -> None:
-        """Helper for store classes to do a background validate constraint.
-
-        This only applies on PostgreSQL.
-
-        To use:
-
-        1. use a schema delta file to add a background update. Example:
-            INSERT INTO background_updates (update_name, progress_json) VALUES
-                ('validate_my_constraint', '{}');
-
-        2. In the Store constructor, call this method
-
-        Args:
-            update_name: update_name to register for
-            constraint_name: name of constraint to validate
-            table: table the constraint is applied to
-        """
-
-        def runner(conn: Connection) -> None:
-            c = conn.cursor()
-
-            sql = f"""
-            ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
-            """
-            logger.debug("[SQL] %s", sql)
-            c.execute(sql)
-
-        async def updater(progress: JsonDict, batch_size: int) -> int:
-            assert isinstance(
-                self.db_pool.engine, engines.PostgresEngine
-            ), "validate constraint background update registered for non-Postres database"
-
-            logger.info("Validating constraint %s to %s", constraint_name, table)
-            await self.db_pool.runWithConnection(runner)
-            await self._end_background_update(update_name)
-            return 1
-
-        self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
-            updater, oneshot=True
-        )
-
     async def create_index_in_background(
         self,
         index_name: str,
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 2e98a29fef..6fdb1e292e 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -100,6 +100,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
+from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 
@@ -288,22 +289,180 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             unique=True,
         )
 
-        self.db_pool.updates.register_background_validate_constraint(
-            "event_push_actions_staging_thread_id",
-            constraint_name="event_push_actions_staging_thread_id",
-            table="event_push_actions_staging",
+        self.db_pool.updates.register_background_update_handler(
+            "event_push_backfill_thread_id",
+            self._background_backfill_thread_id,
         )
-        self.db_pool.updates.register_background_validate_constraint(
-            "event_push_actions_thread_id",
-            constraint_name="event_push_actions_thread_id",
+
+        # Indexes which will be used to quickly make the thread_id column non-null.
+        self.db_pool.updates.register_background_index_update(
+            "event_push_actions_thread_id_null",
+            index_name="event_push_actions_thread_id_null",
             table="event_push_actions",
+            columns=["thread_id"],
+            where_clause="thread_id IS NULL",
         )
-        self.db_pool.updates.register_background_validate_constraint(
-            "event_push_summary_thread_id",
-            constraint_name="event_push_summary_thread_id",
+        self.db_pool.updates.register_background_index_update(
+            "event_push_summary_thread_id_null",
+            index_name="event_push_summary_thread_id_null",
             table="event_push_summary",
+            columns=["thread_id"],
+            where_clause="thread_id IS NULL",
         )
 
+        # Check ASAP (and then later, every 1s) to see if we have finished
+        # background updates the event_push_actions and event_push_summary tables.
+        self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
+        self._event_push_backfill_thread_id_done = False
+
+    @wrap_as_background_process("check_event_push_backfill_thread_id")
+    async def _check_event_push_backfill_thread_id(self) -> None:
+        """
+        Has thread_id finished backfilling?
+
+        If not, we need to just-in-time update it so the queries work.
+        """
+        done = await self.db_pool.updates.has_completed_background_update(
+            "event_push_backfill_thread_id"
+        )
+
+        if done:
+            self._event_push_backfill_thread_id_done = True
+        else:
+            # Reschedule to run.
+            self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
+
+    async def _background_backfill_thread_id(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """
+        Fill in the thread_id field for event_push_actions and event_push_summary.
+
+        This is preparatory so that it can be made non-nullable in the future.
+
+        Because all current (null) data is done in an unthreaded manner this
+        simply assumes it is on the "main" timeline. Since event_push_actions
+        are periodically cleared it is not possible to correctly re-calculate
+        the thread_id.
+        """
+        event_push_actions_done = progress.get("event_push_actions_done", False)
+
+        def add_thread_id_txn(
+            txn: LoggingTransaction, start_stream_ordering: int
+        ) -> int:
+            sql = """
+            SELECT stream_ordering
+            FROM event_push_actions
+            WHERE
+                thread_id IS NULL
+                AND stream_ordering > ?
+            ORDER BY stream_ordering
+            LIMIT ?
+            """
+            txn.execute(sql, (start_stream_ordering, batch_size))
+
+            # No more rows to process.
+            rows = txn.fetchall()
+            if not rows:
+                progress["event_push_actions_done"] = True
+                self.db_pool.updates._background_update_progress_txn(
+                    txn, "event_push_backfill_thread_id", progress
+                )
+                return 0
+
+            # Update the thread ID for any of those rows.
+            max_stream_ordering = rows[-1][0]
+
+            sql = """
+            UPDATE event_push_actions
+            SET thread_id = 'main'
+            WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
+            """
+            txn.execute(
+                sql,
+                (
+                    start_stream_ordering,
+                    max_stream_ordering,
+                ),
+            )
+
+            # Update progress.
+            processed_rows = txn.rowcount
+            progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "event_push_backfill_thread_id", progress
+            )
+
+            return processed_rows
+
+        def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
+            min_user_id = progress.get("max_summary_user_id", "")
+            min_room_id = progress.get("max_summary_room_id", "")
+
+            # Slightly overcomplicated query for getting the Nth user ID / room
+            # ID tuple, or the last if there are less than N remaining.
+            sql = """
+            SELECT user_id, room_id FROM (
+                SELECT user_id, room_id FROM event_push_summary
+                WHERE (user_id, room_id) > (?, ?)
+                    AND thread_id IS NULL
+                ORDER BY user_id, room_id
+                LIMIT ?
+            ) AS e
+            ORDER BY user_id DESC, room_id DESC
+            LIMIT 1
+            """
+
+            txn.execute(sql, (min_user_id, min_room_id, batch_size))
+            row = txn.fetchone()
+            if not row:
+                return 0
+
+            max_user_id, max_room_id = row
+
+            sql = """
+            UPDATE event_push_summary
+            SET thread_id = 'main'
+            WHERE
+                (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
+                AND thread_id IS NULL
+            """
+            txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
+            processed_rows = txn.rowcount
+
+            progress["max_summary_user_id"] = max_user_id
+            progress["max_summary_room_id"] = max_room_id
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "event_push_backfill_thread_id", progress
+            )
+
+            return processed_rows
+
+        # First update the event_push_actions table, then the event_push_summary table.
+        #
+        # Note that the event_push_actions_staging table is ignored since it is
+        # assumed that items in that table will only exist for a short period of
+        # time.
+        if not event_push_actions_done:
+            result = await self.db_pool.runInteraction(
+                "event_push_backfill_thread_id",
+                add_thread_id_txn,
+                progress.get("max_event_push_actions_stream_ordering", 0),
+            )
+        else:
+            result = await self.db_pool.runInteraction(
+                "event_push_backfill_thread_id",
+                add_thread_id_summary_txn,
+            )
+
+            # Only done after the event_push_summary table is done.
+            if not result:
+                await self.db_pool.updates._end_background_update(
+                    "event_push_backfill_thread_id"
+                )
+
+        return result
+
     async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
         """Get the notification count by room for a user. Only considers notifications,
         not highlight or unread counts, and threads are currently aggregated under their room.
@@ -552,6 +711,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )
 
+        # First ensure that the existing rows have an updated thread_id field.
+        if not self._event_push_backfill_thread_id_done:
+            txn.execute(
+                """
+                UPDATE event_push_summary
+                SET thread_id = ?
+                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+                """,
+                (MAIN_TIMELINE, room_id, user_id),
+            )
+            txn.execute(
+                """
+                UPDATE event_push_actions
+                SET thread_id = ?
+                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+                """,
+                (MAIN_TIMELINE, room_id, user_id),
+            )
+
         # First we pull the counts from the summary table.
         #
         # We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1367,6 +1545,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 (room_id, user_id, stream_ordering, *thread_args),
             )
 
+            # First ensure that the existing rows have an updated thread_id field.
+            if not self._event_push_backfill_thread_id_done:
+                txn.execute(
+                    """
+                    UPDATE event_push_summary
+                    SET thread_id = ?
+                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+                    """,
+                    (MAIN_TIMELINE, room_id, user_id),
+                )
+                txn.execute(
+                    """
+                    UPDATE event_push_actions
+                    SET thread_id = ?
+                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+                    """,
+                    (MAIN_TIMELINE, room_id, user_id),
+                )
+
             # Fetch the notification counts between the stream ordering of the
             # latest receipt and what was previously summarised.
             unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1501,6 +1698,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
         """
 
+        # Ensure that any new actions have an updated thread_id.
+        if not self._event_push_backfill_thread_id_done:
+            txn.execute(
+                """
+                UPDATE event_push_actions
+                SET thread_id = ?
+                WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
+                """,
+                (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
+            )
+
+        # XXX Do we need to update summaries here too?
+
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
             SELECT user_id, room_id, thread_id,
@@ -1563,6 +1773,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         logger.info("Rotating notifications, handling %d rows", len(summaries))
 
+        # Ensure that any updated threads have the proper thread_id.
+        if not self._event_push_backfill_thread_id_done:
+            txn.execute_batch(
+                """
+                UPDATE event_push_summary
+                SET thread_id = ?
+                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+                """,
+                [
+                    (MAIN_TIMELINE, room_id, user_id)
+                    for user_id, room_id, _ in summaries
+                ],
+            )
+
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 741563abc6..1672976209 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -106,9 +106,6 @@ Changes in SCHEMA_VERSION = 76:
 SCHEMA_COMPAT_VERSION = (
     # Queries against `event_stream_ordering` columns in membership tables must
     # be disambiguated.
-    #
-    # The threads_id column must written to with non-null values for the
-    # event_push_actions, event_push_actions_staging, and event_push_summary tables.
     74
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
deleted file mode 100644
index ce6f9ff937..0000000000
--- a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
+++ /dev/null
@@ -1,28 +0,0 @@
-/* Copyright 2023 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Force the background updates from 06thread_notifications.sql to run in the
--- foreground as code will now require those to be "done".
-
-DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
-
--- Overwrite any null thread_id values.
-UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
-UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
-UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
-
--- Drop the background updates to calculate the indexes used to find null thread_ids.
-DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
-DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
deleted file mode 100644
index 40936def6f..0000000000
--- a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
+++ /dev/null
@@ -1,37 +0,0 @@
-/* Copyright 2022 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- The thread_id columns can now be made non-nullable, this is done by using a
--- constraint (and not altering the column) to avoid taking out a full table lock.
---
--- We initially add an invalid constraint which guards against new data (this
--- doesn't lock the table).
-ALTER TABLE event_push_actions_staging
-    ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
-ALTER TABLE event_push_actions
-    ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
-ALTER TABLE event_push_summary
-    ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
-
--- We then validate the constraint which doesn't need to worry about new data. It
--- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete.
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (7605, 'event_push_actions_staging_thread_id', '{}'),
-  (7605, 'event_push_actions_thread_id', '{}'),
-  (7605, 'event_push_summary_thread_id', '{}');
-
--- Drop the indexes used to find null thread_ids.
-DROP INDEX IF EXISTS event_push_actions_thread_id_null;
-DROP INDEX IF EXISTS event_push_summary_thread_id_null;
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
deleted file mode 100644
index e9372b6cf9..0000000000
--- a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
+++ /dev/null
@@ -1,102 +0,0 @@
-/* Copyright 2022 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- -- The thread_id columns can now be made non-nullable.
---
--- SQLite doesn't support modifying columns to an existing table, so it must
--- be recreated.
-
--- Create the new tables.
-CREATE TABLE event_push_actions_staging_new (
-    event_id TEXT NOT NULL,
-    user_id TEXT NOT NULL,
-    actions TEXT NOT NULL,
-    notif SMALLINT NOT NULL,
-    highlight SMALLINT NOT NULL,
-    unread SMALLINT,
-    thread_id TEXT,
-    inserted_ts BIGINT,
-    CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL)
-);
-
-CREATE TABLE event_push_actions_new (
-    room_id TEXT NOT NULL,
-    event_id TEXT NOT NULL,
-    user_id TEXT NOT NULL,
-    profile_tag VARCHAR(32),
-    actions TEXT NOT NULL,
-    topological_ordering BIGINT,
-    stream_ordering BIGINT,
-    notif SMALLINT,
-    highlight SMALLINT,
-    unread SMALLINT,
-    thread_id TEXT,
-    CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag),
-    CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL)
-);
-
-CREATE TABLE event_push_summary_new (
-    user_id TEXT NOT NULL,
-    room_id TEXT NOT NULL,
-    notif_count BIGINT NOT NULL,
-    stream_ordering BIGINT NOT NULL,
-    unread_count BIGINT,
-    last_receipt_stream_ordering BIGINT,
-    thread_id TEXT,
-    CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL)
-);
-
--- Copy the data.
-INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
-    SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
-    FROM event_push_actions_staging;
-
-INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
-    SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
-    FROM event_push_actions;
-
-INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
-    SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
-    FROM event_push_summary;
-
--- Drop the old tables.
-DROP TABLE event_push_actions_staging;
-DROP TABLE event_push_actions;
-DROP TABLE event_push_summary;
-
--- Rename the tables.
-ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
-ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
-ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
-
--- Recreate the indexes.
-CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
-
-CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
-CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
-CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
-CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
-CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
-
-CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ;
-
--- Recreate some indexes in the background, by re-running the background updates
--- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql.
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (7403, 'event_push_summary_unique_index2', '{}')
-  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (7403, 'event_push_actions_stream_highlight_index', '{}')
-  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';