diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
new file mode 100644
index 0000000000..79742a4e1c
--- /dev/null
+++ b/synapse/handlers/events.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.streams.event import (
+ EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
+ RoomDataStreamData
+)
+from synapse.handlers.presence import PresenceStreamData
+
+
+class EventStreamHandler(BaseHandler):
+
+ stream_data_classes = [
+ MessagesStreamData,
+ RoomMemberStreamData,
+ FeedbackStreamData,
+ RoomDataStreamData,
+ PresenceStreamData,
+ ]
+
+ def __init__(self, hs):
+ super(EventStreamHandler, self).__init__(hs)
+
+ # Count of active streams per user
+ self._streams_per_user = {}
+ # Grace timers per user to delay the "stopped" signal
+ self._stop_timer_per_user = {}
+
+ self.distributor = hs.get_distributor()
+ self.distributor.declare("started_user_eventstream")
+ self.distributor.declare("stopped_user_eventstream")
+
+ self.clock = hs.get_clock()
+
+ def get_event_stream_token(self, stream_type, store_id, start_token):
+ """Return the next token after this event.
+
+ Args:
+ stream_type (str): The StreamData.EVENT_TYPE
+ store_id (int): The new storage ID assigned from the data store.
+ start_token (str): The token the user started with.
+ Returns:
+ str: The end token.
+ """
+ for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
+ if stream_cls.EVENT_TYPE == stream_type:
+ # this is the stream for this event, so replace this part of
+ # the token
+ store_ids = start_token.split(EventStream.SEPARATOR)
+ store_ids[i] = str(store_id)
+ return EventStream.SEPARATOR.join(store_ids)
+ raise RuntimeError("Didn't find a stream type %s" % stream_type)
+
+ @defer.inlineCallbacks
+ def get_stream(self, auth_user_id, pagin_config, timeout=0):
+ """Gets events as an event stream for this user.
+
+ This function looks for interesting *events* for this user. This is
+ different from the notifier, which looks for interested *users* who may
+ want to know about a single event.
+
+ Args:
+ auth_user_id (str): The user requesting their event stream.
+ pagin_config (synapse.api.streams.PaginationConfig): The config to
+ use when obtaining the stream.
+ timeout (int): The max time to wait for an incoming event in ms.
+ Returns:
+ A pagination stream API dict
+ """
+ auth_user = self.hs.parse_userid(auth_user_id)
+
+ stream_id = object()
+
+ try:
+ if auth_user not in self._streams_per_user:
+ self._streams_per_user[auth_user] = 0
+ if auth_user in self._stop_timer_per_user:
+ self.clock.cancel_call_later(
+ self._stop_timer_per_user.pop(auth_user))
+ else:
+ self.distributor.fire(
+ "started_user_eventstream", auth_user
+ )
+ self._streams_per_user[auth_user] += 1
+
+ # construct an event stream with the correct data ordering
+ stream_data_list = []
+ for stream_class in EventStreamHandler.stream_data_classes:
+ stream_data_list.append(stream_class(self.hs))
+ event_stream = EventStream(auth_user_id, stream_data_list)
+
+ # fix unknown tokens to known tokens
+ pagin_config = yield event_stream.fix_tokens(pagin_config)
+
+ # register interest in receiving new events
+ self.notifier.store_events_for(user_id=auth_user_id,
+ stream_id=stream_id,
+ from_tok=pagin_config.from_tok)
+
+ # see if we can grab a chunk now
+ data_chunk = yield event_stream.get_chunk(config=pagin_config)
+
+ # if there are previous events, return those. If not, wait on the
+ # new events for 'timeout' seconds.
+ if len(data_chunk["chunk"]) == 0 and timeout != 0:
+ results = yield defer.maybeDeferred(
+ self.notifier.get_events_for,
+ user_id=auth_user_id,
+ stream_id=stream_id,
+ timeout=timeout
+ )
+ if results:
+ defer.returnValue(results)
+
+ defer.returnValue(data_chunk)
+ finally:
+ # cleanup
+ self.notifier.purge_events_for(user_id=auth_user_id,
+ stream_id=stream_id)
+
+ self._streams_per_user[auth_user] -= 1
+ if not self._streams_per_user[auth_user]:
+ del self._streams_per_user[auth_user]
+
+ # 10 seconds of grace to allow the client to reconnect again
+ # before we think they're gone
+ def _later():
+ self.distributor.fire(
+ "stopped_user_eventstream", auth_user
+ )
+ del self._stop_timer_per_user[auth_user]
+
+ self._stop_timer_per_user[auth_user] = (
+ self.clock.call_later(5, _later)
+ )
|