summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/lock.py87
1 files changed, 35 insertions, 52 deletions
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 1680bf6168..54d40e7a3a 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -26,7 +26,6 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
@@ -96,6 +95,10 @@ class LockStore(SQLBaseStore):
 
         self._acquiring_locks: Set[Tuple[str, str]] = set()
 
+        self._clock.looping_call(
+            self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
+        )
+
     @wrap_as_background_process("LockStore._on_shutdown")
     async def _on_shutdown(self) -> None:
         """Called when the server is shutting down"""
@@ -216,6 +219,7 @@ class LockStore(SQLBaseStore):
                 lock_name,
                 lock_key,
                 write,
+                db_autocommit=True,
             )
         except self.database_engine.module.IntegrityError:
             return None
@@ -233,61 +237,22 @@ class LockStore(SQLBaseStore):
         # `worker_read_write_locks` and seeing if that fails any
         # constraints. If it doesn't then we have acquired the lock,
         # otherwise we haven't.
-        #
-        # Before that though we clear the table of any stale locks.
 
         now = self._clock.time_msec()
         token = random_string(6)
 
-        delete_sql = """
-            DELETE FROM worker_read_write_locks
-                WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
-        """
-
-        insert_sql = """
-            INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
-            VALUES (?, ?, ?, ?, ?, ?)
-        """
-
-        if isinstance(self.database_engine, PostgresEngine):
-            # For Postgres we can send these queries at the same time.
-            txn.execute(
-                delete_sql + ";" + insert_sql,
-                (
-                    # DELETE args
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                    # UPSERT args
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
-        else:
-            # For SQLite these need to be two queries.
-            txn.execute(
-                delete_sql,
-                (
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                ),
-            )
-            txn.execute(
-                insert_sql,
-                (
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="worker_read_write_locks",
+            values={
+                "lock_name": lock_name,
+                "lock_key": lock_key,
+                "write_lock": write,
+                "instance_name": self._instance_name,
+                "token": token,
+                "last_renewed_ts": now,
+            },
+        )
 
         lock = Lock(
             self._reactor,
@@ -351,6 +316,24 @@ class LockStore(SQLBaseStore):
 
         return locks
 
+    @wrap_as_background_process("_reap_stale_read_write_locks")
+    async def _reap_stale_read_write_locks(self) -> None:
+        delete_sql = """
+            DELETE FROM worker_read_write_locks
+                WHERE last_renewed_ts < ?
+        """
+
+        def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
+            txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
+            if txn.rowcount:
+                logger.info("Reaped %d stale locks", txn.rowcount)
+
+        await self.db_pool.runInteraction(
+            "_reap_stale_read_write_locks",
+            reap_stale_read_write_locks_txn,
+            db_autocommit=True,
+        )
+
 
 class Lock:
     """An async context manager that manages an acquired lock, ensuring it is