diff --git a/synapse/notifier.py b/synapse/notifier.py
index 78eb28e4b2..e16a4608e9 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -305,14 +305,16 @@ class Notifier(object):
)
@defer.inlineCallbacks
- def wait_for_events(self, user, rooms, filter, timeout, callback):
+ def wait_for_events(self, user, rooms, timeout, callback,
+ from_token=StreamToken("s0", "0", "0")):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
deferred = defer.Deferred()
-
- from_token = StreamToken("s0", "0", "0")
+ appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
+ user.to_string()
+ )
listener = [_NotificationListener(
user=user,
@@ -321,6 +323,7 @@ class Notifier(object):
limit=1,
timeout=timeout,
deferred=deferred,
+ appservice=appservice,
)]
if timeout:
@@ -363,65 +366,43 @@ class Notifier(object):
defer.returnValue(result)
+ @defer.inlineCallbacks
def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
"""
- deferred = defer.Deferred()
-
- self._get_events(
- deferred, user, rooms, pagination_config.from_token,
- pagination_config.limit, timeout
- ).addErrback(deferred.errback)
-
- return deferred
-
- @defer.inlineCallbacks
- def _get_events(self, deferred, user, rooms, from_token, limit, timeout):
+ from_token = pagination_config.from_token
if not from_token:
from_token = yield self.event_sources.get_current_token()
- appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
- user.to_string()
- )
+ limit = pagination_config.limit
- listener = _NotificationListener(
- user,
- rooms,
- from_token,
- limit,
- timeout,
- deferred,
- appservice=appservice
- )
+ @defer.inlineCallbacks
+ def check_for_updates():
+ events = []
+ end_token = from_token
+ for name, source in self.event_sources.sources.items():
+ keyname = "%s_key" % name
+ stuff, new_key = yield source.get_new_events_for_user(
+ user, getattr(from_token, keyname), limit,
+ )
+ events.extend(stuff)
+ end_token = from_token.copy_and_replace(keyname, new_key)
- def _timeout_listener():
- # TODO (erikj): We should probably set to_token to the current
- # max rather than reusing from_token.
- # Remove the timer from the listener so we don't try to cancel it.
- listener.timer = None
- listener.notify(
- self,
- [],
- listener.from_token,
- listener.from_token,
- )
+ if events:
+ defer.returnValue((events, (from_token, end_token)))
+ else:
+ defer.returnValue(None)
- if timeout:
- self._register_with_keys(listener)
+ result = yield self.wait_for_events(
+ user, rooms, timeout, check_for_updates, from_token=from_token
+ )
- yield self._check_for_updates(listener)
+ if result is None:
+ result = ([], (from_token, from_token))
- if not timeout:
- _timeout_listener()
- else:
- # Only add the timer if the listener hasn't been notified
- if not listener.notified():
- listener.timer = self.clock.call_later(
- timeout/1000.0, _timeout_listener
- )
- return
+ defer.returnValue(result)
@log_function
def _register_with_keys(self, listener):
@@ -436,36 +417,6 @@ class Notifier(object):
listener.appservice, set()
).add(listener)
- @defer.inlineCallbacks
- @log_function
- def _check_for_updates(self, listener):
- # TODO (erikj): We need to think about limits across multiple sources
- events = []
-
- from_token = listener.from_token
- limit = listener.limit
-
- # TODO (erikj): DeferredList?
- for name, source in self.event_sources.sources.items():
- keyname = "%s_key" % name
-
- stuff, new_key = yield source.get_new_events_for_user(
- listener.user,
- getattr(from_token, keyname),
- limit,
- )
-
- events.extend(stuff)
-
- from_token = from_token.copy_and_replace(keyname, new_key)
-
- end_token = from_token
-
- if events:
- listener.notify(self, events, listener.from_token, end_token)
-
- defer.returnValue(listener)
-
def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set())
|