summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-05-23 13:54:32 +0100
committerErik Johnston <erik@matrix.org>2018-06-01 16:13:47 +0100
commitc02b5c3953bd4ed94a0272ef840922df07ed046a (patch)
tree99841dec2bbce6418d7457b2b3bd5a005d430d9e
parentFix backfill (diff)
downloadsynapse-github/erikj/chunk_pag_4.tar.xz
-rw-r--r--synapse/storage/events.py69
1 files changed, 59 insertions, 10 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index d38f65b4e6..d25b2597b7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2103,17 +2103,45 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, token.topological)
+        # We insert into events_to_purge by paginating backwards from the
+        # current token.
 
-        txn.execute(
-            "INSERT INTO events_to_purge"
-            " SELECT event_id, %s"
-            " FROM events AS e LEFT JOIN state_events USING (event_id)"
-            " WHERE e.room_id = ? AND topological_ordering < ?" % (
-                should_delete_expr,
-            ),
-            should_delete_params,
+        next_token = RoomStreamToken(
+            token.chunk,
+            token.topological - 1,
+            token.stream,
         )
+        while True:
+            rows, next_token, _ = self._paginate_room_events_txn(
+                txn, room_id, next_token, direction='b', limit=1000,
+            )
+            next_token = RoomStreamToken.parse(next_token)
+
+            if len(rows) == 0:
+                break
+
+            txn.executemany(
+                """INSERT INTO events_to_purge
+                SELECT event_id, %s
+                FROM events
+                LEFT JOIN state_events USING (event_id)
+                WHERE event_id = ?
+                """ % (
+                    should_delete_expr,
+                ),
+                (
+                    should_delete_params + (row.event_id,)
+                    for row in rows
+                ),
+            )
+
+        txn.execute("""
+            DELETE FROM events_to_purge
+            WHERE event_id IN (
+                SELECT event_id FROM event_forward_extremities
+            )
+        """)
+
         txn.execute(
             "SELECT event_id, should_delete FROM events_to_purge"
         )
@@ -2152,6 +2180,27 @@ class EventsStore(EventsWorkerStore):
             ]
         )
 
+        txn.execute(
+            """DELETE FROM chunk_backwards_extremities
+            WHERE event_id IN (
+                SELECT event_id FROM events WHERE room_id = ?
+            )
+            """,
+            (room_id,)
+        )
+
+        txn.execute(
+            """
+            INSERT INTO chunk_backwards_extremities
+            SELECT DISTINCT ee.chunk_id, e.event_id
+            FROM events_to_purge AS e
+            INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id
+            INNER JOIN events AS ee ON ee.event_id = ed.event_id
+            LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id
+            WHERE ep2.event_id IS NULL
+            """,
+        )
+
         logger.info("[purge] finding redundant state groups")
 
         # Get all state groups that are only referenced by events that are
@@ -2304,7 +2353,7 @@ class EventsStore(EventsWorkerStore):
         # Mark all state and own events as outliers
         logger.info("[purge] marking remaining events as outliers")
         txn.execute(
-            "UPDATE events SET outlier = ?"
+            "UPDATE events SET outlier = ?, chunk_id = NULL"
             " WHERE event_id IN ("
             "    SELECT event_id FROM events_to_purge "
             "    WHERE NOT should_delete"