diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 4b4227003d..bdefa7f26f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,7 +26,14 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.replication.tcp.streams import TypingStream
from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID
+from synapse.types import (
+ JsonDict,
+ JsonMapping,
+ Requester,
+ StrCollection,
+ StreamKeyType,
+ UserID,
+)
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -487,7 +494,7 @@ class TypingWriterHandler(FollowerTypingHandler):
raise Exception("Typing writer instance got typing info over replication")
-class TypingNotificationEventSource(EventSource[int, JsonDict]):
+class TypingNotificationEventSource(EventSource[int, JsonMapping]):
def __init__(self, hs: "HomeServer"):
self._main_store = hs.get_datastores().main
self.clock = hs.get_clock()
@@ -497,7 +504,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
#
self.get_typing_handler = hs.get_typing_handler
- def _make_event_for(self, room_id: str) -> JsonDict:
+ def _make_event_for(self, room_id: str) -> JsonMapping:
typing = self.get_typing_handler()._room_typing[room_id]
return {
"type": EduTypes.TYPING,
@@ -507,7 +514,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
async def get_new_events_as(
self, from_key: int, service: ApplicationService
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[JsonMapping], int]:
"""Returns a set of new typing events that an appservice
may be interested in.
@@ -551,7 +558,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
room_ids: Iterable[str],
is_guest: bool,
explicit_room_id: Optional[str] = None,
- ) -> Tuple[List[JsonDict], int]:
+ ) -> Tuple[List[JsonMapping], int]:
with Measure(self.clock, "typing.get_new_events"):
from_key = int(from_key)
handler = self.get_typing_handler()
|