Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/databases/main/user_directory.py  190
1 file changed, 114 insertions, 76 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index b7d58978de..a0319575f0 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@ import re
 import unicodedata
 from typing import (
     TYPE_CHECKING,
+    Collection,
     Iterable,
     List,
     Mapping,
@@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
-from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
+from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
@@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         Add all local users to the user directory.
         """
 
-        def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
-            sql = "SELECT user_id FROM %s LIMIT %s" % (
-                TEMP_TABLE + "_users",
-                str(batch_size),
-            )
-            txn.execute(sql)
-            user_result = cast(List[Tuple[str]], txn.fetchall())
+        def _populate_user_directory_process_users_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[int]:
+            if self.database_engine.supports_returning:
+                # Note: we use an ORDER BY in the SELECT to force usage of an
+                # index. Otherwise, postgres does a sequential scan that is
+                # surprisingly slow (I think due to the fact it will read/skip
+                # over lots of already deleted rows).
+                sql = f"""
+                    DELETE FROM {TEMP_TABLE + "_users"}
+                    WHERE user_id IN (
+                        SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
+                    )
+                    RETURNING user_id
+                """
+                txn.execute(sql, (batch_size,))
+                user_result = cast(List[Tuple[str]], txn.fetchall())
+            else:
+                sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
+                    TEMP_TABLE + "_users",
+                    str(batch_size),
+                )
+                txn.execute(sql)
+                user_result = cast(List[Tuple[str]], txn.fetchall())
 
             if not user_result:
                 return None
@@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             assert count_result is not None
             progress["remaining"] = count_result[0]
 
-            return users_to_work_on
-
-        users_to_work_on = await self.db_pool.runInteraction(
-            "populate_user_directory_temp_read", _get_next_batch
-        )
+            if not users_to_work_on:
+                return None
 
-        # No more users -- complete the transaction.
-        if not users_to_work_on:
-            await self.db_pool.updates._end_background_update(
-                "populate_user_directory_process_users"
+            logger.debug(
+                "Processing the next %d users of %d remaining",
+                len(users_to_work_on),
+                progress["remaining"],
             )
-            return 1
-
-        logger.debug(
-            "Processing the next %d users of %d remaining"
-            % (len(users_to_work_on), progress["remaining"])
-        )
 
-        # First filter down to users we want to insert into the user directory.
-        users_to_insert = [
-            user_id
-            for user_id in users_to_work_on
-            if await self.should_include_local_user_in_dir(user_id)
-        ]
+            # First filter down to users we want to insert into the user directory.
+            users_to_insert = self._filter_local_users_for_dir_txn(
+                txn, users_to_work_on
+            )
 
-        # Next fetch their profiles. Note that the `user_id` here is the
-        # *localpart*, and that not all users have profiles.
-        profile_rows = await self.db_pool.simple_select_many_batch(
-            table="profiles",
-            column="user_id",
-            iterable=[get_localpart_from_id(u) for u in users_to_insert],
-            retcols=(
-                "user_id",
-                "displayname",
-                "avatar_url",
-            ),
-            keyvalues={},
-            desc="populate_user_directory_process_users_get_profiles",
-        )
-        profiles = {
-            f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
-                f"@{row['user_id']}:{self.server_name}",
-                row["displayname"],
-                row["avatar_url"],
+            # Next fetch their profiles. Note that the `user_id` here is the
+            # *localpart*, and that not all users have profiles.
+            profile_rows = self.db_pool.simple_select_many_txn(
+                txn,
+                table="profiles",
+                column="user_id",
+                iterable=[get_localpart_from_id(u) for u in users_to_insert],
+                retcols=(
+                    "user_id",
+                    "displayname",
+                    "avatar_url",
+                ),
+                keyvalues={},
             )
-            for row in profile_rows
-        }
+            profiles = {
+                f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
+                    f"@{row['user_id']}:{self.server_name}",
+                    row["displayname"],
+                    row["avatar_url"],
+                )
+                for row in profile_rows
+            }
 
-        profiles_to_insert = [
-            profiles.get(user_id) or _UserDirProfile(user_id)
-            for user_id in users_to_insert
-        ]
+            profiles_to_insert = [
+                profiles.get(user_id) or _UserDirProfile(user_id)
+                for user_id in users_to_insert
+            ]
+
+            # Actually insert the users with their profiles into the directory.
+            self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
+
+            # We've finished processing the users. Delete it from the table, if
+            # we haven't already.
+            if not self.database_engine.supports_returning:
+                self.db_pool.simple_delete_many_txn(
+                    txn,
+                    table=TEMP_TABLE + "_users",
+                    column="user_id",
+                    values=users_to_work_on,
+                    keyvalues={},
+                )
 
-        # Actually insert the users with their profiles into the directory.
-        await self.db_pool.runInteraction(
-            "populate_user_directory_process_users_insertion",
-            self._update_profiles_in_user_dir_txn,
-            profiles_to_insert,
-        )
+            # Update the remaining counter.
+            progress["remaining"] -= len(users_to_work_on)
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "populate_user_directory_process_users", progress
+            )
+            return len(users_to_work_on)
 
-        # We've finished processing the users. Delete it from the table.
-        await self.db_pool.simple_delete_many(
-            table=TEMP_TABLE + "_users",
-            column="user_id",
-            iterable=users_to_work_on,
-            keyvalues={},
-            desc="populate_user_directory_process_users_delete",
+        processed_count = await self.db_pool.runInteraction(
+            "populate_user_directory_temp", _populate_user_directory_process_users_txn
         )
 
-        # Update the remaining counter.
-        progress["remaining"] -= len(users_to_work_on)
-        await self.db_pool.runInteraction(
-            "populate_user_directory",
-            self.db_pool.updates._background_update_progress_txn,
-            "populate_user_directory_process_users",
-            progress,
-        )
+        # No more users -- complete the transaction.
+        if not processed_count:
+            await self.db_pool.updates._end_background_update(
+                "populate_user_directory_process_users"
+            )
+            return 1
 
-        return len(users_to_work_on)
+        return processed_count
 
     async def should_include_local_user_in_dir(self, user: str) -> bool:
         """Certain classes of local user are omitted from the user directory.
@@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         return True
 
+    def _filter_local_users_for_dir_txn(
+        self, txn: LoggingTransaction, users: Collection[str]
+    ) -> Collection[str]:
+        """A batched version of `should_include_local_user_in_dir`"""
+        users = [
+            user
+            for user in users
+            if self.get_app_service_by_user_id(user) is None  # type: ignore[attr-defined]
+            and not self.get_if_app_services_interested_in_user(user)  # type: ignore[attr-defined]
+        ]
+
+        rows = self.db_pool.simple_select_many_txn(
+            txn,
+            table="users",
+            column="name",
+            iterable=users,
+            keyvalues={
+                "deactivated": 0,
+            },
+            retcols=("name", "user_type"),
+        )
+
+        return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
+
     async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
         """Check if the room is either world_readable or publically joinable"""
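Two ideas in the `_populate_user_directory_process_users` changes above generalise beyond Synapse's `DatabasePool`: the batch is claimed with `DELETE ... RETURNING` (the `ORDER BY` in the subselect steers the planner towards an index instead of a sequential scan over already-deleted rows), and the claim, processing, fallback deletion and progress bookkeeping all run inside one transaction rather than one transaction per step. A minimal sketch of the same shape using Python's built-in sqlite3 module as a stand-in for Synapse's database layer; RETURNING needs SQLite 3.35+, and the table names `user_dir_work_queue` and `background_update_progress` below are illustrative only, not Synapse's schema:

import sqlite3
from typing import List, Optional

# RETURNING is only available from SQLite 3.35.0; older engines need the
# SELECT-now, DELETE-later fallback, mirroring the supports_returning branch.
SUPPORTS_RETURNING = sqlite3.sqlite_version_info >= (3, 35, 0)


def claim_batch(conn: sqlite3.Connection, batch_size: int) -> List[str]:
    """Pop up to batch_size user IDs from a (hypothetical) work-queue table."""
    if SUPPORTS_RETURNING:
        # The ORDER BY in the subselect encourages an index scan instead of a
        # sequential scan over rows that have already been deleted.
        rows = conn.execute(
            """
            DELETE FROM user_dir_work_queue
            WHERE user_id IN (
                SELECT user_id FROM user_dir_work_queue
                ORDER BY user_id LIMIT ?
            )
            RETURNING user_id
            """,
            (batch_size,),
        ).fetchall()
    else:
        # Fallback: read the batch now; the caller deletes it after processing.
        rows = conn.execute(
            "SELECT user_id FROM user_dir_work_queue ORDER BY user_id LIMIT ?",
            (batch_size,),
        ).fetchall()
    return [user_id for (user_id,) in rows]


def process_one_batch(conn: sqlite3.Connection, batch_size: int) -> Optional[int]:
    """Handle one whole batch inside a single transaction, the way the reworked
    _populate_user_directory_process_users_txn runs under one runInteraction."""
    with conn:  # sqlite3: commit on success, roll back on exception
        users = claim_batch(conn, batch_size)
        if not users:
            return None

        # ... filter the users and upsert their profiles here ...

        if not SUPPORTS_RETURNING:
            # Only needed on the fallback path; RETURNING already deleted them.
            conn.executemany(
                "DELETE FROM user_dir_work_queue WHERE user_id = ?",
                [(u,) for u in users],
            )

        # Progress bookkeeping lives in the same transaction, so a crash cannot
        # leave the counter out of step with the queue table.
        conn.execute(
            "UPDATE background_update_progress SET remaining = remaining - ?",
            (len(users),),
        )
        return len(users)


def run_update(conn: sqlite3.Connection, batch_size: int = 100) -> None:
    # Loop until a batch comes back empty, the equivalent of the background
    # update returning 1 to mark itself finished.
    while process_one_batch(conn, batch_size) is not None:
        pass

On the fallback path the rows are only removed after the batch has been processed, which is why the real change keeps its `simple_delete_many_txn` call behind `if not self.database_engine.supports_returning`.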
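The new `_filter_local_users_for_dir_txn` in the final hunk batches what `should_include_local_user_in_dir` previously decided one user at a time: the in-memory appservice checks stay per user, while the deactivated and support-user checks collapse into a single query over the whole batch. A rough sketch of that batching idea, again with sqlite3 and an illustrative `users` table carrying `name`, `deactivated` and `user_type` columns; `SUPPORT_USER_TYPE` stands in for Synapse's `UserTypes.SUPPORT`:

import sqlite3
from typing import Collection, List

SUPPORT_USER_TYPE = "support"  # stand-in for Synapse's UserTypes.SUPPORT


def filter_users_for_directory(
    conn: sqlite3.Connection, users: Collection[str]
) -> List[str]:
    """Keep only active, non-support users, using one query for the whole batch
    rather than one lookup per user."""
    if not users:
        return []
    placeholders = ", ".join("?" for _ in users)
    rows = conn.execute(
        f"SELECT name, user_type FROM users "
        f"WHERE deactivated = 0 AND name IN ({placeholders})",
        tuple(users),
    ).fetchall()
    return [name for name, user_type in rows if user_type != SUPPORT_USER_TYPE]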