diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a8773374be..a3e12f1e9b 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2296,11 +2296,9 @@ class PersistEventsStore:
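+ # we no longer populate `room_id` or `is_state` here; any old rows with
+ # is_state=True are removed by the `event_edges_drop_invalid_rows` background
+ # update.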
self.db_pool.simple_insert_many_txn(
txn,
table="event_edges",
- keys=("event_id", "prev_event_id", "room_id", "is_state"),
+ keys=("event_id", "prev_event_id"),
values=[
- (ev.event_id, e_id, ev.room_id, False)
- for ev in events
- for e_id in ev.prev_event_ids()
+ (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
],
)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index bea34a4c4a..eeca85fc94 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,6 +64,9 @@ class _BackgroundUpdates:
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+ EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+ EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
################################################################################
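+ # These two updates are scheduled by a schema delta (not shown in this diff),
+ # which on postgres is also expected to have added the foreign key out to
+ # `events` as NOT VALID, along the lines of:
+ #
+ #     ALTER TABLE event_edges
+ #         ADD CONSTRAINT event_edges_event_id_fkey
+ #         FOREIGN KEY (event_id) REFERENCES events (event_id)
+ #         NOT VALID;
+ #
+ # `event_edges_drop_invalid_rows` then clears out the invalid rows in batches
+ # and finishes by validating that constraint.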
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+ self._background_drop_invalid_event_edges_rows,
+ )
+
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX,
+ index_name="event_edges_event_id_prev_event_id_idx",
+ table="event_edges",
+ columns=["event_id", "prev_event_id"],
+ unique=True,
+ # the old index, which only covered event_id, is now redundant.
+ replaces_index="ev_edges_id",
+ )
+
async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
return 0
+
+ async def _background_drop_invalid_event_edges_rows(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Drop invalid rows from event_edges
+
+ This only runs for postgres. For SQLite, it all happens synchronously.
+
+ Firstly, drop any rows with is_state=True. These may have been added a long time
+ ago, but nothing uses them any more.
+
+ We also drop rows that do not correspond to entries in `events`, and finish by
+ validating the foreign key out to `events`.
+ """
+
+ last_event_id = progress.get("last_event_id", "")
+
+ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
+ """Returns True if we're done."""
+
+ # first we need to find an endpoint for this batch: the event_id `batch_size`
+ # rows beyond where the previous batch finished.
+ txn.execute(
+ """
+ SELECT event_id FROM event_edges
+ WHERE event_id > ?
+ ORDER BY event_id
+ LIMIT 1 OFFSET ?
+ """,
+ (last_event_id, batch_size),
+ )
+
+ endpoint = None
+ row = txn.fetchone()
+
+ if row:
+ endpoint = row[0]
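+
+ # if no row came back, `endpoint` stays None: there are at most `batch_size`
+ # rows left after `last_event_id`, so this is the final batch and the delete
+ # below is left unbounded above.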
+
+ where_clause = "ee.event_id > ?"
+ args = [last_event_id]
+ if endpoint:
+ where_clause += " AND ee.event_id <= ?"
+ args.append(endpoint)
+
+ # now delete any that:
+ # - have is_state=TRUE, or
+ # - do not correspond to a row in `events`
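+ #
+ # (the LEFT JOIN leaves ev.event_id NULL for edges whose event is missing from
+ # `events`, which is how the orphaned rows are picked out.)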
+ txn.execute(
+ f"""
+ DELETE FROM event_edges
+ WHERE event_id IN (
+ SELECT ee.event_id
+ FROM event_edges ee
+ LEFT JOIN events ev USING (event_id)
+ WHERE ({where_clause}) AND
+ (is_state OR ev.event_id IS NULL)
+ )""",
+ args,
+ )
+
+ logger.info(
+ "cleaned up event_edges up to %s: removed %i/%i rows",
+ endpoint,
+ txn.rowcount,
+ batch_size,
+ )
+
+ if endpoint is not None:
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+ {"last_event_id": endpoint},
+ )
+ return False
+
+ # if that was the final batch, we validate the foreign key.
+ #
+ # The constraint should have been in place and enforced for new rows since
+ # before we started deleting invalid rows, so there's no chance for any
+ # invalid rows to have snuck in the meantime. In other words, this really
+ # ought to succeed.
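+ #
+ # Validating an existing NOT VALID constraint scans the table but, unlike
+ # adding an already-validated constraint, does not hold a lock that blocks
+ # concurrent writes while it does so.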
+ logger.info("cleaned up event_edges; enabling foreign key")
+ txn.execute(
+ "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey"
+ )
+ return True
+
+ done = await self.db_pool.runInteraction(
+ desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn
+ )
+
+ if done:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS
+ )
+
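+ # the return value is the number of items notionally processed this round;
+ # the background updater uses it to tune the size of future batches.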
+ return batch_size
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9fc4..87b0d09039 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Delete all remote non-state events
for table in (
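+ # event_edges now has a foreign key out to `events`, so its rows have to
+ # go before the `events` rows they reference.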
+ "event_edges",
"events",
"event_json",
"event_auth",
- "event_edges",
"event_forward_extremities",
"event_relations",
"event_search",