diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index b69f562ca5..bd234549bd 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,27 +15,35 @@
import json
import logging
import re
-from typing import Pattern
+import typing
+from typing import Any, Callable, Dict, Generator, Pattern
import attr
from frozendict import frozendict
from twisted.internet import defer, task
+from twisted.internet.defer import Deferred
+from twisted.internet.interfaces import IDelayedCall, IReactorTime
+from twisted.internet.task import LoopingCall
+from twisted.python.failure import Failure
from synapse.logging import context
+if typing.TYPE_CHECKING:
+ pass
+
logger = logging.getLogger(__name__)
_WILDCARD_RUN = re.compile(r"([\?\*]+)")
-def _reject_invalid_json(val):
+def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
-def _handle_frozendict(obj):
+def _handle_frozendict(obj: Any) -> Dict[Any, Any]:
"""Helper for json_encoder. Makes frozendicts serializable by returning
the underlying dict
"""
@@ -60,10 +68,10 @@ json_encoder = json.JSONEncoder(
json_decoder = json.JSONDecoder(parse_constant=_reject_invalid_json)
-def unwrapFirstError(failure):
+def unwrapFirstError(failure: Failure) -> Failure:
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
- return failure.value.subFailure
+ return failure.value.subFailure # type: ignore[union-attr] # Issue in Twisted's annotations
@attr.s(slots=True)
@@ -75,25 +83,25 @@ class Clock:
reactor: The Twisted reactor to use.
"""
- _reactor = attr.ib()
+ _reactor: IReactorTime = attr.ib()
- @defer.inlineCallbacks
- def sleep(self, seconds):
- d = defer.Deferred()
+ @defer.inlineCallbacks # type: ignore[arg-type] # Issue in Twisted's type annotations
+ def sleep(self, seconds: float) -> "Generator[Deferred[float], Any, Any]":
+ d: defer.Deferred[float] = defer.Deferred()
with context.PreserveLoggingContext():
self._reactor.callLater(seconds, d.callback, seconds)
res = yield d
return res
- def time(self):
+ def time(self) -> float:
"""Returns the current system time in seconds since epoch."""
return self._reactor.seconds()
- def time_msec(self):
+ def time_msec(self) -> int:
"""Returns the current system time in milliseconds since epoch."""
return int(self.time() * 1000)
- def looping_call(self, f, msec, *args, **kwargs):
+ def looping_call(self, f: Callable, msec: float, *args, **kwargs) -> LoopingCall:
"""Call a function repeatedly.
Waits `msec` initially before calling `f` for the first time.
@@ -102,8 +110,8 @@ class Clock:
other than trivial, you probably want to wrap it in run_as_background_process.
Args:
- f(function): The function to call repeatedly.
- msec(float): How long to wait between calls in milliseconds.
+ f: The function to call repeatedly.
+ msec: How long to wait between calls in milliseconds.
*args: Postional arguments to pass to function.
**kwargs: Key arguments to pass to function.
"""
@@ -113,7 +121,7 @@ class Clock:
d.addErrback(log_failure, "Looping call died", consumeErrors=False)
return call
- def call_later(self, delay, callback, *args, **kwargs):
+ def call_later(self, delay, callback, *args, **kwargs) -> IDelayedCall:
"""Call something later
Note that the function will be called with no logcontext, so if it is anything
@@ -133,7 +141,7 @@ class Clock:
with context.PreserveLoggingContext():
return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs)
- def cancel_call_later(self, timer, ignore_errs=False):
+ def cancel_call_later(self, timer: IDelayedCall, ignore_errs: bool = False) -> None:
try:
timer.cancel()
except Exception:
|