summary refs log tree commit diff
path: root/synapse/storage/database.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/database.py')
-rw-r--r--synapse/storage/database.py63
1 files changed, 60 insertions, 3 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 79ec8f119d..6116191b16 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -403,6 +403,24 @@ class DatabasePool:
         *args: Any,
         **kwargs: Any
     ) -> R:
+        """Start a new database transaction with the given connection.
+
+        Note: The given func may be called multiple times under certain
+        failure modes. This is normally fine when in a standard transaction,
+        but care must be taken if the connection is in `autocommit` mode that
+        the function will correctly handle being aborted and retried half way
+        through its execution.
+
+        Args:
+            conn
+            desc
+            after_callbacks
+            exception_callbacks
+            func
+            *args
+            **kwargs
+        """
+
         start = monotonic_time()
         txn_id = self._TXN_ID
 
@@ -508,7 +526,12 @@ class DatabasePool:
             sql_txn_timer.labels(desc).observe(duration)
 
     async def runInteraction(
-        self, desc: str, func: "Callable[..., R]", *args: Any, **kwargs: Any
+        self,
+        desc: str,
+        func: "Callable[..., R]",
+        *args: Any,
+        db_autocommit: bool = False,
+        **kwargs: Any
     ) -> R:
         """Starts a transaction on the database and runs a given function
 
@@ -518,6 +541,18 @@ class DatabasePool:
                 database transaction (twisted.enterprise.adbapi.Transaction) as
                 its first argument, followed by `args` and `kwargs`.
 
+            db_autocommit: Whether to run the function in "autocommit" mode,
+                i.e. outside of a transaction. This is useful for transactions
+                that are only a single query.
+
+                Currently, this is only implemented for Postgres. SQLite will still
+                run the function inside a transaction.
+
+                WARNING: This means that if func fails half way through then
+                the changes will *not* be rolled back. `func` may also get
+                called multiple times if the transaction is retried, so must
+                correctly handle that case.
+
             args: positional args to pass to `func`
             kwargs: named args to pass to `func`
 
@@ -538,6 +573,7 @@ class DatabasePool:
                 exception_callbacks,
                 func,
                 *args,
+                db_autocommit=db_autocommit,
                 **kwargs
             )
 
@@ -551,7 +587,11 @@ class DatabasePool:
         return cast(R, result)
 
     async def runWithConnection(
-        self, func: "Callable[..., R]", *args: Any, **kwargs: Any
+        self,
+        func: "Callable[..., R]",
+        *args: Any,
+        db_autocommit: bool = False,
+        **kwargs: Any
     ) -> R:
         """Wraps the .runWithConnection() method on the underlying db_pool.
 
@@ -560,6 +600,9 @@ class DatabasePool:
                 database connection (twisted.enterprise.adbapi.Connection) as
                 its first argument, followed by `args` and `kwargs`.
             args: positional args to pass to `func`
+            db_autocommit: Whether to run the function in "autocommit" mode,
+                i.e. outside of a transaction. This is useful for transaction
+                that are only a single query. Currently only affects postgres.
             kwargs: named args to pass to `func`
 
         Returns:
@@ -575,6 +618,13 @@ class DatabasePool:
         start_time = monotonic_time()
 
         def inner_func(conn, *args, **kwargs):
+            # We shouldn't be in a transaction. If we are then something
+            # somewhere hasn't committed after doing work. (This is likely only
+            # possible during startup, as `run*` will ensure changes are
+            # committed/rolled back before putting the connection back in the
+            # pool).
+            assert not self.engine.in_transaction(conn)
+
             with LoggingContext("runWithConnection", parent_context) as context:
                 sched_duration_sec = monotonic_time() - start_time
                 sql_scheduling_timer.observe(sched_duration_sec)
@@ -584,7 +634,14 @@ class DatabasePool:
                     logger.debug("Reconnecting closed database connection")
                     conn.reconnect()
 
-                return func(conn, *args, **kwargs)
+                try:
+                    if db_autocommit:
+                        self.engine.attempt_to_set_autocommit(conn, True)
+
+                    return func(conn, *args, **kwargs)
+                finally:
+                    if db_autocommit:
+                        self.engine.attempt_to_set_autocommit(conn, False)
 
         return await make_deferred_yieldable(
             self._db_pool.runWithConnection(inner_func, *args, **kwargs)