summary refs log tree commit diff
path: root/synapse/storage/user_directory.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/user_directory.py')
-rw-r--r--synapse/storage/user_directory.py504
1 files changed, 342 insertions, 162 deletions
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py

index 2317d22ed6..d360e857d1 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py
@@ -16,22 +16,301 @@ import logging import re -from six import iteritems - from twisted.internet import defer from synapse.api.constants import EventTypes, JoinRules +from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.state import StateFilter from synapse.types import get_domain_from_id, get_localpart_from_id -from synapse.util.caches.descriptors import cached, cachedInlineCallbacks - -from ._base import SQLBaseStore +from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) -class UserDirectoryStore(SQLBaseStore): +TEMP_TABLE = "_temp_populate_user_directory" + + +class UserDirectoryStore(BackgroundUpdateStore): + + # How many records do we calculate before sending it to + # add_users_who_share_private_rooms? + SHARE_PRIVATE_WORKING_SET = 500 + + def __init__(self, db_conn, hs): + super(UserDirectoryStore, self).__init__(db_conn, hs) + + self.server_name = hs.hostname + + self.register_background_update_handler( + "populate_user_directory_createtables", + self._populate_user_directory_createtables, + ) + self.register_background_update_handler( + "populate_user_directory_process_rooms", + self._populate_user_directory_process_rooms, + ) + self.register_background_update_handler( + "populate_user_directory_process_users", + self._populate_user_directory_process_users, + ) + self.register_background_update_handler( + "populate_user_directory_cleanup", self._populate_user_directory_cleanup + ) + + @defer.inlineCallbacks + def _populate_user_directory_createtables(self, progress, batch_size): + + # Get all the rooms that we want to process. + def _make_staging_area(txn): + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_rooms(room_id TEXT NOT NULL, events BIGINT NOT NULL)" + ) + txn.execute(sql) + + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_position(position TEXT NOT NULL)" + ) + txn.execute(sql) + + # Get rooms we want to process from the database + sql = """ + SELECT room_id, count(*) FROM current_state_events + GROUP BY room_id + """ + txn.execute(sql) + rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] + self._simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + del rooms + + # If search all users is on, get all the users we want to add. + if self.hs.config.user_directory_search_all_users: + sql = ( + "CREATE TABLE IF NOT EXISTS " + + TEMP_TABLE + + "_users(user_id TEXT NOT NULL)" + ) + txn.execute(sql) + + txn.execute("SELECT name FROM users") + users = [{"user_id": x[0]} for x in txn.fetchall()] + + self._simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) + + new_pos = yield self.get_max_stream_id_in_current_state_deltas() + yield self.runInteraction( + "populate_user_directory_temp_build", _make_staging_area + ) + yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) + + yield self._end_background_update("populate_user_directory_createtables") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_user_directory_cleanup(self, progress, batch_size): + """ + Update the user directory stream position, then clean up the old tables. + """ + position = yield self._simple_select_one_onecol( + TEMP_TABLE + "_position", None, "position" + ) + yield self.update_user_directory_stream_pos(position) + + def _delete_staging_area(txn): + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users") + txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + + yield self.runInteraction( + "populate_user_directory_cleanup", _delete_staging_area + ) + + yield self._end_background_update("populate_user_directory_cleanup") + defer.returnValue(1) + + @defer.inlineCallbacks + def _populate_user_directory_process_rooms(self, progress, batch_size): + + state = self.hs.get_state_handler() + + # If we don't have progress filed, delete everything. + if not progress: + yield self.delete_all_from_user_dir() + + def _get_next_batch(txn): + sql = """ + SELECT room_id FROM %s + ORDER BY events DESC + LIMIT %s + """ % ( + TEMP_TABLE + "_rooms", + str(batch_size), + ) + txn.execute(sql) + rooms_to_work_on = txn.fetchall() + + if not rooms_to_work_on: + return None + + rooms_to_work_on = [x[0] for x in rooms_to_work_on] + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + progress["remaining"] = txn.fetchone()[0] + + return rooms_to_work_on + + rooms_to_work_on = yield self.runInteraction( + "populate_user_directory_temp_read", _get_next_batch + ) + + # No more rooms -- complete the transaction. + if not rooms_to_work_on: + yield self._end_background_update("populate_user_directory_process_rooms") + defer.returnValue(1) + + logger.info( + "Processing the next %d rooms of %d remaining" + % (len(rooms_to_work_on), progress["remaining"]) + ) + + for room_id in rooms_to_work_on: + is_in_room = yield self.is_host_joined(room_id, self.server_name) + + if is_in_room: + is_public = yield self.is_room_world_readable_or_publicly_joinable( + room_id + ) + + users_with_profile = yield state.get_current_user_in_room(room_id) + user_ids = set(users_with_profile) + + # Update each user in the user directory. + for user_id, profile in users_with_profile.items(): + yield self.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) + + to_insert = set() + + if is_public: + for user_id in user_ids: + if self.get_if_app_services_interested_in_user(user_id): + continue + + to_insert.add(user_id) + + if to_insert: + yield self.add_users_in_public_rooms(room_id, to_insert) + to_insert.clear() + else: + for user_id in user_ids: + if not self.hs.is_mine_id(user_id): + continue + + if self.get_if_app_services_interested_in_user(user_id): + continue + + for other_user_id in user_ids: + if user_id == other_user_id: + continue + + user_set = (user_id, other_user_id) + to_insert.add(user_set) + + # If it gets too big, stop and write to the database + # to prevent storing too much in RAM. + if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET: + yield self.add_users_who_share_private_room( + room_id, to_insert + ) + to_insert.clear() + + if to_insert: + yield self.add_users_who_share_private_room(room_id, to_insert) + to_insert.clear() + + # We've finished a room. Delete it from the table. + yield self._simple_delete_one(TEMP_TABLE + "_rooms", {"room_id": room_id}) + # Update the remaining counter. + progress["remaining"] -= 1 + yield self.runInteraction( + "populate_user_directory", + self._background_update_progress_txn, + "populate_user_directory_process_rooms", + progress, + ) + + defer.returnValue(len(rooms_to_work_on)) + + @defer.inlineCallbacks + def _populate_user_directory_process_users(self, progress, batch_size): + """ + If search_all_users is enabled, add all of the users to the user directory. + """ + if not self.hs.config.user_directory_search_all_users: + yield self._end_background_update("populate_user_directory_process_users") + defer.returnValue(1) + + def _get_next_batch(txn): + sql = "SELECT user_id FROM %s LIMIT %s" % ( + TEMP_TABLE + "_users", + str(batch_size), + ) + txn.execute(sql) + users_to_work_on = txn.fetchall() + + if not users_to_work_on: + return None + + users_to_work_on = [x[0] for x in users_to_work_on] + + # Get how many are left to process, so we can give status on how + # far we are in processing + sql = "SELECT COUNT(*) FROM " + TEMP_TABLE + "_users" + txn.execute(sql) + progress["remaining"] = txn.fetchone()[0] + + return users_to_work_on + + users_to_work_on = yield self.runInteraction( + "populate_user_directory_temp_read", _get_next_batch + ) + + # No more users -- complete the transaction. + if not users_to_work_on: + yield self._end_background_update("populate_user_directory_process_users") + defer.returnValue(1) + + logger.info( + "Processing the next %d users of %d remaining" + % (len(users_to_work_on), progress["remaining"]) + ) + + for user_id in users_to_work_on: + profile = yield self.get_profileinfo(get_localpart_from_id(user_id)) + yield self.update_profile_in_user_dir( + user_id, profile.display_name, profile.avatar_url + ) + + # We've finished processing a user. Delete it from the table. + yield self._simple_delete_one(TEMP_TABLE + "_users", {"user_id": user_id}) + # Update the remaining counter. + progress["remaining"] -= 1 + yield self.runInteraction( + "populate_user_directory", + self._background_update_progress_txn, + "populate_user_directory_process_users", + progress, + ) + + defer.returnValue(len(users_to_work_on)) + @defer.inlineCallbacks def is_room_world_readable_or_publicly_joinable(self, room_id): """Check if the room is either world_readable or publically joinable @@ -63,89 +342,16 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(False) - def add_profiles_to_user_dir(self, users_with_profile): - """Add profiles to the user directory - - Args: - users_with_profile (dict): Users to add to directory in the form of - mapping of user_id -> ProfileInfo + def update_profile_in_user_dir(self, user_id, display_name, avatar_url): + """ + Update or add a user's profile in the user directory. """ - if isinstance(self.database_engine, PostgresEngine): - # We weight the loclpart most highly, then display name and finally - # server name - sql = """ - INSERT INTO user_directory_search(user_id, vector) - VALUES (?, - setweight(to_tsvector('english', ?), 'A') - || setweight(to_tsvector('english', ?), 'D') - || setweight(to_tsvector('english', COALESCE(?, '')), 'B') - ) - """ - args = ( - ( - user_id, - get_localpart_from_id(user_id), - get_domain_from_id(user_id), - profile.display_name, - ) - for user_id, profile in iteritems(users_with_profile) - ) - elif isinstance(self.database_engine, Sqlite3Engine): - sql = """ - INSERT INTO user_directory_search(user_id, value) - VALUES (?,?) - """ - args = tuple( - ( - user_id, - "%s %s" % (user_id, p.display_name) if p.display_name else user_id, - ) - for user_id, p in iteritems(users_with_profile) - ) - else: - # This should be unreachable. - raise Exception("Unrecognized database engine") - - def _add_profiles_to_user_dir_txn(txn): - txn.executemany(sql, args) - self._simple_insert_many_txn( - txn, - table="user_directory", - values=[ - { - "user_id": user_id, - "room_id": None, - "display_name": profile.display_name, - "avatar_url": profile.avatar_url, - } - for user_id, profile in iteritems(users_with_profile) - ], - ) - for user_id in users_with_profile: - txn.call_after(self.get_user_in_directory.invalidate, (user_id,)) - - return self.runInteraction( - "add_profiles_to_user_dir", _add_profiles_to_user_dir_txn - ) - - @defer.inlineCallbacks - def update_user_in_user_dir(self, user_id, room_id): - yield self._simple_update_one( - table="user_directory", - keyvalues={"user_id": user_id}, - updatevalues={"room_id": room_id}, - desc="update_user_in_user_dir", - ) - self.get_user_in_directory.invalidate((user_id,)) - - def update_profile_in_user_dir(self, user_id, display_name, avatar_url, room_id): def _update_profile_in_user_dir_txn(txn): new_entry = self._simple_upsert_txn( txn, table="user_directory", keyvalues={"user_id": user_id}, - insertion_values={"room_id": room_id}, values={"display_name": display_name, "avatar_url": avatar_url}, lock=False, # We're only inserter ) @@ -242,14 +448,7 @@ class UserDirectoryStore(SQLBaseStore): txn, table="user_directory_search", keyvalues={"user_id": user_id} ) self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"user_id": user_id}, - ) - self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"other_user_id": user_id}, + txn, table="users_in_public_rooms", keyvalues={"user_id": user_id} ) self._simple_delete_txn( txn, @@ -271,9 +470,9 @@ class UserDirectoryStore(SQLBaseStore): in the given room_id """ user_ids_share_pub = yield self._simple_select_onecol( - table="users_who_share_public_rooms", + table="users_in_public_rooms", keyvalues={"room_id": room_id}, - retcol="other_user_id", + retcol="user_id", desc="get_users_in_dir_due_to_room", ) @@ -290,18 +489,6 @@ class UserDirectoryStore(SQLBaseStore): defer.returnValue(user_ids) @defer.inlineCallbacks - def get_all_rooms(self): - """Get all room_ids we've ever known about, in ascending order of "size" - """ - sql = """ - SELECT room_id FROM current_state_events - GROUP BY room_id - ORDER BY count(*) ASC - """ - rows = yield self._execute("get_all_rooms", None, sql) - defer.returnValue([room_id for room_id, in rows]) - - @defer.inlineCallbacks def get_all_local_users(self): """Get all local users """ @@ -311,26 +498,19 @@ class UserDirectoryStore(SQLBaseStore): rows = yield self._execute("get_all_local_users", None, sql) defer.returnValue([name for name, in rows]) - def add_users_who_share_room(self, room_id, share_private, user_id_tuples): - """Insert entries into the users_who_share_*_rooms table. The first + def add_users_who_share_private_room(self, room_id, user_id_tuples): + """Insert entries into the users_who_share_private_rooms table. The first user should be a local user. Args: room_id (str) - share_private (bool): Is the room private user_id_tuples([(str, str)]): iterable of 2-tuple of user IDs. """ def _add_users_who_share_room_txn(txn): - - if share_private: - tbl = "users_who_share_private_rooms" - else: - tbl = "users_who_share_public_rooms" - self._simple_upsert_many_txn( txn, - table=tbl, + table="users_who_share_private_rooms", key_names=["user_id", "other_user_id", "room_id"], key_values=[ (user_id, other_user_id, room_id) @@ -339,15 +519,35 @@ class UserDirectoryStore(SQLBaseStore): value_names=(), value_values=None, ) - for user_id, other_user_id in user_id_tuples: - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) - ) return self.runInteraction( "add_users_who_share_room", _add_users_who_share_room_txn ) + def add_users_in_public_rooms(self, room_id, user_ids): + """Insert entries into the users_who_share_private_rooms table. The first + user should be a local user. + + Args: + room_id (str) + user_ids (list[str]) + """ + + def _add_users_in_public_rooms_txn(txn): + + self._simple_upsert_many_txn( + txn, + table="users_in_public_rooms", + key_names=["user_id", "room_id"], + key_values=[(user_id, room_id) for user_id in user_ids], + value_names=(), + value_values=None, + ) + + return self.runInteraction( + "add_users_in_public_rooms", _add_users_in_public_rooms_txn + ) + def remove_user_who_share_room(self, user_id, room_id): """ Deletes entries in the users_who_share_*_rooms table. The first @@ -371,25 +571,18 @@ class UserDirectoryStore(SQLBaseStore): ) self._simple_delete_txn( txn, - table="users_who_share_public_rooms", + table="users_in_public_rooms", keyvalues={"user_id": user_id, "room_id": room_id}, ) - self._simple_delete_txn( - txn, - table="users_who_share_public_rooms", - keyvalues={"other_user_id": user_id, "room_id": room_id}, - ) - txn.call_after( - self.get_users_who_share_room_from_dir.invalidate, (user_id,) - ) return self.runInteraction( "remove_user_who_share_room", _remove_user_who_share_room_txn ) - @cachedInlineCallbacks(max_entries=500000, iterable=True) - def get_users_who_share_room_from_dir(self, user_id): - """Returns the set of users who share a room with `user_id` + @defer.inlineCallbacks + def get_user_dir_rooms_user_is_in(self, user_id): + """ + Returns the rooms that a user is in. Args: user_id(str): Must be a local user @@ -400,23 +593,19 @@ class UserDirectoryStore(SQLBaseStore): rows = yield self._simple_select_onecol( table="users_who_share_private_rooms", keyvalues={"user_id": user_id}, - retcol="other_user_id", - desc="get_users_who_share_room_with_user", + retcol="room_id", + desc="get_rooms_user_is_in", ) pub_rows = yield self._simple_select_onecol( - table="users_who_share_public_rooms", + table="users_in_public_rooms", keyvalues={"user_id": user_id}, - retcol="other_user_id", - desc="get_users_who_share_room_with_user", + retcol="room_id", + desc="get_rooms_user_is_in", ) users = set(pub_rows) users.update(rows) - - # Remove the user themselves from this list. - users.discard(user_id) - defer.returnValue(list(users)) @defer.inlineCallbacks @@ -452,10 +641,9 @@ class UserDirectoryStore(SQLBaseStore): def _delete_all_from_user_dir_txn(txn): txn.execute("DELETE FROM user_directory") txn.execute("DELETE FROM user_directory_search") - txn.execute("DELETE FROM users_who_share_public_rooms") + txn.execute("DELETE FROM users_in_public_rooms") txn.execute("DELETE FROM users_who_share_private_rooms") txn.call_after(self.get_user_in_directory.invalidate_all) - txn.call_after(self.get_users_who_share_room_from_dir.invalidate_all) return self.runInteraction( "delete_all_from_user_dir", _delete_all_from_user_dir_txn @@ -560,23 +748,19 @@ class UserDirectoryStore(SQLBaseStore): """ if self.hs.config.user_directory_search_all_users: - # make s.user_id null to keep the ordering algorithm happy - join_clause = """ - CROSS JOIN (SELECT NULL as user_id) AS s - """ - join_args = () - where_clause = "1=1" + join_args = (user_id,) + where_clause = "user_id != ?" else: - join_clause = """ - LEFT JOIN ( - SELECT other_user_id AS user_id FROM users_who_share_public_rooms - UNION - SELECT other_user_id AS user_id FROM users_who_share_private_rooms - WHERE user_id = ? - ) AS p USING (user_id) - """ join_args = (user_id,) - where_clause = "p.user_id IS NOT NULL" + where_clause = """ + ( + EXISTS (select 1 from users_in_public_rooms WHERE user_id = t.user_id) + OR EXISTS ( + SELECT 1 FROM users_who_share_private_rooms + WHERE user_id = ? AND other_user_id = t.user_id + ) + ) + """ if isinstance(self.database_engine, PostgresEngine): full_query, exact_query, prefix_query = _parse_query_postgres(search_term) @@ -588,9 +772,8 @@ class UserDirectoryStore(SQLBaseStore): # search: (domain, _, display name, localpart) sql = """ SELECT d.user_id AS user_id, display_name, avatar_url - FROM user_directory_search + FROM user_directory_search as t INNER JOIN user_directory AS d USING (user_id) - %s WHERE %s AND vector @@ to_tsquery('english', ?) @@ -617,7 +800,6 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ % ( - join_clause, where_clause, ) args = join_args + (full_query, exact_query, prefix_query, limit + 1) @@ -626,9 +808,8 @@ class UserDirectoryStore(SQLBaseStore): sql = """ SELECT d.user_id AS user_id, display_name, avatar_url - FROM user_directory_search + FROM user_directory_search as t INNER JOIN user_directory AS d USING (user_id) - %s WHERE %s AND value MATCH ? @@ -638,7 +819,6 @@ class UserDirectoryStore(SQLBaseStore): avatar_url IS NULL LIMIT ? """ % ( - join_clause, where_clause, ) args = join_args + (search_query, limit + 1)