From 050ebccf309916c57c58b67776e66d98fffbff0f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 11:36:26 +0100 Subject: Fix notifier leak --- synapse/notifier.py | 41 ++++++++++++++++++++--------------------- 1 file changed, 20 insertions(+), 21 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 078abfc56d..27c034ed51 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -308,49 +308,48 @@ class Notifier(object): else: current_token = user_stream.current_token - listener = [_NotificationListener(deferred)] - - if timeout and not current_token.is_after(from_token): - user_stream.listeners.add(listener[0]) - + result = None if current_token.is_after(from_token): result = yield callback(from_token, current_token) - else: - result = None - - timer = [None] if result: - user_stream.listeners.discard(listener[0]) defer.returnValue(result) - return if timeout: + timer = [None] + listeners = [] timed_out = [False] + def notify_listeners(): + user_stream.listeners.difference_update(listeners) + for listener in listeners: + listener.notify(current_token) + del listeners[:] + def _timeout_listener(): timed_out[0] = True timer[0] = None - user_stream.listeners.discard(listener[0]) - listener[0].notify(current_token) + notify_listeners() # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) while not result and not timed_out[0]: - new_token = yield deferred deferred = defer.Deferred() - listener[0] = _NotificationListener(deferred) - user_stream.listeners.add(listener[0]) + notify_listeners() + listeners.append(_NotificationListener(deferred)) + user_stream.listeners.update(listeners) + new_token = yield deferred + result = yield callback(current_token, new_token) current_token = new_token - if timer[0] is not None: - try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") + if timer[0] is not None: + try: + self.clock.cancel_call_later(timer[0]) + except: + logger.exception("Failed to cancel notifer timer") defer.returnValue(result) -- cgit 1.4.1 From 22049ea700173017cf2f8e88fb8848e06b82f9b3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 15:49:05 +0100 Subject: Refactor the notifier.wait_for_events code to be clearer. Add _NotifierUserStream.new_listener that accpets a token to avoid races. --- synapse/notifier.py | 122 ++++++++++++++++++++++------------------------- synapse/util/__init__.py | 8 +++- synapse/util/async.py | 13 ++++- 3 files changed, 73 insertions(+), 70 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 27c034ed51..e441561029 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -16,7 +16,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -from synapse.util.async import run_on_reactor +from synapse.util.async import run_on_reactor, ObservableDeferred from synapse.types import StreamToken import synapse.metrics @@ -45,20 +45,16 @@ class _NotificationListener(object): The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. """ + __slots__ = ["deferred"] def __init__(self, deferred): - self.deferred = deferred + object.__setattr__(self, "deferred", deferred) - def notified(self): - return self.deferred.called + def __getattr__(self, name): + return getattr(self.deferred, name) - def notify(self, token): - """ Inform whoever is listening about the new events. - """ - try: - self.deferred.callback(token) - except defer.AlreadyCalledError: - pass + def __setattr__(self, name, value): + setattr(self.deferred, name, value) class _NotifierUserStream(object): @@ -75,11 +71,12 @@ class _NotifierUserStream(object): appservice=None): self.user = str(user) self.appservice = appservice - self.listeners = set() self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms + self.notify_deferred = ObservableDeferred(defer.Deferred()) + def notify(self, stream_key, stream_id, time_now_ms): """Notify any listeners for this user of a new event from an event source. @@ -91,12 +88,10 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) - if self.listeners: - self.last_notified_ms = time_now_ms - listeners = self.listeners - self.listeners = set() - for listener in listeners: - listener.notify(self.current_token) + self.last_notified_ms = time_now_ms + noify_deferred = self.notify_deferred + self.notify_deferred = ObservableDeferred(defer.Deferred()) + noify_deferred.callback(self.current_token) def remove(self, notifier): """ Remove this listener from all the indexes in the Notifier @@ -114,6 +109,18 @@ class _NotifierUserStream(object): self.appservice, set() ).discard(self) + def count_listeners(self): + return len(self.noify_deferred.observers()) + + def new_listener(self, token): + """Returns a deferred that is resolved when there is a new token + greater than the given token. + """ + if self.current_token.is_after(token): + return _NotificationListener(defer.succeed(self.current_token)) + else: + return _NotificationListener(self.notify_deferred.observe()) + class Notifier(object): """ This class is responsible for notifying any listeners when there are @@ -158,7 +165,7 @@ class Notifier(object): for x in self.appservice_to_user_streams.values(): all_user_streams |= x - return sum(len(stream.listeners) for stream in all_user_streams) + return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) metrics.register_callback( @@ -286,10 +293,6 @@ class Notifier(object): """Wait until the callback returns a non empty response or the timeout fires. """ - - deferred = defer.Deferred() - time_now_ms = self.clock.time_msec() - user = str(user) user_stream = self.user_to_user_stream.get(user) if user_stream is None: @@ -302,54 +305,38 @@ class Notifier(object): rooms=rooms, appservice=appservice, current_token=current_token, - time_now_ms=time_now_ms, + time_now_ms=self.clock.time_msec(), ) self._register_with_keys(user_stream) - else: - current_token = user_stream.current_token result = None - if current_token.is_after(from_token): - result = yield callback(from_token, current_token) - - if result: - defer.returnValue(result) - if timeout: - timer = [None] - listeners = [] - timed_out = [False] - - def notify_listeners(): - user_stream.listeners.difference_update(listeners) - for listener in listeners: - listener.notify(current_token) - del listeners[:] - - def _timeout_listener(): - timed_out[0] = True - timer[0] = None - notify_listeners() - - # We create multiple notification listeners so we have to manage - # canceling the timeout ourselves. - timer[0] = self.clock.call_later(timeout/1000., _timeout_listener) - - while not result and not timed_out[0]: - deferred = defer.Deferred() - notify_listeners() - listeners.append(_NotificationListener(deferred)) - user_stream.listeners.update(listeners) - new_token = yield deferred - - result = yield callback(current_token, new_token) - current_token = new_token - - if timer[0] is not None: + listener = None + timer = self.clock.call_later( + timeout/1000., lambda: listener.cancel() + ) + + prev_token = from_token + while not result: try: - self.clock.cancel_call_later(timer[0]) - except: - logger.exception("Failed to cancel notifer timer") + # We need to start listening to the streams *before* doing + # the callback, as otherwise we may miss something. + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + prev_token = current_token + listener = user_stream.new_listener(prev_token) + yield listener.deferred + except defer.CancelledError: + break + + self.clock.cancel_call_later(timer, ignore_errs=True) + else: + current_token = user_stream.current_token + result = yield callback(from_token, current_token) defer.returnValue(result) @@ -367,6 +354,9 @@ class Notifier(object): @defer.inlineCallbacks def check_for_updates(before_token, after_token): + if not after_token.is_after(before_token): + defer.returnValue(None) + events = [] end_token = from_token for name, source in self.event_sources.sources.items(): @@ -401,7 +391,7 @@ class Notifier(object): expired_streams = [] expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS for stream in self.user_to_user_stream.values(): - if stream.listeners: + if stream.count_listeners(): continue if stream.last_notified_ms < expire_before_ts: expired_streams.append(stream) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 260714ccc2..07ff25cef3 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -91,8 +91,12 @@ class Clock(object): with PreserveLoggingContext(): return reactor.callLater(delay, wrapped_callback, *args, **kwargs) - def cancel_call_later(self, timer): - timer.cancel() + def cancel_call_later(self, timer, ignore_errs=False): + try: + timer.cancel() + except: + if not ignore_errs: + raise def time_bound_deferred(self, given_deferred, time_out): if given_deferred.called: diff --git a/synapse/util/async.py b/synapse/util/async.py index 1c2044e5b4..6f567bcaa6 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -45,7 +45,7 @@ class ObservableDeferred(object): def __init__(self, deferred, consumeErrors=False): object.__setattr__(self, "_deferred", deferred) object.__setattr__(self, "_result", None) - object.__setattr__(self, "_observers", []) + object.__setattr__(self, "_observers", set()) def callback(r): self._result = (True, r) @@ -74,12 +74,21 @@ class ObservableDeferred(object): def observe(self): if not self._result: d = defer.Deferred() - self._observers.append(d) + + def remove(r): + self._observers.discard(d) + return r + d.addBoth(remove) + + self._observers.add(d) return d else: success, res = self._result return defer.succeed(res) if success else defer.fail(res) + def observers(self): + return self._observers + def __getattr__(self, name): return getattr(self._deferred, name) -- cgit 1.4.1 From 1f24c2e5896c1c9e5c7fded779fdb0ddcf2a3801 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 16:09:53 +0100 Subject: Don't bother proxying lookups on _NotificationListener to underlying deferred --- synapse/notifier.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index e441561029..053475a2f5 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -48,13 +48,7 @@ class _NotificationListener(object): __slots__ = ["deferred"] def __init__(self, deferred): - object.__setattr__(self, "deferred", deferred) - - def __getattr__(self, name): - return getattr(self.deferred, name) - - def __setattr__(self, name, value): - setattr(self.deferred, name, value) + self.deferred = deferred class _NotifierUserStream(object): @@ -313,14 +307,12 @@ class Notifier(object): if timeout: listener = None timer = self.clock.call_later( - timeout/1000., lambda: listener.cancel() + timeout/1000., lambda: listener.deferred.cancel() ) prev_token = from_token while not result: try: - # We need to start listening to the streams *before* doing - # the callback, as otherwise we may miss something. current_token = user_stream.current_token result = yield callback(prev_token, current_token) -- cgit 1.4.1 From 73513ececce51427971d49f0d55bfa76dafc391e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 16:15:10 +0100 Subject: Documentation --- synapse/notifier.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 053475a2f5..5475ee36ca 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,10 +305,13 @@ class Notifier(object): result = None if timeout: - listener = None - timer = self.clock.call_later( - timeout/1000., lambda: listener.deferred.cancel() - ) + listener = None # Will be set to a _NotificationListener that + # we'll be waiting on. Allows us to cancel it. + + def timed_out(): + if listener: + listener.deferred.cancel() + timer = self.clock.call_later(timeout/1000., timed_out) prev_token = from_token while not result: @@ -319,6 +322,10 @@ class Notifier(object): if result: break + # Now we wait for the _NotifierUserStream to be told there + # is a new token. + # We need to supply the token we supplied to callback so + # that we don't miss any current_token updates. prev_token = current_token listener = user_stream.new_listener(prev_token) yield listener.deferred -- cgit 1.4.1 From 6f6ebd216d6a84ee72f1becfb31bf40ba960e3e2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 18 Jun 2015 17:00:32 +0100 Subject: PEP8 --- synapse/notifier.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 5475ee36ca..d6655f3f5a 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -305,8 +305,9 @@ class Notifier(object): result = None if timeout: - listener = None # Will be set to a _NotificationListener that - # we'll be waiting on. Allows us to cancel it. + # Will be set to a _NotificationListener that we'll be waiting on. + # Allows us to cancel it. + listener = None def timed_out(): if listener: -- cgit 1.4.1 From a68abc79fd90465aed6ead3eec1a5704c64a1682 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 11:48:55 +0100 Subject: Add comment on cancellation of observers --- synapse/util/async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/synapse/util/async.py b/synapse/util/async.py index 6f567bcaa6..5a1d545c96 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -38,6 +38,9 @@ class ObservableDeferred(object): deferred. If consumeErrors is true errors will be captured from the origin deferred. + + Cancelling or otherwise resolving an observer will not affect the original + ObservableDeferred. """ __slots__ = ["_deferred", "_observers", "_result"] -- cgit 1.4.1