summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-08-26 18:57:46 +0100
committerErik Johnston <erik@matrix.org>2014-08-26 18:57:46 +0100
commit3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch)
tree5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/notifier.py
parentMerge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff)
downloadsynapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py184
1 files changed, 184 insertions, 0 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
new file mode 100644
index 0000000000..1911fd20ae
--- /dev/null
+++ b/synapse/notifier.py
@@ -0,0 +1,184 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, reactor
+
+from synapse.util.logutils import log_function
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class _NotificationListener(object):
+    def __init__(self, user, from_token, limit, timeout, deferred):
+        self.user = user
+        self.from_token = from_token
+        self.limit = limit
+        self.timeout = timeout
+        self.deferred = deferred
+
+        self.signal_key_list = []
+
+        self.pending_notifications = []
+
+    def notify(self, notifier, events, start_token, end_token):
+        result = (events, (start_token, end_token))
+
+        try:
+            self.deferred.callback(result)
+        except defer.AlreadyCalledError:
+            pass
+
+        for signal, key in self.signal_key_list:
+            lst = notifier.signal_keys_to_users.get((signal, key), [])
+
+            try:
+                lst.remove(self)
+            except:
+                pass
+
+class Notifier(object):
+
+    def __init__(self, hs):
+        self.hs = hs
+
+        self.signal_keys_to_users = {}
+
+        self.event_sources = hs.get_event_sources()
+
+    @log_function
+    @defer.inlineCallbacks
+    def on_new_room_event(self, event, store_id):
+        room_id = event.room_id
+
+        source = self.event_sources.sources[0]
+
+        listeners = self.signal_keys_to_users.get(
+            (source.SIGNAL_NAME, room_id),
+            []
+        )
+
+        logger.debug("on_new_room_event self.signal_keys_to_users %s", listeners)
+        logger.debug("on_new_room_event listeners %s", listeners)
+
+        # TODO (erikj): Can we make this more efficient by hitting the
+        # db once?
+        for listener in listeners:
+            events, end_token = yield source.get_new_events_for_user(
+                listener.user,
+                listener.from_token,
+                listener.limit,
+                key=room_id,
+            )
+
+            if events:
+                listener.notify(
+                    self, events, listener.from_token, end_token
+                )
+
+    def on_new_user_event(self, *args, **kwargs):
+        pass
+
+    def get_events_for(self, user, pagination_config, timeout):
+        deferred = defer.Deferred()
+
+        self._get_events(
+            deferred, user, pagination_config.from_token,
+            pagination_config.limit, timeout
+        ).addErrback(deferred.errback)
+
+        return deferred
+
+    @defer.inlineCallbacks
+    def _get_events(self, deferred, user, from_token, limit, timeout):
+        if not from_token:
+            from_token = yield self.event_sources.get_current_token()
+
+        listener = _NotificationListener(
+            user,
+            from_token,
+            limit,
+            timeout,
+            deferred,
+        )
+
+        if timeout:
+            reactor.callLater(timeout/1000, self._timeout_listener, listener)
+
+        yield self._register_with_keys(listener)
+        yield self._check_for_updates(listener)
+
+        return
+
+    def _timeout_listener(self, listener):
+        # TODO (erikj): We should probably set to_token to the current max
+        # rather than reusing from_token.
+        listener.notify(
+            self,
+            [],
+            listener.from_token,
+            listener.from_token,
+        )
+
+    @defer.inlineCallbacks
+    @log_function
+    def _register_with_keys(self, listener):
+        signals_keys = {}
+
+        # TODO (erikj): This can probably be replaced by a DeferredList
+        for source in self.event_sources.sources:
+            keys = yield source.get_keys_for_user(listener.user)
+            signals_keys.setdefault(source.SIGNAL_NAME, []).extend(keys)
+
+        for signal, keys in signals_keys.items():
+            for key in keys:
+                s = self.signal_keys_to_users.setdefault((signal, key), [])
+                s.append(listener)
+                listener.signal_key_list.append((signal, key))
+
+        logger.debug("New signal_keys_to_users: %s", self.signal_keys_to_users)
+
+        defer.returnValue(listener)
+
+    @defer.inlineCallbacks
+    @log_function
+    def _check_for_updates(self, listener):
+        # TODO (erikj): We need to think about limits across multiple sources
+        events = []
+
+        from_token = listener.from_token
+        limit = listener.limit
+
+        # TODO (erikj): DeferredList?
+        for source in self.event_sources.sources:
+            stuff, new_token = yield source.get_new_events_for_user(
+                listener.user,
+                from_token,
+                limit,
+            )
+
+            events.extend(stuff)
+
+            from_token = new_token
+
+        end_token = from_token
+
+
+        if events:
+            listener.notify(self, events, listener.from_token, end_token)
+
+        defer.returnValue(listener)