diff --git a/changelog.d/15437.misc b/changelog.d/15437.misc
new file mode 100644
index 0000000000..2dea23784f
--- /dev/null
+++ b/changelog.d/15437.misc
@@ -0,0 +1 @@
+Make the `thread_id` column on `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` non-null.
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index a99aea8926..ca085ef800 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -561,6 +561,50 @@ class BackgroundUpdater:
updater, oneshot=True
)
+ def register_background_validate_constraint(
+ self, update_name: str, constraint_name: str, table: str
+ ) -> None:
+ """Helper for store classes to do a background validate constraint.
+
+ This only applies on PostgreSQL.
+
+ To use:
+
+ 1. Use a schema delta file to add a background update. Example:
+ INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('validate_my_constraint', '{}');
+
+ 2. In the Store constructor, call this method
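+
+ Example (the constraint and table names here are illustrative):
+
+ self.db_pool.updates.register_background_validate_constraint(
+     "validate_my_constraint",
+     constraint_name="my_constraint",
+     table="my_table",
+ )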
+
+ Args:
+ update_name: update_name to register for
+ constraint_name: name of constraint to validate
+ table: table the constraint is applied to
+ """
+
+ def runner(conn: Connection) -> None:
+ c = conn.cursor()
+
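+ # Identifiers can't be bound as query parameters, so the table and
+ # constraint names (trusted, hard-coded values) are interpolated
+ # directly into the SQL.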
+ sql = f"""
+ ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name};
+ """
+ logger.debug("[SQL] %s", sql)
+ c.execute(sql)
+
+ async def updater(progress: JsonDict, batch_size: int) -> int:
+ assert isinstance(
+ self.db_pool.engine, engines.PostgresEngine
+ ), "validate constraint background update registered for non-Postres database"
+
+ logger.info("Validating constraint %s to %s", constraint_name, table)
+ await self.db_pool.runWithConnection(runner)
+ await self._end_background_update(update_name)
+ return 1
+
+ self._background_update_handlers[update_name] = _BackgroundUpdateHandler(
+ updater, oneshot=True
+ )
+
async def create_index_in_background(
self,
index_name: str,
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index eeccf5db24..ab8f354dc1 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -100,7 +100,6 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.types import JsonDict
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -289,180 +288,22 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
unique=True,
)
- self.db_pool.updates.register_background_update_handler(
- "event_push_backfill_thread_id",
- self._background_backfill_thread_id,
+ self.db_pool.updates.register_background_validate_constraint(
+ "event_push_actions_staging_thread_id",
+ constraint_name="event_push_actions_staging_thread_id",
+ table="event_push_actions_staging",
)
-
- # Indexes which will be used to quickly make the thread_id column non-null.
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_thread_id_null",
- index_name="event_push_actions_thread_id_null",
+ self.db_pool.updates.register_background_validate_constraint(
+ "event_push_actions_thread_id",
+ constraint_name="event_push_actions_thread_id",
table="event_push_actions",
- columns=["thread_id"],
- where_clause="thread_id IS NULL",
)
- self.db_pool.updates.register_background_index_update(
- "event_push_summary_thread_id_null",
- index_name="event_push_summary_thread_id_null",
+ self.db_pool.updates.register_background_validate_constraint(
+ "event_push_summary_thread_id",
+ constraint_name="event_push_summary_thread_id",
table="event_push_summary",
- columns=["thread_id"],
- where_clause="thread_id IS NULL",
)
- # Check ASAP (and then later, every 1s) to see if we have finished
- # background updates the event_push_actions and event_push_summary tables.
- self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
- self._event_push_backfill_thread_id_done = False
-
- @wrap_as_background_process("check_event_push_backfill_thread_id")
- async def _check_event_push_backfill_thread_id(self) -> None:
- """
- Has thread_id finished backfilling?
-
- If not, we need to just-in-time update it so the queries work.
- """
- done = await self.db_pool.updates.has_completed_background_update(
- "event_push_backfill_thread_id"
- )
-
- if done:
- self._event_push_backfill_thread_id_done = True
- else:
- # Reschedule to run.
- self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
-
- async def _background_backfill_thread_id(
- self, progress: JsonDict, batch_size: int
- ) -> int:
- """
- Fill in the thread_id field for event_push_actions and event_push_summary.
-
- This is preparatory so that it can be made non-nullable in the future.
-
- Because all current (null) data is done in an unthreaded manner this
- simply assumes it is on the "main" timeline. Since event_push_actions
- are periodically cleared it is not possible to correctly re-calculate
- the thread_id.
- """
- event_push_actions_done = progress.get("event_push_actions_done", False)
-
- def add_thread_id_txn(
- txn: LoggingTransaction, start_stream_ordering: int
- ) -> int:
- sql = """
- SELECT stream_ordering
- FROM event_push_actions
- WHERE
- thread_id IS NULL
- AND stream_ordering > ?
- ORDER BY stream_ordering
- LIMIT ?
- """
- txn.execute(sql, (start_stream_ordering, batch_size))
-
- # No more rows to process.
- rows = txn.fetchall()
- if not rows:
- progress["event_push_actions_done"] = True
- self.db_pool.updates._background_update_progress_txn(
- txn, "event_push_backfill_thread_id", progress
- )
- return 0
-
- # Update the thread ID for any of those rows.
- max_stream_ordering = rows[-1][0]
-
- sql = """
- UPDATE event_push_actions
- SET thread_id = 'main'
- WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
- """
- txn.execute(
- sql,
- (
- start_stream_ordering,
- max_stream_ordering,
- ),
- )
-
- # Update progress.
- processed_rows = txn.rowcount
- progress["max_event_push_actions_stream_ordering"] = max_stream_ordering
- self.db_pool.updates._background_update_progress_txn(
- txn, "event_push_backfill_thread_id", progress
- )
-
- return processed_rows
-
- def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
- min_user_id = progress.get("max_summary_user_id", "")
- min_room_id = progress.get("max_summary_room_id", "")
-
- # Slightly overcomplicated query for getting the Nth user ID / room
- # ID tuple, or the last if there are less than N remaining.
- sql = """
- SELECT user_id, room_id FROM (
- SELECT user_id, room_id FROM event_push_summary
- WHERE (user_id, room_id) > (?, ?)
- AND thread_id IS NULL
- ORDER BY user_id, room_id
- LIMIT ?
- ) AS e
- ORDER BY user_id DESC, room_id DESC
- LIMIT 1
- """
-
- txn.execute(sql, (min_user_id, min_room_id, batch_size))
- row = txn.fetchone()
- if not row:
- return 0
-
- max_user_id, max_room_id = row
-
- sql = """
- UPDATE event_push_summary
- SET thread_id = 'main'
- WHERE
- (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?)
- AND thread_id IS NULL
- """
- txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id))
- processed_rows = txn.rowcount
-
- progress["max_summary_user_id"] = max_user_id
- progress["max_summary_room_id"] = max_room_id
- self.db_pool.updates._background_update_progress_txn(
- txn, "event_push_backfill_thread_id", progress
- )
-
- return processed_rows
-
- # First update the event_push_actions table, then the event_push_summary table.
- #
- # Note that the event_push_actions_staging table is ignored since it is
- # assumed that items in that table will only exist for a short period of
- # time.
- if not event_push_actions_done:
- result = await self.db_pool.runInteraction(
- "event_push_backfill_thread_id",
- add_thread_id_txn,
- progress.get("max_event_push_actions_stream_ordering", 0),
- )
- else:
- result = await self.db_pool.runInteraction(
- "event_push_backfill_thread_id",
- add_thread_id_summary_txn,
- )
-
- # Only done after the event_push_summary table is done.
- if not result:
- await self.db_pool.updates._end_background_update(
- "event_push_backfill_thread_id"
- )
-
- return result
-
async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
"""Get the notification count by room for a user. Only considers notifications,
not highlight or unread counts, and threads are currently aggregated under their room.
@@ -711,25 +552,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)
- # First ensure that the existing rows have an updated thread_id field.
- if not self._event_push_backfill_thread_id_done:
- txn.execute(
- """
- UPDATE event_push_summary
- SET thread_id = ?
- WHERE room_id = ? AND user_id = ? AND thread_id is NULL
- """,
- (MAIN_TIMELINE, room_id, user_id),
- )
- txn.execute(
- """
- UPDATE event_push_actions
- SET thread_id = ?
- WHERE room_id = ? AND user_id = ? AND thread_id is NULL
- """,
- (MAIN_TIMELINE, room_id, user_id),
- )
-
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1545,25 +1367,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(room_id, user_id, stream_ordering, *thread_args),
)
- # First ensure that the existing rows have an updated thread_id field.
- if not self._event_push_backfill_thread_id_done:
- txn.execute(
- """
- UPDATE event_push_summary
- SET thread_id = ?
- WHERE room_id = ? AND user_id = ? AND thread_id is NULL
- """,
- (MAIN_TIMELINE, room_id, user_id),
- )
- txn.execute(
- """
- UPDATE event_push_actions
- SET thread_id = ?
- WHERE room_id = ? AND user_id = ? AND thread_id is NULL
- """,
- (MAIN_TIMELINE, room_id, user_id),
- )
-
# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1698,19 +1501,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
"""
- # Ensure that any new actions have an updated thread_id.
- if not self._event_push_backfill_thread_id_done:
- txn.execute(
- """
- UPDATE event_push_actions
- SET thread_id = ?
- WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
- """,
- (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
- )
-
- # XXX Do we need to update summaries here too?
-
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id, thread_id,
@@ -1773,20 +1563,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries))
- # Ensure that any updated threads have the proper thread_id.
- if not self._event_push_backfill_thread_id_done:
- txn.execute_batch(
- """
- UPDATE event_push_summary
- SET thread_id = ?
- WHERE room_id = ? AND user_id = ? AND thread_id is NULL
- """,
- [
- (MAIN_TIMELINE, room_id, user_id)
- for user_id, room_id, _ in summaries
- ],
- )
-
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 1672976209..741563abc6 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -106,6 +106,9 @@ Changes in SCHEMA_VERSION = 76:
SCHEMA_COMPAT_VERSION = (
# Queries against `event_stream_ordering` columns in membership tables must
# be disambiguated.
+ #
+ # The `thread_id` column must be written with non-null values for the
+ # `event_push_actions`, `event_push_actions_staging`, and `event_push_summary` tables.
74
)
"""Limit on how far the synapse codebase can be rolled back without breaking db compat
diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
new file mode 100644
index 0000000000..ce6f9ff937
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql
@@ -0,0 +1,28 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Force the background updates from 06thread_notifications.sql to run in the
+-- foreground, as the code now requires them to have completed.
+
+DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
+
+-- Overwrite any null thread_id values.
+UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
+UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
+
+-- Remove the background updates that would have created the indexes used to find null thread_ids.
+DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null';
+DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null';
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
new file mode 100644
index 0000000000..40936def6f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres
@@ -0,0 +1,37 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The thread_id columns can now be made non-nullable. This is done with a
+-- CHECK constraint (rather than altering the column) to avoid taking out a
+-- full table lock.
+--
+-- We initially add the constraint as NOT VALID: it is enforced for new rows,
+-- but existing rows are not checked and no full table lock is taken.
+ALTER TABLE event_push_actions_staging
+ ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+ALTER TABLE event_push_actions
+ ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+ALTER TABLE event_push_summary
+ ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID;
+
+-- We then validate the constraint, which only needs to check existing rows
+-- (new rows are already guarded by the NOT VALID constraint). Validation takes
+-- only a SHARE UPDATE EXCLUSIVE lock, but can still take a while to complete.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7605, 'event_push_actions_staging_thread_id', '{}'),
+ (7605, 'event_push_actions_thread_id', '{}'),
+ (7605, 'event_push_summary_thread_id', '{}');
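+
+-- For reference, each background update registered above runs (via
+-- register_background_validate_constraint) a statement of the form:
+--
+--   ALTER TABLE event_push_actions VALIDATE CONSTRAINT event_push_actions_thread_id;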
+
+-- Drop the indexes used to find null thread_ids.
+DROP INDEX IF EXISTS event_push_actions_thread_id_null;
+DROP INDEX IF EXISTS event_push_summary_thread_id_null;
diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
new file mode 100644
index 0000000000..e9372b6cf9
--- /dev/null
+++ b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite
@@ -0,0 +1,102 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The thread_id columns can now be made non-nullable.
+--
+-- SQLite doesn't support altering columns of an existing table, so the tables
+-- must be recreated.
+
+-- Create the new tables.
+CREATE TABLE event_push_actions_staging_new (
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ notif SMALLINT NOT NULL,
+ highlight SMALLINT NOT NULL,
+ unread SMALLINT,
+ thread_id TEXT,
+ inserted_ts BIGINT,
+ CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL)
+);
+
+CREATE TABLE event_push_actions_new (
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ profile_tag VARCHAR(32),
+ actions TEXT NOT NULL,
+ topological_ordering BIGINT,
+ stream_ordering BIGINT,
+ notif SMALLINT,
+ highlight SMALLINT,
+ unread SMALLINT,
+ thread_id TEXT,
+ CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag),
+ CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL)
+);
+
+CREATE TABLE event_push_summary_new (
+ user_id TEXT NOT NULL,
+ room_id TEXT NOT NULL,
+ notif_count BIGINT NOT NULL,
+ stream_ordering BIGINT NOT NULL,
+ unread_count BIGINT,
+ last_receipt_stream_ordering BIGINT,
+ thread_id TEXT,
+ CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL)
+);
+
+-- Copy the data.
+INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
+ SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
+ FROM event_push_actions_staging;
+
+INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
+ SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
+ FROM event_push_actions;
+
+INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
+ SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
+ FROM event_push_summary;
+
+-- Drop the old tables.
+DROP TABLE event_push_actions_staging;
+DROP TABLE event_push_actions;
+DROP TABLE event_push_summary;
+
+-- Rename the tables.
+ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
+ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
+ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
+
+-- Recreate the indexes.
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
+
+CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_rm_tokens ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering);
+CREATE INDEX event_push_actions_room_id_user_id ON event_push_actions (room_id, user_id);
+CREATE INDEX event_push_actions_stream_ordering ON event_push_actions (stream_ordering, user_id);
+CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering);
+
+CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id);
+
+-- Recreate some indexes in the background by re-running the background updates
+-- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7403, 'event_push_summary_unique_index2', '{}')
+ ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7403, 'event_push_actions_stream_highlight_index', '{}')
+ ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}';
|