summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorKegan Dougal <kegan@matrix.org>2015-02-02 15:57:59 +0000
committerKegan Dougal <kegan@matrix.org>2015-02-02 15:57:59 +0000
commitc059c9fea5e3ef36cadc136e055284953556c4ed (patch)
tree39faa451578ceecc16c92c390a564ebec4fc2373 /synapse/notifier.py
parentAdd basic application_services SQL, and hook up parts of the appservice store... (diff)
parentIgnore empty strings for display names & room names in notifications (diff)
downloadsynapse-c059c9fea5e3ef36cadc136e055284953556c4ed.tar.xz
Merge branch 'develop' into application-services
Conflicts:
	synapse/handlers/__init__.py
	synapse/storage/__init__.py
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..e3b6ead620 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
 
 import logging
 
@@ -205,6 +206,53 @@ class Notifier(object):
                 [notify(l).addErrback(eb) for l in listeners]
             )
 
+    @defer.inlineCallbacks
+    def wait_for_events(self, user, rooms, filter, timeout, callback):
+        """Wait until the callback returns a non empty response or the
+        timeout fires.
+        """
+
+        deferred = defer.Deferred()
+
+        from_token = StreamToken("s0", "0", "0")
+
+        listener = [_NotificationListener(
+            user=user,
+            rooms=rooms,
+            from_token=from_token,
+            limit=1,
+            timeout=timeout,
+            deferred=deferred,
+        )]
+
+        if timeout:
+            self._register_with_keys(listener[0])
+
+        result = yield callback()
+        if timeout:
+            timed_out = [False]
+
+            def _timeout_listener():
+                timed_out[0] = True
+                listener[0].notify(self, [], from_token, from_token)
+
+            self.clock.call_later(timeout/1000., _timeout_listener)
+            while not result and not timed_out[0]:
+                yield deferred
+                deferred = defer.Deferred()
+                listener[0] = _NotificationListener(
+                    user=user,
+                    rooms=rooms,
+                    from_token=from_token,
+                    limit=1,
+                    timeout=timeout,
+                    deferred=deferred,
+                )
+                self._register_with_keys(listener[0])
+                result = yield callback()
+
+        defer.returnValue(result)
+
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any