summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/databases/main/user_directory.py190
1 files changed, 114 insertions, 76 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index b7d58978de..a0319575f0 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -17,6 +17,7 @@ import re
 import unicodedata
 from typing import (
     TYPE_CHECKING,
+    Collection,
     Iterable,
     List,
     Mapping,
@@ -45,7 +46,7 @@ from synapse.util.stringutils import non_null_str_or_none
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
-from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules
+from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, UserTypes
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
@@ -356,13 +357,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         Add all local users to the user directory.
         """
 
-        def _get_next_batch(txn: LoggingTransaction) -> Optional[List[str]]:
-            sql = "SELECT user_id FROM %s LIMIT %s" % (
-                TEMP_TABLE + "_users",
-                str(batch_size),
-            )
-            txn.execute(sql)
-            user_result = cast(List[Tuple[str]], txn.fetchall())
+        def _populate_user_directory_process_users_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[int]:
+            if self.database_engine.supports_returning:
+                # Note: we use an ORDER BY in the SELECT to force usage of an
+                # index. Otherwise, postgres does a sequential scan that is
+                # surprisingly slow (I think due to the fact it will read/skip
+                # over lots of already deleted rows).
+                sql = f"""
+                    DELETE FROM {TEMP_TABLE + "_users"}
+                    WHERE user_id IN (
+                        SELECT user_id FROM {TEMP_TABLE + "_users"} ORDER BY user_id LIMIT ?
+                    )
+                    RETURNING user_id
+                """
+                txn.execute(sql, (batch_size,))
+                user_result = cast(List[Tuple[str]], txn.fetchall())
+            else:
+                sql = "SELECT user_id FROM %s ORDER BY user_id LIMIT %s" % (
+                    TEMP_TABLE + "_users",
+                    str(batch_size),
+                )
+                txn.execute(sql)
+                user_result = cast(List[Tuple[str]], txn.fetchall())
 
             if not user_result:
                 return None
@@ -378,85 +396,81 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                 assert count_result is not None
                 progress["remaining"] = count_result[0]
 
-            return users_to_work_on
-
-        users_to_work_on = await self.db_pool.runInteraction(
-            "populate_user_directory_temp_read", _get_next_batch
-        )
+            if not users_to_work_on:
+                return None
 
-        # No more users -- complete the transaction.
-        if not users_to_work_on:
-            await self.db_pool.updates._end_background_update(
-                "populate_user_directory_process_users"
+            logger.debug(
+                "Processing the next %d users of %d remaining",
+                len(users_to_work_on),
+                progress["remaining"],
             )
-            return 1
-
-        logger.debug(
-            "Processing the next %d users of %d remaining"
-            % (len(users_to_work_on), progress["remaining"])
-        )
 
-        # First filter down to users we want to insert into the user directory.
-        users_to_insert = [
-            user_id
-            for user_id in users_to_work_on
-            if await self.should_include_local_user_in_dir(user_id)
-        ]
+            # First filter down to users we want to insert into the user directory.
+            users_to_insert = self._filter_local_users_for_dir_txn(
+                txn, users_to_work_on
+            )
 
-        # Next fetch their profiles. Note that the `user_id` here is the
-        # *localpart*, and that not all users have profiles.
-        profile_rows = await self.db_pool.simple_select_many_batch(
-            table="profiles",
-            column="user_id",
-            iterable=[get_localpart_from_id(u) for u in users_to_insert],
-            retcols=(
-                "user_id",
-                "displayname",
-                "avatar_url",
-            ),
-            keyvalues={},
-            desc="populate_user_directory_process_users_get_profiles",
-        )
-        profiles = {
-            f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
-                f"@{row['user_id']}:{self.server_name}",
-                row["displayname"],
-                row["avatar_url"],
+            # Next fetch their profiles. Note that the `user_id` here is the
+            # *localpart*, and that not all users have profiles.
+            profile_rows = self.db_pool.simple_select_many_txn(
+                txn,
+                table="profiles",
+                column="user_id",
+                iterable=[get_localpart_from_id(u) for u in users_to_insert],
+                retcols=(
+                    "user_id",
+                    "displayname",
+                    "avatar_url",
+                ),
+                keyvalues={},
             )
-            for row in profile_rows
-        }
+            profiles = {
+                f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
+                    f"@{row['user_id']}:{self.server_name}",
+                    row["displayname"],
+                    row["avatar_url"],
+                )
+                for row in profile_rows
+            }
 
-        profiles_to_insert = [
-            profiles.get(user_id) or _UserDirProfile(user_id)
-            for user_id in users_to_insert
-        ]
+            profiles_to_insert = [
+                profiles.get(user_id) or _UserDirProfile(user_id)
+                for user_id in users_to_insert
+            ]
+
+            # Actually insert the users with their profiles into the directory.
+            self._update_profiles_in_user_dir_txn(txn, profiles_to_insert)
+
+            # We've finished processing the users. Delete it from the table, if
+            # we haven't already.
+            if not self.database_engine.supports_returning:
+                self.db_pool.simple_delete_many_txn(
+                    txn,
+                    table=TEMP_TABLE + "_users",
+                    column="user_id",
+                    values=users_to_work_on,
+                    keyvalues={},
+                )
 
-        # Actually insert the users with their profiles into the directory.
-        await self.db_pool.runInteraction(
-            "populate_user_directory_process_users_insertion",
-            self._update_profiles_in_user_dir_txn,
-            profiles_to_insert,
-        )
+            # Update the remaining counter.
+            progress["remaining"] -= len(users_to_work_on)
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "populate_user_directory_process_users", progress
+            )
+            return len(users_to_work_on)
 
-        # We've finished processing the users. Delete it from the table.
-        await self.db_pool.simple_delete_many(
-            table=TEMP_TABLE + "_users",
-            column="user_id",
-            iterable=users_to_work_on,
-            keyvalues={},
-            desc="populate_user_directory_process_users_delete",
+        processed_count = await self.db_pool.runInteraction(
+            "populate_user_directory_temp", _populate_user_directory_process_users_txn
         )
 
-        # Update the remaining counter.
-        progress["remaining"] -= len(users_to_work_on)
-        await self.db_pool.runInteraction(
-            "populate_user_directory",
-            self.db_pool.updates._background_update_progress_txn,
-            "populate_user_directory_process_users",
-            progress,
-        )
+        # No more users -- complete the transaction.
+        if not processed_count:
+            await self.db_pool.updates._end_background_update(
+                "populate_user_directory_process_users"
+            )
+            return 1
 
-        return len(users_to_work_on)
+        return processed_count
 
     async def should_include_local_user_in_dir(self, user: str) -> bool:
         """Certain classes of local user are omitted from the user directory.
@@ -494,6 +508,30 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
         return True
 
+    def _filter_local_users_for_dir_txn(
+        self, txn: LoggingTransaction, users: Collection[str]
+    ) -> Collection[str]:
+        """A batched version of `should_include_local_user_in_dir`"""
+        users = [
+            user
+            for user in users
+            if self.get_app_service_by_user_id(user) is None  # type: ignore[attr-defined]
+            and not self.get_if_app_services_interested_in_user(user)  # type: ignore[attr-defined]
+        ]
+
+        rows = self.db_pool.simple_select_many_txn(
+            txn,
+            table="users",
+            column="name",
+            iterable=users,
+            keyvalues={
+                "deactivated": 0,
+            },
+            retcols=("name", "user_type"),
+        )
+
+        return [row["name"] for row in rows if row["user_type"] != UserTypes.SUPPORT]
+
     async def is_room_world_readable_or_publicly_joinable(self, room_id: str) -> bool:
         """Check if the room is either world_readable or publically joinable"""