summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py90
1 files changed, 74 insertions, 16 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7121d659d0..78eb28e4b2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
-from synapse.util.async import run_on_reactor
 from synapse.types import StreamToken
 import synapse.metrics
 
@@ -59,10 +58,11 @@ class _NotificationListener(object):
         self.limit = limit
         self.timeout = timeout
         self.deferred = deferred
-
         self.rooms = rooms
+        self.timer = None
 
-        self.pending_notifications = []
+    def notified(self):
+        return self.deferred.called
 
     def notify(self, notifier, events, start_token, end_token):
         """ Inform whoever is listening about the new events. This will
@@ -78,16 +78,27 @@ class _NotificationListener(object):
         except defer.AlreadyCalledError:
             pass
 
+        # Should the following be done be using intrusively linked lists?
+        # -- erikj
+
         for room in self.rooms:
             lst = notifier.room_to_listeners.get(room, set())
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
+
         if self.appservice:
             notifier.appservice_to_listeners.get(
                 self.appservice, set()
             ).discard(self)
 
+        # Cancel the timeout for this notifer if one exists.
+        if self.timer is not None:
+            try:
+                notifier.clock.cancel_call_later(self.timer)
+            except:
+                logger.warn("Failed to cancel notifier timer")
+
 
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
@@ -150,8 +161,6 @@ class Notifier(object):
         listening to the room, and any listeners for the users in the
         `extra_users` param.
         """
-        yield run_on_reactor()
-
         # poke any interested application service.
         self.hs.get_handlers().appservice_handler.notify_interested_services(
             event
@@ -161,10 +170,18 @@ class Notifier(object):
 
         room_source = self.event_sources.sources["room"]
 
-        listeners = self.room_to_listeners.get(room_id, set()).copy()
+        room_listeners = self.room_to_listeners.get(room_id, set())
+
+        _discard_if_notified(room_listeners)
+
+        listeners = room_listeners.copy()
 
         for user in extra_users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for appservice in self.appservice_to_listeners:
             # TODO (kegan): Redundant appservice listener checks?
@@ -173,9 +190,13 @@ class Notifier(object):
             # receive *invites* for users they are interested in. Does this
             # make the room_to_listeners check somewhat obselete?
             if appservice.is_interested(event):
-                listeners |= self.appservice_to_listeners.get(
+                app_listeners = self.appservice_to_listeners.get(
                     appservice, set()
-                ).copy()
+                )
+
+                _discard_if_notified(app_listeners)
+
+                listeners |= app_listeners
 
         logger.debug("on_new_room_event listeners %s", listeners)
 
@@ -216,8 +237,6 @@ class Notifier(object):
 
         Will wake up all listeners for the given users and rooms.
         """
-        yield run_on_reactor()
-
         # TODO(paul): This is horrible, having to manually list every event
         # source here individually
         presence_source = self.event_sources.sources["presence"]
@@ -226,10 +245,18 @@ class Notifier(object):
         listeners = set()
 
         for user in users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for room in rooms:
-            listeners |= self.room_to_listeners.get(room, set()).copy()
+            room_listeners = self.room_to_listeners.get(room, set())
+
+            _discard_if_notified(room_listeners)
+
+            listeners |= room_listeners
 
         @defer.inlineCallbacks
         def notify(listener):
@@ -300,14 +327,20 @@ class Notifier(object):
             self._register_with_keys(listener[0])
 
         result = yield callback()
+        timer = [None]
+
         if timeout:
             timed_out = [False]
 
             def _timeout_listener():
                 timed_out[0] = True
+                timer[0] = None
                 listener[0].notify(self, [], from_token, from_token)
 
-            self.clock.call_later(timeout/1000., _timeout_listener)
+            # We create multiple notification listeners so we have to manage
+            # canceling the timeout ourselves.
+            timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
+
             while not result and not timed_out[0]:
                 yield deferred
                 deferred = defer.Deferred()
@@ -322,6 +355,12 @@ class Notifier(object):
                 self._register_with_keys(listener[0])
                 result = yield callback()
 
+        if timer[0] is not None:
+            try:
+                self.clock.cancel_call_later(timer[0])
+            except:
+                logger.exception("Failed to cancel notifer timer")
+
         defer.returnValue(result)
 
     def get_events_for(self, user, rooms, pagination_config, timeout):
@@ -360,6 +399,8 @@ class Notifier(object):
         def _timeout_listener():
             # TODO (erikj): We should probably set to_token to the current
             # max rather than reusing from_token.
+            # Remove the timer from the listener so we don't try to cancel it.
+            listener.timer = None
             listener.notify(
                 self,
                 [],
@@ -375,8 +416,11 @@ class Notifier(object):
         if not timeout:
             _timeout_listener()
         else:
-            self.clock.call_later(timeout/1000.0, _timeout_listener)
-
+            # Only add the timer if the listener hasn't been notified
+            if not listener.notified():
+                listener.timer = self.clock.call_later(
+                    timeout/1000.0, _timeout_listener
+                )
         return
 
     @log_function
@@ -427,3 +471,17 @@ class Notifier(object):
 
         listeners = self.room_to_listeners.setdefault(room_id, set())
         listeners |= new_listeners
+
+        for l in new_listeners:
+            l.rooms.add(room_id)
+
+
+def _discard_if_notified(listener_set):
+    """Remove any 'stale' listeners from the given set.
+    """
+    to_discard = set()
+    for l in listener_set:
+        if l.notified():
+            to_discard.add(l)
+
+    listener_set -= to_discard