summary refs log tree commit diff
path: root/synapse/api/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/api/notifier.py')
-rw-r--r--synapse/api/notifier.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
new file mode 100644
index 0000000000..974f7f0ba0
--- /dev/null
+++ b/synapse/api/notifier.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.api.constants import Membership
+from synapse.api.events.room import RoomMemberEvent
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class Notifier(object):
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.stored_event_listeners = {}
+
+    @defer.inlineCallbacks
+    def on_new_room_event(self, event, store_id):
+        """Called when there is a new room event which may potentially be sent
+        down listening users' event streams.
+
+        This function looks for interested *users* who may want to be notified
+        for this event. This is different to users requesting from the event
+        stream which looks for interested *events* for this user.
+
+        Args:
+            event (SynapseEvent): The new event, which must have a room_id
+            store_id (int): The ID of this event after it was stored with the
+            data store.
+        '"""
+        member_list = yield self.store.get_room_members(room_id=event.room_id,
+                                                        membership="join")
+        if not member_list:
+            member_list = []
+
+        member_list = [u.user_id for u in member_list]
+
+        # invites MUST prod the person being invited, who won't be in the room.
+        if (event.type == RoomMemberEvent.TYPE and
+                event.content["membership"] == Membership.INVITE):
+            member_list.append(event.target_user_id)
+
+        for user_id in member_list:
+            if user_id in self.stored_event_listeners:
+                self._notify_and_callback(
+                    user_id=user_id,
+                    event_data=event.get_dict(),
+                    stream_type=event.type,
+                    store_id=store_id)
+
+    def on_new_user_event(self, user_id, event_data, stream_type, store_id):
+        if user_id in self.stored_event_listeners:
+            self._notify_and_callback(
+                user_id=user_id,
+                event_data=event_data,
+                stream_type=stream_type,
+                store_id=store_id
+            )
+
+    def _notify_and_callback(self, user_id, event_data, stream_type, store_id):
+        logger.debug(
+            "Notifying %s of a new event.",
+            user_id
+        )
+
+        stream_ids = list(self.stored_event_listeners[user_id])
+        for stream_id in stream_ids:
+            self._notify_and_callback_stream(user_id, stream_id, event_data,
+                                             stream_type, store_id)
+
+        if not self.stored_event_listeners[user_id]:
+            del self.stored_event_listeners[user_id]
+
+    def _notify_and_callback_stream(self, user_id, stream_id, event_data,
+                                    stream_type, store_id):
+
+        event_listener = self.stored_event_listeners[user_id].pop(stream_id)
+        return_event_object = {
+            k: event_listener[k] for k in ["start", "chunk", "end"]
+        }
+
+        # work out the new end token
+        token = event_listener["start"]
+        end = self._next_token(stream_type, store_id, token)
+        return_event_object["end"] = end
+
+        # add the event to the chunk
+        chunk = event_listener["chunk"]
+        chunk.append(event_data)
+
+        # callback the defer. We know this can't have been resolved before as
+        # we always remove the event_listener from the map before resolving.
+        event_listener["defer"].callback(return_event_object)
+
+    def _next_token(self, stream_type, store_id, current_token):
+        stream_handler = self.hs.get_handlers().event_stream_handler
+        return stream_handler.get_event_stream_token(
+            stream_type,
+            store_id,
+            current_token
+        )
+
+    def store_events_for(self, user_id=None, stream_id=None, from_tok=None):
+        """Store all incoming events for this user. This should be paired with
+        get_events_for to return chunked data.
+
+        Args:
+            user_id (str): The user to monitor incoming events for.
+            stream (object): The stream that is receiving events
+            from_tok (str): The token to monitor incoming events from.
+        """
+        event_listener = {
+            "start": from_tok,
+            "chunk": [],
+            "end": from_tok,
+            "defer": defer.Deferred(),
+        }
+
+        if user_id not in self.stored_event_listeners:
+            self.stored_event_listeners[user_id] = {stream_id: event_listener}
+        else:
+            self.stored_event_listeners[user_id][stream_id] = event_listener
+
+    def purge_events_for(self, user_id=None, stream_id=None):
+        """Purges any stored events for this user.
+
+        Args:
+            user_id (str): The user to purge stored events for.
+        """
+        try:
+            del self.stored_event_listeners[user_id][stream_id]
+            if not self.stored_event_listeners[user_id]:
+                del self.stored_event_listeners[user_id]
+        except KeyError:
+            pass
+
+    def get_events_for(self, user_id=None, stream_id=None, timeout=0):
+        """Retrieve stored events for this user, waiting if necessary.
+
+        It is advisable to wrap this call in a maybeDeferred.
+
+        Args:
+            user_id (str): The user to get events for.
+            timeout (int): The time in seconds to wait before giving up.
+        Returns:
+            A Deferred or a dict containing the chunk data, depending on if
+            there was data to return yet. The Deferred callback may be None if
+            there were no events before the timeout expired.
+        """
+        logger.debug("%s is listening for events.", user_id)
+
+        if len(self.stored_event_listeners[user_id][stream_id]["chunk"]) > 0:
+            logger.debug("%s returning existing chunk.", user_id)
+            return self.stored_event_listeners[user_id][stream_id]
+
+        reactor.callLater(
+            (timeout / 1000.0), self._timeout, user_id, stream_id
+        )
+        return self.stored_event_listeners[user_id][stream_id]["defer"]
+
+    def _timeout(self, user_id, stream_id):
+        try:
+            # We remove the event_listener from the map so that we can't
+            # resolve the deferred twice.
+            event_listeners = self.stored_event_listeners[user_id]
+            event_listener = event_listeners.pop(stream_id)
+            event_listener["defer"].callback(None)
+            logger.debug("%s event listening timed out.", user_id)
+        except KeyError:
+            pass