diff options
Diffstat (limited to 'synapse/api/streams')
-rw-r--r-- | synapse/api/streams/__init__.py | 96 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 247 |
2 files changed, 343 insertions, 0 deletions
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py new file mode 100644 index 0000000000..08137c1e79 --- /dev/null +++ b/synapse/api/streams/__init__.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.errors import SynapseError + + +class PaginationConfig(object): + + """A configuration object which stores pagination parameters.""" + + def __init__(self, from_tok=None, to_tok=None, limit=0): + self.from_tok = from_tok + self.to_tok = to_tok + self.limit = limit + + @classmethod + def from_request(cls, request, raise_invalid_params=True): + params = { + "from_tok": PaginationStream.TOK_START, + "to_tok": PaginationStream.TOK_END, + "limit": 0 + } + + query_param_mappings = [ # 3-tuple of qp_key, attribute, rules + ("from", "from_tok", lambda x: type(x) == str), + ("to", "to_tok", lambda x: type(x) == str), + ("limit", "limit", lambda x: x.isdigit()) + ] + + for qp, attr, is_valid in query_param_mappings: + if qp in request.args: + if is_valid(request.args[qp][0]): + params[attr] = request.args[qp][0] + elif raise_invalid_params: + raise SynapseError(400, "%s parameter is invalid." % qp) + + return PaginationConfig(**params) + + +class PaginationStream(object): + + """ An interface for streaming data as chunks. """ + + TOK_START = "START" + TOK_END = "END" + + def get_chunk(self, config=None): + """ Return the next chunk in the stream. + + Args: + config (PaginationConfig): The config to aid which chunk to get. + Returns: + A dict containing the new start token "start", the new end token + "end" and the data "chunk" as a list. + """ + raise NotImplementedError() + + +class StreamData(object): + + """ An interface for obtaining streaming data from a table. """ + + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + + def get_rows(self, user_id, from_pkey, to_pkey, limit): + """ Get event stream data between the specified pkeys. + + Args: + user_id : The user's ID + from_pkey : The starting pkey. + to_pkey : The end pkey. May be -1 to mean "latest". + limit: The max number of results to return. + Returns: + A tuple containing the list of event stream data and the last pkey. + """ + raise NotImplementedError() + + def max_token(self): + """ Get the latest currently-valid token. + + Returns: + The latest token.""" + raise NotImplementedError() diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py new file mode 100644 index 0000000000..0cc1a3e36a --- /dev/null +++ b/synapse/api/streams/event.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains classes for streaming from the event stream: /events. +""" +from twisted.internet import defer + +from synapse.api.errors import EventStreamError +from synapse.api.events.room import ( + RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent +) +from synapse.api.streams import PaginationStream, StreamData + +import logging + +logger = logging.getLogger(__name__) + + +class MessagesStreamData(StreamData): + EVENT_TYPE = MessageEvent.TYPE + + def __init__(self, hs, room_id=None, feedback=False): + super(MessagesStreamData, self).__init__(hs) + self.room_id = room_id + self.with_feedback = feedback + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_message_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id, + with_feedback=self.with_feedback + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_message_id() + defer.returnValue(val) + + +class RoomMemberStreamData(StreamData): + EVENT_TYPE = RoomMemberEvent.TYPE + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_room_member_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key + ) + + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_room_member_id() + defer.returnValue(val) + + +class FeedbackStreamData(StreamData): + EVENT_TYPE = FeedbackEvent.TYPE + + def __init__(self, hs, room_id=None): + super(FeedbackStreamData, self).__init__(hs) + self.room_id = room_id + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_feedback_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_feedback_id() + defer.returnValue(val) + + +class RoomDataStreamData(StreamData): + EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types + + def __init__(self, hs, room_id=None): + super(RoomDataStreamData, self).__init__(hs) + self.room_id = room_id + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_room_data_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_room_data_id() + defer.returnValue(val) + + +class EventStream(PaginationStream): + + SEPARATOR = '_' + + def __init__(self, user_id, stream_data_list): + super(EventStream, self).__init__() + self.user_id = user_id + self.stream_data = stream_data_list + + @defer.inlineCallbacks + def fix_tokens(self, pagination_config): + pagination_config.from_tok = yield self.fix_token( + pagination_config.from_tok) + pagination_config.to_tok = yield self.fix_token( + pagination_config.to_tok) + defer.returnValue(pagination_config) + + @defer.inlineCallbacks + def fix_token(self, token): + """Fixes unknown values in a token to known values. + + Args: + token (str): The token to fix up. + Returns: + The fixed-up token, which may == token. + """ + # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. + replacements = [ + (PaginationStream.TOK_START, "0"), + (PaginationStream.TOK_END, "-1") + ] + for magic_token, key in replacements: + if magic_token == token: + token = EventStream.SEPARATOR.join( + [key] * len(self.stream_data) + ) + + # replace -1 values with an actual pkey + token_segments = self._split_token(token) + for i, tok in enumerate(token_segments): + if tok == -1: + # add 1 to the max token because results are EXCLUSIVE from the + # latest version. + token_segments[i] = 1 + (yield self.stream_data[i].max_token()) + defer.returnValue(EventStream.SEPARATOR.join( + str(x) for x in token_segments + )) + + @defer.inlineCallbacks + def get_chunk(self, config=None): + # no support for limit on >1 streams, makes no sense. + if config.limit and len(self.stream_data) > 1: + raise EventStreamError( + 400, "Limit not supported on multiplexed streams." + ) + + (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, + config.to_tok, + config.limit) + + defer.returnValue({ + "chunk": chunk_data, + "start": config.from_tok, + "end": next_tok + }) + + @defer.inlineCallbacks + def _get_chunk_data(self, from_tok, to_tok, limit): + """ Get event data between the two tokens. + + Tokens are SEPARATOR separated values representing pkey values of + certain tables, and the position determines the StreamData invoked + according to the STREAM_DATA list. + + The magic value '-1' can be used to get the latest value. + + Args: + from_tok - The token to start from. + to_tok - The token to end at. Must have values > from_tok or be -1. + Returns: + A list of event data. + Raises: + EventStreamError if something went wrong. + """ + # sanity check + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") + + chunk = [] + next_ver = [] + for i, (from_pkey, to_pkey) in enumerate(zip( + self._split_token(from_tok), + self._split_token(to_tok) + )): + if from_pkey == to_pkey: + # tokens are the same, we have nothing to do. + next_ver.append(str(to_pkey)) + continue + + (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( + self.user_id, from_pkey, to_pkey, limit + ) + + chunk += event_chunk + next_ver.append(str(max_pkey)) + + defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) + + def _split_token(self, token): + """Splits the given token into a list of pkeys. + + Args: + token (str): The token with SEPARATOR values. + Returns: + A list of ints. + """ + segments = token.split(EventStream.SEPARATOR) + try: + int_segments = [int(x) for x in segments] + except ValueError: + raise EventStreamError(400, "Bad token: %s" % token) + return int_segments |