summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py223
1 files changed, 144 insertions, 79 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py

index bd41f77852..59415f6f88 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -15,7 +15,18 @@ import logging from collections import namedtuple -from typing import Callable, Iterable, List, TypeVar +from typing import ( + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Set, + Tuple, + TypeVar, + Union, +) from prometheus_client import Counter @@ -24,12 +35,20 @@ from twisted.internet import defer import synapse.server from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError +from synapse.events import EventBase from synapse.handlers.presence import format_user_presence_state from synapse.logging.context import PreserveLoggingContext from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import StreamToken +from synapse.streams.config import PaginationConfig +from synapse.types import ( + Collection, + PersistedEventPosition, + RoomStreamToken, + StreamToken, + UserID, +) from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -55,7 +74,7 @@ def count(func: Callable[[T], bool], it: Iterable[T]) -> int: return n -class _NotificationListener(object): +class _NotificationListener: """ This represents a single client connection to the events stream. The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. @@ -67,7 +86,7 @@ class _NotificationListener(object): self.deferred = deferred -class _NotifierUserStream(object): +class _NotifierUserStream: """This represents a user connected to the event stream. It tracks the most recent stream token for that user. At a given point a user may have a number of streams listening for @@ -77,7 +96,13 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms): + def __init__( + self, + user_id: str, + rooms: Collection[str], + current_token: StreamToken, + time_now_ms: int, + ): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token @@ -93,13 +118,15 @@ class _NotifierUserStream(object): with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) - def notify(self, stream_key, stream_id, time_now_ms): + def notify( + self, stream_key: str, stream_id: Union[int, RoomStreamToken], time_now_ms: int, + ): """Notify any listeners for this user of a new event from an event source. Args: - stream_key(str): The stream the event came from. - stream_id(str): The new id for the stream the event came from. - time_now_ms(int): The current time in milliseconds. + stream_key: The stream the event came from. + stream_id: The new id for the stream the event came from. + time_now_ms: The current time in milliseconds. """ self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token @@ -112,7 +139,7 @@ class _NotifierUserStream(object): self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) - def remove(self, notifier): + def remove(self, notifier: "Notifier"): """ Remove this listener from all the indexes in the Notifier it knows about. """ @@ -123,10 +150,10 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - def count_listeners(self): + def count_listeners(self) -> int: return len(self.notify_deferred.observers()) - def new_listener(self, token): + def new_listener(self, token: StreamToken) -> _NotificationListener: """Returns a deferred that is resolved when there is a new token greater than the given token. @@ -136,20 +163,18 @@ class _NotifierUserStream(object): """ # Immediately wake up stream if something has already since happened # since their last token. - if self.last_notified_token.is_after(token): + if self.last_notified_token != token: return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): - def __nonzero__(self): + def __bool__(self): return bool(self.events) - __bool__ = __nonzero__ # python3 - -class Notifier(object): +class Notifier: """ This class is responsible for notifying any listeners when there are new events available for it. @@ -159,14 +184,16 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs: "synapse.server.HomeServer"): - self.user_to_user_stream = {} - self.room_to_user_streams = {} + self.user_to_user_stream = {} # type: Dict[str, _NotifierUserStream] + self.room_to_user_streams = {} # type: Dict[str, Set[_NotifierUserStream]] self.hs = hs self.storage = hs.get_storage() self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() - self.pending_new_room_events = [] + self.pending_new_room_events = ( + [] + ) # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]] # Called when there are new things to stream over replication self.replication_callbacks = [] # type: List[Callable[[], None]] @@ -177,11 +204,11 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self._pusher_pool = hs.get_pusherpool() + self.federation_sender = None if hs.should_send_federation(): self.federation_sender = hs.get_federation_sender() - else: - self.federation_sender = None self.state_handler = hs.get_state_handler() @@ -193,12 +220,12 @@ class Notifier(object): # when rendering the metrics page, which is likely once per minute at # most when scraping it. def count_listeners(): - all_user_streams = set() + all_user_streams = set() # type: Set[_NotifierUserStream] - for x in list(self.room_to_user_streams.values()): - all_user_streams |= x - for x in list(self.user_to_user_stream.values()): - all_user_streams.add(x) + for streams in list(self.room_to_user_streams.values()): + all_user_streams |= streams + for stream in list(self.user_to_user_stream.values()): + all_user_streams.add(stream) return sum(stream.count_listeners() for stream in all_user_streams) @@ -223,7 +250,11 @@ class Notifier(object): self.replication_callbacks.append(cb) def on_new_room_event( - self, event, room_stream_id, max_room_stream_id, extra_users=[] + self, + event: EventBase, + event_pos: PersistedEventPosition, + max_room_stream_token: RoomStreamToken, + extra_users: Collection[UserID] = [], ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -236,52 +267,81 @@ class Notifier(object): until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append((room_stream_id, event, extra_users)) - self._notify_pending_new_room_events(max_room_stream_id) + self.pending_new_room_events.append((event_pos, event, extra_users)) + self._notify_pending_new_room_events(max_room_stream_token) self.notify_replication() - def _notify_pending_new_room_events(self, max_room_stream_id): + def _notify_pending_new_room_events(self, max_room_stream_token: RoomStreamToken): """Notify for the room events that were queued waiting for a previous event to be persisted. Args: - max_room_stream_id(int): The highest stream_id below which all + max_room_stream_token: The highest stream_id below which all events have been persisted. """ pending = self.pending_new_room_events self.pending_new_room_events = [] - for room_stream_id, event, extra_users in pending: - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append( - (room_stream_id, event, extra_users) - ) + + users = set() # type: Set[UserID] + rooms = set() # type: Set[str] + + for event_pos, event, extra_users in pending: + if event_pos.persisted_after(max_room_stream_token): + self.pending_new_room_events.append((event_pos, event, extra_users)) else: - self._on_new_room_event(event, room_stream_id, extra_users) + if ( + event.type == EventTypes.Member + and event.membership == Membership.JOIN + ): + self._user_joined_room(event.state_key, event.room_id) + + users.update(extra_users) + rooms.add(event.room_id) + + if users or rooms: + self.on_new_event( + "room_key", max_room_stream_token, users=users, rooms=rooms, + ) + self._on_updated_room_token(max_room_stream_token) + + def _on_updated_room_token(self, max_room_stream_token: RoomStreamToken): + """Poke services that might care that the room position has been + updated. + """ - def _on_new_room_event(self, event, room_stream_id, extra_users=[]): - """Notify any user streams that are interested in this room event""" # poke any interested application service. run_as_background_process( - "notify_app_services", self._notify_app_services, room_stream_id + "_notify_app_services", self._notify_app_services, max_room_stream_token ) - if self.federation_sender: - self.federation_sender.notify_new_events(room_stream_id) - - if event.type == EventTypes.Member and event.membership == Membership.JOIN: - self._user_joined_room(event.state_key, event.room_id) - - self.on_new_event( - "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] + run_as_background_process( + "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token ) - async def _notify_app_services(self, room_stream_id): + if self.federation_sender: + self.federation_sender.notify_new_events(max_room_stream_token.stream) + + async def _notify_app_services(self, max_room_stream_token: RoomStreamToken): try: - await self.appservice_handler.notify_interested_services(room_stream_id) + await self.appservice_handler.notify_interested_services( + max_room_stream_token.stream + ) except Exception: logger.exception("Error notifying application services of event") - def on_new_event(self, stream_key, new_token, users=[], rooms=[]): + async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken): + try: + await self._pusher_pool.on_new_notifications(max_room_stream_token.stream) + except Exception: + logger.exception("Error pusher pool of event") + + def on_new_event( + self, + stream_key: str, + new_token: Union[int, RoomStreamToken], + users: Collection[UserID] = [], + rooms: Collection[str] = [], + ): """ Used to inform listeners that something has happened event wise. Will wake up all listeners for the given users and rooms. @@ -307,20 +367,25 @@ class Notifier(object): self.notify_replication() - def on_new_replication_data(self): + def on_new_replication_data(self) -> None: """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" self.notify_replication() async def wait_for_events( - self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START - ): + self, + user_id: str, + timeout: int, + callback: Callable[[StreamToken, StreamToken], Awaitable[T]], + room_ids=None, + from_token=StreamToken.START, + ) -> T: """Wait until the callback returns a non empty response or the timeout fires. """ user_stream = self.user_to_user_stream.get(user_id) if user_stream is None: - current_token = await self.event_sources.get_current_token() + current_token = self.event_sources.get_current_token() if room_ids is None: room_ids = await self.store.get_rooms_for_user(user_id) user_stream = _NotifierUserStream( @@ -377,38 +442,38 @@ class Notifier(object): async def get_events_for( self, - user, - pagination_config, - timeout, - only_keys=None, - is_guest=False, - explicit_room_id=None, - ): + user: UserID, + pagination_config: PaginationConfig, + timeout: int, + is_guest: bool = False, + explicit_room_id: str = None, + ) -> EventStreamResult: """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. - If `only_keys` is not None, events from keys will be sent down. - If explicit_room_id is not set, the user's joined rooms will be polled for events. If explicit_room_id is set, that room will be polled for events only if it is world readable or the user has joined the room. """ - from_token = pagination_config.from_token - if not from_token: - from_token = await self.event_sources.get_current_token() + if pagination_config.from_token: + from_token = pagination_config.from_token + else: + from_token = self.event_sources.get_current_token() limit = pagination_config.limit room_ids, is_joined = await self._get_room_ids(user, explicit_room_id) is_peeking = not is_joined - async def check_for_updates(before_token, after_token): - if not after_token.is_after(before_token): + async def check_for_updates( + before_token: StreamToken, after_token: StreamToken + ) -> EventStreamResult: + if after_token == before_token: return EventStreamResult([], (from_token, from_token)) - events = [] + events = [] # type: List[EventBase] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -417,8 +482,6 @@ class Notifier(object): after_id = getattr(after_token, keyname) if before_id == after_id: continue - if only_keys and name not in only_keys: - continue new_events, new_key = await source.get_new_events( user=user, @@ -476,7 +539,9 @@ class Notifier(object): return result - async def _get_room_ids(self, user, explicit_room_id): + async def _get_room_ids( + self, user: UserID, explicit_room_id: Optional[str] + ) -> Tuple[Collection[str], bool]: joined_room_ids = await self.store.get_rooms_for_user(user.to_string()) if explicit_room_id: if explicit_room_id in joined_room_ids: @@ -486,7 +551,7 @@ class Notifier(object): raise AuthError(403, "Non-joined access not allowed") return joined_room_ids, True - async def _is_world_readable(self, room_id): + async def _is_world_readable(self, room_id: str) -> bool: state = await self.state_handler.get_current_state( room_id, EventTypes.RoomHistoryVisibility, "" ) @@ -496,7 +561,7 @@ class Notifier(object): return False @log_function - def remove_expired_streams(self): + def remove_expired_streams(self) -> None: time_now_ms = self.clock.time_msec() expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS @@ -510,21 +575,21 @@ class Notifier(object): expired_stream.remove(self) @log_function - def _register_with_keys(self, user_stream): + def _register_with_keys(self, user_stream: _NotifierUserStream): self.user_to_user_stream[user_stream.user_id] = user_stream for room in user_stream.rooms: s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - def _user_joined_room(self, user_id, room_id): + def _user_joined_room(self, user_id: str, room_id: str): new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) - def notify_replication(self): + def notify_replication(self) -> None: """Notify the any replication listeners that there's a new event""" for cb in self.replication_callbacks: cb()