diff options
author | Shay <hillerys@element.io> | 2023-05-16 10:57:39 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-16 10:57:39 -0700 |
commit | 9f6ff6a0eb94a9f81b9948bc3b651a1eb78de460 (patch) | |
tree | 130fa09eb1d7e40a8a3998177f48e351eebe5f25 /synapse/storage/databases | |
parent | `traceback.format_exception(...)` usage that is compatible with Python 3.7 an... (diff) | |
download | synapse-9f6ff6a0eb94a9f81b9948bc3b651a1eb78de460.tar.xz |
Add not null constraint to column `full_user_id` of tables `profiles` and `user_filters` (#15537)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/filtering.py | 95 | ||||
-rw-r--r-- | synapse/storage/databases/main/profile.py | 102 |
2 files changed, 195 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index 50516402f9..da31eb44dc 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -25,6 +25,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.engines import PostgresEngine from synapse.types import JsonDict, UserID from synapse.util.caches.descriptors import cached @@ -40,6 +41,8 @@ class FilteringWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "full_users_filters_unique_idx", index_name="full_users_unique_idx", @@ -48,6 +51,98 @@ class FilteringWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_user_filters", + self.populate_full_user_id_user_filters, + ) + + async def populate_full_user_id_user_filters( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + user_filters from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM user_filters + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE user_filters + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_user_filters" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_user_filters", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_user_filters", + progress, + ) + + return 50 + @cached(num_args=2) async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index c4022d2427..65c92bef51 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -15,9 +15,14 @@ from typing import TYPE_CHECKING, Optional from synapse.api.errors import StoreError from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, +) from synapse.storage.databases.main.roommember import ProfileInfo -from synapse.types import UserID +from synapse.storage.engines import PostgresEngine +from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -31,6 +36,8 @@ class ProfileWorkerStore(SQLBaseStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + self.server_name: str = hs.hostname + self.database_engine = database.engine self.db_pool.updates.register_background_index_update( "profiles_full_user_id_key_idx", index_name="profiles_full_user_id_key", @@ -39,6 +46,97 @@ class ProfileWorkerStore(SQLBaseStore): unique=True, ) + self.db_pool.updates.register_background_update_handler( + "populate_full_user_id_profiles", self.populate_full_user_id_profiles + ) + + async def populate_full_user_id_profiles( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the column `full_user_id` of the table + profiles from entries in the column `user_local_part` of the same table + """ + + lower_bound_id = progress.get("lower_bound_id", "") + + def _get_last_id(txn: LoggingTransaction) -> Optional[str]: + sql = """ + SELECT user_id FROM profiles + WHERE user_id > ? + ORDER BY user_id + LIMIT 1 OFFSET 50 + """ + txn.execute(sql, (lower_bound_id,)) + res = txn.fetchone() + if res: + upper_bound_id = res[0] + return upper_bound_id + else: + return None + + def _process_batch( + txn: LoggingTransaction, lower_bound_id: str, upper_bound_id: str + ) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND user_id <= ? AND full_user_id IS NULL + """ + txn.execute(sql, (f":{self.server_name}", lower_bound_id, upper_bound_id)) + + def _final_batch(txn: LoggingTransaction, lower_bound_id: str) -> None: + sql = """ + UPDATE profiles + SET full_user_id = '@' || user_id || ? + WHERE ? < user_id AND full_user_id IS NULL + """ + txn.execute( + sql, + ( + f":{self.server_name}", + lower_bound_id, + ), + ) + + if isinstance(self.database_engine, PostgresEngine): + sql = """ + ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null + """ + txn.execute(sql) + + upper_bound_id = await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _get_last_id + ) + + if upper_bound_id is None: + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", _final_batch, lower_bound_id + ) + + await self.db_pool.updates._end_background_update( + "populate_full_user_id_profiles" + ) + return 1 + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + _process_batch, + lower_bound_id, + upper_bound_id, + ) + + progress["lower_bound_id"] = upper_bound_id + + await self.db_pool.runInteraction( + "populate_full_user_id_profiles", + self.db_pool.updates._background_update_progress_txn, + "populate_full_user_id_profiles", + progress, + ) + + return 50 + async def get_profileinfo(self, user_localpart: str) -> ProfileInfo: try: profile = await self.db_pool.simple_select_one( |