diff --git a/changelog.d/14756.bugfix b/changelog.d/14756.bugfix
new file mode 100644
index 0000000000..12f979e9d0
--- /dev/null
+++ b/changelog.d/14756.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
\ No newline at end of file
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4bf9a047a3..9a81a77cbd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -63,7 +63,7 @@ class ProfileHandler:
self._third_party_rules = hs.get_third_party_event_rules()
- async def get_profile(self, user_id: str) -> JsonDict:
+ async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
target_user = UserID.from_string(user_id)
if self.hs.is_mine(target_user):
@@ -81,7 +81,7 @@ class ProfileHandler:
destination=target_user.domain,
query_type="profile",
args={"user_id": user_id},
- ignore_backoff=True,
+ ignore_backoff=ignore_backoff,
)
return result
except RequestSendFailed as e:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 0815be79fa..28a92d41d6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -13,15 +13,22 @@
# limitations under the License.
import logging
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from twisted.internet.interfaces import IDelayedCall
+
import synapse.metrics
from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
+from synapse.api.errors import Codes, SynapseError
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main.user_directory import SearchResult
from synapse.storage.roommember import ProfileInfo
+from synapse.types import UserID
from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import non_null_str_or_none
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -33,6 +40,25 @@ logger = logging.getLogger(__name__)
# then be coalesced such that only one /profile request is made).
USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
+# Maximum number of remote servers that we will attempt to refresh profiles for
+# in one go.
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
+
+# As long as we have servers to refresh (without backoff), keep adding more
+# every 15 seconds.
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+
+
+def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
+ """
+ Calculates the time of a next retry given `now_ts` in ms and the number
+ of failures encountered thus far.
+
+ Currently the sequence goes:
+ 1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
+ """
+ return now_ts + 60_000 * (5 ** min(retry_count, 7))
+
class UserDirectoryHandler(StateDeltasHandler):
"""Handles queries and updates for the user_directory.
@@ -69,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler):
self.update_user_directory = hs.config.worker.should_update_user_directory
self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
self.spam_checker = hs.get_spam_checker()
+ self._hs = hs
+
# The current position in the current_state_delta stream
self.pos: Optional[int] = None
# Guard to ensure we only process deltas one at a time
self._is_processing = False
+ # Guard to ensure we only have one process for refreshing remote profiles
+ self._is_refreshing_remote_profiles = False
+ # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
+ self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
+
+ # Guard to ensure we only have one process for refreshing remote profiles
+ # for the given servers.
+ # Set of server names.
+ self._is_refreshing_remote_profiles_for_servers: Set[str] = set()
+
if self.update_user_directory:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -82,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler):
# we start populating the user directory
self.clock.call_later(0, self.notify_new_event)
+ # Kick off the profile refresh process on startup
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ 10, self.kick_off_remote_profile_refresh_process
+ )
+
async def search_users(
self, user_id: str, search_term: str, limit: int
) -> SearchResult:
@@ -483,6 +526,20 @@ class UserDirectoryHandler(StateDeltasHandler):
next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
retry_counter=0,
)
+ # Schedule a wake-up to refresh the user directory for this server.
+ # We intentionally wake up this server directly because we don't want
+ # other servers ahead of it in the queue to get in the way of updating
+ # the profile if the server only just sent us an event.
+ self.clock.call_later(
+ USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process_for_remote_server,
+ UserID.from_string(user_id).domain,
+ )
+ # Schedule a wake-up to handle any backoffs that may occur in the future.
+ self.clock.call_later(
+ 2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+ self.kick_off_remote_profile_refresh_process,
+ )
return
prev_name = prev_event.content.get("displayname")
@@ -505,3 +562,188 @@ class UserDirectoryHandler(StateDeltasHandler):
# Only update if something has changed, or we didn't have a previous event
# in the first place.
await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+ def kick_off_remote_profile_refresh_process(self) -> None:
+ """Called when there may be remote users with stale profiles to be refreshed"""
+ if not self.update_user_directory:
+ return
+
+ if self._is_refreshing_remote_profiles:
+ return
+
+ if self._refresh_remote_profiles_call_later:
+ if self._refresh_remote_profiles_call_later.active():
+ self._refresh_remote_profiles_call_later.cancel()
+ self._refresh_remote_profiles_call_later = None
+
+ async def process() -> None:
+ try:
+ await self._unsafe_refresh_remote_profiles()
+ finally:
+ self._is_refreshing_remote_profiles = False
+
+ self._is_refreshing_remote_profiles = True
+ run_as_background_process("user_directory.refresh_remote_profiles", process)
+
+ async def _unsafe_refresh_remote_profiles(self) -> None:
+ limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
+ self._is_refreshing_remote_profiles_for_servers
+ )
+ if limit <= 0:
+ # nothing to do: already refreshing the maximum number of servers
+ # at once.
+ # Come back later.
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+ self.kick_off_remote_profile_refresh_process,
+ )
+ return
+
+ servers_to_refresh = (
+ await self.store.get_remote_servers_with_profiles_to_refresh(
+ now_ts=self.clock.time_msec(), limit=limit
+ )
+ )
+
+ if not servers_to_refresh:
+ # Do we have any backing-off servers that we should try again
+ # for eventually?
+ # By setting `now` is a point in the far future, we can ask for
+ # which server/user is next to be refreshed, even though it is
+ # not actually refreshable *now*.
+ end_of_time = 1 << 62
+ backing_off_servers = (
+ await self.store.get_remote_servers_with_profiles_to_refresh(
+ now_ts=end_of_time, limit=1
+ )
+ )
+ if backing_off_servers:
+ # Find out when the next user is refreshable and schedule a
+ # refresh then.
+ backing_off_server_name = backing_off_servers[0]
+ users = await self.store.get_remote_users_to_refresh_on_server(
+ backing_off_server_name, now_ts=end_of_time, limit=1
+ )
+ if not users:
+ return
+ _, _, next_try_at_ts = users[0]
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2,
+ self.kick_off_remote_profile_refresh_process,
+ )
+
+ return
+
+ for server_to_refresh in servers_to_refresh:
+ self.kick_off_remote_profile_refresh_process_for_remote_server(
+ server_to_refresh
+ )
+
+ self._refresh_remote_profiles_call_later = self.clock.call_later(
+ INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+ self.kick_off_remote_profile_refresh_process,
+ )
+
+ def kick_off_remote_profile_refresh_process_for_remote_server(
+ self, server_name: str
+ ) -> None:
+ """Called when there may be remote users with stale profiles to be refreshed
+ on the given server."""
+ if not self.update_user_directory:
+ return
+
+ if server_name in self._is_refreshing_remote_profiles_for_servers:
+ return
+
+ async def process() -> None:
+ try:
+ await self._unsafe_refresh_remote_profiles_for_remote_server(
+ server_name
+ )
+ finally:
+ self._is_refreshing_remote_profiles_for_servers.remove(server_name)
+
+ self._is_refreshing_remote_profiles_for_servers.add(server_name)
+ run_as_background_process(
+ "user_directory.refresh_remote_profiles_for_remote_server", process
+ )
+
+ async def _unsafe_refresh_remote_profiles_for_remote_server(
+ self, server_name: str
+ ) -> None:
+ logger.info("Refreshing profiles in user directory for %s", server_name)
+
+ while True:
+ # Get a handful of users to process.
+ next_batch = await self.store.get_remote_users_to_refresh_on_server(
+ server_name, now_ts=self.clock.time_msec(), limit=10
+ )
+ if not next_batch:
+ # Finished for now
+ return
+
+ for user_id, retry_counter, _ in next_batch:
+ # Request the profile of the user.
+ try:
+ profile = await self._hs.get_profile_handler().get_profile(
+ user_id, ignore_backoff=False
+ )
+ except NotRetryingDestination as e:
+ logger.info(
+ "Failed to refresh profile for %r because the destination is undergoing backoff",
+ user_id,
+ )
+ # As a special-case, we back off until the destination is no longer
+ # backed off from.
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ e.retry_last_ts + e.retry_interval,
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except SynapseError as e:
+ if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
+ # The profile doesn't exist.
+ # TODO Does this mean we should clear it from our user
+ # directory?
+ await self.store.clear_remote_user_profile_in_user_dir_stale(
+ user_id
+ )
+ logger.warning(
+ "Refresh of remote profile %r: not found (%r)",
+ user_id,
+ e.msg,
+ )
+ continue
+
+ logger.warning(
+ "Failed to refresh profile for %r because %r", user_id, e
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+ except Exception:
+ logger.error(
+ "Failed to refresh profile for %r due to unhandled exception",
+ user_id,
+ exc_info=True,
+ )
+ await self.store.set_remote_user_profile_in_user_dir_stale(
+ user_id,
+ calculate_time_of_next_retry(
+ self.clock.time_msec(), retry_counter + 1
+ ),
+ retry_counter=retry_counter + 1,
+ )
+ continue
+
+ await self.store.update_profile_in_user_dir(
+ user_id,
+ display_name=non_null_str_or_none(profile.get("displayname")),
+ avatar_url=non_null_str_or_none(profile.get("avatar_url")),
+ )
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
desc="set_remote_user_profile_in_user_dir_stale",
)
+ async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+ """
+ Marks a remote user as no longer having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who no longer has a stale profile on this server.
+ """
+ await self.db_pool.simple_delete(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ desc="clear_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def get_remote_servers_with_profiles_to_refresh(
+ self, now_ts: int, limit: int
+ ) -> List[str]:
+ """
+ Get a list of up to `limit` server names which have users whose
+ locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+ """
+
+ def _get_remote_servers_with_refreshable_profiles_txn(
+ txn: LoggingTransaction,
+ ) -> List[str]:
+ sql = """
+ SELECT user_server_name
+ FROM user_directory_stale_remote_users
+ WHERE next_try_at_ts < ?
+ GROUP BY user_server_name
+ ORDER BY MIN(next_try_at_ts), user_server_name
+ LIMIT ?
+ """
+ txn.execute(sql, (now_ts, limit))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_remote_servers_with_profiles_to_refresh",
+ _get_remote_servers_with_refreshable_profiles_txn,
+ )
+
+ async def get_remote_users_to_refresh_on_server(
+ self, server_name: str, now_ts: int, limit: int
+ ) -> List[Tuple[str, int, int]]:
+ """
+ Get a list of up to `limit` user IDs from the server `server_name`
+ whose locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+
+ Returns:
+ tuple of:
+ - User ID
+ - Retry counter (number of failures so far)
+ - Time the retry is scheduled for, in milliseconds
+ """
+
+ def _get_remote_users_to_refresh_on_server_txn(
+ txn: LoggingTransaction,
+ ) -> List[Tuple[str, int, int]]:
+ sql = """
+ SELECT user_id, retry_counter, next_try_at_ts
+ FROM user_directory_stale_remote_users
+ WHERE user_server_name = ? AND next_try_at_ts < ?
+ ORDER BY next_try_at_ts
+ LIMIT ?
+ """
+ txn.execute(sql, (server_name, now_ts, limit))
+ return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+ return await self.db_pool.runInteraction(
+ "get_remote_users_to_refresh_on_server",
+ _get_remote_users_to_refresh_on_server_txn,
+ )
+
async def update_profile_in_user_dir(
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
) -> None:
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index a02c1c6227..da4d240826 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -19,17 +19,18 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.constants import UserTypes
+from synapse.api.errors import SynapseError
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.appservice import ApplicationService
from synapse.rest.client import login, register, room, user_directory
from synapse.server import HomeServer
from synapse.storage.roommember import ProfileInfo
-from synapse.types import UserProfile, create_requester
+from synapse.types import JsonDict, UserProfile, create_requester
from synapse.util import Clock
from tests import unittest
from tests.storage.test_user_directory import GetUserDirectoryTables
-from tests.test_utils import make_awaitable
+from tests.test_utils import event_injection, make_awaitable
from tests.test_utils.event_injection import inject_member_event
from tests.unittest import override_config
@@ -1103,3 +1104,185 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
)
self.assertEqual(200, channel.code, channel.result)
self.assertTrue(len(channel.json_body["results"]) == 0)
+
+
+class UserDirectoryRemoteProfileTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ login.register_servlets,
+ synapse.rest.admin.register_servlets,
+ register.register_servlets,
+ room.register_servlets,
+ ]
+
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Re-enables updating the user directory, as that functionality is needed below.
+ config["update_user_directory_from_worker"] = None
+ return config
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+ self.alice = self.register_user("alice", "alice123")
+ self.alice_tok = self.login("alice", "alice123")
+ self.user_dir_helper = GetUserDirectoryTables(self.store)
+ self.user_dir_handler = hs.get_user_directory_handler()
+ self.profile_handler = hs.get_profile_handler()
+
+ # Cancel the startup call: in the steady-state case we can't rely on it anyway.
+ assert self.user_dir_handler._refresh_remote_profiles_call_later is not None
+ self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
+
+ def test_public_rooms_have_profiles_collected(self) -> None:
+ """
+ In a public room, member state events are treated as reflecting the user's
+ real profile and they are accepted.
+ (The main motivation for accepting this is to prevent having to query
+ *every* single profile change over federation.)
+ """
+ room_id = self.helper.create_room_as(
+ self.alice, is_public=True, tok=self.alice_tok
+ )
+ self.get_success(
+ event_injection.inject_member_event(
+ self.hs,
+ room_id,
+ "@bruce:remote",
+ "join",
+ "@bruce:remote",
+ extra_content={
+ "displayname": "Bruce!",
+ "avatar_url": "mxc://remote/123",
+ },
+ )
+ )
+ # Sending this event makes the streams move forward after the injection...
+ self.helper.send(room_id, "Test", tok=self.alice_tok)
+ self.pump(0.1)
+
+ profiles = self.get_success(
+ self.user_dir_helper.get_profiles_in_user_directory()
+ )
+ self.assertEqual(
+ profiles.get("@bruce:remote"),
+ ProfileInfo(display_name="Bruce!", avatar_url="mxc://remote/123"),
+ )
+
+ def test_private_rooms_do_not_have_profiles_collected(self) -> None:
+ """
+ In a private room, member state events are not pulled out and used to populate
+ the user directory.
+ """
+ room_id = self.helper.create_room_as(
+ self.alice, is_public=False, tok=self.alice_tok
+ )
+ self.get_success(
+ event_injection.inject_member_event(
+ self.hs,
+ room_id,
+ "@bruce:remote",
+ "join",
+ "@bruce:remote",
+ extra_content={
+ "displayname": "super-duper bruce",
+ "avatar_url": "mxc://remote/456",
+ },
+ )
+ )
+ # Sending this event makes the streams move forward after the injection...
+ self.helper.send(room_id, "Test", tok=self.alice_tok)
+ self.pump(0.1)
+
+ profiles = self.get_success(
+ self.user_dir_helper.get_profiles_in_user_directory()
+ )
+ self.assertNotIn("@bruce:remote", profiles)
+
+ def test_private_rooms_have_profiles_requested(self) -> None:
+ """
+ When a name changes in a private room, the homeserver instead requests
+ the user's global profile over federation.
+ """
+
+ async def get_remote_profile(
+ user_id: str, ignore_backoff: bool = True
+ ) -> JsonDict:
+ if user_id == "@bruce:remote":
+ return {
+ "displayname": "Sir Bruce Bruceson",
+ "avatar_url": "mxc://remote/789",
+ }
+ else:
+ raise ValueError(f"unable to fetch {user_id}")
+
+ with patch.object(self.profile_handler, "get_profile", get_remote_profile):
+ # Continue from the earlier test...
+ self.test_private_rooms_do_not_have_profiles_collected()
+
+ # Advance by a minute
+ self.reactor.advance(61.0)
+
+ profiles = self.get_success(
+ self.user_dir_helper.get_profiles_in_user_directory()
+ )
+ self.assertEqual(
+ profiles.get("@bruce:remote"),
+ ProfileInfo(
+ display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789"
+ ),
+ )
+
+ def test_profile_requests_are_retried(self) -> None:
+ """
+ When we fail to fetch the user's profile over federation,
+ we try again later.
+ """
+ has_failed_once = False
+
+ async def get_remote_profile(
+ user_id: str, ignore_backoff: bool = True
+ ) -> JsonDict:
+ nonlocal has_failed_once
+ if user_id == "@bruce:remote":
+ if not has_failed_once:
+ has_failed_once = True
+ raise SynapseError(502, "temporary network problem")
+
+ return {
+ "displayname": "Sir Bruce Bruceson",
+ "avatar_url": "mxc://remote/789",
+ }
+ else:
+ raise ValueError(f"unable to fetch {user_id}")
+
+ with patch.object(self.profile_handler, "get_profile", get_remote_profile):
+ # Continue from the earlier test...
+ self.test_private_rooms_do_not_have_profiles_collected()
+
+ # Advance by a minute
+ self.reactor.advance(61.0)
+
+ # The request has already failed once
+ self.assertTrue(has_failed_once)
+
+ # The profile has yet to be updated.
+ profiles = self.get_success(
+ self.user_dir_helper.get_profiles_in_user_directory()
+ )
+ self.assertNotIn(
+ "@bruce:remote",
+ profiles,
+ )
+
+ # Advance by five minutes, after the backoff has finished
+ self.reactor.advance(301.0)
+
+ # The profile should have been updated now
+ profiles = self.get_success(
+ self.user_dir_helper.get_profiles_in_user_directory()
+ )
+ self.assertEqual(
+ profiles.get("@bruce:remote"),
+ ProfileInfo(
+ display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789"
+ ),
+ )
|