diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bbb6aa992c..28cce2979c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2093,6 +2093,30 @@ class EventsStore(SQLBaseStore):
# state_groups
# state_groups_state
+ # we will build a temporary table listing the events so that we don't
+ # have to keep shovelling the list back and forth across the
+ # connection. Annoyingly the python sqlite driver commits the
+ # transaction on CREATE, so let's do this first.
+ #
+ # furthermore, we might already have the table from a previous (failed)
+ # purge attempt, so let's drop the table first.
+
+ txn.execute("DROP TABLE IF EXISTS events_to_purge")
+
+ txn.execute(
+ "CREATE TEMPORARY TABLE events_to_purge ("
+ " event_id TEXT NOT NULL,"
+ " should_delete BOOLEAN NOT NULL"
+ ")"
+ )
+
+ # create an index on should_delete because later we'll be looking for
+ # the should_delete / shouldn't_delete subsets
+ txn.execute(
+ "CREATE INDEX events_to_purge_should_delete"
+ " ON events_to_purge(should_delete)",
+ )
+
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@@ -2115,23 +2139,30 @@ class EventsStore(SQLBaseStore):
logger.info("[purge] looking for events to delete")
+ should_delete_expr = "state_key IS NULL"
+ should_delete_params = ()
+ if not delete_local_events:
+ should_delete_expr += " AND event_id NOT LIKE ?"
+ should_delete_params += ("%:" + self.hs.hostname, )
+
+ should_delete_params += (room_id, topological_ordering)
+
+ txn.execute(
+ "INSERT INTO events_to_purge"
+ " SELECT event_id, %s"
+ " FROM events AS e LEFT JOIN state_events USING (event_id)"
+ " WHERE e.room_id = ? AND topological_ordering < ?" % (
+ should_delete_expr,
+ ),
+ should_delete_params,
+ )
txn.execute(
- "SELECT event_id, state_key FROM events"
- " LEFT JOIN state_events USING (room_id, event_id)"
- " WHERE room_id = ? AND topological_ordering < ?",
- (room_id, topological_ordering,)
+ "SELECT event_id, should_delete FROM events_to_purge"
)
event_rows = txn.fetchall()
-
- to_delete = [
- (event_id,) for event_id, state_key in event_rows
- if state_key is None and (
- delete_local_events or not self.hs.is_mine_id(event_id)
- )
- ]
logger.info(
"[purge] found %i events before cutoff, of which %i can be deleted",
- len(event_rows), len(to_delete),
+ len(event_rows), sum(1 for e in event_rows if e[1]),
)
logger.info("[purge] Finding new backward extremities")
@@ -2139,12 +2170,11 @@ class EventsStore(SQLBaseStore):
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
- "SELECT DISTINCT e.event_id FROM events as e"
- " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
- " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
- " WHERE e.room_id = ? AND e.topological_ordering < ?"
- " AND e2.topological_ordering >= ?",
- (room_id, topological_ordering, topological_ordering)
+ "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+ " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+ " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
+ " WHERE e2.topological_ordering >= ?",
+ (topological_ordering, )
)
new_backwards_extrems = txn.fetchall()
@@ -2172,12 +2202,11 @@ class EventsStore(SQLBaseStore):
"SELECT state_group FROM event_to_state_groups"
" INNER JOIN events USING (event_id)"
" WHERE state_group IN ("
- " SELECT DISTINCT state_group FROM events"
+ " SELECT DISTINCT state_group FROM events_to_purge"
" INNER JOIN event_to_state_groups USING (event_id)"
- " WHERE room_id = ? AND topological_ordering < ?"
" )"
" GROUP BY state_group HAVING MAX(topological_ordering) < ?",
- (room_id, topological_ordering, topological_ordering)
+ (topological_ordering, )
)
state_rows = txn.fetchall()
@@ -2262,9 +2291,9 @@ class EventsStore(SQLBaseStore):
)
logger.info("[purge] removing events from event_to_state_groups")
- txn.executemany(
- "DELETE FROM event_to_state_groups WHERE event_id = ?",
- [(event_id,) for event_id, _ in event_rows]
+ txn.execute(
+ "DELETE FROM event_to_state_groups "
+ "WHERE event_id IN (SELECT event_id from events_to_purge)"
)
for event_id, _ in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (
@@ -2289,22 +2318,35 @@ class EventsStore(SQLBaseStore):
):
logger.info("[purge] removing events from %s", table)
- txn.executemany(
- "DELETE FROM %s WHERE event_id = ?" % (table,),
- to_delete
+ txn.execute(
+ "DELETE FROM %s WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ )
+
+ # event_push_actions lacks an index on event_id, and has one on
+ # (room_id, event_id) instead.
+ for table in (
+ "event_push_actions",
+ ):
+ logger.info("[purge] removing events from %s", table)
+
+ txn.execute(
+ "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
+ " SELECT event_id FROM events_to_purge WHERE should_delete"
+ ")" % (table,),
+ (room_id, )
)
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
- txn.executemany(
+ txn.execute(
"UPDATE events SET outlier = ?"
- " WHERE event_id = ?",
- [
- (True, event_id,) for event_id, state_key in event_rows
- if state_key is not None or (
- not delete_local_events and self.hs.is_mine_id(event_id)
- )
- ]
+ " WHERE event_id IN ("
+ " SELECT event_id FROM events_to_purge "
+ " WHERE NOT should_delete"
+ ")",
+ (True,),
)
# synapse tries to take out an exclusive lock on room_depth whenever it
@@ -2319,6 +2361,12 @@ class EventsStore(SQLBaseStore):
(topological_ordering, room_id,)
)
+ # finally, drop the temp table. this will commit the txn in sqlite,
+ # so make sure to keep this actually last.
+ txn.execute(
+ "DROP TABLE events_to_purge"
+ )
+
logger.info("[purge] done")
@defer.inlineCallbacks
|