diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 90 |
1 files changed, 74 insertions, 16 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 7121d659d0..78eb28e4b2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext -from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -59,10 +58,11 @@ class _NotificationListener(object): self.limit = limit self.timeout = timeout self.deferred = deferred - self.rooms = rooms + self.timer = None - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,16 +78,27 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() ).discard(self) + # Cancel the timeout for this notifer if one exists. + if self.timer is not None: + try: + notifier.clock.cancel_call_later(self.timer) + except: + logger.warn("Failed to cancel notifier timer") + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -150,8 +161,6 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - yield run_on_reactor() - # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -161,10 +170,18 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + _discard_if_notified(room_listeners) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +190,13 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + _discard_if_notified(app_listeners) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) @@ -216,8 +237,6 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - yield run_on_reactor() - # TODO(paul): This is horrible, having to manually list every event # source here individually presence_source = self.event_sources.sources["presence"] @@ -226,10 +245,18 @@ class Notifier(object): listeners = set() for user in users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for room in rooms: - listeners |= self.room_to_listeners.get(room, set()).copy() + room_listeners = self.room_to_listeners.get(room, set()) + + _discard_if_notified(room_listeners) + + listeners |= room_listeners @defer.inlineCallbacks def notify(listener): @@ -300,14 +327,20 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + timer = [None] + if timeout: timed_out = [False] def _timeout_listener(): timed_out[0] = True + timer[0] = None listener[0].notify(self, [], from_token, from_token) - self.clock.call_later(timeout/1000., _timeout_listener) + # We create multiple notification listeners so we have to manage + # canceling the timeout ourselves. + timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: yield deferred deferred = defer.Deferred() @@ -322,6 +355,12 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") + defer.returnValue(result) def get_events_for(self, user, rooms, pagination_config, timeout): @@ -360,6 +399,8 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + # Remove the timer from the listener so we don't try to cancel it. + listener.timer = None listener.notify( self, [], @@ -375,8 +416,11 @@ class Notifier(object): if not timeout: _timeout_listener() else: - self.clock.call_later(timeout/1000.0, _timeout_listener) - + # Only add the timer if the listener hasn't been notified + if not listener.notified(): + listener.timer = self.clock.call_later( + timeout/1000.0, _timeout_listener + ) return @log_function @@ -427,3 +471,17 @@ class Notifier(object): listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners + + for l in new_listeners: + l.rooms.add(room_id) + + +def _discard_if_notified(listener_set): + """Remove any 'stale' listeners from the given set. + """ + to_discard = set() + for l in listener_set: + if l.notified(): + to_discard.add(l) + + listener_set -= to_discard |