diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 54d40e7a3a..5a01ec2137 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -17,7 +17,7 @@ from types import TracebackType
from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary
-from twisted.internet.interfaces import IReactorCore
+from twisted.internet.task import LoopingCall
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore
@@ -26,6 +26,7 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.types import ISynapseReactor
from synapse.util import Clock
from synapse.util.stringutils import random_string
@@ -358,7 +359,7 @@ class Lock:
def __init__(
self,
- reactor: IReactorCore,
+ reactor: ISynapseReactor,
clock: Clock,
store: LockStore,
read_write: bool,
@@ -377,19 +378,25 @@ class Lock:
self._table = "worker_read_write_locks" if read_write else "worker_locks"
- self._looping_call = clock.looping_call(
+ # We might be called from a non-main thread, so we defer setting up the
+ # looping call.
+ self._looping_call: Optional[LoopingCall] = None
+ reactor.callFromThread(self._setup_looping_call)
+
+ self._dropped = False
+
+ def _setup_looping_call(self) -> None:
+ self._looping_call = self._clock.looping_call(
self._renew,
_RENEWAL_INTERVAL_MS,
- store,
- clock,
- read_write,
- lock_name,
- lock_key,
- token,
+ self._store,
+ self._clock,
+ self._read_write,
+ self._lock_name,
+ self._lock_key,
+ self._token,
)
- self._dropped = False
-
@staticmethod
@wrap_as_background_process("Lock._renew")
async def _renew(
@@ -459,7 +466,7 @@ class Lock:
if self._dropped:
return
- if self._looping_call.running:
+ if self._looping_call and self._looping_call.running:
self._looping_call.stop()
await self._store.db_pool.simple_delete(
@@ -486,8 +493,9 @@ class Lock:
# We should not be dropped without the lock being released (unless
# we're shutting down), but if we are then let's at least stop
# renewing the lock.
- if self._looping_call.running:
- self._looping_call.stop()
+ if self._looping_call and self._looping_call.running:
+ # We might be called from a non-main thread.
+ self._reactor.callFromThread(self._looping_call.stop)
if self._reactor.running:
logger.error(
|