summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/notifier.py36
1 files changed, 31 insertions, 5 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index b12b54353e..ce9b0d2187 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -62,7 +62,8 @@ class _NotificationListener(object):
 
         self.rooms = rooms
 
-        self.pending_notifications = []
+    def notified(self):
+        return self.deferred.called
 
     def notify(self, notifier, events, start_token, end_token):
         """ Inform whoever is listening about the new events. This will
@@ -78,11 +79,15 @@ class _NotificationListener(object):
         except defer.AlreadyCalledError:
             pass
 
+        # Should the following be done be using intrusively linked lists?
+        # -- erikj
+
         for room in self.rooms:
             lst = notifier.room_to_listeners.get(room, set())
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
+
         if self.appservice:
             notifier.appservice_to_listeners.get(
                 self.appservice, set()
@@ -161,10 +166,24 @@ class Notifier(object):
 
         room_source = self.event_sources.sources["room"]
 
-        listeners = self.room_to_listeners.get(room_id, set()).copy()
+        room_listeners = self.room_to_listeners.get(room_id, set())
+
+        # Remove any 'stale' listeners.
+        for l in room_listeners.copy():
+            if l.notified():
+                room_listeners.discard(l)
+
+        listeners = room_listeners.copy()
 
         for user in extra_users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            # Remove any 'stale' listeners.
+            for l in user_listeners.copy():
+                if l.notified():
+                    user_listeners.discard(l)
+
+            listeners |= user_listeners
 
         for appservice in self.appservice_to_listeners:
             # TODO (kegan): Redundant appservice listener checks?
@@ -173,9 +192,16 @@ class Notifier(object):
             # receive *invites* for users they are interested in. Does this
             # make the room_to_listeners check somewhat obselete?
             if appservice.is_interested(event):
-                listeners |= self.appservice_to_listeners.get(
+                app_listeners = self.appservice_to_listeners.get(
                     appservice, set()
-                ).copy()
+                )
+
+                # Remove any 'stale' listeners.
+                for l in app_listeners.copy():
+                    if l.notified():
+                        app_listeners.discard(l)
+
+                listeners |= app_listeners
 
         logger.debug("on_new_room_event listeners %s", listeners)