summary refs log tree commit diff
path: root/synapse/storage/databases/main/lock.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/lock.py')
-rw-r--r--synapse/storage/databases/main/lock.py121
1 files changed, 39 insertions, 82 deletions
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
         now = self._clock.time_msec()
         token = random_string(6)
 
-        if self.db_pool.engine.can_native_upsert:
-
-            def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
-                # We take out the lock if either a) there is no row for the lock
-                # already, b) the existing row has timed out, or c) the row is
-                # for this instance (which means the process got killed and
-                # restarted)
-                sql = """
-                    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
-                    VALUES (?, ?, ?, ?, ?)
-                    ON CONFLICT (lock_name, lock_key)
-                    DO UPDATE
-                        SET
-                            token = EXCLUDED.token,
-                            instance_name = EXCLUDED.instance_name,
-                            last_renewed_ts = EXCLUDED.last_renewed_ts
-                        WHERE
-                            worker_locks.last_renewed_ts < ?
-                            OR worker_locks.instance_name = EXCLUDED.instance_name
-                """
-                txn.execute(
-                    sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        self._instance_name,
-                        token,
-                        now,
-                        now - _LOCK_TIMEOUT_MS,
-                    ),
-                )
-
-                # We only acquired the lock if we inserted or updated the table.
-                return bool(txn.rowcount)
-
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock",
-                _try_acquire_lock_txn,
-                # We can autocommit here as we're executing a single query, this
-                # will avoid serialization errors.
-                db_autocommit=True,
+        def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+            # We take out the lock if either a) there is no row for the lock
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
+            sql = """
+               INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+               VALUES (?, ?, ?, ?, ?)
+               ON CONFLICT (lock_name, lock_key)
+               DO UPDATE
+                   SET
+                       token = EXCLUDED.token,
+                       instance_name = EXCLUDED.instance_name,
+                       last_renewed_ts = EXCLUDED.last_renewed_ts
+                   WHERE
+                       worker_locks.last_renewed_ts < ?
+                       OR worker_locks.instance_name = EXCLUDED.instance_name
+           """
+            txn.execute(
+                sql,
+                (
+                    lock_name,
+                    lock_key,
+                    self._instance_name,
+                    token,
+                    now,
+                    now - _LOCK_TIMEOUT_MS,
+                ),
             )
-            if not did_lock:
-                return None
-
-        else:
-            # If we're on an old SQLite we emulate the above logic by first
-            # clearing out any existing stale locks and then upserting.
-
-            def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
-                sql = """
-                    DELETE FROM worker_locks
-                    WHERE
-                        lock_name = ?
-                        AND lock_key = ?
-                        AND (last_renewed_ts < ? OR instance_name = ?)
-                """
-                txn.execute(
-                    sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
-                )
-
-                inserted = self.db_pool.simple_upsert_txn_emulated(
-                    txn,
-                    table="worker_locks",
-                    keyvalues={
-                        "lock_name": lock_name,
-                        "lock_key": lock_key,
-                    },
-                    values={},
-                    insertion_values={
-                        "token": token,
-                        "last_renewed_ts": self._clock.time_msec(),
-                        "instance_name": self._instance_name,
-                    },
-                )
-
-                return inserted
 
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
-            )
+            # We only acquired the lock if we inserted or updated the table.
+            return bool(txn.rowcount)
 
-            if not did_lock:
-                return None
+        did_lock = await self.db_pool.runInteraction(
+            "try_acquire_lock",
+            _try_acquire_lock_txn,
+            # We can autocommit here as we're executing a single query, this
+            # will avoid serialization errors.
+            db_autocommit=True,
+        )
+        if not did_lock:
+            return None
 
         lock = Lock(
             self._reactor,