From 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Aug 2014 18:57:46 +0100 Subject: WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model. --- synapse/api/streams/event.py | 194 ------------------------------------------- 1 file changed, 194 deletions(-) delete mode 100644 synapse/api/streams/event.py (limited to 'synapse/api/streams/event.py') diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488bc..0000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. -""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. - """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. - """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. - """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments -- cgit 1.4.1