From 07d404170934db8bc3aa3ae8ac89ceb25cd2e9a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:27:36 +0100 Subject: Fix bug where we didn't inform the NotificataionListeners about new rooms they have been subscribed to. This meant that the listeners didn't clean themselves up fully from all the dicts --- synapse/notifier.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 7121d659d0..b12b54353e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -427,3 +427,6 @@ class Notifier(object): listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners + + for l in new_listeners: + l.rooms.add(room_id) -- cgit 1.4.1 From 65f5e4e3e43ee471ee0a8c6989bbf60cb3be2c95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:31:06 +0100 Subject: Add paranoia checks to make sure that we evict stale NotificationListeners when we are about to process them --- synapse/notifier.py | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index b12b54353e..ce9b0d2187 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -62,7 +62,8 @@ class _NotificationListener(object): self.rooms = rooms - self.pending_notifications = [] + def notified(self): + return self.deferred.called def notify(self, notifier, events, start_token, end_token): """ Inform whoever is listening about the new events. This will @@ -78,11 +79,15 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass + # Should the following be done be using intrusively linked lists? + # -- erikj + for room in self.rooms: lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: notifier.appservice_to_listeners.get( self.appservice, set() @@ -161,10 +166,24 @@ class Notifier(object): room_source = self.event_sources.sources["room"] - listeners = self.room_to_listeners.get(room_id, set()).copy() + room_listeners = self.room_to_listeners.get(room_id, set()) + + # Remove any 'stale' listeners. + for l in room_listeners.copy(): + if l.notified(): + room_listeners.discard(l) + + listeners = room_listeners.copy() for user in extra_users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + # Remove any 'stale' listeners. + for l in user_listeners.copy(): + if l.notified(): + user_listeners.discard(l) + + listeners |= user_listeners for appservice in self.appservice_to_listeners: # TODO (kegan): Redundant appservice listener checks? @@ -173,9 +192,16 @@ class Notifier(object): # receive *invites* for users they are interested in. Does this # make the room_to_listeners check somewhat obselete? if appservice.is_interested(event): - listeners |= self.appservice_to_listeners.get( + app_listeners = self.appservice_to_listeners.get( appservice, set() - ).copy() + ) + + # Remove any 'stale' listeners. + for l in app_listeners.copy(): + if l.notified(): + app_listeners.discard(l) + + listeners |= app_listeners logger.debug("on_new_room_event listeners %s", listeners) -- cgit 1.4.1 From 830d07db8278d773338fee94eb269eafd6b1b7fc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:40:20 +0100 Subject: Also perform paranoia checks in 'on_new_user_event' --- synapse/notifier.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index ce9b0d2187..be78082021 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -252,10 +252,24 @@ class Notifier(object): listeners = set() for user in users: - listeners |= self.user_to_listeners.get(user, set()).copy() + user_listeners = self.user_to_listeners.get(user, set()) + + # Remove any 'stale' listeners. + for l in user_listeners.copy(): + if l.notified(): + user_listeners.discard(l) + + listeners |= user_listeners for room in rooms: - listeners |= self.room_to_listeners.get(room, set()).copy() + room_listeners = self.room_to_listeners.get(room, set()) + + # Remove any 'stale' listeners. + for l in room_listeners.copy(): + if l.notified(): + room_listeners.discard(l) + + listeners |= room_listeners @defer.inlineCallbacks def notify(listener): -- cgit 1.4.1 From 638be5a6b971bf961ee030d96245f296eb83e612 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 13:58:32 +0100 Subject: Factor out loops into '_discard_if_notified' --- synapse/notifier.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index be78082021..754569ebd2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -169,9 +169,7 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room_id, set()) # Remove any 'stale' listeners. - for l in room_listeners.copy(): - if l.notified(): - room_listeners.discard(l) + _discard_if_notified(room_listeners) listeners = room_listeners.copy() @@ -179,9 +177,7 @@ class Notifier(object): user_listeners = self.user_to_listeners.get(user, set()) # Remove any 'stale' listeners. - for l in user_listeners.copy(): - if l.notified(): - user_listeners.discard(l) + _discard_if_notified(user_listeners) listeners |= user_listeners @@ -197,9 +193,7 @@ class Notifier(object): ) # Remove any 'stale' listeners. - for l in app_listeners.copy(): - if l.notified(): - app_listeners.discard(l) + _discard_if_notified(app_listeners) listeners |= app_listeners @@ -255,9 +249,7 @@ class Notifier(object): user_listeners = self.user_to_listeners.get(user, set()) # Remove any 'stale' listeners. - for l in user_listeners.copy(): - if l.notified(): - user_listeners.discard(l) + _discard_if_notified(user_listeners) listeners |= user_listeners @@ -265,9 +257,7 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room, set()) # Remove any 'stale' listeners. - for l in room_listeners.copy(): - if l.notified(): - room_listeners.discard(l) + _discard_if_notified(room_listeners) listeners |= room_listeners @@ -470,3 +460,12 @@ class Notifier(object): for l in new_listeners: l.rooms.add(room_id) + + +def _discard_if_notified(listener_set): + to_discard = set() + for l in listener_set: + if l.notified(): + to_discard.add(l) + + listener_set -= to_discard -- cgit 1.4.1 From 5bc41fe9f8d40ecf4070c7ffb8df635dcccb4efe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 8 Apr 2015 14:01:22 +0100 Subject: Move comment into docstring --- synapse/notifier.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 754569ebd2..12573f3f59 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -168,7 +168,6 @@ class Notifier(object): room_listeners = self.room_to_listeners.get(room_id, set()) - # Remove any 'stale' listeners. _discard_if_notified(room_listeners) listeners = room_listeners.copy() @@ -176,7 +175,6 @@ class Notifier(object): for user in extra_users: user_listeners = self.user_to_listeners.get(user, set()) - # Remove any 'stale' listeners. _discard_if_notified(user_listeners) listeners |= user_listeners @@ -192,7 +190,6 @@ class Notifier(object): appservice, set() ) - # Remove any 'stale' listeners. _discard_if_notified(app_listeners) listeners |= app_listeners @@ -248,7 +245,6 @@ class Notifier(object): for user in users: user_listeners = self.user_to_listeners.get(user, set()) - # Remove any 'stale' listeners. _discard_if_notified(user_listeners) listeners |= user_listeners @@ -256,7 +252,6 @@ class Notifier(object): for room in rooms: room_listeners = self.room_to_listeners.get(room, set()) - # Remove any 'stale' listeners. _discard_if_notified(room_listeners) listeners |= room_listeners @@ -463,6 +458,8 @@ class Notifier(object): def _discard_if_notified(listener_set): + """Remove any 'stale' listeners from the given set. + """ to_discard = set() for l in listener_set: if l.notified(): -- cgit 1.4.1 From 6f9dea7483ed01d17522857c5b103971a0050d8f Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:07:20 +0100 Subject: SYN-339: Cancel the notifier timeout when the notifier fires --- synapse/notifier.py | 30 ++++++++++++++++++++++++++---- 1 file changed, 26 insertions(+), 4 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 12573f3f59..0fa77d28ca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -59,8 +59,8 @@ class _NotificationListener(object): self.limit = limit self.timeout = timeout self.deferred = deferred - self.rooms = rooms + self.timer = None def notified(self): return self.deferred.called @@ -93,6 +93,13 @@ class _NotificationListener(object): self.appservice, set() ).discard(self) + # Cancel the timeout for this notifer if one exists. + if self.timer is not None: + try: + notifier.clock.cancel_call_later(self.timer) + except: + logger.exception("Failed to cancel notifier timer") + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -325,14 +332,20 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + timer = [None] + if timeout: timed_out = [False] def _timeout_listener(): timed_out[0] = True + timer[0] = None listener[0].notify(self, [], from_token, from_token) - self.clock.call_later(timeout/1000., _timeout_listener) + # We create multiple notification listeners so we have to manage + # canceling the timeout ourselves. + timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: yield deferred deferred = defer.Deferred() @@ -347,6 +360,12 @@ class Notifier(object): self._register_with_keys(listener[0]) result = yield callback() + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") + defer.returnValue(result) def get_events_for(self, user, rooms, pagination_config, timeout): @@ -400,8 +419,11 @@ class Notifier(object): if not timeout: _timeout_listener() else: - self.clock.call_later(timeout/1000.0, _timeout_listener) - + # Only add the timer if the listener hasn't been notified + if not listener.notified(): + listener.timer = self.clock.call_later( + timeout/1000.0, _timeout_listener + ) return @log_function -- cgit 1.4.1 From 23d285ad57ca76e8ff2d33f1f6e476930689d9a7 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:41:50 +0100 Subject: Unset the timer in the timeout callback so that we don't try to cancel it if it has been called --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 0fa77d28ca..e6f37c3736 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + listener.timer = None listener.notify( self, [], -- cgit 1.4.1 From 1280a47fc671b718239e06030d469d99aa5ea513 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 9 Apr 2015 11:42:21 +0100 Subject: Add comment --- synapse/notifier.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index e6f37c3736..d750a6fcf7 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -404,6 +404,7 @@ class Notifier(object): def _timeout_listener(): # TODO (erikj): We should probably set to_token to the current # max rather than reusing from_token. + # Remove the timer from the listener so we don't try to cancel it. listener.timer = None listener.notify( self, -- cgit 1.4.1 From e19f794fee8fe32215a3dcd49ac37ed7b17a4608 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 15 Apr 2015 15:12:57 +0100 Subject: Change from exception to warn --- synapse/notifier.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index d750a6fcf7..ea854482b5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -98,7 +98,7 @@ class _NotificationListener(object): try: notifier.clock.cancel_call_later(self.timer) except: - logger.exception("Failed to cancel notifier timer") + logger.warn("Failed to cancel notifier timer") class Notifier(object): -- cgit 1.4.1 From 42c12c04f6aa30586a8f9779886374ab7bece2d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 1 May 2015 14:37:33 +0100 Subject: Remove some run_on_reactors --- synapse/api/auth.py | 3 --- synapse/handlers/federation.py | 2 -- synapse/notifier.py | 5 ----- 3 files changed, 10 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 0933521a1a..d0938375ec 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -20,7 +20,6 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import AuthError, Codes, SynapseError from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor from synapse.types import UserID, ClientInfo import logging @@ -427,8 +426,6 @@ class Auth(object): @defer.inlineCallbacks def add_auth_events(self, builder, context): - yield run_on_reactor() - auth_ids = self.compute_auth_events(builder, context.current_state) auth_events_entries = yield self.store.add_event_hashes( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 98148c13d7..6761121aa3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -89,8 +89,6 @@ class FederationHandler(BaseHandler): processing. """ - yield run_on_reactor() - self.replication_layer.send_pdu(event, destinations) @log_function diff --git a/synapse/notifier.py b/synapse/notifier.py index ea854482b5..78eb28e4b2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,7 +17,6 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext -from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -162,8 +161,6 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ - yield run_on_reactor() - # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -240,8 +237,6 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - yield run_on_reactor() - # TODO(paul): This is horrible, having to manually list every event # source here individually presence_source = self.event_sources.sources["presence"] -- cgit 1.4.1