diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 168 |
1 files changed, 72 insertions, 96 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 078abfc56d..dbd8efe9fb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, ObservableDeferred from synapse.types import StreamToken import synapse.metrics @@ -45,21 +45,11 @@ class _NotificationListener(object): The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. """ + __slots__ = ["deferred"] def __init__(self, deferred): self.deferred = deferred - def notified(self): - return self.deferred.called - - def notify(self, token): - """ Inform whoever is listening about the new events. - """ - try: - self.deferred.callback(token) - except defer.AlreadyCalledError: - pass - class _NotifierUserStream(object): """This represents a user connected to the event stream. @@ -75,11 +65,12 @@ class _NotifierUserStream(object): appservice=None): self.user = str(user) self.appservice = appservice - self.listeners = set() self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms + self.notify_deferred = ObservableDeferred(defer.Deferred()) + def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an event source. @@ -91,12 +82,10 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) - if self.listeners: - self.last_notified_ms = time_now_ms - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + self.last_notified_ms = time_now_ms + noify_deferred = self.notify_deferred + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -114,6 +103,18 @@ class _NotifierUserStream(object): self.appservice, set() ).discard(self) + def count_listeners(self): + return len(self.notify_deferred.observers()) + + def new_listener(self, token): + """Returns a deferred that is resolved when there is a new token + greater than the given token. + """ + if self.current_token.is_after(token): + return _NotificationListener(defer.succeed(self.current_token)) + else: + return _NotificationListener(self.notify_deferred.observe()) + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -158,7 +159,7 @@ class Notifier(object): for x in self.appservice_to_user_streams.values(): all_user_streams |= x - return sum(len(stream.listeners) for stream in all_user_streams) + return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( @@ -220,16 +221,7 @@ class Notifier(object): event ) - room_id = event.room_id - - room_user_streams = self.room_to_user_streams.get(room_id, set()) - - user_streams = room_user_streams.copy() - - for user in extra_users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + app_streams = set() for appservice in self.appservice_to_user_streams: # TODO (kegan): Redundant appservice listener checks? @@ -241,24 +233,20 @@ class Notifier(object): app_user_streams = self.appservice_to_user_streams.get( appservice, set() ) - user_streams |= app_user_streams + app_streams |= app_user_streams - logger.debug("on_new_room_event listeners %s", user_streams) - - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify( - "room_key", "s%d" % (room_stream_id,), time_now_ms - ) - except: - logger.exception("Failed to notify listener") + self.on_new_event( + "room_key", room_stream_id, + users=extra_users, + rooms=[event.room_id], + extra_streams=app_streams, + ) @defer.inlineCallbacks @log_function - def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]): - """ Used to inform listeners that something has happend - presence/user event wise. + def on_new_event(self, stream_key, new_token, users=[], rooms=[], + extra_streams=set()): + """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ @@ -282,14 +270,10 @@ class Notifier(object): @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, - from_token=StreamToken("s0", "0", "0")): + from_token=StreamToken("s0", "0", "0", "0")): """Wait until the callback returns a non empty response or the timeout fires. """ - - deferred = defer.Deferred() - time_now_ms = self.clock.time_msec() - user = str(user) user_stream = self.user_to_user_stream.get(user) if user_stream is None: @@ -302,55 +286,44 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, - time_now_ms=time_now_ms, + time_now_ms=self.clock.time_msec(), ) self._register_with_keys(user_stream) + + result = None + if timeout: + # Will be set to a _NotificationListener that we'll be waiting on. + # Allows us to cancel it. + listener = None + + def timed_out(): + if listener: + listener.deferred.cancel() + timer = self.clock.call_later(timeout/1000., timed_out) + + prev_token = from_token + while not result: + try: + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + # Now we wait for the _NotifierUserStream to be told there + # is a new token. + # We need to supply the token we supplied to callback so + # that we don't miss any current_token updates. + prev_token = current_token + listener = user_stream.new_listener(prev_token) + yield listener.deferred + except defer.CancelledError: + break + + self.clock.cancel_call_later(timer, ignore_errs=True) else: current_token = user_stream.current_token - - listener = [_NotificationListener(deferred)] - - if timeout and not current_token.is_after(from_token): - user_stream.listeners.add(listener[0]) - - if current_token.is_after(from_token): result = yield callback(from_token, current_token) - else: - result = None - - timer = [None] - - if result: - user_stream.listeners.discard(listener[0]) - defer.returnValue(result) - return - - if timeout: - timed_out = [False] - - def _timeout_listener(): - timed_out[0] = True - timer[0] = None - user_stream.listeners.discard(listener[0]) - listener[0].notify(current_token) - - # We create multiple notification listeners so we have to manage - # canceling the timeout ourselves. - timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) - - while not result and not timed_out[0]: - new_token = yield deferred - deferred = defer.Deferred() - listener[0] = _NotificationListener(deferred) - user_stream.listeners.add(listener[0]) - result = yield callback(current_token, new_token) - current_token = new_token - - if timer[0] is not None: - try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") defer.returnValue(result) @@ -368,6 +341,9 @@ class Notifier(object): @defer.inlineCallbacks def check_for_updates(before_token, after_token): + if not after_token.is_after(before_token): + defer.returnValue(None) + events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -376,10 +352,10 @@ class Notifier(object): after_id = getattr(after_token, keyname) if before_id == after_id: continue - stuff, new_key = yield source.get_new_events_for_user( + new_events, new_key = yield source.get_new_events_for_user( user, getattr(from_token, keyname), limit, ) - events.extend(stuff) + events.extend(new_events) end_token = end_token.copy_and_replace(keyname, new_key) if events: @@ -402,7 +378,7 @@ class Notifier(object): expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS for stream in self.user_to_user_stream.values(): - if stream.listeners: + if stream.count_listeners(): continue if stream.last_notified_ms < expire_before_ts: expired_streams.append(stream) |