summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/notifier.py52
1 files changed, 33 insertions, 19 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3dbd6f984d..862b42cfc8 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -81,6 +81,13 @@ class _NotifierUserStream(object):
         self.last_notified_ms = time_now_ms
 
     def notify(self, stream_key, stream_id, time_now_ms):
+        """Notify any listeners for this user of a new event from an
+        event source.
+        Args:
+            stream_key(str): The stream the event came from.
+            stream_id(str): The new id for the stream the event came from.
+            time_now_ms(int): The current time in milliseconds.
+        """
         self.current_token = self.current_token.copy_and_replace(
             stream_key, stream_id
         )
@@ -167,17 +174,6 @@ class Notifier(object):
             lambda: count(bool, self.appservice_to_user_streams.values()),
         )
 
-    def notify_pending_new_room_events(self, max_room_stream_id):
-        pending = sorted(self.pending_new_room_events)
-        self.pending_new_room_events = []
-        for event, room_stream_id, extra_users in pending:
-            if room_stream_id > max_room_stream_id:
-                self.pending_new_room_events.append((
-                    event, room_stream_id, extra_users
-                ))
-            else:
-                self._on_new_room_event(event, room_stream_id, extra_users)
-
     @log_function
     @defer.inlineCallbacks
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
@@ -188,19 +184,37 @@ class Notifier(object):
         This triggers the notifier to wake up any listeners that are
         listening to the room, and any listeners for the users in the
         `extra_users` param.
+
+        The events can be peristed out of order. The notifier will wait
+        until all previous events have been persisted before notifying
+        the client streams.
         """
         yield run_on_reactor()
 
-        self.notify_pending_new_room_events(max_room_stream_id)
-
-        if room_stream_id > max_room_stream_id:
-            self.pending_new_room_events.append((
-                event, room_stream_id, extra_users
-            ))
-        else:
-            self._on_new_room_event(event, room_stream_id, extra_users)
+        self.pending_new_room_events.append((
+            event, room_stream_id, extra_users
+        ))
+        self._notify_pending_new_room_events(max_room_stream_id)
+
+    def _notify_pending_new_room_events(self, max_room_stream_id):
+        """Notify for the room events that were queued waiting for a previous
+        event to be persisted.
+        Args:
+            max_room_stream_id(int): The highest stream_id below which all
+                events have been persisted.
+        """
+        pending = sorted(self.pending_new_room_events)
+        self.pending_new_room_events = []
+        for event, room_stream_id, extra_users in pending:
+            if room_stream_id > max_room_stream_id:
+                self.pending_new_room_events.append((
+                    event, room_stream_id, extra_users
+                ))
+            else:
+                self._on_new_room_event(event, room_stream_id, extra_users)
 
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
+        """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         self.hs.get_handlers().appservice_handler.notify_interested_services(
             event