summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-03-30 15:11:31 -0400
committerGitHub <noreply@github.com>2023-03-30 15:11:31 -0400
commit2a234b788e2b5706ee83cf8eb86dfd004bc7c166 (patch)
tree5d84fd514b83e346a6e082e48118fae2b290ea10
parentto_device updates could be dropped when consuming the replication stream (#15... (diff)
downloadsynapse-2a234b788e2b5706ee83cf8eb86dfd004bc7c166.tar.xz
Set thread_id column to non-null for event_push_{actions,actions_staging,summary} (#15350)
Clean-up from adding the thread_id column, which was initially
null but backfilled with values. It is desirable to require it to now
be non-null.

In addition to altering this column to be non-null, we clean up
obsolete background jobs, indexes, and just-in-time updating
code.
-rw-r--r--changelog.d/15350.misc1
-rw-r--r--synapse/storage/databases/main/event_push_actions.py240
-rw-r--r--synapse/storage/schema/__init__.py6
-rw-r--r--synapse/storage/schema/main/delta/74/02thread_notifications_backfill.sql28
-rw-r--r--synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.postgres23
-rw-r--r--synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.sqlite99
6 files changed, 154 insertions, 243 deletions
diff --git a/changelog.d/15350.misc b/changelog.d/15350.misc
new file mode 100644
index 0000000000..2dea23784f
--- /dev/null
+++ b/changelog.d/15350.misc
@@ -0,0 +1 @@
+Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null.
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index eeccf5db24..6afc51320a 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -100,7 +100,6 @@ from synapse.storage.database import (
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 
@@ -289,180 +288,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             unique=True,
         )
 
-        self.db_pool.updates.register_background_update_handler(
-            "event_push_backfill_thread_id",
-            self._background_backfill_thread_id,
-        )
-
-        # Indexes which will be used to quickly make the thread_id column non-null.
-        self.db_pool.updates.register_background_index_update(
-            "event_push_actions_thread_id_null",
-            index_name="event_push_actions_thread_id_null",
-            table="event_push_actions",
-            columns=["thread_id"],
-            where_clause="thread_id IS NULL",
-        )
-        self.db_pool.updates.register_background_index_update(
-            "event_push_summary_thread_id_null",
-            index_name="event_push_summary_thread_id_null",
-            table="event_push_summary",
-            columns=["thread_id"],
-            where_clause="thread_id IS NULL",
-        )
-
-        # Check ASAP (and then later, every 1s) to see if we have finished
-        # background updates the event_push_actions and event_push_summary tables.
-        self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
-        self._event_push_backfill_thread_id_done = False
-
-    @wrap_as_background_process("check_event_push_backfill_thread_id")
-    async def _check_event_push_backfill_thread_id(self) -> None:
-        """
-        Has thread_id finished backfilling?
-
-        If not, we need to just-in-time update it so the queries work.
-        """
-        done = await self.db_pool.updates.has_completed_background_update(
-            "event_push_backfill_thread_id"
-        )
-
-        if done:
-            self._event_push_backfill_thread_id_done = True
-        else:
-            # Reschedule to run.
-            self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
-
-    async def _background_backfill_thread_id(
-        self, progress: JsonDict, batch_size: int
-    ) -> int:
-        """
-        Fill in the thread_id field for event_push_actions and event_push_summary.
-
-        This is preparatory so that it can be made non-nullable in the future.
-
-        Because all current (null) data is done in an unthreaded manner this
-        simply assumes it is on the "main" timeline. Since event_push_actions
-        are periodically cleared it is not possible to correctly re-calculate
-        the thread_id.
-        """
-        event_push_actions_done = progress.get("event_push_actions_done", False)
-
-        def add_thread_id_txn(
-            txn: LoggingTransaction, start_stream_ordering: int
-        ) -> int:
-            sql = """
-            SELECT stream_ordering
-            FROM event_push_actions
-            WHERE
-                thread_id IS NULL
-                AND stream_ordering > ?
-            ORDER BY stream_ordering
-            LIMIT ?
-            """
-            txn.execute(sql, (start_stream_ordering, batch_size))
-
-            # No more rows to process.
-            rows = txn.fetchall()
-            if not rows:
-                progress["event_push_actions_done"] = True
-                self.db_pool.updates._background_update_progress_txn(
-                    txn, "event_push_backfill_thread_id", progress
-                )
-                return 0
-
-            # Update the thread ID for any of those rows.
-            max_stream_ordering = rows[-1][0]
-
-            sql = """
-            UPDATE event_push_actions
-            SET thread_id = 'main'
-            WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
-            """
-            txn.execute(
-                sql,
-                (
-                    start_stream_ordering,
-                    max_stream_ordering,
-                ),
-            )
-
-            # Update progress.
-            processed_rows = txn.rowcount
-            progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
-            self.db_pool.updates._background_update_progress_txn(
-                txn, "event_push_backfill_thread_id", progress
-            )
-
-            return processed_rows
-
-        def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
-            min_user_id = progress.get("max_summary_user_id", "")
-            min_room_id = progress.get("max_summary_room_id", "")
-
-            # Slightly overcomplicated query for getting the Nth user ID / room
-            # ID tuple, or the last if there are less than N remaining.
-            sql = """
-            SELECT user_id, room_id FROM (
-                SELECT user_id, room_id FROM event_push_summary
-                WHERE (user_id, room_id) > (?, ?)
-                    AND thread_id IS NULL
-                ORDER BY user_id, room_id
-                LIMIT ?
-            ) AS e
-            ORDER BY user_id DESC, room_id DESC
-            LIMIT 1
-            """
-
-            txn.execute(sql, (min_user_id, min_room_id, batch_size))
-            row = txn.fetchone()
-            if not row:
-                return 0
-
-            max_user_id, max_room_id = row
-
-            sql = """
-            UPDATE event_push_summary
-            SET thread_id = 'main'
-            WHERE
-                (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
-                AND thread_id IS NULL
-            """
-            txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
-            processed_rows = txn.rowcount
-
-            progress["max_summary_user_id"] = max_user_id
-            progress["max_summary_room_id"] = max_room_id
-            self.db_pool.updates._background_update_progress_txn(
-                txn, "event_push_backfill_thread_id", progress
-            )
-
-            return processed_rows
-
-        # First update the event_push_actions table, then the event_push_summary table.
-        #
-        # Note that the event_push_actions_staging table is ignored since it is
-        # assumed that items in that table will only exist for a short period of
-        # time.
-        if not event_push_actions_done:
-            result = await self.db_pool.runInteraction(
-                "event_push_backfill_thread_id",
-                add_thread_id_txn,
-                progress.get("max_event_push_actions_stream_ordering", 0),
-            )
-        else:
-            result = await self.db_pool.runInteraction(
-                "event_push_backfill_thread_id",
-                add_thread_id_summary_txn,
-            )
-
-            # Only done after the event_push_summary table is done.
-            if not result:
-                await self.db_pool.updates._end_background_update(
-                    "event_push_backfill_thread_id"
-                )
-
-        return result
-
     async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
         """Get the notification count by room for a user. Only considers notifications,
         not highlight or unread counts, and threads are currently aggregated under their room.
@@ -711,25 +536,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )
 
-        # First ensure that the existing rows have an updated thread_id field.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute(
-                """
-                UPDATE event_push_summary
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                (MAIN_TIMELINE, room_id, user_id),
-            )
-            txn.execute(
-                """
-                UPDATE event_push_actions
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                (MAIN_TIMELINE, room_id, user_id),
-            )
-
         # First we pull the counts from the summary table.
         #
         # We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1545,25 +1351,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 (room_id, user_id, stream_ordering, *thread_args),
             )
 
-            # First ensure that the existing rows have an updated thread_id field.
-            if not self._event_push_backfill_thread_id_done:
-                txn.execute(
-                    """
-                    UPDATE event_push_summary
-                    SET thread_id = ?
-                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                    """,
-                    (MAIN_TIMELINE, room_id, user_id),
-                )
-                txn.execute(
-                    """
-                    UPDATE event_push_actions
-                    SET thread_id = ?
-                    WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                    """,
-                    (MAIN_TIMELINE, room_id, user_id),
-                )
-
             # Fetch the notification counts between the stream ordering of the
             # latest receipt and what was previously summarised.
             unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1698,19 +1485,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
         """
 
-        # Ensure that any new actions have an updated thread_id.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute(
-                """
-                UPDATE event_push_actions
-                SET thread_id = ?
-                WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
-                """,
-                (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
-            )
-
-        # XXX Do we need to update summaries here too?
-
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
             SELECT user_id, room_id, thread_id,
@@ -1773,20 +1547,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         logger.info("Rotating notifications, handling %d rows", len(summaries))
 
-        # Ensure that any updated threads have the proper thread_id.
-        if not self._event_push_backfill_thread_id_done:
-            txn.execute_batch(
-                """
-                UPDATE event_push_summary
-                SET thread_id = ?
-                WHERE room_id = ? AND user_id = ? AND thread_id is NULL
-                """,
-                [
-                    (MAIN_TIMELINE, room_id, user_id)
-                    for user_id, room_id, _ in summaries
-                ],
-            )
-
         self.db_pool.simple_upsert_many_txn(
             txn,
             table="event_push_summary",
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index d3103a6c7a..72bbb3a7c2 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -95,9 +95,9 @@ Changes in SCHEMA_VERSION = 74:
 
 
 SCHEMA_COMPAT_VERSION = (
-    # The threads_id column must exist for event_push_actions, event_push_summary,
-    # receipts_linearized, and receipts_graph.
-    73
+    # The threads_id column must written to with non-null values event_push_actions,
+    # event_push_actions_staging, and event_push_summary.
+    74
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/74/02thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/74/02thread_notifications_backfill.sql
new file mode 100644
index 0000000000..ce6f9ff937
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/02thread_notifications_backfill.sql
@@ -0,0 +1,28 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Force the background updates from 06thread_notifications.sql to run in the
+-- foreground as code will now require those to be "done".
+
+DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
+
+-- Overwrite any null thread_id values.
+UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
+
+-- Drop the background updates to calculate the indexes used to find null thread_ids.
+DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
+DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';
diff --git a/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.postgres
new file mode 100644
index 0000000000..5f68667425
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop the indexes used to find null thread_ids.
+DROP INDEX IF EXISTS event_push_actions_thread_id_null;
+DROP INDEX IF EXISTS event_push_summary_thread_id_null;
+
+-- The thread_id columns can now be made non-nullable.
+ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL;
+ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL;
+ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL;
diff --git a/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.sqlite
new file mode 100644
index 0000000000..f46b233560
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/03thread_notifications_not_null.sql.sqlite
@@ -0,0 +1,99 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ -- The thread_id columns can now be made non-nullable.
+--
+-- SQLite doesn't support modifying columns to an existing table, so it must
+-- be recreated.
+
+-- Create the new tables.
+CREATE TABLE event_push_actions_staging_new (
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    actions TEXT NOT NULL,
+    notif SMALLINT NOT NULL,
+    highlight SMALLINT NOT NULL,
+    unread SMALLINT,
+    thread_id TEXT NOT NULL,
+    inserted_ts BIGINT
+);
+
+CREATE TABLE event_push_actions_new (
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    profile_tag VARCHAR(32),
+    actions TEXT NOT NULL,
+    topological_ordering BIGINT,
+    stream_ordering BIGINT,
+    notif SMALLINT,
+    highlight SMALLINT,
+    unread SMALLINT,
+    thread_id TEXT NOT NULL,
+    CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag)
+);
+
+CREATE TABLE event_push_summary_new (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    notif_count BIGINT NOT NULL,
+    stream_ordering BIGINT NOT NULL,
+    unread_count BIGINT,
+    last_receipt_stream_ordering BIGINT,
+    thread_id TEXT NOT NULL
+);
+
+-- Copy the data.
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
+    SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
+    FROM event_push_actions_staging;
+
+INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
+    SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
+    FROM event_push_actions;
+
+INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
+    SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
+    FROM event_push_summary;
+
+-- Drop the old tables.
+DROP TABLE event_push_actions_staging;
+DROP TABLE event_push_actions;
+DROP TABLE event_push_summary;
+
+-- Rename the tables.
+ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
+ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
+ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
+
+-- Recreate the indexes.
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
+
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering );
+CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id);
+CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id );
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
+
+CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ;
+
+-- Recreate some indexes in the background, by re-running the background updates
+-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7403, 'event_push_summary_unique_index2', '{}')
+  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7403, 'event_push_actions_stream_highlight_index', '{}')
+  ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';