summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py75
1 files changed, 51 insertions, 24 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 214a2b28ca..4d10c05038 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -52,12 +52,11 @@ class _NotificationListener(object):
     def notified(self):
         return self.deferred.called
 
-    def notify(self):
+    def notify(self, token):
         """ Inform whoever is listening about the new events.
         """
-
         try:
-            self.deferred.callback(None)
+            self.deferred.callback(token)
         except defer.AlreadyCalledError:
             pass
 
@@ -73,15 +72,18 @@ class _NotifierUserStream(object):
     """
 
     def __init__(self, user, rooms, current_token, appservice=None):
-        self.user = user
+        self.user = str(user)
         self.appservice = appservice
         self.listeners = set()
-        self.rooms = rooms
+        self.rooms = set(rooms)
         self.current_token = current_token
 
-    def notify(self, new_token):
+    def notify(self, stream_key, stream_id):
+        self.current_token = self.current_token.copy_and_replace(
+            stream_key, stream_id
+        )
         for listener in self.listeners:
-            listener.notify(new_token)
+            listener.notify(self.current_token)
         self.listeners.clear()
 
     def remove(self, notifier):
@@ -117,6 +119,7 @@ class Notifier(object):
 
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
+        self.pending_new_room_events = []
 
         self.clock = hs.get_clock()
 
@@ -153,9 +156,21 @@ class Notifier(object):
             lambda: count(bool, self.appservice_to_user_streams.values()),
         )
 
+    def notify_pending_new_room_events(self, max_room_stream_id):
+        pending = sorted(self.pending_new_room_events)
+        self.pending_new_room_events = []
+        for event, room_stream_id, extra_users in pending:
+            if room_stream_id > max_room_stream_id:
+                self.pending_new_room_events.append((
+                    event, room_stream_id, extra_users
+                ))
+            else:
+                self._on_new_room_event(event, room_stream_id, extra_users)
+
     @log_function
     @defer.inlineCallbacks
-    def on_new_room_event(self, event, new_token, extra_users=[]):
+    def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
+                          extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
@@ -163,8 +178,18 @@ class Notifier(object):
         listening to the room, and any listeners for the users in the
         `extra_users` param.
         """
-        assert isinstance(new_token, StreamToken)
         yield run_on_reactor()
+
+        self.notify_pending_new_room_events(max_room_stream_id)
+
+        if room_stream_id > max_room_stream_id:
+            self.pending_new_room_events.append((
+                event, room_stream_id, extra_users
+            ))
+        else:
+            self._on_new_room_event(event, room_stream_id, extra_users)
+
+    def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         # poke any interested application service.
         self.hs.get_handlers().appservice_handler.notify_interested_services(
             event
@@ -197,33 +222,32 @@ class Notifier(object):
 
         for user_stream in user_streams:
             try:
-                user_stream.notify(new_token)
+                user_stream.notify("room_key", "s%d" % (room_stream_id,))
             except:
                 logger.exception("Failed to notify listener")
 
     @defer.inlineCallbacks
     @log_function
-    def on_new_user_event(self, new_token, users=[], rooms=[]):
+    def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend
         presence/user event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
-        assert isinstance(new_token, StreamToken)
         yield run_on_reactor()
         user_streams = set()
 
         for user in users:
             user_stream = self.user_to_user_stream.get(user)
-            if user_stream:
-                user_stream.add(user_stream)
+            if user_stream is not None:
+                user_streams.add(user_stream)
 
         for room in rooms:
             user_streams |= self.room_to_user_streams.get(room, set())
 
         for user_stream in user_streams:
             try:
-                user_streams.notify(new_token)
+                user_stream.notify(stream_key, new_token)
             except:
                 logger.exception("Failed to notify listener")
 
@@ -236,12 +260,12 @@ class Notifier(object):
 
         deferred = defer.Deferred()
 
-        user_stream = self.user_to_user_streams.get(user)
+        user = str(user)
+        user_stream = self.user_to_user_stream.get(user)
         if user_stream is None:
-            appservice = yield self.store.get_app_service_by_user_id(
-                user.to_string()
-            )
+            appservice = yield self.store.get_app_service_by_user_id(user)
             current_token = yield self.event_sources.get_current_token()
+            rooms = yield self.store.get_rooms_for_user(user)
             user_stream = _NotifierUserStream(
                 user=user,
                 rooms=rooms,
@@ -252,8 +276,9 @@ class Notifier(object):
         else:
             current_token = user_stream.current_token
 
+        listener = [_NotificationListener(deferred)]
+
         if timeout and not current_token.is_after(from_token):
-            listener = [_NotificationListener(deferred)]
             user_stream.listeners.add(listener[0])
 
         if current_token.is_after(from_token):
@@ -334,7 +359,7 @@ class Notifier(object):
         self.user_to_user_stream[user_stream.user] = user_stream
 
         for room in user_stream.rooms:
-            s = self.room_to_user_stream.setdefault(room, set())
+            s = self.room_to_user_streams.setdefault(room, set())
             s.add(user_stream)
 
         if user_stream.appservice:
@@ -343,10 +368,12 @@ class Notifier(object):
             ).add(user_stream)
 
     def _user_joined_room(self, user, room_id):
+        user = str(user)
         new_user_stream = self.user_to_user_stream.get(user)
-        room_streams = self.room_to_user_streams.setdefault(room_id, set())
-        room_streams.add(new_user_stream)
-        new_user_stream.rooms.add(room_id)
+        if new_user_stream is not None:
+            room_streams = self.room_to_user_streams.setdefault(room_id, set())
+            room_streams.add(new_user_stream)
+            new_user_stream.rooms.add(room_id)
 
 
 def _discard_if_notified(listener_set):