author     Richard van der Hoff <richard@matrix.org>    2018-02-14 11:02:22 +0000
committer  Richard van der Hoff <richard@matrix.org>    2018-02-14 11:02:22 +0000
commit     5fcbf1e07c5f7c2ce0ec44c2569116507caa0183
tree       24858e06d4c8100260e1f2776f1a61aba49ec60b /synapse/storage
parent     Fix log message in purge_history
Rework event purge to use a temporary table
... which should speed things up by reducing the amount of data being shuffled
across the connection
Diffstat (limited to 'synapse/storage')
-rw-r--r--    synapse/storage/events.py    93
1 file changed, 58 insertions, 35 deletions
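
Before the diff, a minimal runnable sketch of the pattern this commit switches to: stage the ids of purgeable events in a temporary table once, then drive each subsequent DELETE from that table with "IN (SELECT ...)" instead of shipping the id list back and forth across the connection. The sketch uses the standard-library sqlite3 module with a toy two-table schema; the function name, schema, and hostname handling are illustrative assumptions, not Synapse's actual storage API.

import sqlite3


def purge_history(txn, room_id, topological_ordering, delete_local_events, hostname):
    """Illustrative sketch: purge old events via a temporary staging table."""
    # Stage candidate event ids (plus a should_delete flag) once, so later
    # DELETEs can reference the set server-side instead of per-id round trips.
    txn.execute(
        "CREATE TEMPORARY TABLE events_to_purge ("
        "    event_id TEXT NOT NULL,"
        "    should_delete BOOLEAN NOT NULL"
        ")"
    )

    # Non-state events are deletable; optionally keep events that originated
    # on this server (their ids end in ":<hostname>").
    should_delete_expr = "state_key IS NULL"
    params = ()
    if not delete_local_events:
        should_delete_expr += " AND event_id NOT LIKE ?"
        params += ("%:" + hostname,)
    params += (room_id, topological_ordering)

    txn.execute(
        "INSERT INTO events_to_purge"
        " SELECT event_id, %s"
        " FROM events LEFT JOIN state_events USING (event_id)"
        " WHERE events.room_id = ? AND topological_ordering < ?"
        % (should_delete_expr,),
        params,
    )

    # One set-based DELETE per table instead of one statement per event id.
    txn.execute(
        "DELETE FROM events WHERE event_id IN ("
        "    SELECT event_id FROM events_to_purge WHERE should_delete"
        ")"
    )

    # The staging table is only needed for the lifetime of this purge.
    txn.execute("DROP TABLE events_to_purge")


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE events (event_id TEXT, room_id TEXT, topological_ordering INTEGER);
        CREATE TABLE state_events (event_id TEXT, state_key TEXT);
        INSERT INTO events VALUES ('$old:example.org', '!room', 1), ('$new:example.org', '!room', 10);
        """
    )
    purge_history(conn.cursor(), "!room", 5, True, "example.org")
    print(conn.execute("SELECT event_id FROM events").fetchall())  # only $new remains

The should_delete flag is what lets a single staging table serve both passes in the real patch below: the hard-delete pass takes the rows WHERE should_delete, while the later "mark remaining events as outliers" pass takes the rows WHERE NOT should_delete, without re-running the selection query.
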
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index bbb6aa992c..5a2e6a03d5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2115,23 +2115,44 @@ class EventsStore(SQLBaseStore):
 
         logger.info("[purge] looking for events to delete")
 
+        # we build a temporary table listing the events so that we don't have
+        # to keep shovelling the list back and forth across the connection.
+
         txn.execute(
-            "SELECT event_id, state_key FROM events"
-            " LEFT JOIN state_events USING (room_id, event_id)"
-            " WHERE room_id = ? AND topological_ordering < ?",
-            (room_id, topological_ordering,)
+            "CREATE TEMPORARY TABLE events_to_purge ("
+            "    event_id TEXT NOT NULL,"
+            "    should_delete BOOLEAN NOT NULL"
+            ")"
         )
-        event_rows = txn.fetchall()
 
-        to_delete = [
-            (event_id,) for event_id, state_key in event_rows
-            if state_key is None and (
-                delete_local_events or not self.hs.is_mine_id(event_id)
-            )
-        ]
+        # create an index on should_delete because later we'll be looking for
+        # the should_delete / shouldn't_delete subsets
+        txn.execute("CREATE INDEX ON events_to_purge(should_delete)")
+
+        should_delete_expr = "state_key IS NULL"
+        should_delete_params = ()
+        if not delete_local_events:
+            should_delete_expr += " AND event_id NOT LIKE ?"
+            should_delete_params += ("%:" + self.hs.hostname, )
+
+        should_delete_params += (room_id, topological_ordering)
+
+        txn.execute(
+            "INSERT INTO events_to_purge"
+            " SELECT event_id, %s"
+            " FROM events AS e LEFT JOIN state_events USING (event_id)"
+            " WHERE e.room_id = ? AND topological_ordering < ?" % (
+                should_delete_expr,
+            ),
+            should_delete_params,
+        )
+        txn.execute(
+            "SELECT event_id, should_delete FROM events_to_purge"
+        )
+        event_rows = txn.fetchall()
         logger.info(
             "[purge] found %i events before cutoff, of which %i can be deleted",
-            len(event_rows), len(to_delete),
+            len(event_rows), sum(1 for e in event_rows if e[1]),
         )
 
         logger.info("[purge] Finding new backward extremities")
@@ -2139,12 +2160,11 @@ class EventsStore(SQLBaseStore):
         # We calculate the new entries for the backward extremeties by finding
         # all events that point to events that are to be purged
         txn.execute(
-            "SELECT DISTINCT e.event_id FROM events as e"
-            " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
-            " WHERE e.room_id = ? AND e.topological_ordering < ?"
-            " AND e2.topological_ordering >= ?",
-            (room_id, topological_ordering, topological_ordering)
+            "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
+            " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
+            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
+            " WHERE e2.topological_ordering >= ?",
+            (topological_ordering, )
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -2172,12 +2192,11 @@ class EventsStore(SQLBaseStore):
             "SELECT state_group FROM event_to_state_groups"
             " INNER JOIN events USING (event_id)"
             " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events"
+            "   SELECT DISTINCT state_group FROM events_to_purge"
             "   INNER JOIN event_to_state_groups USING (event_id)"
-            "   WHERE room_id = ? AND topological_ordering < ?"
             " )"
             " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (room_id, topological_ordering, topological_ordering)
+            (topological_ordering, )
         )
 
         state_rows = txn.fetchall()
@@ -2262,9 +2281,9 @@ class EventsStore(SQLBaseStore):
         )
 
         logger.info("[purge] removing events from event_to_state_groups")
-        txn.executemany(
-            "DELETE FROM event_to_state_groups WHERE event_id = ?",
-            [(event_id,) for event_id, _ in event_rows]
+        txn.execute(
+            "DELETE FROM event_to_state_groups "
+            "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
         for event_id, _ in event_rows:
             txn.call_after(self._get_state_group_for_event.invalidate, (
@@ -2289,22 +2308,26 @@ class EventsStore(SQLBaseStore):
         ):
             logger.info("[purge] removing events from %s", table)
 
-            txn.executemany(
-                "DELETE FROM %s WHERE event_id = ?" % (table,),
-                to_delete
+            txn.execute(
+                "DELETE FROM %s WHERE event_id IN ("
+                "    SELECT event_id FROM events_to_purge WHERE should_delete"
+                ")" % (table,),
             )
 
         # Mark all state and own events as outliers
         logger.info("[purge] marking remaining events as outliers")
-        txn.executemany(
+        txn.execute(
             "UPDATE events SET outlier = ?"
-            " WHERE event_id = ?",
-            [
-                (True, event_id,) for event_id, state_key in event_rows
-                if state_key is not None or (
-                    not delete_local_events and self.hs.is_mine_id(event_id)
-                )
-            ]
+            " WHERE event_id IN ("
+            "    SELECT event_id FROM events_to_purge "
+            "    WHERE NOT should_delete"
+            ")",
+            (True,),
+        )
+
+        # we're now done with the temporary table
+        txn.execute(
+            "DROP TABLE events_to_purge"
         )
 
         # synapse tries to take out an exclusive lock on room_depth whenever it