diff options
Diffstat (limited to 'synapse/handlers/events.py')
-rw-r--r-- | synapse/handlers/events.py | 107 |
1 files changed, 31 insertions, 76 deletions
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1bd173acd8..b336b292d3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -15,19 +15,17 @@ from twisted.internet import defer +from synapse.api.events import SynapseEvent + from ._base import BaseHandler -from synapse.api.streams.event import ( - EventStream, EventsStreamData -) -from synapse.handlers.presence import PresenceStreamData +import logging -class EventStreamHandler(BaseHandler): - stream_data_classes = [ - EventsStreamData, - PresenceStreamData, - ] +logger = logging.getLogger(__name__) + + +class EventStreamHandler(BaseHandler): def __init__(self, hs): super(EventStreamHandler, self).__init__(hs) @@ -43,45 +41,12 @@ class EventStreamHandler(BaseHandler): self.clock = hs.get_clock() - def get_event_stream_token(self, stream_type, store_id, start_token): - """Return the next token after this event. - - Args: - stream_type (str): The StreamData.EVENT_TYPE - store_id (int): The new storage ID assigned from the data store. - start_token (str): The token the user started with. - Returns: - str: The end token. - """ - for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes): - if stream_cls.EVENT_TYPE == stream_type: - # this is the stream for this event, so replace this part of - # the token - store_ids = start_token.split(EventStream.SEPARATOR) - store_ids[i] = str(store_id) - return EventStream.SEPARATOR.join(store_ids) - raise RuntimeError("Didn't find a stream type %s" % stream_type) + self.notifier = hs.get_notifier() @defer.inlineCallbacks def get_stream(self, auth_user_id, pagin_config, timeout=0): - """Gets events as an event stream for this user. - - This function looks for interesting *events* for this user. This is - different from the notifier, which looks for interested *users* who may - want to know about a single event. - - Args: - auth_user_id (str): The user requesting their event stream. - pagin_config (synapse.api.streams.PaginationConfig): The config to - use when obtaining the stream. - timeout (int): The max time to wait for an incoming event in ms. - Returns: - A pagination stream API dict - """ auth_user = self.hs.parse_userid(auth_user_id) - stream_id = object() - try: if auth_user not in self._streams_per_user: self._streams_per_user[auth_user] = 0 @@ -94,41 +59,31 @@ class EventStreamHandler(BaseHandler): ) self._streams_per_user[auth_user] += 1 - # construct an event stream with the correct data ordering - stream_data_list = [] - for stream_class in EventStreamHandler.stream_data_classes: - stream_data_list.append(stream_class(self.hs)) - event_stream = EventStream(auth_user_id, stream_data_list) - - # fix unknown tokens to known tokens - pagin_config = yield event_stream.fix_tokens(pagin_config) - - # register interest in receiving new events - self.notifier.store_events_for(user_id=auth_user_id, - stream_id=stream_id, - from_tok=pagin_config.from_tok) - - # see if we can grab a chunk now - data_chunk = yield event_stream.get_chunk(config=pagin_config) - - # if there are previous events, return those. If not, wait on the - # new events for 'timeout' seconds. - if len(data_chunk["chunk"]) == 0 and timeout != 0: - results = yield defer.maybeDeferred( - self.notifier.get_events_for, - user_id=auth_user_id, - stream_id=stream_id, - timeout=timeout - ) - if results: - defer.returnValue(results) - defer.returnValue(data_chunk) - finally: - # cleanup - self.notifier.purge_events_for(user_id=auth_user_id, - stream_id=stream_id) + if pagin_config.from_token is None: + pagin_config.from_token = None + + rm_handler = self.hs.get_handlers().room_member_handler + room_ids = yield rm_handler.get_rooms_for_user(auth_user) + events, tokens = yield self.notifier.get_events_for( + auth_user, room_ids, pagin_config, timeout + ) + + chunks = [ + e.get_dict() if isinstance(e, SynapseEvent) else e + for e in events + ] + + chunk = { + "chunk": chunks, + "start": tokens[0].to_string(), + "end": tokens[1].to_string(), + } + + defer.returnValue(chunk) + + finally: self._streams_per_user[auth_user] -= 1 if not self._streams_per_user[auth_user]: del self._streams_per_user[auth_user] |