summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/lock.py36
1 files changed, 22 insertions, 14 deletions
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 54d40e7a3a..5a01ec2137 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -17,7 +17,7 @@ from types import TracebackType
 from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
 from weakref import WeakValueDictionary
 
-from twisted.internet.interfaces import IReactorCore
+from twisted.internet.task import LoopingCall
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
@@ -26,6 +26,7 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
+from synapse.types import ISynapseReactor
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
@@ -358,7 +359,7 @@ class Lock:
 
     def __init__(
         self,
-        reactor: IReactorCore,
+        reactor: ISynapseReactor,
         clock: Clock,
         store: LockStore,
         read_write: bool,
@@ -377,19 +378,25 @@ class Lock:
 
         self._table = "worker_read_write_locks" if read_write else "worker_locks"
 
-        self._looping_call = clock.looping_call(
+        # We might be called from a non-main thread, so we defer setting up the
+        # looping call.
+        self._looping_call: Optional[LoopingCall] = None
+        reactor.callFromThread(self._setup_looping_call)
+
+        self._dropped = False
+
+    def _setup_looping_call(self) -> None:
+        self._looping_call = self._clock.looping_call(
             self._renew,
             _RENEWAL_INTERVAL_MS,
-            store,
-            clock,
-            read_write,
-            lock_name,
-            lock_key,
-            token,
+            self._store,
+            self._clock,
+            self._read_write,
+            self._lock_name,
+            self._lock_key,
+            self._token,
         )
 
-        self._dropped = False
-
     @staticmethod
     @wrap_as_background_process("Lock._renew")
     async def _renew(
@@ -459,7 +466,7 @@ class Lock:
         if self._dropped:
             return
 
-        if self._looping_call.running:
+        if self._looping_call and self._looping_call.running:
             self._looping_call.stop()
 
         await self._store.db_pool.simple_delete(
@@ -486,8 +493,9 @@ class Lock:
             # We should not be dropped without the lock being released (unless
             # we're shutting down), but if we are then let's at least stop
             # renewing the lock.
-            if self._looping_call.running:
-                self._looping_call.stop()
+            if self._looping_call and self._looping_call.running:
+                # We might be called from a non-main thread.
+                self._reactor.callFromThread(self._looping_call.stop)
 
             if self._reactor.running:
                 logger.error(