diff options
Diffstat (limited to 'synapse/storage')
28 files changed, 1266 insertions, 456 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index eb1a7e5002..59f3394b0a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -90,8 +90,10 @@ class BackgroundUpdater(object): self._clock = hs.get_clock() self.db = database + # if a background update is currently running, its name. + self._current_background_update = None # type: Optional[str] + self._background_update_performance = {} - self._background_update_queue = [] self._background_update_handlers = {} self._all_done = False @@ -111,7 +113,7 @@ class BackgroundUpdater(object): except Exception: logger.exception("Error doing update") else: - if result is None: + if result: logger.info( "No more background updates to do." " Unscheduling background update task." @@ -119,26 +121,25 @@ class BackgroundUpdater(object): self._all_done = True return None - @defer.inlineCallbacks - def has_completed_background_updates(self): + async def has_completed_background_updates(self) -> bool: """Check if all the background updates have completed Returns: - Deferred[bool]: True if all background updates have completed + True if all background updates have completed """ # if we've previously determined that there is nothing left to do, that # is easy if self._all_done: return True - # obviously, if we have things in our queue, we're not done. - if self._background_update_queue: + # obviously, if we are currently processing an update, we're not done. + if self._current_background_update: return False # otherwise, check if there are updates to be run. This is important, # as we may be running on a worker which doesn't perform the bg updates # itself, but still wants to wait for them to happen. - updates = yield self.db.simple_select_onecol( + updates = await self.db.simple_select_onecol( "background_updates", keyvalues=None, retcol="1", @@ -153,11 +154,10 @@ class BackgroundUpdater(object): async def has_completed_background_update(self, update_name) -> bool: """Check if the given background update has finished running. """ - if self._all_done: return True - if update_name in self._background_update_queue: + if update_name == self._current_background_update: return False update_exists = await self.db.simple_select_one_onecol( @@ -170,9 +170,7 @@ class BackgroundUpdater(object): return not update_exists - async def do_next_background_update( - self, desired_duration_ms: float - ) -> Optional[int]: + async def do_next_background_update(self, desired_duration_ms: float) -> bool: """Does some amount of work on the next queued background update Returns once some amount of work is done. @@ -181,33 +179,51 @@ class BackgroundUpdater(object): desired_duration_ms(float): How long we want to spend updating. Returns: - None if there is no more work to do, otherwise an int + True if we have finished running all the background updates, otherwise False """ - if not self._background_update_queue: - updates = await self.db.simple_select_list( - "background_updates", - keyvalues=None, - retcols=("update_name", "depends_on"), + + def get_background_updates_txn(txn): + txn.execute( + """ + SELECT update_name, depends_on FROM background_updates + ORDER BY ordering, update_name + """ ) - in_flight = {update["update_name"] for update in updates} - for update in updates: - if update["depends_on"] not in in_flight: - self._background_update_queue.append(update["update_name"]) + return self.db.cursor_to_dict(txn) - if not self._background_update_queue: - # no work left to do - return None + if not self._current_background_update: + all_pending_updates = await self.db.runInteraction( + "background_updates", get_background_updates_txn, + ) + if not all_pending_updates: + # no work left to do + return True + + # find the first update which isn't dependent on another one in the queue. + pending = {update["update_name"] for update in all_pending_updates} + for upd in all_pending_updates: + depends_on = upd["depends_on"] + if not depends_on or depends_on not in pending: + break + logger.info( + "Not starting on bg update %s until %s is done", + upd["update_name"], + depends_on, + ) + else: + # if we get to the end of that for loop, there is a problem + raise Exception( + "Unable to find a background update which doesn't depend on " + "another: dependency cycle?" + ) - # pop from the front, and add back to the back - update_name = self._background_update_queue.pop(0) - self._background_update_queue.append(update_name) + self._current_background_update = upd["update_name"] - res = await self._do_background_update(update_name, desired_duration_ms) - return res + await self._do_background_update(desired_duration_ms) + return False - async def _do_background_update( - self, update_name: str, desired_duration_ms: float - ) -> int: + async def _do_background_update(self, desired_duration_ms: float) -> int: + update_name = self._current_background_update logger.info("Starting update batch on background update '%s'", update_name) update_handler = self._background_update_handlers[update_name] @@ -400,27 +416,6 @@ class BackgroundUpdater(object): self.register_background_update_handler(update_name, updater) - def start_background_update(self, update_name, progress): - """Starts a background update running. - - Args: - update_name: The update to set running. - progress: The initial state of the progress of the update. - - Returns: - A deferred that completes once the task has been added to the - queue. - """ - # Clear the background update queue so that we will pick up the new - # task on the next iteration of do_background_update. - self._background_update_queue = [] - progress_json = json.dumps(progress) - - return self.db.simple_insert( - "background_updates", - {"update_name": update_name, "progress_json": progress_json}, - ) - def _end_background_update(self, update_name): """Removes a completed background update task from the queue. @@ -429,9 +424,12 @@ class BackgroundUpdater(object): Returns: A deferred that completes once the task is removed. """ - self._background_update_queue = [ - name for name in self._background_update_queue if name != update_name - ] + if update_name != self._current_background_update: + raise Exception( + "Cannot end background update %s which isn't currently running" + % update_name + ) + self._current_background_update = None return self.db.simple_delete_one( "background_updates", keyvalues={"update_name": update_name} ) diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index acca079f23..ceba10882c 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -66,6 +66,7 @@ from .stats import StatsStore from .stream import StreamStore from .tags import TagsStore from .transactions import TransactionStore +from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore from .user_erasure_store import UserErasureStore @@ -112,6 +113,7 @@ class DataStore( StatsStore, RelationsStore, CacheInvalidationStore, + UIAuthStore, ): def __init__(self, database: Database, db_conn, hs): self.hs = hs @@ -144,7 +146,10 @@ class DataStore( db_conn, "device_lists_stream", "stream_id", - extra_tables=[("user_signature_stream", "stream_id")], + extra_tables=[ + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ], ) self._cross_signing_id_gen = StreamIdGenerator( db_conn, "e2e_cross_signing_keys", "stream_id" @@ -500,7 +505,8 @@ class DataStore( self, start, limit, name=None, guests=True, deactivated=False ): """Function to retrieve a paginated list of users from - users list. This will return a json list of users. + users list. This will return a json list of users and the + total number of users matching the filter criteria. Args: start (int): start number to begin the query from @@ -509,35 +515,44 @@ class DataStore( guests (bool): whether to in include guest users deactivated (bool): whether to include deactivated users Returns: - defer.Deferred: resolves to list[dict[str, Any]] + defer.Deferred: resolves to list[dict[str, Any]], int """ - name_filter = {} - if name: - name_filter["name"] = "%" + name + "%" - - attr_filter = {} - if not guests: - attr_filter["is_guest"] = 0 - if not deactivated: - attr_filter["deactivated"] = 0 - - return self.db.simple_select_list_paginate( - desc="get_users_paginate", - table="users", - orderby="name", - start=start, - limit=limit, - filters=name_filter, - keyvalues=attr_filter, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "user_type", - "deactivated", - ], - ) + + def get_users_paginate_txn(txn): + filters = [] + args = [] + + if name: + filters.append("name LIKE ?") + args.append("%" + name + "%") + + if not guests: + filters.append("is_guest = 0") + + if not deactivated: + filters.append("deactivated = 0") + + where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else "" + + sql = "SELECT COUNT(*) as total_users FROM users %s" % (where_clause) + txn.execute(sql, args) + count = txn.fetchone()[0] + + args = [self.hs.config.server_name] + args + [limit, start] + sql = """ + SELECT name, user_type, is_guest, admin, deactivated, displayname, avatar_url + FROM users as u + LEFT JOIN profiles AS p ON u.name = '@' || p.user_id || ':' || ? + {} + ORDER BY u.name LIMIT ? OFFSET ? + """.format( + where_clause + ) + txn.execute(sql, args) + users = self.db.cursor_to_dict(txn) + return users, count + + return self.db.runInteraction("get_users_paginate_txn", get_users_paginate_txn) def search_users(self, term): """Function to search users list for one or more users with diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py index d4c44dcc75..4dc5da3fe8 100644 --- a/synapse/storage/data_stores/main/cache.py +++ b/synapse/storage/data_stores/main/cache.py @@ -32,7 +32,29 @@ logger = logging.getLogger(__name__) CURRENT_STATE_CACHE_NAME = "cs_cache_fake" -class CacheInvalidationStore(SQLBaseStore): +class CacheInvalidationWorkerStore(SQLBaseStore): + def get_all_updated_caches(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_updated_caches_txn(txn): + # We purposefully don't bound by the current token, as we want to + # send across cache invalidations as quickly as possible. Cache + # invalidations are idempotent, so duplicates are fine. + sql = ( + "SELECT stream_id, cache_func, keys, invalidation_ts" + " FROM cache_invalidation_stream" + " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, limit)) + return txn.fetchall() + + return self.db.runInteraction( + "get_all_updated_caches", get_all_updated_caches_txn + ) + + +class CacheInvalidationStore(CacheInvalidationWorkerStore): async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]): """Invalidates the cache and adds it to the cache stream so slaves will know to invalidate their caches. @@ -145,26 +167,6 @@ class CacheInvalidationStore(SQLBaseStore): }, ) - def get_all_updated_caches(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) - - def get_all_updated_caches_txn(txn): - # We purposefully don't bound by the current token, as we want to - # send across cache invalidations as quickly as possible. Cache - # invalidations are idempotent, so duplicates are fine. - sql = ( - "SELECT stream_id, cache_func, keys, invalidation_ts" - " FROM cache_invalidation_stream" - " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, limit)) - return txn.fetchall() - - return self.db.runInteraction( - "get_all_updated_caches", get_all_updated_caches_txn - ) - def get_cache_stream_token(self): if self._cache_id_gen: return self._cache_id_gen.get_current_token() diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py index e1ccb27142..92bc06919b 100644 --- a/synapse/storage/data_stores/main/client_ips.py +++ b/synapse/storage/data_stores/main/client_ips.py @@ -21,7 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore -from synapse.storage.database import Database +from synapse.storage.database import Database, make_tuple_comparison_clause from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.caches.descriptors import Cache @@ -303,16 +303,10 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): # we'll just end up updating the same device row multiple # times, which is fine. - if self.database_engine.supports_tuple_comparison: - where_clause = "(user_id, device_id) > (?, ?)" - where_args = [last_user_id, last_device_id] - else: - # We explicitly do a `user_id >= ? AND (...)` here to ensure - # that an index is used, as doing `user_id > ? OR (user_id = ? AND ...)` - # makes it hard for query optimiser to tell that it can use the - # index on user_id - where_clause = "user_id >= ? AND (user_id > ? OR device_id > ?)" - where_args = [last_user_id, last_user_id, last_device_id] + where_clause, where_args = make_tuple_comparison_clause( + self.database_engine, + [("user_id", last_user_id), ("device_id", last_device_id)], + ) sql = """ SELECT diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py index 0613b49f4a..9a1178fb39 100644 --- a/synapse/storage/data_stores/main/deviceinbox.py +++ b/synapse/storage/data_stores/main/deviceinbox.py @@ -207,6 +207,50 @@ class DeviceInboxWorkerStore(SQLBaseStore): "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn ) + def get_all_new_device_messages(self, last_pos, current_pos, limit): + """ + Args: + last_pos(int): + current_pos(int): + limit(int): + Returns: + A deferred list of rows from the device inbox + """ + if last_pos == current_pos: + return defer.succeed([]) + + def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) + sql = ( + "SELECT max(stream_id), user_id" + " FROM device_inbox" + " WHERE ? < stream_id AND stream_id <= ?" + " GROUP BY user_id" + ) + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() + + sql = ( + "SELECT max(stream_id), destination" + " FROM device_federation_outbox" + " WHERE ? < stream_id AND stream_id <= ?" + " GROUP BY destination" + ) + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn) + + # Order by ascending stream ordering + rows.sort() + + return rows + + return self.db.runInteraction( + "get_all_new_device_messages", get_all_new_device_messages_txn + ) + class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" @@ -411,47 +455,3 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore) rows.append((user_id, device_id, stream_id, message_json)) txn.executemany(sql, rows) - - def get_all_new_device_messages(self, last_pos, current_pos, limit): - """ - Args: - last_pos(int): - current_pos(int): - limit(int): - Returns: - A deferred list of rows from the device inbox - """ - if last_pos == current_pos: - return defer.succeed([]) - - def get_all_new_device_messages_txn(txn): - # We limit like this as we might have multiple rows per stream_id, and - # we want to make sure we always get all entries for any stream_id - # we return. - upper_pos = min(current_pos, last_pos + limit) - sql = ( - "SELECT max(stream_id), user_id" - " FROM device_inbox" - " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY user_id" - ) - txn.execute(sql, (last_pos, upper_pos)) - rows = txn.fetchall() - - sql = ( - "SELECT max(stream_id), destination" - " FROM device_federation_outbox" - " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY destination" - ) - txn.execute(sql, (last_pos, upper_pos)) - rows.extend(txn) - - # Order by ascending stream ordering - rows.sort() - - return rows - - return self.db.runInteraction( - "get_all_new_device_messages", get_all_new_device_messages_txn - ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index 8af5f7de54..03f5141e6c 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -15,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List, Tuple from six import iteritems @@ -31,7 +32,11 @@ from synapse.logging.opentracing import ( ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause -from synapse.storage.database import Database +from synapse.storage.database import ( + Database, + LoggingTransaction, + make_tuple_comparison_clause, +) from synapse.types import Collection, get_verify_key_from_cross_signing_key from synapse.util.caches.descriptors import ( Cache, @@ -40,6 +45,7 @@ from synapse.util.caches.descriptors import ( cachedList, ) from synapse.util.iterutils import batch_iter +from synapse.util.stringutils import shortstr logger = logging.getLogger(__name__) @@ -47,6 +53,8 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( "drop_device_list_streams_non_unique_indexes" ) +BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" + class DeviceWorkerStore(SQLBaseStore): def get_device(self, user_id, device_id): @@ -112,23 +120,13 @@ class DeviceWorkerStore(SQLBaseStore): if not has_changed: return now_stream_id, [] - # We retrieve n+1 devices from the list of outbound pokes where n is - # our outbound device update limit. We then check if the very last - # device has the same stream_id as the second-to-last device. If so, - # then we ignore all devices with that stream_id and only send the - # devices with a lower stream_id. - # - # If when culling the list we end up with no devices afterwards, we - # consider the device update to be too large, and simply skip the - # stream_id; the rationale being that such a large device list update - # is likely an error. updates = yield self.db.runInteraction( "get_device_updates_by_remote", self._get_device_updates_by_remote_txn, destination, from_stream_id, now_stream_id, - limit + 1, + limit, ) # Return an empty list if there are no updates @@ -166,14 +164,6 @@ class DeviceWorkerStore(SQLBaseStore): "device_id": verify_key.version, } - # if we have exceeded the limit, we need to exclude any results with the - # same stream_id as the last row. - if len(updates) > limit: - stream_id_cutoff = updates[-1][2] - now_stream_id = stream_id_cutoff - 1 - else: - stream_id_cutoff = None - # Perform the equivalent of a GROUP BY # # Iterate through the updates list and copy non-duplicate @@ -181,7 +171,6 @@ class DeviceWorkerStore(SQLBaseStore): # the max stream_id across each set of duplicate entries # # maps (user_id, device_id) -> (stream_id, opentracing_context) - # as long as their stream_id does not match that of the last row # # opentracing_context contains the opentracing metadata for the request # that created the poke @@ -192,10 +181,6 @@ class DeviceWorkerStore(SQLBaseStore): query_map = {} cross_signing_keys_by_user = {} for user_id, device_id, update_stream_id, update_context in updates: - if stream_id_cutoff is not None and update_stream_id >= stream_id_cutoff: - # Stop processing updates - break - if ( user_id in master_key_by_user and device_id == master_key_by_user[user_id]["device_id"] @@ -218,17 +203,6 @@ class DeviceWorkerStore(SQLBaseStore): if update_stream_id > previous_update_stream_id: query_map[key] = (update_stream_id, update_context) - # If we didn't find any updates with a stream_id lower than the cutoff, it - # means that there are more than limit updates all of which have the same - # steam_id. - - # That should only happen if a client is spamming the server with new - # devices, in which case E2E isn't going to work well anyway. We'll just - # skip that stream_id and return an empty list, and continue with the next - # stream_id next time. - if not query_map and not cross_signing_keys_by_user: - return stream_id_cutoff, [] - results = yield self._get_device_update_edus_by_remote( destination, from_stream_id, query_map ) @@ -259,11 +233,11 @@ class DeviceWorkerStore(SQLBaseStore): # get the list of device updates that need to be sent sql = """ SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes - WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ? + WHERE destination = ? AND ? < stream_id AND stream_id <= ? ORDER BY stream_id LIMIT ? """ - txn.execute(sql, (destination, from_stream_id, now_stream_id, False, limit)) + txn.execute(sql, (destination, from_stream_id, now_stream_id, limit)) return list(txn) @@ -301,7 +275,14 @@ class DeviceWorkerStore(SQLBaseStore): prev_id = yield self._get_last_device_update_for_remote_user( destination, user_id, from_stream_id ) - for device_id, device in iteritems(user_devices): + + # make sure we go through the devices in stream order + device_ids = sorted( + user_devices.keys(), key=lambda i: query_map[(user_id, i)][0], + ) + + for device_id in device_ids: + device = user_devices[device_id] stream_id, opentracing_context = query_map[(user_id, device_id)] result = { "user_id": user_id, @@ -560,8 +541,8 @@ class DeviceWorkerStore(SQLBaseStore): # Get set of users who *may* have changed. Users not in the returned # list have definitely not changed. - to_check = list( - self._device_list_stream_cache.get_entities_changed(user_ids, from_key) + to_check = self._device_list_stream_cache.get_entities_changed( + user_ids, from_key ) if not to_check: @@ -611,22 +592,33 @@ class DeviceWorkerStore(SQLBaseStore): else: return set() - def get_all_device_list_changes_for_remotes(self, from_key, to_key): - """Return a list of `(stream_id, user_id, destination)` which is the - combined list of changes to devices, and which destinations need to be - poked. `destination` may be None if no destinations need to be poked. + async def get_all_device_list_changes_for_remotes( + self, from_key: int, to_key: int, limit: int, + ) -> List[Tuple[int, str]]: + """Return a list of `(stream_id, entity)` which is the combined list of + changes to devices and which destinations need to be poked. Entity is + either a user ID (starting with '@') or a remote destination. """ - # We do a group by here as there can be a large number of duplicate - # entries, since we throw away device IDs. + + # This query Does The Right Thing where it'll correctly apply the + # bounds to the inner queries. sql = """ - SELECT MAX(stream_id) AS stream_id, user_id, destination - FROM device_lists_stream - LEFT JOIN device_lists_outbound_pokes USING (stream_id, user_id, device_id) + SELECT stream_id, entity FROM ( + SELECT stream_id, user_id AS entity FROM device_lists_stream + UNION ALL + SELECT stream_id, destination AS entity FROM device_lists_outbound_pokes + ) AS e WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id, destination + LIMIT ? """ - return self.db.execute( - "get_all_device_list_changes_for_remotes", None, sql, from_key, to_key + + return await self.db.execute( + "get_all_device_list_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) @cached(max_entries=10000) @@ -728,6 +720,11 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): self._drop_device_list_streams_non_unique_indexes, ) + # clear out duplicate device list outbound pokes + self.db.updates.register_background_update_handler( + BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, self._remove_duplicate_outbound_pokes, + ) + @defer.inlineCallbacks def _drop_device_list_streams_non_unique_indexes(self, progress, batch_size): def f(conn): @@ -742,6 +739,66 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): ) return 1 + async def _remove_duplicate_outbound_pokes(self, progress, batch_size): + # for some reason, we have accumulated duplicate entries in + # device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less + # efficient. + # + # For each duplicate, we delete all the existing rows and put one back. + + KEY_COLS = ["stream_id", "destination", "user_id", "device_id"] + last_row = progress.get( + "last_row", + {"stream_id": 0, "destination": "", "user_id": "", "device_id": ""}, + ) + + def _txn(txn): + clause, args = make_tuple_comparison_clause( + self.db.engine, [(x, last_row[x]) for x in KEY_COLS] + ) + sql = """ + SELECT stream_id, destination, user_id, device_id, MAX(ts) AS ts + FROM device_lists_outbound_pokes + WHERE %s + GROUP BY %s + HAVING count(*) > 1 + ORDER BY %s + LIMIT ? + """ % ( + clause, # WHERE + ",".join(KEY_COLS), # GROUP BY + ",".join(KEY_COLS), # ORDER BY + ) + txn.execute(sql, args + [batch_size]) + rows = self.db.cursor_to_dict(txn) + + row = None + for row in rows: + self.db.simple_delete_txn( + txn, "device_lists_outbound_pokes", {x: row[x] for x in KEY_COLS}, + ) + + row["sent"] = False + self.db.simple_insert_txn( + txn, "device_lists_outbound_pokes", row, + ) + + if row: + self.db.updates._background_update_progress_txn( + txn, BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, {"last_row": row}, + ) + + return len(rows) + + rows = await self.db.runInteraction(BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES, _txn) + + if not rows: + await self.db.updates._end_background_update( + BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES + ) + + return rows + class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def __init__(self, database: Database, db_conn, hs): @@ -1021,29 +1078,49 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): """Persist that a user's devices have been updated, and which hosts (if any) should be poked. """ - with self._device_list_id_gen.get_next() as stream_id: + if not device_ids: + return + + with self._device_list_id_gen.get_next_mult(len(device_ids)) as stream_ids: + yield self.db.runInteraction( + "add_device_change_to_stream", + self._add_device_change_to_stream_txn, + user_id, + device_ids, + stream_ids, + ) + + if not hosts: + return stream_ids[-1] + + context = get_active_span_text_map() + with self._device_list_id_gen.get_next_mult( + len(hosts) * len(device_ids) + ) as stream_ids: yield self.db.runInteraction( - "add_device_change_to_streams", - self._add_device_change_txn, + "add_device_outbound_poke_to_stream", + self._add_device_outbound_poke_to_stream_txn, user_id, device_ids, hosts, - stream_id, + stream_ids, + context, ) - return stream_id - def _add_device_change_txn(self, txn, user_id, device_ids, hosts, stream_id): - now = self._clock.time_msec() + return stream_ids[-1] + def _add_device_change_to_stream_txn( + self, + txn: LoggingTransaction, + user_id: str, + device_ids: Collection[str], + stream_ids: List[str], + ): txn.call_after( - self._device_list_stream_cache.entity_has_changed, user_id, stream_id + self._device_list_stream_cache.entity_has_changed, user_id, stream_ids[-1], ) - for host in hosts: - txn.call_after( - self._device_list_federation_stream_cache.entity_has_changed, - host, - stream_id, - ) + + min_stream_id = stream_ids[0] # Delete older entries in the table, as we really only care about # when the latest change happened. @@ -1052,7 +1129,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): DELETE FROM device_lists_stream WHERE user_id = ? AND device_id = ? AND stream_id < ? """, - [(user_id, device_id, stream_id) for device_id in device_ids], + [(user_id, device_id, min_stream_id) for device_id in device_ids], ) self.db.simple_insert_many_txn( @@ -1060,11 +1137,22 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_stream", values=[ {"stream_id": stream_id, "user_id": user_id, "device_id": device_id} - for device_id in device_ids + for stream_id, device_id in zip(stream_ids, device_ids) ], ) - context = get_active_span_text_map() + def _add_device_outbound_poke_to_stream_txn( + self, txn, user_id, device_ids, hosts, stream_ids, context, + ): + for host in hosts: + txn.call_after( + self._device_list_federation_stream_cache.entity_has_changed, + host, + stream_ids[-1], + ) + + now = self._clock.time_msec() + next_stream_id = iter(stream_ids) self.db.simple_insert_many_txn( txn, @@ -1072,7 +1160,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): values=[ { "destination": destination, - "stream_id": stream_id, + "stream_id": next(next_stream_id), "user_id": user_id, "device_id": device_id, "sent": False, @@ -1086,18 +1174,47 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ], ) - def _prune_old_outbound_device_pokes(self): + def _prune_old_outbound_device_pokes(self, prune_age=24 * 60 * 60 * 1000): """Delete old entries out of the device_lists_outbound_pokes to ensure - that we don't fill up due to dead servers. We keep one entry per - (destination, user_id) tuple to ensure that the prev_ids remain correct - if the server does come back. + that we don't fill up due to dead servers. + + Normally, we try to send device updates as a delta since a previous known point: + this is done by setting the prev_id in the m.device_list_update EDU. However, + for that to work, we have to have a complete record of each change to + each device, which can add up to quite a lot of data. + + An alternative mechanism is that, if the remote server sees that it has missed + an entry in the stream_id sequence for a given user, it will request a full + list of that user's devices. Hence, we can reduce the amount of data we have to + store (and transmit in some future transaction), by clearing almost everything + for a given destination out of the database, and having the remote server + resync. + + All we need to do is make sure we keep at least one row for each + (user, destination) pair, to remind us to send a m.device_list_update EDU for + that user when the destination comes back. It doesn't matter which device + we keep. """ - yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 + yesterday = self._clock.time_msec() - prune_age def _prune_txn(txn): + # look for (user, destination) pairs which have an update older than + # the cutoff. + # + # For each pair, we also need to know the most recent stream_id, and + # an arbitrary device_id at that stream_id. select_sql = """ - SELECT destination, user_id, max(stream_id) as stream_id - FROM device_lists_outbound_pokes + SELECT + dlop1.destination, + dlop1.user_id, + MAX(dlop1.stream_id) AS stream_id, + (SELECT MIN(dlop2.device_id) AS device_id FROM + device_lists_outbound_pokes dlop2 + WHERE dlop2.destination = dlop1.destination AND + dlop2.user_id=dlop1.user_id AND + dlop2.stream_id=MAX(dlop1.stream_id) + ) + FROM device_lists_outbound_pokes dlop1 GROUP BY destination, user_id HAVING min(ts) < ? AND count(*) > 1 """ @@ -1108,14 +1225,29 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): if not rows: return + logger.info( + "Pruning old outbound device list updates for %i users/destinations: %s", + len(rows), + shortstr((row[0], row[1]) for row in rows), + ) + + # we want to keep the update with the highest stream_id for each user. + # + # there might be more than one update (with different device_ids) with the + # same stream_id, so we also delete all but one rows with the max stream id. delete_sql = """ DELETE FROM device_lists_outbound_pokes - WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? + WHERE destination = ? AND user_id = ? AND ( + stream_id < ? OR + (stream_id = ? AND device_id != ?) + ) """ - - txn.executemany( - delete_sql, ((yesterday, row[0], row[1], row[2]) for row in rows) - ) + count = 0 + for (destination, user_id, stream_id, device_id) in rows: + txn.execute( + delete_sql, (destination, user_id, stream_id, stream_id, device_id) + ) + count += txn.rowcount # Since we've deleted unsent deltas, we need to remove the entry # of last successful sent so that the prev_ids are correctly set. @@ -1125,7 +1257,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): """ txn.executemany(sql, ((row[0], row[1]) for row in rows)) - logger.info("Pruned %d device list outbound pokes", txn.rowcount) + logger.info("Pruned %d device list outbound pokes", count) return run_as_background_process( "prune_old_outbound_device_pokes", diff --git a/synapse/storage/data_stores/main/directory.py b/synapse/storage/data_stores/main/directory.py index c9e7de7d12..e1d1bc3e05 100644 --- a/synapse/storage/data_stores/main/directory.py +++ b/synapse/storage/data_stores/main/directory.py @@ -14,6 +14,7 @@ # limitations under the License. from collections import namedtuple +from typing import Optional from twisted.internet import defer @@ -159,10 +160,29 @@ class DirectoryStore(DirectoryWorkerStore): return room_id - def update_aliases_for_room(self, old_room_id, new_room_id, creator): + def update_aliases_for_room( + self, old_room_id: str, new_room_id: str, creator: Optional[str] = None, + ): + """Repoint all of the aliases for a given room, to a different room. + + Args: + old_room_id: + new_room_id: + creator: The user to record as the creator of the new mapping. + If None, the creator will be left unchanged. + """ + def _update_aliases_for_room_txn(txn): - sql = "UPDATE room_aliases SET room_id = ?, creator = ? WHERE room_id = ?" - txn.execute(sql, (new_room_id, creator, old_room_id)) + update_creator_sql = "" + sql_params = (new_room_id, old_room_id) + if creator: + update_creator_sql = ", creator = ?" + sql_params = (new_room_id, creator, old_room_id) + + sql = "UPDATE room_aliases SET room_id = ? %s WHERE room_id = ?" % ( + update_creator_sql, + ) + txn.execute(sql, sql_params) self._invalidate_cache_and_stream( txn, self.get_aliases_for_room, (old_room_id,) ) diff --git a/synapse/storage/data_stores/main/e2e_room_keys.py b/synapse/storage/data_stores/main/e2e_room_keys.py index 84594cf0a9..23f4570c4b 100644 --- a/synapse/storage/data_stores/main/e2e_room_keys.py +++ b/synapse/storage/data_stores/main/e2e_room_keys.py @@ -146,7 +146,8 @@ class EndToEndRoomKeyStore(SQLBaseStore): room_entry["sessions"][row["session_id"]] = { "first_message_index": row["first_message_index"], "forwarded_count": row["forwarded_count"], - "is_verified": row["is_verified"], + # is_verified must be returned to the client as a boolean + "is_verified": bool(row["is_verified"]), "session_data": json.loads(row["session_data"]), } diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 001a53f9b4..bcf746b7ef 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -537,7 +537,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): return result - def get_all_user_signature_changes_for_remotes(self, from_key, to_key): + def get_all_user_signature_changes_for_remotes(self, from_key, to_key, limit): """Return a list of changes from the user signature stream to notify remotes. Note that the user signature stream represents when a user signs their device with their user-signing key, which is not published to other @@ -552,13 +552,19 @@ class EndToEndKeyWorkerStore(SQLBaseStore): Deferred[list[(int,str)]] a list of `(stream_id, user_id)` """ sql = """ - SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id + SELECT stream_id, from_user_id AS user_id FROM user_signature_stream WHERE ? < stream_id AND stream_id <= ? - GROUP BY user_id + ORDER BY stream_id ASC + LIMIT ? """ return self.db.execute( - "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key + "get_all_user_signature_changes_for_remotes", + None, + sql, + from_key, + to_key, + limit, ) diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 62d4e9f599..b99439cc37 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -173,19 +173,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas for event_id in initial_events } + # The sorted list of events whose auth chains we should walk. + search = [] # type: List[Tuple[int, str]] + # We need to get the depth of the initial events for sorting purposes. sql = """ SELECT depth, event_id FROM events WHERE %s - ORDER BY depth ASC """ - clause, args = make_in_list_sql_clause( - txn.database_engine, "event_id", initial_events - ) - txn.execute(sql % (clause,), args) + # the list can be huge, so let's avoid looking them all up in one massive + # query. + for batch in batch_iter(initial_events, 1000): + clause, args = make_in_list_sql_clause( + txn.database_engine, "event_id", batch + ) + txn.execute(sql % (clause,), args) - # The sorted list of events whose auth chains we should walk. - search = txn.fetchall() # type: List[Tuple[int, str]] + # I think building a temporary list with fetchall is more efficient than + # just `search.extend(txn)`, but this is unconfirmed + search.extend(txn.fetchall()) + + # sort by depth + search.sort() # Map from event to its auth events event_to_auth_events = {} # type: Dict[str, Set[str]] diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py index d593ef47b8..e71c23541d 100644 --- a/synapse/storage/data_stores/main/events.py +++ b/synapse/storage/data_stores/main/events.py @@ -1267,104 +1267,6 @@ class EventsStore( ret = yield self.db.runInteraction("count_daily_active_rooms", _count) return ret - def get_current_backfill_token(self): - """The current minimum token that backfilled events have reached""" - return -self._backfill_id_gen.get_current_token() - - def get_current_events_token(self): - """The current maximum token that events have reached""" - return self._stream_id_gen.get_current_token() - - def get_all_new_forward_event_rows(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) - - def get_all_new_forward_event_rows(txn): - sql = ( - "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? < stream_ordering AND stream_ordering <= ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - new_event_updates = txn.fetchall() - - if len(new_event_updates) == limit: - upper_bound = new_event_updates[-1][0] - else: - upper_bound = current_id - - sql = ( - "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" - " FROM events AS e" - " INNER JOIN ex_outlier_stream USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? < event_stream_ordering" - " AND event_stream_ordering <= ?" - " ORDER BY event_stream_ordering DESC" - ) - txn.execute(sql, (last_id, upper_bound)) - new_event_updates.extend(txn) - - return new_event_updates - - return self.db.runInteraction( - "get_all_new_forward_event_rows", get_all_new_forward_event_rows - ) - - def get_all_new_backfill_event_rows(self, last_id, current_id, limit): - if last_id == current_id: - return defer.succeed([]) - - def get_all_new_backfill_event_rows(txn): - sql = ( - "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" - " FROM events AS e" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > stream_ordering AND stream_ordering >= ?" - " ORDER BY stream_ordering ASC" - " LIMIT ?" - ) - txn.execute(sql, (-last_id, -current_id, limit)) - new_event_updates = txn.fetchall() - - if len(new_event_updates) == limit: - upper_bound = new_event_updates[-1][0] - else: - upper_bound = current_id - - sql = ( - "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," - " state_key, redacts, relates_to_id" - " FROM events AS e" - " INNER JOIN ex_outlier_stream USING (event_id)" - " LEFT JOIN redactions USING (event_id)" - " LEFT JOIN state_events USING (event_id)" - " LEFT JOIN event_relations USING (event_id)" - " WHERE ? > event_stream_ordering" - " AND event_stream_ordering >= ?" - " ORDER BY event_stream_ordering DESC" - ) - txn.execute(sql, (-last_id, -upper_bound)) - new_event_updates.extend(txn.fetchall()) - - return new_event_updates - - return self.db.runInteraction( - "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows - ) - @cached(num_args=5, max_entries=10) def get_all_new_events( self, @@ -1850,22 +1752,6 @@ class EventsStore( return (int(res["topological_ordering"]), int(res["stream_ordering"])) - def get_all_updated_current_state_deltas(self, from_token, to_token, limit): - def get_all_updated_current_state_deltas_txn(txn): - sql = """ - SELECT stream_id, room_id, type, state_key, event_id - FROM current_state_delta_stream - WHERE ? < stream_id AND stream_id <= ? - ORDER BY stream_id ASC LIMIT ? - """ - txn.execute(sql, (from_token, to_token, limit)) - return txn.fetchall() - - return self.db.runInteraction( - "get_all_updated_current_state_deltas", - get_all_updated_current_state_deltas_txn, - ) - def insert_labels_for_event_txn( self, txn, event_id, labels, room_id, topological_ordering ): diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py index ca237c6f12..73df6b33ba 100644 --- a/synapse/storage/data_stores/main/events_worker.py +++ b/synapse/storage/data_stores/main/events_worker.py @@ -19,7 +19,7 @@ import itertools import logging import threading from collections import namedtuple -from typing import List, Optional +from typing import List, Optional, Tuple from canonicaljson import json from constantly import NamedConstant, Names @@ -35,7 +35,7 @@ from synapse.api.room_versions import ( ) from synapse.events import make_event_from_dict from synapse.events.utils import prune_event -from synapse.logging.context import LoggingContext, PreserveLoggingContext +from synapse.logging.context import PreserveLoggingContext, current_context from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.database import Database @@ -409,7 +409,7 @@ class EventsWorkerStore(SQLBaseStore): missing_events_ids = [e for e in event_ids if e not in event_entry_map] if missing_events_ids: - log_ctx = LoggingContext.current_context() + log_ctx = current_context() log_ctx.record_event_fetch(len(missing_events_ids)) # Note that _get_events_from_db is also responsible for turning db rows @@ -632,7 +632,7 @@ class EventsWorkerStore(SQLBaseStore): event_map[event_id] = original_ev - # finally, we can decide whether each one nededs redacting, and build + # finally, we can decide whether each one needs redacting, and build # the cache entries. result_map = {} for event_id, original_ev in event_map.items(): @@ -963,3 +963,195 @@ class EventsWorkerStore(SQLBaseStore): complexity_v1 = round(state_events / 500, 2) return {"v1": complexity_v1} + + def get_current_backfill_token(self): + """The current minimum token that backfilled events have reached""" + return -self._backfill_id_gen.get_current_token() + + def get_current_events_token(self): + """The current maximum token that events have reached""" + return self._stream_id_gen.get_current_token() + + def get_all_new_forward_event_rows(self, last_id, current_id, limit): + """Returns new events, for the Events replication stream + + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + limit: the maximum number of rows to return + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ + + def get_all_new_forward_event_rows(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? < stream_ordering AND stream_ordering <= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + return txn.fetchall() + + return self.db.runInteraction( + "get_all_new_forward_event_rows", get_all_new_forward_event_rows + ) + + def get_ex_outlier_stream_rows(self, last_id, current_id): + """Returns de-outliered events, for the Events replication stream + + Args: + last_id: the last stream_id from the previous batch. + current_id: the maximum stream_id to return up to + + Returns: Deferred[List[Tuple]] + a list of events stream rows. Each tuple consists of a stream id as + the first element, followed by fields suitable for casting into an + EventsStreamRow. + """ + + def get_ex_outlier_stream_rows_txn(txn): + sql = ( + "SELECT event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? < event_stream_ordering" + " AND event_stream_ordering <= ?" + " ORDER BY event_stream_ordering ASC" + ) + + txn.execute(sql, (last_id, current_id)) + return txn.fetchall() + + return self.db.runInteraction( + "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn + ) + + def get_all_new_backfill_event_rows(self, last_id, current_id, limit): + if last_id == current_id: + return defer.succeed([]) + + def get_all_new_backfill_event_rows(txn): + sql = ( + "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts, relates_to_id" + " FROM events AS e" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > stream_ordering AND stream_ordering >= ?" + " ORDER BY stream_ordering ASC" + " LIMIT ?" + ) + txn.execute(sql, (-last_id, -current_id, limit)) + new_event_updates = txn.fetchall() + + if len(new_event_updates) == limit: + upper_bound = new_event_updates[-1][0] + else: + upper_bound = current_id + + sql = ( + "SELECT -event_stream_ordering, e.event_id, e.room_id, e.type," + " state_key, redacts, relates_to_id" + " FROM events AS e" + " INNER JOIN ex_outlier_stream USING (event_id)" + " LEFT JOIN redactions USING (event_id)" + " LEFT JOIN state_events USING (event_id)" + " LEFT JOIN event_relations USING (event_id)" + " WHERE ? > event_stream_ordering" + " AND event_stream_ordering >= ?" + " ORDER BY event_stream_ordering DESC" + ) + txn.execute(sql, (-last_id, -upper_bound)) + new_event_updates.extend(txn.fetchall()) + + return new_event_updates + + return self.db.runInteraction( + "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows + ) + + async def get_all_updated_current_state_deltas( + self, from_token: int, to_token: int, target_row_count: int + ) -> Tuple[List[Tuple], int, bool]: + """Fetch updates from current_state_delta_stream + + Args: + from_token: The previous stream token. Updates from this stream id will + be excluded. + + to_token: The current stream token (ie the upper limit). Updates up to this + stream id will be included (modulo the 'limit' param) + + target_row_count: The number of rows to try to return. If more rows are + available, we will set 'limited' in the result. In the event of a large + batch, we may return more rows than this. + Returns: + A triplet `(updates, new_last_token, limited)`, where: + * `updates` is a list of database tuples. + * `new_last_token` is the new position in stream. + * `limited` is whether there are more updates to fetch. + """ + + def get_all_updated_current_state_deltas_txn(txn): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (from_token, to_token, target_row_count)) + return txn.fetchall() + + def get_deltas_for_stream_id_txn(txn, stream_id): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE stream_id = ? + """ + txn.execute(sql, [stream_id]) + return txn.fetchall() + + # we need to make sure that, for every stream id in the results, we get *all* + # the rows with that stream id. + + rows = await self.db.runInteraction( + "get_all_updated_current_state_deltas", + get_all_updated_current_state_deltas_txn, + ) # type: List[Tuple] + + # if we've got fewer rows than the limit, we're good + if len(rows) < target_row_count: + return rows, to_token, False + + # we hit the limit, so reduce the upper limit so that we exclude the stream id + # of the last row in the result. + assert rows[-1][0] <= to_token + to_token = rows[-1][0] - 1 + + # search backwards through the list for the point to truncate + for idx in range(len(rows) - 1, 0, -1): + if rows[idx - 1][0] <= to_token: + return rows[:idx], to_token, True + + # bother. We didn't get a full set of changes for even a single + # stream id. let's run the query again, without a row limit, but for + # just one stream id. + to_token += 1 + rows = await self.db.runInteraction( + "get_deltas_for_stream_id", get_deltas_for_stream_id_txn, to_token + ) + return rows, to_token, True diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py index 80ca36dedf..8aecd414c2 100644 --- a/synapse/storage/data_stores/main/media_repository.py +++ b/synapse/storage/data_stores/main/media_repository.py @@ -340,7 +340,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "get_expired_url_cache", _get_expired_url_cache_txn ) - def delete_url_cache(self, media_ids): + async def delete_url_cache(self, media_ids): if len(media_ids) == 0: return @@ -349,7 +349,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): def _delete_url_cache_txn(txn): txn.executemany(sql, [(media_id,) for media_id in media_ids]) - return self.db.runInteraction("delete_url_cache", _delete_url_cache_txn) + return await self.db.runInteraction("delete_url_cache", _delete_url_cache_txn) def get_url_cache_media_before(self, before_ts): sql = ( @@ -367,7 +367,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "get_url_cache_media_before", _get_url_cache_media_before_txn ) - def delete_url_cache_media(self, media_ids): + async def delete_url_cache_media(self, media_ids): if len(media_ids) == 0: return @@ -380,6 +380,6 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): txn.executemany(sql, [(media_id,) for media_id in media_ids]) - return self.db.runInteraction( + return await self.db.runInteraction( "delete_url_cache_media", _delete_url_cache_media_txn ) diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py index 604c8b7ddd..dab31e0c2d 100644 --- a/synapse/storage/data_stores/main/presence.py +++ b/synapse/storage/data_stores/main/presence.py @@ -60,7 +60,7 @@ class PresenceStore(SQLBaseStore): "status_msg": state.status_msg, "currently_active": state.currently_active, } - for state in presence_states + for stream_id, state in zip(stream_orderings, presence_states) ], ) @@ -73,19 +73,22 @@ class PresenceStore(SQLBaseStore): ) txn.execute(sql + clause, [stream_id] + list(args)) - def get_all_presence_updates(self, last_id, current_id): + def get_all_presence_updates(self, last_id, current_id, limit): if last_id == current_id: return defer.succeed([]) def get_all_presence_updates_txn(txn): - sql = ( - "SELECT stream_id, user_id, state, last_active_ts," - " last_federation_update_ts, last_user_sync_ts, status_msg," - " currently_active" - " FROM presence_stream" - " WHERE ? < stream_id AND stream_id <= ?" - ) - txn.execute(sql, (last_id, current_id)) + sql = """ + SELECT stream_id, user_id, state, last_active_ts, + last_federation_update_ts, last_user_sync_ts, + status_msg, + currently_active + FROM presence_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + txn.execute(sql, (last_id, current_id, limit)) return txn.fetchall() return self.db.runInteraction( diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index 62ac88d9f2..b3faafa0a4 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -41,6 +41,7 @@ def _load_rules(rawrules, enabled_map): rule = dict(rawrule) rule["conditions"] = json.loads(rawrule["conditions"]) rule["actions"] = json.loads(rawrule["actions"]) + rule["default"] = False ruleslist.append(rule) # We're going to be mutating this a lot, so do a deep copy @@ -333,6 +334,26 @@ class PushRulesWorkerStore( results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled return results + def get_all_push_rule_updates(self, last_id, current_id, limit): + """Get all the push rules changes that have happend on the server""" + if last_id == current_id: + return defer.succeed([]) + + def get_all_push_rule_updates_txn(txn): + sql = ( + "SELECT stream_id, event_stream_ordering, user_id, rule_id," + " op, priority_class, priority, conditions, actions" + " FROM push_rules_stream" + " WHERE ? < stream_id AND stream_id <= ?" + " ORDER BY stream_id ASC LIMIT ?" + ) + txn.execute(sql, (last_id, current_id, limit)) + return txn.fetchall() + + return self.db.runInteraction( + "get_all_push_rule_updates", get_all_push_rule_updates_txn + ) + class PushRuleStore(PushRulesWorkerStore): @defer.inlineCallbacks @@ -684,26 +705,6 @@ class PushRuleStore(PushRulesWorkerStore): self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) - def get_all_push_rule_updates(self, last_id, current_id, limit): - """Get all the push rules changes that have happend on the server""" - if last_id == current_id: - return defer.succeed([]) - - def get_all_push_rule_updates_txn(txn): - sql = ( - "SELECT stream_id, event_stream_ordering, user_id, rule_id," - " op, priority_class, priority, conditions, actions" - " FROM push_rules_stream" - " WHERE ? < stream_id AND stream_id <= ?" - " ORDER BY stream_id ASC LIMIT ?" - ) - txn.execute(sql, (last_id, current_id, limit)) - return txn.fetchall() - - return self.db.runInteraction( - "get_all_push_rule_updates", get_all_push_rule_updates_txn - ) - def get_push_rules_stream_token(self): """Get the position of the push rules stream. Returns a pair of a stream id for the push_rules stream and the diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py index 3e53c8568a..efcdd2100b 100644 --- a/synapse/storage/data_stores/main/registration.py +++ b/synapse/storage/data_stores/main/registration.py @@ -273,8 +273,7 @@ class RegistrationWorkerStore(SQLBaseStore): desc="delete_account_validity_for_user", ) - @defer.inlineCallbacks - def is_server_admin(self, user): + async def is_server_admin(self, user): """Determines if a user is an admin of this homeserver. Args: @@ -283,7 +282,7 @@ class RegistrationWorkerStore(SQLBaseStore): Returns (bool): true iff the user is a server admin, false otherwise. """ - res = yield self.db.simple_select_one_onecol( + res = await self.db.simple_select_one_onecol( table="users", keyvalues={"name": user.to_string()}, retcol="admin", diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py index e6c10c6316..147eba1df7 100644 --- a/synapse/storage/data_stores/main/room.py +++ b/synapse/storage/data_stores/main/room.py @@ -52,12 +52,28 @@ class RoomSortOrder(Enum): """ Enum to define the sorting method used when returning rooms with get_rooms_paginate - ALPHABETICAL = sort rooms alphabetically by name - SIZE = sort rooms by membership size, highest to lowest + NAME = sort rooms alphabetically by name + JOINED_MEMBERS = sort rooms by membership size, highest to lowest """ + # ALPHABETICAL and SIZE are deprecated. + # ALPHABETICAL is the same as NAME. ALPHABETICAL = "alphabetical" + # SIZE is the same as JOINED_MEMBERS. SIZE = "size" + NAME = "name" + CANONICAL_ALIAS = "canonical_alias" + JOINED_MEMBERS = "joined_members" + JOINED_LOCAL_MEMBERS = "joined_local_members" + VERSION = "version" + CREATOR = "creator" + ENCRYPTION = "encryption" + FEDERATABLE = "federatable" + PUBLIC = "public" + JOIN_RULES = "join_rules" + GUEST_ACCESS = "guest_access" + HISTORY_VISIBILITY = "history_visibility" + STATE_EVENTS = "state_events" class RoomWorkerStore(SQLBaseStore): @@ -329,12 +345,52 @@ class RoomWorkerStore(SQLBaseStore): # Set ordering if RoomSortOrder(order_by) == RoomSortOrder.SIZE: + # Deprecated in favour of RoomSortOrder.JOINED_MEMBERS order_by_column = "curr.joined_members" order_by_asc = False elif RoomSortOrder(order_by) == RoomSortOrder.ALPHABETICAL: - # Sort alphabetically + # Deprecated in favour of RoomSortOrder.NAME order_by_column = "state.name" order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.NAME: + order_by_column = "state.name" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.CANONICAL_ALIAS: + order_by_column = "state.canonical_alias" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.JOINED_MEMBERS: + order_by_column = "curr.joined_members" + order_by_asc = False + elif RoomSortOrder(order_by) == RoomSortOrder.JOINED_LOCAL_MEMBERS: + order_by_column = "curr.local_users_in_room" + order_by_asc = False + elif RoomSortOrder(order_by) == RoomSortOrder.VERSION: + order_by_column = "rooms.room_version" + order_by_asc = False + elif RoomSortOrder(order_by) == RoomSortOrder.CREATOR: + order_by_column = "rooms.creator" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.ENCRYPTION: + order_by_column = "state.encryption" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.FEDERATABLE: + order_by_column = "state.is_federatable" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.PUBLIC: + order_by_column = "rooms.is_public" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.JOIN_RULES: + order_by_column = "state.join_rules" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.GUEST_ACCESS: + order_by_column = "state.guest_access" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.HISTORY_VISIBILITY: + order_by_column = "state.history_visibility" + order_by_asc = True + elif RoomSortOrder(order_by) == RoomSortOrder.STATE_EVENTS: + order_by_column = "curr.current_state_events" + order_by_asc = False else: raise StoreError( 500, "Incorrect value for order_by provided: %s" % order_by @@ -349,9 +405,13 @@ class RoomWorkerStore(SQLBaseStore): # for, and another query for getting the total number of events that could be # returned. Thus allowing us to see if there are more events to paginate through info_sql = """ - SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members + SELECT state.room_id, state.name, state.canonical_alias, curr.joined_members, + curr.local_users_in_room, rooms.room_version, rooms.creator, + state.encryption, state.is_federatable, rooms.is_public, state.join_rules, + state.guest_access, state.history_visibility, curr.current_state_events FROM room_stats_state state INNER JOIN room_stats_current curr USING (room_id) + INNER JOIN rooms USING (room_id) %s ORDER BY %s %s LIMIT ? @@ -389,6 +449,16 @@ class RoomWorkerStore(SQLBaseStore): "name": room[1], "canonical_alias": room[2], "joined_members": room[3], + "joined_local_members": room[4], + "version": room[5], + "creator": room[6], + "encryption": room[7], + "federatable": room[8], + "public": room[9], + "join_rules": room[10], + "guest_access": room[11], + "history_visibility": room[12], + "state_events": room[13], } ) @@ -732,6 +802,26 @@ class RoomWorkerStore(SQLBaseStore): return total_media_quarantined + def get_all_new_public_rooms(self, prev_id, current_id, limit): + def get_all_new_public_rooms(txn): + sql = """ + SELECT stream_id, room_id, visibility, appservice_id, network_id + FROM public_room_list_stream + WHERE stream_id > ? AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """ + + txn.execute(sql, (prev_id, current_id, limit)) + return txn.fetchall() + + if prev_id == current_id: + return defer.succeed([]) + + return self.db.runInteraction( + "get_all_new_public_rooms", get_all_new_public_rooms + ) + class RoomBackgroundUpdateStore(SQLBaseStore): REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory" @@ -1249,26 +1339,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def get_current_public_room_stream_id(self): return self._public_room_id_gen.get_current_token() - def get_all_new_public_rooms(self, prev_id, current_id, limit): - def get_all_new_public_rooms(txn): - sql = """ - SELECT stream_id, room_id, visibility, appservice_id, network_id - FROM public_room_list_stream - WHERE stream_id > ? AND stream_id <= ? - ORDER BY stream_id ASC - LIMIT ? - """ - - txn.execute(sql, (prev_id, current_id, limit)) - return txn.fetchall() - - if prev_id == current_id: - return defer.succeed([]) - - return self.db.runInteraction( - "get_all_new_public_rooms", get_all_new_public_rooms - ) - @defer.inlineCallbacks def block_room(self, room_id, user_id): """Marks the room as blocked. Can be called multiple times. diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py index d5bd0cb5cf..e626b7f6f7 100644 --- a/synapse/storage/data_stores/main/roommember.py +++ b/synapse/storage/data_stores/main/roommember.py @@ -576,7 +576,8 @@ class RoomMemberWorkerStore(EventsWorkerStore): if key[0] == EventTypes.Member ] for etype, state_key in context.delta_ids: - users_in_room.pop(state_key, None) + if etype == EventTypes.Member: + users_in_room.pop(state_key, None) # We check if we have any of the member event ids in the event cache # before we ask the DB diff --git a/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql b/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql index 163529c071..bbdde121e8 100644 --- a/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql +++ b/synapse/storage/data_stores/main/schema/delta/56/stats_separated.sql @@ -35,9 +35,13 @@ DELETE FROM background_updates WHERE update_name IN ( 'populate_stats_cleanup' ); +-- this relies on current_state_events.membership having been populated, so add +-- a dependency on current_state_events_membership. INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES - ('populate_stats_process_rooms', '{}', ''); + ('populate_stats_process_rooms', '{}', 'current_state_events_membership'); +-- this also relies on current_state_events.membership having been populated, but +-- we get that as a side-effect of depending on populate_stats_process_rooms. INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES ('populate_stats_process_users', '{}', 'populate_stats_process_rooms'); diff --git a/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql b/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql new file mode 100644 index 0000000000..133d80af35 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/remove_sent_outbound_pokes.sql @@ -0,0 +1,21 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- we no longer keep sent outbound device pokes in the db; clear them out +-- so that we don't have to worry about them. +-- +-- This is a sequence scan, but it doesn't take too long. + +DELETE FROM device_lists_outbound_pokes WHERE sent; diff --git a/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql b/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql new file mode 100644 index 0000000000..fdc39e9ba5 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/02remove_dup_outbound_pokes.sql @@ -0,0 +1,22 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + /* for some reason, we have accumulated duplicate entries in + * device_lists_outbound_pokes, which makes prune_outbound_device_list_pokes less + * efficient. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) + VALUES (5800, 'remove_dup_outbound_pokes', '{}'); diff --git a/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql new file mode 100644 index 0000000000..dcb593fc2d --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/58/03persist_ui_auth.sql @@ -0,0 +1,36 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE IF NOT EXISTS ui_auth_sessions( + session_id TEXT NOT NULL, -- The session ID passed to the client. + creation_time BIGINT NOT NULL, -- The time this session was created (epoch time in milliseconds). + serverdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data added by Synapse. + clientdict TEXT NOT NULL, -- A JSON dictionary of arbitrary data from the client. + uri TEXT NOT NULL, -- The URI the UI authentication session is using. + method TEXT NOT NULL, -- The HTTP method the UI authentication session is using. + -- The clientdict, uri, and method make up an tuple that must be immutable + -- throughout the lifetime of the UI Auth session. + description TEXT NOT NULL, -- A human readable description of the operation which caused the UI Auth flow to occur. + UNIQUE (session_id) +); + +CREATE TABLE IF NOT EXISTS ui_auth_sessions_credentials( + session_id TEXT NOT NULL, -- The corresponding UI Auth session. + stage_type TEXT NOT NULL, -- The stage type. + result TEXT NOT NULL, -- The result of the stage verification, stored as JSON. + UNIQUE (session_id, stage_type), + FOREIGN KEY (session_id) + REFERENCES ui_auth_sessions (session_id) +); diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py index ada5cce6c2..e89f0bffb5 100644 --- a/synapse/storage/data_stores/main/stream.py +++ b/synapse/storage/data_stores/main/stream.py @@ -481,11 +481,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id, limit, end_token ) - logger.debug("stream before") events = yield self.get_events_as_list( [r.event_id for r in rows], get_prev_content=True ) - logger.debug("stream after") self._set_before_and_after(events, rows) diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py new file mode 100644 index 0000000000..1d8ee22fb1 --- /dev/null +++ b/synapse/storage/data_stores/main/ui_auth.py @@ -0,0 +1,300 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from typing import Any, Dict, Optional, Union + +import attr + +import synapse.util.stringutils as stringutils +from synapse.api.errors import StoreError +from synapse.storage._base import SQLBaseStore +from synapse.types import JsonDict + + +@attr.s +class UIAuthSessionData: + session_id = attr.ib(type=str) + # The dictionary from the client root level, not the 'auth' key. + clientdict = attr.ib(type=JsonDict) + # The URI and method the session was intiatied with. These are checked at + # each stage of the authentication to ensure that the asked for operation + # has not changed. + uri = attr.ib(type=str) + method = attr.ib(type=str) + # A string description of the operation that the current authentication is + # authorising. + description = attr.ib(type=str) + + +class UIAuthWorkerStore(SQLBaseStore): + """ + Manage user interactive authentication sessions. + """ + + async def create_ui_auth_session( + self, clientdict: JsonDict, uri: str, method: str, description: str, + ) -> UIAuthSessionData: + """ + Creates a new user interactive authentication session. + + The session can be used to track the stages necessary to authenticate a + user across multiple HTTP requests. + + Args: + clientdict: + The dictionary from the client root level, not the 'auth' key. + uri: + The URI this session was initiated with, this is checked at each + stage of the authentication to ensure that the asked for + operation has not changed. + method: + The method this session was initiated with, this is checked at each + stage of the authentication to ensure that the asked for + operation has not changed. + description: + A string description of the operation that the current + authentication is authorising. + Returns: + The newly created session. + Raises: + StoreError if a unique session ID cannot be generated. + """ + # The clientdict gets stored as JSON. + clientdict_json = json.dumps(clientdict) + + # autogen a session ID and try to create it. We may clash, so just + # try a few times till one goes through, giving up eventually. + attempts = 0 + while attempts < 5: + session_id = stringutils.random_string(24) + + try: + await self.db.simple_insert( + table="ui_auth_sessions", + values={ + "session_id": session_id, + "clientdict": clientdict_json, + "uri": uri, + "method": method, + "description": description, + "serverdict": "{}", + "creation_time": self.hs.get_clock().time_msec(), + }, + desc="create_ui_auth_session", + ) + return UIAuthSessionData( + session_id, clientdict, uri, method, description + ) + except self.db.engine.module.IntegrityError: + attempts += 1 + raise StoreError(500, "Couldn't generate a session ID.") + + async def get_ui_auth_session(self, session_id: str) -> UIAuthSessionData: + """Retrieve a UI auth session. + + Args: + session_id: The ID of the session. + Returns: + A dict containing the device information. + Raises: + StoreError if the session is not found. + """ + result = await self.db.simple_select_one( + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("clientdict", "uri", "method", "description"), + desc="get_ui_auth_session", + ) + + result["clientdict"] = json.loads(result["clientdict"]) + + return UIAuthSessionData(session_id, **result) + + async def mark_ui_auth_stage_complete( + self, session_id: str, stage_type: str, result: Union[str, bool, JsonDict], + ): + """ + Mark a session stage as completed. + + Args: + session_id: The ID of the corresponding session. + stage_type: The completed stage type. + result: The result of the stage verification. + Raises: + StoreError if the session cannot be found. + """ + # Add (or update) the results of the current stage to the database. + # + # Note that we need to allow for the same stage to complete multiple + # times here so that registration is idempotent. + try: + await self.db.simple_upsert( + table="ui_auth_sessions_credentials", + keyvalues={"session_id": session_id, "stage_type": stage_type}, + values={"result": json.dumps(result)}, + desc="mark_ui_auth_stage_complete", + ) + except self.db.engine.module.IntegrityError: + raise StoreError(400, "Unknown session ID: %s" % (session_id,)) + + async def get_completed_ui_auth_stages( + self, session_id: str + ) -> Dict[str, Union[str, bool, JsonDict]]: + """ + Retrieve the completed stages of a UI authentication session. + + Args: + session_id: The ID of the session. + Returns: + The completed stages mapped to the result of the verification of + that auth-type. + """ + results = {} + for row in await self.db.simple_select_list( + table="ui_auth_sessions_credentials", + keyvalues={"session_id": session_id}, + retcols=("stage_type", "result"), + desc="get_completed_ui_auth_stages", + ): + results[row["stage_type"]] = json.loads(row["result"]) + + return results + + async def set_ui_auth_clientdict( + self, session_id: str, clientdict: JsonDict + ) -> None: + """ + Store an updated clientdict for a given session ID. + + Args: + session_id: The ID of this session as returned from check_auth + clientdict: + The dictionary from the client root level, not the 'auth' key. + """ + # The clientdict gets stored as JSON. + clientdict_json = json.dumps(clientdict) + + self.db.simple_update_one( + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + updatevalues={"clientdict": clientdict_json}, + desc="set_ui_auth_client_dict", + ) + + async def set_ui_auth_session_data(self, session_id: str, key: str, value: Any): + """ + Store a key-value pair into the sessions data associated with this + request. This data is stored server-side and cannot be modified by + the client. + + Args: + session_id: The ID of this session as returned from check_auth + key: The key to store the data under + value: The data to store + Raises: + StoreError if the session cannot be found. + """ + await self.db.runInteraction( + "set_ui_auth_session_data", + self._set_ui_auth_session_data_txn, + session_id, + key, + value, + ) + + def _set_ui_auth_session_data_txn(self, txn, session_id: str, key: str, value: Any): + # Get the current value. + result = self.db.simple_select_one_txn( + txn, + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("serverdict",), + ) + + # Update it and add it back to the database. + serverdict = json.loads(result["serverdict"]) + serverdict[key] = value + + self.db.simple_update_one_txn( + txn, + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + updatevalues={"serverdict": json.dumps(serverdict)}, + ) + + async def get_ui_auth_session_data( + self, session_id: str, key: str, default: Optional[Any] = None + ) -> Any: + """ + Retrieve data stored with set_session_data + + Args: + session_id: The ID of this session as returned from check_auth + key: The key to store the data under + default: Value to return if the key has not been set + Raises: + StoreError if the session cannot be found. + """ + result = await self.db.simple_select_one( + table="ui_auth_sessions", + keyvalues={"session_id": session_id}, + retcols=("serverdict",), + desc="get_ui_auth_session_data", + ) + + serverdict = json.loads(result["serverdict"]) + + return serverdict.get(key, default) + + +class UIAuthStore(UIAuthWorkerStore): + def delete_old_ui_auth_sessions(self, expiration_time: int): + """ + Remove sessions which were last used earlier than the expiration time. + + Args: + expiration_time: The latest time that is still considered valid. + This is an epoch time in milliseconds. + + """ + return self.db.runInteraction( + "delete_old_ui_auth_sessions", + self._delete_old_ui_auth_sessions_txn, + expiration_time, + ) + + def _delete_old_ui_auth_sessions_txn(self, txn, expiration_time: int): + # Get the expired sessions. + sql = "SELECT session_id FROM ui_auth_sessions WHERE creation_time <= ?" + txn.execute(sql, [expiration_time]) + session_ids = [r[0] for r in txn.fetchall()] + + # Delete the corresponding completed credentials. + self.db.simple_delete_many_txn( + txn, + table="ui_auth_sessions_credentials", + column="session_id", + iterable=session_ids, + keyvalues={}, + ) + + # Finally, delete the sessions. + self.db.simple_delete_many_txn( + txn, + table="ui_auth_sessions", + column="session_id", + iterable=session_ids, + keyvalues={}, + ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index e61595336c..50f475bfd3 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -17,7 +17,17 @@ import logging import time from time import monotonic as monotonic_time -from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple +from typing import ( + Any, + Callable, + Dict, + Iterable, + Iterator, + List, + Optional, + Tuple, + TypeVar, +) from six import iteritems, iterkeys, itervalues from six.moves import intern, range @@ -32,6 +42,7 @@ from synapse.config.database import DatabaseConnectionConfig from synapse.logging.context import ( LoggingContext, LoggingContextOrSentinel, + current_context, make_deferred_yieldable, ) from synapse.metrics.background_process_metrics import run_as_background_process @@ -201,9 +212,9 @@ class LoggingTransaction: def executemany(self, sql: str, *args: Any): self._do_execute(self.txn.executemany, sql, *args) - def _make_sql_one_line(self, sql): + def _make_sql_one_line(self, sql: str) -> str: "Strip newlines out of SQL so that the loggers in the DB are on one line" - return " ".join(l.strip() for l in sql.splitlines() if l.strip()) + return " ".join(line.strip() for line in sql.splitlines() if line.strip()) def _do_execute(self, func, sql, *args): sql = self._make_sql_one_line(sql) @@ -483,7 +494,7 @@ class Database(object): end = monotonic_time() duration = end - start - LoggingContext.current_context().add_database_transaction(duration) + current_context().add_database_transaction(duration) transaction_logger.debug("[TXN END] {%s} %f sec", name, duration) @@ -510,7 +521,7 @@ class Database(object): after_callbacks = [] # type: List[_CallbackListEntry] exception_callbacks = [] # type: List[_CallbackListEntry] - if LoggingContext.current_context() == LoggingContext.sentinel: + if not current_context(): logger.warning("Starting db txn '%s' from sentinel context", desc) try: @@ -547,10 +558,8 @@ class Database(object): Returns: Deferred: The result of func """ - parent_context = ( - LoggingContext.current_context() - ) # type: Optional[LoggingContextOrSentinel] - if parent_context == LoggingContext.sentinel: + parent_context = current_context() # type: Optional[LoggingContextOrSentinel] + if not parent_context: logger.warning( "Starting db connection from sentinel context: metrics will be lost" ) @@ -1558,3 +1567,74 @@ def make_in_list_sql_clause( return "%s = ANY(?)" % (column,), [list(iterable)] else: return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable) + + +KV = TypeVar("KV") + + +def make_tuple_comparison_clause( + database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]] +) -> Tuple[str, List[KV]]: + """Returns a tuple comparison SQL clause + + Depending what the SQL engine supports, builds a SQL clause that looks like either + "(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)". + + Args: + database_engine + keys: A set of (column, value) pairs to be compared. + + Returns: + A tuple of SQL query and the args + """ + if database_engine.supports_tuple_comparison: + return ( + "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)), + [k[1] for k in keys], + ) + + # we want to build a clause + # (a > ?) OR + # (a == ? AND b > ?) OR + # (a == ? AND b == ? AND c > ?) + # ... + # (a == ? AND b == ? AND ... AND z > ?) + # + # or, equivalently: + # + # (a > ? OR (a == ? AND + # (b > ? OR (b == ? AND + # ... + # (y > ? OR (y == ? AND + # z > ? + # )) + # ... + # )) + # )) + # + # which itself is equivalent to (and apparently easier for the query optimiser): + # + # (a >= ? AND (a > ? OR + # (b >= ? AND (b > ? OR + # ... + # (y >= ? AND (y > ? OR + # z > ? + # )) + # ... + # )) + # )) + # + # + + clause = "" + args = [] # type: List[KV] + for k, v in keys[:-1]: + clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k) + args.extend([v, v]) + + (k, v) = keys[-1] + clause += "%s > ?" % (k,) + args.append(v) + + clause += "))" * (len(keys) - 1) + return clause, args diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index 3bc2e8b986..215a949442 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -85,6 +85,7 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]): prepare_database(db_conn, self, config=None) db_conn.create_function("rank", 1, _rank) + db_conn.execute("PRAGMA foreign_keys = ON;") def is_deadlock(self, error): return False diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6cb7d4b922..1712932f31 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -29,7 +29,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 57 +SCHEMA_VERSION = 58 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/58/00background_update_ordering.sql b/synapse/storage/schema/delta/58/00background_update_ordering.sql new file mode 100644 index 0000000000..02dae587cc --- /dev/null +++ b/synapse/storage/schema/delta/58/00background_update_ordering.sql @@ -0,0 +1,19 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* add an "ordering" column to background_updates, which can be used to sort them + to achieve some level of consistency. */ + +ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0; |