summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py99
-rw-r--r--synapse/storage/databases/main/keys.py5
-rw-r--r--synapse/storage/databases/main/transactions.py76
-rw-r--r--synapse/storage/databases/main/user_directory.py45
4 files changed, 155 insertions, 70 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0ba3a025cf..763722d6bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -893,6 +893,12 @@ class DatabasePool:
         attempts = 0
         while True:
             try:
+                # We can autocommit if we are going to use native upserts
+                autocommit = (
+                    self.engine.can_native_upsert
+                    and table not in self._unsafe_to_upsert_tables
+                )
+
                 return await self.runInteraction(
                     desc,
                     self.simple_upsert_txn,
@@ -901,6 +907,7 @@ class DatabasePool:
                     values,
                     insertion_values,
                     lock=lock,
+                    db_autocommit=autocommit,
                 )
             except self.engine.module.IntegrityError as e:
                 attempts += 1
@@ -1063,6 +1070,43 @@ class DatabasePool:
         )
         txn.execute(sql, list(allvalues.values()))
 
+    async def simple_upsert_many(
+        self,
+        table: str,
+        key_names: Collection[str],
+        key_values: Collection[Iterable[Any]],
+        value_names: Collection[str],
+        value_values: Iterable[Iterable[Any]],
+        desc: str,
+    ) -> None:
+        """
+        Upsert, many times.
+
+        Args:
+            table: The table to upsert into
+            key_names: The key column names.
+            key_values: A list of each row's key column values.
+            value_names: The value column names
+            value_values: A list of each row's value column values.
+                Ignored if value_names is empty.
+        """
+
+        # We can autocommit if we are going to use native upserts
+        autocommit = (
+            self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
+        )
+
+        return await self.runInteraction(
+            desc,
+            self.simple_upsert_many_txn,
+            table,
+            key_names,
+            key_values,
+            value_names,
+            value_values,
+            db_autocommit=autocommit,
+        )
+
     def simple_upsert_many_txn(
         self,
         txn: LoggingTransaction,
@@ -1214,7 +1258,13 @@ class DatabasePool:
             desc: description of the transaction, for logging and metrics
         """
         return await self.runInteraction(
-            desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
+            desc,
+            self.simple_select_one_txn,
+            table,
+            keyvalues,
+            retcols,
+            allow_none,
+            db_autocommit=True,
         )
 
     @overload
@@ -1265,6 +1315,7 @@ class DatabasePool:
             keyvalues,
             retcol,
             allow_none=allow_none,
+            db_autocommit=True,
         )
 
     @overload
@@ -1346,7 +1397,12 @@ class DatabasePool:
             Results in a list
         """
         return await self.runInteraction(
-            desc, self.simple_select_onecol_txn, table, keyvalues, retcol
+            desc,
+            self.simple_select_onecol_txn,
+            table,
+            keyvalues,
+            retcol,
+            db_autocommit=True,
         )
 
     async def simple_select_list(
@@ -1371,7 +1427,12 @@ class DatabasePool:
             A list of dictionaries.
         """
         return await self.runInteraction(
-            desc, self.simple_select_list_txn, table, keyvalues, retcols
+            desc,
+            self.simple_select_list_txn,
+            table,
+            keyvalues,
+            retcols,
+            db_autocommit=True,
         )
 
     @classmethod
@@ -1450,6 +1511,7 @@ class DatabasePool:
                 chunk,
                 keyvalues,
                 retcols,
+                db_autocommit=True,
             )
 
             results.extend(rows)
@@ -1548,7 +1610,12 @@ class DatabasePool:
             desc: description of the transaction, for logging and metrics
         """
         await self.runInteraction(
-            desc, self.simple_update_one_txn, table, keyvalues, updatevalues
+            desc,
+            self.simple_update_one_txn,
+            table,
+            keyvalues,
+            updatevalues,
+            db_autocommit=True,
         )
 
     @classmethod
@@ -1607,7 +1674,9 @@ class DatabasePool:
             keyvalues: dict of column names and values to select the row with
             desc: description of the transaction, for logging and metrics
         """
-        await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
+        await self.runInteraction(
+            desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
+        )
 
     @staticmethod
     def simple_delete_one_txn(
@@ -1646,7 +1715,9 @@ class DatabasePool:
         Returns:
             The number of deleted rows.
         """
-        return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
+        return await self.runInteraction(
+            desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
+        )
 
     @staticmethod
     def simple_delete_txn(
@@ -1694,7 +1765,13 @@ class DatabasePool:
             Number rows deleted
         """
         return await self.runInteraction(
-            desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
+            desc,
+            self.simple_delete_many_txn,
+            table,
+            column,
+            iterable,
+            keyvalues,
+            db_autocommit=True,
         )
 
     @staticmethod
@@ -1860,7 +1937,13 @@ class DatabasePool:
         """
 
         return await self.runInteraction(
-            desc, self.simple_search_list_txn, table, term, col, retcols
+            desc,
+            self.simple_search_list_txn,
+            table,
+            term,
+            col,
+            retcols,
+            db_autocommit=True,
         )
 
     @classmethod
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ad43bb05ab..f8f4bb9b3f 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
             # param, which is itself the 2-tuple (server_name, key_id).
             invalidations.append((server_name, key_id))
 
-        await self.db_pool.runInteraction(
-            "store_server_verify_keys",
-            self.db_pool.simple_upsert_many_txn,
+        await self.db_pool.simple_upsert_many(
             table="server_signature_keys",
             key_names=("server_name", "key_id"),
             key_values=key_values,
@@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
                 "verify_key",
             ),
             value_values=value_values,
+            desc="store_server_verify_keys",
         )
 
         invalidate = self._get_server_verify_key.invalidate
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7d46090267..59207cadd4 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
         """
 
         self._destination_retry_cache.pop(destination, None)
-        return await self.db_pool.runInteraction(
-            "set_destination_retry_timings",
-            self._set_destination_retry_timings,
-            destination,
-            failure_ts,
-            retry_last_ts,
-            retry_interval,
-        )
+        if self.database_engine.can_native_upsert:
+            return await self.db_pool.runInteraction(
+                "set_destination_retry_timings",
+                self._set_destination_retry_timings_native,
+                destination,
+                failure_ts,
+                retry_last_ts,
+                retry_interval,
+                db_autocommit=True,  # Safe as its a single upsert
+            )
+        else:
+            return await self.db_pool.runInteraction(
+                "set_destination_retry_timings",
+                self._set_destination_retry_timings_emulated,
+                destination,
+                failure_ts,
+                retry_last_ts,
+                retry_interval,
+            )
 
-    def _set_destination_retry_timings(
+    def _set_destination_retry_timings_native(
         self, txn, destination, failure_ts, retry_last_ts, retry_interval
     ):
+        assert self.database_engine.can_native_upsert
+
+        # Upsert retry time interval if retry_interval is zero (i.e. we're
+        # resetting it) or greater than the existing retry interval.
+        #
+        # WARNING: This is executed in autocommit, so we shouldn't add any more
+        # SQL calls in here (without being very careful).
+        sql = """
+            INSERT INTO destinations (
+                destination, failure_ts, retry_last_ts, retry_interval
+            )
+                VALUES (?, ?, ?, ?)
+            ON CONFLICT (destination) DO UPDATE SET
+                    failure_ts = EXCLUDED.failure_ts,
+                    retry_last_ts = EXCLUDED.retry_last_ts,
+                    retry_interval = EXCLUDED.retry_interval
+                WHERE
+                    EXCLUDED.retry_interval = 0
+                    OR destinations.retry_interval IS NULL
+                    OR destinations.retry_interval < EXCLUDED.retry_interval
+        """
 
-        if self.database_engine.can_native_upsert:
-            # Upsert retry time interval if retry_interval is zero (i.e. we're
-            # resetting it) or greater than the existing retry interval.
-
-            sql = """
-                INSERT INTO destinations (
-                    destination, failure_ts, retry_last_ts, retry_interval
-                )
-                    VALUES (?, ?, ?, ?)
-                ON CONFLICT (destination) DO UPDATE SET
-                        failure_ts = EXCLUDED.failure_ts,
-                        retry_last_ts = EXCLUDED.retry_last_ts,
-                        retry_interval = EXCLUDED.retry_interval
-                    WHERE
-                        EXCLUDED.retry_interval = 0
-                        OR destinations.retry_interval IS NULL
-                        OR destinations.retry_interval < EXCLUDED.retry_interval
-            """
-
-            txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
-
-            return
+        txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
 
+    def _set_destination_retry_timings_emulated(
+        self, txn, destination, failure_ts, retry_last_ts, retry_interval
+    ):
         self.database_engine.lock_table(txn, "destinations")
 
         # We need to be careful here as the data may have changed from under us
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5a390ff2f6..d87ceec6da 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             user_id_tuples: iterable of 2-tuple of user IDs.
         """
 
-        def _add_users_who_share_room_txn(txn):
-            self.db_pool.simple_upsert_many_txn(
-                txn,
-                table="users_who_share_private_rooms",
-                key_names=["user_id", "other_user_id", "room_id"],
-                key_values=[
-                    (user_id, other_user_id, room_id)
-                    for user_id, other_user_id in user_id_tuples
-                ],
-                value_names=(),
-                value_values=None,
-            )
-
-        await self.db_pool.runInteraction(
-            "add_users_who_share_room", _add_users_who_share_room_txn
+        await self.db_pool.simple_upsert_many(
+            table="users_who_share_private_rooms",
+            key_names=["user_id", "other_user_id", "room_id"],
+            key_values=[
+                (user_id, other_user_id, room_id)
+                for user_id, other_user_id in user_id_tuples
+            ],
+            value_names=(),
+            value_values=None,
+            desc="add_users_who_share_room",
         )
 
     async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             user_ids
         """
 
-        def _add_users_in_public_rooms_txn(txn):
-
-            self.db_pool.simple_upsert_many_txn(
-                txn,
-                table="users_in_public_rooms",
-                key_names=["user_id", "room_id"],
-                key_values=[(user_id, room_id) for user_id in user_ids],
-                value_names=(),
-                value_values=None,
-            )
-
-        await self.db_pool.runInteraction(
-            "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+        await self.db_pool.simple_upsert_many(
+            table="users_in_public_rooms",
+            key_names=["user_id", "room_id"],
+            key_values=[(user_id, room_id) for user_id in user_ids],
+            value_names=(),
+            value_values=None,
+            desc="add_users_in_public_rooms",
         )
 
     async def delete_all_from_user_dir(self) -> None: