summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py43
1 files changed, 24 insertions, 19 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 5988c67d90..e0fad2da66 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,6 +14,7 @@
 
 import logging
 from typing import (
+    TYPE_CHECKING,
     Awaitable,
     Callable,
     Collection,
@@ -32,7 +33,6 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-import synapse.server
 from synapse.api.constants import EventTypes, HistoryVisibility, Membership
 from synapse.api.errors import AuthError
 from synapse.events import EventBase
@@ -53,6 +53,9 @@ from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 notified_events_counter = Counter("synapse_notifier_notified_events", "")
@@ -82,7 +85,7 @@ class _NotificationListener:
 
     __slots__ = ["deferred"]
 
-    def __init__(self, deferred):
+    def __init__(self, deferred: "defer.Deferred"):
         self.deferred = deferred
 
 
@@ -124,7 +127,7 @@ class _NotifierUserStream:
         stream_key: str,
         stream_id: Union[int, RoomStreamToken],
         time_now_ms: int,
-    ):
+    ) -> None:
         """Notify any listeners for this user of a new event from an
         event source.
         Args:
@@ -152,7 +155,7 @@ class _NotifierUserStream:
             self.notify_deferred = ObservableDeferred(defer.Deferred())
             noify_deferred.callback(self.current_token)
 
-    def remove(self, notifier: "Notifier"):
+    def remove(self, notifier: "Notifier") -> None:
         """Remove this listener from all the indexes in the Notifier
         it knows about.
         """
@@ -188,7 +191,7 @@ class EventStreamResult:
     start_token: StreamToken
     end_token: StreamToken
 
-    def __bool__(self):
+    def __bool__(self) -> bool:
         return bool(self.events)
 
 
@@ -212,7 +215,7 @@ class Notifier:
 
     UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
 
-    def __init__(self, hs: "synapse.server.HomeServer"):
+    def __init__(self, hs: "HomeServer"):
         self.user_to_user_stream: Dict[str, _NotifierUserStream] = {}
         self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {}
 
@@ -248,7 +251,7 @@ class Notifier:
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
-        def count_listeners():
+        def count_listeners() -> int:
             all_user_streams: Set[_NotifierUserStream] = set()
 
             for streams in list(self.room_to_user_streams.values()):
@@ -270,7 +273,7 @@ class Notifier:
             "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
         )
 
-    def add_replication_callback(self, cb: Callable[[], None]):
+    def add_replication_callback(self, cb: Callable[[], None]) -> None:
         """Add a callback that will be called when some new data is available.
         Callback is not given any arguments. It should *not* return a Deferred - if
         it needs to do any asynchronous work, a background thread should be started and
@@ -284,7 +287,7 @@ class Notifier:
         event_pos: PersistedEventPosition,
         max_room_stream_token: RoomStreamToken,
         extra_users: Optional[Collection[UserID]] = None,
-    ):
+    ) -> None:
         """Unwraps event and calls `on_new_room_event_args`."""
         await self.on_new_room_event_args(
             event_pos=event_pos,
@@ -307,7 +310,7 @@ class Notifier:
         event_pos: PersistedEventPosition,
         max_room_stream_token: RoomStreamToken,
         extra_users: Optional[Collection[UserID]] = None,
-    ):
+    ) -> None:
         """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
@@ -338,7 +341,9 @@ class Notifier:
 
         self.notify_replication()
 
-    def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken):
+    def _notify_pending_new_room_events(
+        self, max_room_stream_token: RoomStreamToken
+    ) -> None:
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
         Args:
@@ -374,7 +379,7 @@ class Notifier:
             )
             self._on_updated_room_token(max_room_stream_token)
 
-    def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken):
+    def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken) -> None:
         """Poke services that might care that the room position has been
         updated.
         """
@@ -386,13 +391,13 @@ class Notifier:
         if self.federation_sender:
             self.federation_sender.notify_new_events(max_room_stream_token)
 
-    def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+    def _notify_app_services(self, max_room_stream_token: RoomStreamToken) -> None:
         try:
             self.appservice_handler.notify_interested_services(max_room_stream_token)
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+    def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None:
         try:
             self._pusher_pool.on_new_notifications(max_room_stream_token)
         except Exception:
@@ -475,8 +480,8 @@ class Notifier:
         user_id: str,
         timeout: int,
         callback: Callable[[StreamToken, StreamToken], Awaitable[T]],
-        room_ids=None,
-        from_token=StreamToken.START,
+        room_ids: Optional[Collection[str]] = None,
+        from_token: StreamToken = StreamToken.START,
     ) -> T:
         """Wait until the callback returns a non empty response or the
         timeout fires.
@@ -700,14 +705,14 @@ class Notifier:
         for expired_stream in expired_streams:
             expired_stream.remove(self)
 
-    def _register_with_keys(self, user_stream: _NotifierUserStream):
+    def _register_with_keys(self, user_stream: _NotifierUserStream) -> None:
         self.user_to_user_stream[user_stream.user_id] = user_stream
 
         for room in user_stream.rooms:
             s = self.room_to_user_streams.setdefault(room, set())
             s.add(user_stream)
 
-    def _user_joined_room(self, user_id: str, room_id: str):
+    def _user_joined_room(self, user_id: str, room_id: str) -> None:
         new_user_stream = self.user_to_user_stream.get(user_id)
         if new_user_stream is not None:
             room_streams = self.room_to_user_streams.setdefault(room_id, set())
@@ -719,7 +724,7 @@ class Notifier:
         for cb in self.replication_callbacks:
             cb()
 
-    def notify_remote_server_up(self, server: str):
+    def notify_remote_server_up(self, server: str) -> None:
         """Notify any replication that a remote server has come back up"""
         # We call federation_sender directly rather than registering as a
         # callback as a) we already have a reference to it and b) it introduces