diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f8629a588f..9f5f73eab6 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -72,6 +72,7 @@ class SyncHandler(BaseHandler):
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
+ @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
@@ -80,15 +81,19 @@ class SyncHandler(BaseHandler):
A Deferred SyncResult.
"""
if timeout == 0 or since_token is None:
- return self.current_sync_for_user(sync_config, since_token)
+ result = yield self.current_sync_for_user(sync_config, since_token)
+ defer.returnValue(result)
else:
- def current_sync_callback(since_token):
- return self.current_sync_for_user(
- self, since_token, sync_config
- )
- return self.notifier.wait_for_events(
- sync_config.filter, since_token, current_sync_callback
+ def current_sync_callback():
+ return self.current_sync_for_user(sync_config, since_token)
+
+ rm_handler = self.hs.get_handlers().room_member_handler
+ room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+ result = yield self.notifier.wait_for_events(
+ sync_config.user, room_ids,
+ sync_config.filter, timeout, current_sync_callback
)
+ defer.returnValue(result)
def current_sync_for_user(self, sync_config, since_token=None):
"""Get the sync for client needed to match what the server has now.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..922bf064d0 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
import logging
@@ -205,6 +206,53 @@ class Notifier(object):
[notify(l).addErrback(eb) for l in listeners]
)
+ @defer.inlineCallbacks
+ def wait_for_events(self, user, rooms, filter, timeout, callback):
+ """Wait until the callback returns a non empty response or the
+ timeout fires.
+ """
+
+ deferred = defer.Deferred()
+
+ from_token=StreamToken("s0","0","0")
+
+ listener = [_NotificationListener(
+ user=user,
+ rooms=rooms,
+ from_token=from_token,
+ limit=1,
+ timeout=timeout,
+ deferred=deferred,
+ )]
+
+ if timeout:
+ self._register_with_keys(listener[0])
+
+ result = yield callback()
+ if timeout:
+ timed_out = [False]
+ def _timeout_listener():
+ timed_out[0] = True
+ listener[0].notify(self, [], from_token, from_token)
+
+ self.clock.call_later(timeout/1000., _timeout_listener)
+ while not result and not timed_out[0]:
+ yield deferred
+ deferred = defer.Deferred()
+ listener[0] = _NotificationListener(
+ user=user,
+ rooms=rooms,
+ from_token=from_token,
+ limit=1,
+ timeout=timeout,
+ deferred=deferred,
+ )
+ self._register_with_keys(listener[0])
+ result = yield callback()
+
+ defer.returnValue(result)
+
+
def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
|