diff options
Diffstat (limited to 'synapse/storage/data_stores')
6 files changed, 157 insertions, 64 deletions
diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py index d20df5f076..092e803799 100644 --- a/synapse/storage/data_stores/__init__.py +++ b/synapse/storage/data_stores/__init__.py @@ -37,6 +37,8 @@ class DataStores(object): # store. self.databases = [] + self.main = None + self.state = None for database_config in hs.config.database.databases: db_name = database_config.name @@ -54,10 +56,22 @@ class DataStores(object): if "main" in database_config.data_stores: logger.info("Starting 'main' data store") + + # Sanity check we don't try and configure the main store on + # multiple databases. + if self.main: + raise Exception("'main' data store already configured") + self.main = main_store_class(database, db_conn, hs) if "state" in database_config.data_stores: logger.info("Starting 'state' data store") + + # Sanity check we don't try and configure the state store on + # multiple databases. + if self.state: + raise Exception("'state' data store already configured") + self.state = StateGroupDataStore(database, db_conn, hs) db_conn.commit() @@ -65,3 +79,10 @@ class DataStores(object): self.databases.append(database) logger.info("Database %r prepared", db_name) + + # Sanity check that we have actually configured all the required stores. + if not self.main: + raise Exception("No 'main' data store configured") + + if not self.state: + raise Exception("No 'main' data store configured") diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 1d18f13801..60c67457b4 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -14,13 +14,10 @@ # limitations under the License. import itertools import logging -import random from six.moves import range from six.moves.queue import Empty, PriorityQueue -from unpaddedbase64 import encode_base64 - from twisted.internet import defer from synapse.api.errors import StoreError @@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas retcol="event_id", ) - @defer.inlineCallbacks - def get_prev_events_for_room(self, room_id): + def get_prev_events_for_room(self, room_id: str): """ Gets a subset of the current forward extremities in the given room. @@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas room_id (str): room_id Returns: - Deferred[list[(str, dict[str, str], int)]] - for each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. + Deferred[List[str]]: the event ids of the forward extremites + """ - res = yield self.get_latest_event_ids_and_hashes_in_room(room_id) - if len(res) > 10: - # Sort by reverse depth, so we point to the most recent. - res.sort(key=lambda a: -a[2]) - # we use half of the limit for the actual most recent events, and - # the other half to randomly point to some of the older events, to - # make sure that we don't completely ignore the older events. - res = res[0:5] + random.sample(res[5:], 5) + return self.db.runInteraction( + "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id + ) - return res + def _get_prev_events_for_room_txn(self, txn, room_id: str): + # we just use the 10 newest events. Older events will become + # prev_events of future events. - def get_latest_event_ids_and_hashes_in_room(self, room_id): + sql = """ + SELECT e.event_id FROM event_forward_extremities AS f + INNER JOIN events AS e USING (event_id) + WHERE f.room_id = ? + ORDER BY e.depth DESC + LIMIT 10 """ - Gets the current forward extremities in the given room - Args: - room_id (str): room_id + txn.execute(sql, (room_id,)) - Returns: - Deferred[list[(str, dict[str, str], int)]] - for each event, a tuple of (event_id, hashes, depth) - where *hashes* is a map from algorithm to hash. - """ - - return self.db.runInteraction( - "get_latest_event_ids_and_hashes_in_room", - self._get_latest_event_ids_and_hashes_in_room, - room_id, - ) + return [row[0] for row in txn] def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter): """Get the top rooms with at least N extremities. @@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas desc="get_latest_event_ids_in_room", ) - def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id): - sql = ( - "SELECT e.event_id, e.depth FROM events as e " - "INNER JOIN event_forward_extremities as f " - "ON e.event_id = f.event_id " - "AND e.room_id = f.room_id " - "WHERE f.room_id = ?" - ) - - txn.execute(sql, (room_id,)) - - results = [] - for event_id, depth in txn.fetchall(): - hashes = self._get_event_reference_hashes_txn(txn, event_id) - prev_hashes = { - k: encode_base64(v) for k, v in hashes.items() if k == "sha256" - } - results.append((event_id, prev_hashes, depth)) - - return results - def get_min_depth(self, room_id): """ For hte given room, get the minimum depth we have seen for it. """ diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index 2c9142814c..0cce5232f5 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore): @defer.inlineCallbacks def get_event( self, - event_id: List[str], + event_id: str, redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT, get_prev_content: bool = False, allow_rejected: bool = False, @@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore): Args: event_id: The event_id of the event to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (behave as per allow_none + if the event is redacted) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + behave as per allow_none. + allow_none: If True, return None if no event found, if False throw a NotFoundError + check_room_id: if not None, check the room of the found event. If there is a mismatch, behave as per allow_none. @@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore): Args: event_ids: The event_ids of the events to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (omit them from the response) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + omits rejeted events from the response. Returns: Deferred : Dict from event_id to event. @@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore): """Get events from the database and return in a list in the same order as given by `event_ids` arg. + Unknown events will be omitted from the response. + Args: event_ids: The event_ids of the events to fetch + redact_behaviour: Determine what to do with a redacted event. Possible values: * AS_IS - Return the full event body with no redacted content * REDACT - Return the event but with a redacted body - * DISALLOW - Do not return redacted events + * DISALLOW - Do not return redacted events (omit them from the response) + get_prev_content: If True and event is a state event, include the previous states content in the unsigned field. - allow_rejected: If True, return rejected events. + + allow_rejected: If True, return rejected events. Otherwise, + omits rejected events from the response. Returns: Deferred[list[EventBase]]: List of events fetched from the database. The @@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore): If events are pulled from the database, they will be cached for future lookups. + Unknown events are omitted from the response. + Args: + event_ids (Iterable[str]): The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events + + allow_rejected (bool): Whether to include rejected events. If False, + rejected events are omitted from the response. Returns: Deferred[Dict[str, _EventCacheEntry]]: @@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore): Returned events will be added to the cache for future lookups. + Unknown events are omitted from the response. + Args: event_ids (Iterable[str]): The event_ids of the events to fetch - allow_rejected (bool): Whether to include rejected events + + allow_rejected (bool): Whether to include rejected events. If False, + rejected events are omitted from the response. Returns: Deferred[Dict[str, _EventCacheEntry]]: diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index aa476d0fbf..79cfd39194 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -17,6 +17,7 @@ import collections import logging import re +from abc import abstractmethod from typing import Optional, Tuple from six import integer_types @@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore): class RoomBackgroundUpdateStore(SQLBaseStore): + REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" + def __init__(self, database: Database, db_conn, hs): super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs) @@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore): "insert_room_retention", self._background_insert_retention, ) + self.db.updates.register_background_update_handler( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, + self._remove_tombstoned_rooms_from_directory, + ) + @defer.inlineCallbacks def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of @@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore): defer.returnValue(batch_size) + async def _remove_tombstoned_rooms_from_directory( + self, progress, batch_size + ) -> int: + """Removes any rooms with tombstone events from the room directory + + Nowadays this is handled by the room upgrade handler, but we may have some + that got left behind + """ + + last_room = progress.get("room_id", "") + + def _get_rooms(txn): + txn.execute( + """ + SELECT room_id + FROM rooms r + INNER JOIN current_state_events cse USING (room_id) + WHERE room_id > ? AND r.is_public + AND cse.type = '%s' AND cse.state_key = '' + ORDER BY room_id ASC + LIMIT ?; + """ + % EventTypes.Tombstone, + (last_room, batch_size), + ) + + return [row[0] for row in txn] + + rooms = await self.db.runInteraction( + "get_tombstoned_directory_rooms", _get_rooms + ) + + if not rooms: + await self.db.updates._end_background_update( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE + ) + return 0 + + for room_id in rooms: + logger.info("Removing tombstoned room %s from the directory", room_id) + await self.set_room_is_public(room_id, False) + + await self.db.updates._background_update_progress( + self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]} + ) + + return len(rooms) + + @abstractmethod + def set_room_is_public(self, room_id, is_public): + # this will need to be implemented if a background update is performed with + # existing (tombstoned, public) rooms in the database. + # + # It's overridden by RoomStore for the synapse master. + raise NotImplementedError() + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: Database, db_conn, hs): diff --git a/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql new file mode 100644 index 0000000000..aeb17813d3 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Now that #6232 is a thing, we can remove old rooms from the directory. +INSERT INTO background_updates (update_name, progress_json) VALUES + ('remove_tombstoned_rooms_from_directory', '{}'); diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py index 0dc39f139c..d07440e3ed 100644 --- a/synapse/storage/data_stores/main/state.py +++ b/synapse/storage/data_stores/main/state.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import collections.abc import logging from collections import namedtuple from typing import Iterable, Tuple @@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): predecessor = create_event.content.get("predecessor", None) # Ensure the key is a dictionary - if not isinstance(predecessor, dict): + if not isinstance(predecessor, collections.abc.Mapping): return None return predecessor |