diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 1680bf6168..54d40e7a3a 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -26,7 +26,6 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
-from synapse.storage.engines import PostgresEngine
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -96,6 +95,10 @@ class LockStore(SQLBaseStore):
self._acquiring_locks: Set[Tuple[str, str]] = set()
+ self._clock.looping_call(
+ self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
+ )
+
@wrap_as_background_process("LockStore._on_shutdown")
async def _on_shutdown(self) -> None:
"""Called when the server is shutting down"""
@@ -216,6 +219,7 @@ class LockStore(SQLBaseStore):
lock_name,
lock_key,
write,
+ db_autocommit=True,
)
except self.database_engine.module.IntegrityError:
return None
@@ -233,61 +237,22 @@ class LockStore(SQLBaseStore):
# `worker_read_write_locks` and seeing if that fails any
# constraints. If it doesn't then we have acquired the lock,
# otherwise we haven't.
- #
- # Before that though we clear the table of any stale locks.
now = self._clock.time_msec()
token = random_string(6)
- delete_sql = """
- DELETE FROM worker_read_write_locks
- WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
- """
-
- insert_sql = """
- INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?, ?)
- """
-
- if isinstance(self.database_engine, PostgresEngine):
- # For Postgres we can send these queries at the same time.
- txn.execute(
- delete_sql + ";" + insert_sql,
- (
- # DELETE args
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- # UPSERT args
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
- else:
- # For SQLite these need to be two queries.
- txn.execute(
- delete_sql,
- (
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- ),
- )
- txn.execute(
- insert_sql,
- (
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="worker_read_write_locks",
+ values={
+ "lock_name": lock_name,
+ "lock_key": lock_key,
+ "write_lock": write,
+ "instance_name": self._instance_name,
+ "token": token,
+ "last_renewed_ts": now,
+ },
+ )
lock = Lock(
self._reactor,
@@ -351,6 +316,24 @@ class LockStore(SQLBaseStore):
return locks
+ @wrap_as_background_process("_reap_stale_read_write_locks")
+ async def _reap_stale_read_write_locks(self) -> None:
+ delete_sql = """
+ DELETE FROM worker_read_write_locks
+ WHERE last_renewed_ts < ?
+ """
+
+ def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
+ txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
+ if txn.rowcount:
+ logger.info("Reaped %d stale locks", txn.rowcount)
+
+ await self.db_pool.runInteraction(
+ "_reap_stale_read_write_locks",
+ reap_stale_read_write_locks_txn,
+ db_autocommit=True,
+ )
+
class Lock:
"""An async context manager that manages an acquired lock, ensuring it is
|