diff --git a/changelog.d/14222.feature b/changelog.d/14222.feature
new file mode 100644
index 0000000000..5d0ae16e13
--- /dev/null
+++ b/changelog.d/14222.feature
@@ -0,0 +1 @@
+Support for thread-specific notifications & receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771) and [MSC3773](https://github.com/matrix-org/matrix-spec-proposals/pull/3773)).
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index f070e6e88a..b283ab0f9c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -294,6 +294,44 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
self._background_backfill_thread_id,
)
+ # Indexes which will be used to quickly make the thread_id column non-null.
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_thread_id_null",
+ index_name="event_push_actions_thread_id_null",
+ table="event_push_actions",
+ columns=["thread_id"],
+ where_clause="thread_id IS NULL",
+ )
+ self.db_pool.updates.register_background_index_update(
+ "event_push_summary_thread_id_null",
+ index_name="event_push_summary_thread_id_null",
+ table="event_push_summary",
+ columns=["thread_id"],
+ where_clause="thread_id IS NULL",
+ )
+
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates the event_push_actions and event_push_summary tables.
+ self._clock.call_later(0.0, self._check_event_push_backfill_thread_id)
+ self._event_push_backfill_thread_id_done = False
+
+ @wrap_as_background_process("check_event_push_backfill_thread_id")
+ async def _check_event_push_backfill_thread_id(self) -> None:
+ """
+ Has thread_id finished backfilling?
+
+ If not, we need to just-in-time update it so the queries work.
+ """
+ done = await self.db_pool.updates.has_completed_background_update(
+ "event_push_backfill_thread_id"
+ )
+
+ if done:
+ self._event_push_backfill_thread_id_done = True
+ else:
+ # Reschedule to run.
+ self._clock.call_later(15.0, self._check_event_push_backfill_thread_id)
+
async def _background_backfill_thread_id(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -526,6 +564,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
)
+ # First ensure that the existing rows have an updated thread_id field.
+ if not self._event_push_backfill_thread_id_done:
+ txn.execute(
+ """
+ UPDATE event_push_summary
+ SET thread_id = ?
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+ """,
+ (MAIN_TIMELINE, room_id, user_id),
+ )
+ txn.execute(
+ """
+ UPDATE event_push_actions
+ SET thread_id = ?
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+ """,
+ (MAIN_TIMELINE, room_id, user_id),
+ )
+
# First we pull the counts from the summary table.
#
# We check that `last_receipt_stream_ordering` matches the stream ordering of the
@@ -1341,6 +1398,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
(room_id, user_id, stream_ordering, *thread_args),
)
+ # First ensure that the existing rows have an updated thread_id field.
+ if not self._event_push_backfill_thread_id_done:
+ txn.execute(
+ """
+ UPDATE event_push_summary
+ SET thread_id = ?
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+ """,
+ (MAIN_TIMELINE, room_id, user_id),
+ )
+ txn.execute(
+ """
+ UPDATE event_push_actions
+ SET thread_id = ?
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+ """,
+ (MAIN_TIMELINE, room_id, user_id),
+ )
+
# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room(
@@ -1475,6 +1551,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
"""
+ # Ensure that any new actions have an updated thread_id.
+ if not self._event_push_backfill_thread_id_done:
+ txn.execute(
+ """
+ UPDATE event_push_actions
+ SET thread_id = ?
+ WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL
+ """,
+ (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering),
+ )
+
+ # XXX Do we need to update summaries here too?
+
# Calculate the new counts that should be upserted into event_push_summary
sql = """
SELECT user_id, room_id, thread_id,
@@ -1537,6 +1626,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
logger.info("Rotating notifications, handling %d rows", len(summaries))
+ # Ensure that any updated threads have the proper thread_id.
+ if not self._event_push_backfill_thread_id_done:
+ txn.execute_batch(
+ """
+ UPDATE event_push_summary
+ SET thread_id = ?
+ WHERE room_id = ? AND user_id = ? AND thread_id is NULL
+ """,
+ [
+ (MAIN_TIMELINE, room_id, user_id)
+ for user_id, room_id, _ in summaries
+ ],
+ )
+
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
diff --git a/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql
deleted file mode 100644
index 0ffde9bbeb..0000000000
--- a/synapse/storage/schema/main/delta/73/06thread_notifications_backfill.sql
+++ /dev/null
@@ -1,29 +0,0 @@
-/* Copyright 2022 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Forces the background updates from 06thread_notifications.sql to run in the
--- foreground as code will now require those to be "done".
-
-DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id';
-
--- Overwrite any null thread_id columns.
-UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL;
-UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL;
-UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL;
-
--- Do not run the event_push_summary_unique_index job if it is pending; the
--- thread_id field will be made required.
-DELETE FROM background_updates WHERE update_name = 'event_push_summary_unique_index';
-DROP INDEX IF EXISTS event_push_summary_unique_index;
diff --git a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/73/06thread_notifications_thread_id_idx.sql
index 33674f8c62..8b3c636594 100644
--- a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.postgres
+++ b/synapse/storage/schema/main/delta/73/06thread_notifications_thread_id_idx.sql
@@ -13,7 +13,11 @@
* limitations under the License.
*/
--- The columns can now be made non-nullable.
-ALTER TABLE event_push_actions_staging ALTER COLUMN thread_id SET NOT NULL;
-ALTER TABLE event_push_actions ALTER COLUMN thread_id SET NOT NULL;
-ALTER TABLE event_push_summary ALTER COLUMN thread_id SET NOT NULL;
+-- Allow there to be multiple summaries per user/room.
+DROP INDEX IF EXISTS event_push_summary_unique_index;
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+ (7306, 'event_push_actions_thread_id_null', '{}', 'event_push_backfill_thread_id');
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+ (7306, 'event_push_summary_thread_id_null', '{}', 'event_push_backfill_thread_id');
diff --git a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite
deleted file mode 100644
index 5322ad77a4..0000000000
--- a/synapse/storage/schema/main/delta/73/07thread_notifications_not_null.sql.sqlite
+++ /dev/null
@@ -1,101 +0,0 @@
-/* Copyright 2022 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- SQLite doesn't support modifying columns to an existing table, so it must
--- be recreated.
-
--- Create the new tables.
-CREATE TABLE event_push_actions_staging_new (
- event_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- actions TEXT NOT NULL,
- notif SMALLINT NOT NULL,
- highlight SMALLINT NOT NULL,
- unread SMALLINT,
- thread_id TEXT NOT NULL,
- inserted_ts BIGINT
-);
-
-CREATE TABLE event_push_actions_new (
- room_id TEXT NOT NULL,
- event_id TEXT NOT NULL,
- user_id TEXT NOT NULL,
- profile_tag VARCHAR(32),
- actions TEXT NOT NULL,
- topological_ordering BIGINT,
- stream_ordering BIGINT,
- notif SMALLINT,
- highlight SMALLINT,
- unread SMALLINT,
- thread_id TEXT NOT NULL,
- CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag)
-);
-
-CREATE TABLE event_push_summary_new (
- user_id TEXT NOT NULL,
- room_id TEXT NOT NULL,
- notif_count BIGINT NOT NULL,
- stream_ordering BIGINT NOT NULL,
- unread_count BIGINT,
- last_receipt_stream_ordering BIGINT,
- thread_id TEXT NOT NULL
-);
-
--- Swap the indexes.
-DROP INDEX IF EXISTS event_push_actions_staging_id;
-CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging_new(event_id);
-
-DROP INDEX IF EXISTS event_push_actions_room_id_user_id;
-DROP INDEX IF EXISTS event_push_actions_rm_tokens;
-DROP INDEX IF EXISTS event_push_actions_stream_ordering;
-DROP INDEX IF EXISTS event_push_actions_u_highlight;
-DROP INDEX IF EXISTS event_push_actions_highlights_index;
-CREATE INDEX event_push_actions_room_id_user_id on event_push_actions_new(room_id, user_id);
-CREATE INDEX event_push_actions_rm_tokens on event_push_actions_new( user_id, room_id, topological_ordering, stream_ordering );
-CREATE INDEX event_push_actions_stream_ordering on event_push_actions_new( stream_ordering, user_id );
-CREATE INDEX event_push_actions_u_highlight ON event_push_actions_new (user_id, stream_ordering);
-CREATE INDEX event_push_actions_highlights_index ON event_push_actions_new (user_id, room_id, topological_ordering, stream_ordering);
-
--- Copy the data.
-INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts)
- SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts
- FROM event_push_actions_staging;
-
-INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id)
- SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id
- FROM event_push_actions;
-
-INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id)
- SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id
- FROM event_push_summary;
-
--- Drop the old tables.
-DROP TABLE event_push_actions_staging;
-DROP TABLE event_push_actions;
-DROP TABLE event_push_summary;
-
--- Rename the tables.
-ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging;
-ALTER TABLE event_push_actions_new RENAME TO event_push_actions;
-ALTER TABLE event_push_summary_new RENAME TO event_push_summary;
-
--- Re-run background updates from 72/02event_push_actions_index.sql and
--- 72/06thread_notifications.sql.
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
- (7307, 'event_push_summary_unique_index2', '{}')
- ON CONFLICT (update_name) DO NOTHING;
-INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
- (7307, 'event_push_actions_stream_highlight_index', '{}')
- ON CONFLICT (update_name) DO NOTHING;
|