summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py48
1 files changed, 20 insertions, 28 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index bdd03dcbe8..f998fc83bf 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -221,16 +221,7 @@ class Notifier(object):
             event
         )
 
-        room_id = event.room_id
-
-        room_user_streams = self.room_to_user_streams.get(room_id, set())
-
-        user_streams = room_user_streams.copy()
-
-        for user in extra_users:
-            user_stream = self.user_to_user_stream.get(str(user))
-            if user_stream is not None:
-                user_streams.add(user_stream)
+        app_streams = set()
 
         for appservice in self.appservice_to_user_streams:
             # TODO (kegan): Redundant appservice listener checks?
@@ -242,24 +233,20 @@ class Notifier(object):
                 app_user_streams = self.appservice_to_user_streams.get(
                     appservice, set()
                 )
-                user_streams |= app_user_streams
+                app_streams |= app_user_streams
 
-        logger.debug("on_new_room_event listeners %s", user_streams)
-
-        time_now_ms = self.clock.time_msec()
-        for user_stream in user_streams:
-            try:
-                user_stream.notify(
-                    "room_key", "s%d" % (room_stream_id,), time_now_ms
-                )
-            except:
-                logger.exception("Failed to notify listener")
+        self.on_new_event(
+            "room_key", room_stream_id,
+            users=extra_users,
+            rooms=[event.room_id],
+            extra_streams=app_streams,
+        )
 
     @defer.inlineCallbacks
     @log_function
-    def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
-        """ Used to inform listeners that something has happend
-        presence/user event wise.
+    def on_new_event(self, stream_key, new_token, users=[], rooms=[],
+                     extra_streams=set()):
+        """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
@@ -283,7 +270,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def wait_for_events(self, user, rooms, timeout, callback,
-                        from_token=StreamToken("s0", "0", "0")):
+                        from_token=StreamToken("s0", "0", "0", "0")):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
@@ -341,10 +328,13 @@ class Notifier(object):
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def get_events_for(self, user, rooms, pagination_config, timeout):
+    def get_events_for(self, user, rooms, pagination_config, timeout,
+                       only_room_events=False):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
+
+        If `only_room_events` is `True` only room events will be returned.
         """
         from_token = pagination_config.from_token
         if not from_token:
@@ -365,10 +355,12 @@ class Notifier(object):
                 after_id = getattr(after_token, keyname)
                 if before_id == after_id:
                     continue
-                stuff, new_key = yield source.get_new_events_for_user(
+                if only_room_events and name != "room":
+                    continue
+                new_events, new_key = yield source.get_new_events_for_user(
                     user, getattr(from_token, keyname), limit,
                 )
-                events.extend(stuff)
+                events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
 
             if events: