diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 88546ad614..3bb9381663 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import random
+from abc import ABCMeta
from six import PY2
from six.moves import builtins
@@ -30,7 +31,8 @@ from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
-class SQLBaseStore(object):
+# some of our subclasses have abstract methods, so we use the ABCMeta metaclass.
+class SQLBaseStore(metaclass=ABCMeta):
"""Base class for data stores that holds helper functions.
Note that multiple instances of this class will exist as there will be one
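
The `metaclass=ABCMeta` change matters because abstract-method enforcement only happens via the metaclass: intermediate store classes can mark methods with `@abstractmethod` and become non-instantiable, while the base class and the fully concrete leaves stay instantiable. A minimal sketch of the behaviour this buys (illustrative only — real Synapse store constructors take `(database, db_conn, hs)`; class and method names here are the ones used later in this diff):

```python
from abc import ABCMeta, abstractmethod

class SQLBaseStore(metaclass=ABCMeta):
    """No abstract methods of its own, so it can still be instantiated."""

class RoomBackgroundUpdateStore(SQLBaseStore):
    @abstractmethod
    def set_room_is_public(self, room_id, is_public):
        raise NotImplementedError()

class RoomStore(RoomBackgroundUpdateStore):
    def set_room_is_public(self, room_id, is_public):
        return None  # concrete override

SQLBaseStore()   # fine: no abstract methods
RoomStore()      # fine: set_room_is_public is implemented
# RoomBackgroundUpdateStore() raises TypeError: can't instantiate abstract
# class with abstract method set_room_is_public
```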
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 4f97fd5ab6..bd547f35cf 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Optional
from canonicaljson import json
@@ -97,15 +98,14 @@ class BackgroundUpdater(object):
def start_doing_background_updates(self):
run_as_background_process("background_updates", self.run_background_updates)
- @defer.inlineCallbacks
- def run_background_updates(self, sleep=True):
+ async def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
if sleep:
- yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+ await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
- result = yield self.do_next_background_update(
+ result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except Exception:
@@ -170,20 +170,21 @@ class BackgroundUpdater(object):
return not update_exists
- @defer.inlineCallbacks
- def do_next_background_update(self, desired_duration_ms):
+ async def do_next_background_update(
+ self, desired_duration_ms: float
+ ) -> Optional[int]:
"""Does some amount of work on the next queued background update
+ Returns once some amount of work is done.
+
Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
- A deferred that completes once some amount of work is done.
- The deferred will have a value of None if there is currently
- no more work to do.
+ None if there is no more work to do, otherwise an int
"""
if not self._background_update_queue:
- updates = yield self.db.simple_select_list(
+ updates = await self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
@@ -201,11 +202,12 @@ class BackgroundUpdater(object):
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
- res = yield self._do_background_update(update_name, desired_duration_ms)
+ res = await self._do_background_update(update_name, desired_duration_ms)
return res
- @defer.inlineCallbacks
- def _do_background_update(self, update_name, desired_duration_ms):
+ async def _do_background_update(
+ self, update_name: str, desired_duration_ms: float
+ ) -> int:
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -225,7 +227,7 @@ class BackgroundUpdater(object):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
- progress_json = yield self.db.simple_select_one_onecol(
+ progress_json = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
@@ -234,7 +236,7 @@ class BackgroundUpdater(object):
progress = json.loads(progress_json)
time_start = self._clock.time_msec()
- items_updated = yield update_handler(progress, batch_size)
+ items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start
@@ -263,7 +265,9 @@ class BackgroundUpdater(object):
* A dict of the current progress
* An integer count of the number of items to update in this batch.
- The handler should return a deferred integer count of items updated.
+ The handler should return a deferred or coroutine which returns an integer count
+ of items updated.
+
The handler is responsible for updating the progress of the update.
Args:
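
Putting the contract above together with the registration and progress APIs that appear elsewhere in this diff (`register_background_update_handler`, `_background_update_progress`, `_end_background_update`), a handler looks roughly like the following hypothetical sketch; the update name and the `_fetch_next_rows` helper are invented for illustration:

```python
class MyStore(SQLBaseStore):
    def __init__(self, database, db_conn, hs):
        super().__init__(database, db_conn, hs)
        self.db.updates.register_background_update_handler(
            "my_backfill_update", self._my_backfill_update
        )

    async def _my_backfill_update(self, progress: dict, batch_size: int) -> int:
        """Hypothetical handler: processes up to batch_size items per call."""
        last_id = progress.get("last_id", 0)
        rows = await self._fetch_next_rows(last_id, batch_size)  # assumed helper

        if not rows:
            # Finished: remove our row from background_updates and report
            # zero items so the updater moves on to the next update.
            await self.db.updates._end_background_update("my_backfill_update")
            return 0

        # Persist how far we got so the next batch resumes from here.
        await self.db.updates._background_update_progress(
            "my_backfill_update", {"last_id": rows[-1]}
        )
        return len(rows)
```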
@@ -432,6 +436,21 @@ class BackgroundUpdater(object):
"background_updates", keyvalues={"update_name": update_name}
)
+ def _background_update_progress(self, update_name: str, progress: dict):
+ """Update the progress of a background update
+
+ Args:
+ update_name: The name of the background update task
+ progress: The progress of the update.
+ """
+
+ return self.db.runInteraction(
+ "background_update_progress",
+ self._background_update_progress_txn,
+ update_name,
+ progress,
+ )
+
def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update
diff --git a/synapse/storage/data_stores/__init__.py b/synapse/storage/data_stores/__init__.py
index d20df5f076..092e803799 100644
--- a/synapse/storage/data_stores/__init__.py
+++ b/synapse/storage/data_stores/__init__.py
@@ -37,6 +37,8 @@ class DataStores(object):
# store.
self.databases = []
+ self.main = None
+ self.state = None
for database_config in hs.config.database.databases:
db_name = database_config.name
@@ -54,10 +56,22 @@ class DataStores(object):
if "main" in database_config.data_stores:
logger.info("Starting 'main' data store")
+
+ # Sanity check we don't try and configure the main store on
+ # multiple databases.
+ if self.main:
+ raise Exception("'main' data store already configured")
+
self.main = main_store_class(database, db_conn, hs)
if "state" in database_config.data_stores:
logger.info("Starting 'state' data store")
+
+ # Sanity check we don't try and configure the state store on
+ # multiple databases.
+ if self.state:
+ raise Exception("'state' data store already configured")
+
self.state = StateGroupDataStore(database, db_conn, hs)
db_conn.commit()
@@ -65,3 +79,10 @@ class DataStores(object):
self.databases.append(database)
logger.info("Database %r prepared", db_name)
+
+ # Sanity check that we have actually configured all the required stores.
+ if not self.main:
+ raise Exception("No 'main' data store configured")
+
+ if not self.state:
+ raise Exception("No 'main' data store configured")
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 1f517e8fad..60c67457b4 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,13 +14,10 @@
# limitations under the License.
import itertools
import logging
-import random
from six.moves import range
from six.moves.queue import Empty, PriorityQueue
-from unpaddedbase64 import encode_base64
-
from twisted.internet import defer
from synapse.api.errors import StoreError
@@ -148,8 +145,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
retcol="event_id",
)
- @defer.inlineCallbacks
- def get_prev_events_for_room(self, room_id):
+ def get_prev_events_for_room(self, room_id: str):
"""
Gets a subset of the current forward extremities in the given room.
@@ -160,40 +156,29 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
room_id (str): room_id
Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
+ Deferred[List[str]]: the event ids of the forward extremities
+
"""
- res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
- if len(res) > 10:
- # Sort by reverse depth, so we point to the most recent.
- res.sort(key=lambda a: -a[2])
- # we use half of the limit for the actual most recent events, and
- # the other half to randomly point to some of the older events, to
- # make sure that we don't completely ignore the older events.
- res = res[0:5] + random.sample(res[5:], 5)
+ return self.db.runInteraction(
+ "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
+ )
- return res
+ def _get_prev_events_for_room_txn(self, txn, room_id: str):
+ # we just use the 10 newest events. Older events will become
+ # prev_events of future events.
- def get_latest_event_ids_and_hashes_in_room(self, room_id):
+ sql = """
+ SELECT e.event_id FROM event_forward_extremities AS f
+ INNER JOIN events AS e USING (event_id)
+ WHERE f.room_id = ?
+ ORDER BY e.depth DESC
+ LIMIT 10
"""
- Gets the current forward extremities in the given room
- Args:
- room_id (str): room_id
+ txn.execute(sql, (room_id,))
- Returns:
- Deferred[list[(str, dict[str, str], int)]]
- for each event, a tuple of (event_id, hashes, depth)
- where *hashes* is a map from algorithm to hash.
- """
-
- return self.db.runInteraction(
- "get_latest_event_ids_and_hashes_in_room",
- self._get_latest_event_ids_and_hashes_in_room,
- room_id,
- )
+ return [row[0] for row in txn]
def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
"""Get the top rooms with at least N extremities.
@@ -243,27 +228,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
desc="get_latest_event_ids_in_room",
)
- def _get_latest_event_ids_and_hashes_in_room(self, txn, room_id):
- sql = (
- "SELECT e.event_id, e.depth FROM events as e "
- "INNER JOIN event_forward_extremities as f "
- "ON e.event_id = f.event_id "
- "AND e.room_id = f.room_id "
- "WHERE f.room_id = ?"
- )
-
- txn.execute(sql, (room_id,))
-
- results = []
- for event_id, depth in txn.fetchall():
- hashes = self._get_event_reference_hashes_txn(txn, event_id)
- prev_hashes = {
- k: encode_base64(v) for k, v in hashes.items() if k == "sha256"
- }
- results.append((event_id, prev_hashes, depth))
-
- return results
-
def get_min_depth(self, room_id):
""" For hte given room, get the minimum depth we have seen for it.
"""
@@ -506,7 +470,7 @@ class EventFederationStore(EventFederationWorkerStore):
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
min_depth = self._get_min_depth_interaction(txn, room_id)
- if min_depth and depth >= min_depth:
+ if min_depth is not None and depth >= min_depth:
return
self.db.simple_upsert_txn(
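
The `is not None` change above fixes a classic falsy-zero bug: a room whose stored minimum depth is 0 made the old truthiness check fail, so the early return never fired and the row was needlessly re-upserted. A self-contained illustration:

```python
def should_skip_update(min_depth, depth):
    # Old check: treats min_depth == 0 the same as "no row found",
    # because 0 is falsy.
    return bool(min_depth) and depth >= min_depth

def should_skip_update_fixed(min_depth, depth):
    return min_depth is not None and depth >= min_depth

print(should_skip_update(0, 5))        # False: wrongly falls through to the upsert
print(should_skip_update_fixed(0, 5))  # True: depth 5 >= stored minimum of 0
```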
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 2c9142814c..0cce5232f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -137,7 +137,7 @@ class EventsWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_event(
self,
- event_id: List[str],
+ event_id: str,
redact_behaviour: EventRedactBehaviour = EventRedactBehaviour.REDACT,
get_prev_content: bool = False,
allow_rejected: bool = False,
@@ -148,15 +148,22 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_id: The event_id of the event to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (behave as per allow_none
+ if the event is redacted)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ behave as per allow_none.
+
allow_none: If True, return None if no event found, if
False throw a NotFoundError
+
check_room_id: if not None, check the room of the found event.
If there is a mismatch, behave as per allow_none.
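
A hypothetical call-site sketch of the semantics spelled out above (the event ID is invented; `store` is assumed to be an `EventsWorkerStore`): with `DISALLOW` plus `allow_none=True`, a missing, redacted, or rejected event comes back as `None` rather than raising `NotFoundError`:

```python
from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour

async def lookup(store):
    event = await store.get_event(
        "$someevent:example.com",  # hypothetical event ID
        redact_behaviour=EventRedactBehaviour.DISALLOW,
        allow_rejected=False,
        allow_none=True,
    )
    if event is None:
        # Missing, redacted (DISALLOW) and rejected events all land here,
        # "as per allow_none", instead of raising NotFoundError.
        return None
    return event
```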
@@ -196,14 +203,18 @@ class EventsWorkerStore(SQLBaseStore):
Args:
event_ids: The event_ids of the events to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible
values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (omit them from the response)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ rejected events are omitted from the response.
Returns:
Deferred : Dict from event_id to event.
@@ -228,15 +239,21 @@ class EventsWorkerStore(SQLBaseStore):
"""Get events from the database and return in a list in the same order
as given by `event_ids` arg.
+ Unknown events will be omitted from the response.
+
Args:
event_ids: The event_ids of the events to fetch
+
redact_behaviour: Determine what to do with a redacted event. Possible values:
* AS_IS - Return the full event body with no redacted content
* REDACT - Return the event but with a redacted body
- * DISALLOW - Do not return redacted events
+ * DISALLOW - Do not return redacted events (omit them from the response)
+
get_prev_content: If True and event is a state event,
include the previous states content in the unsigned field.
- allow_rejected: If True, return rejected events.
+
+ allow_rejected: If True, return rejected events. Otherwise,
+ rejected events are omitted from the response.
Returns:
Deferred[list[EventBase]]: List of events fetched from the database. The
@@ -369,9 +386,14 @@ class EventsWorkerStore(SQLBaseStore):
If events are pulled from the database, they will be cached for future lookups.
+ Unknown events are omitted from the response.
+
Args:
+
event_ids (Iterable[str]): The event_ids of the events to fetch
- allow_rejected (bool): Whether to include rejected events
+
+ allow_rejected (bool): Whether to include rejected events. If False,
+ rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
@@ -506,9 +528,13 @@ class EventsWorkerStore(SQLBaseStore):
Returned events will be added to the cache for future lookups.
+ Unknown events are omitted from the response.
+
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
- allow_rejected (bool): Whether to include rejected events
+
+ allow_rejected (bool): Whether to include rejected events. If False,
+ rejected events are omitted from the response.
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index aa476d0fbf..79cfd39194 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -17,6 +17,7 @@
import collections
import logging
import re
+from abc import abstractmethod
from typing import Optional, Tuple
from six import integer_types
@@ -367,6 +368,8 @@ class RoomWorkerStore(SQLBaseStore):
class RoomBackgroundUpdateStore(SQLBaseStore):
+ REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
+
def __init__(self, database: Database, db_conn, hs):
super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
@@ -376,6 +379,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
"insert_room_retention", self._background_insert_retention,
)
+ self.db.updates.register_background_update_handler(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE,
+ self._remove_tombstoned_rooms_from_directory,
+ )
+
@defer.inlineCallbacks
def _background_insert_retention(self, progress, batch_size):
"""Retrieves a list of all rooms within a range and inserts an entry for each of
@@ -444,6 +452,62 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
defer.returnValue(batch_size)
+ async def _remove_tombstoned_rooms_from_directory(
+ self, progress, batch_size
+ ) -> int:
+ """Removes any rooms with tombstone events from the room directory
+
+ Nowadays this is handled by the room upgrade handler, but we may have some
+ that got left behind
+ """
+
+ last_room = progress.get("room_id", "")
+
+ def _get_rooms(txn):
+ txn.execute(
+ """
+ SELECT room_id
+ FROM rooms r
+ INNER JOIN current_state_events cse USING (room_id)
+ WHERE room_id > ? AND r.is_public
+ AND cse.type = '%s' AND cse.state_key = ''
+ ORDER BY room_id ASC
+ LIMIT ?;
+ """
+ % EventTypes.Tombstone,
+ (last_room, batch_size),
+ )
+
+ return [row[0] for row in txn]
+
+ rooms = await self.db.runInteraction(
+ "get_tombstoned_directory_rooms", _get_rooms
+ )
+
+ if not rooms:
+ await self.db.updates._end_background_update(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE
+ )
+ return 0
+
+ for room_id in rooms:
+ logger.info("Removing tombstoned room %s from the directory", room_id)
+ await self.set_room_is_public(room_id, False)
+
+ await self.db.updates._background_update_progress(
+ self.REMOVE_TOMESTONED_ROOMS_BG_UPDATE, {"room_id": rooms[-1]}
+ )
+
+ return len(rooms)
+
+ @abstractmethod
+ def set_room_is_public(self, room_id, is_public):
+ # this will need to be implemented if a background update is performed with
+ # existing (tombstoned, public) rooms in the database.
+ #
+ # It's overridden by RoomStore for the synapse master.
+ raise NotImplementedError()
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def __init__(self, database: Database, db_conn, hs):
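
The progress handling in `_remove_tombstoned_rooms_from_directory` above is keyset pagination: each batch records the last `room_id` it handled, and the next batch resumes strictly after it via `room_id > ?`. A minimal standalone sketch of the same technique:

```python
def batches(sorted_keys, batch_size):
    """Yield batches using keyset pagination over an ordered key space."""
    last = ""  # matches progress.get("room_id", "") above
    while True:
        batch = [k for k in sorted_keys if k > last][:batch_size]
        if not batch:
            return  # nothing left: the background update ends itself
        yield batch
        last = batch[-1]  # persisted as {"room_id": rooms[-1]} in the real code

for batch in batches(["!a:x", "!b:x", "!c:x"], batch_size=2):
    print(batch)  # ['!a:x', '!b:x'] then ['!c:x']
```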
diff --git a/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
new file mode 100644
index 0000000000..aeb17813d3
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/56/remove_tombstoned_rooms_from_directory.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Now that #6232 has landed, we can remove old rooms from the directory.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('remove_tombstoned_rooms_from_directory', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 0dc39f139c..d07440e3ed 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections.abc
import logging
from collections import namedtuple
from typing import Iterable, Tuple
@@ -107,7 +107,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
predecessor = create_event.content.get("predecessor", None)
# Ensure the key is a dictionary
- if not isinstance(predecessor, dict):
+ if not isinstance(predecessor, collections.abc.Mapping):
return None
return predecessor
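
The switch to `collections.abc.Mapping` is presumably because event content in Synapse is not always a plain `dict` — it may be an immutable mapping (such as a frozendict), which fails an `isinstance(..., dict)` check. The stdlib's `MappingProxyType` demonstrates the same distinction:

```python
import collections.abc
import types

# An immutable mapping: a Mapping, but not a dict subclass.
predecessor = types.MappingProxyType(
    {"room_id": "!old:example.com", "event_id": "$create:example.com"}
)

print(isinstance(predecessor, dict))                     # False: old check rejected it
print(isinstance(predecessor, collections.abc.Mapping))  # True: passes the new check
```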
|