summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2022-09-09 11:14:10 +0100
committerGitHub <noreply@github.com>2022-09-09 11:14:10 +0100
commitf2d2481e56f06005de5ae8429eca3bb31834079e (patch)
treea01d3458c529efb4b76eb8f7ef037ca5e580386e /synapse/storage
parentRe-type hint some collections in `/sync` code as read-only (#13754) (diff)
downloadsynapse-f2d2481e56f06005de5ae8429eca3bb31834079e.tar.xz
Require SQLite >= 3.27.0 (#13760)
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/database.py47
-rw-r--r--synapse/storage/databases/main/lock.py121
-rw-r--r--synapse/storage/databases/main/stats.py86
-rw-r--r--synapse/storage/databases/main/transactions.py30
-rw-r--r--synapse/storage/engines/_base.py8
-rw-r--r--synapse/storage/engines/postgres.py7
-rw-r--r--synapse/storage/engines/sqlite.py13
7 files changed, 105 insertions, 207 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..e881bff7fb 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
         if isinstance(self.engine, Sqlite3Engine):
             self._unsafe_to_upsert_tables.add("user_directory_search")
 
-        if self.engine.can_native_upsert:
-            # Check ASAP (and then later, every 1s) to see if we have finished
-            # background updates of tables that aren't safe to update.
-            self._clock.call_later(
-                0.0,
-                run_as_background_process,
-                "upsert_safety_check",
-                self._check_safe_to_upsert,
-            )
+        # Check ASAP (and then later, every 1s) to see if we have finished
+        # background updates of tables that aren't safe to update.
+        self._clock.call_later(
+            0.0,
+            run_as_background_process,
+            "upsert_safety_check",
+            self._check_safe_to_upsert,
+        )
 
     def name(self) -> str:
         "Return the name of this database"
@@ -1160,11 +1159,8 @@ class DatabasePool:
         attempts = 0
         while True:
             try:
-                # We can autocommit if we are going to use native upserts
-                autocommit = (
-                    self.engine.can_native_upsert
-                    and table not in self._unsafe_to_upsert_tables
-                )
+                # We can autocommit if it is safe to upsert
+                autocommit = table not in self._unsafe_to_upsert_tables
 
                 return await self.runInteraction(
                     desc,
@@ -1199,7 +1195,7 @@ class DatabasePool:
     ) -> bool:
         """
         Pick the UPSERT method which works best on the platform. Either the
-        native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+        native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
 
         Args:
             txn: The transaction to use.
@@ -1207,14 +1203,15 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
-            lock: True to lock the table when doing the upsert.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
             not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_txn_native_upsert(
                 txn, table, keyvalues, values, insertion_values=insertion_values
             )
@@ -1365,14 +1362,12 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
 
-        # We can autocommit if we are going to use native upserts
-        autocommit = (
-            self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
-        )
+        # We can autocommit if it safe to upsert
+        autocommit = table not in self._unsafe_to_upsert_tables
 
         await self.runInteraction(
             desc,
@@ -1406,10 +1401,10 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_many_txn_native_upsert(
                 txn, table, key_names, key_values, value_names, value_values
             )
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
         now = self._clock.time_msec()
         token = random_string(6)
 
-        if self.db_pool.engine.can_native_upsert:
-
-            def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
-                # We take out the lock if either a) there is no row for the lock
-                # already, b) the existing row has timed out, or c) the row is
-                # for this instance (which means the process got killed and
-                # restarted)
-                sql = """
-                    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
-                    VALUES (?, ?, ?, ?, ?)
-                    ON CONFLICT (lock_name, lock_key)
-                    DO UPDATE
-                        SET
-                            token = EXCLUDED.token,
-                            instance_name = EXCLUDED.instance_name,
-                            last_renewed_ts = EXCLUDED.last_renewed_ts
-                        WHERE
-                            worker_locks.last_renewed_ts < ?
-                            OR worker_locks.instance_name = EXCLUDED.instance_name
-                """
-                txn.execute(
-                    sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        self._instance_name,
-                        token,
-                        now,
-                        now - _LOCK_TIMEOUT_MS,
-                    ),
-                )
-
-                # We only acquired the lock if we inserted or updated the table.
-                return bool(txn.rowcount)
-
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock",
-                _try_acquire_lock_txn,
-                # We can autocommit here as we're executing a single query, this
-                # will avoid serialization errors.
-                db_autocommit=True,
+        def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+            # We take out the lock if either a) there is no row for the lock
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
+            sql = """
+               INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+               VALUES (?, ?, ?, ?, ?)
+               ON CONFLICT (lock_name, lock_key)
+               DO UPDATE
+                   SET
+                       token = EXCLUDED.token,
+                       instance_name = EXCLUDED.instance_name,
+                       last_renewed_ts = EXCLUDED.last_renewed_ts
+                   WHERE
+                       worker_locks.last_renewed_ts < ?
+                       OR worker_locks.instance_name = EXCLUDED.instance_name
+           """
+            txn.execute(
+                sql,
+                (
+                    lock_name,
+                    lock_key,
+                    self._instance_name,
+                    token,
+                    now,
+                    now - _LOCK_TIMEOUT_MS,
+                ),
             )
-            if not did_lock:
-                return None
-
-        else:
-            # If we're on an old SQLite we emulate the above logic by first
-            # clearing out any existing stale locks and then upserting.
-
-            def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
-                sql = """
-                    DELETE FROM worker_locks
-                    WHERE
-                        lock_name = ?
-                        AND lock_key = ?
-                        AND (last_renewed_ts < ? OR instance_name = ?)
-                """
-                txn.execute(
-                    sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
-                )
-
-                inserted = self.db_pool.simple_upsert_txn_emulated(
-                    txn,
-                    table="worker_locks",
-                    keyvalues={
-                        "lock_name": lock_name,
-                        "lock_key": lock_key,
-                    },
-                    values={},
-                    insertion_values={
-                        "token": token,
-                        "last_renewed_ts": self._clock.time_msec(),
-                        "instance_name": self._instance_name,
-                    },
-                )
-
-                return inserted
 
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
-            )
+            # We only acquired the lock if we inserted or updated the table.
+            return bool(txn.rowcount)
 
-            if not did_lock:
-                return None
+        did_lock = await self.db_pool.runInteraction(
+            "try_acquire_lock",
+            _try_acquire_lock_txn,
+            # We can autocommit here as we're executing a single query, this
+            # will avoid serialization errors.
+            db_autocommit=True,
+        )
+        if not did_lock:
+            return None
 
         lock = Lock(
             self._reactor,
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
             absolutes: Absolute (set) fields
             additive_relatives: Fields that will be added onto if existing row present.
         """
-        if self.database_engine.can_native_upsert:
-            absolute_updates = [
-                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
-                for field in absolutes.keys()
-            ]
-
-            relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
-                % {"table": table, "field": field}
-                for field in additive_relatives.keys()
-            ]
-
-            insert_cols = []
-            qargs = []
-
-            for (key, val) in chain(
-                keyvalues.items(), absolutes.items(), additive_relatives.items()
-            ):
-                insert_cols.append(key)
-                qargs.append(val)
+        absolute_updates = [
+            "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+            for field in absolutes.keys()
+        ]
+
+        relative_updates = [
+            "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+            % {"table": table, "field": field}
+            for field in additive_relatives.keys()
+        ]
+
+        insert_cols = []
+        qargs = []
+
+        for (key, val) in chain(
+            keyvalues.items(), absolutes.items(), additive_relatives.items()
+        ):
+            insert_cols.append(key)
+            qargs.append(val)
+
+        sql = """
+            INSERT INTO %(table)s (%(insert_cols_cs)s)
+            VALUES (%(insert_vals_qs)s)
+            ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+        """ % {
+            "table": table,
+            "insert_cols_cs": ", ".join(insert_cols),
+            "insert_vals_qs": ", ".join(
+                ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+            ),
+            "key_columns": ", ".join(keyvalues),
+            "updates": ", ".join(chain(absolute_updates, relative_updates)),
+        }
 
-            sql = """
-                INSERT INTO %(table)s (%(insert_cols_cs)s)
-                VALUES (%(insert_vals_qs)s)
-                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
-            """ % {
-                "table": table,
-                "insert_cols_cs": ", ".join(insert_cols),
-                "insert_vals_qs": ", ".join(
-                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
-                ),
-                "key_columns": ", ".join(keyvalues),
-                "updates": ", ".join(chain(absolute_updates, relative_updates)),
-            }
-
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, table)
-            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self.db_pool.simple_select_one_txn(
-                txn, table, keyvalues, retcols, allow_none=True
-            )
-            if current_row is None:
-                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self.db_pool.simple_insert_txn(txn, table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    if current_row[key] is None:
-                        current_row[key] = val
-                    else:
-                        current_row[key] += val
-                current_row.update(absolutes)
-                self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+        txn.execute(sql, qargs)
 
     async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
         """Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             retry_interval: how long until next retry in ms
         """
 
-        if self.database_engine.can_native_upsert:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_native,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-                db_autocommit=True,  # Safe as its a single upsert
-            )
-        else:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_emulated,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-            )
+        await self.db_pool.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings_native,
+            destination,
+            failure_ts,
+            retry_last_ts,
+            retry_interval,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
 
     def _set_destination_retry_timings_native(
         self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         retry_last_ts: int,
         retry_interval: int,
     ) -> None:
-        assert self.database_engine.can_native_upsert
-
         # Upsert retry time interval if retry_interval is zero (i.e. we're
         # resetting it) or greater than the existing retry interval.
         #
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
 
     @property
     @abc.abstractmethod
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs?
-        """
-        ...
-
-    @property
-    @abc.abstractmethod
     def supports_using_any_list(self) -> bool:
         """
         Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
         db_conn.commit()
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Can we use native UPSERTs?
-        """
-        return True
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         return True
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
-        more work we haven't done yet to tell what was inserted vs updated.
-        """
-        return sqlite3.sqlite_version_info >= (3, 24, 0)
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
     ) -> None:
         if not allow_outdated_version:
-            version = sqlite3.sqlite_version_info
             # Synapse is untested against older SQLite versions, and we don't want
             # to let users upgrade to a version of Synapse with broken support for their
             # sqlite version, because it risks leaving them with a half-upgraded db.
-            if version < (3, 22, 0):
-                raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+            if sqlite3.sqlite_version_info < (3, 27, 0):
+                raise RuntimeError("Synapse requires sqlite 3.27 or above.")
 
     def check_new_database(self, txn: Cursor) -> None:
         """Gets called when setting up a brand new database. This allows us to