diff options
author | Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> | 2023-01-01 03:40:46 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-01 03:40:46 +0000 |
commit | c4456114e1a5471bb61cb45605e782263dc8233c (patch) | |
tree | add9465775182b17448f7cfad588604eb3671ac2 /synapse/storage | |
parent | Actually use the picture_claim as configured in OIDC config. (#14751) (diff) | |
download | synapse-c4456114e1a5471bb61cb45605e782263dc8233c.tar.xz |
Add experimental support for MSC3391: deleting account data (#14714)
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/database.py | 33 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 219 |
2 files changed, 238 insertions, 14 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 0b29e67b94..88479a16db 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1762,7 +1762,8 @@ class DatabasePool: desc: description of the transaction, for logging and metrics Returns: - A list of dictionaries. + A list of dictionaries, one per result row, each a mapping between the + column names from `retcols` and that column's value for the row. """ return await self.runInteraction( desc, @@ -1791,6 +1792,10 @@ class DatabasePool: column names and values to select the rows with, or None to not apply a WHERE clause. retcols: the names of the columns to return + + Returns: + A list of dictionaries, one per result row, each a mapping between the + column names from `retcols` and that column's value for the row. """ if keyvalues: sql = "SELECT %s FROM %s WHERE %s" % ( @@ -1898,6 +1903,19 @@ class DatabasePool: updatevalues: Dict[str, Any], desc: str, ) -> int: + """ + Update rows in the given database table. + If the given keyvalues don't match anything, nothing will be updated. + + Args: + table: The database table to update. + keyvalues: A mapping of column name to value to match rows on. + updatevalues: A mapping of column name to value to replace in any matched rows. + desc: description of the transaction, for logging and metrics. + + Returns: + The number of rows that were updated. Will be 0 if no matching rows were found. + """ return await self.runInteraction( desc, self.simple_update_txn, table, keyvalues, updatevalues ) @@ -1909,6 +1927,19 @@ class DatabasePool: keyvalues: Dict[str, Any], updatevalues: Dict[str, Any], ) -> int: + """ + Update rows in the given database table. + If the given keyvalues don't match anything, nothing will be updated. + + Args: + txn: The database transaction object. + table: The database table to update. + keyvalues: A mapping of column name to value to match rows on. + updatevalues: A mapping of column name to value to replace in any matched rows. + + Returns: + The number of rows that were updated. Will be 0 if no matching rows were found. + """ if keyvalues: where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) else: diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 07908c41d9..e59776f434 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -123,7 +123,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) async def get_account_data_for_user( self, user_id: str ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - """Get all the client account_data for a user. + """ + Get all the client account_data for a user. + + If experimental MSC3391 support is enabled, any entries with an empty + content body are excluded; as this means they have been deleted. Args: user_id: The user to get the account_data for. @@ -135,27 +139,48 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) def get_account_data_for_user_txn( txn: LoggingTransaction, ) -> Tuple[Dict[str, JsonDict], Dict[str, Dict[str, JsonDict]]]: - rows = self.db_pool.simple_select_list_txn( - txn, - "account_data", - {"user_id": user_id}, - ["account_data_type", "content"], - ) + # The 'content != '{}' condition below prevents us from using + # `simple_select_list_txn` here, as it doesn't support conditions + # other than 'equals'. + sql = """ + SELECT account_data_type, content FROM account_data + WHERE user_id = ? + """ + + # If experimental MSC3391 support is enabled, then account data entries + # with an empty content are considered "deleted". So skip adding them to + # the results. + if self.hs.config.experimental.msc3391_enabled: + sql += " AND content != '{}'" + + txn.execute(sql, (user_id,)) + rows = self.db_pool.cursor_to_dict(txn) global_account_data = { row["account_data_type"]: db_to_json(row["content"]) for row in rows } - rows = self.db_pool.simple_select_list_txn( - txn, - "room_account_data", - {"user_id": user_id}, - ["room_id", "account_data_type", "content"], - ) + # The 'content != '{}' condition below prevents us from using + # `simple_select_list_txn` here, as it doesn't support conditions + # other than 'equals'. + sql = """ + SELECT room_id, account_data_type, content FROM room_account_data + WHERE user_id = ? + """ + + # If experimental MSC3391 support is enabled, then account data entries + # with an empty content are considered "deleted". So skip adding them to + # the results. + if self.hs.config.experimental.msc3391_enabled: + sql += " AND content != '{}'" + + txn.execute(sql, (user_id,)) + rows = self.db_pool.cursor_to_dict(txn) by_room: Dict[str, Dict[str, JsonDict]] = {} for row in rows: room_data = by_room.setdefault(row["room_id"], {}) + room_data[row["account_data_type"]] = db_to_json(row["content"]) return global_account_data, by_room @@ -469,6 +494,72 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) return self._account_data_id_gen.get_current_token() + async def remove_account_data_for_room( + self, user_id: str, room_id: str, account_data_type: str + ) -> Optional[int]: + """Delete the room account data for the user of a given type. + + Args: + user_id: The user to remove account_data for. + room_id: The room ID to scope the request to. + account_data_type: The account data type to delete. + + Returns: + The maximum stream position, or None if there was no matching room account + data to delete. + """ + assert self._can_write_to_account_data + assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator) + + def _remove_account_data_for_room_txn( + txn: LoggingTransaction, next_id: int + ) -> bool: + """ + Args: + txn: The transaction object. + next_id: The stream_id to update any existing rows to. + + Returns: + True if an entry in room_account_data had its content set to '{}', + otherwise False. This informs callers of whether there actually was an + existing room account data entry to delete, or if the call was a no-op. + """ + # We can't use `simple_update` as it doesn't have the ability to specify + # where clauses other than '=', which we need for `content != '{}'` below. + sql = """ + UPDATE room_account_data + SET stream_id = ?, content = '{}' + WHERE user_id = ? + AND room_id = ? + AND account_data_type = ? + AND content != '{}' + """ + txn.execute( + sql, + (next_id, user_id, room_id, account_data_type), + ) + # Return true if any rows were updated. + return txn.rowcount != 0 + + async with self._account_data_id_gen.get_next() as next_id: + row_updated = await self.db_pool.runInteraction( + "remove_account_data_for_room", + _remove_account_data_for_room_txn, + next_id, + ) + + if not row_updated: + return None + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) + self.get_account_data_for_room.invalidate((user_id, room_id)) + self.get_account_data_for_room_and_type.prefill( + (user_id, room_id, account_data_type), {} + ) + + return self._account_data_id_gen.get_current_token() + async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: @@ -569,6 +660,108 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) self._invalidate_cache_and_stream(txn, self.ignored_by, (ignored_user_id,)) self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) + async def remove_account_data_for_user( + self, + user_id: str, + account_data_type: str, + ) -> Optional[int]: + """ + Delete a single piece of user account data by type. + + A "delete" is performed by updating a potentially existing row in the + "account_data" database table for (user_id, account_data_type) and + setting its content to "{}". + + Args: + user_id: The user ID to modify the account data of. + account_data_type: The type to remove. + + Returns: + The maximum stream position, or None if there was no matching account data + to delete. + """ + assert self._can_write_to_account_data + assert isinstance(self._account_data_id_gen, AbstractStreamIdGenerator) + + def _remove_account_data_for_user_txn( + txn: LoggingTransaction, next_id: int + ) -> bool: + """ + Args: + txn: The transaction object. + next_id: The stream_id to update any existing rows to. + + Returns: + True if an entry in account_data had its content set to '{}', otherwise + False. This informs callers of whether there actually was an existing + account data entry to delete, or if the call was a no-op. + """ + # We can't use `simple_update` as it doesn't have the ability to specify + # where clauses other than '=', which we need for `content != '{}'` below. + sql = """ + UPDATE account_data + SET stream_id = ?, content = '{}' + WHERE user_id = ? + AND account_data_type = ? + AND content != '{}' + """ + txn.execute(sql, (next_id, user_id, account_data_type)) + if txn.rowcount == 0: + # We didn't update any rows. This means that there was no matching room + # account data entry to delete in the first place. + return False + + # Ignored users get denormalized into a separate table as an optimisation. + if account_data_type == AccountDataTypes.IGNORED_USER_LIST: + # If this method was called with the ignored users account data type, we + # simply delete all ignored users. + + # First pull all the users that this user ignores. + previously_ignored_users = set( + self.db_pool.simple_select_onecol_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + retcol="ignored_user_id", + ) + ) + + # Then delete them from the database. + self.db_pool.simple_delete_txn( + txn, + table="ignored_users", + keyvalues={"ignorer_user_id": user_id}, + ) + + # Invalidate the cache for ignored users which were removed. + for ignored_user_id in previously_ignored_users: + self._invalidate_cache_and_stream( + txn, self.ignored_by, (ignored_user_id,) + ) + + # Invalidate for this user the cache tracking ignored users. + self._invalidate_cache_and_stream(txn, self.ignored_users, (user_id,)) + + return True + + async with self._account_data_id_gen.get_next() as next_id: + row_updated = await self.db_pool.runInteraction( + "remove_account_data_for_user", + _remove_account_data_for_user_txn, + next_id, + ) + + if not row_updated: + return None + + self._account_data_stream_cache.entity_has_changed(user_id, next_id) + self.get_account_data_for_user.invalidate((user_id,)) + self.get_global_account_data_by_type_for_user.prefill( + (user_id, account_data_type), {} + ) + + return self._account_data_id_gen.get_current_token() + async def purge_account_data_for_user(self, user_id: str) -> None: """ Removes ALL the account data for a user. |