diff options
Diffstat (limited to 'synapse')
6 files changed, 234 insertions, 224 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index ca085ef800..a99aea8926 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -561,50 +561,6 @@ class BackgroundUpdater: updater, oneshot=True ) - def register_background_validate_constraint( - self, update_name: str, constraint_name: str, table: str - ) -> None: - """Helper for store classes to do a background validate constraint. - - This only applies on PostgreSQL. - - To use: - - 1. use a schema delta file to add a background update. Example: - INSERT INTO background_updates (update_name, progress_json) VALUES - ('validate_my_constraint', '{}'); - - 2. In the Store constructor, call this method - - Args: - update_name: update_name to register for - constraint_name: name of constraint to validate - table: table the constraint is applied to - """ - - def runner(conn: Connection) -> None: - c = conn.cursor() - - sql = f""" - ALTER TABLE {table} VALIDATE CONSTRAINT {constraint_name}; - """ - logger.debug("[SQL] %s", sql) - c.execute(sql) - - async def updater(progress: JsonDict, batch_size: int) -> int: - assert isinstance( - self.db_pool.engine, engines.PostgresEngine - ), "validate constraint background update registered for non-Postres database" - - logger.info("Validating constraint %s to %s", constraint_name, table) - await self.db_pool.runWithConnection(runner) - await self._end_background_update(update_name) - return 1 - - self._background_update_handlers[update_name] = _BackgroundUpdateHandler( - updater, oneshot=True - ) - async def create_index_in_background( self, index_name: str, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 2e98a29fef..6fdb1e292e 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -100,6 +100,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.stream import StreamWorkerStore +from synapse.types import JsonDict from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -288,22 +289,180 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas unique=True, ) - self.db_pool.updates.register_background_validate_constraint( - "event_push_actions_staging_thread_id", - constraint_name="event_push_actions_staging_thread_id", - table="event_push_actions_staging", + self.db_pool.updates.register_background_update_handler( + "event_push_backfill_thread_id", + self._background_backfill_thread_id, ) - self.db_pool.updates.register_background_validate_constraint( - "event_push_actions_thread_id", - constraint_name="event_push_actions_thread_id", + + # Indexes which will be used to quickly make the thread_id column non-null. + self.db_pool.updates.register_background_index_update( + "event_push_actions_thread_id_null", + index_name="event_push_actions_thread_id_null", table="event_push_actions", + columns=["thread_id"], + where_clause="thread_id IS NULL", ) - self.db_pool.updates.register_background_validate_constraint( - "event_push_summary_thread_id", - constraint_name="event_push_summary_thread_id", + self.db_pool.updates.register_background_index_update( + "event_push_summary_thread_id_null", + index_name="event_push_summary_thread_id_null", table="event_push_summary", + columns=["thread_id"], + where_clause="thread_id IS NULL", ) + # Check ASAP (and then later, every 1s) to see if we have finished + # background updates the event_push_actions and event_push_summary tables. + self._clock.call_later(0.0, self._check_event_push_backfill_thread_id) + self._event_push_backfill_thread_id_done = False + + @wrap_as_background_process("check_event_push_backfill_thread_id") + async def _check_event_push_backfill_thread_id(self) -> None: + """ + Has thread_id finished backfilling? + + If not, we need to just-in-time update it so the queries work. + """ + done = await self.db_pool.updates.has_completed_background_update( + "event_push_backfill_thread_id" + ) + + if done: + self._event_push_backfill_thread_id_done = True + else: + # Reschedule to run. + self._clock.call_later(15.0, self._check_event_push_backfill_thread_id) + + async def _background_backfill_thread_id( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Fill in the thread_id field for event_push_actions and event_push_summary. + + This is preparatory so that it can be made non-nullable in the future. + + Because all current (null) data is done in an unthreaded manner this + simply assumes it is on the "main" timeline. Since event_push_actions + are periodically cleared it is not possible to correctly re-calculate + the thread_id. + """ + event_push_actions_done = progress.get("event_push_actions_done", False) + + def add_thread_id_txn( + txn: LoggingTransaction, start_stream_ordering: int + ) -> int: + sql = """ + SELECT stream_ordering + FROM event_push_actions + WHERE + thread_id IS NULL + AND stream_ordering > ? + ORDER BY stream_ordering + LIMIT ? + """ + txn.execute(sql, (start_stream_ordering, batch_size)) + + # No more rows to process. + rows = txn.fetchall() + if not rows: + progress["event_push_actions_done"] = True + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + return 0 + + # Update the thread ID for any of those rows. + max_stream_ordering = rows[-1][0] + + sql = """ + UPDATE event_push_actions + SET thread_id = 'main' + WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL + """ + txn.execute( + sql, + ( + start_stream_ordering, + max_stream_ordering, + ), + ) + + # Update progress. + processed_rows = txn.rowcount + progress["max_event_push_actions_stream_ordering"] = max_stream_ordering + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + + def add_thread_id_summary_txn(txn: LoggingTransaction) -> int: + min_user_id = progress.get("max_summary_user_id", "") + min_room_id = progress.get("max_summary_room_id", "") + + # Slightly overcomplicated query for getting the Nth user ID / room + # ID tuple, or the last if there are less than N remaining. + sql = """ + SELECT user_id, room_id FROM ( + SELECT user_id, room_id FROM event_push_summary + WHERE (user_id, room_id) > (?, ?) + AND thread_id IS NULL + ORDER BY user_id, room_id + LIMIT ? + ) AS e + ORDER BY user_id DESC, room_id DESC + LIMIT 1 + """ + + txn.execute(sql, (min_user_id, min_room_id, batch_size)) + row = txn.fetchone() + if not row: + return 0 + + max_user_id, max_room_id = row + + sql = """ + UPDATE event_push_summary + SET thread_id = 'main' + WHERE + (?, ?) < (user_id, room_id) AND (user_id, room_id) <= (?, ?) + AND thread_id IS NULL + """ + txn.execute(sql, (min_user_id, min_room_id, max_user_id, max_room_id)) + processed_rows = txn.rowcount + + progress["max_summary_user_id"] = max_user_id + progress["max_summary_room_id"] = max_room_id + self.db_pool.updates._background_update_progress_txn( + txn, "event_push_backfill_thread_id", progress + ) + + return processed_rows + + # First update the event_push_actions table, then the event_push_summary table. + # + # Note that the event_push_actions_staging table is ignored since it is + # assumed that items in that table will only exist for a short period of + # time. + if not event_push_actions_done: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_txn, + progress.get("max_event_push_actions_stream_ordering", 0), + ) + else: + result = await self.db_pool.runInteraction( + "event_push_backfill_thread_id", + add_thread_id_summary_txn, + ) + + # Only done after the event_push_summary table is done. + if not result: + await self.db_pool.updates._end_background_update( + "event_push_backfill_thread_id" + ) + + return result + async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: """Get the notification count by room for a user. Only considers notifications, not highlight or unread counts, and threads are currently aggregated under their room. @@ -552,6 +711,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) + # First ensure that the existing rows have an updated thread_id field. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + # First we pull the counts from the summary table. # # We check that `last_receipt_stream_ordering` matches the stream ordering of the @@ -1367,6 +1545,25 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas (room_id, user_id, stream_ordering, *thread_args), ) + # First ensure that the existing rows have an updated thread_id field. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + (MAIN_TIMELINE, room_id, user_id), + ) + # Fetch the notification counts between the stream ordering of the # latest receipt and what was previously summarised. unread_counts = self._get_notif_unread_count_for_user_room( @@ -1501,6 +1698,19 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas rotate_to_stream_ordering: The new maximum event stream ordering to summarise. """ + # Ensure that any new actions have an updated thread_id. + if not self._event_push_backfill_thread_id_done: + txn.execute( + """ + UPDATE event_push_actions + SET thread_id = ? + WHERE ? < stream_ordering AND stream_ordering <= ? AND thread_id IS NULL + """, + (MAIN_TIMELINE, old_rotate_stream_ordering, rotate_to_stream_ordering), + ) + + # XXX Do we need to update summaries here too? + # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, thread_id, @@ -1563,6 +1773,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas logger.info("Rotating notifications, handling %d rows", len(summaries)) + # Ensure that any updated threads have the proper thread_id. + if not self._event_push_backfill_thread_id_done: + txn.execute_batch( + """ + UPDATE event_push_summary + SET thread_id = ? + WHERE room_id = ? AND user_id = ? AND thread_id is NULL + """, + [ + (MAIN_TIMELINE, room_id, user_id) + for user_id, room_id, _ in summaries + ], + ) + self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 741563abc6..1672976209 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -106,9 +106,6 @@ Changes in SCHEMA_VERSION = 76: SCHEMA_COMPAT_VERSION = ( # Queries against `event_stream_ordering` columns in membership tables must # be disambiguated. - # - # The threads_id column must written to with non-null values for the - # event_push_actions, event_push_actions_staging, and event_push_summary tables. 74 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql b/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql deleted file mode 100644 index ce6f9ff937..0000000000 --- a/synapse/storage/schema/main/delta/76/04thread_notifications_backfill.sql +++ /dev/null @@ -1,28 +0,0 @@ -/* Copyright 2023 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Force the background updates from 06thread_notifications.sql to run in the --- foreground as code will now require those to be "done". - -DELETE FROM background_updates WHERE update_name = 'event_push_backfill_thread_id'; - --- Overwrite any null thread_id values. -UPDATE event_push_actions_staging SET thread_id = 'main' WHERE thread_id IS NULL; -UPDATE event_push_actions SET thread_id = 'main' WHERE thread_id IS NULL; -UPDATE event_push_summary SET thread_id = 'main' WHERE thread_id IS NULL; - --- Drop the background updates to calculate the indexes used to find null thread_ids. -DELETE FROM background_updates WHERE update_name = 'event_push_actions_thread_id_null'; -DELETE FROM background_updates WHERE update_name = 'event_push_summary_thread_id_null'; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres deleted file mode 100644 index 40936def6f..0000000000 --- a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.postgres +++ /dev/null @@ -1,37 +0,0 @@ -/* Copyright 2022 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- The thread_id columns can now be made non-nullable, this is done by using a --- constraint (and not altering the column) to avoid taking out a full table lock. --- --- We initially add an invalid constraint which guards against new data (this --- doesn't lock the table). -ALTER TABLE event_push_actions_staging - ADD CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; -ALTER TABLE event_push_actions - ADD CONSTRAINT event_push_actions_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; -ALTER TABLE event_push_summary - ADD CONSTRAINT event_push_summary_thread_id CHECK (thread_id IS NOT NULL) NOT VALID; - --- We then validate the constraint which doesn't need to worry about new data. It --- only needs a SHARE UPDATE EXCLUSIVE lock but can still take a while to complete. -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7605, 'event_push_actions_staging_thread_id', '{}'), - (7605, 'event_push_actions_thread_id', '{}'), - (7605, 'event_push_summary_thread_id', '{}'); - --- Drop the indexes used to find null thread_ids. -DROP INDEX IF EXISTS event_push_actions_thread_id_null; -DROP INDEX IF EXISTS event_push_summary_thread_id_null; diff --git a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite b/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite deleted file mode 100644 index e9372b6cf9..0000000000 --- a/synapse/storage/schema/main/delta/76/05thread_notifications_not_null.sql.sqlite +++ /dev/null @@ -1,102 +0,0 @@ -/* Copyright 2022 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -- The thread_id columns can now be made non-nullable. --- --- SQLite doesn't support modifying columns to an existing table, so it must --- be recreated. - --- Create the new tables. -CREATE TABLE event_push_actions_staging_new ( - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - actions TEXT NOT NULL, - notif SMALLINT NOT NULL, - highlight SMALLINT NOT NULL, - unread SMALLINT, - thread_id TEXT, - inserted_ts BIGINT, - CONSTRAINT event_push_actions_staging_thread_id CHECK (thread_id is NOT NULL) -); - -CREATE TABLE event_push_actions_new ( - room_id TEXT NOT NULL, - event_id TEXT NOT NULL, - user_id TEXT NOT NULL, - profile_tag VARCHAR(32), - actions TEXT NOT NULL, - topological_ordering BIGINT, - stream_ordering BIGINT, - notif SMALLINT, - highlight SMALLINT, - unread SMALLINT, - thread_id TEXT, - CONSTRAINT event_id_user_id_profile_tag_uniqueness UNIQUE (room_id, event_id, user_id, profile_tag), - CONSTRAINT event_push_actions_thread_id CHECK (thread_id is NOT NULL) -); - -CREATE TABLE event_push_summary_new ( - user_id TEXT NOT NULL, - room_id TEXT NOT NULL, - notif_count BIGINT NOT NULL, - stream_ordering BIGINT NOT NULL, - unread_count BIGINT, - last_receipt_stream_ordering BIGINT, - thread_id TEXT, - CONSTRAINT event_push_summary_thread_id CHECK (thread_id is NOT NULL) -); - --- Copy the data. -INSERT INTO event_push_actions_staging_new (event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts) - SELECT event_id, user_id, actions, notif, highlight, unread, thread_id, inserted_ts - FROM event_push_actions_staging; - -INSERT INTO event_push_actions_new (room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id) - SELECT room_id, event_id, user_id, profile_tag, actions, topological_ordering, stream_ordering, notif, highlight, unread, thread_id - FROM event_push_actions; - -INSERT INTO event_push_summary_new (user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id) - SELECT user_id, room_id, notif_count, stream_ordering, unread_count, last_receipt_stream_ordering, thread_id - FROM event_push_summary; - --- Drop the old tables. -DROP TABLE event_push_actions_staging; -DROP TABLE event_push_actions; -DROP TABLE event_push_summary; - --- Rename the tables. -ALTER TABLE event_push_actions_staging_new RENAME TO event_push_actions_staging; -ALTER TABLE event_push_actions_new RENAME TO event_push_actions; -ALTER TABLE event_push_summary_new RENAME TO event_push_summary; - --- Recreate the indexes. -CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id); - -CREATE INDEX event_push_actions_highlights_index ON event_push_actions (user_id, room_id, topological_ordering, stream_ordering); -CREATE INDEX event_push_actions_rm_tokens on event_push_actions( user_id, room_id, topological_ordering, stream_ordering ); -CREATE INDEX event_push_actions_room_id_user_id on event_push_actions(room_id, user_id); -CREATE INDEX event_push_actions_stream_ordering on event_push_actions( stream_ordering, user_id ); -CREATE INDEX event_push_actions_u_highlight ON event_push_actions (user_id, stream_ordering); - -CREATE UNIQUE INDEX event_push_summary_unique_index2 ON event_push_summary (user_id, room_id, thread_id) ; - --- Recreate some indexes in the background, by re-running the background updates --- from 72/02event_push_actions_index.sql and 72/06thread_notifications.sql. -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7403, 'event_push_summary_unique_index2', '{}') - ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; -INSERT INTO background_updates (ordering, update_name, progress_json) VALUES - (7403, 'event_push_actions_stream_highlight_index', '{}') - ON CONFLICT (update_name) DO UPDATE SET progress_json = '{}'; |