summary refs log tree commit diff
path: root/synapse/handlers/events.py
diff options
context:
space:
mode:
authormatrix.org <matrix@matrix.org>2014-08-12 15:10:52 +0100
committermatrix.org <matrix@matrix.org>2014-08-12 15:10:52 +0100
commit4f475c7697722e946e39e42f38f3dd03a95d8765 (patch)
tree076d96d3809fb836c7245fd9f7960e7b75888a77 /synapse/handlers/events.py
downloadsynapse-4f475c7697722e946e39e42f38f3dd03a95d8765.tar.xz
Reference Matrix Home Server
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r--synapse/handlers/events.py149
1 files changed, 149 insertions, 0 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
new file mode 100644
index 0000000000..79742a4e1c
--- /dev/null
+++ b/synapse/handlers/events.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.streams.event import (
+    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
+    RoomDataStreamData
+)
+from synapse.handlers.presence import PresenceStreamData
+
+
+class EventStreamHandler(BaseHandler):
+
+    stream_data_classes = [
+        MessagesStreamData,
+        RoomMemberStreamData,
+        FeedbackStreamData,
+        RoomDataStreamData,
+        PresenceStreamData,
+    ]
+
+    def __init__(self, hs):
+        super(EventStreamHandler, self).__init__(hs)
+
+        # Count of active streams per user
+        self._streams_per_user = {}
+        # Grace timers per user to delay the "stopped" signal
+        self._stop_timer_per_user = {}
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("started_user_eventstream")
+        self.distributor.declare("stopped_user_eventstream")
+
+        self.clock = hs.get_clock()
+
+    def get_event_stream_token(self, stream_type, store_id, start_token):
+        """Return the next token after this event.
+
+        Args:
+            stream_type (str): The StreamData.EVENT_TYPE
+            store_id (int): The new storage ID assigned from the data store.
+            start_token (str): The token the user started with.
+        Returns:
+            str: The end token.
+        """
+        for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
+            if stream_cls.EVENT_TYPE == stream_type:
+                # this is the stream for this event, so replace this part of
+                # the token
+                store_ids = start_token.split(EventStream.SEPARATOR)
+                store_ids[i] = str(store_id)
+                return EventStream.SEPARATOR.join(store_ids)
+        raise RuntimeError("Didn't find a stream type %s" % stream_type)
+
+    @defer.inlineCallbacks
+    def get_stream(self, auth_user_id, pagin_config, timeout=0):
+        """Gets events as an event stream for this user.
+
+        This function looks for interesting *events* for this user. This is
+        different from the notifier, which looks for interested *users* who may
+        want to know about a single event.
+
+        Args:
+            auth_user_id (str): The user requesting their event stream.
+            pagin_config (synapse.api.streams.PaginationConfig): The config to
+            use when obtaining the stream.
+            timeout (int): The max time to wait for an incoming event in ms.
+        Returns:
+            A pagination stream API dict
+        """
+        auth_user = self.hs.parse_userid(auth_user_id)
+
+        stream_id = object()
+
+        try:
+            if auth_user not in self._streams_per_user:
+                self._streams_per_user[auth_user] = 0
+                if auth_user in self._stop_timer_per_user:
+                    self.clock.cancel_call_later(
+                        self._stop_timer_per_user.pop(auth_user))
+                else:
+                    self.distributor.fire(
+                        "started_user_eventstream", auth_user
+                    )
+            self._streams_per_user[auth_user] += 1
+
+            # construct an event stream with the correct data ordering
+            stream_data_list = []
+            for stream_class in EventStreamHandler.stream_data_classes:
+                stream_data_list.append(stream_class(self.hs))
+            event_stream = EventStream(auth_user_id, stream_data_list)
+
+            # fix unknown tokens to known tokens
+            pagin_config = yield event_stream.fix_tokens(pagin_config)
+
+            # register interest in receiving new events
+            self.notifier.store_events_for(user_id=auth_user_id,
+                                           stream_id=stream_id,
+                                           from_tok=pagin_config.from_tok)
+
+            # see if we can grab a chunk now
+            data_chunk = yield event_stream.get_chunk(config=pagin_config)
+
+            # if there are previous events, return those. If not, wait on the
+            # new events for 'timeout' seconds.
+            if len(data_chunk["chunk"]) == 0 and timeout != 0:
+                results = yield defer.maybeDeferred(
+                    self.notifier.get_events_for,
+                    user_id=auth_user_id,
+                    stream_id=stream_id,
+                    timeout=timeout
+                )
+                if results:
+                    defer.returnValue(results)
+
+            defer.returnValue(data_chunk)
+        finally:
+            # cleanup
+            self.notifier.purge_events_for(user_id=auth_user_id,
+                                           stream_id=stream_id)
+
+            self._streams_per_user[auth_user] -= 1
+            if not self._streams_per_user[auth_user]:
+                del self._streams_per_user[auth_user]
+
+                # 10 seconds of grace to allow the client to reconnect again
+                #   before we think they're gone
+                def _later():
+                    self.distributor.fire(
+                        "stopped_user_eventstream", auth_user
+                    )
+                    del self._stop_timer_per_user[auth_user]
+
+                self._stop_timer_per_user[auth_user] = (
+                    self.clock.call_later(5, _later)
+                )