diff options
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_bg_updates.py | 116 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 2 |
3 files changed, 118 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a8773374be..a3e12f1e9b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -2296,11 +2296,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", - keys=("event_id", "prev_event_id", "room_id", "is_state"), + keys=("event_id", "prev_event_id"), values=[ - (ev.event_id, e_id, ev.room_id, False) - for ev in events - for e_id in ev.prev_event_ids() + (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids() ], ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index bea34a4c4a..eeca85fc94 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,4 @@ -# Copyright 2019-2021 The Matrix.org Foundation C.I.C. +# Copyright 2019-2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +64,9 @@ class _BackgroundUpdates: INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows" + EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index" + @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ################################################################################ + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + self._background_drop_invalid_event_edges_rows, + ) + + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX, + index_name="event_edges_event_id_prev_event_id_idx", + table="event_edges", + columns=["event_id", "prev_event_id"], + unique=True, + # the old index which just covered event_id is now redundant. + replaces_index="ev_edges_id", + ) + async def _background_reindex_fields_sender( self, progress: JsonDict, batch_size: int ) -> int: @@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) return 0 + + async def _background_drop_invalid_event_edges_rows( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop invalid rows from event_edges + + This only runs for postgres. For SQLite, it all happens synchronously. + + Firstly, drop any rows with is_state=True. These may have been added a long time + ago, but they are no longer used. + + We also drop rows that do not correspond to entries in `events`, and add a + foreign key. + """ + + last_event_id = progress.get("last_event_id", "") + + def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool: + """Returns True if we're done.""" + + # first we need to find an endpoint. + txn.execute( + """ + SELECT event_id FROM event_edges + WHERE event_id > ? + ORDER BY event_id + LIMIT 1 OFFSET ? + """, + (last_event_id, batch_size), + ) + + endpoint = None + row = txn.fetchone() + + if row: + endpoint = row[0] + + where_clause = "ee.event_id > ?" + args = [last_event_id] + if endpoint: + where_clause += " AND ee.event_id <= ?" + args.append(endpoint) + + # now delete any that: + # - have is_state=TRUE, or + # - do not correspond to a row in `events` + txn.execute( + f""" + DELETE FROM event_edges + WHERE event_id IN ( + SELECT ee.event_id + FROM event_edges ee + LEFT JOIN events ev USING (event_id) + WHERE ({where_clause}) AND + (is_state OR ev.event_id IS NULL) + )""", + args, + ) + + logger.info( + "cleaned up event_edges up to %s: removed %i/%i rows", + endpoint, + txn.rowcount, + batch_size, + ) + + if endpoint is not None: + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + {"last_event_id": endpoint}, + ) + return False + + # if that was the final batch, we validate the foreign key. + # + # The constraint should have been in place and enforced for new rows since + # before we started deleting invalid rows, so there's no chance for any + # invalid rows to have snuck in the meantime. In other words, this really + # ought to succeed. + logger.info("cleaned up event_edges; enabling foreign key") + txn.execute( + "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey" + ) + return True + + done = await self.db_pool.runInteraction( + desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn + ) + + if done: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS + ) + + return batch_size diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ba385f9fc4..87b0d09039 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Delete all remote non-state events for table in ( + "event_edges", "events", "event_json", "event_auth", - "event_edges", "event_forward_extremities", "event_relations", "event_search", |