summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py109
1 files changed, 30 insertions, 79 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 78eb28e4b2..e16a4608e9 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -305,14 +305,16 @@ class Notifier(object):
             )
 
     @defer.inlineCallbacks
-    def wait_for_events(self, user, rooms, filter, timeout, callback):
+    def wait_for_events(self, user, rooms, timeout, callback,
+                        from_token=StreamToken("s0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
 
         deferred = defer.Deferred()
-
-        from_token = StreamToken("s0", "0", "0")
+        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
+            user.to_string()
+        )
 
         listener = [_NotificationListener(
             user=user,
@@ -321,6 +323,7 @@ class Notifier(object):
             limit=1,
             timeout=timeout,
             deferred=deferred,
+            appservice=appservice,
         )]
 
         if timeout:
@@ -363,65 +366,43 @@ class Notifier(object):
 
         defer.returnValue(result)
 
+    @defer.inlineCallbacks
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
         """
-        deferred = defer.Deferred()
-
-        self._get_events(
-            deferred, user, rooms, pagination_config.from_token,
-            pagination_config.limit, timeout
-        ).addErrback(deferred.errback)
-
-        return deferred
-
-    @defer.inlineCallbacks
-    def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
+        from_token = pagination_config.from_token
         if not from_token:
             from_token = yield self.event_sources.get_current_token()
 
-        appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
-            user.to_string()
-        )
+        limit = pagination_config.limit
 
-        listener = _NotificationListener(
-            user,
-            rooms,
-            from_token,
-            limit,
-            timeout,
-            deferred,
-            appservice=appservice
-        )
+        @defer.inlineCallbacks
+        def check_for_updates():
+            events = []
+            end_token = from_token
+            for name, source in self.event_sources.sources.items():
+                keyname = "%s_key" % name
+                stuff, new_key = yield source.get_new_events_for_user(
+                    user, getattr(from_token, keyname), limit,
+                )
+                events.extend(stuff)
+                end_token = from_token.copy_and_replace(keyname, new_key)
 
-        def _timeout_listener():
-            # TODO (erikj): We should probably set to_token to the current
-            # max rather than reusing from_token.
-            # Remove the timer from the listener so we don't try to cancel it.
-            listener.timer = None
-            listener.notify(
-                self,
-                [],
-                listener.from_token,
-                listener.from_token,
-            )
+            if events:
+                defer.returnValue((events, (from_token, end_token)))
+            else:
+                defer.returnValue(None)
 
-        if timeout:
-            self._register_with_keys(listener)
+        result = yield self.wait_for_events(
+            user, rooms, timeout, check_for_updates, from_token=from_token
+        )
 
-        yield self._check_for_updates(listener)
+        if result is None:
+            result = ([], (from_token, from_token))
 
-        if not timeout:
-            _timeout_listener()
-        else:
-            # Only add the timer if the listener hasn't been notified
-            if not listener.notified():
-                listener.timer = self.clock.call_later(
-                    timeout/1000.0, _timeout_listener
-                )
-        return
+        defer.returnValue(result)
 
     @log_function
     def _register_with_keys(self, listener):
@@ -436,36 +417,6 @@ class Notifier(object):
                 listener.appservice, set()
             ).add(listener)
 
-    @defer.inlineCallbacks
-    @log_function
-    def _check_for_updates(self, listener):
-        # TODO (erikj): We need to think about limits across multiple sources
-        events = []
-
-        from_token = listener.from_token
-        limit = listener.limit
-
-        # TODO (erikj): DeferredList?
-        for name, source in self.event_sources.sources.items():
-            keyname = "%s_key" % name
-
-            stuff, new_key = yield source.get_new_events_for_user(
-                listener.user,
-                getattr(from_token, keyname),
-                limit,
-            )
-
-            events.extend(stuff)
-
-            from_token = from_token.copy_and_replace(keyname, new_key)
-
-        end_token = from_token
-
-        if events:
-            listener.notify(self, events, listener.from_token, end_token)
-
-        defer.returnValue(listener)
-
     def _user_joined_room(self, user, room_id):
         new_listeners = self.user_to_listeners.get(user, set())