summary refs log tree commit diff
path: root/synapse/storage/databases/main/registration.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/registration.py')
-rw-r--r--synapse/storage/databases/main/registration.py287
1 files changed, 181 insertions, 106 deletions
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py

index cc964604e2..933d76e905 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -143,6 +143,30 @@ class LoginTokenLookupResult: """The session ID advertised by the SSO Identity Provider.""" +@attr.s(frozen=True, slots=True, auto_attribs=True) +class ThreepidResult: + medium: str + address: str + validated_at: int + added_at: int + + +@attr.s(frozen=True, slots=True, auto_attribs=True) +class ThreepidValidationSession: + address: str + """address of the 3pid""" + medium: str + """medium of the 3pid""" + client_secret: str + """a secret provided by the client for this validation session""" + session_id: str + """ID of the validation session""" + last_send_attempt: int + """a number serving to dedupe send attempts for this session""" + validated_at: Optional[int] + """timestamp of when this session was validated if so""" + + class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -195,7 +219,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" - def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]: # We could technically use simple_select_one here, but it would not perform # the COALESCEs (unless hacked into the column names), which could yield # confusing results. @@ -213,35 +237,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) - - if len(rows) == 0: + row = txn.fetchone() + if not row: return None - return rows[0] + ( + name, + is_guest, + admin, + consent_version, + consent_ts, + consent_server_notice_sent, + appservice_id, + creation_ts, + user_type, + deactivated, + shadow_banned, + approved, + locked, + ) = row + + return UserInfo( + appservice_id=appservice_id, + consent_server_notice_sent=consent_server_notice_sent, + consent_version=consent_version, + consent_ts=consent_ts, + creation_ts=creation_ts, + is_admin=bool(admin), + is_deactivated=bool(deactivated), + is_guest=bool(is_guest), + is_shadow_banned=bool(shadow_banned), + user_id=UserID.from_string(name), + user_type=user_type, + approved=bool(approved), + locked=bool(locked), + ) - row = await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( desc="get_user_by_id", func=get_user_by_id_txn, ) - if row is None: - return None - - return UserInfo( - appservice_id=row["appservice_id"], - consent_server_notice_sent=row["consent_server_notice_sent"], - consent_version=row["consent_version"], - consent_ts=row["consent_ts"], - creation_ts=row["creation_ts"], - is_admin=bool(row["admin"]), - is_deactivated=bool(row["deactivated"]), - is_guest=bool(row["is_guest"]), - is_shadow_banned=bool(row["shadow_banned"]), - user_id=UserID.from_string(row["name"]), - user_type=row["user_type"], - approved=bool(row["approved"]), - locked=bool(row["locked"]), - ) async def is_trial_user(self, user_id: str) -> bool: """Checks if user is in the "trial" period, i.e. within the first @@ -579,16 +614,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """ txn.execute(sql, (token,)) - rows = self.db_pool.cursor_to_dict(txn) - - if rows: - row = rows[0] - - # This field is nullable, ensure it comes out as a boolean - if row["token_used"] is None: - row["token_used"] = False + row = txn.fetchone() - return TokenLookupResult(**row) + if row: + ( + user_id, + is_guest, + shadow_banned, + token_id, + device_id, + valid_until_ms, + token_owner, + token_used, + ) = row + + return TokenLookupResult( + user_id=user_id, + is_guest=is_guest, + shadow_banned=shadow_banned, + token_id=token_id, + device_id=device_id, + valid_until_ms=valid_until_ms, + token_owner=token_owner, + # This field is nullable, ensure it comes out as a boolean + token_used=bool(token_used), + ) return None @@ -821,23 +871,24 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): Returns: Tuples of (auth_provider, external_id) """ - res = await self.db_pool.simple_select_list( - table="user_external_ids", - keyvalues={"user_id": mxid}, - retcols=("auth_provider", "external_id"), - desc="get_external_ids_by_user", + return cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_list( + table="user_external_ids", + keyvalues={"user_id": mxid}, + retcols=("auth_provider", "external_id"), + desc="get_external_ids_by_user", + ), ) - return [(r["auth_provider"], r["external_id"]) for r in res] async def count_all_users(self) -> int: """Counts all users registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_users", _count_users) @@ -891,11 +942,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users without a special user_type registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users where user_type is null") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_real_users", _count_users) @@ -964,13 +1014,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): {"user_id": user_id, "validated_at": validated_at, "added_at": added_at}, ) - async def user_get_threepids(self, user_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( - "user_threepids", - {"user_id": user_id}, - ["medium", "address", "validated_at", "added_at"], - "user_get_threepids", + async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]: + results = cast( + List[Tuple[str, str, int, int]], + await self.db_pool.simple_select_list( + "user_threepids", + keyvalues={"user_id": user_id}, + retcols=["medium", "address", "validated_at", "added_at"], + desc="user_get_threepids", + ), ) + return [ + ThreepidResult( + medium=r[0], + address=r[1], + validated_at=r[2], + added_at=r[3], + ) + for r in results + ] async def user_delete_threepid( self, user_id: str, medium: str, address: str @@ -1009,7 +1071,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): desc="add_user_bound_threepid", ) - async def user_get_bound_threepids(self, user_id: str) -> List[Dict[str, Any]]: + async def user_get_bound_threepids(self, user_id: str) -> List[Tuple[str, str]]: """Get the threepids that a user has bound to an identity server through the homeserver The homeserver remembers where binds to an identity server occurred. Using this method can retrieve those threepids. @@ -1018,15 +1080,18 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): user_id: The ID of the user to retrieve threepids for Returns: - List of dictionaries containing the following keys: - medium (str): The medium of the threepid (e.g "email") - address (str): The address of the threepid (e.g "bob@example.com") - """ - return await self.db_pool.simple_select_list( - table="user_threepid_id_server", - keyvalues={"user_id": user_id}, - retcols=["medium", "address"], - desc="user_get_bound_threepids", + List of tuples of two strings: + medium: The medium of the threepid (e.g "email") + address: The address of the threepid (e.g "bob@example.com") + """ + return cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_list( + table="user_threepid_id_server", + keyvalues={"user_id": user_id}, + retcols=["medium", "address"], + desc="user_get_bound_threepids", + ), ) async def remove_user_bound_threepid( @@ -1123,7 +1188,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): address: Optional[str] = None, sid: Optional[str] = None, validated: Optional[bool] = True, - ) -> Optional[Dict[str, Any]]: + ) -> Optional[ThreepidValidationSession]: """Gets a session_id and last_send_attempt (if available) for a combination of validation metadata @@ -1138,15 +1203,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): perform no filtering Returns: - A dict containing the following: - * address - address of the 3pid - * medium - medium of the 3pid - * client_secret - a secret provided by the client for this validation session - * session_id - ID of the validation session - * send_attempt - a number serving to dedupe send attempts for this session - * validated_at - timestamp of when this session was validated if so - - Otherwise None if a validation session is not found + A ThreepidValidationSession or None if a validation session is not found """ if not client_secret: raise SynapseError( @@ -1165,7 +1222,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): def get_threepid_validation_session_txn( txn: LoggingTransaction, - ) -> Optional[Dict[str, Any]]: + ) -> Optional[ThreepidValidationSession]: sql = """ SELECT address, session_id, medium, client_secret, last_send_attempt, validated_at @@ -1180,11 +1237,18 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): sql += " LIMIT 1" txn.execute(sql, list(keyvalues.values())) - rows = self.db_pool.cursor_to_dict(txn) - if not rows: + row = txn.fetchone() + if not row: return None - return rows[0] + return ThreepidValidationSession( + address=row[0], + session_id=row[1], + medium=row[2], + client_secret=row[3], + last_send_attempt=row[4], + validated_at=row[5], + ) return await self.db_pool.runInteraction( "get_threepid_validation_session", get_threepid_validation_session_txn @@ -1252,12 +1316,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) txn.execute(sql, []) - res = self.db_pool.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) + for (name,) in txn.fetchall(): + self.set_expiration_date_for_user_txn(txn, name, use_delta=True) await self.db_pool.runInteraction( "get_users_with_no_expiration_date", @@ -1457,7 +1517,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def get_registration_tokens( self, valid: Optional[bool] = None - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[str, Optional[int], int, int, Optional[int]]]: """List all registration tokens. Used by the admin API. Args: @@ -1466,34 +1526,48 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): Default is None: return all tokens regardless of validity. Returns: - A list of dicts, each containing details of a token. + A list of tuples containing: + * The token + * The number of users allowed (or None) + * Whether it is pending + * Whether it has been completed + * An expiry time (or None if no expiry) """ def select_registration_tokens_txn( txn: LoggingTransaction, now: int, valid: Optional[bool] - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[str, Optional[int], int, int, Optional[int]]]: if valid is None: # Return all tokens regardless of validity - txn.execute("SELECT * FROM registration_tokens") + txn.execute( + """ + SELECT token, uses_allowed, pending, completed, expiry_time + FROM registration_tokens + """ + ) elif valid: # Select valid tokens only - sql = ( - "SELECT * FROM registration_tokens WHERE " - "(uses_allowed > pending + completed OR uses_allowed IS NULL) " - "AND (expiry_time > ? OR expiry_time IS NULL)" - ) + sql = """ + SELECT token, uses_allowed, pending, completed, expiry_time + FROM registration_tokens + WHERE (uses_allowed > pending + completed OR uses_allowed IS NULL) + AND (expiry_time > ? OR expiry_time IS NULL) + """ txn.execute(sql, [now]) else: # Select invalid tokens only - sql = ( - "SELECT * FROM registration_tokens WHERE " - "uses_allowed <= pending + completed OR expiry_time <= ?" - ) + sql = """ + SELECT token, uses_allowed, pending, completed, expiry_time + FROM registration_tokens + WHERE uses_allowed <= pending + completed OR expiry_time <= ? + """ txn.execute(sql, [now]) - return self.db_pool.cursor_to_dict(txn) + return cast( + List[Tuple[str, Optional[int], int, int, Optional[int]]], txn.fetchall() + ) return await self.db_pool.runInteraction( "select_registration_tokens", @@ -1963,11 +2037,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) + row = txn.fetchone() + assert row is not None # We cast to bool because the value returned by the database engine might # be an integer if we're using SQLite. - return bool(rows[0]["approved"]) + return bool(row[0]) return await self.db_pool.runInteraction( desc="is_user_pending_approval", @@ -2045,22 +2120,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): (last_user, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True, 0 rows_processed_nb = 0 - for user in rows: - if not user["count_tokens"] and not user["count_threepids"]: - self.set_user_deactivated_status_txn(txn, user["name"], True) + for name, count_tokens, count_threepids in rows: + if not count_tokens and not count_threepids: + self.set_user_deactivated_status_txn(txn, name, True) rows_processed_nb += 1 logger.info("Marked %d rows as deactivated", rows_processed_nb) self.db_pool.updates._background_update_progress_txn( - txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]} + txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]} ) if batch_size > len(rows):