summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorDavid Robertson <davidr@element.io>2022-09-09 11:14:10 +0100
committerGitHub <noreply@github.com>2022-09-09 11:14:10 +0100
commitf2d2481e56f06005de5ae8429eca3bb31834079e (patch)
treea01d3458c529efb4b76eb8f7ef037ca5e580386e /synapse/storage/databases
parentRe-type hint some collections in `/sync` code as read-only (#13754) (diff)
downloadsynapse-f2d2481e56f06005de5ae8429eca3bb31834079e.tar.xz
Require SQLite >= 3.27.0 (#13760)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/lock.py121
-rw-r--r--synapse/storage/databases/main/stats.py86
-rw-r--r--synapse/storage/databases/main/transactions.py30
3 files changed, 82 insertions, 155 deletions
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
         now = self._clock.time_msec()
         token = random_string(6)
 
-        if self.db_pool.engine.can_native_upsert:
-
-            def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
-                # We take out the lock if either a) there is no row for the lock
-                # already, b) the existing row has timed out, or c) the row is
-                # for this instance (which means the process got killed and
-                # restarted)
-                sql = """
-                    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
-                    VALUES (?, ?, ?, ?, ?)
-                    ON CONFLICT (lock_name, lock_key)
-                    DO UPDATE
-                        SET
-                            token = EXCLUDED.token,
-                            instance_name = EXCLUDED.instance_name,
-                            last_renewed_ts = EXCLUDED.last_renewed_ts
-                        WHERE
-                            worker_locks.last_renewed_ts < ?
-                            OR worker_locks.instance_name = EXCLUDED.instance_name
-                """
-                txn.execute(
-                    sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        self._instance_name,
-                        token,
-                        now,
-                        now - _LOCK_TIMEOUT_MS,
-                    ),
-                )
-
-                # We only acquired the lock if we inserted or updated the table.
-                return bool(txn.rowcount)
-
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock",
-                _try_acquire_lock_txn,
-                # We can autocommit here as we're executing a single query, this
-                # will avoid serialization errors.
-                db_autocommit=True,
+        def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+            # We take out the lock if either a) there is no row for the lock
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
+            sql = """
+               INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+               VALUES (?, ?, ?, ?, ?)
+               ON CONFLICT (lock_name, lock_key)
+               DO UPDATE
+                   SET
+                       token = EXCLUDED.token,
+                       instance_name = EXCLUDED.instance_name,
+                       last_renewed_ts = EXCLUDED.last_renewed_ts
+                   WHERE
+                       worker_locks.last_renewed_ts < ?
+                       OR worker_locks.instance_name = EXCLUDED.instance_name
+           """
+            txn.execute(
+                sql,
+                (
+                    lock_name,
+                    lock_key,
+                    self._instance_name,
+                    token,
+                    now,
+                    now - _LOCK_TIMEOUT_MS,
+                ),
             )
-            if not did_lock:
-                return None
-
-        else:
-            # If we're on an old SQLite we emulate the above logic by first
-            # clearing out any existing stale locks and then upserting.
-
-            def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
-                sql = """
-                    DELETE FROM worker_locks
-                    WHERE
-                        lock_name = ?
-                        AND lock_key = ?
-                        AND (last_renewed_ts < ? OR instance_name = ?)
-                """
-                txn.execute(
-                    sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
-                )
-
-                inserted = self.db_pool.simple_upsert_txn_emulated(
-                    txn,
-                    table="worker_locks",
-                    keyvalues={
-                        "lock_name": lock_name,
-                        "lock_key": lock_key,
-                    },
-                    values={},
-                    insertion_values={
-                        "token": token,
-                        "last_renewed_ts": self._clock.time_msec(),
-                        "instance_name": self._instance_name,
-                    },
-                )
-
-                return inserted
 
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
-            )
+            # We only acquired the lock if we inserted or updated the table.
+            return bool(txn.rowcount)
 
-            if not did_lock:
-                return None
+        did_lock = await self.db_pool.runInteraction(
+            "try_acquire_lock",
+            _try_acquire_lock_txn,
+            # We can autocommit here as we're executing a single query, this
+            # will avoid serialization errors.
+            db_autocommit=True,
+        )
+        if not did_lock:
+            return None
 
         lock = Lock(
             self._reactor,
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
             absolutes: Absolute (set) fields
             additive_relatives: Fields that will be added onto if existing row present.
         """
-        if self.database_engine.can_native_upsert:
-            absolute_updates = [
-                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
-                for field in absolutes.keys()
-            ]
-
-            relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
-                % {"table": table, "field": field}
-                for field in additive_relatives.keys()
-            ]
-
-            insert_cols = []
-            qargs = []
-
-            for (key, val) in chain(
-                keyvalues.items(), absolutes.items(), additive_relatives.items()
-            ):
-                insert_cols.append(key)
-                qargs.append(val)
+        absolute_updates = [
+            "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+            for field in absolutes.keys()
+        ]
+
+        relative_updates = [
+            "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+            % {"table": table, "field": field}
+            for field in additive_relatives.keys()
+        ]
+
+        insert_cols = []
+        qargs = []
+
+        for (key, val) in chain(
+            keyvalues.items(), absolutes.items(), additive_relatives.items()
+        ):
+            insert_cols.append(key)
+            qargs.append(val)
+
+        sql = """
+            INSERT INTO %(table)s (%(insert_cols_cs)s)
+            VALUES (%(insert_vals_qs)s)
+            ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+        """ % {
+            "table": table,
+            "insert_cols_cs": ", ".join(insert_cols),
+            "insert_vals_qs": ", ".join(
+                ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+            ),
+            "key_columns": ", ".join(keyvalues),
+            "updates": ", ".join(chain(absolute_updates, relative_updates)),
+        }
 
-            sql = """
-                INSERT INTO %(table)s (%(insert_cols_cs)s)
-                VALUES (%(insert_vals_qs)s)
-                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
-            """ % {
-                "table": table,
-                "insert_cols_cs": ", ".join(insert_cols),
-                "insert_vals_qs": ", ".join(
-                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
-                ),
-                "key_columns": ", ".join(keyvalues),
-                "updates": ", ".join(chain(absolute_updates, relative_updates)),
-            }
-
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, table)
-            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self.db_pool.simple_select_one_txn(
-                txn, table, keyvalues, retcols, allow_none=True
-            )
-            if current_row is None:
-                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self.db_pool.simple_insert_txn(txn, table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    if current_row[key] is None:
-                        current_row[key] = val
-                    else:
-                        current_row[key] += val
-                current_row.update(absolutes)
-                self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+        txn.execute(sql, qargs)
 
     async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
         """Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             retry_interval: how long until next retry in ms
         """
 
-        if self.database_engine.can_native_upsert:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_native,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-                db_autocommit=True,  # Safe as its a single upsert
-            )
-        else:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_emulated,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-            )
+        await self.db_pool.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings_native,
+            destination,
+            failure_ts,
+            retry_last_ts,
+            retry_interval,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
 
     def _set_destination_retry_timings_native(
         self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         retry_last_ts: int,
         retry_interval: int,
     ) -> None:
-        assert self.database_engine.can_native_upsert
-
         # Upsert retry time interval if retry_interval is zero (i.e. we're
         # resetting it) or greater than the existing retry interval.
         #