summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/notifier.py222
1 files changed, 48 insertions, 174 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7282dfd7f3..4ebe1d66de 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from synapse.util.logutils import log_function
+from synapse.util.async import run_on_reactor
 from synapse.types import StreamToken
 import synapse.metrics
 
@@ -49,13 +50,9 @@ class _NotificationListener(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user, rooms, from_token, limit, timeout, deferred,
-                 appservice=None):
+    def __init__(self, user, rooms, deferred, appservice=None):
         self.user = user
         self.appservice = appservice
-        self.from_token = from_token
-        self.limit = limit
-        self.timeout = timeout
         self.deferred = deferred
         self.rooms = rooms
         self.timer = None
@@ -63,17 +60,14 @@ class _NotificationListener(object):
     def notified(self):
         return self.deferred.called
 
-    def notify(self, notifier, events, start_token, end_token):
+    def notify(self, notifier):
         """ Inform whoever is listening about the new events. This will
         also remove this listener from all the indexes in the Notifier
         it knows about.
         """
 
-        result = (events, (start_token, end_token))
-
         try:
-            self.deferred.callback(result)
-            notified_events_counter.inc_by(len(events))
+            self.deferred.callback(None)
         except defer.AlreadyCalledError:
             pass
 
@@ -160,6 +154,7 @@ class Notifier(object):
         listening to the room, and any listeners for the users in the
         `extra_users` param.
         """
+        yield run_on_reactor()
         # poke any interested application service.
         self.hs.get_handlers().appservice_handler.notify_interested_services(
             event
@@ -167,8 +162,6 @@ class Notifier(object):
 
         room_id = event.room_id
 
-        room_source = self.event_sources.sources["room"]
-
         room_listeners = self.room_to_listeners.get(room_id, set())
 
         _discard_if_notified(room_listeners)
@@ -199,33 +192,11 @@ class Notifier(object):
 
         logger.debug("on_new_room_event listeners %s", listeners)
 
-        # TODO (erikj): Can we make this more efficient by hitting the
-        # db once?
-
-        @defer.inlineCallbacks
-        def notify(listener):
-            events, end_key = yield room_source.get_new_events_for_user(
-                listener.user,
-                listener.from_token.room_key,
-                listener.limit,
-            )
-
-            if events:
-                end_token = listener.from_token.copy_and_replace(
-                    "room_key", end_key
-                )
-
-                listener.notify(
-                    self, events, listener.from_token, end_token
-                )
-
-        def eb(failure):
-            logger.exception("Failed to notify listener", failure)
-
-        yield defer.DeferredList(
-            [notify(l).addErrback(eb) for l in listeners],
-            consumeErrors=True,
-        )
+        for listener in listeners:
+            try:
+                listener.notify(self)
+            except:
+                logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
     @log_function
@@ -235,11 +206,7 @@ class Notifier(object):
 
         Will wake up all listeners for the given users and rooms.
         """
-        # TODO(paul): This is horrible, having to manually list every event
-        # source here individually
-        presence_source = self.event_sources.sources["presence"]
-        typing_source = self.event_sources.sources["typing"]
-
+        yield run_on_reactor()
         listeners = set()
 
         for user in users:
@@ -256,68 +223,29 @@ class Notifier(object):
 
             listeners |= room_listeners
 
-        @defer.inlineCallbacks
-        def notify(listener):
-            presence_events, presence_end_key = (
-                yield presence_source.get_new_events_for_user(
-                    listener.user,
-                    listener.from_token.presence_key,
-                    listener.limit,
-                )
-            )
-            typing_events, typing_end_key = (
-                yield typing_source.get_new_events_for_user(
-                    listener.user,
-                    listener.from_token.typing_key,
-                    listener.limit,
-                )
-            )
-
-            if presence_events or typing_events:
-                end_token = listener.from_token.copy_and_replace(
-                    "presence_key", presence_end_key
-                ).copy_and_replace(
-                    "typing_key", typing_end_key
-                )
-
-                listener.notify(
-                    self,
-                    presence_events + typing_events,
-                    listener.from_token,
-                    end_token
-                )
-
-        def eb(failure):
-            logger.error(
-                "Failed to notify listener",
-                exc_info=(
-                    failure.type,
-                    failure.value,
-                    failure.getTracebackObject())
-            )
-
-        yield defer.DeferredList(
-            [notify(l).addErrback(eb) for l in listeners],
-            consumeErrors=True,
-        )
+        for listener in listeners:
+            try:
+                listener.notify(self)
+            except:
+                logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
-    def wait_for_events(self, user, rooms, filter, timeout, callback):
+    def wait_for_events(self, user, rooms, timeout, callback,
+                        from_token=StreamToken("s0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
 
         deferred = defer.Deferred()
-
-        from_token = StreamToken("s0", "0", "0")
+        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
+            user.to_string()
+        )
 
         listener = [_NotificationListener(
             user=user,
             rooms=rooms,
-            from_token=from_token,
-            limit=1,
-            timeout=timeout,
             deferred=deferred,
+            appservice=appservice,
         )]
 
         if timeout:
@@ -332,7 +260,7 @@ class Notifier(object):
             def _timeout_listener():
                 timed_out[0] = True
                 timer[0] = None
-                listener[0].notify(self, [], from_token, from_token)
+                listener[0].notify(self)
 
             # We create multiple notification listeners so we have to manage
             # canceling the timeout ourselves.
@@ -344,10 +272,8 @@ class Notifier(object):
                 listener[0] = _NotificationListener(
                     user=user,
                     rooms=rooms,
-                    from_token=from_token,
-                    limit=1,
-                    timeout=timeout,
                     deferred=deferred,
+                    appservice=appservice,
                 )
                 self._register_with_keys(listener[0])
                 result = yield callback()
@@ -360,65 +286,43 @@ class Notifier(object):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
         """
-        deferred = defer.Deferred()
-
-        self._get_events(
-            deferred, user, rooms, pagination_config.from_token,
-            pagination_config.limit, timeout
-        ).addErrback(deferred.errback)
-
-        return deferred
-
-    @defer.inlineCallbacks
-    def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
+        from_token = pagination_config.from_token
         if not from_token:
             from_token = yield self.event_sources.get_current_token()
 
-        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
-            user.to_string()
-        )
+        limit = pagination_config.limit
 
-        listener = _NotificationListener(
-            user,
-            rooms,
-            from_token,
-            limit,
-            timeout,
-            deferred,
-            appservice=appservice
-        )
+        @defer.inlineCallbacks
+        def check_for_updates():
+            events = []
+            end_token = from_token
+            for name, source in self.event_sources.sources.items():
+                keyname = "%s_key" % name
+                stuff, new_key = yield source.get_new_events_for_user(
+                    user, getattr(from_token, keyname), limit,
+                )
+                events.extend(stuff)
+                end_token = end_token.copy_and_replace(keyname, new_key)
 
-        def _timeout_listener():
-            # TODO (erikj): We should probably set to_token to the current
-            # max rather than reusing from_token.
-            # Remove the timer from the listener so we don't try to cancel it.
-            listener.timer = None
-            listener.notify(
-                self,
-                [],
-                listener.from_token,
-                listener.from_token,
-            )
+            if events:
+                defer.returnValue((events, (from_token, end_token)))
+            else:
+                defer.returnValue(None)
 
-        if timeout:
-            self._register_with_keys(listener)
+        result = yield self.wait_for_events(
+            user, rooms, timeout, check_for_updates, from_token=from_token
+        )
 
-        yield self._check_for_updates(listener)
+        if result is None:
+            result = ([], (from_token, from_token))
 
-        if not timeout:
-            _timeout_listener()
-        else:
-            # Only add the timer if the listener hasn't been notified
-            if not listener.notified():
-                listener.timer = self.clock.call_later(
-                    timeout/1000.0, _timeout_listener
-                )
-        return
+        defer.returnValue(result)
 
     @log_function
     def _register_with_keys(self, listener):
@@ -433,36 +337,6 @@ class Notifier(object):
                 listener.appservice, set()
             ).add(listener)
 
-    @defer.inlineCallbacks
-    @log_function
-    def _check_for_updates(self, listener):
-        # TODO (erikj): We need to think about limits across multiple sources
-        events = []
-
-        from_token = listener.from_token
-        limit = listener.limit
-
-        # TODO (erikj): DeferredList?
-        for name, source in self.event_sources.sources.items():
-            keyname = "%s_key" % name
-
-            stuff, new_key = yield source.get_new_events_for_user(
-                listener.user,
-                getattr(from_token, keyname),
-                limit,
-            )
-
-            events.extend(stuff)
-
-            from_token = from_token.copy_and_replace(keyname, new_key)
-
-        end_token = from_token
-
-        if events:
-            listener.notify(self, events, listener.from_token, end_token)
-
-        defer.returnValue(listener)
-
     def _user_joined_room(self, user, room_id):
         new_listeners = self.user_to_listeners.get(user, set())