diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 014db1355b..a3b65aee27 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -23,6 +23,7 @@ from typing import (
Awaitable,
Callable,
Dict,
+ Generic,
Hashable,
Iterable,
List,
@@ -39,6 +40,7 @@ from twisted.internet import defer
from twisted.internet.defer import CancelledError
from twisted.internet.interfaces import IReactorTime
from twisted.python import failure
+from twisted.python.failure import Failure
from synapse.logging.context import (
PreserveLoggingContext,
@@ -49,8 +51,10 @@ from synapse.util import Clock, unwrapFirstError
logger = logging.getLogger(__name__)
+_T = TypeVar("_T")
-class ObservableDeferred:
+
+class ObservableDeferred(Generic[_T]):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
deferred.
@@ -68,7 +72,7 @@ class ObservableDeferred:
__slots__ = ["_deferred", "_observers", "_result"]
- def __init__(self, deferred: defer.Deferred, consumeErrors: bool = False):
+ def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
object.__setattr__(self, "_observers", set())
@@ -113,7 +117,7 @@ class ObservableDeferred:
deferred.addCallbacks(callback, errback)
- def observe(self) -> defer.Deferred:
+ def observe(self) -> "defer.Deferred[_T]":
"""Observe the underlying deferred.
This returns a brand new deferred that is resolved when the underlying
@@ -121,7 +125,7 @@ class ObservableDeferred:
effect the underlying deferred.
"""
if not self._result:
- d = defer.Deferred()
+ d: "defer.Deferred[_T]" = defer.Deferred()
def remove(r):
self._observers.discard(d)
@@ -135,7 +139,7 @@ class ObservableDeferred:
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)
- def observers(self) -> List[defer.Deferred]:
+ def observers(self) -> "List[defer.Deferred[_T]]":
return self._observers
def has_called(self) -> bool:
@@ -144,7 +148,7 @@ class ObservableDeferred:
def has_succeeded(self) -> bool:
return self._result is not None and self._result[0] is True
- def get_result(self) -> Any:
+ def get_result(self) -> Union[_T, Failure]:
return self._result[1]
def __getattr__(self, name: str) -> Any:
@@ -415,7 +419,7 @@ class ReadWriteLock:
self.key_to_current_writer: Dict[str, defer.Deferred] = {}
async def read(self, key: str) -> ContextManager:
- new_defer = defer.Deferred()
+ new_defer: "defer.Deferred[None]" = defer.Deferred()
curr_readers = self.key_to_current_readers.setdefault(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
@@ -438,7 +442,7 @@ class ReadWriteLock:
return _ctx_manager()
async def write(self, key: str) -> ContextManager:
- new_defer = defer.Deferred()
+ new_defer: "defer.Deferred[None]" = defer.Deferred()
curr_readers = self.key_to_current_readers.get(key, set())
curr_writer = self.key_to_current_writer.get(key, None)
@@ -471,10 +475,8 @@ R = TypeVar("R")
def timeout_deferred(
- deferred: defer.Deferred,
- timeout: float,
- reactor: IReactorTime,
-) -> defer.Deferred:
+ deferred: "defer.Deferred[_T]", timeout: float, reactor: IReactorTime
+) -> "defer.Deferred[_T]":
"""The in built twisted `Deferred.addTimeout` fails to time out deferreds
that have a canceller that throws exceptions. This method creates a new
deferred that wraps and times out the given deferred, correctly handling
@@ -497,7 +499,7 @@ def timeout_deferred(
Returns:
A new Deferred, which will errback with defer.TimeoutError on timeout.
"""
- new_d = defer.Deferred()
+ new_d: "defer.Deferred[_T]" = defer.Deferred()
timed_out = [False]
|