diff options
Diffstat (limited to 'synapse/storage/registration.py')
-rw-r--r-- | synapse/storage/registration.py | 103 |
1 files changed, 54 insertions, 49 deletions
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..e404fa72de 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): desc="add_refresh_token_to_user", ) - @defer.inlineCallbacks def register(self, user_id, token=None, password_hash=None, was_guest=False, make_guest=False, appservice_id=None, create_profile_with_localpart=None, admin=False): @@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Raises: StoreError if the user_id could not be registered. """ - yield self.runInteraction( + return self.runInteraction( "register", self._register, user_id, @@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): create_profile_with_localpart, admin ) - self.get_user_by_id.invalidate((user_id,)) - self.is_guest.invalidate((user_id,)) def _register( self, @@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): (create_profile_with_localpart,) ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + txn.call_after(self.is_guest.invalidate, (user_id,)) + @cached() def get_user_by_id(self, user_id): return self._simple_select_one( @@ -236,22 +238,31 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): return self.runInteraction("get_users_by_id_case_insensitive", f) - @defer.inlineCallbacks def user_set_password_hash(self, user_id, password_hash): """ NB. This does *not* evict any cache because the one use for this removes most of the entries subsequently anyway so it would be pointless. Use flush_user separately. """ - yield self._simple_update_one('users', { - 'name': user_id - }, { - 'password_hash': password_hash - }) - self.get_user_by_id.invalidate((user_id,)) + def user_set_password_hash_txn(txn): + self._simple_update_one_txn( + txn, + 'users', { + 'name': user_id + }, + { + 'password_hash': password_hash + } + ) + self._invalidate_cache_and_stream( + txn, self.get_user_by_id, (user_id,) + ) + return self.runInteraction( + "user_set_password_hash", user_set_password_hash_txn + ) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +270,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +280,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +331,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) |