diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index b7d58978de..a0319575f0 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@ import re
import unicodedata
from typing import (
TYPE_CHECKING,
+ Collection,
Iterable,
List,
Mapping,
@@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
-from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
+from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
Add all local users to the user directory.
"""
- def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
- sql = "SELECT user_id FROM %s LIMIT %s" % (
- TEMP_TABLE + "_users",
- str(batch_size),
- )
- txn.execute(sql)
- user_result = cast(List[Tuple[str]], txn.fetchall())
+ def _populate_user_directory_process_users_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[int]:
+ if self.database_engine.supports_returning:
+ # Note: we use an ORDER BY in the SELECT to force usage of an
+ # index. Otherwise, postgres does a sequential scan that is
+ # surprisingly slow (I think due to the fact it will read/skip
+ # over lots of already deleted rows).
+ sql = f"""
+ DELETE FROM {TEMP_TABLE + "_users"}
+ WHERE user_id IN (
+ SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
+ )
+ RETURNING user_id
+ """
+ txn.execute(sql, (batch_size,))
+ user_result = cast(List[Tuple[str]], txn.fetchall())
+ else:
+ sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
+ TEMP_TABLE + "_users",
+ str(batch_size),
+ )
+ txn.execute(sql)
+ user_result = cast(List[Tuple[str]], txn.fetchall())
if not user_result:
return None
@@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
assert count_result is not None
progress["remaining"] = count_result[0]
- return users_to_work_on
-
- users_to_work_on = await self.db_pool.runInteraction(
- "populate_user_directory_temp_read", _get_next_batch
- )
+ if not users_to_work_on:
+ return None
- # No more users -- complete the transaction.
- if not users_to_work_on:
- await self.db_pool.updates._end_background_update(
- "populate_user_directory_process_users"
+ logger.debug(
+ "Processing the next %d users of %d remaining",
+ len(users_to_work_on),
+ progress["remaining"],
)
- return 1
-
- logger.debug(
- "Processing the next %d users of %d remaining"
- % (len(users_to_work_on), progress["remaining"])
- )
- # First filter down to users we want to insert into the user directory.
- users_to_insert = [
- user_id
- for user_id in users_to_work_on
- if await self.should_include_local_user_in_dir(user_id)
- ]
+ # First filter down to users we want to insert into the user directory.
+ users_to_insert = self._filter_local_users_for_dir_txn(
+ txn, users_to_work_on
+ )
- # Next fetch their profiles. Note that the `user_id` here is the
- # *localpart*, and that not all users have profiles.
- profile_rows = await self.db_pool.simple_select_many_batch(
- table="profiles",
- column="user_id",
- iterable=[get_localpart_from_id(u) for u in users_to_insert],
- retcols=(
- "user_id",
- "displayname",
- "avatar_url",
- ),
- keyvalues={},
- desc="populate_user_directory_process_users_get_profiles",
- )
- profiles = {
- f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
- f"@{row['user_id']}:{self.server_name}",
- row["displayname"],
- row["avatar_url"],
+ # Next fetch their profiles. Note that the `user_id` here is the
+ # *localpart*, and that not all users have profiles.
+ profile_rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="profiles",
+ column="user_id",
+ iterable=[get_localpart_from_id(u) for u in users_to_insert],
+ retcols=(
+ "user_id",
+ "displayname",
+ "avatar_url",
+ ),
+ keyvalues={},
)
- for row in profile_rows
- }
+ profiles = {
+ f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
+ f"@{row['user_id']}:{self.server_name}",
+ row["displayname"],
+ row["avatar_url"],
+ )
+ for row in profile_rows
+ }
- profiles_to_insert = [
- profiles.get(user_id) or _UserDirProfile(user_id)
- for user_id in users_to_insert
- ]
+ profiles_to_insert = [
+ profiles.get(user_id) or _UserDirProfile(user_id)
+ for user_id in users_to_insert
+ ]
+
+ # Actually insert the users with their profiles into the directory.
+ self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
+
+ # We've finished processing the users. Delete them from the table, if
+ # we haven't already.
+ if not self.database_engine.supports_returning:
+ self.db_pool.simple_delete_many_txn(
+ txn,
+ table=TEMP_TABLE + "_users",
+ column="user_id",
+ values=users_to_work_on,
+ keyvalues={},
+ )
- # Actually insert the users with their profiles into the directory.
- await self.db_pool.runInteraction(
- "populate_user_directory_process_users_insertion",
- self._update_profiles_in_user_dir_txn,
- profiles_to_insert,
- )
+ # Update the remaining counter.
+ progress["remaining"] -= len(users_to_work_on)
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "populate_user_directory_process_users", progress
+ )
+ return len(users_to_work_on)
- # We've finished processing the users. Delete it from the table.
- await self.db_pool.simple_delete_many(
- table=TEMP_TABLE + "_users",
- column="user_id",
- iterable=users_to_work_on,
- keyvalues={},
- desc="populate_user_directory_process_users_delete",
+ processed_count = await self.db_pool.runInteraction(
+ "populate_user_directory_temp", _populate_user_directory_process_users_txn
)
- # Update the remaining counter.
- progress["remaining"] -= len(users_to_work_on)
- await self.db_pool.runInteraction(
- "populate_user_directory",
- self.db_pool.updates._background_update_progress_txn,
- "populate_user_directory_process_users",
- progress,
- )
+ # No more users -- complete the background update.
+ if not processed_count:
+ await self.db_pool.updates._end_background_update(
+ "populate_user_directory_process_users"
+ )
+ return 1
- return len(users_to_work_on)
+ return processed_count
async def should_include_local_user_in_dir(self, user: str) -> bool:
"""Certain classes of local user are omitted from the user directory.
@@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return True
+ def _filter_local_users_for_dir_txn(
+ self, txn: LoggingTransaction, users: Collection[str]
+ ) -> Collection[str]:
+ """A batched version of `should_include_local_user_in_dir`.
+
+ Args:
+ txn: The database transaction used to query the `users` table.
+ users: The user IDs to filter.
+
+ Returns:
+ The user IDs that pass both application service checks, have a row
+ in `users` with `deactivated` = 0 and do not have the user type
+ `UserTypes.SUPPORT`. They are listed in the order the rows come
+ back, which is not necessarily the order of `users`.
+ """
+ # Apply the two application service checks first; they take no `txn`.
+ users = [
+ user
+ for user in users
+ if self.get_app_service_by_user_id(user) is None # type: ignore[attr-defined]
+ and not self.get_if_app_services_interested_in_user(user) # type: ignore[attr-defined]
+ ]
+
+ # Look up the surviving users. IDs with no matching row in `users`, or
+ # whose `deactivated` is not 0, yield no row and are dropped as a result.
+ rows = self.db_pool.simple_select_many_txn(
+ txn,
+ table="users",
+ column="name",
+ iterable=users,
+ keyvalues={
+ "deactivated": 0,
+ },
+ retcols=("name", "user_type"),
+ )
+
+ # Finally drop support users.
+ return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
+
async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
"""Check if the room is either world_readable or publically joinable"""
|