summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/14756.bugfix1
-rw-r--r--synapse/handlers/profile.py4
-rw-r--r--synapse/handlers/user_directory.py242
-rw-r--r--synapse/storage/databases/main/user_directory.py74
-rw-r--r--tests/handlers/test_user_directory.py187
5 files changed, 504 insertions, 4 deletions
diff --git a/changelog.d/14756.bugfix b/changelog.d/14756.bugfix
new file mode 100644
index 0000000000..12f979e9d0
--- /dev/null
+++ b/changelog.d/14756.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug in which the user directory would assume any remote membership state events represent a profile change.
\ No newline at end of file
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 4bf9a047a3..9a81a77cbd 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -63,7 +63,7 @@ class ProfileHandler:
 
         self._third_party_rules = hs.get_third_party_event_rules()
 
-    async def get_profile(self, user_id: str) -> JsonDict:
+    async def get_profile(self, user_id: str, ignore_backoff: bool = True) -> JsonDict:
         target_user = UserID.from_string(user_id)
 
         if self.hs.is_mine(target_user):
@@ -81,7 +81,7 @@ class ProfileHandler:
                     destination=target_user.domain,
                     query_type="profile",
                     args={"user_id": user_id},
-                    ignore_backoff=True,
+                    ignore_backoff=ignore_backoff,
                 )
                 return result
             except RequestSendFailed as e:
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 0815be79fa..28a92d41d6 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -13,15 +13,22 @@
 # limitations under the License.
 
 import logging
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
+from twisted.internet.interfaces import IDelayedCall
+
 import synapse.metrics
 from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
+from synapse.api.errors import Codes, SynapseError
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main.user_directory import SearchResult
 from synapse.storage.roommember import ProfileInfo
+from synapse.types import UserID
 from synapse.util.metrics import Measure
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import non_null_str_or_none
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -33,6 +40,25 @@ logger = logging.getLogger(__name__)
 # then be coalesced such that only one /profile request is made).
 USER_DIRECTORY_STALE_REFRESH_TIME_MS = 60 * 1000
 
+# Maximum number of remote servers that we will attempt to refresh profiles for
+# in one go.
+MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO = 5
+
+# As long as we have servers to refresh (without backoff), keep adding more
+# every 15 seconds.
+INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES = 15
+
+
+def calculate_time_of_next_retry(now_ts: int, retry_count: int) -> int:
+    """
+    Calculates the time of a next retry given `now_ts` in ms and the number
+    of failures encountered thus far.
+
+    Currently the sequence goes:
+    1 min, 5 min, 25 min, 2 hour, 10 hour, 52 hour, 10 day, 7.75 week
+    """
+    return now_ts + 60_000 * (5 ** min(retry_count, 7))
+
 
 class UserDirectoryHandler(StateDeltasHandler):
     """Handles queries and updates for the user_directory.
@@ -69,12 +95,24 @@ class UserDirectoryHandler(StateDeltasHandler):
         self.update_user_directory = hs.config.worker.should_update_user_directory
         self.search_all_users = hs.config.userdirectory.user_directory_search_all_users
         self.spam_checker = hs.get_spam_checker()
+        self._hs = hs
+
         # The current position in the current_state_delta stream
         self.pos: Optional[int] = None
 
         # Guard to ensure we only process deltas one at a time
         self._is_processing = False
 
+        # Guard to ensure we only have one process for refreshing remote profiles
+        self._is_refreshing_remote_profiles = False
+        # Handle to cancel the `call_later` of `kick_off_remote_profile_refresh_process`
+        self._refresh_remote_profiles_call_later: Optional[IDelayedCall] = None
+
+        # Guard to ensure we only have one process for refreshing remote profiles
+        # for the given servers.
+        # Set of server names.
+        self._is_refreshing_remote_profiles_for_servers: Set[str] = set()
+
         if self.update_user_directory:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -82,6 +120,11 @@ class UserDirectoryHandler(StateDeltasHandler):
             # we start populating the user directory
             self.clock.call_later(0, self.notify_new_event)
 
+            # Kick off the profile refresh process on startup
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                10, self.kick_off_remote_profile_refresh_process
+            )
+
     async def search_users(
         self, user_id: str, search_term: str, limit: int
     ) -> SearchResult:
@@ -483,6 +526,20 @@ class UserDirectoryHandler(StateDeltasHandler):
                 next_try_at_ms=now_ts + USER_DIRECTORY_STALE_REFRESH_TIME_MS,
                 retry_counter=0,
             )
+            # Schedule a wake-up to refresh the user directory for this server.
+            # We intentionally wake up this server directly because we don't want
+            # other servers ahead of it in the queue to get in the way of updating
+            # the profile if the server only just sent us an event.
+            self.clock.call_later(
+                USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+                self.kick_off_remote_profile_refresh_process_for_remote_server,
+                UserID.from_string(user_id).domain,
+            )
+            # Schedule a wake-up to handle any backoffs that may occur in the future.
+            self.clock.call_later(
+                2 * USER_DIRECTORY_STALE_REFRESH_TIME_MS // 1000 + 1,
+                self.kick_off_remote_profile_refresh_process,
+            )
             return
 
         prev_name = prev_event.content.get("displayname")
@@ -505,3 +562,188 @@ class UserDirectoryHandler(StateDeltasHandler):
             # Only update if something has changed, or we didn't have a previous event
             # in the first place.
             await self.store.update_profile_in_user_dir(user_id, new_name, new_avatar)
+
+    def kick_off_remote_profile_refresh_process(self) -> None:
+        """Called when there may be remote users with stale profiles to be refreshed"""
+        if not self.update_user_directory:
+            return
+
+        if self._is_refreshing_remote_profiles:
+            return
+
+        if self._refresh_remote_profiles_call_later:
+            if self._refresh_remote_profiles_call_later.active():
+                self._refresh_remote_profiles_call_later.cancel()
+            self._refresh_remote_profiles_call_later = None
+
+        async def process() -> None:
+            try:
+                await self._unsafe_refresh_remote_profiles()
+            finally:
+                self._is_refreshing_remote_profiles = False
+
+        self._is_refreshing_remote_profiles = True
+        run_as_background_process("user_directory.refresh_remote_profiles", process)
+
+    async def _unsafe_refresh_remote_profiles(self) -> None:
+        limit = MAX_SERVERS_TO_REFRESH_PROFILES_FOR_IN_ONE_GO - len(
+            self._is_refreshing_remote_profiles_for_servers
+        )
+        if limit <= 0:
+            # nothing to do: already refreshing the maximum number of servers
+            # at once.
+            # Come back later.
+            self._refresh_remote_profiles_call_later = self.clock.call_later(
+                INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+                self.kick_off_remote_profile_refresh_process,
+            )
+            return
+
+        servers_to_refresh = (
+            await self.store.get_remote_servers_with_profiles_to_refresh(
+                now_ts=self.clock.time_msec(), limit=limit
+            )
+        )
+
+        if not servers_to_refresh:
+            # Do we have any backing-off servers that we should try again
+            # for eventually?
+            # By setting `now` is a point in the far future, we can ask for
+            # which server/user is next to be refreshed, even though it is
+            # not actually refreshable *now*.
+            end_of_time = 1 << 62
+            backing_off_servers = (
+                await self.store.get_remote_servers_with_profiles_to_refresh(
+                    now_ts=end_of_time, limit=1
+                )
+            )
+            if backing_off_servers:
+                # Find out when the next user is refreshable and schedule a
+                # refresh then.
+                backing_off_server_name = backing_off_servers[0]
+                users = await self.store.get_remote_users_to_refresh_on_server(
+                    backing_off_server_name, now_ts=end_of_time, limit=1
+                )
+                if not users:
+                    return
+                _, _, next_try_at_ts = users[0]
+                self._refresh_remote_profiles_call_later = self.clock.call_later(
+                    ((next_try_at_ts - self.clock.time_msec()) // 1000) + 2,
+                    self.kick_off_remote_profile_refresh_process,
+                )
+
+            return
+
+        for server_to_refresh in servers_to_refresh:
+            self.kick_off_remote_profile_refresh_process_for_remote_server(
+                server_to_refresh
+            )
+
+        self._refresh_remote_profiles_call_later = self.clock.call_later(
+            INTERVAL_TO_ADD_MORE_SERVERS_TO_REFRESH_PROFILES,
+            self.kick_off_remote_profile_refresh_process,
+        )
+
+    def kick_off_remote_profile_refresh_process_for_remote_server(
+        self, server_name: str
+    ) -> None:
+        """Called when there may be remote users with stale profiles to be refreshed
+        on the given server."""
+        if not self.update_user_directory:
+            return
+
+        if server_name in self._is_refreshing_remote_profiles_for_servers:
+            return
+
+        async def process() -> None:
+            try:
+                await self._unsafe_refresh_remote_profiles_for_remote_server(
+                    server_name
+                )
+            finally:
+                self._is_refreshing_remote_profiles_for_servers.remove(server_name)
+
+        self._is_refreshing_remote_profiles_for_servers.add(server_name)
+        run_as_background_process(
+            "user_directory.refresh_remote_profiles_for_remote_server", process
+        )
+
+    async def _unsafe_refresh_remote_profiles_for_remote_server(
+        self, server_name: str
+    ) -> None:
+        logger.info("Refreshing profiles in user directory for %s", server_name)
+
+        while True:
+            # Get a handful of users to process.
+            next_batch = await self.store.get_remote_users_to_refresh_on_server(
+                server_name, now_ts=self.clock.time_msec(), limit=10
+            )
+            if not next_batch:
+                # Finished for now
+                return
+
+            for user_id, retry_counter, _ in next_batch:
+                # Request the profile of the user.
+                try:
+                    profile = await self._hs.get_profile_handler().get_profile(
+                        user_id, ignore_backoff=False
+                    )
+                except NotRetryingDestination as e:
+                    logger.info(
+                        "Failed to refresh profile for %r because the destination is undergoing backoff",
+                        user_id,
+                    )
+                    # As a special-case, we back off until the destination is no longer
+                    # backed off from.
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        e.retry_last_ts + e.retry_interval,
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+                except SynapseError as e:
+                    if e.code == HTTPStatus.NOT_FOUND and e.errcode == Codes.NOT_FOUND:
+                        # The profile doesn't exist.
+                        # TODO Does this mean we should clear it from our user
+                        #      directory?
+                        await self.store.clear_remote_user_profile_in_user_dir_stale(
+                            user_id
+                        )
+                        logger.warning(
+                            "Refresh of remote profile %r: not found (%r)",
+                            user_id,
+                            e.msg,
+                        )
+                        continue
+
+                    logger.warning(
+                        "Failed to refresh profile for %r because %r", user_id, e
+                    )
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        calculate_time_of_next_retry(
+                            self.clock.time_msec(), retry_counter + 1
+                        ),
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+                except Exception:
+                    logger.error(
+                        "Failed to refresh profile for %r due to unhandled exception",
+                        user_id,
+                        exc_info=True,
+                    )
+                    await self.store.set_remote_user_profile_in_user_dir_stale(
+                        user_id,
+                        calculate_time_of_next_retry(
+                            self.clock.time_msec(), retry_counter + 1
+                        ),
+                        retry_counter=retry_counter + 1,
+                    )
+                    continue
+
+                await self.store.update_profile_in_user_dir(
+                    user_id,
+                    display_name=non_null_str_or_none(profile.get("displayname")),
+                    avatar_url=non_null_str_or_none(profile.get("avatar_url")),
+                )
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             desc="set_remote_user_profile_in_user_dir_stale",
         )
 
+    async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+        """
+        Marks a remote user as no longer having a possibly-stale user directory profile.
+
+        Args:
+            user_id: the remote user who no longer has a stale profile on this server.
+        """
+        await self.db_pool.simple_delete(
+            table="user_directory_stale_remote_users",
+            keyvalues={"user_id": user_id},
+            desc="clear_remote_user_profile_in_user_dir_stale",
+        )
+
+    async def get_remote_servers_with_profiles_to_refresh(
+        self, now_ts: int, limit: int
+    ) -> List[str]:
+        """
+        Get a list of up to `limit` server names which have users whose
+        locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+        """
+
+        def _get_remote_servers_with_refreshable_profiles_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            sql = """
+                SELECT user_server_name
+                FROM user_directory_stale_remote_users
+                WHERE next_try_at_ts < ?
+                GROUP BY user_server_name
+                ORDER BY MIN(next_try_at_ts), user_server_name
+                LIMIT ?
+            """
+            txn.execute(sql, (now_ts, limit))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_remote_servers_with_profiles_to_refresh",
+            _get_remote_servers_with_refreshable_profiles_txn,
+        )
+
+    async def get_remote_users_to_refresh_on_server(
+        self, server_name: str, now_ts: int, limit: int
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get a list of up to `limit` user IDs from the server `server_name`
+        whose locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+
+        Returns:
+            tuple of:
+                - User ID
+                - Retry counter (number of failures so far)
+                - Time the retry is scheduled for, in milliseconds
+        """
+
+        def _get_remote_users_to_refresh_on_server_txn(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, int, int]]:
+            sql = """
+                SELECT user_id, retry_counter, next_try_at_ts
+                FROM user_directory_stale_remote_users
+                WHERE user_server_name = ? AND next_try_at_ts < ?
+                ORDER BY next_try_at_ts
+                LIMIT ?
+            """
+            txn.execute(sql, (server_name, now_ts, limit))
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_remote_users_to_refresh_on_server",
+            _get_remote_users_to_refresh_on_server_txn,
+        )
+
     async def update_profile_in_user_dir(
         self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
     ) -> None:
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index a02c1c6227..da4d240826 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -19,17 +19,18 @@ from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.api.constants import UserTypes
+from synapse.api.errors import SynapseError
 from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.appservice import ApplicationService
 from synapse.rest.client import login, register, room, user_directory
 from synapse.server import HomeServer
 from synapse.storage.roommember import ProfileInfo
-from synapse.types import UserProfile, create_requester
+from synapse.types import JsonDict, UserProfile, create_requester
 from synapse.util import Clock
 
 from tests import unittest
 from tests.storage.test_user_directory import GetUserDirectoryTables
-from tests.test_utils import make_awaitable
+from tests.test_utils import event_injection, make_awaitable
 from tests.test_utils.event_injection import inject_member_event
 from tests.unittest import override_config
 
@@ -1103,3 +1104,185 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
         )
         self.assertEqual(200, channel.code, channel.result)
         self.assertTrue(len(channel.json_body["results"]) == 0)
+
+
+class UserDirectoryRemoteProfileTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        login.register_servlets,
+        synapse.rest.admin.register_servlets,
+        register.register_servlets,
+        room.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Re-enables updating the user directory, as that functionality is needed below.
+        config["update_user_directory_from_worker"] = None
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.alice = self.register_user("alice", "alice123")
+        self.alice_tok = self.login("alice", "alice123")
+        self.user_dir_helper = GetUserDirectoryTables(self.store)
+        self.user_dir_handler = hs.get_user_directory_handler()
+        self.profile_handler = hs.get_profile_handler()
+
+        # Cancel the startup call: in the steady-state case we can't rely on it anyway.
+        assert self.user_dir_handler._refresh_remote_profiles_call_later is not None
+        self.user_dir_handler._refresh_remote_profiles_call_later.cancel()
+
+    def test_public_rooms_have_profiles_collected(self) -> None:
+        """
+        In a public room, member state events are treated as reflecting the user's
+        real profile and they are accepted.
+        (The main motivation for accepting this is to prevent having to query
+        *every* single profile change over federation.)
+        """
+        room_id = self.helper.create_room_as(
+            self.alice, is_public=True, tok=self.alice_tok
+        )
+        self.get_success(
+            event_injection.inject_member_event(
+                self.hs,
+                room_id,
+                "@bruce:remote",
+                "join",
+                "@bruce:remote",
+                extra_content={
+                    "displayname": "Bruce!",
+                    "avatar_url": "mxc://remote/123",
+                },
+            )
+        )
+        # Sending this event makes the streams move forward after the injection...
+        self.helper.send(room_id, "Test", tok=self.alice_tok)
+        self.pump(0.1)
+
+        profiles = self.get_success(
+            self.user_dir_helper.get_profiles_in_user_directory()
+        )
+        self.assertEqual(
+            profiles.get("@bruce:remote"),
+            ProfileInfo(display_name="Bruce!", avatar_url="mxc://remote/123"),
+        )
+
+    def test_private_rooms_do_not_have_profiles_collected(self) -> None:
+        """
+        In a private room, member state events are not pulled out and used to populate
+        the user directory.
+        """
+        room_id = self.helper.create_room_as(
+            self.alice, is_public=False, tok=self.alice_tok
+        )
+        self.get_success(
+            event_injection.inject_member_event(
+                self.hs,
+                room_id,
+                "@bruce:remote",
+                "join",
+                "@bruce:remote",
+                extra_content={
+                    "displayname": "super-duper bruce",
+                    "avatar_url": "mxc://remote/456",
+                },
+            )
+        )
+        # Sending this event makes the streams move forward after the injection...
+        self.helper.send(room_id, "Test", tok=self.alice_tok)
+        self.pump(0.1)
+
+        profiles = self.get_success(
+            self.user_dir_helper.get_profiles_in_user_directory()
+        )
+        self.assertNotIn("@bruce:remote", profiles)
+
+    def test_private_rooms_have_profiles_requested(self) -> None:
+        """
+        When a name changes in a private room, the homeserver instead requests
+        the user's global profile over federation.
+        """
+
+        async def get_remote_profile(
+            user_id: str, ignore_backoff: bool = True
+        ) -> JsonDict:
+            if user_id == "@bruce:remote":
+                return {
+                    "displayname": "Sir Bruce Bruceson",
+                    "avatar_url": "mxc://remote/789",
+                }
+            else:
+                raise ValueError(f"unable to fetch {user_id}")
+
+        with patch.object(self.profile_handler, "get_profile", get_remote_profile):
+            # Continue from the earlier test...
+            self.test_private_rooms_do_not_have_profiles_collected()
+
+            # Advance by a minute
+            self.reactor.advance(61.0)
+
+        profiles = self.get_success(
+            self.user_dir_helper.get_profiles_in_user_directory()
+        )
+        self.assertEqual(
+            profiles.get("@bruce:remote"),
+            ProfileInfo(
+                display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789"
+            ),
+        )
+
+    def test_profile_requests_are_retried(self) -> None:
+        """
+        When we fail to fetch the user's profile over federation,
+        we try again later.
+        """
+        has_failed_once = False
+
+        async def get_remote_profile(
+            user_id: str, ignore_backoff: bool = True
+        ) -> JsonDict:
+            nonlocal has_failed_once
+            if user_id == "@bruce:remote":
+                if not has_failed_once:
+                    has_failed_once = True
+                    raise SynapseError(502, "temporary network problem")
+
+                return {
+                    "displayname": "Sir Bruce Bruceson",
+                    "avatar_url": "mxc://remote/789",
+                }
+            else:
+                raise ValueError(f"unable to fetch {user_id}")
+
+        with patch.object(self.profile_handler, "get_profile", get_remote_profile):
+            # Continue from the earlier test...
+            self.test_private_rooms_do_not_have_profiles_collected()
+
+            # Advance by a minute
+            self.reactor.advance(61.0)
+
+            # The request has already failed once
+            self.assertTrue(has_failed_once)
+
+            # The profile has yet to be updated.
+            profiles = self.get_success(
+                self.user_dir_helper.get_profiles_in_user_directory()
+            )
+            self.assertNotIn(
+                "@bruce:remote",
+                profiles,
+            )
+
+            # Advance by five minutes, after the backoff has finished
+            self.reactor.advance(301.0)
+
+            # The profile should have been updated now
+            profiles = self.get_success(
+                self.user_dir_helper.get_profiles_in_user_directory()
+            )
+            self.assertEqual(
+                profiles.get("@bruce:remote"),
+                ProfileInfo(
+                    display_name="Sir Bruce Bruceson", avatar_url="mxc://remote/789"
+                ),
+            )