diff options
author | Erik Johnston <erik@matrix.org> | 2016-03-30 12:36:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-03-30 12:36:40 +0100 |
commit | 5fbdf2bcec40bf2f24fc0698440ee384595ff027 (patch) | |
tree | de838c7f39544ba52cd94a429bb65d7222a4a7cb /synapse/notifier.py | |
parent | Merge pull request #672 from nikriek/new-author (diff) | |
parent | Bump version and changelog (diff) | |
download | synapse-5fbdf2bcec40bf2f24fc0698440ee384595ff027.tar.xz |
Merge branch 'release-v0.14.0' of github.com:matrix-org/synapse v0.14.0
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 56 |
1 files changed, 55 insertions, 1 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 560866b26e..f00cd8c588 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -159,6 +159,8 @@ class Notifier(object): self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS ) + self.replication_deferred = ObservableDeferred(defer.Deferred()) + # This is not a very cheap test to perform, but it's only executed # when rendering the metrics page, which is likely once per minute at # most when scraping it. @@ -207,6 +209,8 @@ class Notifier(object): )) self._notify_pending_new_room_events(max_room_stream_id) + self.notify_replication() + def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -276,9 +280,17 @@ class Notifier(object): except: logger.exception("Failed to notify listener") + self.notify_replication() + + def on_new_replication_data(self): + """Used to inform replication listeners that something has happend + without waking up any of the normal user event streams""" + with PreserveLoggingContext(): + self.notify_replication() + @defer.inlineCallbacks def wait_for_events(self, user_id, timeout, callback, room_ids=None, - from_token=StreamToken("s0", "0", "0", "0", "0")): + from_token=StreamToken.START): """Wait until the callback returns a non empty response or the timeout fires. """ @@ -479,3 +491,45 @@ class Notifier(object): room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) new_user_stream.rooms.add(room_id) + + def notify_replication(self): + """Notify the any replication listeners that there's a new event""" + with PreserveLoggingContext(): + deferred = self.replication_deferred + self.replication_deferred = ObservableDeferred(defer.Deferred()) + deferred.callback(None) + + @defer.inlineCallbacks + def wait_for_replication(self, callback, timeout): + """Wait for an event to happen. + + :param callback: + Gets called whenever an event happens. If this returns a truthy + value then ``wait_for_replication`` returns, otherwise it waits + for another event. + :param int timeout: + How many milliseconds to wait for callback return a truthy value. + :returns: + A deferred that resolves with the value returned by the callback. + """ + listener = _NotificationListener(None) + + def timed_out(): + listener.deferred.cancel() + + timer = self.clock.call_later(timeout / 1000., timed_out) + while True: + listener.deferred = self.replication_deferred.observe() + result = yield callback() + if result: + break + + try: + with PreserveLoggingContext(): + yield listener.deferred + except defer.CancelledError: + break + + self.clock.cancel_call_later(timer, ignore_errs=True) + + defer.returnValue(result) |