summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/event_push_actions.py91
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py75
-rw-r--r--synapse/storage/databases/main/registration.py395
3 files changed, 2 insertions, 559 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py

index f42023418e..6fb4a6df8c 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1034,97 +1034,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # one of the subqueries may have hit the limit. return notifs[:limit] - async def get_unread_push_actions_for_user_in_range_for_email( - self, - user_id: str, - min_stream_ordering: int, - max_stream_ordering: int, - limit: int = 20, - ) -> List[EmailPushAction]: - """Get a list of the most recent unread push actions for a given user, - within the given stream ordering range. Called by the emailpusher - - Args: - user_id: The user to fetch push actions for. - min_stream_ordering: The exclusive lower bound on the - stream ordering of event push actions to fetch. - max_stream_ordering: The inclusive upper bound on the - stream ordering of event push actions to fetch. - limit: The maximum number of rows to return. - Returns: - A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts". - The list will be ordered by descending received_ts. - The list will have between 0~limit entries. - """ - - def get_push_actions_txn( - txn: LoggingTransaction, - ) -> List[Tuple[str, str, str, int, str, bool, int]]: - sql = """ - SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering, - ep.actions, ep.highlight, e.received_ts - FROM event_push_actions AS ep - INNER JOIN events AS e USING (room_id, event_id) - WHERE - ep.user_id = ? - AND ep.stream_ordering > ? - AND ep.stream_ordering <= ? - AND ep.notif = 1 - ORDER BY ep.stream_ordering DESC LIMIT ? - """ - txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit)) - return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall()) - - push_actions = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn - ) - - room_ids = set() - thread_ids = [] - for ( - _, - room_id, - thread_id, - _, - _, - _, - _, - ) in push_actions: - room_ids.add(room_id) - thread_ids.append(thread_id) - - receipts_by_room = await self.db_pool.runInteraction( - "get_unread_push_actions_for_user_in_range_email_receipts", - self._get_receipts_for_room_and_threads_txn, - user_id=user_id, - room_ids=room_ids, - thread_ids=thread_ids, - ) - - # Make a list of dicts from the two sets of results. - notifs = [ - EmailPushAction( - event_id=event_id, - room_id=room_id, - stream_ordering=stream_ordering, - actions=_deserialize_action(actions, highlight), - received_ts=received_ts, - ) - for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions - if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread( - thread_id, stream_ordering - ) - ] - - # Now sort it so it's ordered correctly, since currently it will - # contain results from the first query, correctly ordered, followed - # by results from the second query, but we want them all ordered - # by received_ts (most recent first) - notifs.sort(key=lambda r: -(r.received_ts or 0)) - - # Now return the first `limit` - return notifs[:limit] - async def get_if_maybe_push_in_range_for_user( self, user_id: str, min_stream_ordering: int ) -> bool: diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 659ee13d71..c384675839 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -29,7 +29,6 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.registration import RegistrationWorkerStore from synapse.util.caches.descriptors import cached -from synapse.util.threepids import canonicalise_email if TYPE_CHECKING: from synapse.server import HomeServer @@ -65,18 +64,6 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): self._mau_stats_only = hs.config.server.mau_stats_only - if self._update_on_this_worker: - # Do not add more reserved users than the total allowable number - self.db_pool.new_transaction( - db_conn, - "initialise_mau_threepids", - [], - [], - [], - self._initialise_reserved_users, - hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value], - ) - @cached(num_args=0) async def get_monthly_active_count(self) -> int: """Generates current count of monthly active users @@ -174,26 +161,6 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): return await self.db_pool.runInteraction("list_users", _list_users) - async def get_registered_reserved_users(self) -> List[str]: - """Of the reserved threepids defined in config, retrieve those that are associated - with registered users - - Returns: - User IDs of actual users that are reserved - """ - users = [] - - for tp in self.hs.config.server.mau_limits_reserved_threepids[ - : self.hs.config.server.max_mau_value - ]: - user_id = await self.hs.get_datastores().main.get_user_id_by_threepid( - tp["medium"], canonicalise_email(tp["address"]) - ) - if user_id: - users.append(user_id) - - return users - @cached(num_args=1) async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]: """ @@ -289,50 +256,10 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): ) self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ()) - reserved_users = await self.get_registered_reserved_users() await self.db_pool.runInteraction( - "reap_monthly_active_users", _reap_users, reserved_users - ) - - def _initialise_reserved_users( - self, txn: LoggingTransaction, threepids: List[dict] - ) -> None: - """Ensures that reserved threepids are accounted for in the MAU table, should - be called on start up. - - Args: - txn: - threepids: List of threepid dicts to reserve - """ - assert self._update_on_this_worker, ( - "This worker is not designated to update MAUs" + "reap_monthly_active_users", _reap_users, [] ) - # XXX what is this function trying to achieve? It upserts into - # monthly_active_users for each *registered* reserved mau user, but why? - # - # - shouldn't there already be an entry for each reserved user (at least - # if they have been active recently)? - # - # - if it's important that the timestamp is kept up to date, why do we only - # run this at startup? - - for tp in threepids: - user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"]) - - if user_id: - is_support = self.is_support_user_txn(txn, user_id) - if not is_support: - # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791 - self.db_pool.simple_upsert_txn( - txn, - table="monthly_active_users", - keyvalues={"user_id": user_id}, - values={"timestamp": int(self._clock.time_msec())}, - ) - else: - logger.warning("mau limit reserved threepid %s not found in db" % tp) - async def upsert_monthly_active_user(self, user_id: str) -> None: """Updates or inserts the user into the monthly active user table, which is used to track the current MAU usage of the server diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 40c551bcb4..868803e169 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py
@@ -32,7 +32,6 @@ from synapse.api.errors import ( NotFoundError, StoreError, SynapseError, - ThreepidValidationError, ) from synapse.config.homeserver import HomeServerConfig from synapse.metrics.background_process_metrics import wrap_as_background_process @@ -149,30 +148,6 @@ class LoginTokenLookupResult: """The session ID advertised by the SSO Identity Provider.""" -@attr.s(frozen=True, slots=True, auto_attribs=True) -class ThreepidResult: - medium: str - address: str - validated_at: int - added_at: int - - -@attr.s(frozen=True, slots=True, auto_attribs=True) -class ThreepidValidationSession: - address: str - """address of the 3pid""" - medium: str - """medium of the 3pid""" - client_secret: str - """a secret provided by the client for this validation session""" - session_id: str - """ID of the validation session""" - last_send_attempt: int - """a number serving to dedupe send attempts for this session""" - validated_at: Optional[int] - """timestamp of when this session was validated if so""" - - class RegistrationWorkerStore(CacheInvalidationWorkerStore): def __init__( self, @@ -215,12 +190,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): self._set_expiration_date_when_missing, ) - # Create a background job for culling expired 3PID validity tokens - if hs.config.worker.run_background_tasks: - self._clock.looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS - ) - @cached() async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" @@ -984,161 +953,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return str(next_id) - async def get_user_id_by_threepid(self, medium: str, address: str) -> Optional[str]: - """Returns user id from threepid - - Args: - medium: threepid medium e.g. email - address: threepid address e.g. me@example.com. This must already be - in canonical form. - - Returns: - The user ID or None if no user id/threepid mapping exists - """ - user_id = await self.db_pool.runInteraction( - "get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address - ) - return user_id - - def get_user_id_by_threepid_txn( - self, txn: LoggingTransaction, medium: str, address: str - ) -> Optional[str]: - """Returns user id from threepid - - Args: - txn: - medium: threepid medium e.g. email - address: threepid address e.g. me@example.com - - Returns: - user id, or None if no user id/threepid mapping exists - """ - return self.db_pool.simple_select_one_onecol_txn( - txn, - "user_threepids", - {"medium": medium, "address": address}, - "user_id", - True, - ) - - async def user_add_threepid( - self, - user_id: str, - medium: str, - address: str, - validated_at: int, - added_at: int, - ) -> None: - await self.db_pool.simple_upsert( - "user_threepids", - {"medium": medium, "address": address}, - {"user_id": user_id, "validated_at": validated_at, "added_at": added_at}, - ) - - async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]: - results = cast( - List[Tuple[str, str, int, int]], - await self.db_pool.simple_select_list( - "user_threepids", - keyvalues={"user_id": user_id}, - retcols=["medium", "address", "validated_at", "added_at"], - desc="user_get_threepids", - ), - ) - return [ - ThreepidResult( - medium=r[0], - address=r[1], - validated_at=r[2], - added_at=r[3], - ) - for r in results - ] - - async def user_delete_threepid( - self, user_id: str, medium: str, address: str - ) -> None: - await self.db_pool.simple_delete( - "user_threepids", - keyvalues={"user_id": user_id, "medium": medium, "address": address}, - desc="user_delete_threepid", - ) - - async def add_user_bound_threepid( - self, user_id: str, medium: str, address: str, id_server: str - ) -> None: - """The server proxied a bind request to the given identity server on - behalf of the given user. We need to remember this in case the user - asks us to unbind the threepid. - - Args: - user_id - medium - address - id_server - """ - # We need to use an upsert, in case they user had already bound the - # threepid - await self.db_pool.simple_upsert( - table="user_threepid_id_server", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - "id_server": id_server, - }, - values={}, - insertion_values={}, - desc="add_user_bound_threepid", - ) - - async def user_get_bound_threepids(self, user_id: str) -> List[Tuple[str, str]]: - """Get the threepids that a user has bound to an identity server through the homeserver - The homeserver remembers where binds to an identity server occurred. Using this - method can retrieve those threepids. - - Args: - user_id: The ID of the user to retrieve threepids for - - Returns: - List of tuples of two strings: - medium: The medium of the threepid (e.g "email") - address: The address of the threepid (e.g "bob@example.com") - """ - return cast( - List[Tuple[str, str]], - await self.db_pool.simple_select_list( - table="user_threepid_id_server", - keyvalues={"user_id": user_id}, - retcols=["medium", "address"], - desc="user_get_bound_threepids", - ), - ) - - async def remove_user_bound_threepid( - self, user_id: str, medium: str, address: str, id_server: str - ) -> None: - """The server proxied an unbind request to the given identity server on - behalf of the given user, so we remove the mapping of threepid to - identity server. - - Args: - user_id - medium - address - id_server - """ - await self.db_pool.simple_delete( - table="user_threepid_id_server", - keyvalues={ - "user_id": user_id, - "medium": medium, - "address": address, - "id_server": id_server, - }, - desc="remove_user_bound_threepid", - ) - async def get_id_servers_user_bound( self, user_id: str, medium: str, address: str ) -> List[str]: @@ -1223,121 +1037,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): return bool(res) - async def get_threepid_validation_session( - self, - medium: Optional[str], - client_secret: str, - address: Optional[str] = None, - sid: Optional[str] = None, - validated: Optional[bool] = True, - ) -> Optional[ThreepidValidationSession]: - """Gets a session_id and last_send_attempt (if available) for a - combination of validation metadata - - Args: - medium: The medium of the 3PID - client_secret: A unique string provided by the client to help identify this - validation attempt - address: The address of the 3PID - sid: The ID of the validation session - validated: Whether sessions should be filtered by - whether they have been validated already or not. None to - perform no filtering - - Returns: - A ThreepidValidationSession or None if a validation session is not found - """ - if not client_secret: - raise SynapseError( - 400, "Missing parameter: client_secret", errcode=Codes.MISSING_PARAM - ) - - keyvalues = {"client_secret": client_secret} - if medium: - keyvalues["medium"] = medium - if address: - keyvalues["address"] = address - if sid: - keyvalues["session_id"] = sid - - assert address or sid - - def get_threepid_validation_session_txn( - txn: LoggingTransaction, - ) -> Optional[ThreepidValidationSession]: - sql = """ - SELECT address, session_id, medium, client_secret, - last_send_attempt, validated_at - FROM threepid_validation_session WHERE %s - """ % (" AND ".join("%s = ?" % k for k in keyvalues.keys()),) - - if validated is not None: - sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL") - - sql += " LIMIT 1" - - txn.execute(sql, list(keyvalues.values())) - row = txn.fetchone() - if not row: - return None - - return ThreepidValidationSession( - address=row[0], - session_id=row[1], - medium=row[2], - client_secret=row[3], - last_send_attempt=row[4], - validated_at=row[5], - ) - - return await self.db_pool.runInteraction( - "get_threepid_validation_session", get_threepid_validation_session_txn - ) - - async def delete_threepid_session(self, session_id: str) -> None: - """Removes a threepid validation session from the database. This can - be done after validation has been performed and whatever action was - waiting on it has been carried out - - Args: - session_id: The ID of the session to delete - """ - - def delete_threepid_session_txn(txn: LoggingTransaction) -> None: - self.db_pool.simple_delete_txn( - txn, - table="threepid_validation_token", - keyvalues={"session_id": session_id}, - ) - self.db_pool.simple_delete_txn( - txn, - table="threepid_validation_session", - keyvalues={"session_id": session_id}, - ) - - await self.db_pool.runInteraction( - "delete_threepid_session", delete_threepid_session_txn - ) - - @wrap_as_background_process("cull_expired_threepid_validation_tokens") - async def cull_expired_threepid_validation_tokens(self) -> None: - """Remove threepid validation tokens with expiry dates that have passed""" - - def cull_expired_threepid_validation_tokens_txn( - txn: LoggingTransaction, ts: int - ) -> None: - sql = """ - DELETE FROM threepid_validation_token WHERE - expires < ? - """ - txn.execute(sql, (ts,)) - - await self.db_pool.runInteraction( - "cull_expired_threepid_validation_tokens", - cull_expired_threepid_validation_tokens_txn, - self._clock.time_msec(), - ) - @wrap_as_background_process("account_validity_set_expiration_dates") async def _set_expiration_date_when_missing(self) -> None: """ @@ -2361,9 +2060,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): ): super().__init__(database, db_conn, hs) - self._ignore_unknown_session_error = ( - hs.config.server.request_token_inhibit_3pid_errors - ) + self._ignore_unknown_session_error = False # Used to use whether 3pid errors were suppressed or not... Problem? self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id") @@ -2832,96 +2529,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): desc="add_user_pending_deactivation", ) - async def validate_threepid_session( - self, session_id: str, client_secret: str, token: str, current_ts: int - ) -> Optional[str]: - """Attempt to validate a threepid session using a token - - Args: - session_id: The id of a validation session - client_secret: A unique string provided by the client to help identify - this validation attempt - token: A validation token - current_ts: The current unix time in milliseconds. Used for checking - token expiry status - - Raises: - ThreepidValidationError: if a matching validation token was not found or has - expired - - Returns: - A str representing a link to redirect the user to if there is one. - """ - - # Insert everything into a transaction in order to run atomically - def validate_threepid_session_txn(txn: LoggingTransaction) -> Optional[str]: - row = self.db_pool.simple_select_one_txn( - txn, - table="threepid_validation_session", - keyvalues={"session_id": session_id}, - retcols=["client_secret", "validated_at"], - allow_none=True, - ) - - if not row: - if self._ignore_unknown_session_error: - # If we need to inhibit the error caused by an incorrect session ID, - # use None as placeholder values for the client secret and the - # validation timestamp. - # It shouldn't be an issue because they're both only checked after - # the token check, which should fail. And if it doesn't for some - # reason, the next check is on the client secret, which is NOT NULL, - # so we don't have to worry about the client secret matching by - # accident. - row = None, None - else: - raise ThreepidValidationError("Unknown session_id") - - retrieved_client_secret, validated_at = row - - row = self.db_pool.simple_select_one_txn( - txn, - table="threepid_validation_token", - keyvalues={"session_id": session_id, "token": token}, - retcols=["expires", "next_link"], - allow_none=True, - ) - - if not row: - raise ThreepidValidationError( - "Validation token not found or has expired" - ) - expires, next_link = row - - if retrieved_client_secret != client_secret: - raise ThreepidValidationError( - "This client_secret does not match the provided session_id" - ) - - # If the session is already validated, no need to revalidate - if validated_at: - return next_link - - if expires <= current_ts: - raise ThreepidValidationError( - "This token has expired. Please request a new one" - ) - - # Looks good. Validate the session - self.db_pool.simple_update_txn( - txn, - table="threepid_validation_session", - keyvalues={"session_id": session_id}, - updatevalues={"validated_at": self._clock.time_msec()}, - ) - - return next_link - - # Return next_link if it exists - return await self.db_pool.runInteraction( - "validate_threepid_session_txn", validate_threepid_session_txn - ) - async def start_or_continue_validation_session( self, medium: str,