summary refs log tree commit diff
diff options
context:
space:
mode:
authorSean Quah <seanq@element.io>2022-02-24 13:56:38 +0000
committerSean Quah <seanq@element.io>2022-03-08 17:11:51 +0000
commit2b5f3ed4cef1938ae7c95179d533dd7e80a806d5 (patch)
tree196c61d58afcb8afa9fe7d6cbe06eb08bb205493
parentConvert `ReadWriteLock` to use async context managers (diff)
downloadsynapse-2b5f3ed4cef1938ae7c95179d533dd7e80a806d5.tar.xz
Fix clean up when waiting readers or writers are cancelled
Signed-off-by: Sean Quah <seanq@element.io>
-rw-r--r--synapse/util/async_helpers.py36
1 files changed, 27 insertions, 9 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 12a572cdd6..d391b0c2a8 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -528,6 +528,8 @@ class ReadWriteLock:
             try:
                 # We wait for the latest writer to finish writing. We can safely ignore
                 # any existing readers... as they're readers.
+                # May raise a `CancelledError` if the `Deferred` wrapping us is
+                # cancelled.
                 if curr_writer:
                     await make_deferred_yieldable(curr_writer)
                 yield
@@ -549,24 +551,40 @@ class ReadWriteLock:
         if curr_writer:
             to_wait_on.append(curr_writer)
 
-        # We can clear the list of current readers since the new writer waits
+        # We can clear the list of current readers since `new_defer` waits
         # for them to finish.
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
         @asynccontextmanager
         async def _ctx_manager() -> AsyncIterator[None]:
+            to_wait_on_defer = defer.gatherResults(to_wait_on)
             try:
-                await make_deferred_yieldable(defer.gatherResults(to_wait_on))
+                # Wait for all current readers and the latest writer to finish.
+                # May raise a `CancelledError` if the `Deferred` wrapping us is
+                # cancelled.
+                await make_deferred_yieldable(to_wait_on_defer)
                 yield
             finally:
-                with PreserveLoggingContext():
-                    new_defer.callback(None)
-                # `self.key_to_current_writer[key]` may be missing if there was another
-                # writer waiting for us and it completed entirely within the
-                # `new_defer.callback()` call above.
-                if self.key_to_current_writer.get(key) == new_defer:
-                    self.key_to_current_writer.pop(key)
+
+                def release() -> None:
+                    with PreserveLoggingContext():
+                        new_defer.callback(None)
+                    # `self.key_to_current_writer[key]` may be missing if there was another
+                    # writer waiting for us and it completed entirely within the
+                    # `new_defer.callback()` call above.
+                    if self.key_to_current_writer.get(key) == new_defer:
+                        self.key_to_current_writer.pop(key)
+
+                if to_wait_on_defer.called:
+                    release()
+                else:
+                    # We don't have the lock yet, probably because we were cancelled
+                    # while waiting for it. We can't call `release()` yet, since
+                    # `new_defer` must only resolve once all previous readers and
+                    # writers have finished.
+                    # NB: `release()` won't have a logcontext in this path.
+                    to_wait_on_defer.addCallback(lambda _: release())
 
         return _ctx_manager()