diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3dbd6f984d..862b42cfc8 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -81,6 +81,13 @@ class _NotifierUserStream(object):
self.last_notified_ms = time_now_ms
def notify(self, stream_key, stream_id, time_now_ms):
+ """Notify any listeners for this user of a new event from an
+ event source.
+ Args:
+ stream_key(str): The stream the event came from.
+ stream_id(str): The new id for the stream the event came from.
+ time_now_ms(int): The current time in milliseconds.
+ """
self.current_token = self.current_token.copy_and_replace(
stream_key, stream_id
)
@@ -167,17 +174,6 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()),
)
- def notify_pending_new_room_events(self, max_room_stream_id):
- pending = sorted(self.pending_new_room_events)
- self.pending_new_room_events = []
- for event, room_stream_id, extra_users in pending:
- if room_stream_id > max_room_stream_id:
- self.pending_new_room_events.append((
- event, room_stream_id, extra_users
- ))
- else:
- self._on_new_room_event(event, room_stream_id, extra_users)
-
@log_function
@defer.inlineCallbacks
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
@@ -188,19 +184,37 @@ class Notifier(object):
This triggers the notifier to wake up any listeners that are
listening to the room, and any listeners for the users in the
`extra_users` param.
+
+ The events can be peristed out of order. The notifier will wait
+ until all previous events have been persisted before notifying
+ the client streams.
"""
yield run_on_reactor()
- self.notify_pending_new_room_events(max_room_stream_id)
-
- if room_stream_id > max_room_stream_id:
- self.pending_new_room_events.append((
- event, room_stream_id, extra_users
- ))
- else:
- self._on_new_room_event(event, room_stream_id, extra_users)
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ self._notify_pending_new_room_events(max_room_stream_id)
+
+ def _notify_pending_new_room_events(self, max_room_stream_id):
+ """Notify for the room events that were queued waiting for a previous
+ event to be persisted.
+ Args:
+ max_room_stream_id(int): The highest stream_id below which all
+ events have been persisted.
+ """
+ pending = sorted(self.pending_new_room_events)
+ self.pending_new_room_events = []
+ for event, room_stream_id, extra_users in pending:
+ if room_stream_id > max_room_stream_id:
+ self.pending_new_room_events.append((
+ event, room_stream_id, extra_users
+ ))
+ else:
+ self._on_new_room_event(event, room_stream_id, extra_users)
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
+ """Notify any user streams that are interested in this room event"""
# poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services(
event
|