diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-27 16:11:43 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-27 16:11:43 +0100 |
commit | 89c044c2a0727c73cce4e6bff9a25a87da6f52bd (patch) | |
tree | 22d342f5beae7218c4f3664db8a888ddbafc4dab /synapse/notifier.py | |
parent | api docs: Finished adding all C-S APIs. Added initialSync, publicRooms, membe... (diff) | |
parent | Turn off presence again. (diff) | |
download | synapse-89c044c2a0727c73cce4e6bff9a25a87da6f52bd.tar.xz |
Merge branch 'stream_refactor' into develop
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 0000000000..b969011b32 --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,201 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + def __init__(self, user, rooms, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.rooms = rooms + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for room in self.rooms: + lst = notifier.rooms_to_listeners.get(room, set()) + lst.discard(self) + + notifier.user_to_listeners.get(self.user, set()).discard(self) + + +class Notifier(object): + + def __init__(self, hs): + self.hs = hs + + self.rooms_to_listeners = {} + self.user_to_listeners = {} + + self.event_sources = hs.get_event_sources() + + hs.get_distributor().observe( + "user_joined_room", self._user_joined_room + ) + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, extra_users=[]): + room_id = event.room_id + + source = self.event_sources.sources["room"] + + listeners = self.rooms_to_listeners.get(room_id, set()).copy() + + for user in extra_users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + @defer.inlineCallbacks + def on_new_user_event(self, users=[], rooms=[]): + source = self.event_sources.sources["presence"] + + listeners = set() + + for user in users: + listeners |= self.user_to_listeners.get(user, set()).copy() + + for room in rooms: + listeners |= self.rooms_to_listeners.get(room, set()).copy() + + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def get_events_for(self, user, rooms, pagination_config, timeout): + deferred = defer.Deferred() + + self._get_events( + deferred, user, rooms, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, rooms, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + rooms, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @log_function + def _register_with_keys(self, listener): + for room in listener.rooms: + s = self.rooms_to_listeners.setdefault(room, set()) + s.add(listener) + + self.user_to_listeners.setdefault(listener.user, set()).add(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources.values(): + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) + + def _user_joined_room(self, user, room_id): + new_listeners = self.user_to_listeners.get(user, set()) + + listeners = self.rooms_to_listeners.setdefault(room_id, set()) + listeners |= new_listeners |