From 60efc51a2bbc31f18a71ad1338afc430bfa65597 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 29 Jun 2021 11:25:34 +0100 Subject: Migrate stream_ordering to a bigint (#10264) * Move background update names out to a separate class `EventsBackgroundUpdatesStore` gets inherited and we don't really want to further pollute the namespace. * Migrate stream_ordering to a bigint * changelog --- .../storage/databases/main/events_bg_updates.py | 136 ++++++++++++++++++--- 1 file changed, 121 insertions(+), 15 deletions(-) (limited to 'synapse/storage/databases/main/events_bg_updates.py') diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index cbe4be1437..39aaee743c 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,6 +29,25 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) +_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( + # there should be no leftover rows without a stream_ordering2, but just in case... + "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", + # finally, we can drop the rule and switch the columns + "DROP RULE populate_stream_ordering2 ON events", + "ALTER TABLE events DROP COLUMN stream_ordering", + "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", +) + + +class _BackgroundUpdates: + EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" + EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" + DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" + POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" + INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + + @attr.s(slots=True, frozen=True) class _CalculateChainCover: """Return value for _calculate_chain_cover_txn.""" @@ -48,19 +67,15 @@ class _CalculateChainCover: class EventsBackgroundUpdatesStore(SQLBaseStore): - - EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts" - EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url" - DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" - def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) self.db_pool.updates.register_background_update_handler( - self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, + self._background_reindex_origin_server_ts, ) self.db_pool.updates.register_background_update_handler( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, self._background_reindex_fields_sender, ) @@ -85,7 +100,8 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) self.db_pool.updates.register_background_update_handler( - self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update + _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES, + self._cleanup_extremities_bg_update, ) self.db_pool.updates.register_background_update_handler( @@ -139,6 +155,24 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + # bg updates for replacing stream_ordering with a BIGINT + # (these only run on postgres.) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, + self._background_populate_stream_ordering2, + ) + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2, + index_name="events_stream_ordering", + table="events", + columns=["stream_ordering2"], + unique=True, + ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, + self._background_replace_stream_ordering_column, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -190,18 +224,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): } self.db_pool.updates._background_update_progress_txn( - txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress + txn, _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress ) return len(rows) result = await self.db_pool.runInteraction( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn ) if not result: await self.db_pool.updates._end_background_update( - self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME + _BackgroundUpdates.EVENT_FIELDS_SENDER_URL_UPDATE_NAME ) return result @@ -264,18 +298,18 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): } self.db_pool.updates._background_update_progress_txn( - txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress + txn, _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, progress ) return len(rows_to_update) result = await self.db_pool.runInteraction( - self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn ) if not result: await self.db_pool.updates._end_background_update( - self.EVENT_ORIGIN_SERVER_TS_NAME + _BackgroundUpdates.EVENT_ORIGIN_SERVER_TS_NAME ) return result @@ -454,7 +488,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if not num_handled: await self.db_pool.updates._end_background_update( - self.DELETE_SOFT_FAILED_EXTREMITIES + _BackgroundUpdates.DELETE_SOFT_FAILED_EXTREMITIES ) def _drop_table_txn(txn): @@ -1009,3 +1043,75 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): await self.db_pool.updates._end_background_update("purged_chain_cover") return result + + async def _background_populate_stream_ordering2( + self, progress: JsonDict, batch_size: int + ) -> int: + """Populate events.stream_ordering2, then replace stream_ordering + + This is to deal with the fact that stream_ordering was initially created as a + 32-bit integer field. + """ + batch_size = max(batch_size, 1) + + def process(txn: Cursor) -> int: + # if this is the first pass, find the minimum stream ordering + last_stream = progress.get("last_stream") + if last_stream is None: + txn.execute( + """ + SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 + """ + ) + rows = txn.fetchall() + if not rows: + return 0 + last_stream = rows[0][0] - 1 + + txn.execute( + """ + UPDATE events SET stream_ordering2=stream_ordering + WHERE stream_ordering > ? AND stream_ordering <= ? + """, + (last_stream, last_stream + batch_size), + ) + row_count = txn.rowcount + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_STREAM_ORDERING2, + {"last_stream": last_stream + batch_size}, + ) + return row_count + + result = await self.db_pool.runInteraction( + "_background_populate_stream_ordering2", process + ) + + if result != 0: + return result + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_STREAM_ORDERING2 + ) + return 0 + + async def _background_replace_stream_ordering_column( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" + + def process(txn: Cursor) -> None: + for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + logger.info("completing stream_ordering migration: %s", sql) + txn.execute(sql) + + await self.db_pool.runInteraction( + "_background_replace_stream_ordering_column", process + ) + + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN + ) + + return 0 -- cgit 1.5.1 From 7647b0337fb5d936c88c5949fa92c07bf2137ad0 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 29 Jun 2021 12:43:36 +0100 Subject: Fix `populate_stream_ordering2` background job (#10267) It was possible for us not to find any rows in a batch, and hence conclude that we had finished. Let's not do that. --- changelog.d/10267.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 28 ++++++++++------------ 2 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 changelog.d/10267.bugfix (limited to 'synapse/storage/databases/main/events_bg_updates.py') diff --git a/changelog.d/10267.bugfix b/changelog.d/10267.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10267.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 39aaee743c..da3a7df27b 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1055,32 +1055,28 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): batch_size = max(batch_size, 1) def process(txn: Cursor) -> int: - # if this is the first pass, find the minimum stream ordering - last_stream = progress.get("last_stream") - if last_stream is None: - txn.execute( - """ - SELECT stream_ordering FROM events ORDER BY stream_ordering LIMIT 1 - """ - ) - rows = txn.fetchall() - if not rows: - return 0 - last_stream = rows[0][0] - 1 - + last_stream = progress.get("last_stream", -(1 << 31)) txn.execute( """ UPDATE events SET stream_ordering2=stream_ordering - WHERE stream_ordering > ? AND stream_ordering <= ? + WHERE stream_ordering IN ( + SELECT stream_ordering FROM events WHERE stream_ordering > ? + ORDER BY stream_ordering LIMIT ? + ) + RETURNING stream_ordering; """, - (last_stream, last_stream + batch_size), + (last_stream, batch_size), ) row_count = txn.rowcount + if row_count == 0: + return 0 + last_stream = max(row[0] for row in txn) + logger.info("populated stream_ordering2 up to %i", last_stream) self.db_pool.updates._background_update_progress_txn( txn, _BackgroundUpdates.POPULATE_STREAM_ORDERING2, - {"last_stream": last_stream + batch_size}, + {"last_stream": last_stream}, ) return row_count -- cgit 1.5.1 From 859dc05b3692a3672c1a0db8deaaa9274b6aa6f5 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 30 Jun 2021 15:01:24 +0100 Subject: Rebuild other indexes using `stream_ordering` (#10282) We need to rebuild *all* of the indexes that use the current `stream_ordering` column. --- changelog.d/10282.bugfix | 1 + .../storage/databases/main/events_bg_updates.py | 50 ++++++++++++++++++++-- .../60/01recreate_stream_ordering.sql.postgres | 11 +++-- 3 files changed, 56 insertions(+), 6 deletions(-) create mode 100644 changelog.d/10282.bugfix (limited to 'synapse/storage/databases/main/events_bg_updates.py') diff --git a/changelog.d/10282.bugfix b/changelog.d/10282.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10282.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index da3a7df27b..1c95c66648 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -29,13 +29,18 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -_REPLACE_STREAM_ORDRING_SQL_COMMANDS = ( +_REPLACE_STREAM_ORDERING_SQL_COMMANDS = ( # there should be no leftover rows without a stream_ordering2, but just in case... "UPDATE events SET stream_ordering2 = stream_ordering WHERE stream_ordering2 IS NULL", - # finally, we can drop the rule and switch the columns + # now we can drop the rule and switch the columns "DROP RULE populate_stream_ordering2 ON events", "ALTER TABLE events DROP COLUMN stream_ordering", "ALTER TABLE events RENAME COLUMN stream_ordering2 TO stream_ordering", + # ... and finally, rename the indexes into place for consistency with sqlite + "ALTER INDEX event_contains_url_index2 RENAME TO event_contains_url_index", + "ALTER INDEX events_order_room2 RENAME TO events_order_room", + "ALTER INDEX events_room_stream2 RENAME TO events_room_stream", + "ALTER INDEX events_ts2 RENAME TO events_ts", ) @@ -45,6 +50,10 @@ class _BackgroundUpdates: DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities" POPULATE_STREAM_ORDERING2 = "populate_stream_ordering2" INDEX_STREAM_ORDERING2 = "index_stream_ordering2" + INDEX_STREAM_ORDERING2_CONTAINS_URL = "index_stream_ordering2_contains_url" + INDEX_STREAM_ORDERING2_ROOM_ORDER = "index_stream_ordering2_room_order" + INDEX_STREAM_ORDERING2_ROOM_STREAM = "index_stream_ordering2_room_stream" + INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" @@ -155,12 +164,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) + ################################################################################ + # bg updates for replacing stream_ordering with a BIGINT # (these only run on postgres.) + self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.POPULATE_STREAM_ORDERING2, self._background_populate_stream_ordering2, ) + # CREATE UNIQUE INDEX events_stream_ordering ON events(stream_ordering2); self.db_pool.updates.register_background_index_update( _BackgroundUpdates.INDEX_STREAM_ORDERING2, index_name="events_stream_ordering", @@ -168,11 +181,42 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): columns=["stream_ordering2"], unique=True, ) + # CREATE INDEX event_contains_url_index ON events(room_id, topological_ordering, stream_ordering) WHERE contains_url = true AND outlier = false; + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_CONTAINS_URL, + index_name="event_contains_url_index2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + where_clause="contains_url = true AND outlier = false", + ) + # CREATE INDEX events_order_room ON events(room_id, topological_ordering, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_ORDER, + index_name="events_order_room2", + table="events", + columns=["room_id", "topological_ordering", "stream_ordering2"], + ) + # CREATE INDEX events_room_stream ON events(room_id, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_ROOM_STREAM, + index_name="events_room_stream2", + table="events", + columns=["room_id", "stream_ordering2"], + ) + # CREATE INDEX events_ts ON events(origin_server_ts, stream_ordering); + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.INDEX_STREAM_ORDERING2_TS, + index_name="events_ts2", + table="events", + columns=["origin_server_ts", "stream_ordering2"], + ) self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.REPLACE_STREAM_ORDERING_COLUMN, self._background_replace_stream_ordering_column, ) + ################################################################################ + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -1098,7 +1142,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): """Drop the old 'stream_ordering' column and rename 'stream_ordering2' into its place.""" def process(txn: Cursor) -> None: - for sql in _REPLACE_STREAM_ORDRING_SQL_COMMANDS: + for sql in _REPLACE_STREAM_ORDERING_SQL_COMMANDS: logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) diff --git a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres index 88c9f8bd0d..b5fb763ddd 100644 --- a/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres +++ b/synapse/storage/schema/main/delta/60/01recreate_stream_ordering.sql.postgres @@ -31,10 +31,15 @@ CREATE OR REPLACE RULE "populate_stream_ordering2" AS INSERT INTO background_updates (ordering, update_name, progress_json) VALUES (6001, 'populate_stream_ordering2', '{}'); --- ... and another to build an index on it +-- ... and some more to build indexes on it. These aren't really interdependent +-- but the backround_updates manager can only handle a single dependency per update. INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'); + (6001, 'index_stream_ordering2', '{}', 'populate_stream_ordering2'), + (6001, 'index_stream_ordering2_room_order', '{}', 'index_stream_ordering2'), + (6001, 'index_stream_ordering2_contains_url', '{}', 'index_stream_ordering2_room_order'), + (6001, 'index_stream_ordering2_room_stream', '{}', 'index_stream_ordering2_contains_url'), + (6001, 'index_stream_ordering2_ts', '{}', 'index_stream_ordering2_room_stream'); -- ... and another to do the switcheroo INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES - (6001, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2'); + (6003, 'replace_stream_ordering_column', '{}', 'index_stream_ordering2_ts'); -- cgit 1.5.1 From 9ad84558951dd970dc2a362c923552141a42a5f3 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 7 Jul 2021 11:56:17 +0200 Subject: ANALYZE new stream ordering column (#10326) Fixes #10325 --- changelog.d/10326.bugfix | 1 + synapse/storage/databases/main/events_bg_updates.py | 10 ++++++++++ 2 files changed, 11 insertions(+) create mode 100644 changelog.d/10326.bugfix (limited to 'synapse/storage/databases/main/events_bg_updates.py') diff --git a/changelog.d/10326.bugfix b/changelog.d/10326.bugfix new file mode 100644 index 0000000000..7ebda7cdc2 --- /dev/null +++ b/changelog.d/10326.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse would return errors after 231 events were handled by the server. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 1c95c66648..29f33bac55 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1146,6 +1146,16 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): logger.info("completing stream_ordering migration: %s", sql) txn.execute(sql) + # ANALYZE the new column to build stats on it, to encourage PostgreSQL to use the + # indexes on it. + # We need to pass execute a dummy function to handle the txn's result otherwise + # it tries to call fetchall() on it and fails because there's no result to fetch. + await self.db_pool.execute( + "background_analyze_new_stream_ordering_column", + lambda txn: None, + "ANALYZE events(stream_ordering2)", + ) + await self.db_pool.runInteraction( "_background_replace_stream_ordering_column", process ) -- cgit 1.5.1