diff options
-rw-r--r-- | synapse/notifier.py | 113 |
1 files changed, 18 insertions, 95 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index e16a4608e9..abe12b1434 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.async import run_on_reactor from synapse.types import StreamToken import synapse.metrics @@ -50,13 +51,9 @@ class _NotificationListener(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user, rooms, from_token, limit, timeout, deferred, - appservice=None): + def __init__(self, user, rooms, deferred, appservice=None): self.user = user self.appservice = appservice - self.from_token = from_token - self.limit = limit - self.timeout = timeout self.deferred = deferred self.rooms = rooms self.timer = None @@ -64,17 +61,14 @@ class _NotificationListener(object): def notified(self): return self.deferred.called - def notify(self, notifier, events, start_token, end_token): + def notify(self, notifier): """ Inform whoever is listening about the new events. This will also remove this listener from all the indexes in the Notifier it knows about. """ - result = (events, (start_token, end_token)) - try: - self.deferred.callback(result) - notified_events_counter.inc_by(len(events)) + self.deferred.callback(None) except defer.AlreadyCalledError: pass @@ -161,6 +155,7 @@ class Notifier(object): listening to the room, and any listeners for the users in the `extra_users` param. """ + yield run_on_reactor() # poke any interested application service. self.hs.get_handlers().appservice_handler.notify_interested_services( event @@ -168,8 +163,6 @@ class Notifier(object): room_id = event.room_id - room_source = self.event_sources.sources["room"] - room_listeners = self.room_to_listeners.get(room_id, set()) _discard_if_notified(room_listeners) @@ -200,34 +193,12 @@ class Notifier(object): logger.debug("on_new_room_event listeners %s", listeners) - # TODO (erikj): Can we make this more efficient by hitting the - # db once? - - @defer.inlineCallbacks - def notify(listener): - events, end_key = yield room_source.get_new_events_for_user( - listener.user, - listener.from_token.room_key, - listener.limit, - ) - - if events: - end_token = listener.from_token.copy_and_replace( - "room_key", end_key - ) - - listener.notify( - self, events, listener.from_token, end_token - ) - - def eb(failure): - logger.exception("Failed to notify listener", failure) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks @log_function @@ -237,11 +208,7 @@ class Notifier(object): Will wake up all listeners for the given users and rooms. """ - # TODO(paul): This is horrible, having to manually list every event - # source here individually - presence_source = self.event_sources.sources["presence"] - typing_source = self.event_sources.sources["typing"] - + yield run_on_reactor() listeners = set() for user in users: @@ -258,51 +225,12 @@ class Notifier(object): listeners |= room_listeners - @defer.inlineCallbacks - def notify(listener): - presence_events, presence_end_key = ( - yield presence_source.get_new_events_for_user( - listener.user, - listener.from_token.presence_key, - listener.limit, - ) - ) - typing_events, typing_end_key = ( - yield typing_source.get_new_events_for_user( - listener.user, - listener.from_token.typing_key, - listener.limit, - ) - ) - - if presence_events or typing_events: - end_token = listener.from_token.copy_and_replace( - "presence_key", presence_end_key - ).copy_and_replace( - "typing_key", typing_end_key - ) - - listener.notify( - self, - presence_events + typing_events, - listener.from_token, - end_token - ) - - def eb(failure): - logger.error( - "Failed to notify listener", - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject()) - ) - with PreserveLoggingContext(): - yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners], - consumeErrors=True, - ) + for listener in listeners: + try: + listener.notify(self) + except: + logger.exception("Failed to notify listener") @defer.inlineCallbacks def wait_for_events(self, user, rooms, timeout, callback, @@ -319,9 +247,6 @@ class Notifier(object): listener = [_NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, appservice=appservice, )] @@ -338,7 +263,7 @@ class Notifier(object): def _timeout_listener(): timed_out[0] = True timer[0] = None - listener[0].notify(self, [], from_token, from_token) + listener[0].notify(self) # We create multiple notification listeners so we have to manage # canceling the timeout ourselves. @@ -350,10 +275,8 @@ class Notifier(object): listener[0] = _NotificationListener( user=user, rooms=rooms, - from_token=from_token, - limit=1, - timeout=timeout, deferred=deferred, + appservice=appservice, ) self._register_with_keys(listener[0]) result = yield callback() |