diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
- if self.db_pool.engine.can_native_upsert:
-
- def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
- # We take out the lock if either a) there is no row for the lock
- # already, b) the existing row has timed out, or c) the row is
- # for this instance (which means the process got killed and
- # restarted)
- sql = """
- INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?)
- ON CONFLICT (lock_name, lock_key)
- DO UPDATE
- SET
- token = EXCLUDED.token,
- instance_name = EXCLUDED.instance_name,
- last_renewed_ts = EXCLUDED.last_renewed_ts
- WHERE
- worker_locks.last_renewed_ts < ?
- OR worker_locks.instance_name = EXCLUDED.instance_name
- """
- txn.execute(
- sql,
- (
- lock_name,
- lock_key,
- self._instance_name,
- token,
- now,
- now - _LOCK_TIMEOUT_MS,
- ),
- )
-
- # We only acquired the lock if we inserted or updated the table.
- return bool(txn.rowcount)
-
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock",
- _try_acquire_lock_txn,
- # We can autocommit here as we're executing a single query, this
- # will avoid serialization errors.
- db_autocommit=True,
+ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+ # We take out the lock if either a) there is no row for the lock
+ # already, b) the existing row has timed out, or c) the row is
+ # for this instance (which means the process got killed and
+ # restarted)
+ sql = """
+ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (lock_name, lock_key)
+ DO UPDATE
+ SET
+ token = EXCLUDED.token,
+ instance_name = EXCLUDED.instance_name,
+ last_renewed_ts = EXCLUDED.last_renewed_ts
+ WHERE
+ worker_locks.last_renewed_ts < ?
+ OR worker_locks.instance_name = EXCLUDED.instance_name
+ """
+ txn.execute(
+ sql,
+ (
+ lock_name,
+ lock_key,
+ self._instance_name,
+ token,
+ now,
+ now - _LOCK_TIMEOUT_MS,
+ ),
)
- if not did_lock:
- return None
-
- else:
- # If we're on an old SQLite we emulate the above logic by first
- # clearing out any existing stale locks and then upserting.
-
- def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
- sql = """
- DELETE FROM worker_locks
- WHERE
- lock_name = ?
- AND lock_key = ?
- AND (last_renewed_ts < ? OR instance_name = ?)
- """
- txn.execute(
- sql,
- (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
- )
-
- inserted = self.db_pool.simple_upsert_txn_emulated(
- txn,
- table="worker_locks",
- keyvalues={
- "lock_name": lock_name,
- "lock_key": lock_key,
- },
- values={},
- insertion_values={
- "token": token,
- "last_renewed_ts": self._clock.time_msec(),
- "instance_name": self._instance_name,
- },
- )
-
- return inserted
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
- )
+ # We only acquired the lock if we inserted or updated the table.
+ return bool(txn.rowcount)
- if not did_lock:
- return None
+ did_lock = await self.db_pool.runInteraction(
+ "try_acquire_lock",
+ _try_acquire_lock_txn,
+ # We can autocommit here as we're executing a single query, this
+ # will avoid serialization errors.
+ db_autocommit=True,
+ )
+ if not did_lock:
+ return None
lock = Lock(
self._reactor,
|