diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 6bb797caf2..2d7bd5083b 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -16,18 +16,14 @@
from twisted.internet import defer
from ._base import BaseHandler
-from synapse.api.streams.event import (
- EventStream, EventsStreamData
-)
-from synapse.handlers.presence import PresenceStreamData
+import logging
-class EventStreamHandler(BaseHandler):
- stream_data_classes = [
- EventsStreamData,
- PresenceStreamData,
- ]
+logger = logging.getLogger(__name__)
+
+
+class EventStreamHandler(BaseHandler):
def __init__(self, hs):
super(EventStreamHandler, self).__init__(hs)
@@ -43,104 +39,22 @@ class EventStreamHandler(BaseHandler):
self.clock = hs.get_clock()
- def get_event_stream_token(self, stream_type, store_id, start_token):
- """Return the next token after this event.
-
- Args:
- stream_type (str): The StreamData.EVENT_TYPE
- store_id (int): The new storage ID assigned from the data store.
- start_token (str): The token the user started with.
- Returns:
- str: The end token.
- """
- for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
- if stream_cls.EVENT_TYPE == stream_type:
- # this is the stream for this event, so replace this part of
- # the token
- store_ids = start_token.split(EventStream.SEPARATOR)
- store_ids[i] = str(store_id)
- return EventStream.SEPARATOR.join(store_ids)
- raise RuntimeError("Didn't find a stream type %s" % stream_type)
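+        # the notifier lets get_stream below wait for new events with a timeout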
+ self.notifier = hs.get_notifier()
@defer.inlineCallbacks
def get_stream(self, auth_user_id, pagin_config, timeout=0):
- """Gets events as an event stream for this user.
-
- This function looks for interesting *events* for this user. This is
- different from the notifier, which looks for interested *users* who may
- want to know about a single event.
-
- Args:
- auth_user_id (str): The user requesting their event stream.
- pagin_config (synapse.api.streams.PaginationConfig): The config to
- use when obtaining the stream.
- timeout (int): The max time to wait for an incoming event in ms.
- Returns:
- A pagination stream API dict
- """
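+        """Get a chunk of events for this user's event stream.
+
+        Delegates to the notifier, waiting up to `timeout` ms for new events
+        if none are immediately available after `pagin_config.from_token`.
+        Returns a dict with the serialised events under "chunk" and the
+        "start_token"/"end_token" pagination tokens for this chunk.
+        """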
auth_user = self.hs.parse_userid(auth_user_id)
- stream_id = object()
-
- try:
- if auth_user not in self._streams_per_user:
- self._streams_per_user[auth_user] = 0
- if auth_user in self._stop_timer_per_user:
- self.clock.cancel_call_later(
- self._stop_timer_per_user.pop(auth_user))
- else:
- self.distributor.fire(
- "started_user_eventstream", auth_user
- )
- self._streams_per_user[auth_user] += 1
-
- # construct an event stream with the correct data ordering
- stream_data_list = []
- for stream_class in EventStreamHandler.stream_data_classes:
- stream_data_list.append(stream_class(self.hs))
- event_stream = EventStream(auth_user_id, stream_data_list)
-
- # fix unknown tokens to known tokens
- pagin_config = yield event_stream.fix_tokens(pagin_config)
-
- # register interest in receiving new events
- self.notifier.store_events_for(user_id=auth_user_id,
- stream_id=stream_id,
- from_tok=pagin_config.from_tok)
-
- # see if we can grab a chunk now
- data_chunk = yield event_stream.get_chunk(config=pagin_config)
-
- # if there are previous events, return those. If not, wait on the
- # new events for 'timeout' seconds.
- if len(data_chunk["chunk"]) == 0 and timeout != 0:
- results = yield defer.maybeDeferred(
- self.notifier.get_events_for,
- user_id=auth_user_id,
- stream_id=stream_id,
- timeout=timeout
- )
- if results:
- defer.returnValue(results)
-
- defer.returnValue(data_chunk)
- finally:
- # cleanup
- self.notifier.purge_events_for(user_id=auth_user_id,
- stream_id=stream_id)
-
- self._streams_per_user[auth_user] -= 1
- if not self._streams_per_user[auth_user]:
- del self._streams_per_user[auth_user]
-
- # 10 seconds of grace to allow the client to reconnect again
- # before we think they're gone
- def _later():
- self.distributor.fire(
- "stopped_user_eventstream", auth_user
- )
- del self._stop_timer_per_user[auth_user]
-
- self._stop_timer_per_user[auth_user] = (
- self.clock.call_later(5, _later)
- )
+
+        # a from_token of None is simply passed through to the notifier
+        events, tokens = yield self.notifier.get_events_for(
+            auth_user, pagin_config, timeout
+        )
+
+        # serialise the events and attach the pagination tokens for this chunk
+        chunk = {
+            "chunk": [e.get_dict() for e in events],
+            "start_token": tokens[0].to_string(),
+            "end_token": tokens[1].to_string(),
+        }
+
+        defer.returnValue(chunk)
+