diff --git a/synapse/notifier.py b/synapse/notifier.py
index 214a2b28ca..4d10c05038 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -52,12 +52,11 @@ class _NotificationListener(object):
def notified(self):
return self.deferred.called
- def notify(self):
+ def notify(self, token):
""" Inform whoever is listening about the new events.
"""
-
try:
- self.deferred.callback(None)
+ self.deferred.callback(token)
except defer.AlreadyCalledError:
pass
@@ -73,15 +72,18 @@ class _NotifierUserStream(object):
"""
def __init__(self, user, rooms, current_token, appservice=None):
- self.user = user
+ self.user = str(user)
self.appservice = appservice
self.listeners = set()
- self.rooms = rooms
+ self.rooms = set(rooms)
self.current_token = current_token
- def notify(self, new_token):
+ def notify(self, stream_key, stream_id):
+ self.current_token = self.current_token.copy_and_replace(
+ stream_key, stream_id
+ )
for listener in self.listeners:
- listener.notify(new_token)
+ listener.notify(self.current_token)
self.listeners.clear()
def remove(self, notifier):
@@ -117,6 +119,7 @@ class Notifier(object):
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
+ self.pending_new_room_events = []
self.clock = hs.get_clock()
@@ -153,9 +156,21 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()),
)
+ def notify_pending_new_room_events(self, max_room_stream_id):
+ pending = sorted(self.pending_new_room_events)
+ self.pending_new_room_events = []
+ for event, room_stream_id, extra_users in pending:
+ if room_stream_id > max_room_stream_id:
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ else:
+ self._on_new_room_event(event, room_stream_id, extra_users)
+
@log_function
@defer.inlineCallbacks
- def on_new_room_event(self, event, new_token, extra_users=[]):
+ def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
+ extra_users=[]):
""" Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -163,8 +178,18 @@ class Notifier(object):
listening to the room, and any listeners for the users in the
`extra_users` param.
"""
- assert isinstance(new_token, StreamToken)
yield run_on_reactor()
+
+ self.notify_pending_new_room_events(max_room_stream_id)
+
+ if room_stream_id > max_room_stream_id:
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ else:
+ self._on_new_room_event(event, room_stream_id, extra_users)
+
+ def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
# poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services(
event
@@ -197,33 +222,32 @@ class Notifier(object):
for user_stream in user_streams:
try:
- user_stream.notify(new_token)
+ user_stream.notify("room_key", "s%d" % (room_stream_id,))
except:
logger.exception("Failed to notify listener")
@defer.inlineCallbacks
@log_function
- def on_new_user_event(self, new_token, users=[], rooms=[]):
+ def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend
presence/user event wise.
Will wake up all listeners for the given users and rooms.
"""
- assert isinstance(new_token, StreamToken)
yield run_on_reactor()
user_streams = set()
for user in users:
user_stream = self.user_to_user_stream.get(user)
- if user_stream:
- user_stream.add(user_stream)
+ if user_stream is not None:
+ user_streams.add(user_stream)
for room in rooms:
user_streams |= self.room_to_user_streams.get(room, set())
for user_stream in user_streams:
try:
- user_streams.notify(new_token)
+ user_stream.notify(stream_key, new_token)
except:
logger.exception("Failed to notify listener")
@@ -236,12 +260,12 @@ class Notifier(object):
deferred = defer.Deferred()
- user_stream = self.user_to_user_streams.get(user)
+ user = str(user)
+ user_stream = self.user_to_user_stream.get(user)
if user_stream is None:
- appservice = yield self.store.get_app_service_by_user_id(
- user.to_string()
- )
+ appservice = yield self.store.get_app_service_by_user_id(user)
current_token = yield self.event_sources.get_current_token()
+ rooms = yield self.store.get_rooms_for_user(user)
user_stream = _NotifierUserStream(
user=user,
rooms=rooms,
@@ -252,8 +276,9 @@ class Notifier(object):
else:
current_token = user_stream.current_token
+ listener = [_NotificationListener(deferred)]
+
if timeout and not current_token.is_after(from_token):
- listener = [_NotificationListener(deferred)]
user_stream.listeners.add(listener[0])
if current_token.is_after(from_token):
@@ -334,7 +359,7 @@ class Notifier(object):
self.user_to_user_stream[user_stream.user] = user_stream
for room in user_stream.rooms:
- s = self.room_to_user_stream.setdefault(room, set())
+ s = self.room_to_user_streams.setdefault(room, set())
s.add(user_stream)
if user_stream.appservice:
@@ -343,10 +368,12 @@ class Notifier(object):
).add(user_stream)
def _user_joined_room(self, user, room_id):
+ user = str(user)
new_user_stream = self.user_to_user_stream.get(user)
- room_streams = self.room_to_user_streams.setdefault(room_id, set())
- room_streams.add(new_user_stream)
- new_user_stream.rooms.add(room_id)
+ if new_user_stream is not None:
+ room_streams = self.room_to_user_streams.setdefault(room_id, set())
+ room_streams.add(new_user_stream)
+ new_user_stream.rooms.add(room_id)
def _discard_if_notified(listener_set):
|