summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py80
1 files changed, 25 insertions, 55 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 30883a0696..b86648f5e4 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -19,7 +19,8 @@ from synapse.api.errors import AuthError
 
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.metrics import Measure
 from synapse.types import StreamToken
 from synapse.visibility import filter_events_for_client
 import synapse.metrics
@@ -67,10 +68,8 @@ class _NotifierUserStream(object):
     so that it can remove itself from the indexes in the Notifier class.
     """
 
-    def __init__(self, user_id, rooms, current_token, time_now_ms,
-                 appservice=None):
+    def __init__(self, user_id, rooms, current_token, time_now_ms):
         self.user_id = user_id
-        self.appservice = appservice
         self.rooms = set(rooms)
         self.current_token = current_token
         self.last_notified_ms = time_now_ms
@@ -107,11 +106,6 @@ class _NotifierUserStream(object):
 
         notifier.user_to_user_stream.pop(self.user_id)
 
-        if self.appservice:
-            notifier.appservice_to_user_streams.get(
-                self.appservice, set()
-            ).discard(self)
-
     def count_listeners(self):
         return len(self.notify_deferred.observers())
 
@@ -142,7 +136,6 @@ class Notifier(object):
     def __init__(self, hs):
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
-        self.appservice_to_user_streams = {}
 
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
@@ -168,8 +161,6 @@ class Notifier(object):
                 all_user_streams |= x
             for x in self.user_to_user_stream.values():
                 all_user_streams.add(x)
-            for x in self.appservice_to_user_streams.values():
-                all_user_streams |= x
 
             return sum(stream.count_listeners() for stream in all_user_streams)
         metrics.register_callback("listeners", count_listeners)
@@ -182,11 +173,8 @@ class Notifier(object):
             "users",
             lambda: len(self.user_to_user_stream),
         )
-        metrics.register_callback(
-            "appservices",
-            lambda: count(bool, self.appservice_to_user_streams.values()),
-        )
 
+    @preserve_fn
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
@@ -208,6 +196,7 @@ class Notifier(object):
 
             self.notify_replication()
 
+    @preserve_fn
     def _notify_pending_new_room_events(self, max_room_stream_id):
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
@@ -225,24 +214,11 @@ class Notifier(object):
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
 
+    @preserve_fn
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        self.appservice_handler.notify_interested_services(event)
-
-        app_streams = set()
-
-        for appservice in self.appservice_to_user_streams:
-            # TODO (kegan): Redundant appservice listener checks?
-            # App services will already be in the room_to_user_streams set, but
-            # that isn't enough. They need to be checked here in order to
-            # receive *invites* for users they are interested in. Does this
-            # make the room_to_user_streams check somewhat obselete?
-            if appservice.is_interested(event):
-                app_user_streams = self.appservice_to_user_streams.get(
-                    appservice, set()
-                )
-                app_streams |= app_user_streams
+        self.appservice_handler.notify_interested_services(room_stream_id)
 
         if event.type == EventTypes.Member and event.membership == Membership.JOIN:
             self._user_joined_room(event.state_key, event.room_id)
@@ -251,35 +227,36 @@ class Notifier(object):
             "room_key", room_stream_id,
             users=extra_users,
             rooms=[event.room_id],
-            extra_streams=app_streams,
         )
 
-    def on_new_event(self, stream_key, new_token, users=[], rooms=[],
-                     extra_streams=set()):
+    @preserve_fn
+    def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
         """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
         with PreserveLoggingContext():
-            user_streams = set()
+            with Measure(self.clock, "on_new_event"):
+                user_streams = set()
 
-            for user in users:
-                user_stream = self.user_to_user_stream.get(str(user))
-                if user_stream is not None:
-                    user_streams.add(user_stream)
+                for user in users:
+                    user_stream = self.user_to_user_stream.get(str(user))
+                    if user_stream is not None:
+                        user_streams.add(user_stream)
 
-            for room in rooms:
-                user_streams |= self.room_to_user_streams.get(room, set())
+                for room in rooms:
+                    user_streams |= self.room_to_user_streams.get(room, set())
 
-            time_now_ms = self.clock.time_msec()
-            for user_stream in user_streams:
-                try:
-                    user_stream.notify(stream_key, new_token, time_now_ms)
-                except:
-                    logger.exception("Failed to notify listener")
+                time_now_ms = self.clock.time_msec()
+                for user_stream in user_streams:
+                    try:
+                        user_stream.notify(stream_key, new_token, time_now_ms)
+                    except:
+                        logger.exception("Failed to notify listener")
 
-            self.notify_replication()
+                self.notify_replication()
 
+    @preserve_fn
     def on_new_replication_data(self):
         """Used to inform replication listeners that something has happend
         without waking up any of the normal user event streams"""
@@ -294,7 +271,6 @@ class Notifier(object):
         """
         user_stream = self.user_to_user_stream.get(user_id)
         if user_stream is None:
-            appservice = yield self.store.get_app_service_by_user_id(user_id)
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
                 rooms = yield self.store.get_rooms_for_user(user_id)
@@ -302,7 +278,6 @@ class Notifier(object):
             user_stream = _NotifierUserStream(
                 user_id=user_id,
                 rooms=room_ids,
-                appservice=appservice,
                 current_token=current_token,
                 time_now_ms=self.clock.time_msec(),
             )
@@ -477,11 +452,6 @@ class Notifier(object):
             s = self.room_to_user_streams.setdefault(room, set())
             s.add(user_stream)
 
-        if user_stream.appservice:
-            self.appservice_to_user_stream.setdefault(
-                user_stream.appservice, set()
-            ).add(user_stream)
-
     def _user_joined_room(self, user_id, room_id):
         new_user_stream = self.user_to_user_stream.get(user_id)
         if new_user_stream is not None: