diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
commit | 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch) | |
tree | 5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/notifier.py | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff) | |
download | synapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz |
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py new file mode 100644 index 0000000000..1911fd20ae --- /dev/null +++ b/synapse/notifier.py @@ -0,0 +1,184 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer, reactor + +from synapse.util.logutils import log_function + +import logging + + +logger = logging.getLogger(__name__) + + +class _NotificationListener(object): + def __init__(self, user, from_token, limit, timeout, deferred): + self.user = user + self.from_token = from_token + self.limit = limit + self.timeout = timeout + self.deferred = deferred + + self.signal_key_list = [] + + self.pending_notifications = [] + + def notify(self, notifier, events, start_token, end_token): + result = (events, (start_token, end_token)) + + try: + self.deferred.callback(result) + except defer.AlreadyCalledError: + pass + + for signal, key in self.signal_key_list: + lst = notifier.signal_keys_to_users.get((signal, key), []) + + try: + lst.remove(self) + except: + pass + +class Notifier(object): + + def __init__(self, hs): + self.hs = hs + + self.signal_keys_to_users = {} + + self.event_sources = hs.get_event_sources() + + @log_function + @defer.inlineCallbacks + def on_new_room_event(self, event, store_id): + room_id = event.room_id + + source = self.event_sources.sources[0] + + listeners = self.signal_keys_to_users.get( + (source.SIGNAL_NAME, room_id), + [] + ) + + logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners) + logger.debug("on_new_room_event listeners %s", listeners) + + # TODO (erikj): Can we make this more efficient by hitting the + # db once? + for listener in listeners: + events, end_token = yield source.get_new_events_for_user( + listener.user, + listener.from_token, + listener.limit, + key=room_id, + ) + + if events: + listener.notify( + self, events, listener.from_token, end_token + ) + + def on_new_user_event(self, *args, **kwargs): + pass + + def get_events_for(self, user, pagination_config, timeout): + deferred = defer.Deferred() + + self._get_events( + deferred, user, pagination_config.from_token, + pagination_config.limit, timeout + ).addErrback(deferred.errback) + + return deferred + + @defer.inlineCallbacks + def _get_events(self, deferred, user, from_token, limit, timeout): + if not from_token: + from_token = yield self.event_sources.get_current_token() + + listener = _NotificationListener( + user, + from_token, + limit, + timeout, + deferred, + ) + + if timeout: + reactor.callLater(timeout/1000, self._timeout_listener, listener) + + yield self._register_with_keys(listener) + yield self._check_for_updates(listener) + + return + + def _timeout_listener(self, listener): + # TODO (erikj): We should probably set to_token to the current max + # rather than reusing from_token. + listener.notify( + self, + [], + listener.from_token, + listener.from_token, + ) + + @defer.inlineCallbacks + @log_function + def _register_with_keys(self, listener): + signals_keys = {} + + # TODO (erikj): This can probably be replaced by a DeferredList + for source in self.event_sources.sources: + keys = yield source.get_keys_for_user(listener.user) + signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys) + + for signal, keys in signals_keys.items(): + for key in keys: + s = self.signal_keys_to_users.setdefault((signal, key), []) + s.append(listener) + listener.signal_key_list.append((signal, key)) + + logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users) + + defer.returnValue(listener) + + @defer.inlineCallbacks + @log_function + def _check_for_updates(self, listener): + # TODO (erikj): We need to think about limits across multiple sources + events = [] + + from_token = listener.from_token + limit = listener.limit + + # TODO (erikj): DeferredList? + for source in self.event_sources.sources: + stuff, new_token = yield source.get_new_events_for_user( + listener.user, + from_token, + limit, + ) + + events.extend(stuff) + + from_token = new_token + + end_token = from_token + + + if events: + listener.notify(self, events, listener.from_token, end_token) + + defer.returnValue(listener) |