summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py10
-rw-r--r--synapse/storage/engines/_base.py19
-rw-r--r--synapse/storage/engines/postgres.py29
-rw-r--r--synapse/storage/engines/sqlite.py7
4 files changed, 60 insertions, 5 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 57cc1d76e0..7455326ed3 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -702,6 +702,7 @@ class DatabasePool:
         func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
+        isolation_level: Optional[int] = None,
         **kwargs: Any,
     ) -> R:
         """Starts a transaction on the database and runs a given function
@@ -724,6 +725,7 @@ class DatabasePool:
                 called multiple times if the transaction is retried, so must
                 correctly handle that case.
 
+            isolation_level: Set the server isolation level for this transaction.
             args: positional args to pass to `func`
             kwargs: named args to pass to `func`
 
@@ -763,6 +765,7 @@ class DatabasePool:
         func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
+        isolation_level: Optional[int] = None,
         **kwargs: Any,
     ) -> R:
         """Wraps the .runWithConnection() method on the underlying db_pool.
@@ -775,6 +778,7 @@ class DatabasePool:
             db_autocommit: Whether to run the function in "autocommit" mode,
                 i.e. outside of a transaction. This is useful for transaction
                 that are only a single query. Currently only affects postgres.
+            isolation_level: Set the server isolation level for this transaction.
             kwargs: named args to pass to `func`
 
         Returns:
@@ -834,6 +838,10 @@ class DatabasePool:
                     try:
                         if db_autocommit:
                             self.engine.attempt_to_set_autocommit(conn, True)
+                        if isolation_level is not None:
+                            self.engine.attempt_to_set_isolation_level(
+                                conn, isolation_level
+                            )
 
                         db_conn = LoggingDatabaseConnection(
                             conn, self.engine, "runWithConnection"
@@ -842,6 +850,8 @@ class DatabasePool:
                     finally:
                         if db_autocommit:
                             self.engine.attempt_to_set_autocommit(conn, False)
+                        if isolation_level:
+                            self.engine.attempt_to_set_isolation_level(conn, None)
 
         return await make_deferred_yieldable(
             self._db_pool.runWithConnection(inner_func, *args, **kwargs)
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 20cd63c330..143cd98ca2 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -12,11 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import abc
-from typing import Generic, TypeVar
+from enum import IntEnum
+from typing import Generic, Optional, TypeVar
 
 from synapse.storage.types import Connection
 
 
+class IsolationLevel(IntEnum):
+    READ_COMMITTED: int = 1
+    REPEATABLE_READ: int = 2
+    SERIALIZABLE: int = 3
+
+
 class IncorrectDatabaseSetup(RuntimeError):
     pass
 
@@ -109,3 +116,13 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
         commit/rollback the connections.
         """
         ...
+
+    @abc.abstractmethod
+    def attempt_to_set_isolation_level(
+        self, conn: Connection, isolation_level: Optional[int]
+    ):
+        """Attempt to set the connections isolation level.
+
+        Note: This has no effect on SQLite3, as transactions are SERIALIZABLE by default.
+        """
+        ...
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index b3d71f661c..808342fafb 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,8 +13,13 @@
 # limitations under the License.
 
 import logging
+from typing import Mapping, Optional
 
-from synapse.storage.engines._base import BaseDatabaseEngine, IncorrectDatabaseSetup
+from synapse.storage.engines._base import (
+    BaseDatabaseEngine,
+    IncorrectDatabaseSetup,
+    IsolationLevel,
+)
 from synapse.storage.types import Connection
 
 logger = logging.getLogger(__name__)
@@ -34,6 +39,15 @@ class PostgresEngine(BaseDatabaseEngine):
         self.synchronous_commit = database_config.get("synchronous_commit", True)
         self._version = None  # unknown as yet
 
+        self.isolation_level_map: Mapping[int, int] = {
+            IsolationLevel.READ_COMMITTED: self.module.extensions.ISOLATION_LEVEL_READ_COMMITTED,
+            IsolationLevel.REPEATABLE_READ: self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ,
+            IsolationLevel.SERIALIZABLE: self.module.extensions.ISOLATION_LEVEL_SERIALIZABLE,
+        }
+        self.default_isolation_level = (
+            self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
+        )
+
     @property
     def single_threaded(self) -> bool:
         return False
@@ -104,9 +118,7 @@ class PostgresEngine(BaseDatabaseEngine):
         return sql.replace("?", "%s")
 
     def on_new_connection(self, db_conn):
-        db_conn.set_isolation_level(
-            self.module.extensions.ISOLATION_LEVEL_REPEATABLE_READ
-        )
+        db_conn.set_isolation_level(self.default_isolation_level)
 
         # Set the bytea output to escape, vs the default of hex
         cursor = db_conn.cursor()
@@ -175,3 +187,12 @@ class PostgresEngine(BaseDatabaseEngine):
 
     def attempt_to_set_autocommit(self, conn: Connection, autocommit: bool):
         return conn.set_session(autocommit=autocommit)  # type: ignore
+
+    def attempt_to_set_isolation_level(
+        self, conn: Connection, isolation_level: Optional[int]
+    ):
+        if isolation_level is None:
+            isolation_level = self.default_isolation_level
+        else:
+            isolation_level = self.isolation_level_map[isolation_level]
+        return conn.set_isolation_level(isolation_level)  # type: ignore
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 70d17d4f2c..6c19e55999 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -15,6 +15,7 @@ import platform
 import struct
 import threading
 import typing
+from typing import Optional
 
 from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.types import Connection
@@ -122,6 +123,12 @@ class Sqlite3Engine(BaseDatabaseEngine["sqlite3.Connection"]):
         # set the connection to autocommit mode.
         pass
 
+    def attempt_to_set_isolation_level(
+        self, conn: Connection, isolation_level: Optional[int]
+    ):
+        # All transactions are SERIALIZABLE by default in sqllite
+        pass
+
 
 # Following functions taken from: https://github.com/coleifer/peewee