summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorRichard van der Hoff <github@rvanderhoff.org.uk>2018-02-15 09:49:07 +0000
committerGitHub <noreply@github.com>2018-02-15 09:49:07 +0000
commitb8d821aa68a81e22b40806dfd66718b2470c9dbd (patch)
tree0551ee91fcb64d25c6f441c6bd9622fa779d507a /synapse/storage/events.py
parentMerge pull request #2769 from matrix-org/matthew/hit_the_gin (diff)
parentpurge_history: fix sqlite syntax error (diff)
downloadsynapse-b8d821aa68a81e22b40806dfd66718b2470c9dbd.tar.xz
Merge pull request #2867 from matrix-org/rav/rework_purge
purge_history cleanups
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py118
1 files changed, 83 insertions, 35 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bbb6aa992c..28cce2979c 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2093,6 +2093,30 @@ class EventsStore(SQLBaseStore):
         #     state_groups
         #     state_groups_state
 
+        # we will build a temporary table listing the events so that we don't
+        # have to keep shovelling the list back and forth across the
+        # connection. Annoyingly the python sqlite driver commits the
+        # transaction on CREATE, so let's do this first.
+        #
+        # furthermore, we might already have the table from a previous (failed)
+        # purge attempt, so let's drop the table first.
+
+        txn.execute("DROP TABLE IF EXISTS events_to_purge")
+
+        txn.execute(
+            "CREATE TEMPORARY TABLE events_to_purge ("
+            "    event_id TEXT NOT NULL,"
+            "    should_delete BOOLEAN NOT NULL"
+            ")"
+        )
+
+        # create an index on should_delete because later we'll be looking for
+        # the should_delete / shouldn't_delete subsets
+        txn.execute(
+            "CREATE INDEX events_to_purge_should_delete"
+            " ON events_to_purge(should_delete)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -2115,23 +2139,30 @@ class EventsStore(SQLBaseStore):
 
         logger.info("[purge] looking for events to delete")
 
+        should_delete_expr = "state_key IS NULL"
+        should_delete_params = ()
+        if not delete_local_events:
+            should_delete_expr += " AND event_id NOT LIKE ?"
+            should_delete_params += ("%:" + self.hs.hostname, )
+
+        should_delete_params += (room_id, topological_ordering)
+
+        txn.execute(
+            "INSERT INTO events_to_purge"
+            " SELECT event_id, %s"
+            " FROM events AS e LEFT JOIN state_events USING (event_id)"
+            " WHERE e.room_id = ? AND topological_ordering < ?" % (
+                should_delete_expr,
+            ),
+            should_delete_params,
+        )
         txn.execute(
-            "SELECT event_id, state_key FROM events"
-            " LEFT JOIN state_events USING (room_id, event_id)"
-            " WHERE room_id = ? AND topological_ordering < ?",
-            (room_id, topological_ordering,)
+            "SELECT event_id, should_delete FROM events_to_purge"
         )
         event_rows = txn.fetchall()
-
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and (
-                delete_local_events or not self.hs.is_mine_id(event_id)
-            )
-        ]
         logger.info(
             "[purge] found %i events before cutoff, of which %i can be deleted",
-            len(event_rows), len(to_delete),
+            len(event_rows), sum(1 for e in event_rows if e[1]),
         )
 
         logger.info("[purge] Finding new backward extremities")
@@ -2139,12 +2170,11 @@ class EventsStore(SQLBaseStore):
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
-            "SELECT DISTINCT e.event_id FROM events as e"
-            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
-            " WHERE e.room_id = ? AND e.topological_ordering < ?"
-            " AND e2.topological_ordering >= ?",
-            (room_id, topological_ordering, topological_ordering)
+            "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+            " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
+            " WHERE e2.topological_ordering >= ?",
+            (topological_ordering, )
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -2172,12 +2202,11 @@ class EventsStore(SQLBaseStore):
             "SELECT state_group FROM event_to_state_groups"
             " INNER JOIN events USING (event_id)"
             " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events"
+            "   SELECT DISTINCT state_group FROM events_to_purge"
             "   INNER JOIN event_to_state_groups USING (event_id)"
-            "   WHERE room_id = ? AND topological_ordering < ?"
             " )"
             " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (room_id, topological_ordering, topological_ordering)
+            (topological_ordering, )
         )
 
         state_rows = txn.fetchall()
@@ -2262,9 +2291,9 @@ class EventsStore(SQLBaseStore):
         )
 
         logger.info("[purge] removing events from event_to_state_groups")
-        txn.executemany(
-            "DELETE FROM event_to_state_groups WHERE event_id = ?",
-            [(event_id,) for event_id, _ in event_rows]
+        txn.execute(
+            "DELETE FROM event_to_state_groups "
+            "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
         for event_id, _ in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (
@@ -2289,22 +2318,35 @@ class EventsStore(SQLBaseStore):
         ):
             logger.info("[purge] removing events from %s", table)
 
-            txn.executemany(
-                "DELETE FROM %s WHERE event_id = ?" % (table,),
-                to_delete
+            txn.execute(
+                "DELETE FROM %s WHERE event_id IN ("
+                "    SELECT event_id FROM events_to_purge WHERE should_delete"
+                ")" % (table,),
+            )
+
+        # event_push_actions lacks an index on event_id, and has one on
+        # (room_id, event_id) instead.
+        for table in (
+            "event_push_actions",
+        ):
+            logger.info("[purge] removing events from %s", table)
+
+            txn.execute(
+                "DELETE FROM %s WHERE room_id = ? AND event_id IN ("
+                "    SELECT event_id FROM events_to_purge WHERE should_delete"
+                ")" % (table,),
+                (room_id, )
             )
 
         # Mark all state and own events as outliers
         logger.info("[purge] marking remaining events as outliers")
-        txn.executemany(
+        txn.execute(
             "UPDATE events SET outlier = ?"
-            " WHERE event_id = ?",
-            [
-                (True, event_id,) for event_id, state_key in event_rows
-                if state_key is not None or (
-                    not delete_local_events and self.hs.is_mine_id(event_id)
-                )
-            ]
+            " WHERE event_id IN ("
+            "    SELECT event_id FROM events_to_purge "
+            "    WHERE NOT should_delete"
+            ")",
+            (True,),
         )
 
         # synapse tries to take out an exclusive lock on room_depth whenever it
@@ -2319,6 +2361,12 @@ class EventsStore(SQLBaseStore):
             (topological_ordering, room_id,)
         )
 
+        # finally, drop the temp table. this will commit the txn in sqlite,
+        # so make sure to keep this actually last.
+        txn.execute(
+            "DROP TABLE events_to_purge"
+        )
+
         logger.info("[purge] done")
 
     @defer.inlineCallbacks