diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 50 |
1 files changed, 37 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 8051a7a842..2657dcd8dc 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -37,6 +37,10 @@ metrics = synapse.metrics.get_metrics_for(__name__) notified_events_counter = metrics.register_counter("notified_events") +users_woken_by_stream_counter = metrics.register_counter( + "users_woken_by_stream", labels=["stream"] +) + # TODO(paul): Should be shared somewhere def count(func, l): @@ -73,6 +77,13 @@ class _NotifierUserStream(object): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token + + # The last token for which we should wake up any streams that have a + # token that comes before it. This gets updated everytime we get poked. + # We start it at the current token since if we get any streams + # that have a token from before we have no idea whether they should be + # woken up or not, so lets just wake them up. + self.last_notified_token = current_token self.last_notified_ms = time_now_ms with PreserveLoggingContext(): @@ -89,9 +100,12 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) + self.last_notified_token = self.current_token self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred + users_woken_by_stream_counter.inc(stream_key) + with PreserveLoggingContext(): self.notify_deferred = ObservableDeferred(defer.Deferred()) noify_deferred.callback(self.current_token) @@ -113,8 +127,14 @@ class _NotifierUserStream(object): def new_listener(self, token): """Returns a deferred that is resolved when there is a new token greater than the given token. + + Args: + token: The token from which we are streaming from, i.e. we shouldn't + notify for things that happened before this. """ - if self.current_token.is_after(token): + # Immediately wake up stream if something has already since happened + # since their last token. + if self.last_notified_token.is_after(token): return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -294,40 +314,44 @@ class Notifier(object): self._register_with_keys(user_stream) result = None + prev_token = from_token if timeout: end_time = self.clock.time_msec() + timeout - prev_token = from_token while not result: try: - current_token = user_stream.current_token - - result = yield callback(prev_token, current_token) - if result: - break - now = self.clock.time_msec() if end_time <= now: break # Now we wait for the _NotifierUserStream to be told there # is a new token. - # We need to supply the token we supplied to callback so - # that we don't miss any current_token updates. - prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): yield self.clock.time_bound_deferred( listener.deferred, time_out=(end_time - now) / 1000. ) + + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + # Update the prev_token to the current_token since nothing + # has happened between the old prev_token and the current_token + prev_token = current_token except DeferredTimedOutError: break except defer.CancelledError: break - else: + + if result is None: + # This happened if there was no timeout or if the timeout had + # already expired. current_token = user_stream.current_token - result = yield callback(from_token, current_token) + result = yield callback(prev_token, current_token) defer.returnValue(result) |