summary refs log tree commit diff
path: root/synapse/util/async_helpers.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/async_helpers.py')
-rw-r--r--synapse/util/async_helpers.py39
1 files changed, 36 insertions, 3 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f1c46836b1..804dbca443 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -13,12 +13,16 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import collections
 import logging
 from contextlib import contextmanager
+from typing import Dict, Sequence, Set, Union
 
 from six.moves import range
 
+import attr
+
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
 from twisted.python import failure
@@ -213,7 +217,9 @@ class Linearizer(object):
         # the first element is the number of things executing, and
         # the second element is an OrderedDict, where the keys are deferreds for the
         # things blocked from executing.
-        self.key_to_defer = {}
+        self.key_to_defer = (
+            {}
+        )  # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
 
     def queue(self, key):
         # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
@@ -340,10 +346,10 @@ class ReadWriteLock(object):
 
     def __init__(self):
         # Latest readers queued
-        self.key_to_current_readers = {}
+        self.key_to_current_readers = {}  # type: Dict[str, Set[defer.Deferred]]
 
         # Latest writer queued
-        self.key_to_current_writer = {}
+        self.key_to_current_writer = {}  # type: Dict[str, defer.Deferred]
 
     @defer.inlineCallbacks
     def read(self, key):
@@ -479,3 +485,30 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
     deferred.addCallbacks(success_cb, failure_cb)
 
     return new_d
+
+
+@attr.s(slots=True, frozen=True)
+class DoneAwaitable(object):
+    """Simple awaitable that returns the provided value.
+    """
+
+    value = attr.ib()
+
+    def __await__(self):
+        return self
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        raise StopIteration(self.value)
+
+
+def maybe_awaitable(value):
+    """Convert a value to an awaitable if not already an awaitable.
+    """
+
+    if hasattr(value, "__await__"):
+        return value
+
+    return DoneAwaitable(value)