diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 85 |
1 files changed, 42 insertions, 43 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index df9be29f3d..a69d5343cb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -24,14 +24,14 @@ logger = logging.getLogger(__name__) class _NotificationListener(object): - def __init__(self, user, from_token, limit, timeout, deferred): + def __init__(self, user, rooms, from_token, limit, timeout, deferred): self.user = user self.from_token = from_token self.limit = limit self.timeout = timeout self.deferred = deferred - self.signal_key_list = [] + self.rooms = rooms self.pending_notifications = [] @@ -43,36 +43,39 @@ class _NotificationListener(object): except defer.AlreadyCalledError: pass - for signal, key in self.signal_key_list: - lst = notifier.signal_keys_to_users.get((signal, key), []) + for room in self.rooms: + lst = notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) - try: - lst.remove(self) - except: - pass class Notifier(object): def __init__(self, hs): self.hs = hs - self.signal_keys_to_users = {} + self.rooms_to_listeners = {} + self.user_to_listeners = {} self.event_sources = hs.get_event_sources() + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + @log_function @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): + def on_new_room_event(self, event, extra_users=[]): room_id = event.room_id source = self.event_sources.sources[0] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, room_id), - [] - ) + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() - logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) logger.debug("on_new_room_event listeners %s", listeners) # TODO (erikj): Can we make this more efficient by hitting the @@ -82,7 +85,6 @@ class Notifier(object): listener.user, listener.from_token, listener.limit, - key=room_id, ) if events: @@ -90,20 +92,23 @@ class Notifier(object): self, events, listener.from_token, end_token ) - def on_new_user_event(self, *args, **kwargs): + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): source = self.event_sources.sources[1] - listeners = self.signal_keys_to_users.get( - (source.SIGNAL_NAME, "moose"), - [] - ) + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() for listener in listeners: events, end_token = yield source.get_new_events_for_user( listener.user, listener.from_token, listener.limit, - key="moose", ) if events: @@ -111,23 +116,24 @@ class Notifier(object): self, events, listener.from_token, end_token ) - def get_events_for(self, user, pagination_config, timeout): + def get_events_for(self, user, rooms, pagination_config, timeout): deferred = defer.Deferred() self._get_events( - deferred, user, pagination_config.from_token, + deferred, user, rooms, pagination_config.from_token, pagination_config.limit, timeout ).addErrback(deferred.errback) return deferred @defer.inlineCallbacks - def _get_events(self, deferred, user, from_token, limit, timeout): + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): if not from_token: from_token = yield self.event_sources.get_current_token() listener = _NotificationListener( user, + rooms, from_token, limit, timeout, @@ -137,7 +143,7 @@ class Notifier(object): if timeout: reactor.callLater(timeout/1000, self._timeout_listener, listener) - yield self._register_with_keys(listener) + self._register_with_keys(listener) yield self._check_for_updates(listener) return @@ -152,25 +158,13 @@ class Notifier(object): listener.from_token, ) - @defer.inlineCallbacks @log_function def _register_with_keys(self, listener): - signals_keys = {} - - # TODO (erikj): This can probably be replaced by a DeferredList - for source in self.event_sources.sources: - keys = yield source.get_keys_for_user(listener.user) - signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) - for signal, keys in signals_keys.items(): - for key in keys: - s = self.signal_keys_to_users.setdefault((signal, key), []) - s.append(listener) - listener.signal_key_list.append((signal, key)) - - logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) - - defer.returnValue(listener) + self.user_to_listeners.setdefault(listener.user, set()).add(listener) @defer.inlineCallbacks @log_function @@ -195,8 +189,13 @@ class Notifier(object): end_token = from_token - if events: listener.notify(self, events, listener.from_token, end_token) defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners |