From 65f5e4e3e43ee471ee0a8c6989bbf60cb3be2c95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:31:06 +0100 Subject: Add paranoia checks to make sure that we evict stale NotificationListeners when we are about to process them --- synapse/notifier.py | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index b12b54353e..ce9b0d2187 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -62,7 +62,8 @@ class _NotificationListener(object): self.rooms = rooms - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,11 +79,15 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() @@ -161,10 +166,24 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + # Remove any 'stale' listeners. + for l in room_listeners.copy(): + if l.notified(): + room_listeners.discard(l) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + # Remove any 'stale' listeners. + for l in user_listeners.copy(): + if l.notified(): + user_listeners.discard(l) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +192,16 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + # Remove any 'stale' listeners. + for l in app_listeners.copy(): + if l.notified(): + app_listeners.discard(l) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) -- cgit 1.4.1