summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/11262.bugfix1
-rw-r--r--synapse/federation/federation_server.py5
-rw-r--r--synapse/storage/databases/main/lock.py31
3 files changed, 27 insertions, 10 deletions
diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix
new file mode 100644
index 0000000000..768fbb8973
--- /dev/null
+++ b/changelog.d/11262.bugfix
@@ -0,0 +1 @@
+Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 42e3acecb4..9a8758e9a6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -213,6 +213,11 @@ class FederationServer(FederationBase):
             self._started_handling_of_staged_events = True
             self._handle_old_staged_events()
 
+            # Start a periodic check for old staged events. This is to handle
+            # the case where locks time out, e.g. if another process gets killed
+            # without dropping its locks.
+            self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 3d1dff660b..3d0df0cbd4 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -14,6 +14,7 @@
 import logging
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
+from weakref import WeakValueDictionary
 
 from twisted.internet.interfaces import IReactorCore
 
@@ -61,7 +62,7 @@ class LockStore(SQLBaseStore):
 
         # A map from `(lock_name, lock_key)` to the token of any locks that we
         # think we currently hold.
-        self._live_tokens: Dict[Tuple[str, str], str] = {}
+        self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()
 
         # When we shut down we want to remove the locks. Technically this can
         # lead to a race, as we may drop the lock while we are still processing.
@@ -80,10 +81,10 @@ class LockStore(SQLBaseStore):
 
         # We need to take a copy of the tokens dict as dropping the locks will
         # cause the dictionary to change.
-        tokens = dict(self._live_tokens)
+        locks = dict(self._live_tokens)
 
-        for (lock_name, lock_key), token in tokens.items():
-            await self._drop_lock(lock_name, lock_key, token)
+        for lock in locks.values():
+            await lock.release()
 
         logger.info("Dropped locks due to shutdown")
 
@@ -93,6 +94,11 @@ class LockStore(SQLBaseStore):
         used (otherwise the lock will leak).
         """
 
+        # Check if this process has taken out a lock and if it's still valid.
+        lock = self._live_tokens.get((lock_name, lock_key))
+        if lock and await lock.is_still_valid():
+            return None
+
         now = self._clock.time_msec()
         token = random_string(6)
 
@@ -100,7 +106,9 @@ class LockStore(SQLBaseStore):
 
             def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
                 # We take out the lock if either a) there is no row for the lock
-                # already or b) the existing row has timed out.
+                # already, b) the existing row has timed out, or c) the row is
+                # for this instance (which means the process got killed and
+                # restarted)
                 sql = """
                     INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
                     VALUES (?, ?, ?, ?, ?)
@@ -112,6 +120,7 @@ class LockStore(SQLBaseStore):
                             last_renewed_ts = EXCLUDED.last_renewed_ts
                         WHERE
                             worker_locks.last_renewed_ts < ?
+                            OR worker_locks.instance_name = EXCLUDED.instance_name
                 """
                 txn.execute(
                     sql,
@@ -148,11 +157,11 @@ class LockStore(SQLBaseStore):
                     WHERE
                         lock_name = ?
                         AND lock_key = ?
-                        AND last_renewed_ts < ?
+                        AND (last_renewed_ts < ? OR instance_name = ?)
                 """
                 txn.execute(
                     sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
+                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
                 )
 
                 inserted = self.db_pool.simple_upsert_txn_emulated(
@@ -179,9 +188,7 @@ class LockStore(SQLBaseStore):
             if not did_lock:
                 return None
 
-        self._live_tokens[(lock_name, lock_key)] = token
-
-        return Lock(
+        lock = Lock(
             self._reactor,
             self._clock,
             self,
@@ -190,6 +197,10 @@ class LockStore(SQLBaseStore):
             token=token,
         )
 
+        self._live_tokens[(lock_name, lock_key)] = lock
+
+        return lock
+
     async def _is_lock_still_valid(
         self, lock_name: str, lock_key: str, token: str
     ) -> bool: