diff options
author | Erik Johnston <erikj@jki.re> | 2017-03-14 16:58:45 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-03-14 16:58:45 +0000 |
commit | bad72b0b8e4639bca481dda3e2c3853a3c88ccec (patch) | |
tree | 3d93130a152ef973307544ae3971ac032c63195c /synapse/notifier.py | |
parent | Merge pull request #1989 from matrix-org/erikj/public_list_speed (diff) | |
parent | Reduce spurious calls to generate sync (diff) | |
download | synapse-bad72b0b8e4639bca481dda3e2c3853a3c88ccec.tar.xz |
Merge pull request #2002 from matrix-org/erikj/dont_sync_by_default
Reduce number of spurious sync result generations.
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 44 |
1 files changed, 31 insertions, 13 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 8051a7a842..6abb33bb3f 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -73,6 +73,13 @@ class _NotifierUserStream(object): self.user_id = user_id self.rooms = set(rooms) self.current_token = current_token + + # The last token for which we should wake up any streams that have a + # token that comes before it. This gets updated everytime we get poked. + # We start it at the current token since if we get any streams + # that have a token from before we have no idea whether they should be + # woken up or not, so lets just wake them up. + self.last_notified_token = current_token self.last_notified_ms = time_now_ms with PreserveLoggingContext(): @@ -89,6 +96,7 @@ class _NotifierUserStream(object): self.current_token = self.current_token.copy_and_advance( stream_key, stream_id ) + self.last_notified_token = self.current_token self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred @@ -113,8 +121,14 @@ class _NotifierUserStream(object): def new_listener(self, token): """Returns a deferred that is resolved when there is a new token greater than the given token. + + Args: + token: The token from which we are streaming from, i.e. we shouldn't + notify for things that happened before this. """ - if self.current_token.is_after(token): + # Immediately wake up stream if something has already since happened + # since their last token. + if self.last_notified_token.is_after(token): return _NotificationListener(defer.succeed(self.current_token)) else: return _NotificationListener(self.notify_deferred.observe()) @@ -294,40 +308,44 @@ class Notifier(object): self._register_with_keys(user_stream) result = None + prev_token = from_token if timeout: end_time = self.clock.time_msec() + timeout - prev_token = from_token while not result: try: - current_token = user_stream.current_token - - result = yield callback(prev_token, current_token) - if result: - break - now = self.clock.time_msec() if end_time <= now: break # Now we wait for the _NotifierUserStream to be told there # is a new token. - # We need to supply the token we supplied to callback so - # that we don't miss any current_token updates. - prev_token = current_token listener = user_stream.new_listener(prev_token) with PreserveLoggingContext(): yield self.clock.time_bound_deferred( listener.deferred, time_out=(end_time - now) / 1000. ) + + current_token = user_stream.current_token + + result = yield callback(prev_token, current_token) + if result: + break + + # Update the prev_token to the current_token since nothing + # has happened between the old prev_token and the current_token + prev_token = current_token except DeferredTimedOutError: break except defer.CancelledError: break - else: + + if result is None: + # This happened if there was no timeout or if the timeout had + # already expired. current_token = user_stream.current_token - result = yield callback(from_token, current_token) + result = yield callback(prev_token, current_token) defer.returnValue(result) |