summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py168
1 files changed, 72 insertions, 96 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 078abfc56d..dbd8efe9fb 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,7 +16,7 @@
 from twisted.internet import defer
 
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, ObservableDeferred
 from synapse.types import StreamToken
 import synapse.metrics
 
@@ -45,21 +45,11 @@ class _NotificationListener(object):
     The events stream handler will have yielded to the deferred, so to
     notify the handler it is sufficient to resolve the deferred.
     """
+    __slots__ = ["deferred"]
 
     def __init__(self, deferred):
         self.deferred = deferred
 
-    def notified(self):
-        return self.deferred.called
-
-    def notify(self, token):
-        """ Inform whoever is listening about the new events.
-        """
-        try:
-            self.deferred.callback(token)
-        except defer.AlreadyCalledError:
-            pass
-
 
 class _NotifierUserStream(object):
     """This represents a user connected to the event stream.
@@ -75,11 +65,12 @@ class _NotifierUserStream(object):
                  appservice=None):
         self.user = str(user)
         self.appservice = appservice
-        self.listeners = set()
         self.rooms = set(rooms)
         self.current_token = current_token
         self.last_notified_ms = time_now_ms
 
+        self.notify_deferred = ObservableDeferred(defer.Deferred())
+
     def notify(self, stream_key, stream_id, time_now_ms):
         """Notify any listeners for this user of a new event from an
         event source.
@@ -91,12 +82,10 @@ class _NotifierUserStream(object):
         self.current_token = self.current_token.copy_and_advance(
             stream_key, stream_id
         )
-        if self.listeners:
-            self.last_notified_ms = time_now_ms
-            listeners = self.listeners
-            self.listeners = set()
-            for listener in listeners:
-                listener.notify(self.current_token)
+        self.last_notified_ms = time_now_ms
+        noify_deferred = self.notify_deferred
+        self.notify_deferred = ObservableDeferred(defer.Deferred())
+        noify_deferred.callback(self.current_token)
 
     def remove(self, notifier):
         """ Remove this listener from all the indexes in the Notifier
@@ -114,6 +103,18 @@ class _NotifierUserStream(object):
                 self.appservice, set()
             ).discard(self)
 
+    def count_listeners(self):
+        return len(self.notify_deferred.observers())
+
+    def new_listener(self, token):
+        """Returns a deferred that is resolved when there is a new token
+        greater than the given token.
+        """
+        if self.current_token.is_after(token):
+            return _NotificationListener(defer.succeed(self.current_token))
+        else:
+            return _NotificationListener(self.notify_deferred.observe())
+
 
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
@@ -158,7 +159,7 @@ class Notifier(object):
             for x in self.appservice_to_user_streams.values():
                 all_user_streams |= x
 
-            return sum(len(stream.listeners) for stream in all_user_streams)
+            return sum(stream.count_listeners() for stream in all_user_streams)
         metrics.register_callback("listeners", count_listeners)
 
         metrics.register_callback(
@@ -220,16 +221,7 @@ class Notifier(object):
             event
         )
 
-        room_id = event.room_id
-
-        room_user_streams = self.room_to_user_streams.get(room_id, set())
-
-        user_streams = room_user_streams.copy()
-
-        for user in extra_users:
-            user_stream = self.user_to_user_stream.get(str(user))
-            if user_stream is not None:
-                user_streams.add(user_stream)
+        app_streams = set()
 
         for appservice in self.appservice_to_user_streams:
             # TODO (kegan): Redundant appservice listener checks?
@@ -241,24 +233,20 @@ class Notifier(object):
                 app_user_streams = self.appservice_to_user_streams.get(
                     appservice, set()
                 )
-                user_streams |= app_user_streams
+                app_streams |= app_user_streams
 
-        logger.debug("on_new_room_event listeners %s", user_streams)
-
-        time_now_ms = self.clock.time_msec()
-        for user_stream in user_streams:
-            try:
-                user_stream.notify(
-                    "room_key", "s%d" % (room_stream_id,), time_now_ms
-                )
-            except:
-                logger.exception("Failed to notify listener")
+        self.on_new_event(
+            "room_key", room_stream_id,
+            users=extra_users,
+            rooms=[event.room_id],
+            extra_streams=app_streams,
+        )
 
     @defer.inlineCallbacks
     @log_function
-    def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
-        """ Used to inform listeners that something has happend
-        presence/user event wise.
+    def on_new_event(self, stream_key, new_token, users=[], rooms=[],
+                     extra_streams=set()):
+        """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
@@ -282,14 +270,10 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def wait_for_events(self, user, rooms, timeout, callback,
-                        from_token=StreamToken("s0", "0", "0")):
+                        from_token=StreamToken("s0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
-
-        deferred = defer.Deferred()
-        time_now_ms = self.clock.time_msec()
-
         user = str(user)
         user_stream = self.user_to_user_stream.get(user)
         if user_stream is None:
@@ -302,55 +286,44 @@ class Notifier(object):
                 rooms=rooms,
                 appservice=appservice,
                 current_token=current_token,
-                time_now_ms=time_now_ms,
+                time_now_ms=self.clock.time_msec(),
             )
             self._register_with_keys(user_stream)
+
+        result = None
+        if timeout:
+            # Will be set to a _NotificationListener that we'll be waiting on.
+            # Allows us to cancel it.
+            listener = None
+
+            def timed_out():
+                if listener:
+                    listener.deferred.cancel()
+            timer = self.clock.call_later(timeout/1000., timed_out)
+
+            prev_token = from_token
+            while not result:
+                try:
+                    current_token = user_stream.current_token
+
+                    result = yield callback(prev_token, current_token)
+                    if result:
+                        break
+
+                    # Now we wait for the _NotifierUserStream to be told there
+                    # is a new token.
+                    # We need to supply the token we supplied to callback so
+                    # that we don't miss any current_token updates.
+                    prev_token = current_token
+                    listener = user_stream.new_listener(prev_token)
+                    yield listener.deferred
+                except defer.CancelledError:
+                    break
+
+            self.clock.cancel_call_later(timer, ignore_errs=True)
         else:
             current_token = user_stream.current_token
-
-        listener = [_NotificationListener(deferred)]
-
-        if timeout and not current_token.is_after(from_token):
-            user_stream.listeners.add(listener[0])
-
-        if current_token.is_after(from_token):
             result = yield callback(from_token, current_token)
-        else:
-            result = None
-
-        timer = [None]
-
-        if result:
-            user_stream.listeners.discard(listener[0])
-            defer.returnValue(result)
-            return
-
-        if timeout:
-            timed_out = [False]
-
-            def _timeout_listener():
-                timed_out[0] = True
-                timer[0] = None
-                user_stream.listeners.discard(listener[0])
-                listener[0].notify(current_token)
-
-            # We create multiple notification listeners so we have to manage
-            # canceling the timeout ourselves.
-            timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
-
-            while not result and not timed_out[0]:
-                new_token = yield deferred
-                deferred = defer.Deferred()
-                listener[0] = _NotificationListener(deferred)
-                user_stream.listeners.add(listener[0])
-                result = yield callback(current_token, new_token)
-                current_token = new_token
-
-        if timer[0] is not None:
-            try:
-                self.clock.cancel_call_later(timer[0])
-            except:
-                logger.exception("Failed to cancel notifer timer")
 
         defer.returnValue(result)
 
@@ -368,6 +341,9 @@ class Notifier(object):
 
         @defer.inlineCallbacks
         def check_for_updates(before_token, after_token):
+            if not after_token.is_after(before_token):
+                defer.returnValue(None)
+
             events = []
             end_token = from_token
             for name, source in self.event_sources.sources.items():
@@ -376,10 +352,10 @@ class Notifier(object):
                 after_id = getattr(after_token, keyname)
                 if before_id == after_id:
                     continue
-                stuff, new_key = yield source.get_new_events_for_user(
+                new_events, new_key = yield source.get_new_events_for_user(
                     user, getattr(from_token, keyname), limit,
                 )
-                events.extend(stuff)
+                events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
 
             if events:
@@ -402,7 +378,7 @@ class Notifier(object):
         expired_streams = []
         expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS
         for stream in self.user_to_user_stream.values():
-            if stream.listeners:
+            if stream.count_listeners():
                 continue
             if stream.last_notified_ms < expire_before_ts:
                 expired_streams.append(stream)