diff options
author | Sean Quah <seanq@element.io> | 2022-02-24 13:56:38 +0000 |
---|---|---|
committer | Sean Quah <seanq@element.io> | 2022-03-08 17:11:51 +0000 |
commit | 2b5f3ed4cef1938ae7c95179d533dd7e80a806d5 (patch) | |
tree | 196c61d58afcb8afa9fe7d6cbe06eb08bb205493 | |
parent | Convert `ReadWriteLock` to use async context managers (diff) | |
download | synapse-2b5f3ed4cef1938ae7c95179d533dd7e80a806d5.tar.xz |
Fix clean up when waiting readers or writers are cancelled
Signed-off-by: Sean Quah <seanq@element.io>
-rw-r--r-- | synapse/util/async_helpers.py | 36 |
1 files changed, 27 insertions, 9 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 12a572cdd6..d391b0c2a8 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -528,6 +528,8 @@ class ReadWriteLock: try: # We wait for the latest writer to finish writing. We can safely ignore # any existing readers... as they're readers. + # May raise a `CancelledError` if the `Deferred` wrapping us is + # cancelled. if curr_writer: await make_deferred_yieldable(curr_writer) yield @@ -549,24 +551,40 @@ class ReadWriteLock: if curr_writer: to_wait_on.append(curr_writer) - # We can clear the list of current readers since the new writer waits + # We can clear the list of current readers since `new_defer` waits # for them to finish. curr_readers.clear() self.key_to_current_writer[key] = new_defer @asynccontextmanager async def _ctx_manager() -> AsyncIterator[None]: + to_wait_on_defer = defer.gatherResults(to_wait_on) try: - await make_deferred_yieldable(defer.gatherResults(to_wait_on)) + # Wait for all current readers and the latest writer to finish. + # May raise a `CancelledError` if the `Deferred` wrapping us is + # cancelled. + await make_deferred_yieldable(to_wait_on_defer) yield finally: - with PreserveLoggingContext(): - new_defer.callback(None) - # `self.key_to_current_writer[key]` may be missing if there was another - # writer waiting for us and it completed entirely within the - # `new_defer.callback()` call above. - if self.key_to_current_writer.get(key) == new_defer: - self.key_to_current_writer.pop(key) + + def release() -> None: + with PreserveLoggingContext(): + new_defer.callback(None) + # `self.key_to_current_writer[key]` may be missing if there was another + # writer waiting for us and it completed entirely within the + # `new_defer.callback()` call above. + if self.key_to_current_writer.get(key) == new_defer: + self.key_to_current_writer.pop(key) + + if to_wait_on_defer.called: + release() + else: + # We don't have the lock yet, probably because we were cancelled + # while waiting for it. We can't call `release()` yet, since + # `new_defer` must only resolve once all previous readers and + # writers have finished. + # NB: `release()` won't have a logcontext in this path. + to_wait_on_defer.addCallback(lambda _: release()) return _ctx_manager() |