summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2015-01-30 15:54:25 +0000
committerMark Haines <mjark@negativecurvature.net>2015-01-30 15:54:25 +0000
commit6dc92d3427c80c1b4ea3182ac5c36575f50e8366 (patch)
treed3121fece4d3abfe8b17a6b5cda9c595b0cce480 /synapse/notifier.py
parentMerge pull request #42 from matrix-org/replication_split (diff)
parentAdd doc string for __nonzero__ overrides for sync results, raise not implemen... (diff)
downloadsynapse-6dc92d3427c80c1b4ea3182ac5c36575f50e8366.tar.xz
Merge pull request #41 from matrix-org/client_v2_sync
Client v2 sync
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..e3b6ead620 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
 
 import logging
 
@@ -205,6 +206,53 @@ class Notifier(object):
                 [notify(l).addErrback(eb) for l in listeners]
             )
 
+    @defer.inlineCallbacks
+    def wait_for_events(self, user, rooms, filter, timeout, callback):
+        """Wait until the callback returns a non empty response or the
+        timeout fires.
+        """
+
+        deferred = defer.Deferred()
+
+        from_token = StreamToken("s0", "0", "0")
+
+        listener = [_NotificationListener(
+            user=user,
+            rooms=rooms,
+            from_token=from_token,
+            limit=1,
+            timeout=timeout,
+            deferred=deferred,
+        )]
+
+        if timeout:
+            self._register_with_keys(listener[0])
+
+        result = yield callback()
+        if timeout:
+            timed_out = [False]
+
+            def _timeout_listener():
+                timed_out[0] = True
+                listener[0].notify(self, [], from_token, from_token)
+
+            self.clock.call_later(timeout/1000., _timeout_listener)
+            while not result and not timed_out[0]:
+                yield deferred
+                deferred = defer.Deferred()
+                listener[0] = _NotificationListener(
+                    user=user,
+                    rooms=rooms,
+                    from_token=from_token,
+                    limit=1,
+                    timeout=timeout,
+                    deferred=deferred,
+                )
+                self._register_with_keys(listener[0])
+                result = yield callback()
+
+        defer.returnValue(result)
+
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any