diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2022-06-15 12:29:42 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-06-15 12:29:42 +0100 |
commit | 75fb10ee45950a175ee286b36fb5a46f123d7db5 (patch) | |
tree | 6e8283206657ac86a16cd19f1ddc1f3bf61fd301 /synapse/storage/databases | |
parent | Fix typechecks against twisted trunk (#13061) (diff) | |
download | synapse-75fb10ee45950a175ee286b36fb5a46f123d7db5.tar.xz |
Clean up schema for `event_edges` (#12893)
* Remove redundant references to `event_edges.room_id` We don't need to care about the room_id here, because we are already checking the event id. * Clean up the event_edges table We make a number of changes to `event_edges`: * We give the `room_id` and `is_state` columns defaults (null and false respectively) so that we can stop populating them. * We drop any rows that have `is_state` set true - they should no longer exist. * We drop any rows that do not exist in `events` - these should not exist either. * We drop the old unique constraint on all the colums, which wasn't much use. * We create a new unique index on `(event_id, prev_event_id)`. * We add a foreign key constraint to `events`. These happen rather differently depending on whether we are on Postgres or SQLite. For SQLite, we just rebuild the whole table, copying only the rows we want to keep. For Postgres, we try to do things in the background as much as possible. * Stop populating `event_edges.room_id` and `is_state` We can just rely on the defaults.
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 116 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 2 |
3 files changed, 118 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a8773374be..a3e12f1e9b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2296,11 +2296,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", - keys=("event_id", "prev_event_id", "room_id", "is_state"), + keys=("event_id", "prev_event_id"), values=[ - (ev.event_id, e_id, ev.room_id, False) - for ev in events - for e_id in ev.prev_event_ids() + (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids() ], ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index bea34a4c4a..eeca85fc94 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,4 @@ -# Copyright 2019-2021 The Matrix.org Foundation C.I.C. +# Copyright 2019-2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +64,9 @@ class _BackgroundUpdates: INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows" + EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index" + @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ################################################################################ + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + self._background_drop_invalid_event_edges_rows, + ) + + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX, + index_name="event_edges_event_id_prev_event_id_idx", + table="event_edges", + columns=["event_id", "prev_event_id"], + unique=True, + # the old index which just covered event_id is now redundant. + replaces_index="ev_edges_id", + ) + async def _background_reindex_fields_sender( self, progress: JsonDict, batch_size: int ) -> int: @@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) return 0 + + async def _background_drop_invalid_event_edges_rows( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop invalid rows from event_edges + + This only runs for postgres. For SQLite, it all happens synchronously. + + Firstly, drop any rows with is_state=True. These may have been added a long time + ago, but they are no longer used. + + We also drop rows that do not correspond to entries in `events`, and add a + foreign key. + """ + + last_event_id = progress.get("last_event_id", "") + + def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool: + """Returns True if we're done.""" + + # first we need to find an endpoint. + txn.execute( + """ + SELECT event_id FROM event_edges + WHERE event_id > ? + ORDER BY event_id + LIMIT 1 OFFSET ? + """, + (last_event_id, batch_size), + ) + + endpoint = None + row = txn.fetchone() + + if row: + endpoint = row[0] + + where_clause = "ee.event_id > ?" + args = [last_event_id] + if endpoint: + where_clause += " AND ee.event_id <= ?" + args.append(endpoint) + + # now delete any that: + # - have is_state=TRUE, or + # - do not correspond to a row in `events` + txn.execute( + f""" + DELETE FROM event_edges + WHERE event_id IN ( + SELECT ee.event_id + FROM event_edges ee + LEFT JOIN events ev USING (event_id) + WHERE ({where_clause}) AND + (is_state OR ev.event_id IS NULL) + )""", + args, + ) + + logger.info( + "cleaned up event_edges up to %s: removed %i/%i rows", + endpoint, + txn.rowcount, + batch_size, + ) + + if endpoint is not None: + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + {"last_event_id": endpoint}, + ) + return False + + # if that was the final batch, we validate the foreign key. + # + # The constraint should have been in place and enforced for new rows since + # before we started deleting invalid rows, so there's no chance for any + # invalid rows to have snuck in the meantime. In other words, this really + # ought to succeed. + logger.info("cleaned up event_edges; enabling foreign key") + txn.execute( + "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey" + ) + return True + + done = await self.db_pool.runInteraction( + desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn + ) + + if done: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS + ) + + return batch_size diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ba385f9fc4..87b0d09039 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Delete all remote non-state events for table in ( + "event_edges", "events", "event_json", "event_auth", - "event_edges", "event_forward_extremities", "event_relations", "event_search", |