summary refs log tree commit diff
path: root/synapse/storage/databases/main/user_directory.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/user_directory.py')
-rw-r--r--synapse/storage/databases/main/user_directory.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py

index f16a509ac4..97f09b73dd 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py
@@ -54,6 +54,7 @@ from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import ( JsonDict, + UserID, UserProfile, get_domain_from_id, get_localpart_from_id, @@ -473,11 +474,116 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): return False + async def set_remote_user_profile_in_user_dir_stale( + self, user_id: str, next_try_at_ms: int, retry_counter: int + ) -> None: + """ + Marks a remote user as having a possibly-stale user directory profile. + + Args: + user_id: the remote user who may have a stale profile on this server. + next_try_at_ms: timestamp in ms after which the user directory profile can be + refreshed. + retry_counter: number of failures in refreshing the profile so far. Used for + exponential backoff calculations. + """ + assert not self.hs.is_mine_id( + user_id + ), "Can't mark a local user as a stale remote user." + + server_name = UserID.from_string(user_id).domain + + await self.db_pool.simple_upsert( + table="user_directory_stale_remote_users", + keyvalues={"user_id": user_id}, + values={ + "next_try_at_ts": next_try_at_ms, + "retry_counter": retry_counter, + "user_server_name": server_name, + }, + desc="set_remote_user_profile_in_user_dir_stale", + ) + + async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None: + """ + Marks a remote user as no longer having a possibly-stale user directory profile. + + Args: + user_id: the remote user who no longer has a stale profile on this server. + """ + await self.db_pool.simple_delete( + table="user_directory_stale_remote_users", + keyvalues={"user_id": user_id}, + desc="clear_remote_user_profile_in_user_dir_stale", + ) + + async def get_remote_servers_with_profiles_to_refresh( + self, now_ts: int, limit: int + ) -> List[str]: + """ + Get a list of up to `limit` server names which have users whose + locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + """ + + def _get_remote_servers_with_refreshable_profiles_txn( + txn: LoggingTransaction, + ) -> List[str]: + sql = """ + SELECT user_server_name + FROM user_directory_stale_remote_users + WHERE next_try_at_ts < ? + GROUP BY user_server_name + ORDER BY MIN(next_try_at_ts), user_server_name + LIMIT ? + """ + txn.execute(sql, (now_ts, limit)) + return [row[0] for row in txn] + + return await self.db_pool.runInteraction( + "get_remote_servers_with_profiles_to_refresh", + _get_remote_servers_with_refreshable_profiles_txn, + ) + + async def get_remote_users_to_refresh_on_server( + self, server_name: str, now_ts: int, limit: int + ) -> List[Tuple[str, int, int]]: + """ + Get a list of up to `limit` user IDs from the server `server_name` + whose locally-cached profiles we believe to be stale + and are refreshable given the current time `now_ts` in milliseconds. + + Returns: + tuple of: + - User ID + - Retry counter (number of failures so far) + - Time the retry is scheduled for, in milliseconds + """ + + def _get_remote_users_to_refresh_on_server_txn( + txn: LoggingTransaction, + ) -> List[Tuple[str, int, int]]: + sql = """ + SELECT user_id, retry_counter, next_try_at_ts + FROM user_directory_stale_remote_users + WHERE user_server_name = ? AND next_try_at_ts < ? + ORDER BY next_try_at_ts + LIMIT ? + """ + txn.execute(sql, (server_name, now_ts, limit)) + return cast(List[Tuple[str, int, int]], txn.fetchall()) + + return await self.db_pool.runInteraction( + "get_remote_users_to_refresh_on_server", + _get_remote_users_to_refresh_on_server_txn, + ) + async def update_profile_in_user_dir( self, user_id: str, display_name: Optional[str], avatar_url: Optional[str] ) -> None: """ Update or add a user's profile in the user directory. + If the user is remote, the profile will be marked as not stale. """ # If the display name or avatar URL are unexpected types, replace with None. display_name = non_null_str_or_none(display_name) @@ -491,6 +597,14 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): values={"display_name": display_name, "avatar_url": avatar_url}, ) + if not self.hs.is_mine_id(user_id): + # Remote users: Make sure the profile is not marked as stale anymore. + self.db_pool.simple_delete_txn( + txn, + table="user_directory_stale_remote_users", + keyvalues={"user_id": user_id}, + ) + # The display name that goes into the database index. index_display_name = display_name if index_display_name is not None: