diff options
author | Amber Brown <hawkowl@atleastfornow.net> | 2018-05-22 10:50:26 -0500 |
---|---|---|
committer | Amber Brown <hawkowl@atleastfornow.net> | 2018-05-22 10:50:26 -0500 |
commit | a8990fa2ec98ea14493515a92d6228729024409b (patch) | |
tree | a0ce94c793d02e80ac95ec43a16c3b11a1884efa /synapse/storage/events.py | |
parent | rest of the changes (diff) | |
parent | Merge pull request #3262 from matrix-org/rav/has_already_consented (diff) | |
download | synapse-a8990fa2ec98ea14493515a92d6228729024409b.tar.xz |
Merge remote-tracking branch 'origin/develop' into 3218-official-prom
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 77 |
1 files changed, 53 insertions, 24 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 96b48cfdbb..00d66886ad 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,7 +33,7 @@ from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from synapse.types import get_domain_from_id +from synapse.types import get_domain_from_id, RoomStreamToken import synapse.metrics # these are only included to make the type annotations work @@ -1797,15 +1797,14 @@ class EventsStore(EventsWorkerStore): return self.runInteraction("get_all_new_events", get_all_new_events_txn) def purge_history( - self, room_id, topological_ordering, delete_local_events, + self, room_id, token, delete_local_events, ): """Deletes room history before a certain point Args: room_id (str): - topological_ordering (int): - minimum topo ordering to preserve + token (str): A topological token to delete events before delete_local_events (bool): if True, we will delete local events as well as remote ones @@ -1815,13 +1814,15 @@ class EventsStore(EventsWorkerStore): return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, topological_ordering, + self._purge_history_txn, room_id, token, delete_local_events, ) def _purge_history_txn( - self, txn, room_id, topological_ordering, delete_local_events, + self, txn, room_id, token_str, delete_local_events, ): + token = RoomStreamToken.parse(token_str) + # Tables that should be pruned: # event_auth # event_backward_extremities @@ -1866,6 +1867,13 @@ class EventsStore(EventsWorkerStore): " ON events_to_purge(should_delete)", ) + # We do joins against events_to_purge for e.g. calculating state + # groups to purge, etc., so lets make an index. + txn.execute( + "CREATE INDEX events_to_purge_id" + " ON events_to_purge(event_id)", + ) + # First ensure that we're not about to delete all the forward extremeties txn.execute( "SELECT e.event_id, e.depth FROM events as e " @@ -1878,7 +1886,7 @@ class EventsStore(EventsWorkerStore): rows = txn.fetchall() max_depth = max(row[0] for row in rows) - if max_depth <= topological_ordering: + if max_depth <= token.topological: # We need to ensure we don't delete all the events from the datanase # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) @@ -1894,7 +1902,7 @@ class EventsStore(EventsWorkerStore): should_delete_expr += " AND event_id NOT LIKE ?" should_delete_params += ("%:" + self.hs.hostname, ) - should_delete_params += (room_id, topological_ordering) + should_delete_params += (room_id, token.topological) txn.execute( "INSERT INTO events_to_purge" @@ -1917,13 +1925,13 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] Finding new backward extremities") # We calculate the new entries for the backward extremeties by finding - # all events that point to events that are to be purged + # events to be purged that are pointed to by events we're not going to + # purge. txn.execute( "SELECT DISTINCT e.event_id FROM events_to_purge AS e" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" - " INNER JOIN events AS e2 ON e2.event_id = ed.event_id" - " WHERE e2.topological_ordering >= ?", - (topological_ordering, ) + " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id" + " WHERE ep2.event_id IS NULL", ) new_backwards_extrems = txn.fetchall() @@ -1947,16 +1955,22 @@ class EventsStore(EventsWorkerStore): # Get all state groups that are only referenced by events that are # to be deleted. - txn.execute( - "SELECT state_group FROM event_to_state_groups" - " INNER JOIN events USING (event_id)" - " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events_to_purge" - " INNER JOIN event_to_state_groups USING (event_id)" - " )" - " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (topological_ordering, ) - ) + # This works by first getting state groups that we may want to delete, + # joining against event_to_state_groups to get events that use that + # state group, then left joining against events_to_purge again. Any + # state group where the left join produce *no nulls* are referenced + # only by events that are going to be purged. + txn.execute(""" + SELECT state_group FROM + ( + SELECT DISTINCT state_group FROM events_to_purge + INNER JOIN event_to_state_groups USING (event_id) + ) AS sp + INNER JOIN event_to_state_groups USING (state_group) + LEFT JOIN events_to_purge AS ep USING (event_id) + GROUP BY state_group + HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0 + """) state_rows = txn.fetchall() logger.info("[purge] found %i redundant state groups", len(state_rows)) @@ -2103,10 +2117,25 @@ class EventsStore(EventsWorkerStore): # # So, let's stick it at the end so that we don't block event # persistence. - logger.info("[purge] updating room_depth") + # + # We do this by calculating the minimum depth of the backwards + # extremities. However, the events in event_backward_extremities + # are ones we don't have yet so we need to look at the events that + # point to it via event_edges table. + txn.execute(""" + SELECT COALESCE(MIN(depth), 0) + FROM event_backward_extremities AS eb + INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id + INNER JOIN events AS e ON e.event_id = eg.event_id + WHERE eb.room_id = ? + """, (room_id,)) + min_depth, = txn.fetchone() + + logger.info("[purge] updating room_depth to %d", min_depth) + txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (topological_ordering, room_id,) + (min_depth, room_id,) ) # finally, drop the temp table. this will commit the txn in sqlite, |