diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/events.py | 349 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/events_bg_updates.py | 63 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/group_server.py | 15 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/registration.py | 8 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql | 17 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/state.py | 29 | ||||
-rw-r--r-- | synapse/storage/purge_events.py | 117 |
8 files changed, 423 insertions, 177 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 0a1a8cc1e5..0460fe8cc9 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -30,6 +30,7 @@ stored in `synapse.storage.schema`. from synapse.storage.data_stores import DataStores from synapse.storage.data_stores.main import DataStore from synapse.storage.persist_events import EventsPersistenceStorage +from synapse.storage.purge_events import PurgeEventsStorage from synapse.storage.state import StateGroupStorage __all__ = ["DataStores", "DataStore"] @@ -46,6 +47,7 @@ class Storage(object): self.main = stores.main self.persistence = EventsPersistenceStorage(hs, stores) + self.purge_events = PurgeEventsStorage(hs, stores) self.state = StateGroupStorage(hs, stores) diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index 301f8ea128..878f7568a6 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1375,6 +1375,10 @@ class EventsStore( if True, we will delete local events as well as remote ones (instead of just marking them as outliers and deleting their state groups). + + Returns: + Deferred[set[int]]: The set of state groups that are referenced by + deleted events. """ return self.runInteraction( @@ -1511,11 +1515,10 @@ class EventsStore( [(room_id, event_id) for event_id, in new_backwards_extrems], ) - logger.info("[purge] finding redundant state groups") + logger.info("[purge] finding state groups referenced by deleted events") # Get all state groups that are referenced by events that are to be - # deleted. We then go and check if they are referenced by other events - # or state groups, and if not we delete them. + # deleted. txn.execute( """ SELECT DISTINCT state_group FROM events_to_purge @@ -1528,60 +1531,6 @@ class EventsStore( "[purge] found %i referenced state groups", len(referenced_state_groups) ) - logger.info("[purge] finding state groups that can be deleted") - - _ = self._find_unreferenced_groups_during_purge(txn, referenced_state_groups) - state_groups_to_delete, remaining_state_groups = _ - - logger.info( - "[purge] found %i state groups to delete", len(state_groups_to_delete) - ) - - logger.info( - "[purge] de-delta-ing %i remaining state groups", - len(remaining_state_groups), - ) - - # Now we turn the state groups that reference to-be-deleted state - # groups to non delta versions. - for sg in remaining_state_groups: - logger.info("[purge] de-delta-ing remaining state group %s", sg) - curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) - curr_state = curr_state[sg] - - self._simple_delete_txn( - txn, table="state_groups_state", keyvalues={"state_group": sg} - ) - - self._simple_delete_txn( - txn, table="state_group_edges", keyvalues={"state_group": sg} - ) - - self._simple_insert_many_txn( - txn, - table="state_groups_state", - values=[ - { - "state_group": sg, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } - for key, state_id in iteritems(curr_state) - ], - ) - - logger.info("[purge] removing redundant state groups") - txn.executemany( - "DELETE FROM state_groups_state WHERE state_group = ?", - ((sg,) for sg in state_groups_to_delete), - ) - txn.executemany( - "DELETE FROM state_groups WHERE id = ?", - ((sg,) for sg in state_groups_to_delete), - ) - logger.info("[purge] removing events from event_to_state_groups") txn.execute( "DELETE FROM event_to_state_groups " @@ -1668,138 +1617,35 @@ class EventsStore( logger.info("[purge] done") - def _find_unreferenced_groups_during_purge(self, txn, state_groups): - """Used when purging history to figure out which state groups can be - deleted and which need to be de-delta'ed (due to one of its prev groups - being scheduled for deletion). - - Args: - txn - state_groups (set[int]): Set of state groups referenced by events - that are going to be deleted. - - Returns: - tuple[set[int], set[int]]: The set of state groups that can be - deleted and the set of state groups that need to be de-delta'ed - """ - # Graph of state group -> previous group - graph = {} - - # Set of events that we have found to be referenced by events - referenced_groups = set() - - # Set of state groups we've already seen - state_groups_seen = set(state_groups) - - # Set of state groups to handle next. - next_to_search = set(state_groups) - while next_to_search: - # We bound size of groups we're looking up at once, to stop the - # SQL query getting too big - if len(next_to_search) < 100: - current_search = next_to_search - next_to_search = set() - else: - current_search = set(itertools.islice(next_to_search, 100)) - next_to_search -= current_search - - # Check if state groups are referenced - sql = """ - SELECT DISTINCT state_group FROM event_to_state_groups - LEFT JOIN events_to_purge AS ep USING (event_id) - WHERE ep.event_id IS NULL AND - """ - clause, args = make_in_list_sql_clause( - txn.database_engine, "state_group", current_search - ) - txn.execute(sql + clause, list(args)) - - referenced = set(sg for sg, in txn) - referenced_groups |= referenced - - # We don't continue iterating up the state group graphs for state - # groups that are referenced. - current_search -= referenced - - rows = self._simple_select_many_txn( - txn, - table="state_group_edges", - column="prev_state_group", - iterable=current_search, - keyvalues={}, - retcols=("prev_state_group", "state_group"), - ) - - prevs = set(row["state_group"] for row in rows) - # We don't bother re-handling groups we've already seen - prevs -= state_groups_seen - next_to_search |= prevs - state_groups_seen |= prevs - - for row in rows: - # Note: Each state group can have at most one prev group - graph[row["state_group"]] = row["prev_state_group"] - - to_delete = state_groups_seen - referenced_groups - - to_dedelta = set() - for sg in referenced_groups: - prev_sg = graph.get(sg) - if prev_sg and prev_sg in to_delete: - to_dedelta.add(sg) - - return to_delete, to_dedelta + return referenced_state_groups def purge_room(self, room_id): """Deletes all record of a room Args: - room_id (str): + room_id (str) + + Returns: + Deferred[List[int]]: The list of state groups to delete. """ return self.runInteraction("purge_room", self._purge_room_txn, room_id) def _purge_room_txn(self, txn, room_id): - # first we have to delete the state groups states - logger.info("[purge] removing %s from state_groups_state", room_id) - + # First we fetch all the state groups that should be deleted, before + # we delete that information. txn.execute( """ - DELETE FROM state_groups_state WHERE state_group IN ( - SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) - WHERE events.room_id=? - ) + SELECT DISTINCT state_group FROM events + INNER JOIN event_to_state_groups USING(event_id) + WHERE events.room_id = ? """, (room_id,), ) - # ... and the state group edges - logger.info("[purge] removing %s from state_group_edges", room_id) - - txn.execute( - """ - DELETE FROM state_group_edges WHERE state_group IN ( - SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) - WHERE events.room_id=? - ) - """, - (room_id,), - ) - - # ... and the state groups - logger.info("[purge] removing %s from state_groups", room_id) - - txn.execute( - """ - DELETE FROM state_groups WHERE id IN ( - SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) - WHERE events.room_id=? - ) - """, - (room_id,), - ) + state_groups = [row[0] for row in txn] - # and then tables which lack an index on room_id but have one on event_id + # Now we delete tables which lack an index on room_id but have one on event_id for table in ( "event_auth", "event_edges", @@ -1887,6 +1733,165 @@ class EventsStore( logger.info("[purge] done") + return state_groups + + def purge_unreferenced_state_groups( + self, room_id: str, state_groups_to_delete + ) -> defer.Deferred: + """Deletes no longer referenced state groups and de-deltas any state + groups that reference them. + + Args: + room_id: The room the state groups belong to (must all be in the + same room). + state_groups_to_delete (Collection[int]): Set of all state groups + to delete. + """ + + return self.runInteraction( + "purge_unreferenced_state_groups", + self._purge_unreferenced_state_groups, + room_id, + state_groups_to_delete, + ) + + def _purge_unreferenced_state_groups(self, txn, room_id, state_groups_to_delete): + logger.info( + "[purge] found %i state groups to delete", len(state_groups_to_delete) + ) + + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=state_groups_to_delete, + keyvalues={}, + retcols=("state_group",), + ) + + remaining_state_groups = set( + row["state_group"] + for row in rows + if row["state_group"] not in state_groups_to_delete + ) + + logger.info( + "[purge] de-delta-ing %i remaining state groups", + len(remaining_state_groups), + ) + + # Now we turn the state groups that reference to-be-deleted state + # groups to non delta versions. + for sg in remaining_state_groups: + logger.info("[purge] de-delta-ing remaining state group %s", sg) + curr_state = self._get_state_groups_from_groups_txn(txn, [sg]) + curr_state = curr_state[sg] + + self._simple_delete_txn( + txn, table="state_groups_state", keyvalues={"state_group": sg} + ) + + self._simple_delete_txn( + txn, table="state_group_edges", keyvalues={"state_group": sg} + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": sg, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in iteritems(curr_state) + ], + ) + + logger.info("[purge] removing redundant state groups") + txn.executemany( + "DELETE FROM state_groups_state WHERE state_group = ?", + ((sg,) for sg in state_groups_to_delete), + ) + txn.executemany( + "DELETE FROM state_groups WHERE id = ?", + ((sg,) for sg in state_groups_to_delete), + ) + + @defer.inlineCallbacks + def get_previous_state_groups(self, state_groups): + """Fetch the previous groups of the given state groups. + + Args: + state_groups (Iterable[int]) + + Returns: + Deferred[dict[int, int]]: mapping from state group to previous + state group. + """ + + rows = yield self._simple_select_many_batch( + table="state_group_edges", + column="prev_state_group", + iterable=state_groups, + keyvalues={}, + retcols=("prev_state_group", "state_group"), + desc="get_previous_state_groups", + ) + + return {row["state_group"]: row["prev_state_group"] for row in rows} + + def purge_room_state(self, room_id, state_groups_to_delete): + """Deletes all record of a room from state tables + + Args: + room_id (str): + state_groups_to_delete (list[int]): State groups to delete + """ + + return self.runInteraction( + "purge_room_state", + self._purge_room_state_txn, + room_id, + state_groups_to_delete, + ) + + def _purge_room_state_txn(self, txn, room_id, state_groups_to_delete): + # first we have to delete the state groups states + logger.info("[purge] removing %s from state_groups_state", room_id) + + self._simple_delete_many_txn( + txn, + table="state_groups_state", + column="state_group", + iterable=state_groups_to_delete, + keyvalues={}, + ) + + # ... and the state group edges + logger.info("[purge] removing %s from state_group_edges", room_id) + + self._simple_delete_many_txn( + txn, + table="state_group_edges", + column="state_group", + iterable=state_groups_to_delete, + keyvalues={}, + ) + + # ... and the state groups + logger.info("[purge] removing %s from state_groups", room_id) + + self._simple_delete_many_txn( + txn, + table="state_groups", + column="id", + iterable=state_groups_to_delete, + keyvalues={}, + ) + async def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream """ diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py index 51352b9966..0ed59ef48e 100644 --- a/synapse/storage/data_stores/main/events_bg_updates.py +++ b/synapse/storage/data_stores/main/events_bg_updates.py @@ -21,6 +21,7 @@ from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import EventContentFields from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore @@ -85,6 +86,10 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): "event_fix_redactions_bytes", self._event_fix_redactions_bytes ) + self.register_background_update_handler( + "event_store_labels", self._event_store_labels + ) + @defer.inlineCallbacks def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] @@ -503,3 +508,61 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore): yield self._end_background_update("event_fix_redactions_bytes") return 1 + + @defer.inlineCallbacks + def _event_store_labels(self, progress, batch_size): + """Background update handler which will store labels for existing events.""" + last_event_id = progress.get("last_event_id", "") + + def _event_store_labels_txn(txn): + txn.execute( + """ + SELECT event_id, json FROM event_json + LEFT JOIN event_labels USING (event_id) + WHERE event_id > ? AND label IS NULL + ORDER BY event_id LIMIT ? + """, + (last_event_id, batch_size), + ) + + results = list(txn) + + nbrows = 0 + last_row_event_id = "" + for (event_id, event_json_raw) in results: + event_json = json.loads(event_json_raw) + + self._simple_insert_many_txn( + txn=txn, + table="event_labels", + values=[ + { + "event_id": event_id, + "label": label, + "room_id": event_json["room_id"], + "topological_ordering": event_json["depth"], + } + for label in event_json["content"].get( + EventContentFields.LABELS, [] + ) + if isinstance(label, str) + ], + ) + + nbrows += 1 + last_row_event_id = event_id + + self._background_update_progress_txn( + txn, "event_store_labels", {"last_event_id": last_row_event_id} + ) + + return nbrows + + num_rows = yield self.runInteraction( + desc="event_store_labels", func=_event_store_labels_txn + ) + + if not num_rows: + yield self._end_background_update("event_store_labels") + + return num_rows diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py index b3a2771f1b..5ded539af8 100644 --- a/synapse/storage/data_stores/main/group_server.py +++ b/synapse/storage/data_stores/main/group_server.py @@ -553,6 +553,21 @@ class GroupServerStore(SQLBaseStore): desc="remove_user_from_summary", ) + def get_local_groups_for_room(self, room_id): + """Get all of the local group that contain a given room + Args: + room_id (str): The ID of a room + Returns: + Deferred[list[str]]: A twisted.Deferred containing a list of group ids + containing this room + """ + return self._simple_select_onecol( + table="group_rooms", + keyvalues={"room_id": room_id}, + retcol="group_id", + desc="get_local_groups_for_room", + ) + def get_users_for_summary_by_role(self, group_id, include_private=False): """Get the users and roles that should be included in a summary request diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index f70d41ecab..ee1b2b2bbf 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -488,14 +488,14 @@ class RegistrationWorkerStore(SQLBaseStore): we can. Unfortunately, it's possible some of them are already taken by existing users, and there may be gaps in the already taken range. This function returns the start of the first allocatable gap. This is to - avoid the case of ID 10000000 being pre-allocated, so us wasting the - first (and shortest) many generated user IDs. + avoid the case of ID 1000 being pre-allocated and starting at 1001 while + 0-999 are available. """ def _find_next_generated_user_id(txn): - # We bound between '@1' and '@a' to avoid pulling the entire table + # We bound between '@0' and '@a' to avoid pulling the entire table # out. - txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'") + txn.execute("SELECT name FROM users WHERE '@0' <= name AND name < '@a'") regex = re.compile(r"^@(\d+):") diff --git a/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql new file mode 100644 index 0000000000..5f5e0499ae --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/event_labels_background_update.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (update_name, progress_json) VALUES + ('event_store_labels', '{}'); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 9e1541988e..6a90daea31 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -285,7 +285,11 @@ class StateGroupWorkerStore( room_id (str) Returns: - Deferred[unicode|None]: predecessor room id + Deferred[dict|None]: A dictionary containing the structure of the predecessor + field from the room's create event. The structure is subject to other servers, + but it is expected to be: + * room_id (str): The room ID of the predecessor room + * event_id (str): The ID of the tombstone event in the predecessor room Raises: NotFoundError if the room is unknown @@ -991,6 +995,29 @@ class StateGroupWorkerStore( return self.runInteraction("store_state_group", _store_state_group_txn) + @defer.inlineCallbacks + def get_referenced_state_groups(self, state_groups): + """Check if the state groups are referenced by events. + + Args: + state_groups (Iterable[int]) + + Returns: + Deferred[set[int]]: The subset of state groups that are + referenced. + """ + + rows = yield self._simple_select_many_batch( + table="event_to_state_groups", + column="state_group", + iterable=state_groups, + keyvalues={}, + retcols=("DISTINCT state_group",), + desc="get_referenced_state_groups", + ) + + return set(row["state_group"] for row in rows) + class StateBackgroundUpdateStore( StateGroupBackgroundUpdateStore, BackgroundUpdateStore diff --git a/synapse/storage/purge_events.py b/synapse/storage/purge_events.py new file mode 100644 index 0000000000..a368182034 --- /dev/null +++ b/synapse/storage/purge_events.py @@ -0,0 +1,117 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import logging + +from twisted.internet import defer + +logger = logging.getLogger(__name__) + + +class PurgeEventsStorage(object): + """High level interface for purging rooms and event history. + """ + + def __init__(self, hs, stores): + self.stores = stores + + @defer.inlineCallbacks + def purge_room(self, room_id: str): + """Deletes all record of a room + """ + + state_groups_to_delete = yield self.stores.main.purge_room(room_id) + yield self.stores.main.purge_room_state(room_id, state_groups_to_delete) + + @defer.inlineCallbacks + def purge_history(self, room_id, token, delete_local_events): + """Deletes room history before a certain point + + Args: + room_id (str): + + token (str): A topological token to delete events before + + delete_local_events (bool): + if True, we will delete local events as well as remote ones + (instead of just marking them as outliers and deleting their + state groups). + """ + state_groups = yield self.stores.main.purge_history( + room_id, token, delete_local_events + ) + + logger.info("[purge] finding state groups that can be deleted") + + sg_to_delete = yield self._find_unreferenced_groups(state_groups) + + yield self.stores.main.purge_unreferenced_state_groups(room_id, sg_to_delete) + + @defer.inlineCallbacks + def _find_unreferenced_groups(self, state_groups): + """Used when purging history to figure out which state groups can be + deleted. + + Args: + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. + + Returns: + Deferred[set[int]] The set of state groups that can be deleted. + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + current_search = set(itertools.islice(next_to_search, 100)) + next_to_search -= current_search + + referenced = yield self.stores.main.get_referenced_state_groups( + current_search + ) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + edges = yield self.stores.main.get_previous_state_groups(current_search) + + prevs = set(edges.values()) + # We don't bother re-handling groups we've already seen + prevs -= state_groups_seen + next_to_search |= prevs + state_groups_seen |= prevs + + graph.update(edges) + + to_delete = state_groups_seen - referenced_groups + + return to_delete |