diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
commit | 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch) | |
tree | 5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/api/streams | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff) | |
download | synapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz |
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/api/streams')
-rw-r--r-- | synapse/api/streams/__init__.py | 107 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 194 |
2 files changed, 0 insertions, 301 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py deleted file mode 100644 index 0ba4783ea2..0000000000 --- a/synapse/api/streams/__init__.py +++ /dev/null @@ -1,107 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.errors import SynapseError -from synapse.types import StreamToken - - -class PaginationConfig(object): - - """A configuration object which stores pagination parameters.""" - - def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = StreamToken(from_tok) if from_tok else None - self.to_tok = StreamToken(to_tok) if to_tok else None - self.direction = 'f' if direction == 'f' else 'b' - self.limit = int(limit) - - @classmethod - def from_request(cls, request, raise_invalid_params=True): - params = { - "from_tok": "END", - "direction": 'f', - } - - query_param_mappings = [ # 3-tuple of qp_key, attribute, rules - ("from", "from_tok", lambda x: type(x) == str), - ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()), - ("dir", "direction", lambda x: x == 'f' or x == 'b'), - ] - - for qp, attr, is_valid in query_param_mappings: - if qp in request.args: - if is_valid(request.args[qp][0]): - params[attr] = request.args[qp][0] - elif raise_invalid_params: - raise SynapseError(400, "%s parameter is invalid." % qp) - - try: - return PaginationConfig(**params) - except: - raise SynapseError(400, "Invalid request.") - - def __str__(self): - return ( - "<PaginationConfig from_tok=%s, to_tok=%s, " - "direction=%s, limit=%s>" - ) % (self.from_tok, self.to_tok, self.direction, self.limit) - - -class PaginationStream(object): - - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. - Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - - Returns: - The latest token.""" - raise NotImplementedError() diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488bc..0000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. -""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. - """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. - """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. - """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments |