-rw-r--r--  changelog.d/16061.misc                      1
-rw-r--r--  synapse/storage/databases/main/lock.py     87
-rw-r--r--  tests/storage/databases/main/test_lock.py   7
3 files changed, 40 insertions(+), 55 deletions(-)
diff --git a/changelog.d/16061.misc b/changelog.d/16061.misc
new file mode 100644
index 0000000000..37928b670f
--- /dev/null
+++ b/changelog.d/16061.misc
@@ -0,0 +1 @@
+Fix database performance of read/write worker locks.
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 1680bf6168..54d40e7a3a 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -26,7 +26,6 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
@@ -96,6 +95,10 @@ class LockStore(SQLBaseStore):
 
         self._acquiring_locks: Set[Tuple[str, str]] = set()
 
+        self._clock.looping_call(
+            self._reap_stale_read_write_locks, _LOCK_TIMEOUT_MS / 10.0
+        )
+
     @wrap_as_background_process("LockStore._on_shutdown")
     async def _on_shutdown(self) -> None:
         """Called when the server is shutting down"""
@@ -216,6 +219,7 @@ class LockStore(SQLBaseStore):
                 lock_name,
                 lock_key,
                 write,
+                db_autocommit=True,
             )
         except self.database_engine.module.IntegrityError:
             return None
@@ -233,61 +237,22 @@ class LockStore(SQLBaseStore):
         # `worker_read_write_locks` and seeing if that fails any
         # constraints. If it doesn't then we have acquired the lock,
         # otherwise we haven't.
-        #
-        # Before that though we clear the table of any stale locks.
 
         now = self._clock.time_msec()
         token = random_string(6)
 
-        delete_sql = """
-            DELETE FROM worker_read_write_locks
-                WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
-        """
-
-        insert_sql = """
-            INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
-            VALUES (?, ?, ?, ?, ?, ?)
-        """
-
-        if isinstance(self.database_engine, PostgresEngine):
-            # For Postgres we can send these queries at the same time.
-            txn.execute(
-                delete_sql + ";" + insert_sql,
-                (
-                    # DELETE args
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                    # UPSERT args
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
-        else:
-            # For SQLite these need to be two queries.
-            txn.execute(
-                delete_sql,
-                (
-                    now - _LOCK_TIMEOUT_MS,
-                    lock_name,
-                    lock_key,
-                ),
-            )
-            txn.execute(
-                insert_sql,
-                (
-                    lock_name,
-                    lock_key,
-                    write,
-                    self._instance_name,
-                    token,
-                    now,
-                ),
-            )
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="worker_read_write_locks",
+            values={
+                "lock_name": lock_name,
+                "lock_key": lock_key,
+                "write_lock": write,
+                "instance_name": self._instance_name,
+                "token": token,
+                "last_renewed_ts": now,
+            },
+        )
 
         lock = Lock(
             self._reactor,
@@ -351,6 +316,24 @@ class LockStore(SQLBaseStore):
 
         return locks
 
+    @wrap_as_background_process("_reap_stale_read_write_locks")
+    async def _reap_stale_read_write_locks(self) -> None:
+        delete_sql = """
+            DELETE FROM worker_read_write_locks
+                WHERE last_renewed_ts < ?
+        """
+
+        def reap_stale_read_write_locks_txn(txn: LoggingTransaction) -> None:
+            txn.execute(delete_sql, (self._clock.time_msec() - _LOCK_TIMEOUT_MS,))
+            if txn.rowcount:
+                logger.info("Reaped %d stale locks", txn.rowcount)
+
+        await self.db_pool.runInteraction(
+            "_reap_stale_read_write_locks",
+            reap_stale_read_write_locks_txn,
+            db_autocommit=True,
+        )
+
 
 class Lock:
     """An async context manager that manages an acquired lock, ensuring it is
diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py
index 383da83dfb..f541f1d6be 100644
--- a/tests/storage/databases/main/test_lock.py
+++ b/tests/storage/databases/main/test_lock.py
@@ -12,13 +12,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+
 from twisted.internet import defer, reactor
 from twisted.internet.base import ReactorBase
 from twisted.internet.defer import Deferred
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.server import HomeServer
-from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS
+from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS, _RENEWAL_INTERVAL_MS
 from synapse.util import Clock
 
 from tests import unittest
@@ -380,8 +381,8 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
         self.get_success(lock.__aenter__())
 
         # Wait for ages with the lock, we should not be able to get the lock.
-        self.reactor.advance(5 * _LOCK_TIMEOUT_MS / 1000)
-        self.pump()
+        for _ in range(0, 10):
+            self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000))
 
         lock2 = self.get_success(
             self.store.try_acquire_read_write_lock("name", "key", write=True)
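
The diff above moves stale-lock cleanup out of the acquisition path: acquiring a read/write lock is now a bare INSERT (run with autocommit) that relies on the table's constraints to reject a conflicting holder, while a looping call scheduled every tenth of the lock timeout deletes any rows whose last_renewed_ts has gone stale. The following is a minimal standalone sketch of that pattern, not Synapse's actual code: it assumes sqlite3 and plain asyncio in place of Synapse's Clock, db_pool and background-process helpers, models only exclusive (write) locks with a primary key rather than the real table's read/write constraints, and uses an illustrative timeout value.

# Sketch of the "insert-to-acquire + periodic reaper" pattern from the diff.
# Table and column names follow the diff; everything else is simplified.
import asyncio
import secrets
import sqlite3
import time

LOCK_TIMEOUT_MS = 30_000  # assumed stand-in for _LOCK_TIMEOUT_MS


def setup(db: sqlite3.Connection) -> None:
    # A single holder per (lock_name, lock_key) is enforced by the primary key.
    # The real table also allows multiple concurrent readers, which this
    # simplified schema does not model.
    db.execute(
        """
        CREATE TABLE worker_read_write_locks (
            lock_name TEXT, lock_key TEXT, write_lock BOOLEAN,
            instance_name TEXT, token TEXT, last_renewed_ts BIGINT,
            PRIMARY KEY (lock_name, lock_key)
        )
        """
    )


def try_acquire(db: sqlite3.Connection, name: str, key: str, instance: str) -> str | None:
    """Insert a row; a constraint violation means someone else holds the lock."""
    token = secrets.token_hex(3)
    try:
        db.execute(
            "INSERT INTO worker_read_write_locks VALUES (?, ?, ?, ?, ?, ?)",
            (name, key, True, instance, token, int(time.time() * 1000)),
        )
        db.commit()
        return token
    except sqlite3.IntegrityError:
        # Another instance already holds the lock; no cleanup happens here any
        # more, the background reaper takes care of stale rows.
        return None


async def reap_stale_locks(db: sqlite3.Connection) -> None:
    """Periodically delete rows that were not renewed within the timeout."""
    while True:
        await asyncio.sleep(LOCK_TIMEOUT_MS / 10 / 1000)
        cur = db.execute(
            "DELETE FROM worker_read_write_locks WHERE last_renewed_ts < ?",
            (int(time.time() * 1000) - LOCK_TIMEOUT_MS,),
        )
        db.commit()
        if cur.rowcount:
            print(f"Reaped {cur.rowcount} stale locks")

The point of the restructuring is the same as in the diff: every acquisition no longer pays for (or contends on) a DELETE of everyone else's expired rows, and the database-specific "combine DELETE and INSERT for Postgres, split them for SQLite" branch disappears because the reaper is the only place that deletes.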