Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 4
-rw-r--r-- | synapse/storage/background_updates.py | 51
-rw-r--r-- | synapse/storage/data_stores/__init__.py | 21
-rw-r--r-- | synapse/storage/data_stores/main/event_federation.py | 70
-rw-r--r-- | synapse/storage/data_stores/main/events_worker.py | 44
-rw-r--r-- | synapse/storage/data_stores/main/room.py | 64
-rw-r--r-- | synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql | 18
-rw-r--r-- | synapse/storage/data_stores/main/state.py | 4
8 files changed, 195 insertions, 81 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 88546ad614..3bb9381663 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 import logging
 import random
+from abc import ABCMeta
 
 from six import PY2
 from six.moves import builtins
@@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id
 logger = logging.getLogger(__name__)
 
 
-class SQLBaseStore(object):
+# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
+class SQLBaseStore(metaclass=ABCMeta):
     """Base class for data stores that holds helper functions.
 
     Note that multiple instances of this class will exist as there will be one
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 4f97fd5ab6..bd547f35cf 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 import logging
+from typing import Optional
 
 from canonicaljson import json
 
@@ -97,15 +98,14 @@ class BackgroundUpdater(object):
     def start_doing_background_updates(self):
         run_as_background_process("background_updates", self.run_background_updates)
 
-    @defer.inlineCallbacks
-    def run_background_updates(self, sleep=True):
+    async def run_background_updates(self, sleep=True):
         logger.info("Starting background schema updates")
         while True:
             if sleep:
-                yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+                await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
 
             try:
-                result = yield self.do_next_background_update(
+                result = await self.do_next_background_update(
                     self.BACKGROUND_UPDATE_DURATION_MS
                 )
             except Exception:
@@ -170,20 +170,21 @@ class BackgroundUpdater(object):
 
         return not update_exists
 
-    @defer.inlineCallbacks
-    def do_next_background_update(self, desired_duration_ms):
+    async def do_next_background_update(
+        self, desired_duration_ms: float
+    ) -> Optional[int]:
         """Does some amount of work on the next queued background update
 
+        Returns once some amount of work is done.
+
         Args:
             desired_duration_ms(float): How long we want to spend updating.
         Returns:
-            A deferred that completes once some amount of work is done.
-            The deferred will have a value of None if there is currently
-            no more work to do.
+            None if there is no more work to do, otherwise an int
         """
         if not self._background_update_queue:
-            updates = yield self.db.simple_select_list(
+            updates = await self.db.simple_select_list(
                 "background_updates",
                 keyvalues=None,
                 retcols=("update_name", "depends_on"),
@@ -201,11 +202,12 @@ class BackgroundUpdater(object):
         update_name = self._background_update_queue.pop(0)
         self._background_update_queue.append(update_name)
 
-        res = yield self._do_background_update(update_name, desired_duration_ms)
+        res = await self._do_background_update(update_name, desired_duration_ms)
         return res
 
-    @defer.inlineCallbacks
-    def _do_background_update(self, update_name, desired_duration_ms):
+    async def _do_background_update(
+        self, update_name: str, desired_duration_ms: float
+    ) -> int:
         logger.info("Starting update batch on background update '%s'", update_name)
 
         update_handler = self._background_update_handlers[update_name]
@@ -225,7 +227,7 @@ class BackgroundUpdater(object):
         else:
             batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
 
-        progress_json = yield self.db.simple_select_one_onecol(
+        progress_json = await self.db.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="progress_json",
@@ -234,7 +236,7 @@ class BackgroundUpdater(object):
         progress = json.loads(progress_json)
 
         time_start = self._clock.time_msec()
-        items_updated = yield update_handler(progress, batch_size)
+        items_updated = await update_handler(progress, batch_size)
         time_stop = self._clock.time_msec()
 
         duration_ms = time_stop - time_start
@@ -263,7 +265,9 @@ class BackgroundUpdater(object):
             * A dict of the current progress
             * An integer count of the number of items to update in this batch.
 
-        The handler should return a deferred integer count of items updated.
+        The handler should return a deferred or coroutine which returns an integer count
+        of items updated.
+
         The handler is responsible for updating the progress of the update.
 
         Args:
@@ -432,6 +436,21 @@ class BackgroundUpdater(object):
             "background_updates", keyvalues={"update_name": update_name}
         )
 
+    def _background_update_progress(self, update_name: str, progress: dict):
+        """Update the progress of a background update
+
+        Args:
+            update_name: The name of the background update task
+            progress: The progress of the update.
+        """
+
+        return self.db.runInteraction(
+            "background_update_progress",
+            self._background_update_progress_txn,
+            update_name,
+            progress,
+        )
+
     def _background_update_progress_txn(self, txn, update_name, progress):
         """Update the progress of a background update
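The updated docstring for register_background_update_handler says a handler may now return either a deferred or a coroutine, and that it is responsible for recording its own progress. A minimal sketch of what such a handler might look like under those rules; the update name "my_index_update", the table "my_table" and the MyStore class are hypothetical, not part of this diff, but the shape follows the handler added to room.py further down:

# A sketch only: "my_index_update", "my_table" and MyStore are hypothetical,
# but the checkpoint-then-return-count shape matches the handlers in this diff.
from synapse.storage._base import SQLBaseStore


class MyStore(SQLBaseStore):
    def __init__(self, database, db_conn, hs):
        super().__init__(database, db_conn, hs)
        self.db.updates.register_background_update_handler(
            "my_index_update", self._my_index_update
        )

    async def _my_index_update(self, progress: dict, batch_size: int) -> int:
        # Resume from the checkpoint recorded by the previous batch.
        last_id = progress.get("last_id", 0)

        def _get_batch(txn):
            txn.execute(
                "SELECT id FROM my_table WHERE id > ? ORDER BY id LIMIT ?",
                (last_id, batch_size),
            )
            return [row[0] for row in txn]

        ids = await self.db.runInteraction("my_index_update", _get_batch)

        if not ids:
            # Nothing left to do: mark the update as finished.
            await self.db.updates._end_background_update("my_index_update")
            return 0

        # The handler is responsible for recording its own progress.
        await self.db.updates._background_update_progress(
            "my_index_update", {"last_id": ids[-1]}
        )
        return len(ids)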
diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py
index d20df5f076..092e803799 100644
--- a/synapse/storage/data_stores/__init__.py
+++ b/synapse/storage/data_stores/__init__.py
@@ -37,6 +37,8 @@ class DataStores(object):
         # store.
 
         self.databases = []
+        self.main = None
+        self.state = None
 
         for database_config in hs.config.database.databases:
             db_name = database_config.name
@@ -54,10 +56,22 @@ class DataStores(object):
 
                 if "main" in database_config.data_stores:
                     logger.info("Starting 'main' data store")
+
+                    # Sanity check we don't try and configure the main store on
+                    # multiple databases.
+                    if self.main:
+                        raise Exception("'main' data store already configured")
+
                     self.main = main_store_class(database, db_conn, hs)
 
                 if "state" in database_config.data_stores:
                     logger.info("Starting 'state' data store")
+
+                    # Sanity check we don't try and configure the state store on
+                    # multiple databases.
+                    if self.state:
+                        raise Exception("'state' data store already configured")
+
                     self.state = StateGroupDataStore(database, db_conn, hs)
 
                 db_conn.commit()
@@ -65,3 +79,10 @@ class DataStores(object):
             self.databases.append(database)
 
             logger.info("Database %r prepared", db_name)
+
+        # Sanity check that we have actually configured all the required stores.
+        if not self.main:
+            raise Exception("No 'main' data store configured")
+
+        if not self.state:
+            raise Exception("No 'state' data store configured")
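The new checks in DataStores enforce two invariants: each store may be claimed by at most one database, and every required store must have been claimed once all databases are processed. A self-contained sketch of the same pattern, with illustrative names (DataStoreRegistry is not real Synapse code):

# Illustrative sketch of the "configure exactly once" pattern used above.
class DataStoreRegistry:
    REQUIRED = ("main", "state")

    def __init__(self, database_configs):
        self.stores = {name: None for name in self.REQUIRED}
        for config in database_configs:
            for name in config["data_stores"]:
                # A store claimed by two databases is a configuration error.
                if self.stores.get(name) is not None:
                    raise Exception("'%s' data store already configured" % name)
                self.stores[name] = config["name"]
        # A required store claimed by no database is also an error.
        for name in self.REQUIRED:
            if self.stores[name] is None:
                raise Exception("No '%s' data store configured" % name)


# e.g. this raises, because "main" is claimed twice:
# DataStoreRegistry([
#     {"name": "db1", "data_stores": ["main", "state"]},
#     {"name": "db2", "data_stores": ["main"]},
# ])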
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 1d18f13801..60c67457b4 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,13 +14,10 @@
 # limitations under the License.
 import itertools
 import logging
-import random
 
 from six.moves import range
 from six.moves.queue import Empty, PriorityQueue
 
-from unpaddedbase64 import encode_base64
-
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
@@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             retcol="event_id",
         )
 
-    @defer.inlineCallbacks
-    def get_prev_events_for_room(self, room_id):
+    def get_prev_events_for_room(self, room_id: str):
         """
         Gets a subset of the current forward extremities in the given room.
 
@@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             room_id (str): room_id
 
         Returns:
-            Deferred[list[(str, dict[str, str], int)]]
-                for each event, a tuple of (event_id, hashes, depth)
-                where *hashes* is a map from algorithm to hash.
+            Deferred[List[str]]: the event ids of the forward extremities
+
         """
-        res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
-        if len(res) > 10:
-            # Sort by reverse depth, so we point to the most recent.
-            res.sort(key=lambda a: -a[2])
 
-            # we use half of the limit for the actual most recent events, and
-            # the other half to randomly point to some of the older events, to
-            # make sure that we don't completely ignore the older events.
-            res = res[0:5] + random.sample(res[5:], 5)
+        return self.db.runInteraction(
+            "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
+        )
 
-        return res
+    def _get_prev_events_for_room_txn(self, txn, room_id: str):
+        # we just use the 10 newest events. Older events will become
+        # prev_events of future events.
 
-    def get_latest_event_ids_and_hashes_in_room(self, room_id):
+        sql = """
+            SELECT e.event_id FROM event_forward_extremities AS f
+            INNER JOIN events AS e USING (event_id)
+            WHERE f.room_id = ?
+            ORDER BY e.depth DESC
+            LIMIT 10
         """
-        Gets the current forward extremities in the given room
 
-        Args:
-            room_id (str): room_id
+        txn.execute(sql, (room_id,))
 
-        Returns:
-            Deferred[list[(str, dict[str, str], int)]]
-                for each event, a tuple of (event_id, hashes, depth)
-                where *hashes* is a map from algorithm to hash.
-        """
-
-        return self.db.runInteraction(
-            "get_latest_event_ids_and_hashes_in_room",
-            self._get_latest_event_ids_and_hashes_in_room,
-            room_id,
-        )
+        return [row[0] for row in txn]
 
     def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
         """Get the top rooms with at least N extremities.
@@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             desc="get_latest_event_ids_in_room",
         )
 
-    def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
-        sql = (
-            "SELECT e.event_id, e.depth FROM events as e "
-            "INNER JOIN event_forward_extremities as f "
-            "ON e.event_id = f.event_id "
-            "AND e.room_id = f.room_id "
-            "WHERE f.room_id = ?"
-        )
-
-        txn.execute(sql, (room_id,))
-
-        results = []
-        for event_id, depth in txn.fetchall():
-            hashes = self._get_event_reference_hashes_txn(txn, event_id)
-            prev_hashes = {
-                k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
-            }
-            results.append((event_id, prev_hashes, depth))
-
-        return results
-
     def get_min_depth(self, room_id):
         """ For the given room, get the minimum depth we have seen for it.
         """
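The rewritten get_prev_events_for_room drops the hash-fetching and random sampling in favour of a single query for the ten deepest forward extremities. A standalone demonstration of that query against a toy sqlite schema (the real tables have more columns; this sketch only models the ones the query touches):

# Toy demonstration of the query introduced above, using sqlite3.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT PRIMARY KEY, room_id TEXT, depth INTEGER);
    CREATE TABLE event_forward_extremities (event_id TEXT, room_id TEXT);
    """
)
for i in range(15):
    conn.execute("INSERT INTO events VALUES (?, 'room1', ?)", ("$ev%d" % i, i))
    conn.execute(
        "INSERT INTO event_forward_extremities VALUES (?, 'room1')", ("$ev%d" % i,)
    )

rows = conn.execute(
    """
    SELECT e.event_id FROM event_forward_extremities AS f
    INNER JOIN events AS e USING (event_id)
    WHERE f.room_id = ?
    ORDER BY e.depth DESC
    LIMIT 10
    """,
    ("room1",),
).fetchall()
print([r[0] for r in rows])  # the ten deepest extremities: $ev14 .. $ev5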
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 2c9142814c..0cce5232f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_event(
         self,
-        event_id: List[str],
+        event_id: str,
         redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
         get_prev_content: bool = False,
         allow_rejected: bool = False,
@@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
             event_id: The event_id of the event to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible
                 values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (behave as per allow_none
+                    if the event is redacted)
+
             get_prev_content: If True and event is a state event, include the previous
                 states content in the unsigned field.
-            allow_rejected: If True return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                behave as per allow_none.
+
             allow_none: If True, return None if no event found, if
                 False throw a NotFoundError
+
             check_room_id: if not None, check the room of the found event.
                 If there is a mismatch, behave as per allow_none.
 
@@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
 
         Args:
             event_ids: The event_ids of the events to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible
                 values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (omit them from the response)
+
             get_prev_content: If True and event is a state event, include the previous
                 states content in the unsigned field.
-            allow_rejected: If True return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                omits rejected events from the response.
 
         Returns:
             Deferred : Dict from event_id to event.
@@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
         """Get events from the database and return in a list in the same order
         as given by `event_ids` arg.
 
+        Unknown events will be omitted from the response.
+
         Args:
             event_ids: The event_ids of the events to fetch
+
             redact_behaviour: Determine what to do with a redacted event. Possible
                 values:
                 * AS_IS - Return the full event body with no redacted content
                 * REDACT - Return the event but with a redacted body
-                * DISALLOW - Do not return redacted events
+                * DISALLOW - Do not return redacted events (omit them from the response)
+
             get_prev_content: If True and event is a state event, include the previous
                 states content in the unsigned field.
-            allow_rejected: If True, return rejected events.
+
+            allow_rejected: If True, return rejected events. Otherwise,
+                omits rejected events from the response.
 
         Returns:
             Deferred[list[EventBase]]: List of events fetched from the database. The
@@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
 
         If events are pulled from the database, they will be cached for future lookups.
 
+        Unknown events are omitted from the response.
+
         Args:
+
             event_ids (Iterable[str]): The event_ids of the events to fetch
-            allow_rejected (bool): Whether to include rejected events
+
+            allow_rejected (bool): Whether to include rejected events. If False,
+                rejected events are omitted from the response.
 
         Returns:
             Deferred[Dict[str, _EventCacheEntry]]:
@@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
 
         Returned events will be added to the cache for future lookups.
 
+        Unknown events are omitted from the response.
+
         Args:
             event_ids (Iterable[str]): The event_ids of the events to fetch
-            allow_rejected (bool): Whether to include rejected events
+
+            allow_rejected (bool): Whether to include rejected events. If False,
+                rejected events are omitted from the response.
 
         Returns:
             Deferred[Dict[str, _EventCacheEntry]]:
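The docstring changes above pin down what each EventRedactBehaviour value means for a redacted event. A toy, self-contained model of those semantics; the real enum lives in Synapse and is applied while reading from the database, so this sketch only illustrates the three outcomes:

# Toy model of the three redaction behaviours documented above.
from enum import Enum, auto
from typing import Optional


class EventRedactBehaviour(Enum):
    AS_IS = auto()
    REDACT = auto()
    DISALLOW = auto()


def apply_redact_behaviour(
    event: dict, redacted: bool, behaviour: EventRedactBehaviour
) -> Optional[dict]:
    if not redacted or behaviour is EventRedactBehaviour.AS_IS:
        return event  # full event body, no redaction applied
    if behaviour is EventRedactBehaviour.REDACT:
        return {**event, "content": {}}  # toy stand-in for a pruned body
    # DISALLOW: drop the event entirely; the caller then behaves as per
    # allow_none, or omits it from the response.
    return None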
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index aa476d0fbf..79cfd39194 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -17,6 +17,7 @@
 import collections
 import logging
 import re
+from abc import abstractmethod
 from typing import Optional, Tuple
 
 from six import integer_types
@@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore):
 
 
 class RoomBackgroundUpdateStore(SQLBaseStore):
+    REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
+
     def __init__(self, database: Database, db_conn, hs):
         super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
 
@@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             "insert_room_retention", self._background_insert_retention,
         )
 
+        self.db.updates.register_background_update_handler(
+            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+            self._remove_tombstoned_rooms_from_directory,
+        )
+
     @defer.inlineCallbacks
     def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
@@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         defer.returnValue(batch_size)
 
+    async def _remove_tombstoned_rooms_from_directory(
+        self, progress, batch_size
+    ) -> int:
+        """Removes any rooms with tombstone events from the room directory
+
+        Nowadays this is handled by the room upgrade handler, but we may have some
+        that got left behind
+        """
+
+        last_room = progress.get("room_id", "")
+
+        def _get_rooms(txn):
+            txn.execute(
+                """
+                SELECT room_id
+                FROM rooms r
+                INNER JOIN current_state_events cse USING (room_id)
+                WHERE room_id > ? AND r.is_public
+                AND cse.type = '%s' AND cse.state_key = ''
+                ORDER BY room_id ASC
+                LIMIT ?;
+                """
+                % EventTypes.Tombstone,
+                (last_room, batch_size),
+            )
+
+            return [row[0] for row in txn]
+
+        rooms = await self.db.runInteraction(
+            "get_tombstoned_directory_rooms", _get_rooms
+        )
+
+        if not rooms:
+            await self.db.updates._end_background_update(
+                self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+            )
+            return 0
+
+        for room_id in rooms:
+            logger.info("Removing tombstoned room %s from the directory", room_id)
+            await self.set_room_is_public(room_id, False)
+
+        await self.db.updates._background_update_progress(
+            self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+        )
+
+        return len(rooms)
+
+    @abstractmethod
+    def set_room_is_public(self, room_id, is_public):
+        # this will need to be implemented if a background update is performed with
+        # existing (tombstoned, public) rooms in the database.
+        #
+        # It's overridden by RoomStore for the synapse master.
+        raise NotImplementedError()
+
 
 class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
     def __init__(self, database: Database, db_conn, hs):
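set_room_is_public is declared with @abstractmethod here, which is why _base.py switches SQLBaseStore to the ABCMeta metaclass: without that metaclass the decorator has no effect and the class would be silently instantiable. A minimal sketch of the mechanism, with illustrative class names:

# Illustrative sketch: with ABCMeta, a class declaring an abstract method
# cannot be instantiated until a subclass provides an implementation.
from abc import ABCMeta, abstractmethod


class BaseStore(metaclass=ABCMeta):
    pass


class BackgroundUpdateStore(BaseStore):
    @abstractmethod
    def set_room_is_public(self, room_id, is_public):
        raise NotImplementedError()


class FullStore(BackgroundUpdateStore):
    def set_room_is_public(self, room_id, is_public):
        print("updating %s -> %s" % (room_id, is_public))


# BackgroundUpdateStore()  # would raise TypeError: abstract method not implemented
FullStore().set_room_is_public("!room:example.com", False)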
diff --git a/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
new file mode 100644
index 0000000000..aeb17813d3
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Now that #6232 is a thing, we can remove old rooms from the directory.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 0dc39f139c..d07440e3ed 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections.abc
 import logging
 from collections import namedtuple
 from typing import Iterable, Tuple
@@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         predecessor = create_event.content.get("predecessor", None)
 
         # Ensure the key is a dictionary
-        if not isinstance(predecessor, dict):
+        if not isinstance(predecessor, collections.abc.Mapping):
             return None
 
         return predecessor
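The switch from dict to collections.abc.Mapping in state.py matters because event content may be a read-only, dict-like mapping rather than a plain dict, and such objects fail an isinstance(..., dict) check. A small self-contained demonstration; MappingProxyType stands in here for whatever frozen mapping type the server might actually use:

# Demonstrates why a Mapping check is more permissive than a dict check.
import collections.abc
from types import MappingProxyType

predecessor = MappingProxyType({"room_id": "!old:example.com", "event_id": "$abc"})

print(isinstance(predecessor, dict))                     # False
print(isinstance(predecessor, collections.abc.Mapping))  # True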