summary refs log tree commit diff
path: root/synapse/storage/registration.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/registration.py')
-rw-r--r--synapse/storage/registration.py103
1 files changed, 54 insertions, 49 deletions
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 7e7d32eb66..e404fa72de 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
             desc="add_refresh_token_to_user",
         )
 
-    @defer.inlineCallbacks
     def register(self, user_id, token=None, password_hash=None,
                  was_guest=False, make_guest=False, appservice_id=None,
                  create_profile_with_localpart=None, admin=False):
@@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         Raises:
             StoreError if the user_id could not be registered.
         """
-        yield self.runInteraction(
+        return self.runInteraction(
             "register",
             self._register,
             user_id,
@@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
             create_profile_with_localpart,
             admin
         )
-        self.get_user_by_id.invalidate((user_id,))
-        self.is_guest.invalidate((user_id,))
 
     def _register(
         self,
@@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 (create_profile_with_localpart,)
             )
 
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_by_id, (user_id,)
+        )
+        txn.call_after(self.is_guest.invalidate, (user_id,))
+
     @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
@@ -236,22 +238,31 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         return self.runInteraction("get_users_by_id_case_insensitive", f)
 
-    @defer.inlineCallbacks
     def user_set_password_hash(self, user_id, password_hash):
         """
         NB. This does *not* evict any cache because the one use for this
             removes most of the entries subsequently anyway so it would be
             pointless. Use flush_user separately.
         """
-        yield self._simple_update_one('users', {
-            'name': user_id
-        }, {
-            'password_hash': password_hash
-        })
-        self.get_user_by_id.invalidate((user_id,))
+        def user_set_password_hash_txn(txn):
+            self._simple_update_one_txn(
+                txn,
+                'users', {
+                    'name': user_id
+                },
+                {
+                    'password_hash': password_hash
+                }
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction(
+            "user_set_password_hash", user_set_password_hash_txn
+        )
 
     @defer.inlineCallbacks
-    def user_delete_access_tokens(self, user_id, except_token_ids=[],
+    def user_delete_access_tokens(self, user_id, except_token_id=None,
                                   device_id=None,
                                   delete_refresh_tokens=False):
         """
@@ -259,7 +270,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         Args:
             user_id (str):  ID of user the tokens belong to
-            except_token_ids (list[str]): list of access_tokens which should
+            except_token_id (str): list of access_tokens IDs which should
                 *not* be deleted
             device_id (str|None):  ID of device the tokens are associated with.
                 If None, tokens associated with any device (or no device) will
@@ -269,53 +280,45 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         Returns:
             defer.Deferred:
         """
-        def f(txn, table, except_tokens, call_after_delete):
-            sql = "SELECT token FROM %s WHERE user_id = ?" % table
-            clauses = [user_id]
-
+        def f(txn):
+            keyvalues = {
+                "user_id": user_id,
+            }
             if device_id is not None:
-                sql += " AND device_id = ?"
-                clauses.append(device_id)
+                keyvalues["device_id"] = device_id
 
-            if except_tokens:
-                sql += " AND id NOT IN (%s)" % (
-                    ",".join(["?" for _ in except_tokens]),
+            if delete_refresh_tokens:
+                self._simple_delete_txn(
+                    txn,
+                    table="refresh_tokens",
+                    keyvalues=keyvalues,
                 )
-                clauses += except_tokens
-
-            txn.execute(sql, clauses)
 
-            rows = txn.fetchall()
+            items = keyvalues.items()
+            where_clause = " AND ".join(k + " = ?" for k, _ in items)
+            values = [v for _, v in items]
+            if except_token_id:
+                where_clause += " AND id != ?"
+                values.append(except_token_id)
 
-            n = 100
-            chunks = [rows[i:i + n] for i in xrange(0, len(rows), n)]
-            for chunk in chunks:
-                if call_after_delete:
-                    for row in chunk:
-                        txn.call_after(call_after_delete, (row[0],))
+            txn.execute(
+                "SELECT token FROM access_tokens WHERE %s" % where_clause,
+                values
+            )
+            rows = self.cursor_to_dict(txn)
 
-                txn.execute(
-                    "DELETE FROM %s WHERE token in (%s)" % (
-                        table,
-                        ",".join(["?" for _ in chunk]),
-                    ), [r[0] for r in chunk]
+            for row in rows:
+                self._invalidate_cache_and_stream(
+                    txn, self.get_user_by_access_token, (row["token"],)
                 )
 
-        # delete refresh tokens first, to stop new access tokens being
-        # allocated while our backs are turned
-        if delete_refresh_tokens:
-            yield self.runInteraction(
-                "user_delete_access_tokens", f,
-                table="refresh_tokens",
-                except_tokens=[],
-                call_after_delete=None,
+            txn.execute(
+                "DELETE FROM access_tokens WHERE %s" % where_clause,
+                values
             )
 
         yield self.runInteraction(
             "user_delete_access_tokens", f,
-            table="access_tokens",
-            except_tokens=except_token_ids,
-            call_after_delete=self.get_user_by_access_token.invalidate,
         )
 
     def delete_access_token(self, access_token):
@@ -328,7 +331,9 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 },
             )
 
-            txn.call_after(self.get_user_by_access_token.invalidate, (access_token,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_access_token, (access_token,)
+            )
 
         return self.runInteraction("delete_access_token", f)