diff options
author | Mark Haines <mark.haines@matrix.org> | 2014-08-27 17:15:58 +0100 |
---|---|---|
committer | Mark Haines <mark.haines@matrix.org> | 2014-08-27 17:15:58 +0100 |
commit | 2aeaa7b77c0c39f12c89b0054049970faad28406 (patch) | |
tree | f3022ab73ea75827dfec17de532fb07b0949196f /synapse/notifier.py | |
parent | Return the store_id from persist_event (diff) | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into develop (diff) | |
download | synapse-2aeaa7b77c0c39f12c89b0054049970faad28406.tar.xz |
Merge branch 'develop' into storage_transactions
Conflicts: synapse/handlers/room.py synapse/storage/stream.py
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 236 |
1 files changed, 236 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 0000000000..1656717cd7 --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,236 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + """ This represents a single client connection to the events stream. + + The events stream handler will have yielded to the deferred, so to + notify the handler it is sufficient to resolve the deferred. + + This listener will also keep track of which rooms it is listening in + so that it can remove itself from the indexes in the Notifier class. + """ + + def __init__(self, user, rooms, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.rooms = rooms + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + """ Inform whoever is listening about the new events. This will + also remove this listener from all the indexes in the Notifier + it knows about. + """ + + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for room in self.rooms: + lst = notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) + + +class Notifier(object): + """ This class is responsible for notifying any listeners when there are + new events available for it. + + Primarily used from the /events stream. + """ + + def __init__(self, hs): + self.hs = hs + + self.rooms_to_listeners = {} + self.user_to_listeners = {} + + self.event_sources = hs.get_event_sources() + + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, extra_users=[]): + """ Used by handlers to inform the notifier something has happened + in the room, room event wise. + + This triggers the notifier to wake up any listeners that are + listening to the room, and any listeners for the users in the + `extra_users` param. + """ + room_id = event.room_id + + source = self.event_sources.sources["room"] + + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): + """ Used to inform listeners that something has happend + presence/user event wise. + + Will wake up all listeners for the given users and rooms. + """ + source = self.event_sources.sources["presence"] + + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() + + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def get_events_for(self, user, rooms, pagination_config, timeout): + """ For the given user and rooms, return any new events for them. If + there are no new events wait for up to `timeout` milliseconds for any + new events to happen before returning. + """ + deferred = defer.Deferred() + + self._get_events( + deferred, user, rooms, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + rooms, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @log_function + def _register_with_keys(self, listener): + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) + + self.user_to_listeners.setdefault(listener.user, set()).add(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources.values(): + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners |