From a7bdf98d01d2225a479753a85ba81adf02b16a32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2020 21:38:57 +0100 Subject: Rename database classes to make some sense (#8033) --- synapse/storage/databases/main/search.py | 710 +++++++++++++++++++++++++++++++ 1 file changed, 710 insertions(+) create mode 100644 synapse/storage/databases/main/search.py (limited to 'synapse/storage/databases/main/search.py') diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py new file mode 100644 index 0000000000..2162d0712d --- /dev/null +++ b/synapse/storage/databases/main/search.py @@ -0,0 +1,710 @@ +# -*- coding: utf-8 -*- +# Copyright 2015, 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import re +from collections import namedtuple + +from twisted.internet import defer + +from synapse.api.errors import SynapseError +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause +from synapse.storage.database import DatabasePool +from synapse.storage.databases.main.events_worker import EventRedactBehaviour +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + +logger = logging.getLogger(__name__) + +SearchEntry = namedtuple( + "SearchEntry", + ["key", "value", "event_id", "room_id", "stream_ordering", "origin_server_ts"], +) + + +class SearchWorkerStore(SQLBaseStore): + def store_search_entries_txn(self, txn, entries): + """Add entries to the search table + + Args: + txn (cursor): + entries (iterable[SearchEntry]): + entries to be added to the table + """ + if not self.hs.config.enable_search: + return + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "INSERT INTO event_search" + " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)" + " VALUES (?,?,?,to_tsvector('english', ?),?,?)" + ) + + args = ( + ( + entry.event_id, + entry.room_id, + entry.key, + entry.value, + entry.stream_ordering, + entry.origin_server_ts, + ) + for entry in entries + ) + + txn.executemany(sql, args) + + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "INSERT INTO event_search (event_id, room_id, key, value)" + " VALUES (?,?,?,?)" + ) + args = ( + (entry.event_id, entry.room_id, entry.key, entry.value) + for entry in entries + ) + + txn.executemany(sql, args) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + +class SearchBackgroundUpdateStore(SearchWorkerStore): + + EVENT_SEARCH_UPDATE_NAME = "event_search" + EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" + EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" + EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" + + def __init__(self, database: DatabasePool, db_conn, hs): + super(SearchBackgroundUpdateStore, self).__init__(database, db_conn, hs) + + if not hs.config.enable_search: + return + + self.db_pool.updates.register_background_update_handler( + self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search + ) + self.db_pool.updates.register_background_update_handler( + self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order + ) + + # we used to have a background update to turn the GIN index into a + # GIST one; we no longer do that (obviously) because we actually want + # a GIN index. However, it's possible that some people might still have + # the background update queued, so we register a handler to clear the + # background update. + self.db_pool.updates.register_noop_background_update( + self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME + ) + + self.db_pool.updates.register_background_update_handler( + self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search + ) + + @defer.inlineCallbacks + def _background_reindex_search(self, progress, batch_size): + # we work through the events table from highest stream id to lowest + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + + TYPES = ["m.room.name", "m.room.message", "m.room.topic"] + + def reindex_search_txn(txn): + sql = ( + "SELECT stream_ordering, event_id, room_id, type, json, " + " origin_server_ts FROM events" + " JOIN event_json USING (room_id, event_id)" + " WHERE ? <= stream_ordering AND stream_ordering < ?" + " AND (%s)" + " ORDER BY stream_ordering DESC" + " LIMIT ?" + ) % (" OR ".join("type = '%s'" % (t,) for t in TYPES),) + + txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) + + # we could stream straight from the results into + # store_search_entries_txn with a generator function, but that + # would mean having two cursors open on the database at once. + # Instead we just build a list of results. + rows = self.db_pool.cursor_to_dict(txn) + if not rows: + return 0 + + min_stream_id = rows[-1]["stream_ordering"] + + event_search_rows = [] + for row in rows: + try: + event_id = row["event_id"] + room_id = row["room_id"] + etype = row["type"] + stream_ordering = row["stream_ordering"] + origin_server_ts = row["origin_server_ts"] + try: + event_json = db_to_json(row["json"]) + content = event_json["content"] + except Exception: + continue + + if etype == "m.room.message": + key = "content.body" + value = content["body"] + elif etype == "m.room.topic": + key = "content.topic" + value = content["topic"] + elif etype == "m.room.name": + key = "content.name" + value = content["name"] + else: + raise Exception("unexpected event type %s" % etype) + except (KeyError, AttributeError): + # If the event is missing a necessary field then + # skip over it. + continue + + if not isinstance(value, str): + # If the event body, name or topic isn't a string + # then skip over it + continue + + event_search_rows.append( + SearchEntry( + key=key, + value=value, + event_id=event_id, + room_id=room_id, + stream_ordering=stream_ordering, + origin_server_ts=origin_server_ts, + ) + ) + + self.store_search_entries_txn(txn, event_search_rows) + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(event_search_rows), + } + + self.db_pool.updates._background_update_progress_txn( + txn, self.EVENT_SEARCH_UPDATE_NAME, progress + ) + + return len(event_search_rows) + + result = yield self.db_pool.runInteraction( + self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn + ) + + if not result: + yield self.db_pool.updates._end_background_update( + self.EVENT_SEARCH_UPDATE_NAME + ) + + return result + + @defer.inlineCallbacks + def _background_reindex_gin_search(self, progress, batch_size): + """This handles old synapses which used GIST indexes, if any; + converting them back to be GIN as per the actual schema. + """ + + def create_index(conn): + conn.rollback() + + # we have to set autocommit, because postgres refuses to + # CREATE INDEX CONCURRENTLY without it. + conn.set_session(autocommit=True) + + try: + c = conn.cursor() + + # if we skipped the conversion to GIST, we may already/still + # have an event_search_fts_idx; unfortunately postgres 9.4 + # doesn't support CREATE INDEX IF EXISTS so we just catch the + # exception and ignore it. + import psycopg2 + + try: + c.execute( + "CREATE INDEX CONCURRENTLY event_search_fts_idx" + " ON event_search USING GIN (vector)" + ) + except psycopg2.ProgrammingError as e: + logger.warning( + "Ignoring error %r when trying to switch from GIST to GIN", e + ) + + # we should now be able to delete the GIST index. + c.execute("DROP INDEX IF EXISTS event_search_fts_idx_gist") + finally: + conn.set_session(autocommit=False) + + if isinstance(self.database_engine, PostgresEngine): + yield self.db_pool.runWithConnection(create_index) + + yield self.db_pool.updates._end_background_update( + self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME + ) + return 1 + + @defer.inlineCallbacks + def _background_reindex_search_order(self, progress, batch_size): + target_min_stream_id = progress["target_min_stream_id_inclusive"] + max_stream_id = progress["max_stream_id_exclusive"] + rows_inserted = progress.get("rows_inserted", 0) + have_added_index = progress["have_added_indexes"] + + if not have_added_index: + + def create_index(conn): + conn.rollback() + conn.set_session(autocommit=True) + c = conn.cursor() + + # We create with NULLS FIRST so that when we search *backwards* + # we get the ones with non null origin_server_ts *first* + c.execute( + "CREATE INDEX CONCURRENTLY event_search_room_order ON event_search(" + "room_id, origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" + ) + c.execute( + "CREATE INDEX CONCURRENTLY event_search_order ON event_search(" + "origin_server_ts NULLS FIRST, stream_ordering NULLS FIRST)" + ) + conn.set_session(autocommit=False) + + yield self.db_pool.runWithConnection(create_index) + + pg = dict(progress) + pg["have_added_indexes"] = True + + yield self.db_pool.runInteraction( + self.EVENT_SEARCH_ORDER_UPDATE_NAME, + self.db_pool.updates._background_update_progress_txn, + self.EVENT_SEARCH_ORDER_UPDATE_NAME, + pg, + ) + + def reindex_search_txn(txn): + sql = ( + "UPDATE event_search AS es SET stream_ordering = e.stream_ordering," + " origin_server_ts = e.origin_server_ts" + " FROM events AS e" + " WHERE e.event_id = es.event_id" + " AND ? <= e.stream_ordering AND e.stream_ordering < ?" + " RETURNING es.stream_ordering" + ) + + min_stream_id = max_stream_id - batch_size + txn.execute(sql, (min_stream_id, max_stream_id)) + rows = txn.fetchall() + + if min_stream_id < target_min_stream_id: + # We've recached the end. + return len(rows), False + + progress = { + "target_min_stream_id_inclusive": target_min_stream_id, + "max_stream_id_exclusive": min_stream_id, + "rows_inserted": rows_inserted + len(rows), + "have_added_indexes": True, + } + + self.db_pool.updates._background_update_progress_txn( + txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress + ) + + return len(rows), True + + num_rows, finished = yield self.db_pool.runInteraction( + self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn + ) + + if not finished: + yield self.db_pool.updates._end_background_update( + self.EVENT_SEARCH_ORDER_UPDATE_NAME + ) + + return num_rows + + +class SearchStore(SearchBackgroundUpdateStore): + def __init__(self, database: DatabasePool, db_conn, hs): + super(SearchStore, self).__init__(database, db_conn, hs) + + @defer.inlineCallbacks + def search_msgs(self, room_ids, search_term, keys): + """Performs a full text search over events with given keys. + + Args: + room_ids (list): List of room ids to search in + search_term (str): Search term to search for + keys (list): List of keys to search in, currently supports + "content.body", "content.name", "content.topic" + + Returns: + list of dicts + """ + clauses = [] + + search_query = _parse_query(self.database_engine, search_term) + + args = [] + + # Make sure we don't explode because the person is in too many rooms. + # We filter the results below regardless. + if len(room_ids) < 500: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + clauses = [clause] + + local_clauses = [] + for key in keys: + local_clauses.append("key = ?") + args.append(key) + + clauses.append("(%s)" % (" OR ".join(local_clauses),)) + + count_args = args + count_clauses = clauses + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) AS rank," + " room_id, event_id" + " FROM event_search" + " WHERE vector @@ to_tsquery('english', ?)" + ) + args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?)" + ) + count_args = [search_query] + count_args + elif isinstance(self.database_engine, Sqlite3Engine): + sql = ( + "SELECT rank(matchinfo(event_search)) as rank, room_id, event_id" + " FROM event_search" + " WHERE value MATCH ?" + ) + args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ?" + ) + count_args = [search_term] + count_args + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + for clause in clauses: + sql += " AND " + clause + + for clause in count_clauses: + count_sql += " AND " + clause + + # We add an arbitrary limit here to ensure we don't try to pull the + # entire table from the database. + sql += " ORDER BY rank DESC LIMIT 500" + + results = yield self.db_pool.execute( + "search_msgs", self.db_pool.cursor_to_dict, sql, *args + ) + + results = list(filter(lambda row: row["room_id"] in room_ids, results)) + + # We set redact_behaviour to BLOCK here to prevent redacted events being returned in + # search results (which is a data leak) + events = yield self.get_events_as_list( + [r["event_id"] for r in results], + redact_behaviour=EventRedactBehaviour.BLOCK, + ) + + event_map = {ev.event_id: ev for ev in events} + + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_query, events) + + count_sql += " GROUP BY room_id" + + count_results = yield self.db_pool.execute( + "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + + return { + "results": [ + {"event": event_map[r["event_id"]], "rank": r["rank"]} + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + "count": count, + } + + @defer.inlineCallbacks + def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): + """Performs a full text search over events with given keys. + + Args: + room_id (list): The room_ids to search in + search_term (str): Search term to search for + keys (list): List of keys to search in, currently supports + "content.body", "content.name", "content.topic" + pagination_token (str): A pagination token previously returned + + Returns: + list of dicts + """ + clauses = [] + + search_query = _parse_query(self.database_engine, search_term) + + args = [] + + # Make sure we don't explode because the person is in too many rooms. + # We filter the results below regardless. + if len(room_ids) < 500: + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + clauses = [clause] + + local_clauses = [] + for key in keys: + local_clauses.append("key = ?") + args.append(key) + + clauses.append("(%s)" % (" OR ".join(local_clauses),)) + + # take copies of the current args and clauses lists, before adding + # pagination clauses to main query. + count_args = list(args) + count_clauses = list(clauses) + + if pagination_token: + try: + origin_server_ts, stream = pagination_token.split(",") + origin_server_ts = int(origin_server_ts) + stream = int(stream) + except Exception: + raise SynapseError(400, "Invalid pagination token") + + clauses.append( + "(origin_server_ts < ?" + " OR (origin_server_ts = ? AND stream_ordering < ?))" + ) + args.extend([origin_server_ts, origin_server_ts, stream]) + + if isinstance(self.database_engine, PostgresEngine): + sql = ( + "SELECT ts_rank_cd(vector, to_tsquery('english', ?)) as rank," + " origin_server_ts, stream_ordering, room_id, event_id" + " FROM event_search" + " WHERE vector @@ to_tsquery('english', ?) AND " + ) + args = [search_query, search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE vector @@ to_tsquery('english', ?) AND " + ) + count_args = [search_query] + count_args + elif isinstance(self.database_engine, Sqlite3Engine): + # We use CROSS JOIN here to ensure we use the right indexes. + # https://sqlite.org/optoverview.html#crossjoin + # + # We want to use the full text search index on event_search to + # extract all possible matches first, then lookup those matches + # in the events table to get the topological ordering. We need + # to use the indexes in this order because sqlite refuses to + # MATCH unless it uses the full text search index + sql = ( + "SELECT rank(matchinfo) as rank, room_id, event_id," + " origin_server_ts, stream_ordering" + " FROM (SELECT key, event_id, matchinfo(event_search) as matchinfo" + " FROM event_search" + " WHERE value MATCH ?" + " )" + " CROSS JOIN events USING (event_id)" + " WHERE " + ) + args = [search_query] + args + + count_sql = ( + "SELECT room_id, count(*) as count FROM event_search" + " WHERE value MATCH ? AND " + ) + count_args = [search_term] + count_args + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") + + sql += " AND ".join(clauses) + count_sql += " AND ".join(count_clauses) + + # We add an arbitrary limit here to ensure we don't try to pull the + # entire table from the database. + if isinstance(self.database_engine, PostgresEngine): + sql += ( + " ORDER BY origin_server_ts DESC NULLS LAST," + " stream_ordering DESC NULLS LAST LIMIT ?" + ) + elif isinstance(self.database_engine, Sqlite3Engine): + sql += " ORDER BY origin_server_ts DESC, stream_ordering DESC LIMIT ?" + else: + raise Exception("Unrecognized database engine") + + args.append(limit) + + results = yield self.db_pool.execute( + "search_rooms", self.db_pool.cursor_to_dict, sql, *args + ) + + results = list(filter(lambda row: row["room_id"] in room_ids, results)) + + # We set redact_behaviour to BLOCK here to prevent redacted events being returned in + # search results (which is a data leak) + events = yield self.get_events_as_list( + [r["event_id"] for r in results], + redact_behaviour=EventRedactBehaviour.BLOCK, + ) + + event_map = {ev.event_id: ev for ev in events} + + highlights = None + if isinstance(self.database_engine, PostgresEngine): + highlights = yield self._find_highlights_in_postgres(search_query, events) + + count_sql += " GROUP BY room_id" + + count_results = yield self.db_pool.execute( + "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args + ) + + count = sum(row["count"] for row in count_results if row["room_id"] in room_ids) + + return { + "results": [ + { + "event": event_map[r["event_id"]], + "rank": r["rank"], + "pagination_token": "%s,%s" + % (r["origin_server_ts"], r["stream_ordering"]), + } + for r in results + if r["event_id"] in event_map + ], + "highlights": highlights, + "count": count, + } + + def _find_highlights_in_postgres(self, search_query, events): + """Given a list of events and a search term, return a list of words + that match from the content of the event. + + This is used to give a list of words that clients can match against to + highlight the matching parts. + + Args: + search_query (str) + events (list): A list of events + + Returns: + deferred : A set of strings. + """ + + def f(txn): + highlight_words = set() + for event in events: + # As a hack we simply join values of all possible keys. This is + # fine since we're only using them to find possible highlights. + values = [] + for key in ("body", "name", "topic"): + v = event.content.get(key, None) + if v: + values.append(v) + + if not values: + continue + + value = " ".join(values) + + # We need to find some values for StartSel and StopSel that + # aren't in the value so that we can pick results out. + start_sel = "<" + stop_sel = ">" + + while start_sel in value: + start_sel += "<" + while stop_sel in value: + stop_sel += ">" + + query = "SELECT ts_headline(?, to_tsquery('english', ?), %s)" % ( + _to_postgres_options( + { + "StartSel": start_sel, + "StopSel": stop_sel, + "MaxFragments": "50", + } + ) + ) + txn.execute(query, (value, search_query)) + (headline,) = txn.fetchall()[0] + + # Now we need to pick the possible highlights out of the haedline + # result. + matcher_regex = "%s(.*?)%s" % ( + re.escape(start_sel), + re.escape(stop_sel), + ) + + res = re.findall(matcher_regex, headline) + highlight_words.update([r.lower() for r in res]) + + return highlight_words + + return self.db_pool.runInteraction("_find_highlights", f) + + +def _to_postgres_options(options_dict): + return "'%s'" % (",".join("%s=%s" % (k, v) for k, v in options_dict.items()),) + + +def _parse_query(database_engine, search_term): + """Takes a plain unicode string from the user and converts it into a form + that can be passed to database. + We use this so that we can add prefix matching, which isn't something + that is supported by default. + """ + + # Pull out the individual words, discarding any non-word characters. + results = re.findall(r"([\w\-]+)", search_term, re.UNICODE) + + if isinstance(database_engine, PostgresEngine): + return " & ".join(result + ":*" for result in results) + elif isinstance(database_engine, Sqlite3Engine): + return " & ".join(result + "*" for result in results) + else: + # This should be unreachable. + raise Exception("Unrecognized database engine") -- cgit 1.5.1 From f3fe6961b211d898aa347771df598c531fbca90c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 7 Aug 2020 12:17:17 -0400 Subject: Convert additional database stores to async/await (#8045) --- changelog.d/8045.misc | 1 + synapse/storage/databases/main/client_ips.py | 54 +++++----- synapse/storage/databases/main/search.py | 69 ++++++------- synapse/storage/databases/main/signatures.py | 7 +- synapse/storage/databases/main/user_directory.py | 124 ++++++++--------------- tests/storage/test_user_directory.py | 4 +- 6 files changed, 107 insertions(+), 152 deletions(-) create mode 100644 changelog.d/8045.misc (limited to 'synapse/storage/databases/main/search.py') diff --git a/changelog.d/8045.misc b/changelog.d/8045.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8045.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index 50d71f5ebc..216a5925fc 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -14,8 +14,7 @@ # limitations under the License. import logging - -from twisted.internet import defer +from typing import Dict, Optional, Tuple from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore @@ -82,21 +81,19 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): "devices_last_seen", self._devices_last_seen_update ) - @defer.inlineCallbacks - def _remove_user_ip_nonunique(self, progress, batch_size): + async def _remove_user_ip_nonunique(self, progress, batch_size): def f(conn): txn = conn.cursor() txn.execute("DROP INDEX IF EXISTS user_ips_user_ip") txn.close() - yield self.db_pool.runWithConnection(f) - yield self.db_pool.updates._end_background_update( + await self.db_pool.runWithConnection(f) + await self.db_pool.updates._end_background_update( "user_ips_drop_nonunique_index" ) return 1 - @defer.inlineCallbacks - def _analyze_user_ip(self, progress, batch_size): + async def _analyze_user_ip(self, progress, batch_size): # Background update to analyze user_ips table before we run the # deduplication background update. The table may not have been analyzed # for ages due to the table locks. @@ -106,14 +103,13 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): def user_ips_analyze(txn): txn.execute("ANALYZE user_ips") - yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze) + await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze) - yield self.db_pool.updates._end_background_update("user_ips_analyze") + await self.db_pool.updates._end_background_update("user_ips_analyze") return 1 - @defer.inlineCallbacks - def _remove_user_ip_dupes(self, progress, batch_size): + async def _remove_user_ip_dupes(self, progress, batch_size): # This works function works by scanning the user_ips table in batches # based on `last_seen`. For each row in a batch it searches the rest of # the table to see if there are any duplicates, if there are then they @@ -140,7 +136,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return None # Get a last seen that has roughly `batch_size` since `begin_last_seen` - end_last_seen = yield self.db_pool.runInteraction( + end_last_seen = await self.db_pool.runInteraction( "user_ips_dups_get_last_seen", get_last_seen ) @@ -275,15 +271,14 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): txn, "user_ips_remove_dupes", {"last_seen": end_last_seen} ) - yield self.db_pool.runInteraction("user_ips_dups_remove", remove) + await self.db_pool.runInteraction("user_ips_dups_remove", remove) if last: - yield self.db_pool.updates._end_background_update("user_ips_remove_dupes") + await self.db_pool.updates._end_background_update("user_ips_remove_dupes") return batch_size - @defer.inlineCallbacks - def _devices_last_seen_update(self, progress, batch_size): + async def _devices_last_seen_update(self, progress, batch_size): """Background update to insert last seen info into devices table """ @@ -346,12 +341,12 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore): return len(rows) - updated = yield self.db_pool.runInteraction( + updated = await self.db_pool.runInteraction( "_devices_last_seen_update", _devices_last_seen_update_txn ) if not updated: - yield self.db_pool.updates._end_background_update("devices_last_seen") + await self.db_pool.updates._end_background_update("devices_last_seen") return updated @@ -460,25 +455,25 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): # Failed to upsert, log and continue logger.error("Failed to insert client IP %r: %r", entry, e) - @defer.inlineCallbacks - def get_last_client_ip_by_device(self, user_id, device_id): + async def get_last_client_ip_by_device( + self, user_id: str, device_id: Optional[str] + ) -> Dict[Tuple[str, str], dict]: """For each device_id listed, give the user_ip it was last seen on Args: - user_id (str) - device_id (str): If None fetches all devices for the user + user_id: The user to fetch devices for. + device_id: If None fetches all devices for the user Returns: - defer.Deferred: resolves to a dict, where the keys - are (user_id, device_id) tuples. The values are also dicts, with - keys giving the column names + A dictionary mapping a tuple of (user_id, device_id) to dicts, with + keys giving the column names from the devices table. """ keyvalues = {"user_id": user_id} if device_id is not None: keyvalues["device_id"] = device_id - res = yield self.db_pool.simple_select_list( + res = await self.db_pool.simple_select_list( table="devices", keyvalues=keyvalues, retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"), @@ -500,8 +495,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): } return ret - @defer.inlineCallbacks - def get_user_ip_and_agents(self, user): + async def get_user_ip_and_agents(self, user): user_id = user.to_string() results = {} @@ -511,7 +505,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore): user_agent, _, last_seen = self._batch_row_update[key] results[(access_token, ip)] = (user_agent, last_seen) - rows = yield self.db_pool.simple_select_list( + rows = await self.db_pool.simple_select_list( table="user_ips", keyvalues={"user_id": user_id}, retcols=["access_token", "ip", "user_agent", "last_seen"], diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index 2162d0712d..7f8d1880e5 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -16,8 +16,7 @@ import logging import re from collections import namedtuple - -from twisted.internet import defer +from typing import List, Optional from synapse.api.errors import SynapseError from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search ) - @defer.inlineCallbacks - def _background_reindex_search(self, progress, batch_size): + async def _background_reindex_search(self, progress, batch_size): # we work through the events table from highest stream id to lowest target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): return len(event_search_rows) - result = yield self.db_pool.runInteraction( + result = await self.db_pool.runInteraction( self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn ) if not result: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_UPDATE_NAME ) return result - @defer.inlineCallbacks - def _background_reindex_gin_search(self, progress, batch_size): + async def _background_reindex_gin_search(self, progress, batch_size): """This handles old synapses which used GIST indexes, if any; converting them back to be GIN as per the actual schema. """ @@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): conn.set_session(autocommit=False) if isinstance(self.database_engine, PostgresEngine): - yield self.db_pool.runWithConnection(create_index) + await self.db_pool.runWithConnection(create_index) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME ) return 1 - @defer.inlineCallbacks - def _background_reindex_search_order(self, progress, batch_size): + async def _background_reindex_search_order(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] rows_inserted = progress.get("rows_inserted", 0) @@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): ) conn.set_session(autocommit=False) - yield self.db_pool.runWithConnection(create_index) + await self.db_pool.runWithConnection(create_index) pg = dict(progress) pg["have_added_indexes"] = True - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, self.db_pool.updates._background_update_progress_txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, @@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): return len(rows), True - num_rows, finished = yield self.db_pool.runInteraction( + num_rows, finished = await self.db_pool.runInteraction( self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn ) if not finished: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( self.EVENT_SEARCH_ORDER_UPDATE_NAME ) @@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore): def __init__(self, database: DatabasePool, db_conn, hs): super(SearchStore, self).__init__(database, db_conn, hs) - @defer.inlineCallbacks - def search_msgs(self, room_ids, search_term, keys): + async def search_msgs(self, room_ids, search_term, keys): """Performs a full text search over events with given keys. Args: @@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore): # entire table from the database. sql += " ORDER BY rank DESC LIMIT 500" - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_msgs", self.db_pool.cursor_to_dict, sql, *args ) @@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore): # We set redact_behaviour to BLOCK here to prevent redacted events being returned in # search results (which is a data leak) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r["event_id"] for r in results], redact_behaviour=EventRedactBehaviour.BLOCK, ) @@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_query, events) + highlights = await self._find_highlights_in_postgres(search_query, events) count_sql += " GROUP BY room_id" - count_results = yield self.db_pool.execute( + count_results = await self.db_pool.execute( "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args ) @@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore): "count": count, } - @defer.inlineCallbacks - def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None): + async def search_rooms( + self, + room_ids: List[str], + search_term: str, + keys: List[str], + limit, + pagination_token: Optional[str] = None, + ) -> List[dict]: """Performs a full text search over events with given keys. Args: - room_id (list): The room_ids to search in - search_term (str): Search term to search for - keys (list): List of keys to search in, currently supports - "content.body", "content.name", "content.topic" - pagination_token (str): A pagination token previously returned + room_ids: The room_ids to search in + search_term: Search term to search for + keys: List of keys to search in, currently supports "content.body", + "content.name", "content.topic" + pagination_token: A pagination token previously returned Returns: - list of dicts + Each match as a dictionary. """ clauses = [] @@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore): args.append(limit) - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_rooms", self.db_pool.cursor_to_dict, sql, *args ) @@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore): # We set redact_behaviour to BLOCK here to prevent redacted events being returned in # search results (which is a data leak) - events = yield self.get_events_as_list( + events = await self.get_events_as_list( [r["event_id"] for r in results], redact_behaviour=EventRedactBehaviour.BLOCK, ) @@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore): highlights = None if isinstance(self.database_engine, PostgresEngine): - highlights = yield self._find_highlights_in_postgres(search_query, events) + highlights = await self._find_highlights_in_postgres(search_query, events) count_sql += " GROUP BY room_id" - count_results = yield self.db_pool.execute( + count_results = await self.db_pool.execute( "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args ) diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index dae8e8bd29..be191dd870 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -15,8 +15,6 @@ from unpaddedbase64 import encode_base64 -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList @@ -40,9 +38,8 @@ class SignatureWorkerStore(SQLBaseStore): return self.db_pool.runInteraction("get_event_reference_hashes", f) - @defer.inlineCallbacks - def add_event_hashes(self, event_ids): - hashes = yield self.get_event_reference_hashes(event_ids) + async def add_event_hashes(self, event_ids): + hashes = await self.get_event_reference_hashes(event_ids) hashes = { e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"} for e_id, h in hashes.items() diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index d73a8e8ab9..af21fe457a 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -16,8 +16,6 @@ import logging import re -from twisted.internet import defer - from synapse.api.constants import EventTypes, JoinRules from synapse.storage.database import DatabasePool from synapse.storage.databases.main.state import StateFilter @@ -59,8 +57,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): "populate_user_directory_cleanup", self._populate_user_directory_cleanup ) - @defer.inlineCallbacks - def _populate_user_directory_createtables(self, progress, batch_size): + async def _populate_user_directory_createtables(self, progress, batch_size): # Get all the rooms that we want to process. def _make_staging_area(txn): @@ -102,45 +99,43 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.db_pool.runInteraction( + new_pos = await self.get_max_stream_id_in_current_state_deltas() + await self.db_pool.runInteraction( "populate_user_directory_temp_build", _make_staging_area ) - yield self.db_pool.simple_insert( + await self.db_pool.simple_insert( TEMP_TABLE + "_position", {"position": new_pos} ) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_createtables" ) return 1 - @defer.inlineCallbacks - def _populate_user_directory_cleanup(self, progress, batch_size): + async def _populate_user_directory_cleanup(self, progress, batch_size): """ Update the user directory stream position, then clean up the old tables. """ - position = yield self.db_pool.simple_select_one_onecol( + position = await self.db_pool.simple_select_one_onecol( TEMP_TABLE + "_position", None, "position" ) - yield self.update_user_directory_stream_pos(position) + await self.update_user_directory_stream_pos(position) def _delete_staging_area(txn): txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory_cleanup", _delete_staging_area ) - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_cleanup" ) return 1 - @defer.inlineCallbacks - def _populate_user_directory_process_rooms(self, progress, batch_size): + async def _populate_user_directory_process_rooms(self, progress, batch_size): """ Args: progress (dict) @@ -151,7 +146,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # If we don't have progress filed, delete everything. if not progress: - yield self.delete_all_from_user_dir() + await self.delete_all_from_user_dir() def _get_next_batch(txn): # Only fetch 250 rooms, so we don't fetch too many at once, even @@ -176,13 +171,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return rooms_to_work_on - rooms_to_work_on = yield self.db_pool.runInteraction( + rooms_to_work_on = await self.db_pool.runInteraction( "populate_user_directory_temp_read", _get_next_batch ) # No more rooms -- complete the transaction. if not rooms_to_work_on: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_rooms" ) return 1 @@ -195,21 +190,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): processed_event_count = 0 for room_id, event_count in rooms_to_work_on: - is_in_room = yield self.is_host_joined(room_id, self.server_name) + is_in_room = await self.is_host_joined(room_id, self.server_name) if is_in_room: - is_public = yield self.is_room_world_readable_or_publicly_joinable( + is_public = await self.is_room_world_readable_or_publicly_joinable( room_id ) - users_with_profile = yield defer.ensureDeferred( - state.get_current_users_in_room(room_id) - ) + users_with_profile = await state.get_current_users_in_room(room_id) user_ids = set(users_with_profile) # Update each user in the user directory. for user_id, profile in users_with_profile.items(): - yield self.update_profile_in_user_dir( + await self.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) @@ -223,7 +216,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): to_insert.add(user_id) if to_insert: - yield self.add_users_in_public_rooms(room_id, to_insert) + await self.add_users_in_public_rooms(room_id, to_insert) to_insert.clear() else: for user_id in user_ids: @@ -243,22 +236,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): # If it gets too big, stop and write to the database # to prevent storing too much in RAM. if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET: - yield self.add_users_who_share_private_room( + await self.add_users_who_share_private_room( room_id, to_insert ) to_insert.clear() if to_insert: - yield self.add_users_who_share_private_room(room_id, to_insert) + await self.add_users_who_share_private_room(room_id, to_insert) to_insert.clear() # We've finished a room. Delete it from the table. - yield self.db_pool.simple_delete_one( + await self.db_pool.simple_delete_one( TEMP_TABLE + "_rooms", {"room_id": room_id} ) # Update the remaining counter. progress["remaining"] -= 1 - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory", self.db_pool.updates._background_update_progress_txn, "populate_user_directory_process_rooms", @@ -273,13 +266,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return processed_event_count - @defer.inlineCallbacks - def _populate_user_directory_process_users(self, progress, batch_size): + async def _populate_user_directory_process_users(self, progress, batch_size): """ If search_all_users is enabled, add all of the users to the user directory. """ if not self.hs.config.user_directory_search_all_users: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_users" ) return 1 @@ -305,13 +297,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return users_to_work_on - users_to_work_on = yield self.db_pool.runInteraction( + users_to_work_on = await self.db_pool.runInteraction( "populate_user_directory_temp_read", _get_next_batch ) # No more users -- complete the transaction. if not users_to_work_on: - yield self.db_pool.updates._end_background_update( + await self.db_pool.updates._end_background_update( "populate_user_directory_process_users" ) return 1 @@ -322,18 +314,18 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) for user_id in users_to_work_on: - profile = yield self.get_profileinfo(get_localpart_from_id(user_id)) - yield self.update_profile_in_user_dir( + profile = await self.get_profileinfo(get_localpart_from_id(user_id)) + await self.update_profile_in_user_dir( user_id, profile.display_name, profile.avatar_url ) # We've finished processing a user. Delete it from the table. - yield self.db_pool.simple_delete_one( + await self.db_pool.simple_delete_one( TEMP_TABLE + "_users", {"user_id": user_id} ) # Update the remaining counter. progress["remaining"] -= 1 - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "populate_user_directory", self.db_pool.updates._background_update_progress_txn, "populate_user_directory_process_users", @@ -342,8 +334,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return len(users_to_work_on) - @defer.inlineCallbacks - def is_room_world_readable_or_publicly_joinable(self, room_id): + async def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable """ @@ -353,20 +344,20 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): (EventTypes.RoomHistoryVisibility, ""), ) - current_state_ids = yield self.get_filtered_current_state_ids( + current_state_ids = await self.get_filtered_current_state_ids( room_id, StateFilter.from_types(types_to_filter) ) join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) if join_rules_id: - join_rule_ev = yield self.get_event(join_rules_id, allow_none=True) + join_rule_ev = await self.get_event(join_rules_id, allow_none=True) if join_rule_ev: if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC: return True hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, "")) if hist_vis_id: - hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True) + hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True) if hist_vis_ev: if hist_vis_ev.content.get("history_visibility") == "world_readable": return True @@ -590,19 +581,18 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): "remove_from_user_dir", _remove_from_user_dir_txn ) - @defer.inlineCallbacks - def get_users_in_dir_due_to_room(self, room_id): + async def get_users_in_dir_due_to_room(self, room_id): """Get all user_ids that are in the room directory because they're in the given room_id """ - user_ids_share_pub = yield self.db_pool.simple_select_onecol( + user_ids_share_pub = await self.db_pool.simple_select_onecol( table="users_in_public_rooms", keyvalues={"room_id": room_id}, retcol="user_id", desc="get_users_in_dir_due_to_room", ) - user_ids_share_priv = yield self.db_pool.simple_select_onecol( + user_ids_share_priv = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"room_id": room_id}, retcol="other_user_id", @@ -645,8 +635,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @defer.inlineCallbacks - def get_user_dir_rooms_user_is_in(self, user_id): + async def get_user_dir_rooms_user_is_in(self, user_id): """ Returns the rooms that a user is in. @@ -656,14 +645,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): Returns: list: user_id """ - rows = yield self.db_pool.simple_select_onecol( + rows = await self.db_pool.simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, retcol="room_id", desc="get_rooms_user_is_in", ) - pub_rows = yield self.db_pool.simple_select_onecol( + pub_rows = await self.db_pool.simple_select_onecol( table="users_in_public_rooms", keyvalues={"user_id": user_id}, retcol="room_id", @@ -674,32 +663,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): users.update(rows) return list(users) - @defer.inlineCallbacks - def get_rooms_in_common_for_users(self, user_id, other_user_id): - """Given two user_ids find out the list of rooms they share. - """ - sql = """ - SELECT room_id FROM ( - SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (event_id) - WHERE type = 'm.room.member' - AND m.membership = 'join' - AND state_key = ? - ) AS f1 INNER JOIN ( - SELECT c.room_id FROM current_state_events AS c - INNER JOIN room_memberships AS m USING (event_id) - WHERE type = 'm.room.member' - AND m.membership = 'join' - AND state_key = ? - ) f2 USING (room_id) - """ - - rows = yield self.db_pool.execute( - "get_rooms_in_common_for_users", None, sql, user_id, other_user_id - ) - - return [room_id for room_id, in rows] - def get_user_directory_stream_pos(self): return self.db_pool.simple_select_one_onecol( table="user_directory_stream_pos", @@ -708,8 +671,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): desc="get_user_directory_stream_pos", ) - @defer.inlineCallbacks - def search_user_dir(self, user_id, search_term, limit): + async def search_user_dir(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -806,7 +768,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore): # This should be unreachable. raise Exception("Unrecognized database engine") - results = yield self.db_pool.execute( + results = await self.db_pool.execute( "search_user_dir", self.db_pool.cursor_to_dict, sql, *args ) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 6a545d2eb0..ecfafe68a9 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -40,7 +40,7 @@ class UserDirectoryStoreTestCase(unittest.TestCase): def test_search_user_dir(self): # normally when alice searches the directory she should just find # bob because bobby doesn't share a room with her. - r = yield self.store.search_user_dir(ALICE, "bob", 10) + r = yield defer.ensureDeferred(self.store.search_user_dir(ALICE, "bob", 10)) self.assertFalse(r["limited"]) self.assertEqual(1, len(r["results"])) self.assertDictEqual( @@ -51,7 +51,7 @@ class UserDirectoryStoreTestCase(unittest.TestCase): def test_search_user_dir_all_users(self): self.hs.config.user_directory_search_all_users = True try: - r = yield self.store.search_user_dir(ALICE, "bob", 10) + r = yield defer.ensureDeferred(self.store.search_user_dir(ALICE, "bob", 10)) self.assertFalse(r["limited"]) self.assertEqual(2, len(r["results"])) self.assertDictEqual( -- cgit 1.5.1