diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 51 |
1 files changed, 29 insertions, 22 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 632b2245ef..753dd6b6a5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,6 +14,7 @@ import logging from typing import ( + TYPE_CHECKING, Awaitable, Callable, Collection, @@ -32,7 +33,6 @@ from prometheus_client import Counter from twisted.internet import defer -import synapse.server from synapse.api.constants import EventTypes, HistoryVisibility, Membership from synapse.api.errors import AuthError from synapse.events import EventBase @@ -53,6 +53,9 @@ from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) notified_events_counter = Counter("synapse_notifier_notified_events", "") @@ -82,7 +85,7 @@ class _NotificationListener: __slots__ = ["deferred"] - def __init__(self, deferred): + def __init__(self, deferred: "defer.Deferred"): self.deferred = deferred @@ -124,7 +127,7 @@ class _NotifierUserStream: stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int, - ): + ) -> None: """Notify any listeners for this user of a new event from an event source. Args: @@ -135,7 +138,7 @@ class _NotifierUserStream: self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token self.last_notified_ms = time_now_ms - noify_deferred = self.notify_deferred + notify_deferred = self.notify_deferred log_kv( { @@ -150,9 +153,9 @@ class _NotifierUserStream: with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - noify_deferred.callback(self.current_token) + notify_deferred.callback(self.current_token) - def remove(self, notifier: "Notifier"): + def remove(self, notifier: "Notifier") -> None: """Remove this listener from all the indexes in the Notifier it knows about. """ @@ -188,7 +191,7 @@ class EventStreamResult: start_token: StreamToken end_token: StreamToken - def __bool__(self): + def __bool__(self) -> bool: return bool(self.events) @@ -212,7 +215,7 @@ class Notifier: UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 - def __init__(self, hs: "synapse.server.HomeServer"): + def __init__(self, hs: "HomeServer"): self.user_to_user_stream: Dict[str, _NotifierUserStream] = {} self.room_to_user_streams: Dict[str, Set[_NotifierUserStream]] = {} @@ -248,7 +251,7 @@ class Notifier: # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. - def count_listeners(): + def count_listeners() -> int: all_user_streams: Set[_NotifierUserStream] = set() for streams in list(self.room_to_user_streams.values()): @@ -270,7 +273,7 @@ class Notifier: "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) ) - def add_replication_callback(self, cb: Callable[[], None]): + def add_replication_callback(self, cb: Callable[[], None]) -> None: """Add a callback that will be called when some new data is available. Callback is not given any arguments. It should *not* return a Deferred - if it needs to do any asynchronous work, a background thread should be started and @@ -284,7 +287,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Unwraps event and calls `on_new_room_event_args`.""" await self.on_new_room_event_args( event_pos=event_pos, @@ -307,7 +310,7 @@ class Notifier: event_pos: PersistedEventPosition, max_room_stream_token: RoomStreamToken, extra_users: Optional[Collection[UserID]] = None, - ): + ) -> None: """Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -338,7 +341,9 @@ class Notifier: self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): + def _notify_pending_new_room_events( + self, max_room_stream_token: RoomStreamToken + ) -> None: """Notify for the room events that were queued waiting for a previous event to be persisted. Args: @@ -374,7 +379,7 @@ class Notifier: ) self._on_updated_room_token(max_room_stream_token) - def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): + def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken) -> None: """Poke services that might care that the room position has been updated. """ @@ -386,13 +391,13 @@ class Notifier: if self.federation_sender: self.federation_sender.notify_new_events(max_room_stream_token) - def _notify_app_services(self, max_room_stream_token: RoomStreamToken): + def _notify_app_services(self, max_room_stream_token: RoomStreamToken) -> None: try: self.appservice_handler.notify_interested_services(max_room_stream_token) except Exception: logger.exception("Error notifying application services of event") - def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken) -> None: try: self._pusher_pool.on_new_notifications(max_room_stream_token) except Exception: @@ -461,7 +466,9 @@ class Notifier: users, ) except Exception: - logger.exception("Error notifying application services of event") + logger.exception( + "Error notifying application services of ephemeral events" + ) def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happened @@ -473,8 +480,8 @@ class Notifier: user_id: str, timeout: int, callback: Callable[[StreamToken, StreamToken], Awaitable[T]], - room_ids=None, - from_token=StreamToken.START, + room_ids: Optional[Collection[str]] = None, + from_token: StreamToken = StreamToken.START, ) -> T: """Wait until the callback returns a non empty response or the timeout fires. @@ -698,14 +705,14 @@ class Notifier: for expired_stream in expired_streams: expired_stream.remove(self) - def _register_with_keys(self, user_stream: _NotifierUserStream): + def _register_with_keys(self, user_stream: _NotifierUserStream) -> None: self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id: str, room_id: str): + def _user_joined_room(self, user_id: str, room_id: str) -> None: new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) @@ -717,7 +724,7 @@ class Notifier: for cb in self.replication_callbacks: cb() - def notify_remote_server_up(self, server: str): + def notify_remote_server_up(self, server: str) -> None: """Notify any replication that a remote server has come back up""" # We call federation_sender directly rather than registering as a # callback as a) we already have a reference to it and b) it introduces |