diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f562770922..dfefbd996d 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -20,6 +20,7 @@ from contextlib import contextmanager
from typing import Dict, Sequence, Set, Union
import attr
+from typing_extensions import ContextManager
from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -338,11 +339,11 @@ class Linearizer(object):
class ReadWriteLock(object):
- """A deferred style read write lock.
+ """An async read write lock.
Example:
- with (yield read_write_lock.read("test_key")):
+ with await read_write_lock.read("test_key"):
# do some work
"""
@@ -365,8 +366,7 @@ class ReadWriteLock(object):
# Latest writer queued
self.key_to_current_writer = {} # type: Dict[str, defer.Deferred]
- @defer.inlineCallbacks
- def read(self, key):
+ async def read(self, key: str) -> ContextManager:
new_defer = defer.Deferred()
curr_readers = self.key_to_current_readers.setdefault(key, set())
@@ -376,7 +376,8 @@ class ReadWriteLock(object):
# We wait for the latest writer to finish writing. We can safely ignore
# any existing readers... as they're readers.
- yield make_deferred_yieldable(curr_writer)
+ if curr_writer:
+ await make_deferred_yieldable(curr_writer)
@contextmanager
def _ctx_manager():
@@ -388,8 +389,7 @@ class ReadWriteLock(object):
return _ctx_manager()
- @defer.inlineCallbacks
- def write(self, key):
+ async def write(self, key: str) -> ContextManager:
new_defer = defer.Deferred()
curr_readers = self.key_to_current_readers.get(key, set())
@@ -405,7 +405,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
- yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
+ await make_deferred_yieldable(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():
|