summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/user_directory.py242
-rw-r--r--synapse/storage/databases/main/user_directory.py74
3 files changed, 318 insertions, 2 deletions
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4bf9a047a3..9a81a77cbd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -63,7 +63,7 @@ class ProfileHandler:
 
         self._third_party_rules = hs.get_third_party_event_rules()
 
-    async def get_profile(self, user_id: str) -> JsonDict:
+    async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
         target_user = UserID.from_string(user_id)
 
         if self.hs.is_mine(target_user):
@@ -81,7 +81,7 @@ class ProfileHandler:
                     destination=target_user.domain,
                     query_type="profile",
                     args={"user_id": user_id},
-                    ignore_backoff=True,
+                    ignore_backoff=ignore_backoff,
                 )
                 return result
             except RequestSendFailed as e:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 0815be79fa..28a92d41d6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -13,15 +13,22 @@
 # limitations under the License.
 
 import logging
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
+from twisted.internet.interfaces import IDelayedCall
+
 import synapse.metrics
 from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
+from synapse.api.errors import Codes, SynapseError
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main.user_directory import SearchResult
 from synapse.storage.roommember import ProfileInfo
+from synapse.types import UserID
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import non_null_str_or_none
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -33,6 +40,25 @@ logger = logging.getLogger(__name__)
 # then be coalesced such that only one /profile request is made).
 USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
 
+# Maximum number of remote servers that we will attempt to refresh profiles for
+# in one go.
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
+
+# As long as we have servers to refresh (without backoff), keep adding more
+# every 15 seconds.
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+
+
+def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
+    """
+    Calculates the time of a next retry given `now_ts` in ms and the number
+    of failures encountered thus far.
+
+    Currently the sequence goes:
+    1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
+    """
+    return now_ts + 60_000 * (5 ** min(retry_count, 7))
+
 
 class UserDirectoryHandler(StateDeltasHandler):
     """Handles queries and updates for the user_directory.
@@ -69,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler):
         self.update_user_directory = hs.config.worker.should_update_user_directory
         self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
         self.spam_checker = hs.get_spam_checker()
+        self._hs = hs
+
         # The current position in the current_state_delta stream
         self.pos: Optional[int] = None
 
         # Guard to ensure we only process deltas one at a time
         self._is_processing = False
 
+        # Guard to ensure we only have one process for refreshing remote profiles
+        self._is_refreshing_remote_profiles = False
+        # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
+        self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
+
+        # Guard to ensure we only have one process for refreshing remote profiles
+        # for the given servers.
+        # Set of server names.
+        self._is_refreshing_remote_profiles_for_servers: Set[str] = set()
+
         if self.update_user_directory:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -82,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler):
             # we start populating the user directory
             self.clock.call_later(0, self.notify_new_event)
 
+            # Kick off the profile refresh process on startup
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                10, self.kick_off_remote_profile_refresh_process
+            )
+
     async def search_users(
         self, user_id: str, search_term: str, limit: int
     ) -> SearchResult:
@@ -483,6 +526,20 @@ class UserDirectoryHandler(StateDeltasHandler):
                 next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
                 retry_counter=0,
             )
+            # Schedule a wake-up to refresh the user directory for this server.
+            # We intentionally wake up this server directly because we don't want
+            # other servers ahead of it in the queue to get in the way of updating
+            # the profile if the server only just sent us an event.
+            self.clock.call_later(
+                USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+                self.kick_off_remote_profile_refresh_process_for_remote_server,
+                UserID.from_string(user_id).domain,
+            )
+            # Schedule a wake-up to handle any backoffs that may occur in the future.
+            self.clock.call_later(
+                2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+                self.kick_off_remote_profile_refresh_process,
+            )
             return
 
         prev_name = prev_event.content.get("displayname")
@@ -505,3 +562,188 @@ class UserDirectoryHandler(StateDeltasHandler):
             # Only update if something has changed, or we didn't have a previous event
             # in the first place.
             await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+    def kick_off_remote_profile_refresh_process(self) -> None:
+        """Called when there may be remote users with stale profiles to be refreshed"""
+        if not self.update_user_directory:
+            return
+
+        if self._is_refreshing_remote_profiles:
+            return
+
+        if self._refresh_remote_profiles_call_later:
+            if self._refresh_remote_profiles_call_later.active():
+                self._refresh_remote_profiles_call_later.cancel()
+            self._refresh_remote_profiles_call_later = None
+
+        async def process() -> None:
+            try:
+                await self._unsafe_refresh_remote_profiles()
+            finally:
+                self._is_refreshing_remote_profiles = False
+
+        self._is_refreshing_remote_profiles = True
+        run_as_background_process("user_directory.refresh_remote_profiles", process)
+
+    async def _unsafe_refresh_remote_profiles(self) -> None:
+        limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
+            self._is_refreshing_remote_profiles_for_servers
+        )
+        if limit <= 0:
+            # nothing to do: already refreshing the maximum number of servers
+            # at once.
+            # Come back later.
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+                self.kick_off_remote_profile_refresh_process,
+            )
+            return
+
+        servers_to_refresh = (
+            await self.store.get_remote_servers_with_profiles_to_refresh(
+                now_ts=self.clock.time_msec(), limit=limit
+            )
+        )
+
+        if not servers_to_refresh:
+            # Do we have any backing-off servers that we should try again
+            # for eventually?
+            # By setting `now` is a point in the far future, we can ask for
+            # which server/user is next to be refreshed, even though it is
+            # not actually refreshable *now*.
+            end_of_time = 1 << 62
+            backing_off_servers = (
+                await self.store.get_remote_servers_with_profiles_to_refresh(
+                    now_ts=end_of_time, limit=1
+                )
+            )
+            if backing_off_servers:
+                # Find out when the next user is refreshable and schedule a
+                # refresh then.
+                backing_off_server_name = backing_off_servers[0]
+                users = await self.store.get_remote_users_to_refresh_on_server(
+                    backing_off_server_name, now_ts=end_of_time, limit=1
+                )
+                if not users:
+                    return
+                _, _, next_try_at_ts = users[0]
+                self._refresh_remote_profiles_call_later = self.clock.call_later(
+                    ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2,
+                    self.kick_off_remote_profile_refresh_process,
+                )
+
+            return
+
+        for server_to_refresh in servers_to_refresh:
+            self.kick_off_remote_profile_refresh_process_for_remote_server(
+                server_to_refresh
+            )
+
+        self._refresh_remote_profiles_call_later = self.clock.call_later(
+            INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+            self.kick_off_remote_profile_refresh_process,
+        )
+
+    def kick_off_remote_profile_refresh_process_for_remote_server(
+        self, server_name: str
+    ) -> None:
+        """Called when there may be remote users with stale profiles to be refreshed
+        on the given server."""
+        if not self.update_user_directory:
+            return
+
+        if server_name in self._is_refreshing_remote_profiles_for_servers:
+            return
+
+        async def process() -> None:
+            try:
+                await self._unsafe_refresh_remote_profiles_for_remote_server(
+                    server_name
+                )
+            finally:
+                self._is_refreshing_remote_profiles_for_servers.remove(server_name)
+
+        self._is_refreshing_remote_profiles_for_servers.add(server_name)
+        run_as_background_process(
+            "user_directory.refresh_remote_profiles_for_remote_server", process
+        )
+
+    async def _unsafe_refresh_remote_profiles_for_remote_server(
+        self, server_name: str
+    ) -> None:
+        logger.info("Refreshing profiles in user directory for %s", server_name)
+
+        while True:
+            # Get a handful of users to process.
+            next_batch = await self.store.get_remote_users_to_refresh_on_server(
+                server_name, now_ts=self.clock.time_msec(), limit=10
+            )
+            if not next_batch:
+                # Finished for now
+                return
+
+            for user_id, retry_counter, _ in next_batch:
+                # Request the profile of the user.
+                try:
+                    profile = await self._hs.get_profile_handler().get_profile(
+                        user_id, ignore_backoff=False
+                    )
+                except NotRetryingDestination as e:
+                    logger.info(
+                        "Failed to refresh profile for %r because the destination is undergoing backoff",
+                        user_id,
+                    )
+                    # As a special-case, we back off until the destination is no longer
+                    # backed off from.
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        e.retry_last_ts + e.retry_interval,
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+                except SynapseError as e:
+                    if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
+                        # The profile doesn't exist.
+                        # TODO Does this mean we should clear it from our user
+                        #      directory?
+                        await self.store.clear_remote_user_profile_in_user_dir_stale(
+                            user_id
+                        )
+                        logger.warning(
+                            "Refresh of remote profile %r: not found (%r)",
+                            user_id,
+                            e.msg,
+                        )
+                        continue
+
+                    logger.warning(
+                        "Failed to refresh profile for %r because %r", user_id, e
+                    )
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        calculate_time_of_next_retry(
+                            self.clock.time_msec(), retry_counter + 1
+                        ),
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+                except Exception:
+                    logger.error(
+                        "Failed to refresh profile for %r due to unhandled exception",
+                        user_id,
+                        exc_info=True,
+                    )
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        calculate_time_of_next_retry(
+                            self.clock.time_msec(), retry_counter + 1
+                        ),
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+
+                await self.store.update_profile_in_user_dir(
+                    user_id,
+                    display_name=non_null_str_or_none(profile.get("displayname")),
+                    avatar_url=non_null_str_or_none(profile.get("avatar_url")),
+                )
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             desc="set_remote_user_profile_in_user_dir_stale",
         )
 
+    async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+        """
+        Marks a remote user as no longer having a possibly-stale user directory profile.
+
+        Args:
+            user_id: the remote user who no longer has a stale profile on this server.
+        """
+        await self.db_pool.simple_delete(
+            table="user_directory_stale_remote_users",
+            keyvalues={"user_id": user_id},
+            desc="clear_remote_user_profile_in_user_dir_stale",
+        )
+
+    async def get_remote_servers_with_profiles_to_refresh(
+        self, now_ts: int, limit: int
+    ) -> List[str]:
+        """
+        Get a list of up to `limit` server names which have users whose
+        locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+        """
+
+        def _get_remote_servers_with_refreshable_profiles_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            sql = """
+                SELECT user_server_name
+                FROM user_directory_stale_remote_users
+                WHERE next_try_at_ts < ?
+                GROUP BY user_server_name
+                ORDER BY MIN(next_try_at_ts), user_server_name
+                LIMIT ?
+            """
+            txn.execute(sql, (now_ts, limit))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_remote_servers_with_profiles_to_refresh",
+            _get_remote_servers_with_refreshable_profiles_txn,
+        )
+
+    async def get_remote_users_to_refresh_on_server(
+        self, server_name: str, now_ts: int, limit: int
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get a list of up to `limit` user IDs from the server `server_name`
+        whose locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+
+        Returns:
+            tuple of:
+                - User ID
+                - Retry counter (number of failures so far)
+                - Time the retry is scheduled for, in milliseconds
+        """
+
+        def _get_remote_users_to_refresh_on_server_txn(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, int, int]]:
+            sql = """
+                SELECT user_id, retry_counter, next_try_at_ts
+                FROM user_directory_stale_remote_users
+                WHERE user_server_name = ? AND next_try_at_ts < ?
+                ORDER BY next_try_at_ts
+                LIMIT ?
+            """
+            txn.execute(sql, (server_name, now_ts, limit))
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_remote_users_to_refresh_on_server",
+            _get_remote_users_to_refresh_on_server_txn,
+        )
+
     async def update_profile_in_user_dir(
         self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
     ) -> None: