diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/notifier.py | 52 |
1 files changed, 33 insertions, 19 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 3dbd6f984d..862b42cfc8 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -81,6 +81,13 @@ class _NotifierUserStream(object): self.last_notified_ms = time_now_ms def notify(self, stream_key, stream_id, time_now_ms): + """Notify any listeners for this user of a new event from an + event source. + Args: + stream_key(str): The stream the event came from. + stream_id(str): The new id for the stream the event came from. + time_now_ms(int): The current time in milliseconds. + """ self.current_token = self.current_token.copy_and_replace( stream_key, stream_id ) @@ -167,17 +174,6 @@ class Notifier(object): lambda: count(bool, self.appservice_to_user_streams.values()), ) - def notify_pending_new_room_events(self, max_room_stream_id): - pending = sorted(self.pending_new_room_events) - self.pending_new_room_events = [] - for event, room_stream_id, extra_users in pending: - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append(( - event, room_stream_id, extra_users - )) - else: - self._on_new_room_event(event, room_stream_id, extra_users) - @log_function @defer.inlineCallbacks def on_new_room_event(self, event, room_stream_id, max_room_stream_id, @@ -188,19 +184,37 @@ class Notifier(object): This triggers the notifier to wake up any listeners that are listening to the room, and any listeners for the users in the `extra_users` param. + + The events can be peristed out of order. The notifier will wait + until all previous events have been persisted before notifying + the client streams. """ yield run_on_reactor() - self.notify_pending_new_room_events(max_room_stream_id) - - if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append(( - event, room_stream_id, extra_users - )) - else: - self._on_new_room_event(event, room_stream_id, extra_users) + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + self._notify_pending_new_room_events(max_room_stream_id) + + def _notify_pending_new_room_events(self, max_room_stream_id): + """Notify for the room events that were queued waiting for a previous + event to be persisted. + Args: + max_room_stream_id(int): The highest stream_id below which all + events have been persisted. + """ + pending = sorted(self.pending_new_room_events) + self.pending_new_room_events = [] + for event, room_stream_id, extra_users in pending: + if room_stream_id > max_room_stream_id: + self.pending_new_room_events.append(( + event, room_stream_id, extra_users + )) + else: + self._on_new_room_event(event, room_stream_id, extra_users) def _on_new_room_event(self, event, room_stream_id, extra_users=[]): + """Notify any user streams that are interested in this room event""" # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event |