diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4bf9a047a3..9a81a77cbd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -63,7 +63,7 @@ class ProfileHandler:
self._third_party_rules = hs.get_third_party_event_rules()
- async def get_profile(self, user_id: str) -> JsonDict:
+ async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
@@ -81,7 +81,7 @@ class ProfileHandler:
destination=target_user.domain,
query_type="profile",
args={"user_id": user_id},
- ignore_backoff=True,
+ ignore_backoff=ignore_backoff,
)
return result
except RequestSendFailed as e:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 0815be79fa..28a92d41d6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -13,15 +13,22 @@
# limitations under the License.
import logging
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from twisted.internet.interfaces import IDelayedCall
+
import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
+from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
+from synapse.types import UserID
from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -33,6 +40,25 @@ logger = logging.getLogger(__name__)
# then be coalesced such that only one /profile request is made).
USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
+# Maximum number of remote servers that we will attempt to refresh profiles for
+# in one go.
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
+
+# As long as we have servers to refresh (without backoff), keep adding more
+# every 15 seconds.
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+
+
+def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
+ """
+ Calculates the time of a next retry given `now_ts` in ms and the number
+ of failures encountered thus far.
+
+ Currently the sequence goes:
+ 1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
+ """
+ return now_ts + 60_000 * (5 ** min(retry_count, 7))
+
class UserDirectoryHandler(StateDeltasHandler):
"""Handles queries and updates for the user_directory.
@@ -69,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler):
self.update_user_directory = hs.config.worker.should_update_user_directory
self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
+ self._hs = hs
+
# The current position in the current_state_delta stream
self.pos: Optional[int] = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
+ # Guard to ensure we only have one process for refreshing remote profiles
+ self._is_refreshing_remote_profiles = False
+ # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
+ self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
+
+ # Guard to ensure we only have one process for refreshing remote profiles
+ # for the given servers.
+ # Set of server names.
+ self._is_refreshing_remote_profiles_for_servers: Set[str] = set()
+
if self.update_user_directory:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -82,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
+ # Kick off the profile refresh process on startup
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ 10, self.kick_off_remote_profile_refresh_process
+ )
+
async def search_users(
self, user_id: str, search_term: str, limit: int
) -> SearchResult:
@@ -483,6 +526,20 @@ class UserDirectoryHandler(StateDeltasHandler):
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
retry_counter=0,
)
+ # Schedule a wake-up to refresh the user directory for this server.
+ # We intentionally wake up this server directly because we don't want
+ # other servers ahead of it in the queue to get in the way of updating
+ # the profile if the server only just sent us an event.
+ self.clock.call_later(
+ USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process_for_remote_server,
+ UserID.from_string(user_id).domain,
+ )
+ # Schedule a wake-up to handle any backoffs that may occur in the future.
+ self.clock.call_later(
+ 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process,
+ )
return
prev_name = prev_event.content.get("displayname")
@@ -505,3 +562,188 @@ class UserDirectoryHandler(StateDeltasHandler):
# Only update if something has changed, or we didn't have a previous event
# in the first place.
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+ def kick_off_remote_profile_refresh_process(self) -> None:
+ """Called when there may be remote users with stale profiles to be refreshed"""
+ if not self.update_user_directory:
+ return
+
+ if self._is_refreshing_remote_profiles:
+ return
+
+ if self._refresh_remote_profiles_call_later:
+ if self._refresh_remote_profiles_call_later.active():
+ self._refresh_remote_profiles_call_later.cancel()
+ self._refresh_remote_profiles_call_later = None
+
+ async def process() -> None:
+ try:
+ await self._unsafe_refresh_remote_profiles()
+ finally:
+ self._is_refreshing_remote_profiles = False
+
+ self._is_refreshing_remote_profiles = True
+ run_as_background_process("user_directory.refresh_remote_profiles", process)
+
+ async def _unsafe_refresh_remote_profiles(self) -> None:
+ limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
+ self._is_refreshing_remote_profiles_for_servers
+ )
+ if limit <= 0:
+ # nothing to do: already refreshing the maximum number of servers
+ # at once.
+ # Come back later.
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+ self.kick_off_remote_profile_refresh_process,
+ )
+ return
+
+ servers_to_refresh = (
+ await self.store.get_remote_servers_with_profiles_to_refresh(
+ now_ts=self.clock.time_msec(), limit=limit
+ )
+ )
+
+ if not servers_to_refresh:
+ # Do we have any backing-off servers that we should try again
+ # for eventually?
+ # By setting `now` is a point in the far future, we can ask for
+ # which server/user is next to be refreshed, even though it is
+ # not actually refreshable *now*.
+ end_of_time = 1 << 62
+ backing_off_servers = (
+ await self.store.get_remote_servers_with_profiles_to_refresh(
+ now_ts=end_of_time, limit=1
+ )
+ )
+ if backing_off_servers:
+ # Find out when the next user is refreshable and schedule a
+ # refresh then.
+ backing_off_server_name = backing_off_servers[0]
+ users = await self.store.get_remote_users_to_refresh_on_server(
+ backing_off_server_name, now_ts=end_of_time, limit=1
+ )
+ if not users:
+ return
+ _, _, next_try_at_ts = users[0]
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2,
+ self.kick_off_remote_profile_refresh_process,
+ )
+
+ return
+
+ for server_to_refresh in servers_to_refresh:
+ self.kick_off_remote_profile_refresh_process_for_remote_server(
+ server_to_refresh
+ )
+
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+ self.kick_off_remote_profile_refresh_process,
+ )
+
+ def kick_off_remote_profile_refresh_process_for_remote_server(
+ self, server_name: str
+ ) -> None:
+ """Called when there may be remote users with stale profiles to be refreshed
+ on the given server."""
+ if not self.update_user_directory:
+ return
+
+ if server_name in self._is_refreshing_remote_profiles_for_servers:
+ return
+
+ async def process() -> None:
+ try:
+ await self._unsafe_refresh_remote_profiles_for_remote_server(
+ server_name
+ )
+ finally:
+ self._is_refreshing_remote_profiles_for_servers.remove(server_name)
+
+ self._is_refreshing_remote_profiles_for_servers.add(server_name)
+ run_as_background_process(
+ "user_directory.refresh_remote_profiles_for_remote_server", process
+ )
+
+ async def _unsafe_refresh_remote_profiles_for_remote_server(
+ self, server_name: str
+ ) -> None:
+ logger.info("Refreshing profiles in user directory for %s", server_name)
+
+ while True:
+ # Get a handful of users to process.
+ next_batch = await self.store.get_remote_users_to_refresh_on_server(
+ server_name, now_ts=self.clock.time_msec(), limit=10
+ )
+ if not next_batch:
+ # Finished for now
+ return
+
+ for user_id, retry_counter, _ in next_batch:
+ # Request the profile of the user.
+ try:
+ profile = await self._hs.get_profile_handler().get_profile(
+ user_id, ignore_backoff=False
+ )
+ except NotRetryingDestination as e:
+ logger.info(
+ "Failed to refresh profile for %r because the destination is undergoing backoff",
+ user_id,
+ )
+ # As a special-case, we back off until the destination is no longer
+ # backed off from.
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ e.retry_last_ts + e.retry_interval,
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except SynapseError as e:
+ if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
+ # The profile doesn't exist.
+ # TODO Does this mean we should clear it from our user
+ # directory?
+ await self.store.clear_remote_user_profile_in_user_dir_stale(
+ user_id
+ )
+ logger.warning(
+ "Refresh of remote profile %r: not found (%r)",
+ user_id,
+ e.msg,
+ )
+ continue
+
+ logger.warning(
+ "Failed to refresh profile for %r because %r", user_id, e
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except Exception:
+ logger.error(
+ "Failed to refresh profile for %r due to unhandled exception",
+ user_id,
+ exc_info=True,
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+
+ await self.store.update_profile_in_user_dir(
+ user_id,
+ display_name=non_null_str_or_none(profile.get("displayname")),
+ avatar_url=non_null_str_or_none(profile.get("avatar_url")),
+ )
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
desc="set_remote_user_profile_in_user_dir_stale",
)
+ async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+ """
+ Marks a remote user as no longer having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who no longer has a stale profile on this server.
+ """
+ await self.db_pool.simple_delete(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ desc="clear_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def get_remote_servers_with_profiles_to_refresh(
+ self, now_ts: int, limit: int
+ ) -> List[str]:
+ """
+ Get a list of up to `limit` server names which have users whose
+ locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+ """
+
+ def _get_remote_servers_with_refreshable_profiles_txn(
+ txn: LoggingTransaction,
+ ) -> List[str]:
+ sql = """
+ SELECT user_server_name
+ FROM user_directory_stale_remote_users
+ WHERE next_try_at_ts < ?
+ GROUP BY user_server_name
+ ORDER BY MIN(next_try_at_ts), user_server_name
+ LIMIT ?
+ """
+ txn.execute(sql, (now_ts, limit))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_remote_servers_with_profiles_to_refresh",
+ _get_remote_servers_with_refreshable_profiles_txn,
+ )
+
+ async def get_remote_users_to_refresh_on_server(
+ self, server_name: str, now_ts: int, limit: int
+ ) -> List[Tuple[str, int, int]]:
+ """
+ Get a list of up to `limit` user IDs from the server `server_name`
+ whose locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+
+ Returns:
+ tuple of:
+ - User ID
+ - Retry counter (number of failures so far)
+ - Time the retry is scheduled for, in milliseconds
+ """
+
+ def _get_remote_users_to_refresh_on_server_txn(
+ txn: LoggingTransaction,
+ ) -> List[Tuple[str, int, int]]:
+ sql = """
+ SELECT user_id, retry_counter, next_try_at_ts
+ FROM user_directory_stale_remote_users
+ WHERE user_server_name = ? AND next_try_at_ts < ?
+ ORDER BY next_try_at_ts
+ LIMIT ?
+ """
+ txn.execute(sql, (server_name, now_ts, limit))
+ return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+ return await self.db_pool.runInteraction(
+ "get_remote_users_to_refresh_on_server",
+ _get_remote_users_to_refresh_on_server_txn,
+ )
+
async def update_profile_in_user_dir(
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
) -> None:
|