From 5c75adff951b27744528d9c095f4ff1f8df1f77a Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Tue, 12 May 2015 11:00:37 +0100 Subject: Add a NotifierUserStream to hold all the notification listeners for a user --- synapse/notifier.py | 230 ++++++++++++++++++++++++++-------------------------- 1 file changed, 116 insertions(+), 114 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index abe12b1434..0b50898f3f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -43,28 +43,18 @@ def count(func, l): class _NotificationListener(object): """ This represents a single client connection to the events stream. - The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. - - This listener will also keep track of which rooms it is listening in - so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, deferred, appservice=None): - self.user = user - self.appservice = appservice + def __init__(self, deferred): self.deferred = deferred - self.rooms = rooms - self.timer = None def notified(self): return self.deferred.called - def notify(self, notifier): - """ Inform whoever is listening about the new events. This will - also remove this listener from all the indexes in the Notifier - it knows about. + def notify(self): + """ Inform whoever is listening about the new events. """ try: @@ -72,27 +62,45 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass - # Should the following be done be using intrusively linked lists? - # -- erikj + +class _NotifierUserStream(object): + """This represents a user connected to the event stream. + It tracks the most recent stream token for that user. + At a given point a user may have a number of streams listening for + events. + + This listener will also keep track of which rooms it is listening in + so that it can remove itself from the indexes in the Notifier class. + """ + + def __init__(self, user, rooms, current_token, appservice=None): + self.user = user + self.appservice = appservice + self.listeners = set() + self.rooms = rooms + self.current_token = current_token + + def notify(self, new_token): + for listener in self.listeners: + listener.notify(new_token) + self.listeners.clear() + + def remove(self, notifier): + """ Remove this listener from all the indexes in the Notifier + it knows about. + """ for room in self.rooms: - lst = notifier.room_to_listeners.get(room, set()) + lst = notifier.room_to_user_streams.get(room, set()) lst.discard(self) - notifier.user_to_listeners.get(self.user, set()).discard(self) + notifier.user_to_user_streams.get(self.user, set()).discard(self) if self.appservice: - notifier.appservice_to_listeners.get( + notifier.appservice_to_user_streams.get( self.appservice, set() ).discard(self) - # Cancel the timeout for this notifer if one exists. - if self.timer is not None: - try: - notifier.clock.cancel_call_later(self.timer) - except: - logger.warn("Failed to cancel notifier timer") - class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -104,11 +112,12 @@ class Notifier(object): def __init__(self, hs): self.hs = hs - self.room_to_listeners = {} - self.user_to_listeners = {} - self.appservice_to_listeners = {} + self.user_to_user_stream = {} + self.room_to_user_streams = {} + self.appservice_to_user_streams = {} self.event_sources = hs.get_event_sources() + self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -120,34 +129,34 @@ class Notifier(object): # when rendering the metrics page, which is likely once per minute at # most when scraping it. def count_listeners(): - all_listeners = set() + all_user_streams = set() - for x in self.room_to_listeners.values(): - all_listeners |= x - for x in self.user_to_listeners.values(): - all_listeners |= x - for x in self.appservice_to_listeners.values(): - all_listeners |= x + for x in self.room_to_user_streams.values(): + all_user_streams |= x + for x in self.user_to_user_streams.values(): + all_user_streams |= x + for x in self.appservice_to_user_streams.values(): + all_user_streams |= x - return len(all_listeners) + return sum(len(stream.listeners) for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( "rooms", - lambda: count(bool, self.room_to_listeners.values()), + lambda: count(bool, self.room_to_user_streams.values()), ) metrics.register_callback( "users", - lambda: count(bool, self.user_to_listeners.values()), + lambda: len(self.user_to_user_stream), ) metrics.register_callback( "appservices", - lambda: count(bool, self.appservice_to_listeners.values()), + lambda: count(bool, self.appservice_to_user_streams.values()), ) @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, extra_users=[]): + def on_new_room_event(self, event, new_token, extra_users=[]): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -155,6 +164,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + assert isinstance(new_token, StreamToken) yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( @@ -163,72 +173,60 @@ class Notifier(object): room_id = event.room_id - room_listeners = self.room_to_listeners.get(room_id, set()) - - _discard_if_notified(room_listeners) + room_user_streams = self.room_to_user_streams.get(room_id, set()) - listeners = room_listeners.copy() + user_streams = room_user_streams.copy() for user in extra_users: - user_listeners = self.user_to_listeners.get(user, set()) + user_stream = self.user_to_user_stream.get(user) + if user_stream is not None: + user_streams.add(user_stream) - _discard_if_notified(user_listeners) - - listeners |= user_listeners - - for appservice in self.appservice_to_listeners: + for appservice in self.appservice_to_user_streams: # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the room_to_listeners set, but + # App services will already be in the room_to_user_streams set, but # that isn't enough. They need to be checked here in order to # receive *invites* for users they are interested in. Does this - # make the room_to_listeners check somewhat obselete? + # make the room_to_user_streams check somewhat obselete? if appservice.is_interested(event): - app_listeners = self.appservice_to_listeners.get( + app_user_streams = self.appservice_to_user_streams.get( appservice, set() ) + user_streams |= app_user_streams - _discard_if_notified(app_listeners) - - listeners |= app_listeners - - logger.debug("on_new_room_event listeners %s", listeners) + logger.debug("on_new_room_event listeners %s", user_streams) with PreserveLoggingContext(): - for listener in listeners: + for user_stream in user_streams: try: - listener.notify(self) + user_stream.notify(new_token) except: logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function - def on_new_user_event(self, users=[], rooms=[]): + def on_new_user_event(self, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend presence/user event wise. Will wake up all listeners for the given users and rooms. """ + assert isinstance(new_token, StreamToken) yield run_on_reactor() - listeners = set() + user_streams = set() for user in users: - user_listeners = self.user_to_listeners.get(user, set()) - - _discard_if_notified(user_listeners) - - listeners |= user_listeners + user_stream = self.user_to_user_stream.get(user) + if user_stream: + user_stream.add(user_stream) for room in rooms: - room_listeners = self.room_to_listeners.get(room, set()) - - _discard_if_notified(room_listeners) - - listeners |= room_listeners + user_streams |= self.room_to_user_streams.get(room, set()) with PreserveLoggingContext(): - for listener in listeners: + for user_stream in user_streams: try: - listener.notify(self) + user_streams.notify(new_token) except: logger.exception("Failed to notify listener") @@ -240,21 +238,32 @@ class Notifier(object): """ deferred = defer.Deferred() - appservice = yield self.hs.get_datastore().get_app_service_by_user_id( - user.to_string() - ) - listener = [_NotificationListener( - user=user, - rooms=rooms, - deferred=deferred, - appservice=appservice, - )] + user_stream = self.user_to_user_streams.get(user) + if user_stream is None: + appservice = yield self.store.get_app_service_by_user_id( + user.to_string() + ) + current_token = yield self.event_sources.get_current_token() + user_stream = _NotifierUserStream( + user=user, + rooms=rooms, + appservice=appservice, + current_token=current_token, + ) + self._register_with_keys(user_stream) + else: + current_token = user_stream.current_token + + if timeout and not current_token.is_after(from_token): + listener = [_NotificationListener(deferred)] + user_stream.listeners.add(listener[0]) + + if current_token.is_after(from_token): + result = yield callback(from_token, current_token) + else: + result = None - if timeout: - self._register_with_keys(listener[0]) - - result = yield callback() timer = [None] if timeout: @@ -263,23 +272,19 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self) + listener[0].notify(user_stream) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) while not result and not timed_out[0]: - yield deferred + new_token = yield deferred deferred = defer.Deferred() - listener[0] = _NotificationListener( - user=user, - rooms=rooms, - deferred=deferred, - appservice=appservice, - ) - self._register_with_keys(listener[0]) - result = yield callback() + listener[0] = _NotificationListener(deferred) + user_stream.listeners.add(listener[0]) + result = yield callback(current_token, new_token) + current_token = new_token if timer[0] is not None: try: @@ -302,7 +307,7 @@ class Notifier(object): limit = pagination_config.limit @defer.inlineCallbacks - def check_for_updates(): + def check_for_updates(start_token, end_token): events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -328,26 +333,23 @@ class Notifier(object): defer.returnValue(result) @log_function - def _register_with_keys(self, listener): - for room in listener.rooms: - s = self.room_to_listeners.setdefault(room, set()) - s.add(listener) + def _register_with_keys(self, user_stream): + self.user_to_user_stream[user_stream.user] = user_stream - self.user_to_listeners.setdefault(listener.user, set()).add(listener) + for room in user_stream.rooms: + s = self.room_to_user_stream.setdefault(room, set()) + s.add(user_stream) - if listener.appservice: - self.appservice_to_listeners.setdefault( - listener.appservice, set() - ).add(listener) + if user_stream.appservice: + self.appservice_to_user_stream.setdefault( + user_stream.appservice, set() + ).add(user_stream) def _user_joined_room(self, user, room_id): - new_listeners = self.user_to_listeners.get(user, set()) - - listeners = self.room_to_listeners.setdefault(room_id, set()) - listeners |= new_listeners - - for l in new_listeners: - l.rooms.add(room_id) + new_user_stream = self.user_to_user_stream.get(user) + room_streams = self.room_to_user_streams.setdefault(room_id, set()) + room_streams.add(new_user_stream) + new_user_stream.rooms.add(room_id) def _discard_if_notified(listener_set): -- cgit 1.4.1