diff options
author | Erik Johnston <erikj@jki.re> | 2016-08-16 11:37:53 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-08-16 11:37:53 +0100 |
commit | 25c2332071f9f26111a54e099b9884b4a382ac2f (patch) | |
tree | c388750ddc395cf25d44337d1897a133bb7e0b1e /synapse/storage | |
parent | Merge branch 'fix_integrity_retry' of https://github.com/Ralith/synapse into ... (diff) | |
parent | Use cached get_user_by_access_token in slaves (diff) | |
download | synapse-25c2332071f9f26111a54e099b9884b4a382ac2f.tar.xz |
Merge pull request #1010 from matrix-org/erikj/refactor_deletions
Refactor user_delete_access_tokens. Invalidate get_user_by_access_token to slaves.
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/_base.py | 1 | ||||
-rw-r--r-- | synapse/storage/registration.py | 70 |
2 files changed, 33 insertions, 38 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index b0923a9cad..0a2e78fd81 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -880,6 +880,7 @@ class SQLBaseStore(object): ctx = self._cache_id_gen.get_next() stream_id = ctx.__enter__() txn.call_after(ctx.__exit__, None, None, None) + txn.call_after(self.hs.get_notifier().on_new_replication_data) self._simple_insert_txn( txn, diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 7e7d32eb66..19cb3b31c6 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -251,7 +251,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): self.get_user_by_id.invalidate((user_id,)) @defer.inlineCallbacks - def user_delete_access_tokens(self, user_id, except_token_ids=[], + def user_delete_access_tokens(self, user_id, except_token_id=None, device_id=None, delete_refresh_tokens=False): """ @@ -259,7 +259,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Args: user_id (str): ID of user the tokens belong to - except_token_ids (list[str]): list of access_tokens which should + except_token_id (str): list of access_tokens IDs which should *not* be deleted device_id (str|None): ID of device the tokens are associated with. If None, tokens associated with any device (or no device) will @@ -269,53 +269,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): Returns: defer.Deferred: """ - def f(txn, table, except_tokens, call_after_delete): - sql = "SELECT token FROM %s WHERE user_id = ?" % table - clauses = [user_id] - + def f(txn): + keyvalues = { + "user_id": user_id, + } if device_id is not None: - sql += " AND device_id = ?" - clauses.append(device_id) + keyvalues["device_id"] = device_id - if except_tokens: - sql += " AND id NOT IN (%s)" % ( - ",".join(["?" for _ in except_tokens]), + if delete_refresh_tokens: + self._simple_delete_txn( + txn, + table="refresh_tokens", + keyvalues=keyvalues, ) - clauses += except_tokens - - txn.execute(sql, clauses) - rows = txn.fetchall() + items = keyvalues.items() + where_clause = " AND ".join(k + " = ?" for k, _ in items) + values = [v for _, v in items] + if except_token_id: + where_clause += " AND id != ?" + values.append(except_token_id) - n = 100 - chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)] - for chunk in chunks: - if call_after_delete: - for row in chunk: - txn.call_after(call_after_delete, (row[0],)) + txn.execute( + "SELECT token FROM access_tokens WHERE %s" % where_clause, + values + ) + rows = self.cursor_to_dict(txn) - txn.execute( - "DELETE FROM %s WHERE token in (%s)" % ( - table, - ",".join(["?" for _ in chunk]), - ), [r[0] for r in chunk] + for row in rows: + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (row["token"],) ) - # delete refresh tokens first, to stop new access tokens being - # allocated while our backs are turned - if delete_refresh_tokens: - yield self.runInteraction( - "user_delete_access_tokens", f, - table="refresh_tokens", - except_tokens=[], - call_after_delete=None, + txn.execute( + "DELETE FROM access_tokens WHERE %s" % where_clause, + values ) yield self.runInteraction( "user_delete_access_tokens", f, - table="access_tokens", - except_tokens=except_token_ids, - call_after_delete=self.get_user_by_access_token.invalidate, ) def delete_access_token(self, access_token): @@ -328,7 +320,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore): }, ) - txn.call_after(self.get_user_by_access_token.invalidate, (access_token,)) + self._invalidate_cache_and_stream( + txn, self.get_user_by_access_token, (access_token,) + ) return self.runInteraction("delete_access_token", f) |