summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorreivilibre <oliverw@matrix.org>2023-03-16 11:44:11 +0000
committerGitHub <noreply@github.com>2023-03-16 11:44:11 +0000
commit1f5473465d4cb08239bcc97dbbbf185af6841863 (patch)
tree0b47f3da09ed37d9539ac9f153e266f0894c30a0 /synapse/storage/databases
parentMove Account Validity callbacks to a dedicated file (#15237) (diff)
downloadsynapse-1f5473465d4cb08239bcc97dbbbf185af6841863.tar.xz
Refresh remote profiles that have been marked as stale, in order to fill the user directory. [rei:userdirpriv] (#14756)
* Scaffolding for background process to refresh profiles

* Add scaffolding for background process to refresh profiles for a given server

* Implement the code to select servers to refresh from

* Ensure we don't build up multiple looping calls

* Make `get_profile` able to respect backoffs

* Add logic for refreshing users

* When backing off, schedule a refresh when the backoff is over

* Wake up the background processes when we receive an interesting state event

* Add tests

* Newsfile

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>

* Add comment about 1<<62

---------

Signed-off-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/user_directory.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             desc="set_remote_user_profile_in_user_dir_stale",
         )
 
+    async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+        """
+        Marks a remote user as no longer having a possibly-stale user directory profile.
+
+        Args:
+            user_id: the remote user who no longer has a stale profile on this server.
+        """
+        await self.db_pool.simple_delete(
+            table="user_directory_stale_remote_users",
+            keyvalues={"user_id": user_id},
+            desc="clear_remote_user_profile_in_user_dir_stale",
+        )
+
+    async def get_remote_servers_with_profiles_to_refresh(
+        self, now_ts: int, limit: int
+    ) -> List[str]:
+        """
+        Get a list of up to `limit` server names which have users whose
+        locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+        """
+
+        def _get_remote_servers_with_refreshable_profiles_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            sql = """
+                SELECT user_server_name
+                FROM user_directory_stale_remote_users
+                WHERE next_try_at_ts < ?
+                GROUP BY user_server_name
+                ORDER BY MIN(next_try_at_ts), user_server_name
+                LIMIT ?
+            """
+            txn.execute(sql, (now_ts, limit))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_remote_servers_with_profiles_to_refresh",
+            _get_remote_servers_with_refreshable_profiles_txn,
+        )
+
+    async def get_remote_users_to_refresh_on_server(
+        self, server_name: str, now_ts: int, limit: int
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get a list of up to `limit` user IDs from the server `server_name`
+        whose locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+
+        Returns:
+            tuple of:
+                - User ID
+                - Retry counter (number of failures so far)
+                - Time the retry is scheduled for, in milliseconds
+        """
+
+        def _get_remote_users_to_refresh_on_server_txn(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, int, int]]:
+            sql = """
+                SELECT user_id, retry_counter, next_try_at_ts
+                FROM user_directory_stale_remote_users
+                WHERE user_server_name = ? AND next_try_at_ts < ?
+                ORDER BY next_try_at_ts
+                LIMIT ?
+            """
+            txn.execute(sql, (server_name, now_ts, limit))
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_remote_users_to_refresh_on_server",
+            _get_remote_users_to_refresh_on_server_txn,
+        )
+
     async def update_profile_in_user_dir(
         self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
     ) -> None: