diff options
author | Erik Johnston <erik@matrix.org> | 2015-04-08 14:11:07 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-04-08 14:11:07 +0100 |
commit | 3887350e47bc96aef094ee7eb66abe8cc6e30694 (patch) | |
tree | f48744618305d66665b8a3c666ccc94d55f99fd7 /synapse/notifier.py | |
parent | Retry transaction, not SQL query (diff) | |
parent | Merge pull request #117 from matrix-org/notifier-leak (diff) | |
download | synapse-3887350e47bc96aef094ee7eb66abe8cc6e30694.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into mysql
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 53 |
1 files changed, 46 insertions, 7 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 7121d659d0..12573f3f59 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -62,7 +62,8 @@ class _NotificationListener(object): self.rooms = rooms - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,11 +79,15 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() @@ -161,10 +166,18 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + _discard_if_notified(room_listeners) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +186,13 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + _discard_if_notified(app_listeners) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) @@ -226,10 +243,18 @@ class Notifier(object): listeners = set() for user in users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + _discard_if_notified(user_listeners) + + listeners |= user_listeners for room in rooms: - listeners |= self.room_to_listeners.get(room, set()).copy() + room_listeners = self.room_to_listeners.get(room, set()) + + _discard_if_notified(room_listeners) + + listeners |= room_listeners @defer.inlineCallbacks def notify(listener): @@ -427,3 +452,17 @@ class Notifier(object): listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners + + for l in new_listeners: + l.rooms.add(room_id) + + +def _discard_if_notified(listener_set): + """Remove any 'stale' listeners from the given set. + """ + to_discard = set() + for l in listener_set: + if l.notified(): + to_discard.add(l) + + listener_set -= to_discard |