diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index f16a509ac4..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -54,6 +54,7 @@ from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import (
JsonDict,
+ UserID,
UserProfile,
get_domain_from_id,
get_localpart_from_id,
@@ -473,11 +474,116 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
return False
+ async def set_remote_user_profile_in_user_dir_stale(
+ self, user_id: str, next_try_at_ms: int, retry_counter: int
+ ) -> None:
+ """
+ Marks a remote user as having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who may have a stale profile on this server.
+ next_try_at_ms: timestamp in ms after which the user directory profile can be
+ refreshed.
+ retry_counter: number of failures in refreshing the profile so far. Used for
+ exponential backoff calculations.
+ """
+ assert not self.hs.is_mine_id(
+ user_id
+ ), "Can't mark a local user as a stale remote user."
+
+ server_name = UserID.from_string(user_id).domain
+
+ await self.db_pool.simple_upsert(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ values={
+ "next_try_at_ts": next_try_at_ms,
+ "retry_counter": retry_counter,
+ "user_server_name": server_name,
+ },
+ desc="set_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+ """
+ Marks a remote user as no longer having a possibly-stale user directory profile.
+
+ Args:
+ user_id: the remote user who no longer has a stale profile on this server.
+ """
+ await self.db_pool.simple_delete(
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ desc="clear_remote_user_profile_in_user_dir_stale",
+ )
+
+ async def get_remote_servers_with_profiles_to_refresh(
+ self, now_ts: int, limit: int
+ ) -> List[str]:
+ """
+ Get a list of up to `limit` server names which have users whose
+ locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+ """
+
+ def _get_remote_servers_with_refreshable_profiles_txn(
+ txn: LoggingTransaction,
+ ) -> List[str]:
+ sql = """
+ SELECT user_server_name
+ FROM user_directory_stale_remote_users
+ WHERE next_try_at_ts < ?
+ GROUP BY user_server_name
+ ORDER BY MIN(next_try_at_ts), user_server_name
+ LIMIT ?
+ """
+ txn.execute(sql, (now_ts, limit))
+ return [row[0] for row in txn]
+
+ return await self.db_pool.runInteraction(
+ "get_remote_servers_with_profiles_to_refresh",
+ _get_remote_servers_with_refreshable_profiles_txn,
+ )
+
+ async def get_remote_users_to_refresh_on_server(
+ self, server_name: str, now_ts: int, limit: int
+ ) -> List[Tuple[str, int, int]]:
+ """
+ Get a list of up to `limit` user IDs from the server `server_name`
+ whose locally-cached profiles we believe to be stale
+ and are refreshable given the current time `now_ts` in milliseconds.
+
+ Returns:
+ tuple of:
+ - User ID
+ - Retry counter (number of failures so far)
+ - Time the retry is scheduled for, in milliseconds
+ """
+
+ def _get_remote_users_to_refresh_on_server_txn(
+ txn: LoggingTransaction,
+ ) -> List[Tuple[str, int, int]]:
+ sql = """
+ SELECT user_id, retry_counter, next_try_at_ts
+ FROM user_directory_stale_remote_users
+ WHERE user_server_name = ? AND next_try_at_ts < ?
+ ORDER BY next_try_at_ts
+ LIMIT ?
+ """
+ txn.execute(sql, (server_name, now_ts, limit))
+ return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+ return await self.db_pool.runInteraction(
+ "get_remote_users_to_refresh_on_server",
+ _get_remote_users_to_refresh_on_server_txn,
+ )
+
async def update_profile_in_user_dir(
self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
) -> None:
"""
Update or add a user's profile in the user directory.
+ If the user is remote, the profile will be marked as not stale.
"""
# If the display name or avatar URL are unexpected types, replace with None.
display_name = non_null_str_or_none(display_name)
@@ -491,6 +597,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
values={"display_name": display_name, "avatar_url": avatar_url},
)
+ if not self.hs.is_mine_id(user_id):
+ # Remote users: Make sure the profile is not marked as stale anymore.
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="user_directory_stale_remote_users",
+ keyvalues={"user_id": user_id},
+ )
+
# The display name that goes into the database index.
index_display_name = display_name
if index_display_name is not None:
|