diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index f42023418e..6fb4a6df8c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1034,97 +1034,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# one of the subqueries may have hit the limit.
return notifs[:limit]
- async def get_unread_push_actions_for_user_in_range_for_email(
- self,
- user_id: str,
- min_stream_ordering: int,
- max_stream_ordering: int,
- limit: int = 20,
- ) -> List[EmailPushAction]:
- """Get a list of the most recent unread push actions for a given user,
- within the given stream ordering range. Called by the emailpusher
-
- Args:
- user_id: The user to fetch push actions for.
- min_stream_ordering: The exclusive lower bound on the
- stream ordering of event push actions to fetch.
- max_stream_ordering: The inclusive upper bound on the
- stream ordering of event push actions to fetch.
- limit: The maximum number of rows to return.
- Returns:
- A list of dicts with the keys "event_id", "room_id", "stream_ordering", "actions", "received_ts".
- The list will be ordered by descending received_ts.
- The list will have between 0~limit entries.
- """
-
- def get_push_actions_txn(
- txn: LoggingTransaction,
- ) -> List[Tuple[str, str, str, int, str, bool, int]]:
- sql = """
- SELECT ep.event_id, ep.room_id, ep.thread_id, ep.stream_ordering,
- ep.actions, ep.highlight, e.received_ts
- FROM event_push_actions AS ep
- INNER JOIN events AS e USING (room_id, event_id)
- WHERE
- ep.user_id = ?
- AND ep.stream_ordering > ?
- AND ep.stream_ordering <= ?
- AND ep.notif = 1
- ORDER BY ep.stream_ordering DESC LIMIT ?
- """
- txn.execute(sql, (user_id, min_stream_ordering, max_stream_ordering, limit))
- return cast(List[Tuple[str, str, str, int, str, bool, int]], txn.fetchall())
-
- push_actions = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_email", get_push_actions_txn
- )
-
- room_ids = set()
- thread_ids = []
- for (
- _,
- room_id,
- thread_id,
- _,
- _,
- _,
- _,
- ) in push_actions:
- room_ids.add(room_id)
- thread_ids.append(thread_id)
-
- receipts_by_room = await self.db_pool.runInteraction(
- "get_unread_push_actions_for_user_in_range_email_receipts",
- self._get_receipts_for_room_and_threads_txn,
- user_id=user_id,
- room_ids=room_ids,
- thread_ids=thread_ids,
- )
-
- # Make a list of dicts from the two sets of results.
- notifs = [
- EmailPushAction(
- event_id=event_id,
- room_id=room_id,
- stream_ordering=stream_ordering,
- actions=_deserialize_action(actions, highlight),
- received_ts=received_ts,
- )
- for event_id, room_id, thread_id, stream_ordering, actions, highlight, received_ts in push_actions
- if receipts_by_room.get(room_id, MISSING_ROOM_RECEIPT).is_unread(
- thread_id, stream_ordering
- )
- ]
-
- # Now sort it so it's ordered correctly, since currently it will
- # contain results from the first query, correctly ordered, followed
- # by results from the second query, but we want them all ordered
- # by received_ts (most recent first)
- notifs.sort(key=lambda r: -(r.received_ts or 0))
-
- # Now return the first `limit`
- return notifs[:limit]
-
async def get_if_maybe_push_in_range_for_user(
self, user_id: str, min_stream_ordering: int
) -> bool:
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 659ee13d71..c384675839 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -29,7 +29,6 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.registration import RegistrationWorkerStore
from synapse.util.caches.descriptors import cached
-from synapse.util.threepids import canonicalise_email
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -65,18 +64,6 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
self._mau_stats_only = hs.config.server.mau_stats_only
- if self._update_on_this_worker:
- # Do not add more reserved users than the total allowable number
- self.db_pool.new_transaction(
- db_conn,
- "initialise_mau_threepids",
- [],
- [],
- [],
- self._initialise_reserved_users,
- hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
- )
-
@cached(num_args=0)
async def get_monthly_active_count(self) -> int:
"""Generates current count of monthly active users
@@ -174,26 +161,6 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
return await self.db_pool.runInteraction("list_users", _list_users)
- async def get_registered_reserved_users(self) -> List[str]:
- """Of the reserved threepids defined in config, retrieve those that are associated
- with registered users
-
- Returns:
- User IDs of actual users that are reserved
- """
- users = []
-
- for tp in self.hs.config.server.mau_limits_reserved_threepids[
- : self.hs.config.server.max_mau_value
- ]:
- user_id = await self.hs.get_datastores().main.get_user_id_by_threepid(
- tp["medium"], canonicalise_email(tp["address"])
- )
- if user_id:
- users.append(user_id)
-
- return users
-
@cached(num_args=1)
async def user_last_seen_monthly_active(self, user_id: str) -> Optional[int]:
"""
@@ -289,50 +256,10 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
)
self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
- reserved_users = await self.get_registered_reserved_users()
await self.db_pool.runInteraction(
- "reap_monthly_active_users", _reap_users, reserved_users
- )
-
- def _initialise_reserved_users(
- self, txn: LoggingTransaction, threepids: List[dict]
- ) -> None:
- """Ensures that reserved threepids are accounted for in the MAU table, should
- be called on start up.
-
- Args:
- txn:
- threepids: List of threepid dicts to reserve
- """
- assert self._update_on_this_worker, (
- "This worker is not designated to update MAUs"
+ "reap_monthly_active_users", _reap_users, []
)
- # XXX what is this function trying to achieve? It upserts into
- # monthly_active_users for each *registered* reserved mau user, but why?
- #
- # - shouldn't there already be an entry for each reserved user (at least
- # if they have been active recently)?
- #
- # - if it's important that the timestamp is kept up to date, why do we only
- # run this at startup?
-
- for tp in threepids:
- user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
-
- if user_id:
- is_support = self.is_support_user_txn(txn, user_id)
- if not is_support:
- # We do this manually here to avoid hitting https://github.com/matrix-org/synapse/issues/6791
- self.db_pool.simple_upsert_txn(
- txn,
- table="monthly_active_users",
- keyvalues={"user_id": user_id},
- values={"timestamp": int(self._clock.time_msec())},
- )
- else:
- logger.warning("mau limit reserved threepid %s not found in db" % tp)
-
async def upsert_monthly_active_user(self, user_id: str) -> None:
"""Updates or inserts the user into the monthly active user table, which
is used to track the current MAU usage of the server
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 40c551bcb4..868803e169 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -32,7 +32,6 @@ from synapse.api.errors import (
NotFoundError,
StoreError,
SynapseError,
- ThreepidValidationError,
)
from synapse.config.homeserver import HomeServerConfig
from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -149,30 +148,6 @@ class LoginTokenLookupResult:
"""The session ID advertised by the SSO Identity Provider."""
-@attr.s(frozen=True, slots=True, auto_attribs=True)
-class ThreepidResult:
- medium: str
- address: str
- validated_at: int
- added_at: int
-
-
-@attr.s(frozen=True, slots=True, auto_attribs=True)
-class ThreepidValidationSession:
- address: str
- """address of the 3pid"""
- medium: str
- """medium of the 3pid"""
- client_secret: str
- """a secret provided by the client for this validation session"""
- session_id: str
- """ID of the validation session"""
- last_send_attempt: int
- """a number serving to dedupe send attempts for this session"""
- validated_at: Optional[int]
- """timestamp of when this session was validated if so"""
-
-
class RegistrationWorkerStore(CacheInvalidationWorkerStore):
def __init__(
self,
@@ -215,12 +190,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
self._set_expiration_date_when_missing,
)
- # Create a background job for culling expired 3PID validity tokens
- if hs.config.worker.run_background_tasks:
- self._clock.looping_call(
- self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
- )
-
@cached()
async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
"""Returns info about the user account, if it exists."""
@@ -984,161 +953,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return str(next_id)
- async def get_user_id_by_threepid(self, medium: str, address: str) -> Optional[str]:
- """Returns user id from threepid
-
- Args:
- medium: threepid medium e.g. email
- address: threepid address e.g. me@example.com. This must already be
- in canonical form.
-
- Returns:
- The user ID or None if no user id/threepid mapping exists
- """
- user_id = await self.db_pool.runInteraction(
- "get_user_id_by_threepid", self.get_user_id_by_threepid_txn, medium, address
- )
- return user_id
-
- def get_user_id_by_threepid_txn(
- self, txn: LoggingTransaction, medium: str, address: str
- ) -> Optional[str]:
- """Returns user id from threepid
-
- Args:
- txn:
- medium: threepid medium e.g. email
- address: threepid address e.g. me@example.com
-
- Returns:
- user id, or None if no user id/threepid mapping exists
- """
- return self.db_pool.simple_select_one_onecol_txn(
- txn,
- "user_threepids",
- {"medium": medium, "address": address},
- "user_id",
- True,
- )
-
- async def user_add_threepid(
- self,
- user_id: str,
- medium: str,
- address: str,
- validated_at: int,
- added_at: int,
- ) -> None:
- await self.db_pool.simple_upsert(
- "user_threepids",
- {"medium": medium, "address": address},
- {"user_id": user_id, "validated_at": validated_at, "added_at": added_at},
- )
-
- async def user_get_threepids(self, user_id: str) -> List[ThreepidResult]:
- results = cast(
- List[Tuple[str, str, int, int]],
- await self.db_pool.simple_select_list(
- "user_threepids",
- keyvalues={"user_id": user_id},
- retcols=["medium", "address", "validated_at", "added_at"],
- desc="user_get_threepids",
- ),
- )
- return [
- ThreepidResult(
- medium=r[0],
- address=r[1],
- validated_at=r[2],
- added_at=r[3],
- )
- for r in results
- ]
-
- async def user_delete_threepid(
- self, user_id: str, medium: str, address: str
- ) -> None:
- await self.db_pool.simple_delete(
- "user_threepids",
- keyvalues={"user_id": user_id, "medium": medium, "address": address},
- desc="user_delete_threepid",
- )
-
- async def add_user_bound_threepid(
- self, user_id: str, medium: str, address: str, id_server: str
- ) -> None:
- """The server proxied a bind request to the given identity server on
- behalf of the given user. We need to remember this in case the user
- asks us to unbind the threepid.
-
- Args:
- user_id
- medium
- address
- id_server
- """
- # We need to use an upsert, in case they user had already bound the
- # threepid
- await self.db_pool.simple_upsert(
- table="user_threepid_id_server",
- keyvalues={
- "user_id": user_id,
- "medium": medium,
- "address": address,
- "id_server": id_server,
- },
- values={},
- insertion_values={},
- desc="add_user_bound_threepid",
- )
-
- async def user_get_bound_threepids(self, user_id: str) -> List[Tuple[str, str]]:
- """Get the threepids that a user has bound to an identity server through the homeserver
- The homeserver remembers where binds to an identity server occurred. Using this
- method can retrieve those threepids.
-
- Args:
- user_id: The ID of the user to retrieve threepids for
-
- Returns:
- List of tuples of two strings:
- medium: The medium of the threepid (e.g "email")
- address: The address of the threepid (e.g "bob@example.com")
- """
- return cast(
- List[Tuple[str, str]],
- await self.db_pool.simple_select_list(
- table="user_threepid_id_server",
- keyvalues={"user_id": user_id},
- retcols=["medium", "address"],
- desc="user_get_bound_threepids",
- ),
- )
-
- async def remove_user_bound_threepid(
- self, user_id: str, medium: str, address: str, id_server: str
- ) -> None:
- """The server proxied an unbind request to the given identity server on
- behalf of the given user, so we remove the mapping of threepid to
- identity server.
-
- Args:
- user_id
- medium
- address
- id_server
- """
- await self.db_pool.simple_delete(
- table="user_threepid_id_server",
- keyvalues={
- "user_id": user_id,
- "medium": medium,
- "address": address,
- "id_server": id_server,
- },
- desc="remove_user_bound_threepid",
- )
-
async def get_id_servers_user_bound(
self, user_id: str, medium: str, address: str
) -> List[str]:
@@ -1223,121 +1037,6 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
return bool(res)
- async def get_threepid_validation_session(
- self,
- medium: Optional[str],
- client_secret: str,
- address: Optional[str] = None,
- sid: Optional[str] = None,
- validated: Optional[bool] = True,
- ) -> Optional[ThreepidValidationSession]:
- """Gets a session_id and last_send_attempt (if available) for a
- combination of validation metadata
-
- Args:
- medium: The medium of the 3PID
- client_secret: A unique string provided by the client to help identify this
- validation attempt
- address: The address of the 3PID
- sid: The ID of the validation session
- validated: Whether sessions should be filtered by
- whether they have been validated already or not. None to
- perform no filtering
-
- Returns:
- A ThreepidValidationSession or None if a validation session is not found
- """
- if not client_secret:
- raise SynapseError(
- 400, "Missing parameter: client_secret", errcode=Codes.MISSING_PARAM
- )
-
- keyvalues = {"client_secret": client_secret}
- if medium:
- keyvalues["medium"] = medium
- if address:
- keyvalues["address"] = address
- if sid:
- keyvalues["session_id"] = sid
-
- assert address or sid
-
- def get_threepid_validation_session_txn(
- txn: LoggingTransaction,
- ) -> Optional[ThreepidValidationSession]:
- sql = """
- SELECT address, session_id, medium, client_secret,
- last_send_attempt, validated_at
- FROM threepid_validation_session WHERE %s
- """ % (" AND ".join("%s = ?" % k for k in keyvalues.keys()),)
-
- if validated is not None:
- sql += " AND validated_at IS " + ("NOT NULL" if validated else "NULL")
-
- sql += " LIMIT 1"
-
- txn.execute(sql, list(keyvalues.values()))
- row = txn.fetchone()
- if not row:
- return None
-
- return ThreepidValidationSession(
- address=row[0],
- session_id=row[1],
- medium=row[2],
- client_secret=row[3],
- last_send_attempt=row[4],
- validated_at=row[5],
- )
-
- return await self.db_pool.runInteraction(
- "get_threepid_validation_session", get_threepid_validation_session_txn
- )
-
- async def delete_threepid_session(self, session_id: str) -> None:
- """Removes a threepid validation session from the database. This can
- be done after validation has been performed and whatever action was
- waiting on it has been carried out
-
- Args:
- session_id: The ID of the session to delete
- """
-
- def delete_threepid_session_txn(txn: LoggingTransaction) -> None:
- self.db_pool.simple_delete_txn(
- txn,
- table="threepid_validation_token",
- keyvalues={"session_id": session_id},
- )
- self.db_pool.simple_delete_txn(
- txn,
- table="threepid_validation_session",
- keyvalues={"session_id": session_id},
- )
-
- await self.db_pool.runInteraction(
- "delete_threepid_session", delete_threepid_session_txn
- )
-
- @wrap_as_background_process("cull_expired_threepid_validation_tokens")
- async def cull_expired_threepid_validation_tokens(self) -> None:
- """Remove threepid validation tokens with expiry dates that have passed"""
-
- def cull_expired_threepid_validation_tokens_txn(
- txn: LoggingTransaction, ts: int
- ) -> None:
- sql = """
- DELETE FROM threepid_validation_token WHERE
- expires < ?
- """
- txn.execute(sql, (ts,))
-
- await self.db_pool.runInteraction(
- "cull_expired_threepid_validation_tokens",
- cull_expired_threepid_validation_tokens_txn,
- self._clock.time_msec(),
- )
-
@wrap_as_background_process("account_validity_set_expiration_dates")
async def _set_expiration_date_when_missing(self) -> None:
"""
@@ -2361,9 +2060,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
):
super().__init__(database, db_conn, hs)
- self._ignore_unknown_session_error = (
- hs.config.server.request_token_inhibit_3pid_errors
- )
+ self._ignore_unknown_session_error = False # Used to use whether 3pid errors were suppressed or not... Problem?
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
@@ -2832,96 +2529,6 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
desc="add_user_pending_deactivation",
)
- async def validate_threepid_session(
- self, session_id: str, client_secret: str, token: str, current_ts: int
- ) -> Optional[str]:
- """Attempt to validate a threepid session using a token
-
- Args:
- session_id: The id of a validation session
- client_secret: A unique string provided by the client to help identify
- this validation attempt
- token: A validation token
- current_ts: The current unix time in milliseconds. Used for checking
- token expiry status
-
- Raises:
- ThreepidValidationError: if a matching validation token was not found or has
- expired
-
- Returns:
- A str representing a link to redirect the user to if there is one.
- """
-
- # Insert everything into a transaction in order to run atomically
- def validate_threepid_session_txn(txn: LoggingTransaction) -> Optional[str]:
- row = self.db_pool.simple_select_one_txn(
- txn,
- table="threepid_validation_session",
- keyvalues={"session_id": session_id},
- retcols=["client_secret", "validated_at"],
- allow_none=True,
- )
-
- if not row:
- if self._ignore_unknown_session_error:
- # If we need to inhibit the error caused by an incorrect session ID,
- # use None as placeholder values for the client secret and the
- # validation timestamp.
- # It shouldn't be an issue because they're both only checked after
- # the token check, which should fail. And if it doesn't for some
- # reason, the next check is on the client secret, which is NOT NULL,
- # so we don't have to worry about the client secret matching by
- # accident.
- row = None, None
- else:
- raise ThreepidValidationError("Unknown session_id")
-
- retrieved_client_secret, validated_at = row
-
- row = self.db_pool.simple_select_one_txn(
- txn,
- table="threepid_validation_token",
- keyvalues={"session_id": session_id, "token": token},
- retcols=["expires", "next_link"],
- allow_none=True,
- )
-
- if not row:
- raise ThreepidValidationError(
- "Validation token not found or has expired"
- )
- expires, next_link = row
-
- if retrieved_client_secret != client_secret:
- raise ThreepidValidationError(
- "This client_secret does not match the provided session_id"
- )
-
- # If the session is already validated, no need to revalidate
- if validated_at:
- return next_link
-
- if expires <= current_ts:
- raise ThreepidValidationError(
- "This token has expired. Please request a new one"
- )
-
- # Looks good. Validate the session
- self.db_pool.simple_update_txn(
- txn,
- table="threepid_validation_session",
- keyvalues={"session_id": session_id},
- updatevalues={"validated_at": self._clock.time_msec()},
- )
-
- return next_link
-
- # Return next_link if it exists
- return await self.db_pool.runInteraction(
- "validate_threepid_session_txn", validate_threepid_session_txn
- )
-
async def start_or_continue_validation_session(
self,
medium: str,
|