diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 102 |
1 files changed, 93 insertions, 9 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 3aec1d4af2..df13e8ddb6 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.util.async import run_on_reactor +from synapse.types import StreamToken import logging @@ -35,8 +36,10 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred, + appservice=None): self.user = user + self.appservice = appservice self.from_token = from_token self.limit = limit self.timeout = timeout @@ -60,10 +63,14 @@ class _NotificationListener(object): pass for room in self.rooms: - lst = notifier.rooms_to_listeners.get(room, set()) + lst = notifier.room_to_listeners.get(room, set()) lst.discard(self) notifier.user_to_listeners.get(self.user, set()).discard(self) + if self.appservice: + notifier.appservice_to_listeners.get( + self.appservice, set() + ).discard(self) class Notifier(object): @@ -76,8 +83,9 @@ class Notifier(object): def __init__(self, hs): self.hs = hs - self.rooms_to_listeners = {} + self.room_to_listeners = {} self.user_to_listeners = {} + self.appservice_to_listeners = {} self.event_sources = hs.get_event_sources() @@ -98,15 +106,32 @@ class Notifier(object): `extra_users` param. """ yield run_on_reactor() + + # poke any interested application service. + self.hs.get_handlers().appservice_handler.notify_interested_services( + event + ) + room_id = event.room_id room_source = self.event_sources.sources["room"] - listeners = self.rooms_to_listeners.get(room_id, set()).copy() + listeners = self.room_to_listeners.get(room_id, set()).copy() for user in extra_users: listeners |= self.user_to_listeners.get(user, set()).copy() + for appservice in self.appservice_to_listeners: + # TODO (kegan): Redundant appservice listener checks? + # App services will already be in the room_to_listeners set, but + # that isn't enough. They need to be checked here in order to + # receive *invites* for users they are interested in. Does this + # make the room_to_listeners check somewhat obselete? + if appservice.is_interested(event): + listeners |= self.appservice_to_listeners.get( + appservice, set() + ).copy() + logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -134,7 +159,8 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks @@ -158,7 +184,7 @@ class Notifier(object): listeners |= self.user_to_listeners.get(user, set()).copy() for room in rooms: - listeners |= self.rooms_to_listeners.get(room, set()).copy() + listeners |= self.room_to_listeners.get(room, set()).copy() @defer.inlineCallbacks def notify(listener): @@ -202,9 +228,57 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) + @defer.inlineCallbacks + def wait_for_events(self, user, rooms, filter, timeout, callback): + """Wait until the callback returns a non empty response or the + timeout fires. + """ + + deferred = defer.Deferred() + + from_token = StreamToken("s0", "0", "0") + + listener = [_NotificationListener( + user=user, + rooms=rooms, + from_token=from_token, + limit=1, + timeout=timeout, + deferred=deferred, + )] + + if timeout: + self._register_with_keys(listener[0]) + + result = yield callback() + if timeout: + timed_out = [False] + + def _timeout_listener(): + timed_out[0] = True + listener[0].notify(self, [], from_token, from_token) + + self.clock.call_later(timeout/1000., _timeout_listener) + while not result and not timed_out[0]: + yield deferred + deferred = defer.Deferred() + listener[0] = _NotificationListener( + user=user, + rooms=rooms, + from_token=from_token, + limit=1, + timeout=timeout, + deferred=deferred, + ) + self._register_with_keys(listener[0]) + result = yield callback() + + defer.returnValue(result) + def get_events_for(self, user, rooms, pagination_config, timeout): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any @@ -224,6 +298,10 @@ class Notifier(object): if not from_token: from_token = yield self.event_sources.get_current_token() + appservice = yield self.hs.get_datastore().get_app_service_by_user_id( + user.to_string() + ) + listener = _NotificationListener( user, rooms, @@ -231,6 +309,7 @@ class Notifier(object): limit, timeout, deferred, + appservice=appservice ) def _timeout_listener(): @@ -258,11 +337,16 @@ class Notifier(object): @log_function def _register_with_keys(self, listener): for room in listener.rooms: - s = self.rooms_to_listeners.setdefault(room, set()) + s = self.room_to_listeners.setdefault(room, set()) s.add(listener) self.user_to_listeners.setdefault(listener.user, set()).add(listener) + if listener.appservice: + self.appservice_to_listeners.setdefault( + listener.appservice, set() + ).add(listener) + @defer.inlineCallbacks @log_function def _check_for_updates(self, listener): @@ -296,5 +380,5 @@ class Notifier(object): def _user_joined_room(self, user, room_id): new_listeners = self.user_to_listeners.get(user, set()) - listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners = self.room_to_listeners.setdefault(room_id, set()) listeners |= new_listeners |