diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 5138792a5f..6c5b29288a 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,9 +22,10 @@ from six import iterkeys
from six.moves import range
from twisted.internet import defer
+from twisted.internet.defer import Deferred
from synapse.api.constants import UserTypes
-from synapse.api.errors import Codes, StoreError, ThreepidValidationError
+from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
@@ -323,6 +324,19 @@ class RegistrationWorkerStore(SQLBaseStore):
return None
@cachedInlineCallbacks()
+ def is_real_user(self, user_id):
+ """Determines if the user is a real user, ie does not have a 'user_type'.
+
+ Args:
+ user_id (str): user id to test
+
+ Returns:
+ Deferred[bool]: True if user 'user_type' is null or empty string
+ """
+ res = yield self.runInteraction("is_real_user", self.is_real_user_txn, user_id)
+ return res
+
+ @cachedInlineCallbacks()
def is_support_user(self, user_id):
"""Determines if the user is of type UserTypes.SUPPORT
@@ -337,6 +351,16 @@ class RegistrationWorkerStore(SQLBaseStore):
)
return res
+ def is_real_user_txn(self, txn, user_id):
+ res = self._simple_select_one_onecol_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ retcol="user_type",
+ allow_none=True,
+ )
+ return res is None
+
def is_support_user_txn(self, txn, user_id):
res = self._simple_select_one_onecol_txn(
txn=txn,
@@ -361,6 +385,26 @@ class RegistrationWorkerStore(SQLBaseStore):
return self.runInteraction("get_users_by_id_case_insensitive", f)
+ async def get_user_by_external_id(
+ self, auth_provider: str, external_id: str
+ ) -> str:
+ """Look up a user by their external auth id
+
+ Args:
+ auth_provider: identifier for the remote auth provider
+ external_id: id on that system
+
+ Returns:
+ str|None: the mxid of the user, or None if they are not known
+ """
+ return await self._simple_select_one_onecol(
+ table="user_external_ids",
+ keyvalues={"auth_provider": auth_provider, "external_id": external_id},
+ retcol="user_id",
+ allow_none=True,
+ desc="get_user_by_external_id",
+ )
+
@defer.inlineCallbacks
def count_all_users(self):
"""Counts all users registered on the homeserver."""
@@ -422,6 +466,20 @@ class RegistrationWorkerStore(SQLBaseStore):
return ret
@defer.inlineCallbacks
+ def count_real_users(self):
+ """Counts all users without a special user_type registered on the homeserver."""
+
+ def _count_users(txn):
+ txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null")
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]["users"]
+ return 0
+
+ ret = yield self.runInteraction("count_real_users", _count_users)
+ return ret
+
+ @defer.inlineCallbacks
def find_next_generated_user_id_localpart(self):
"""
Gets the localpart of the next generated user ID.
@@ -435,7 +493,9 @@ class RegistrationWorkerStore(SQLBaseStore):
"""
def _find_next_generated_user_id(txn):
- txn.execute("SELECT name FROM users")
+ # We bound between '@1' and '@a' to avoid pulling the entire table
+ # out.
+ txn.execute("SELECT name FROM users WHERE '@1' <= name AND name < '@a'")
regex = re.compile(r"^@(\d+):")
@@ -458,7 +518,7 @@ class RegistrationWorkerStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_user_id_by_threepid(self, medium, address, require_verified=False):
+ def get_user_id_by_threepid(self, medium, address):
"""Returns user id from threepid
Args:
@@ -549,6 +609,26 @@ class RegistrationWorkerStore(SQLBaseStore):
desc="add_user_bound_threepid",
)
+ def user_get_bound_threepids(self, user_id):
+ """Get the threepids that a user has bound to an identity server through the homeserver
+ The homeserver remembers where binds to an identity server occurred. Using this
+ method can retrieve those threepids.
+
+ Args:
+ user_id (str): The ID of the user to retrieve threepids for
+
+ Returns:
+ Deferred[list[dict]]: List of dictionaries containing the following:
+ medium (str): The medium of the threepid (e.g "email")
+ address (str): The address of the threepid (e.g "bob@example.com")
+ """
+ return self._simple_select_list(
+ table="user_threepid_id_server",
+ keyvalues={"user_id": user_id},
+ retcols=["medium", "address"],
+ desc="user_get_bound_threepids",
+ )
+
def remove_user_bound_threepid(self, user_id, medium, address, id_server):
"""The server proxied an unbind request to the given identity server on
behalf of the given user, so we remove the mapping of threepid to
@@ -618,24 +698,37 @@ class RegistrationWorkerStore(SQLBaseStore):
self, medium, client_secret, address=None, sid=None, validated=True
):
"""Gets a session_id and last_send_attempt (if available) for a
- client_secret/medium/(address|session_id) combo
+ combination of validation metadata
Args:
medium (str|None): The medium of the 3PID
address (str|None): The address of the 3PID
sid (str|None): The ID of the validation session
- client_secret (str|None): A unique string provided by the client to
- help identify this validation attempt
+ client_secret (str): A unique string provided by the client to help identify this
+ validation attempt
validated (bool|None): Whether sessions should be filtered by
whether they have been validated already or not. None to
perform no filtering
Returns:
- deferred {str, int}|None: A dict containing the
- latest session_id and send_attempt count for this 3PID.
- Otherwise None if there hasn't been a previous attempt
+ Deferred[dict|None]: A dict containing the following:
+ * address - address of the 3pid
+ * medium - medium of the 3pid
+ * client_secret - a secret provided by the client for this validation session
+ * session_id - ID of the validation session
+ * send_attempt - a number serving to dedupe send attempts for this session
+ * validated_at - timestamp of when this session was validated if so
+
+ Otherwise None if a validation session is not found
"""
- keyvalues = {"medium": medium, "client_secret": client_secret}
+ if not client_secret:
+ raise SynapseError(
+ 400, "Missing parameter: client_secret", errcode=Codes.MISSING_PARAM
+ )
+
+ keyvalues = {"client_secret": client_secret}
+ if medium:
+ keyvalues["medium"] = medium
if address:
keyvalues["address"] = address
if sid:
@@ -694,13 +787,14 @@ class RegistrationWorkerStore(SQLBaseStore):
)
-class RegistrationStore(
+class RegistrationBackgroundUpdateStore(
RegistrationWorkerStore, background_updates.BackgroundUpdateStore
):
def __init__(self, db_conn, hs):
- super(RegistrationStore, self).__init__(db_conn, hs)
+ super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs)
self.clock = hs.get_clock()
+ self.config = hs.config
self.register_background_index_update(
"access_tokens_device_index",
@@ -716,8 +810,6 @@ class RegistrationStore(
columns=["creation_ts"],
)
- self._account_validity = hs.config.account_validity
-
# we no longer use refresh tokens, but it's possible that some people
# might have a background update queued to build this index. Just
# clear the background update.
@@ -731,17 +823,6 @@ class RegistrationStore(
"users_set_deactivated_flag", self._background_update_set_deactivated_flag
)
- # Create a background job for culling expired 3PID validity tokens
- def start_cull():
- # run as a background process to make sure that the database transactions
- # have a logcontext to report to
- return run_as_background_process(
- "cull_expired_threepid_validation_tokens",
- self.cull_expired_threepid_validation_tokens,
- )
-
- hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
-
@defer.inlineCallbacks
def _background_update_set_deactivated_flag(self, progress, batch_size):
"""Retrieves a list of all deactivated users and sets the 'deactivated' flag to 1
@@ -774,7 +855,7 @@ class RegistrationStore(
rows = self.cursor_to_dict(txn)
if not rows:
- return True
+ return True, 0
rows_processed_nb = 0
@@ -790,18 +871,66 @@ class RegistrationStore(
)
if batch_size > len(rows):
- return True
+ return True, len(rows)
else:
- return False
+ return False, len(rows)
- end = yield self.runInteraction(
+ end, nb_processed = yield self.runInteraction(
"users_set_deactivated_flag", _background_update_set_deactivated_flag_txn
)
if end:
yield self._end_background_update("users_set_deactivated_flag")
- return batch_size
+ return nb_processed
+
+ @defer.inlineCallbacks
+ def _bg_user_threepids_grandfather(self, progress, batch_size):
+ """We now track which identity servers a user binds their 3PID to, so
+ we need to handle the case of existing bindings where we didn't track
+ this.
+
+ We do this by grandfathering in existing user threepids assuming that
+ they used one of the server configured trusted identity servers.
+ """
+ id_servers = set(self.config.trusted_third_party_id_servers)
+
+ def _bg_user_threepids_grandfather_txn(txn):
+ sql = """
+ INSERT INTO user_threepid_id_server
+ (user_id, medium, address, id_server)
+ SELECT user_id, medium, address, ?
+ FROM user_threepids
+ """
+
+ txn.executemany(sql, [(id_server,) for id_server in id_servers])
+
+ if id_servers:
+ yield self.runInteraction(
+ "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
+ )
+
+ yield self._end_background_update("user_threepids_grandfather")
+
+ return 1
+
+
+class RegistrationStore(RegistrationBackgroundUpdateStore):
+ def __init__(self, db_conn, hs):
+ super(RegistrationStore, self).__init__(db_conn, hs)
+
+ self._account_validity = hs.config.account_validity
+
+ # Create a background job for culling expired 3PID validity tokens
+ def start_cull():
+ # run as a background process to make sure that the database transactions
+ # have a logcontext to report to
+ return run_as_background_process(
+ "cull_expired_threepid_validation_tokens",
+ self.cull_expired_threepid_validation_tokens,
+ )
+
+ hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS)
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id, valid_until_ms):
@@ -962,6 +1091,26 @@ class RegistrationStore(
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
+ def record_user_external_id(
+ self, auth_provider: str, external_id: str, user_id: str
+ ) -> Deferred:
+ """Record a mapping from an external user id to a mxid
+
+ Args:
+ auth_provider: identifier for the remote auth provider
+ external_id: id on that system
+ user_id: complete mxid that it is mapped to
+ """
+ return self._simple_insert(
+ table="user_external_ids",
+ values={
+ "auth_provider": auth_provider,
+ "external_id": external_id,
+ "user_id": user_id,
+ },
+ desc="record_user_external_id",
+ )
+
def user_set_password_hash(self, user_id, password_hash):
"""
NB. This does *not* evict any cache because the one use for this
@@ -1131,36 +1280,6 @@ class RegistrationStore(
desc="get_users_pending_deactivation",
)
- @defer.inlineCallbacks
- def _bg_user_threepids_grandfather(self, progress, batch_size):
- """We now track which identity servers a user binds their 3PID to, so
- we need to handle the case of existing bindings where we didn't track
- this.
-
- We do this by grandfathering in existing user threepids assuming that
- they used one of the server configured trusted identity servers.
- """
- id_servers = set(self.config.trusted_third_party_id_servers)
-
- def _bg_user_threepids_grandfather_txn(txn):
- sql = """
- INSERT INTO user_threepid_id_server
- (user_id, medium, address, id_server)
- SELECT user_id, medium, address, ?
- FROM user_threepids
- """
-
- txn.executemany(sql, [(id_server,) for id_server in id_servers])
-
- if id_servers:
- yield self.runInteraction(
- "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
- )
-
- yield self._end_background_update("user_threepids_grandfather")
-
- return 1
-
def validate_threepid_session(self, session_id, client_secret, token, current_ts):
"""Attempt to validate a threepid session using a token
@@ -1172,6 +1291,10 @@ class RegistrationStore(
current_ts (int): The current unix time in milliseconds. Used for
checking token expiry status
+ Raises:
+ ThreepidValidationError: if a matching validation token was not found or has
+ expired
+
Returns:
deferred str|None: A str representing a link to redirect the user
to if there is one.
@@ -1348,17 +1471,6 @@ class RegistrationStore(
self.clock.time_msec(),
)
- def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
- self._simple_update_one_txn(
- txn=txn,
- table="users",
- keyvalues={"name": user_id},
- updatevalues={"deactivated": 1 if deactivated else 0},
- )
- self._invalidate_cache_and_stream(
- txn, self.get_user_deactivated_status, (user_id,)
- )
-
@defer.inlineCallbacks
def set_user_deactivated_status(self, user_id, deactivated):
"""Set the `deactivated` property for the provided user to the provided value.
@@ -1374,3 +1486,14 @@ class RegistrationStore(
user_id,
deactivated,
)
+
+ def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
+ self._simple_update_one_txn(
+ txn=txn,
+ table="users",
+ keyvalues={"name": user_id},
+ updatevalues={"deactivated": 1 if deactivated else 0},
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_deactivated_status, (user_id,)
+ )
|