diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6eaa65071e..32bd16661d 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -22,6 +22,8 @@ from synapse.util.async import run_on_reactor, ObservableDeferred
from synapse.types import StreamToken
import synapse.metrics
+from collections import namedtuple
+
import logging
@@ -118,6 +120,11 @@ class _NotifierUserStream(object):
return _NotificationListener(self.notify_deferred.observe())
+class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+ def __nonzero__(self):
+ return bool(self.events)
+
+
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -356,7 +363,7 @@ class Notifier(object):
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
if not after_token.is_after(before_token):
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult([], (before_token, before_token)))
events = []
end_token = from_token
@@ -369,10 +376,14 @@ class Notifier(object):
continue
if only_keys and name not in only_keys:
continue
+ if limit:
+ new_limit = max(limit * 2, 10)
+ else:
+ new_limit = 10
new_events, new_key = yield source.get_new_events(
user=user,
from_key=getattr(from_token, keyname),
- limit=limit,
+ limit=new_limit,
is_guest=is_peeking,
room_ids=room_ids,
)
@@ -388,10 +399,7 @@ class Notifier(object):
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
- if events:
- defer.returnValue((events, (from_token, end_token)))
- else:
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult(events, (from_token, end_token)))
user_id_for_stream = user.to_string()
if is_peeking:
@@ -415,9 +423,6 @@ class Notifier(object):
from_token=from_token,
)
- if result is None:
- result = ([], (from_token, from_token))
-
defer.returnValue(result)
@defer.inlineCallbacks
|