diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 3610b6bf78..28a92d41d6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -13,21 +13,52 @@
# limitations under the License.
import logging
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from twisted.internet.interfaces import IDelayedCall
+
import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
+from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
+from synapse.types import UserID
from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
+# How long (in milliseconds) to wait before refreshing a stale user directory
+# entry with a Federation /profile request: 60 seconds. This gives time for
+# other state events to arrive (which will then be coalesced such that only
+# one /profile request is made).
+USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
+
+# Maximum number of remote servers that we will attempt to refresh profiles for
+# in one go.
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
+
+# As long as we have servers to refresh (without backoff), keep adding more
+# every 15 seconds.
+# NOTE: this value is in seconds (it is passed straight to `clock.call_later`),
+# unlike USER_DIRECTORY_STALE_REFRESH_TIME_MS above, which is in milliseconds.
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+
+
+def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
+    """
+    Calculates the time of a next retry given `now_ts` in ms and the number
+    of failures encountered thus far.
+
+    The delay is `5 ** retry_count` minutes, with the exponent clamped to the
+    range 0..7. For retry counts 0 through 7 the sequence goes:
+        1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
+
+    Args:
+        now_ts: the current time, in milliseconds.
+        retry_count: the number of failures encountered thus far. Values
+            above 7 are treated as 7 and negative values as 0.
+
+    Returns:
+        The timestamp, in milliseconds, at which the next retry is due.
+    """
+    # Clamp at both ends: without the lower bound a negative count would turn
+    # `5 ** n` into a float fraction and break the `int` return type.
+    exponent = max(0, min(retry_count, 7))
+    return now_ts + 60_000 * (5**exponent)
+
class UserDirectoryHandler(StateDeltasHandler):
"""Handles queries and updates for the user_directory.
@@ -64,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler):
self.update_user_directory = hs.config.worker.should_update_user_directory
self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
+ self._hs = hs
+
# The current position in the current_state_delta stream
self.pos: Optional[int] = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
+ # Guard to ensure we only have one process for refreshing remote profiles
+ self._is_refreshing_remote_profiles = False
+ # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
+ self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
+
+ # Guard to ensure we only have one process for refreshing remote profiles
+ # for the given servers.
+ # Set of server names.
+ self._is_refreshing_remote_profiles_for_servers: Set[str] = set()
+
if self.update_user_directory:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -77,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
+ # Kick off the profile refresh process on startup
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ 10, self.kick_off_remote_profile_refresh_process
+ )
+
async def search_users(
self, user_id: str, search_term: str, limit: int
) -> SearchResult:
@@ -200,8 +248,8 @@ class UserDirectoryHandler(StateDeltasHandler):
typ = delta["type"]
state_key = delta["state_key"]
room_id = delta["room_id"]
- event_id = delta["event_id"]
- prev_event_id = delta["prev_event_id"]
+ event_id: Optional[str] = delta["event_id"]
+ prev_event_id: Optional[str] = delta["prev_event_id"]
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
@@ -297,8 +345,8 @@ class UserDirectoryHandler(StateDeltasHandler):
async def _handle_room_membership_event(
self,
room_id: str,
- prev_event_id: str,
- event_id: str,
+ prev_event_id: Optional[str],
+ event_id: Optional[str],
state_key: str,
) -> None:
"""Process a single room membershp event.
@@ -348,7 +396,8 @@ class UserDirectoryHandler(StateDeltasHandler):
# Handle any profile changes for remote users.
# (For local users the rest of the application calls
# `handle_local_profile_change`.)
- if is_remote:
+ # Only process if there is an event_id.
+ if is_remote and event_id is not None:
await self._handle_possible_remote_profile_change(
state_key, room_id, prev_event_id, event_id
)
@@ -356,29 +405,13 @@ class UserDirectoryHandler(StateDeltasHandler):
# This may be the first time we've seen a remote user. If
# so, ensure we have a directory entry for them. (For local users,
# the rest of the application calls `handle_local_profile_change`.)
- if is_remote:
- await self._upsert_directory_entry_for_remote_user(state_key, event_id)
+ # Only process if there is an event_id.
+ if is_remote and event_id is not None:
+ await self._handle_possible_remote_profile_change(
+ state_key, room_id, None, event_id
+ )
await self._track_user_joined_room(room_id, state_key)
- async def _upsert_directory_entry_for_remote_user(
- self, user_id: str, event_id: str
- ) -> None:
- """A remote user has just joined a room. Ensure they have an entry in
- the user directory. The caller is responsible for making sure they're
- remote.
- """
- event = await self.store.get_event(event_id, allow_none=True)
- # It isn't expected for this event to not exist, but we
- # don't want the entire background process to break.
- if event is None:
- return
-
- logger.debug("Adding new user to dir, %r", user_id)
-
- await self.store.update_profile_in_user_dir(
- user_id, event.content.get("displayname"), event.content.get("avatar_url")
- )
-
async def _track_user_joined_room(self, room_id: str, joining_user_id: str) -> None:
"""Someone's just joined a room. Update `users_in_public_rooms` or
`users_who_share_private_rooms` as appropriate.
@@ -460,14 +493,17 @@ class UserDirectoryHandler(StateDeltasHandler):
user_id: str,
room_id: str,
prev_event_id: Optional[str],
- event_id: Optional[str],
+ event_id: str,
) -> None:
"""Check member event changes for any profile changes and update the
database if there are. This is intended for remote users only. The caller
is responsible for checking that the given user is remote.
"""
- if not prev_event_id or not event_id:
- return
+
+ if not prev_event_id:
+ # If we don't have an older event to fall back on, just fetch the same
+ # event itself.
+ prev_event_id = event_id
prev_event = await self.store.get_event(prev_event_id, allow_none=True)
event = await self.store.get_event(event_id, allow_none=True)
@@ -478,17 +514,236 @@ class UserDirectoryHandler(StateDeltasHandler):
if event.membership != Membership.JOIN:
return
+ is_public = await self.store.is_room_world_readable_or_publicly_joinable(
+ room_id
+ )
+ if not is_public:
+ # Don't collect user profiles from private rooms as they are not guaranteed
+ # to be the same as the user's global profile.
+ now_ts = self.clock.time_msec()
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
+ retry_counter=0,
+ )
+ # Schedule a wake-up to refresh the user directory for this server.
+ # We intentionally wake up this server directly because we don't want
+ # other servers ahead of it in the queue to get in the way of updating
+ # the profile if the server only just sent us an event.
+ self.clock.call_later(
+ USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process_for_remote_server,
+ UserID.from_string(user_id).domain,
+ )
+ # Schedule a wake-up to handle any backoffs that may occur in the future.
+ self.clock.call_later(
+ 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process,
+ )
+ return
+
prev_name = prev_event.content.get("displayname")
new_name = event.content.get("displayname")
- # If the new name is an unexpected form, do not update the directory.
+ # If the new name is an unexpected form, replace with None.
if not isinstance(new_name, str):
- new_name = prev_name
+ new_name = None
prev_avatar = prev_event.content.get("avatar_url")
new_avatar = event.content.get("avatar_url")
- # If the new avatar is an unexpected form, do not update the directory.
+ # If the new avatar is an unexpected form, replace with None.
if not isinstance(new_avatar, str):
- new_avatar = prev_avatar
+ new_avatar = None
- if prev_name != new_name or prev_avatar != new_avatar:
+ if (
+ prev_name != new_name
+ or prev_avatar != new_avatar
+ or prev_event_id == event_id
+ ):
+ # Only update if something has changed, or we didn't have a previous event
+ # in the first place.
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+ def kick_off_remote_profile_refresh_process(self) -> None:
+ """Called when there may be remote users with stale profiles to be refreshed.
+
+ Starts `_unsafe_refresh_remote_profiles` as a background process. Does
+ nothing if `update_user_directory` is false, or if a refresh started by
+ this method is still running.
+ """
+ if not self.update_user_directory:
+ return
+
+ # Only one refresh process at a time.
+ if self._is_refreshing_remote_profiles:
+ return
+
+ # Cancel any scheduled call that is still pending and forget its handle
+ # before starting the refresh.
+ if self._refresh_remote_profiles_call_later:
+ if self._refresh_remote_profiles_call_later.active():
+ self._refresh_remote_profiles_call_later.cancel()
+ self._refresh_remote_profiles_call_later = None
+
+ async def process() -> None:
+ try:
+ await self._unsafe_refresh_remote_profiles()
+ finally:
+ # Release the guard however the refresh ends, so that a later call
+ # can start a new refresh.
+ self._is_refreshing_remote_profiles = False
+
+ # Take the guard before the background process is started.
+ self._is_refreshing_remote_profiles = True
+ run_as_background_process("user_directory.refresh_remote_profiles", process)
+
+    async def _unsafe_refresh_remote_profiles(self) -> None:
+        """Start refreshes for remote servers whose stale profiles are due, and
+        schedule the next call of `kick_off_remote_profile_refresh_process`.
+
+        No more than MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO servers are
+        refreshed at the same time. When no server is due right now, the next
+        call is scheduled for shortly after the next backing-off user (as
+        reported by the store) becomes refreshable, if there is such a user.
+        """
+        free_slots = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
+            self._is_refreshing_remote_profiles_for_servers
+        )
+        if free_slots < 1:
+            # Already refreshing the maximum number of servers at once, so
+            # there is nothing to do for now. Come back later.
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+                self.kick_off_remote_profile_refresh_process,
+            )
+            return
+
+        due_servers = await self.store.get_remote_servers_with_profiles_to_refresh(
+            now_ts=self.clock.time_msec(), limit=free_slots
+        )
+
+        if due_servers:
+            for due_server in due_servers:
+                self.kick_off_remote_profile_refresh_process_for_remote_server(
+                    due_server
+                )
+
+            # Come back later to pick up more servers.
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+                self.kick_off_remote_profile_refresh_process,
+            )
+            return
+
+        # Nothing is refreshable right now. Do we have any backing-off servers
+        # that we should try again for eventually?
+        # By setting `now` to a point in the far future, we can ask which
+        # server/user is next to be refreshed, even though it is not actually
+        # refreshable *now*.
+        far_future = 1 << 62
+        waiting_servers = (
+            await self.store.get_remote_servers_with_profiles_to_refresh(
+                now_ts=far_future, limit=1
+            )
+        )
+        if not waiting_servers:
+            return
+
+        waiting_users = await self.store.get_remote_users_to_refresh_on_server(
+            waiting_servers[0], now_ts=far_future, limit=1
+        )
+        if not waiting_users:
+            return
+
+        # Find out when that user becomes refreshable and schedule a refresh
+        # for shortly afterwards.
+        _, _, next_try_at_ts = waiting_users[0]
+        wait_ms = next_try_at_ts - self.clock.time_msec()
+        self._refresh_remote_profiles_call_later = self.clock.call_later(
+            (wait_ms // 1000) + 2,
+            self.kick_off_remote_profile_refresh_process,
+        )
+
+    def kick_off_remote_profile_refresh_process_for_remote_server(
+        self, server_name: str
+    ) -> None:
+        """Start a background refresh of stale remote profiles on one server.
+
+        Called when there may be remote users with stale profiles to be
+        refreshed on the given server. Does nothing when
+        `update_user_directory` is false, or when a refresh for `server_name`
+        is already running.
+        """
+        if (
+            not self.update_user_directory
+            or server_name in self._is_refreshing_remote_profiles_for_servers
+        ):
+            return
+
+        # Mark the server as being refreshed before the background process is
+        # started.
+        self._is_refreshing_remote_profiles_for_servers.add(server_name)
+
+        async def refresh_server() -> None:
+            try:
+                await self._unsafe_refresh_remote_profiles_for_remote_server(
+                    server_name
+                )
+            finally:
+                # Always release the per-server guard, even if the refresh
+                # failed.
+                self._is_refreshing_remote_profiles_for_servers.remove(server_name)
+
+        run_as_background_process(
+            "user_directory.refresh_remote_profiles_for_remote_server",
+            refresh_server,
+        )
+
+ async def _unsafe_refresh_remote_profiles_for_remote_server(
+ self, server_name: str
+ ) -> None:
+ """Refresh the user directory profiles of users on `server_name`.
+
+ Repeatedly asks the store for batches of up to 10 users to refresh on
+ that server and requests each user's profile from the profile handler.
+ On success the fetched display name and avatar URL are written with
+ `update_profile_in_user_dir`. On failure
+ `set_remote_user_profile_in_user_dir_stale` is called with a later retry
+ time and an incremented retry counter, except for a 404 M_NOT_FOUND
+ response, where `clear_remote_user_profile_in_user_dir_stale` is called
+ instead. Returns once the store returns an empty batch.
+
+ Args:
+ server_name: the remote server whose users should be refreshed.
+ """
+ logger.info("Refreshing profiles in user directory for %s", server_name)
+
+ while True:
+ # Get a handful of users to process.
+ next_batch = await self.store.get_remote_users_to_refresh_on_server(
+ server_name, now_ts=self.clock.time_msec(), limit=10
+ )
+ if not next_batch:
+ # Finished for now
+ return
+
+ for user_id, retry_counter, _ in next_batch:
+ # Request the profile of the user.
+ # NOTE(review): with ignore_backoff=False a destination that is backing
+ # off presumably raises NotRetryingDestination (handled below) - confirm
+ # against the profile handler.
+ try:
+ profile = await self._hs.get_profile_handler().get_profile(
+ user_id, ignore_backoff=False
+ )
+ except NotRetryingDestination as e:
+ logger.info(
+ "Failed to refresh profile for %r because the destination is undergoing backoff",
+ user_id,
+ )
+ # As a special-case, we back off until the destination is no longer
+ # backed off from.
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ e.retry_last_ts + e.retry_interval,
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except SynapseError as e:
+ if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
+ # The profile doesn't exist.
+ # TODO Does this mean we should clear it from our user
+ # directory?
+ await self.store.clear_remote_user_profile_in_user_dir_stale(
+ user_id
+ )
+ logger.warning(
+ "Refresh of remote profile %r: not found (%r)",
+ user_id,
+ e.msg,
+ )
+ continue
+
+ # Any other SynapseError: schedule a retry using the backoff from
+ # `calculate_time_of_next_retry`.
+ logger.warning(
+ "Failed to refresh profile for %r because %r", user_id, e
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except Exception:
+ # Catch-all so that one user's failure does not abort the rest of the
+ # batch; the user is retried later with the same backoff as above.
+ logger.error(
+ "Failed to refresh profile for %r due to unhandled exception",
+ user_id,
+ exc_info=True,
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+
+ # Success: store the fetched profile.
+ # NOTE(review): non_null_str_or_none presumably maps missing or non-string
+ # values to None - confirm against synapse.util.stringutils.
+ await self.store.update_profile_in_user_dir(
+ user_id,
+ display_name=non_null_str_or_none(profile.get("displayname")),
+ avatar_url=non_null_str_or_none(profile.get("avatar_url")),
+ )
|