diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7e7d32eb66..e404fa72de 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
desc="add_refresh_token_to_user",
)
- @defer.inlineCallbacks
def register(self, user_id, token=None, password_hash=None,
was_guest=False, make_guest=False, appservice_id=None,
create_profile_with_localpart=None, admin=False):
@@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
Raises:
StoreError if the user_id could not be registered.
"""
- yield self.runInteraction(
+ return self.runInteraction(
"register",
self._register,
user_id,
@@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
create_profile_with_localpart,
admin
)
- self.get_user_by_id.invalidate((user_id,))
- self.is_guest.invalidate((user_id,))
def _register(
self,
@@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
(create_profile_with_localpart,)
)
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ txn.call_after(self.is_guest.invalidate, (user_id,))
+
@cached()
def get_user_by_id(self, user_id):
return self._simple_select_one(
@@ -236,22 +238,31 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
return self.runInteraction("get_users_by_id_case_insensitive", f)
- @defer.inlineCallbacks
def user_set_password_hash(self, user_id, password_hash):
"""
NB. This does *not* evict any cache because the one use for this
removes most of the entries subsequently anyway so it would be
pointless. Use flush_user separately.
"""
- yield self._simple_update_one('users', {
- 'name': user_id
- }, {
- 'password_hash': password_hash
- })
- self.get_user_by_id.invalidate((user_id,))
+ def user_set_password_hash_txn(txn):
+ self._simple_update_one_txn(
+ txn,
+ 'users', {
+ 'name': user_id
+ },
+ {
+ 'password_hash': password_hash
+ }
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_id, (user_id,)
+ )
+ return self.runInteraction(
+ "user_set_password_hash", user_set_password_hash_txn
+ )
@defer.inlineCallbacks
- def user_delete_access_tokens(self, user_id, except_token_ids=[],
+ def user_delete_access_tokens(self, user_id, except_token_id=None,
device_id=None,
delete_refresh_tokens=False):
"""
@@ -259,7 +270,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
Args:
user_id (str): ID of user the tokens belong to
- except_token_ids (list[str]): list of access_tokens which should
+ except_token_id (str): list of access_tokens IDs which should
*not* be deleted
device_id (str|None): ID of device the tokens are associated with.
If None, tokens associated with any device (or no device) will
@@ -269,53 +280,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
Returns:
defer.Deferred:
"""
- def f(txn, table, except_tokens, call_after_delete):
- sql = "SELECT token FROM %s WHERE user_id = ?" % table
- clauses = [user_id]
-
+ def f(txn):
+ keyvalues = {
+ "user_id": user_id,
+ }
if device_id is not None:
- sql += " AND device_id = ?"
- clauses.append(device_id)
+ keyvalues["device_id"] = device_id
- if except_tokens:
- sql += " AND id NOT IN (%s)" % (
- ",".join(["?" for _ in except_tokens]),
+ if delete_refresh_tokens:
+ self._simple_delete_txn(
+ txn,
+ table="refresh_tokens",
+ keyvalues=keyvalues,
)
- clauses += except_tokens
-
- txn.execute(sql, clauses)
- rows = txn.fetchall()
+ items = keyvalues.items()
+ where_clause = " AND ".join(k + " = ?" for k, _ in items)
+ values = [v for _, v in items]
+ if except_token_id:
+ where_clause += " AND id != ?"
+ values.append(except_token_id)
- n = 100
- chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)]
- for chunk in chunks:
- if call_after_delete:
- for row in chunk:
- txn.call_after(call_after_delete, (row[0],))
+ txn.execute(
+ "SELECT token FROM access_tokens WHERE %s" % where_clause,
+ values
+ )
+ rows = self.cursor_to_dict(txn)
- txn.execute(
- "DELETE FROM %s WHERE token in (%s)" % (
- table,
- ",".join(["?" for _ in chunk]),
- ), [r[0] for r in chunk]
+ for row in rows:
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_access_token, (row["token"],)
)
- # delete refresh tokens first, to stop new access tokens being
- # allocated while our backs are turned
- if delete_refresh_tokens:
- yield self.runInteraction(
- "user_delete_access_tokens", f,
- table="refresh_tokens",
- except_tokens=[],
- call_after_delete=None,
+ txn.execute(
+ "DELETE FROM access_tokens WHERE %s" % where_clause,
+ values
)
yield self.runInteraction(
"user_delete_access_tokens", f,
- table="access_tokens",
- except_tokens=except_token_ids,
- call_after_delete=self.get_user_by_access_token.invalidate,
)
def delete_access_token(self, access_token):
@@ -328,7 +331,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
},
)
- txn.call_after(self.get_user_by_access_token.invalidate, (access_token,))
+ self._invalidate_cache_and_stream(
+ txn, self.get_user_by_access_token, (access_token,)
+ )
return self.runInteraction("delete_access_token", f)
|