From 37dbee6490dbb9cd66d887ed4d902ae8482fae2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 May 2018 15:48:40 +0100 Subject: Use events_to_purge table rather than token --- synapse/storage/events.py | 49 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 05cde96afc..b4ae6664f0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1871,6 +1871,10 @@ class EventsStore(EventsWorkerStore): "CREATE INDEX events_to_purge_should_delete" " ON events_to_purge(should_delete)", ) + txn.execute( + "CREATE INDEX events_to_purge_id" + " ON events_to_purge(event_id)", + ) # First ensure that we're not about to delete all the forward extremeties txn.execute( @@ -1927,9 +1931,8 @@ class EventsStore(EventsWorkerStore): txn.execute( "SELECT DISTINCT e.event_id FROM events_to_purge AS e" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" - " INNER JOIN events AS e2 ON e2.event_id = ed.event_id" - " WHERE e2.topological_ordering >= ?", - (topological_ordering, ) + " LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id" + " WHERE ep2.event_id IS NULL", ) new_backwards_extrems = txn.fetchall() @@ -1953,16 +1956,22 @@ class EventsStore(EventsWorkerStore): # Get all state groups that are only referenced by events that are # to be deleted. - txn.execute( - "SELECT state_group FROM event_to_state_groups" - " INNER JOIN events USING (event_id)" - " WHERE state_group IN (" - " SELECT DISTINCT state_group FROM events_to_purge" - " INNER JOIN event_to_state_groups USING (event_id)" - " )" - " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - (topological_ordering, ) - ) + # This works by first getting state groups that we may want to delete, + # joining against event_to_state_groups to get events that use that + # state group, then left joining against events_to_purge again. Any + # state group where the left join produce *no nulls* are referenced + # only by events that are going to be purged. + txn.execute(""" + SELECT state_group FROM + ( + SELECT DISTINCT state_group FROM events_to_purge + INNER JOIN event_to_state_groups USING (event_id) + ) AS sp + INNER JOIN event_to_state_groups USING (state_group) + LEFT JOIN events_to_purge AS ep USING (event_id) + GROUP BY state_group + HAVING SUM(CASE WHEN ep.event_id IS NULL THEN 1 ELSE 0 END) = 0 + """) state_rows = txn.fetchall() logger.info("[purge] found %i redundant state groups", len(state_rows)) @@ -2109,10 +2118,20 @@ class EventsStore(EventsWorkerStore): # # So, let's stick it at the end so that we don't block event # persistence. - logger.info("[purge] updating room_depth") + txn.execute(""" + SELECT COALESCE(MIN(depth), 0) + FROM event_backward_extremities AS eb + INNER JOIN event_edges AS eg ON eg.prev_event_id = eb.event_id + INNER JOIN events AS e ON e.event_id = eg.event_id + WHERE eb.room_id = ? + """, (room_id,)) + min_depth, = txn.fetchone() + + logger.info("[purge] updating room_depth to %d", min_depth) + txn.execute( "UPDATE room_depth SET min_depth = ? WHERE room_id = ?", - (topological_ordering, room_id,) + (min_depth, room_id,) ) # finally, drop the temp table. this will commit the txn in sqlite, -- cgit 1.4.1 From 5f27ed75ad804ab9b5287f0deb8fd24b9a3a6232 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 15 May 2018 16:06:30 +0100 Subject: Make purge_history operate on tokens As we're soon going to change how topological_ordering works --- synapse/handlers/message.py | 12 ++++++------ synapse/rest/client/v1/admin.py | 17 ++++++++++------- synapse/storage/events.py | 17 +++++++++-------- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index b793fc4df7..8343b5839d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -86,14 +86,14 @@ class MessageHandler(BaseHandler): # map from purge id to PurgeStatus self._purges_by_id = {} - def start_purge_history(self, room_id, topological_ordering, + def start_purge_history(self, room_id, token, delete_local_events=False): """Start off a history purge on a room. Args: room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -115,19 +115,19 @@ class MessageHandler(BaseHandler): self._purges_by_id[purge_id] = PurgeStatus() run_in_background( self._purge_history, - purge_id, room_id, topological_ordering, delete_local_events, + purge_id, room_id, token, delete_local_events, ) return purge_id @defer.inlineCallbacks - def _purge_history(self, purge_id, room_id, topological_ordering, + def _purge_history(self, purge_id, room_id, token, delete_local_events): """Carry out a history purge on a room. Args: purge_id (str): The id for this purge room_id (str): The room to purge from - topological_ordering (int): minimum topo ordering to preserve + token (str): topological token to delete events before delete_local_events (bool): True to delete local events as well as remote ones @@ -138,7 +138,7 @@ class MessageHandler(BaseHandler): try: with (yield self.pagination_lock.write(room_id)): yield self.store.purge_history( - room_id, topological_ordering, delete_local_events, + room_id, token, delete_local_events, ) logger.info("[purge] complete") self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index efd5c9873d..282ce6be42 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -151,10 +151,11 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): if event.room_id != room_id: raise SynapseError(400, "Event is for wrong room.") - depth = event.depth + token = yield self.store.get_topological_token_for_event(event_id) + logger.info( - "[purge] purging up to depth %i (event_id %s)", - depth, event_id, + "[purge] purging up to token %s (event_id %s)", + token, event_id, ) elif 'purge_up_to_ts' in body: ts = body['purge_up_to_ts'] @@ -174,7 +175,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): ) ) if room_event_after_stream_ordering: - (_, depth, _) = room_event_after_stream_ordering + token = yield self.store.get_topological_token_for_event( + room_event_after_stream_ordering, + ) else: logger.warn( "[purge] purging events not possible: No event found " @@ -187,9 +190,9 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): errcode=Codes.NOT_FOUND, ) logger.info( - "[purge] purging up to depth %i (received_ts %i => " + "[purge] purging up to token %d (received_ts %i => " "stream_ordering %i)", - depth, ts, stream_ordering, + token, ts, stream_ordering, ) else: raise SynapseError( @@ -199,7 +202,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet): ) purge_id = yield self.handlers.message_handler.start_purge_history( - room_id, depth, + room_id, token, delete_local_events=delete_local_events, ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index b4ae6664f0..f65e18c1ee 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,7 +33,7 @@ from synapse.util.metrics import Measure from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from synapse.types import get_domain_from_id +from synapse.types import get_domain_from_id, RoomStreamToken import synapse.metrics # these are only included to make the type annotations work @@ -1803,15 +1803,14 @@ class EventsStore(EventsWorkerStore): return self.runInteraction("get_all_new_events", get_all_new_events_txn) def purge_history( - self, room_id, topological_ordering, delete_local_events, + self, room_id, token, delete_local_events, ): """Deletes room history before a certain point Args: room_id (str): - topological_ordering (int): - minimum topo ordering to preserve + token (str): A topological token to delete events before delete_local_events (bool): if True, we will delete local events as well as remote ones @@ -1821,12 +1820,12 @@ class EventsStore(EventsWorkerStore): return self.runInteraction( "purge_history", - self._purge_history_txn, room_id, topological_ordering, + self._purge_history_txn, room_id, token, delete_local_events, ) def _purge_history_txn( - self, txn, room_id, topological_ordering, delete_local_events, + self, txn, room_id, token, delete_local_events, ): # Tables that should be pruned: # event_auth @@ -1856,6 +1855,8 @@ class EventsStore(EventsWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. + token = RoomStreamToken.parse(token) + txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute( @@ -1888,7 +1889,7 @@ class EventsStore(EventsWorkerStore): rows = txn.fetchall() max_depth = max(row[0] for row in rows) - if max_depth <= topological_ordering: + if max_depth <= token.topological: # We need to ensure we don't delete all the events from the datanase # otherwise we wouldn't be able to send any events (due to not # having any backwards extremeties) @@ -1904,7 +1905,7 @@ class EventsStore(EventsWorkerStore): should_delete_expr += " AND event_id NOT LIKE ?" should_delete_params += ("%:" + self.hs.hostname, ) - should_delete_params += (room_id, topological_ordering) + should_delete_params += (room_id, token.topological) txn.execute( "INSERT INTO events_to_purge" -- cgit 1.4.1 From c945af8799671db8fdfc47acf6b286520fb08715 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 May 2018 10:52:06 +0100 Subject: Move and rename variable --- synapse/storage/events.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f65e18c1ee..22699cb689 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1825,8 +1825,10 @@ class EventsStore(EventsWorkerStore): ) def _purge_history_txn( - self, txn, room_id, token, delete_local_events, + self, txn, room_id, token_str, delete_local_events, ): + token = RoomStreamToken.parse(token_str) + # Tables that should be pruned: # event_auth # event_backward_extremities @@ -1855,8 +1857,6 @@ class EventsStore(EventsWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. - token = RoomStreamToken.parse(token) - txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute( -- cgit 1.4.1 From 43e6e82c4d368d981f8a0015246d19c333511ace Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 May 2018 11:13:31 +0100 Subject: Comments --- synapse/storage/events.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 22699cb689..93ac80ee48 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1872,6 +1872,9 @@ class EventsStore(EventsWorkerStore): "CREATE INDEX events_to_purge_should_delete" " ON events_to_purge(should_delete)", ) + + # We do joins against events_to_purge for e.g. calculating state + # groups to purge, etc., so lets make an index. txn.execute( "CREATE INDEX events_to_purge_id" " ON events_to_purge(event_id)", @@ -2119,6 +2122,11 @@ class EventsStore(EventsWorkerStore): # # So, let's stick it at the end so that we don't block event # persistence. + # + # We do this by calculating the minimum depth of the backwards + # extremities. However, the events in event_backward_extremities + # are ones we don't have yet so we need to look at the events that + # point to it via event_edges table. txn.execute(""" SELECT COALESCE(MIN(depth), 0) FROM event_backward_extremities AS eb -- cgit 1.4.1 From 680530cc7f80533b9d63ce9bed04dc28ca87ca13 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 May 2018 11:47:26 +0100 Subject: Clarify comment --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 93ac80ee48..5ebef98c4f 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1931,7 +1931,8 @@ class EventsStore(EventsWorkerStore): logger.info("[purge] Finding new backward extremities") # We calculate the new entries for the backward extremeties by finding - # all events that point to events that are to be purged + # events to be purged that are pointed to by events we're not going to + # purge. txn.execute( "SELECT DISTINCT e.event_id FROM events_to_purge AS e" " INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id" -- cgit 1.4.1