summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/notifier.py53
-rw-r--r--synapse/storage/_base.py3
2 files changed, 48 insertions, 8 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 7121d659d0..12573f3f59 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -62,7 +62,8 @@ class _NotificationListener(object):
 
         self.rooms = rooms
 
-        self.pending_notifications = []
+    def notified(self):
+        return self.deferred.called
 
     def notify(self, notifier, events, start_token, end_token):
         """ Inform whoever is listening about the new events. This will
@@ -78,11 +79,15 @@ class _NotificationListener(object):
         except defer.AlreadyCalledError:
             pass
 
+        # Should the following be done be using intrusively linked lists?
+        # -- erikj
+
         for room in self.rooms:
             lst = notifier.room_to_listeners.get(room, set())
             lst.discard(self)
 
         notifier.user_to_listeners.get(self.user, set()).discard(self)
+
         if self.appservice:
             notifier.appservice_to_listeners.get(
                 self.appservice, set()
@@ -161,10 +166,18 @@ class Notifier(object):
 
         room_source = self.event_sources.sources["room"]
 
-        listeners = self.room_to_listeners.get(room_id, set()).copy()
+        room_listeners = self.room_to_listeners.get(room_id, set())
+
+        _discard_if_notified(room_listeners)
+
+        listeners = room_listeners.copy()
 
         for user in extra_users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for appservice in self.appservice_to_listeners:
             # TODO (kegan): Redundant appservice listener checks?
@@ -173,9 +186,13 @@ class Notifier(object):
             # receive *invites* for users they are interested in. Does this
             # make the room_to_listeners check somewhat obselete?
             if appservice.is_interested(event):
-                listeners |= self.appservice_to_listeners.get(
+                app_listeners = self.appservice_to_listeners.get(
                     appservice, set()
-                ).copy()
+                )
+
+                _discard_if_notified(app_listeners)
+
+                listeners |= app_listeners
 
         logger.debug("on_new_room_event listeners %s", listeners)
 
@@ -226,10 +243,18 @@ class Notifier(object):
         listeners = set()
 
         for user in users:
-            listeners |= self.user_to_listeners.get(user, set()).copy()
+            user_listeners = self.user_to_listeners.get(user, set())
+
+            _discard_if_notified(user_listeners)
+
+            listeners |= user_listeners
 
         for room in rooms:
-            listeners |= self.room_to_listeners.get(room, set()).copy()
+            room_listeners = self.room_to_listeners.get(room, set())
+
+            _discard_if_notified(room_listeners)
+
+            listeners |= room_listeners
 
         @defer.inlineCallbacks
         def notify(listener):
@@ -427,3 +452,17 @@ class Notifier(object):
 
         listeners = self.room_to_listeners.setdefault(room_id, set())
         listeners |= new_listeners
+
+        for l in new_listeners:
+            l.rooms.add(room_id)
+
+
+def _discard_if_notified(listener_set):
+    """Remove any 'stale' listeners from the given set.
+    """
+    to_discard = set()
+    for l in listener_set:
+        if l.notified():
+            to_discard.add(l)
+
+    listener_set -= to_discard
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index fee713eb26..23289bbdd4 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
 
 sql_logger = logging.getLogger("synapse.storage.SQL")
 transaction_logger = logging.getLogger("synapse.storage.txn")
+perf_logger = logging.getLogger("synapse.storage.TIME")
 
 
 metrics = synapse.metrics.get_metrics_for("synapse.storage")
@@ -323,7 +324,7 @@ class SQLBaseStore(object):
                 time_now - time_then, limit=3
             )
 
-            logger.info(
+            perf_logger.info(
                 "Total database time: %.3f%% {%s} {%s}",
                 ratio * 100, top_three_counters, top_3_event_counters
             )