diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index cb63cd9b7d..31f0f2bd3d 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -21,7 +21,13 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
import attr
from synapse.api.constants import UserTypes
-from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
+from synapse.api.errors import (
+ Codes,
+ NotFoundError,
+ StoreError,
+ SynapseError,
+ ThreepidValidationError,
+)
from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.database import (
@@ -50,6 +56,14 @@ class ExternalIDReuseException(Exception):
because this external id is given to an other user."""
+class LoginTokenExpired(Exception):
+ """Exception if the login token sent expired"""
+
+
+class LoginTokenReused(Exception):
+ """Exception if the login token sent was already used"""
+
+
@attr.s(frozen=True, slots=True, auto_attribs=True)
class TokenLookupResult:
"""Result of looking up an access token.
@@ -69,9 +83,9 @@ class TokenLookupResult:
"""
user_id: str
+ token_id: int
is_guest: bool = False
shadow_banned: bool = False
- token_id: Optional[int] = None
device_id: Optional[str] = None
valid_until_ms: Optional[int] = None
token_owner: str = attr.ib()
@@ -115,6 +129,20 @@ class RefreshTokenLookupResult:
If None, the session can be refreshed indefinitely."""
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class LoginTokenLookupResult:
+ """Result of looking up a login token."""
+
+ user_id: str
+ """The user this token belongs to."""
+
+ auth_provider_id: Optional[str]
+ """The SSO Identity Provider that the user authenticated with, to get this token."""
+
+ auth_provider_session_id: Optional[str]
+ """The session ID advertised by the SSO Identity Provider."""
+
+
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
def __init__(
self,
@@ -166,26 +194,49 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[Dict[str, Any]]:
"""Deprecated: use get_userinfo_by_id instead"""
- return await self.db_pool.simple_select_one(
- table="users",
- keyvalues={"name": user_id},
- retcols=[
- "name",
- "password_hash",
- "is_guest",
- "admin",
- "consent_version",
- "consent_server_notice_sent",
- "appservice_id",
- "creation_ts",
- "user_type",
- "deactivated",
- "shadow_banned",
- ],
- allow_none=True,
+
+ def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]:
+ # We could technically use simple_select_one here, but it would not perform
+ # the COALESCEs (unless hacked into the column names), which could yield
+ # confusing results.
+ txn.execute(
+ """
+ SELECT
+ name, password_hash, is_guest, admin, consent_version, consent_ts,
+ consent_server_notice_sent, appservice_id, creation_ts, user_type,
+ deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned,
+ COALESCE(approved, TRUE) AS approved
+ FROM users
+ WHERE name = ?
+ """,
+ (user_id,),
+ )
+
+ rows = self.db_pool.cursor_to_dict(txn)
+
+ if len(rows) == 0:
+ return None
+
+ return rows[0]
+
+ row = await self.db_pool.runInteraction(
desc="get_user_by_id",
+ func=get_user_by_id_txn,
)
+ if row is not None:
+ # If we're using SQLite our boolean values will be integers. Because we
+ # present some of this data as is to e.g. server admins via REST APIs, we
+ # want to make sure we're returning the right type of data.
+ # Note: when adding a column name to this list, be wary of NULLable columns,
+ # since NULL values will be turned into False.
+ boolean_columns = ["admin", "deactivated", "shadow_banned", "approved"]
+ for column in boolean_columns:
+ if not isinstance(row[column], bool):
+ row[column] = bool(row[column])
+
+ return row
+
async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Get a UserInfo object for a user by user ID.
@@ -902,7 +953,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""Returns user id from threepid
Args:
- txn (cursor):
+ txn:
medium: threepid medium e.g. email
address: threepid address e.g. me@example.com
@@ -1232,8 +1283,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"""Sets an expiration date to the account with the given user ID.
Args:
- user_id (str): User ID to set an expiration date for.
- use_delta (bool): If set to False, the expiration date for the user will be
+ user_id: User ID to set an expiration date for.
+ use_delta: If set to False, the expiration date for the user will be
now + validity period. If set to True, this expiration date will be a
random value in the [now + period - d ; now + period] range, d being a
delta equal to 10% of the validity period.
@@ -1766,6 +1817,130 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"replace_refresh_token", _replace_refresh_token_txn
)
+ async def add_login_token_to_user(
+ self,
+ user_id: str,
+ token: str,
+ expiry_ts: int,
+ auth_provider_id: Optional[str],
+ auth_provider_session_id: Optional[str],
+ ) -> None:
+ """Adds a short-term login token for the given user.
+
+ Args:
+ user_id: The user ID.
+ token: The new login token to add.
+ expiry_ts (milliseconds since the epoch): Time after which the login token
+ cannot be used.
+ auth_provider_id: The SSO Identity Provider that the user authenticated with
+ to get this token, if any
+ auth_provider_session_id: The session ID advertised by the SSO Identity
+ Provider, if any.
+ """
+ await self.db_pool.simple_insert(
+ "login_tokens",
+ {
+ "token": token,
+ "user_id": user_id,
+ "expiry_ts": expiry_ts,
+ "auth_provider_id": auth_provider_id,
+ "auth_provider_session_id": auth_provider_session_id,
+ },
+ desc="add_login_token_to_user",
+ )
+
+ def _consume_login_token(
+ self,
+ txn: LoggingTransaction,
+ token: str,
+ ts: int,
+ ) -> LoginTokenLookupResult:
+ values = self.db_pool.simple_select_one_txn(
+ txn,
+ "login_tokens",
+ keyvalues={"token": token},
+ retcols=(
+ "user_id",
+ "expiry_ts",
+ "used_ts",
+ "auth_provider_id",
+ "auth_provider_session_id",
+ ),
+ allow_none=True,
+ )
+
+ if values is None:
+ raise NotFoundError()
+
+ self.db_pool.simple_update_one_txn(
+ txn,
+ "login_tokens",
+ keyvalues={"token": token},
+ updatevalues={"used_ts": ts},
+ )
+ user_id = values["user_id"]
+ expiry_ts = values["expiry_ts"]
+ used_ts = values["used_ts"]
+ auth_provider_id = values["auth_provider_id"]
+ auth_provider_session_id = values["auth_provider_session_id"]
+
+ # Token was already used
+ if used_ts is not None:
+ raise LoginTokenReused()
+
+ # Token expired
+ if ts > int(expiry_ts):
+ raise LoginTokenExpired()
+
+ return LoginTokenLookupResult(
+ user_id=user_id,
+ auth_provider_id=auth_provider_id,
+ auth_provider_session_id=auth_provider_session_id,
+ )
+
+ async def consume_login_token(self, token: str) -> LoginTokenLookupResult:
+ """Lookup a login token and consume it.
+
+ Args:
+ token: The login token.
+
+ Returns:
+ The data stored with that token, including the `user_id`. Returns `None` if
+ the token does not exist or if it expired.
+
+ Raises:
+ NotFound if the login token was not found in database
+ LoginTokenExpired if the login token expired
+ LoginTokenReused if the login token was already used
+ """
+ return await self.db_pool.runInteraction(
+ "consume_login_token",
+ self._consume_login_token,
+ token,
+ self._clock.time_msec(),
+ )
+
+ async def invalidate_login_tokens_by_session_id(
+ self, auth_provider_id: str, auth_provider_session_id: str
+ ) -> None:
+ """Invalidate login tokens with the given IdP session ID.
+
+ Args:
+ auth_provider_id: The SSO Identity Provider that the user authenticated with
+ to get this token
+ auth_provider_session_id: The session ID advertised by the SSO Identity
+ Provider
+ """
+ await self.db_pool.simple_update(
+ table="login_tokens",
+ keyvalues={
+ "auth_provider_id": auth_provider_id,
+ "auth_provider_session_id": auth_provider_session_id,
+ },
+ updatevalues={"used_ts": self._clock.time_msec()},
+ desc="invalidate_login_tokens_by_session_id",
+ )
+
@cached()
async def is_guest(self, user_id: str) -> bool:
res = await self.db_pool.simple_select_one_onecol(
@@ -1778,6 +1953,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return res if res else False
+ @cached()
+ async def is_user_approved(self, user_id: str) -> bool:
+ """Checks if a user is approved and therefore can be allowed to log in.
+
+ If the user's 'approved' column is NULL, we consider it as true given it means
+ the user was registered when support for an approval flow was either disabled
+ or nonexistent.
+
+ Args:
+ user_id: the user to check the approval status of.
+
+ Returns:
+ A boolean that is True if the user is approved, False otherwise.
+ """
+
+ def is_user_approved_txn(txn: LoggingTransaction) -> bool:
+ txn.execute(
+ """
+ SELECT COALESCE(approved, TRUE) AS approved FROM users WHERE name = ?
+ """,
+ (user_id,),
+ )
+
+ rows = self.db_pool.cursor_to_dict(txn)
+
+ # We cast to bool because the value returned by the database engine might
+ # be an integer if we're using SQLite.
+ return bool(rows[0]["approved"])
+
+ return await self.db_pool.runInteraction(
+ desc="is_user_pending_approval",
+ func=is_user_approved_txn,
+ )
+
class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
def __init__(
@@ -1915,6 +2124,29 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
+ def update_user_approval_status_txn(
+ self, txn: LoggingTransaction, user_id: str, approved: bool
+ ) -> None:
+ """Set the user's 'approved' flag to the given value.
+
+ The boolean is turned into an int because the column is a smallint.
+
+ Args:
+ txn: the current database transaction.
+ user_id: the user to update the flag for.
+ approved: the value to set the flag to.
+ """
+ self.db_pool.simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"approved": approved},
+ )
+
+ # Invalidate the caches of methods that read the value of the 'approved' flag.
+ self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
+ self._invalidate_cache_and_stream(txn, self.is_user_approved, (user_id,))
+
class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
def __init__(
@@ -1932,6 +2164,19 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+ # If support for MSC3866 is enabled and configured to require approval for new
+ # account, we will create new users with an 'approved' flag set to false.
+ self._require_approval = (
+ hs.config.experimental.msc3866.enabled
+ and hs.config.experimental.msc3866.require_approval_for_new_accounts
+ )
+
+ # Create a background job for removing expired login tokens
+ if hs.config.worker.run_background_tasks:
+ self._clock.looping_call(
+ self._delete_expired_login_tokens, THIRTY_MINUTES_IN_MS
+ )
+
async def add_access_token_to_user(
self,
user_id: str,
@@ -2064,6 +2309,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
admin: bool = False,
user_type: Optional[str] = None,
shadow_banned: bool = False,
+ approved: bool = False,
) -> None:
"""Attempts to register an account.
@@ -2082,6 +2328,8 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
or None for a normal user.
shadow_banned: Whether the user is shadow-banned, i.e. they may be
told their requests succeeded but we ignore them.
+ approved: Whether to consider the user has already been approved by an
+ administrator.
Raises:
StoreError if the user_id could not be registered.
@@ -2098,6 +2346,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
admin,
user_type,
shadow_banned,
+ approved,
)
def _register_user(
@@ -2112,11 +2361,14 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
admin: bool,
user_type: Optional[str],
shadow_banned: bool,
+ approved: bool,
) -> None:
user_id_obj = UserID.from_string(user_id)
now = int(self._clock.time())
+ user_approved = approved or not self._require_approval
+
try:
if was_guest:
# Ensure that the guest user actually exists
@@ -2142,6 +2394,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
+ "approved": user_approved,
},
)
else:
@@ -2157,6 +2410,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
"admin": 1 if admin else 0,
"user_type": user_type,
"shadow_banned": shadow_banned,
+ "approved": user_approved,
},
)
@@ -2227,7 +2481,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
txn,
table="users",
keyvalues={"name": user_id},
- updatevalues={"consent_version": consent_version},
+ updatevalues={
+ "consent_version": consent_version,
+ "consent_ts": self._clock.time_msec(),
+ },
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
@@ -2499,6 +2756,42 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
start_or_continue_validation_session_txn,
)
+ async def update_user_approval_status(
+ self, user_id: UserID, approved: bool
+ ) -> None:
+ """Set the user's 'approved' flag to the given value.
+
+ The boolean will be turned into an int (in update_user_approval_status_txn)
+ because the column is a smallint.
+
+ Args:
+ user_id: the user to update the flag for.
+ approved: the value to set the flag to.
+ """
+ await self.db_pool.runInteraction(
+ "update_user_approval_status",
+ self.update_user_approval_status_txn,
+ user_id.to_string(),
+ approved,
+ )
+
+ @wrap_as_background_process("delete_expired_login_tokens")
+ async def _delete_expired_login_tokens(self) -> None:
+ """Remove login tokens with expiry dates that have passed."""
+
+ def _delete_expired_login_tokens_txn(txn: LoggingTransaction, ts: int) -> None:
+ sql = "DELETE FROM login_tokens WHERE expiry_ts <= ?"
+ txn.execute(sql, (ts,))
+
+ # We keep the expired tokens for an extra 5 minutes so we can measure how many
+ # times a token is being used after its expiry
+ now = self._clock.time_msec()
+ await self.db_pool.runInteraction(
+ "delete_expired_login_tokens",
+ _delete_expired_login_tokens_txn,
+ now - (5 * 60 * 1000),
+ )
+
def find_max_generated_user_id_localpart(cur: Cursor) -> int:
"""
|