diff options
author | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-08-26 18:57:46 +0100 |
commit | 3a2a5b959cb1f56b26af32e1ad4c1db424279eb7 (patch) | |
tree | 5f5b515cdaa586f1f7f92337471dfdc2e0d4683d /synapse/api | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into stream_refactor (diff) | |
download | synapse-3a2a5b959cb1f56b26af32e1ad4c1db424279eb7.tar.xz |
WIP: Completely change how event streaming and pagination work. This reflects the change in the underlying storage model.
Diffstat (limited to 'synapse/api')
-rw-r--r-- | synapse/api/notifier.py | 196 | ||||
-rw-r--r-- | synapse/api/streams/__init__.py | 107 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 194 |
3 files changed, 0 insertions, 497 deletions
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py deleted file mode 100644 index ec9c4e513d..0000000000 --- a/synapse/api/notifier.py +++ /dev/null @@ -1,196 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.constants import Membership -from synapse.api.events.room import RoomMemberEvent -from synapse.api.streams.event import EventsStreamData - -from twisted.internet import defer -from twisted.internet import reactor - -import logging - -logger = logging.getLogger(__name__) - - -class Notifier(object): - - def __init__(self, hs): - self.store = hs.get_datastore() - self.hs = hs - self.stored_event_listeners = {} - - @defer.inlineCallbacks - def on_new_room_event(self, event, store_id): - """Called when there is a new room event which may potentially be sent - down listening users' event streams. - - This function looks for interested *users* who may want to be notified - for this event. This is different to users requesting from the event - stream which looks for interested *events* for this user. - - Args: - event (SynapseEvent): The new event, which must have a room_id - store_id (int): The ID of this event after it was stored with the - data store. - '""" - member_list = yield self.store.get_room_members(room_id=event.room_id, - membership="join") - if not member_list: - member_list = [] - - member_list = [u.user_id for u in member_list] - - # invites MUST prod the person being invited, who won't be in the room. - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.INVITE): - member_list.append(event.state_key) - # similarly, LEAVEs must be sent to the person leaving - if (event.type == RoomMemberEvent.TYPE and - event.content["membership"] == Membership.LEAVE): - member_list.append(event.state_key) - - for user_id in member_list: - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event.get_dict(), - stream_type=EventsStreamData.EVENT_TYPE, - store_id=store_id) - - def on_new_user_event(self, user_id, event_data, stream_type, store_id): - if user_id in self.stored_event_listeners: - self._notify_and_callback( - user_id=user_id, - event_data=event_data, - stream_type=stream_type, - store_id=store_id - ) - - def _notify_and_callback(self, user_id, event_data, stream_type, store_id): - logger.debug( - "Notifying %s of a new event.", - user_id - ) - - stream_ids = list(self.stored_event_listeners[user_id]) - for stream_id in stream_ids: - self._notify_and_callback_stream(user_id, stream_id, event_data, - stream_type, store_id) - - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - - def _notify_and_callback_stream(self, user_id, stream_id, event_data, - stream_type, store_id): - - event_listener = self.stored_event_listeners[user_id].pop(stream_id) - return_event_object = { - k: event_listener[k] for k in ["start", "chunk", "end"] - } - - # work out the new end token - token = event_listener["start"] - end = self._next_token(stream_type, store_id, token) - return_event_object["end"] = end - - # add the event to the chunk - chunk = event_listener["chunk"] - chunk.append(event_data) - - # callback the defer. We know this can't have been resolved before as - # we always remove the event_listener from the map before resolving. - event_listener["defer"].callback(return_event_object) - - def _next_token(self, stream_type, store_id, current_token): - stream_handler = self.hs.get_handlers().event_stream_handler - return stream_handler.get_event_stream_token( - stream_type, - store_id, - current_token - ) - - def store_events_for(self, user_id=None, stream_id=None, from_tok=None): - """Store all incoming events for this user. This should be paired with - get_events_for to return chunked data. - - Args: - user_id (str): The user to monitor incoming events for. - stream (object): The stream that is receiving events - from_tok (str): The token to monitor incoming events from. - """ - event_listener = { - "start": from_tok, - "chunk": [], - "end": from_tok, - "defer": defer.Deferred(), - } - - if user_id not in self.stored_event_listeners: - self.stored_event_listeners[user_id] = {stream_id: event_listener} - else: - self.stored_event_listeners[user_id][stream_id] = event_listener - - def purge_events_for(self, user_id=None, stream_id=None): - """Purges any stored events for this user. - - Args: - user_id (str): The user to purge stored events for. - """ - try: - del self.stored_event_listeners[user_id][stream_id] - if not self.stored_event_listeners[user_id]: - del self.stored_event_listeners[user_id] - except KeyError: - pass - - def get_events_for(self, user_id=None, stream_id=None, timeout=0): - """Retrieve stored events for this user, waiting if necessary. - - It is advisable to wrap this call in a maybeDeferred. - - Args: - user_id (str): The user to get events for. - timeout (int): The time in seconds to wait before giving up. - Returns: - A Deferred or a dict containing the chunk data, depending on if - there was data to return yet. The Deferred callback may be None if - there were no events before the timeout expired. - """ - logger.debug("%s is listening for events.", user_id) - - try: - streams = self.stored_event_listeners[user_id][stream_id]["chunk"] - if streams: - logger.debug("%s returning existing chunk.", user_id) - return streams - except KeyError: - return None - - reactor.callLater( - (timeout / 1000.0), self._timeout, user_id, stream_id - ) - return self.stored_event_listeners[user_id][stream_id]["defer"] - - def _timeout(self, user_id, stream_id): - try: - # We remove the event_listener from the map so that we can't - # resolve the deferred twice. - event_listeners = self.stored_event_listeners[user_id] - event_listener = event_listeners.pop(stream_id) - event_listener["defer"].callback(None) - logger.debug("%s event listening timed out.", user_id) - except KeyError: - pass diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py deleted file mode 100644 index 0ba4783ea2..0000000000 --- a/synapse/api/streams/__init__.py +++ /dev/null @@ -1,107 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.api.errors import SynapseError -from synapse.types import StreamToken - - -class PaginationConfig(object): - - """A configuration object which stores pagination parameters.""" - - def __init__(self, from_tok=None, to_tok=None, direction='f', limit=0): - self.from_tok = StreamToken(from_tok) if from_tok else None - self.to_tok = StreamToken(to_tok) if to_tok else None - self.direction = 'f' if direction == 'f' else 'b' - self.limit = int(limit) - - @classmethod - def from_request(cls, request, raise_invalid_params=True): - params = { - "from_tok": "END", - "direction": 'f', - } - - query_param_mappings = [ # 3-tuple of qp_key, attribute, rules - ("from", "from_tok", lambda x: type(x) == str), - ("to", "to_tok", lambda x: type(x) == str), - ("limit", "limit", lambda x: x.isdigit()), - ("dir", "direction", lambda x: x == 'f' or x == 'b'), - ] - - for qp, attr, is_valid in query_param_mappings: - if qp in request.args: - if is_valid(request.args[qp][0]): - params[attr] = request.args[qp][0] - elif raise_invalid_params: - raise SynapseError(400, "%s parameter is invalid." % qp) - - try: - return PaginationConfig(**params) - except: - raise SynapseError(400, "Invalid request.") - - def __str__(self): - return ( - "<PaginationConfig from_tok=%s, to_tok=%s, " - "direction=%s, limit=%s>" - ) % (self.from_tok, self.to_tok, self.direction, self.limit) - - -class PaginationStream(object): - - """ An interface for streaming data as chunks. """ - - TOK_END = "END" - - def get_chunk(self, config=None): - """ Return the next chunk in the stream. - - Args: - config (PaginationConfig): The config to aid which chunk to get. - Returns: - A dict containing the new start token "start", the new end token - "end" and the data "chunk" as a list. - """ - raise NotImplementedError() - - -class StreamData(object): - - """ An interface for obtaining streaming data from a table. """ - - def __init__(self, hs): - self.hs = hs - self.store = hs.get_datastore() - - def get_rows(self, user_id, from_pkey, to_pkey, limit, direction): - """ Get event stream data between the specified pkeys. - - Args: - user_id : The user's ID - from_pkey : The starting pkey. - to_pkey : The end pkey. May be -1 to mean "latest". - limit: The max number of results to return. - Returns: - A tuple containing the list of event stream data and the last pkey. - """ - raise NotImplementedError() - - def max_token(self): - """ Get the latest currently-valid token. - - Returns: - The latest token.""" - raise NotImplementedError() diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py deleted file mode 100644 index fe44a488bc..0000000000 --- a/synapse/api/streams/event.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014 matrix.org -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""This module contains classes for streaming from the event stream: /events. -""" -from twisted.internet import defer - -from synapse.api.errors import EventStreamError -from synapse.api.events import SynapseEvent -from synapse.api.streams import PaginationStream, StreamData - -import logging - -logger = logging.getLogger(__name__) - - -class EventsStreamData(StreamData): - EVENT_TYPE = "EventsStream" - - def __init__(self, hs, room_id=None, feedback=False): - super(EventsStreamData, self).__init__(hs) - self.room_id = room_id - self.with_feedback = feedback - - @defer.inlineCallbacks - def get_rows(self, user_id, from_key, to_key, limit, direction): - data, latest_ver = yield self.store.get_room_events( - user_id=user_id, - from_key=from_key, - to_key=to_key, - limit=limit, - room_id=self.room_id, - with_feedback=self.with_feedback - ) - defer.returnValue((data, latest_ver)) - - @defer.inlineCallbacks - def max_token(self): - val = yield self.store.get_room_events_max_id() - defer.returnValue(val) - - -class EventStream(PaginationStream): - - SEPARATOR = '_' - - def __init__(self, user_id, stream_data_list): - super(EventStream, self).__init__() - self.user_id = user_id - self.stream_data = stream_data_list - - @defer.inlineCallbacks - def fix_tokens(self, pagination_config): - pagination_config.from_tok = yield self.fix_token( - pagination_config.from_tok) - pagination_config.to_tok = yield self.fix_token( - pagination_config.to_tok) - - if ( - not pagination_config.to_tok - and pagination_config.direction == 'f' - ): - pagination_config.to_tok = yield self.get_current_max_token() - - logger.debug("pagination_config: %s", pagination_config) - - defer.returnValue(pagination_config) - - @defer.inlineCallbacks - def fix_token(self, token): - """Fixes unknown values in a token to known values. - - Args: - token (str): The token to fix up. - Returns: - The fixed-up token, which may == token. - """ - if token == PaginationStream.TOK_END: - new_token = yield self.get_current_max_token() - - logger.debug("fix_token: From %s to %s", token, new_token) - - token = new_token - - defer.returnValue(token) - - @defer.inlineCallbacks - def get_current_max_token(self): - new_token_parts = [] - for s in self.stream_data: - mx = yield s.max_token() - new_token_parts.append(str(mx)) - - new_token = EventStream.SEPARATOR.join(new_token_parts) - - logger.debug("get_current_max_token: %s", new_token) - - defer.returnValue(new_token) - - @defer.inlineCallbacks - def get_chunk(self, config): - # no support for limit on >1 streams, makes no sense. - if config.limit and len(self.stream_data) > 1: - raise EventStreamError( - 400, "Limit not supported on multiplexed streams." - ) - - chunk_data, next_tok = yield self._get_chunk_data( - config.from_tok, - config.to_tok, - config.limit, - config.direction, - ) - - defer.returnValue({ - "chunk": chunk_data, - "start": config.from_tok, - "end": next_tok - }) - - @defer.inlineCallbacks - def _get_chunk_data(self, from_tok, to_tok, limit, direction): - """ Get event data between the two tokens. - - Tokens are SEPARATOR separated values representing pkey values of - certain tables, and the position determines the StreamData invoked - according to the STREAM_DATA list. - - The magic value '-1' can be used to get the latest value. - - Args: - from_tok - The token to start from. - to_tok - The token to end at. Must have values > from_tok or be -1. - Returns: - A list of event data. - Raises: - EventStreamError if something went wrong. - """ - # sanity check - if to_tok is not None: - if (from_tok.count(EventStream.SEPARATOR) != - to_tok.count(EventStream.SEPARATOR) or - (from_tok.count(EventStream.SEPARATOR) + 1) != - len(self.stream_data)): - raise EventStreamError(400, "Token lengths don't match.") - - chunk = [] - next_ver = [] - for i, (from_pkey, to_pkey) in enumerate(zip( - self._split_token(from_tok), - self._split_token(to_tok) - )): - if from_pkey == to_pkey: - # tokens are the same, we have nothing to do. - next_ver.append(str(to_pkey)) - continue - - (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( - self.user_id, from_pkey, to_pkey, limit, direction, - ) - - chunk.extend([ - e.get_dict() if isinstance(e, SynapseEvent) else e - for e in event_chunk - ]) - next_ver.append(str(max_pkey)) - - defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) - - def _split_token(self, token): - """Splits the given token into a list of pkeys. - - Args: - token (str): The token with SEPARATOR values. - Returns: - A list of ints. - """ - if token: - segments = token.split(EventStream.SEPARATOR) - else: - segments = [None] * len(self.stream_data) - return segments |