summary refs log tree commit diff
diff options
context:
space:
mode:
authorMark Haines <mark.haines@matrix.org>2015-01-27 20:09:52 +0000
committerMark Haines <mark.haines@matrix.org>2015-01-27 20:09:52 +0000
commitb19cf6a105ad87954e15cf01e60e14fec280db6d (patch)
treeffe11b20ce8ffad0324660fb46077069fd404349
parentStart implementing incremental initial sync (diff)
downloadsynapse-b19cf6a105ad87954e15cf01e60e14fec280db6d.tar.xz
Wait for events if the incremental sync is empty and a timeout is given
-rwxr-xr-xdemo/start.sh2
-rw-r--r--synapse/handlers/sync.py19
-rw-r--r--synapse/notifier.py48
3 files changed, 61 insertions, 8 deletions
diff --git a/demo/start.sh b/demo/start.sh
index bb2248770d..a7502e7a8d 100755
--- a/demo/start.sh
+++ b/demo/start.sh
@@ -16,7 +16,7 @@ if [ $# -eq 1 ]; then
     fi
 fi
 
-for port in 8080 8081 8082; do
+for port in 8080; do
     echo "Starting server on port $port... "
 
     https_port=$((port + 400))
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f8629a588f..9f5f73eab6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -72,6 +72,7 @@ class SyncHandler(BaseHandler):
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
 
+    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
@@ -80,15 +81,19 @@ class SyncHandler(BaseHandler):
             A Deferred SyncResult.
         """
         if timeout == 0 or since_token is None:
-            return self.current_sync_for_user(sync_config, since_token)
+            result = yield self.current_sync_for_user(sync_config, since_token)
+            defer.returnValue(result)
         else:
-            def current_sync_callback(since_token):
-                return self.current_sync_for_user(
-                    self, since_token, sync_config
-                )
-            return self.notifier.wait_for_events(
-                sync_config.filter, since_token, current_sync_callback
+            def current_sync_callback():
+                return self.current_sync_for_user(sync_config, since_token)
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+            result = yield self.notifier.wait_for_events(
+                sync_config.user, room_ids,
+                sync_config.filter, timeout, current_sync_callback
             )
+            defer.returnValue(result)
 
     def current_sync_for_user(self, sync_config, since_token=None):
         """Get the sync for client needed to match what the server has now.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..922bf064d0 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
 
 import logging
 
@@ -205,6 +206,53 @@ class Notifier(object):
                 [notify(l).addErrback(eb) for l in listeners]
             )
 
+    @defer.inlineCallbacks
+    def wait_for_events(self, user, rooms, filter, timeout, callback):
+        """Wait until the callback returns a non empty response or the
+        timeout fires.
+        """
+
+        deferred = defer.Deferred()
+
+        from_token=StreamToken("s0","0","0")
+
+        listener = [_NotificationListener(
+            user=user,
+            rooms=rooms,
+            from_token=from_token,
+            limit=1,
+            timeout=timeout,
+            deferred=deferred,
+        )]
+
+        if timeout:
+            self._register_with_keys(listener[0])
+
+        result = yield callback()
+        if timeout:
+            timed_out = [False]
+            def _timeout_listener():
+                timed_out[0] = True
+                listener[0].notify(self, [], from_token, from_token)
+
+            self.clock.call_later(timeout/1000., _timeout_listener)
+            while not result and not timed_out[0]:
+                yield deferred
+                deferred = defer.Deferred()
+                listener[0] = _NotificationListener(
+                    user=user,
+                    rooms=rooms,
+                    from_token=from_token,
+                    limit=1,
+                    timeout=timeout,
+                    deferred=deferred,
+                )
+                self._register_with_keys(listener[0])
+                result = yield callback()
+
+        defer.returnValue(result)
+
+
     def get_events_for(self, user, rooms, pagination_config, timeout):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any