diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 20acd58fcf..07333f777d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -15,17 +15,83 @@
import re
+from six.moves import range
+
from twisted.internet import defer
-from synapse.api.errors import StoreError, Codes
+from synapse.api.errors import Codes, StoreError
from synapse.storage import background_updates
+from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-class RegistrationStore(background_updates.BackgroundUpdateStore):
+class RegistrationWorkerStore(SQLBaseStore):
+ @cached()
+ def get_user_by_id(self, user_id):
+ return self._simple_select_one(
+ table="users",
+ keyvalues={
+ "name": user_id,
+ },
+ retcols=[
+ "name", "password_hash", "is_guest",
+ "consent_version", "consent_server_notice_sent",
+ "appservice_id",
+ ],
+ allow_none=True,
+ desc="get_user_by_id",
+ )
+
+ @cached()
+ def get_user_by_access_token(self, token):
+ """Get a user from the given access token.
+
+ Args:
+ token (str): The access token of a user.
+ Returns:
+ defer.Deferred: None, if the token did not match, otherwise dict
+ including the keys `name`, `is_guest`, `device_id`, `token_id`.
+ """
+ return self.runInteraction(
+ "get_user_by_access_token",
+ self._query_for_auth,
+ token
+ )
+
+ @defer.inlineCallbacks
+ def is_server_admin(self, user):
+ res = yield self._simple_select_one_onecol(
+ table="users",
+ keyvalues={"name": user.to_string()},
+ retcol="admin",
+ allow_none=True,
+ desc="is_server_admin",
+ )
+
+ defer.returnValue(res if res else False)
+
+ def _query_for_auth(self, txn, token):
+ sql = (
+ "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
+ " access_tokens.device_id"
+ " FROM users"
+ " INNER JOIN access_tokens on users.name = access_tokens.user_id"
+ " WHERE token = ?"
+ )
+
+ txn.execute(sql, (token,))
+ rows = self.cursor_to_dict(txn)
+ if rows:
+ return rows[0]
+
+ return None
+
+
+class RegistrationStore(RegistrationWorkerStore,
+ background_updates.BackgroundUpdateStore):
- def __init__(self, hs):
- super(RegistrationStore, self).__init__(hs)
+ def __init__(self, db_conn, hs):
+ super(RegistrationStore, self).__init__(db_conn, hs)
self.clock = hs.get_clock()
@@ -37,12 +103,17 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
)
self.register_background_index_update(
- "refresh_tokens_device_index",
- index_name="refresh_tokens_device_id",
- table="refresh_tokens",
- columns=["user_id", "device_id"],
+ "users_creation_ts",
+ index_name="users_creation_ts",
+ table="users",
+ columns=["creation_ts"],
)
+ # we no longer use refresh tokens, but it's possible that some people
+ # might have a background update queued to build this index. Just
+ # clear the background update.
+ self.register_noop_background_update("refresh_tokens_device_index")
+
@defer.inlineCallbacks
def add_access_token_to_user(self, user_id, token, device_id=None):
"""Adds an access token for the given user.
@@ -177,9 +248,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
)
if create_profile_with_localpart:
+ # set a default displayname serverside to avoid ugly race
+ # between auto-joins and clients trying to set displaynames
txn.execute(
- "INSERT INTO profiles(user_id) VALUES (?)",
- (create_profile_with_localpart,)
+ "INSERT INTO profiles(user_id, displayname) VALUES (?,?)",
+ (create_profile_with_localpart, create_profile_with_localpart)
)
self._invalidate_cache_and_stream(
@@ -187,18 +260,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
)
txn.call_after(self.is_guest.invalidate, (user_id,))
- @cached()
- def get_user_by_id(self, user_id):
- return self._simple_select_one(
- table="users",
- keyvalues={
- "name": user_id,
- },
- retcols=["name", "password_hash", "is_guest"],
- allow_none=True,
- desc="get_user_by_id",
- )
-
def get_users_by_id_case_insensitive(self, user_id):
"""Gets users that match user_id case insensitively.
Returns a mapping of user_id -> password_hash.
@@ -236,12 +297,57 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
"user_set_password_hash", user_set_password_hash_txn
)
- @defer.inlineCallbacks
+ def user_set_consent_version(self, user_id, consent_version):
+ """Updates the user table to record privacy policy consent
+
+ Args:
+ user_id (str): full mxid of the user to update
+ consent_version (str): version of the policy the user has consented
+ to
+
+ Raises:
+ StoreError(404) if user not found
+ """
+ def f(txn):
+ self._simple_update_one_txn(
+ txn,
+ table='users',
+ keyvalues={'name': user_id, },
+ updatevalues={'consent_version': consent_version, },
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ return self.runInteraction("user_set_consent_version", f)
+
+ def user_set_consent_server_notice_sent(self, user_id, consent_version):
+ """Updates the user table to record that we have sent the user a server
+ notice about privacy policy consent
+
+ Args:
+ user_id (str): full mxid of the user to update
+ consent_version (str): version of the policy we have notified the
+ user about
+
+ Raises:
+ StoreError(404) if user not found
+ """
+ def f(txn):
+ self._simple_update_one_txn(
+ txn,
+ table='users',
+ keyvalues={'name': user_id, },
+ updatevalues={'consent_server_notice_sent': consent_version, },
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ return self.runInteraction("user_set_consent_server_notice_sent", f)
+
def user_delete_access_tokens(self, user_id, except_token_id=None,
- device_id=None,
- delete_refresh_tokens=False):
+ device_id=None):
"""
- Invalidate access/refresh tokens belonging to a user
+ Invalidate access tokens belonging to a user
Args:
user_id (str): ID of user the tokens belong to
@@ -250,10 +356,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
device_id (str|None): ID of device the tokens are associated with.
If None, tokens associated with any device (or no device) will
be deleted
- delete_refresh_tokens (bool): True to delete refresh tokens as
- well as access tokens.
Returns:
- defer.Deferred:
+ defer.Deferred[list[str, int, str|None, int]]: a list of
+ (token, token id, device id) for each of the deleted tokens
"""
def f(txn):
keyvalues = {
@@ -262,13 +367,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
if device_id is not None:
keyvalues["device_id"] = device_id
- if delete_refresh_tokens:
- self._simple_delete_txn(
- txn,
- table="refresh_tokens",
- keyvalues=keyvalues,
- )
-
items = keyvalues.items()
where_clause = " AND ".join(k + " = ?" for k, _ in items)
values = [v for _, v in items]
@@ -277,14 +375,14 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
values.append(except_token_id)
txn.execute(
- "SELECT token FROM access_tokens WHERE %s" % where_clause,
+ "SELECT token, id, device_id FROM access_tokens WHERE %s" % where_clause,
values
)
- rows = self.cursor_to_dict(txn)
+ tokens_and_devices = [(r[0], r[1], r[2]) for r in txn]
- for row in rows:
+ for token, _, _ in tokens_and_devices:
self._invalidate_cache_and_stream(
- txn, self.get_user_by_access_token, (row["token"],)
+ txn, self.get_user_by_access_token, (token,)
)
txn.execute(
@@ -292,7 +390,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
values
)
- yield self.runInteraction(
+ return tokens_and_devices
+
+ return self.runInteraction(
"user_delete_access_tokens", f,
)
@@ -312,34 +412,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
return self.runInteraction("delete_access_token", f)
- @cached()
- def get_user_by_access_token(self, token):
- """Get a user from the given access token.
-
- Args:
- token (str): The access token of a user.
- Returns:
- defer.Deferred: None, if the token did not match, otherwise dict
- including the keys `name`, `is_guest`, `device_id`, `token_id`.
- """
- return self.runInteraction(
- "get_user_by_access_token",
- self._query_for_auth,
- token
- )
-
- @defer.inlineCallbacks
- def is_server_admin(self, user):
- res = yield self._simple_select_one_onecol(
- table="users",
- keyvalues={"name": user.to_string()},
- retcol="admin",
- allow_none=True,
- desc="is_server_admin",
- )
-
- defer.returnValue(res if res else False)
-
@cachedInlineCallbacks()
def is_guest(self, user_id):
res = yield self._simple_select_one_onecol(
@@ -352,22 +424,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
defer.returnValue(res if res else False)
- def _query_for_auth(self, txn, token):
- sql = (
- "SELECT users.name, users.is_guest, access_tokens.id as token_id,"
- " access_tokens.device_id"
- " FROM users"
- " INNER JOIN access_tokens on users.name = access_tokens.user_id"
- " WHERE token = ?"
- )
-
- txn.execute(sql, (token,))
- rows = self.cursor_to_dict(txn)
- if rows:
- return rows[0]
-
- return None
-
@defer.inlineCallbacks
def user_add_threepid(self, user_id, medium, address, validated_at, added_at):
yield self._simple_upsert("user_threepids", {
@@ -404,15 +460,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
defer.returnValue(ret['user_id'])
defer.returnValue(None)
- def user_delete_threepids(self, user_id):
- return self._simple_delete(
- "user_threepids",
- keyvalues={
- "user_id": user_id,
- },
- desc="user_delete_threepids",
- )
-
def user_delete_threepid(self, user_id, medium, address):
return self._simple_delete(
"user_threepids",
@@ -437,6 +484,35 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
ret = yield self.runInteraction("count_users", _count_users)
defer.returnValue(ret)
+ def count_daily_user_type(self):
+ """
+ Counts 1) native non guest users
+ 2) native guests users
+ 3) bridged users
+ who registered on the homeserver in the past 24 hours
+ """
+ def _count_daily_user_type(txn):
+ yesterday = int(self._clock.time()) - (60 * 60 * 24)
+
+ sql = """
+ SELECT user_type, COALESCE(count(*), 0) AS count FROM (
+ SELECT
+ CASE
+ WHEN is_guest=0 AND appservice_id IS NULL THEN 'native'
+ WHEN is_guest=1 AND appservice_id IS NULL THEN 'guest'
+ WHEN is_guest=0 AND appservice_id IS NOT NULL THEN 'bridged'
+ END AS user_type
+ FROM users
+ WHERE creation_ts > ?
+ ) AS t GROUP BY user_type
+ """
+ results = {'native': 0, 'guest': 0, 'bridged': 0}
+ txn.execute(sql, (yesterday,))
+ for row in txn:
+ results[row[0]] = row[1]
+ return results
+ return self.runInteraction("count_daily_user_type", _count_daily_user_type)
+
@defer.inlineCallbacks
def count_nonbridged_users(self):
def _count_users(txn):
@@ -464,18 +540,16 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
"""
def _find_next_generated_user_id(txn):
txn.execute("SELECT name FROM users")
- rows = self.cursor_to_dict(txn)
regex = re.compile("^@(\d+):")
found = set()
- for r in rows:
- user_id = r["name"]
+ for user_id, in txn:
match = regex.search(user_id)
if match:
found.add(int(match.group(1)))
- for i in xrange(len(found) + 1):
+ for i in range(len(found) + 1):
if i not in found:
return i
@@ -530,3 +604,44 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
except self.database_engine.module.IntegrityError:
ret = yield self.get_3pid_guest_access_token(medium, address)
defer.returnValue(ret)
+
+ def add_user_pending_deactivation(self, user_id):
+ """
+ Adds a user to the table of users who need to be parted from all the rooms they're
+ in
+ """
+ return self._simple_insert(
+ "users_pending_deactivation",
+ values={
+ "user_id": user_id,
+ },
+ desc="add_user_pending_deactivation",
+ )
+
+ def del_user_pending_deactivation(self, user_id):
+ """
+ Removes the given user to the table of users who need to be parted from all the
+ rooms they're in, effectively marking that user as fully deactivated.
+ """
+ # XXX: This should be simple_delete_one but we failed to put a unique index on
+ # the table, so somehow duplicate entries have ended up in it.
+ return self._simple_delete(
+ "users_pending_deactivation",
+ keyvalues={
+ "user_id": user_id,
+ },
+ desc="del_user_pending_deactivation",
+ )
+
+ def get_user_pending_deactivation(self):
+ """
+ Gets one user from the table of users waiting to be parted from all the rooms
+ they're in.
+ """
+ return self._simple_select_one_onecol(
+ "users_pending_deactivation",
+ keyvalues={},
+ retcol="user_id",
+ allow_none=True,
+ desc="get_users_pending_deactivation",
+ )
|