summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py77
1 files changed, 53 insertions, 24 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 96b48cfdbb..00d66886ad 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -33,7 +33,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from synapse.types import get_domain_from_id
+from synapse.types import get_domain_from_id, RoomStreamToken
 import synapse.metrics
 
 # these are only included to make the type annotations work
@@ -1797,15 +1797,14 @@ class EventsStore(EventsWorkerStore):
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
 
     def purge_history(
-        self, room_id, topological_ordering, delete_local_events,
+        self, room_id, token, delete_local_events,
     ):
         """Deletes room history before a certain point
 
         Args:
             room_id (str):
 
-            topological_ordering (int):
-                minimum topo ordering to preserve
+            token (str): A topological token to delete events before
 
             delete_local_events (bool):
                 if True, we will delete local events as well as remote ones
@@ -1815,13 +1814,15 @@ class EventsStore(EventsWorkerStore):
 
         return self.runInteraction(
             "purge_history",
-            self._purge_history_txn, room_id, topological_ordering,
+            self._purge_history_txn, room_id, token,
             delete_local_events,
         )
 
     def _purge_history_txn(
-        self, txn, room_id, topological_ordering, delete_local_events,
+        self, txn, room_id, token_str, delete_local_events,
     ):
+        token = RoomStreamToken.parse(token_str)
+
         # Tables that should be pruned:
         #     event_auth
         #     event_backward_extremities
@@ -1866,6 +1867,13 @@ class EventsStore(EventsWorkerStore):
             " ON events_to_purge(should_delete)",
         )
 
+        # We do joins against events_to_purge for e.g. calculating state
+        # groups to purge, etc., so lets make an index.
+        txn.execute(
+            "CREATE INDEX events_to_purge_id"
+            " ON events_to_purge(event_id)",
+        )
+
         # First ensure that we're not about to delete all the forward extremeties
         txn.execute(
             "SELECT e.event_id, e.depth FROM events as e "
@@ -1878,7 +1886,7 @@ class EventsStore(EventsWorkerStore):
         rows = txn.fetchall()
         max_depth = max(row[0] for row in rows)
 
-        if max_depth <= topological_ordering:
+        if max_depth <= token.topological:
             # We need to ensure we don't delete all the events from the datanase
             # otherwise we wouldn't be able to send any events (due to not
             # having any backwards extremeties)
@@ -1894,7 +1902,7 @@ class EventsStore(EventsWorkerStore):
             should_delete_expr += " AND event_id NOT LIKE ?"
             should_delete_params += ("%:" + self.hs.hostname, )
 
-        should_delete_params += (room_id, topological_ordering)
+        should_delete_params += (room_id, token.topological)
 
         txn.execute(
             "INSERT INTO events_to_purge"
@@ -1917,13 +1925,13 @@ class EventsStore(EventsWorkerStore):
         logger.info("[purge] Finding new backward extremities")
 
         # We calculate the new entries for the backward extremeties by finding
-        # all events that point to events that are to be purged
+        # events to be purged that are pointed to by events we're not going to
+        # purge.
         txn.execute(
             "SELECT DISTINCT e.event_id FROM events_to_purge AS e"
             " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id"
-            " INNER JOIN events AS e2 ON e2.event_id = ed.event_id"
-            " WHERE e2.topological_ordering >= ?",
-            (topological_ordering, )
+            " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id"
+            " WHERE ep2.event_id IS NULL",
         )
         new_backwards_extrems = txn.fetchall()
 
@@ -1947,16 +1955,22 @@ class EventsStore(EventsWorkerStore):
 
         # Get all state groups that are only referenced by events that are
         # to be deleted.
-        txn.execute(
-            "SELECT state_group FROM event_to_state_groups"
-            " INNER JOIN events USING (event_id)"
-            " WHERE state_group IN ("
-            "   SELECT DISTINCT state_group FROM events_to_purge"
-            "   INNER JOIN event_to_state_groups USING (event_id)"
-            " )"
-            " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
-            (topological_ordering, )
-        )
+        # This works by first getting state groups that we may want to delete,
+        # joining against event_to_state_groups to get events that use that
+        # state group, then left joining against events_to_purge again. Any
+        # state group where the left join produce *no nulls* are referenced
+        # only by events that are going to be purged.
+        txn.execute("""
+            SELECT state_group FROM
+            (
+                SELECT DISTINCT state_group FROM events_to_purge
+                INNER JOIN event_to_state_groups USING (event_id)
+            ) AS sp
+            INNER JOIN event_to_state_groups USING (state_group)
+            LEFT JOIN events_to_purge AS ep USING (event_id)
+            GROUP BY state_group
+            HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0
+        """)
 
         state_rows = txn.fetchall()
         logger.info("[purge] found %i redundant state groups", len(state_rows))
@@ -2103,10 +2117,25 @@ class EventsStore(EventsWorkerStore):
         #
         # So, let's stick it at the end so that we don't block event
         # persistence.
-        logger.info("[purge] updating room_depth")
+        #
+        # We do this by calculating the minimum depth of the backwards
+        # extremities. However, the events in event_backward_extremities
+        # are ones we don't have yet so we need to look at the events that
+        # point to it via event_edges table.
+        txn.execute("""
+            SELECT COALESCE(MIN(depth), 0)
+            FROM event_backward_extremities AS eb
+            INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id
+            INNER JOIN events AS e ON e.event_id = eg.event_id
+            WHERE eb.room_id = ?
+        """, (room_id,))
+        min_depth, = txn.fetchone()
+
+        logger.info("[purge] updating room_depth to %d", min_depth)
+
         txn.execute(
             "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
-            (topological_ordering, room_id,)
+            (min_depth, room_id,)
         )
 
         # finally, drop the temp table. this will commit the txn in sqlite,