summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/user_directory.py74
1 files changed, 74 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 9cf01b7f36..97f09b73dd 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -504,6 +504,80 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             desc="set_remote_user_profile_in_user_dir_stale",
         )
 
+    async def clear_remote_user_profile_in_user_dir_stale(self, user_id: str) -> None:
+        """
+        Marks a remote user as no longer having a possibly-stale user directory profile.
+
+        Args:
+            user_id: the remote user who no longer has a stale profile on this server.
+        """
+        await self.db_pool.simple_delete(
+            table="user_directory_stale_remote_users",
+            keyvalues={"user_id": user_id},
+            desc="clear_remote_user_profile_in_user_dir_stale",
+        )
+
+    async def get_remote_servers_with_profiles_to_refresh(
+        self, now_ts: int, limit: int
+    ) -> List[str]:
+        """
+        Get a list of up to `limit` server names which have users whose
+        locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+        """
+
+        def _get_remote_servers_with_refreshable_profiles_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            sql = """
+                SELECT user_server_name
+                FROM user_directory_stale_remote_users
+                WHERE next_try_at_ts < ?
+                GROUP BY user_server_name
+                ORDER BY MIN(next_try_at_ts), user_server_name
+                LIMIT ?
+            """
+            txn.execute(sql, (now_ts, limit))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_remote_servers_with_profiles_to_refresh",
+            _get_remote_servers_with_refreshable_profiles_txn,
+        )
+
+    async def get_remote_users_to_refresh_on_server(
+        self, server_name: str, now_ts: int, limit: int
+    ) -> List[Tuple[str, int, int]]:
+        """
+        Get a list of up to `limit` user IDs from the server `server_name`
+        whose locally-cached profiles we believe to be stale
+        and are refreshable given the current time `now_ts` in milliseconds.
+
+        Returns:
+            tuple of:
+                - User ID
+                - Retry counter (number of failures so far)
+                - Time the retry is scheduled for, in milliseconds
+        """
+
+        def _get_remote_users_to_refresh_on_server_txn(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[str, int, int]]:
+            sql = """
+                SELECT user_id, retry_counter, next_try_at_ts
+                FROM user_directory_stale_remote_users
+                WHERE user_server_name = ? AND next_try_at_ts < ?
+                ORDER BY next_try_at_ts
+                LIMIT ?
+            """
+            txn.execute(sql, (server_name, now_ts, limit))
+            return cast(List[Tuple[str, int, int]], txn.fetchall())
+
+        return await self.db_pool.runInteraction(
+            "get_remote_users_to_refresh_on_server",
+            _get_remote_users_to_refresh_on_server_txn,
+        )
+
     async def update_profile_in_user_dir(
         self, user_id: str, display_name: Optional[str], avatar_url: Optional[str]
     ) -> None: