diff options
Diffstat (limited to 'synapse/api')
-rw-r--r-- | synapse/api/__init__.py | 14 | ||||
-rw-r--r-- | synapse/api/auth.py | 164 | ||||
-rw-r--r-- | synapse/api/constants.py | 42 | ||||
-rw-r--r-- | synapse/api/errors.py | 114 | ||||
-rw-r--r-- | synapse/api/events/__init__.py | 152 | ||||
-rw-r--r-- | synapse/api/events/factory.py | 50 | ||||
-rw-r--r-- | synapse/api/events/room.py | 99 | ||||
-rw-r--r-- | synapse/api/notifier.py | 186 | ||||
-rw-r--r-- | synapse/api/streams/__init__.py | 96 | ||||
-rw-r--r-- | synapse/api/streams/event.py | 247 |
10 files changed, 1164 insertions, 0 deletions
diff --git a/synapse/api/__init__.py b/synapse/api/__init__.py new file mode 100644 index 0000000000..fe8a073cd3 --- /dev/null +++ b/synapse/api/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/synapse/api/auth.py b/synapse/api/auth.py new file mode 100644 index 0000000000..5c66a7261f --- /dev/null +++ b/synapse/api/auth.py @@ -0,0 +1,164 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains classes for authenticating the user.""" +from twisted.internet import defer + +from synapse.api.constants import Membership +from synapse.api.errors import AuthError, StoreError +from synapse.api.events.room import (RoomTopicEvent, RoomMemberEvent, + MessageEvent, FeedbackEvent) + +import logging + +logger = logging.getLogger(__name__) + + +class Auth(object): + + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def check(self, event, raises=False): + """ Checks if this event is correctly authed. + + Returns: + True if the auth checks pass. + Raises: + AuthError if there was a problem authorising this event. This will + be raised only if raises=True. + """ + try: + if event.type in [RoomTopicEvent.TYPE, MessageEvent.TYPE, + FeedbackEvent.TYPE]: + yield self.check_joined_room(event.room_id, event.user_id) + defer.returnValue(True) + elif event.type == RoomMemberEvent.TYPE: + allowed = yield self.is_membership_change_allowed(event) + defer.returnValue(allowed) + else: + raise AuthError(500, "Unknown event type %s" % event.type) + except AuthError as e: + logger.info("Event auth check failed on event %s with msg: %s", + event, e.msg) + if raises: + raise e + defer.returnValue(False) + + @defer.inlineCallbacks + def check_joined_room(self, room_id, user_id): + try: + member = yield self.store.get_room_member( + room_id=room_id, + user_id=user_id + ) + if not member or member.membership != Membership.JOIN: + raise AuthError(403, "User %s not in room %s" % + (user_id, room_id)) + defer.returnValue(member) + except AttributeError: + pass + defer.returnValue(None) + + @defer.inlineCallbacks + def is_membership_change_allowed(self, event): + # does this room even exist + room = yield self.store.get_room(event.room_id) + if not room: + raise AuthError(403, "Room does not exist") + + # get info about the caller + try: + caller = yield self.store.get_room_member( + user_id=event.user_id, + room_id=event.room_id) + except: + caller = None + caller_in_room = caller and caller.membership == "join" + + # get info about the target + try: + target = yield self.store.get_room_member( + user_id=event.target_user_id, + room_id=event.room_id) + except: + target = None + target_in_room = target and target.membership == "join" + + membership = event.content["membership"] + + if Membership.INVITE == membership: + # Invites are valid iff caller is in the room and target isn't. + if not caller_in_room: # caller isn't joined + raise AuthError(403, "You are not in room %s." % event.room_id) + elif target_in_room: # the target is already in the room. + raise AuthError(403, "%s is already in the room." % + event.target_user_id) + elif Membership.JOIN == membership: + # Joins are valid iff caller == target and they were: + # invited: They are accepting the invitation + # joined: It's a NOOP + if event.user_id != event.target_user_id: + raise AuthError(403, "Cannot force another user to join.") + elif room.is_public: + pass # anyone can join public rooms. + elif (not caller or caller.membership not in + [Membership.INVITE, Membership.JOIN]): + raise AuthError(403, "You are not invited to this room.") + elif Membership.LEAVE == membership: + if not caller_in_room: # trying to leave a room you aren't joined + raise AuthError(403, "You are not in room %s." % event.room_id) + elif event.target_user_id != event.user_id: + # trying to force another user to leave + raise AuthError(403, "Cannot force %s to leave." % + event.target_user_id) + else: + raise AuthError(500, "Unknown membership %s" % membership) + + defer.returnValue(True) + + def get_user_by_req(self, request): + """ Get a registered user's ID. + + Args: + request - An HTTP request with an access_token query parameter. + Returns: + UserID : User ID object of the user making the request + Raises: + AuthError if no user by that token exists or the token is invalid. + """ + # Can optionally look elsewhere in the request (e.g. headers) + try: + return self.get_user_by_token(request.args["access_token"][0]) + except KeyError: + raise AuthError(403, "Missing access token.") + + @defer.inlineCallbacks + def get_user_by_token(self, token): + """ Get a registered user's ID. + + Args: + token (str)- The access token to get the user by. + Returns: + UserID : User ID object of the user who has that access token. + Raises: + AuthError if no user by that token exists or the token is invalid. + """ + try: + user_id = yield self.store.get_user_by_token(token=token) + defer.returnValue(self.hs.parse_userid(user_id)) + except StoreError: + raise AuthError(403, "Unrecognised access token.") diff --git a/synapse/api/constants.py b/synapse/api/constants.py new file mode 100644 index 0000000000..37bf41bfb3 --- /dev/null +++ b/synapse/api/constants.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains constants from the specification.""" + + +class Membership(object): + + """Represents the membership states of a user in a room.""" + INVITE = u"invite" + JOIN = u"join" + KNOCK = u"knock" + LEAVE = u"leave" + + +class Feedback(object): + + """Represents the types of feedback a user can send in response to a + message.""" + + DELIVERED = u"d" + READ = u"r" + LIST = (DELIVERED, READ) + + +class PresenceState(object): + """Represents the presence state of a user.""" + OFFLINE = 0 + BUSY = 1 + ONLINE = 2 + FREE_FOR_CHAT = 3 diff --git a/synapse/api/errors.py b/synapse/api/errors.py new file mode 100644 index 0000000000..7ad4d636c2 --- /dev/null +++ b/synapse/api/errors.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Contains exceptions and error codes.""" + +import logging + + +class Codes(object): + FORBIDDEN = "M_FORBIDDEN" + BAD_JSON = "M_BAD_JSON" + NOT_JSON = "M_NOT_JSON" + USER_IN_USE = "M_USER_IN_USE" + ROOM_IN_USE = "M_ROOM_IN_USE" + BAD_PAGINATION = "M_BAD_PAGINATION" + UNKNOWN = "M_UNKNOWN" + NOT_FOUND = "M_NOT_FOUND" + + +class CodeMessageException(Exception): + """An exception with integer code and message string attributes.""" + + def __init__(self, code, msg): + logging.error("%s: %s, %s", type(self).__name__, code, msg) + super(CodeMessageException, self).__init__("%d: %s" % (code, msg)) + self.code = code + self.msg = msg + + +class SynapseError(CodeMessageException): + """A base error which can be caught for all synapse events.""" + def __init__(self, code, msg, errcode=""): + """Constructs a synapse error. + + Args: + code (int): The integer error code (typically an HTTP response code) + msg (str): The human-readable error message. + err (str): The error code e.g 'M_FORBIDDEN' + """ + super(SynapseError, self).__init__(code, msg) + self.errcode = errcode + + +class RoomError(SynapseError): + """An error raised when a room event fails.""" + pass + + +class RegistrationError(SynapseError): + """An error raised when a registration event fails.""" + pass + + +class AuthError(SynapseError): + """An error raised when there was a problem authorising an event.""" + + def __init__(self, *args, **kwargs): + if "errcode" not in kwargs: + kwargs["errcode"] = Codes.FORBIDDEN + super(AuthError, self).__init__(*args, **kwargs) + + +class EventStreamError(SynapseError): + """An error raised when there a problem with the event stream.""" + pass + + +class LoginError(SynapseError): + """An error raised when there was a problem logging in.""" + pass + + +class StoreError(SynapseError): + """An error raised when there was a problem storing some data.""" + pass + + +def cs_exception(exception): + if isinstance(exception, SynapseError): + return cs_error( + exception.msg, + Codes.UNKNOWN if not exception.errcode else exception.errcode) + elif isinstance(exception, CodeMessageException): + return cs_error(exception.msg) + else: + logging.error("Unknown exception type: %s", type(exception)) + + +def cs_error(msg, code=Codes.UNKNOWN, **kwargs): + """ Utility method for constructing an error response for client-server + interactions. + + Args: + msg (str): The error message. + code (int): The error code. + kwargs : Additional keys to add to the response. + Returns: + A dict representing the error response JSON. + """ + err = {"error": msg, "errcode": code} + for key, value in kwargs.iteritems(): + err[key] = value + return err diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py new file mode 100644 index 0000000000..bc2daf3361 --- /dev/null +++ b/synapse/api/events/__init__.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.errors import SynapseError, Codes +from synapse.util.jsonobject import JsonEncodedObject + + +class SynapseEvent(JsonEncodedObject): + + """Base class for Synapse events. These are JSON objects which must abide + by a certain well-defined structure. + """ + + # Attributes that are currently assumed by the federation side: + # Mandatory: + # - event_id + # - room_id + # - type + # - is_state + # + # Optional: + # - state_key (mandatory when is_state is True) + # - prev_events (these can be filled out by the federation layer itself.) + # - prev_state + + valid_keys = [ + "event_id", + "type", + "room_id", + "user_id", # sender/initiator + "content", # HTTP body, JSON + ] + + internal_keys = [ + "is_state", + "state_key", + "prev_events", + "prev_state", + "depth", + "destinations", + "origin", + ] + + required_keys = [ + "event_id", + "room_id", + "content", + ] + + def __init__(self, raises=True, **kwargs): + super(SynapseEvent, self).__init__(**kwargs) + if "content" in kwargs: + self.check_json(self.content, raises=raises) + + def get_content_template(self): + """ Retrieve the JSON template for this event as a dict. + + The template must be a dict representing the JSON to match. Only + required keys should be present. The values of the keys in the template + are checked via type() to the values of the same keys in the actual + event JSON. + + NB: If loading content via json.loads, you MUST define strings as + unicode. + + For example: + Content: + { + "name": u"bob", + "age": 18, + "friends": [u"mike", u"jill"] + } + Template: + { + "name": u"string", + "age": 0, + "friends": [u"string"] + } + The values "string" and 0 could be anything, so long as the types + are the same as the content. + """ + raise NotImplementedError("get_content_template not implemented.") + + def check_json(self, content, raises=True): + """Checks the given JSON content abides by the rules of the template. + + Args: + content : A JSON object to check. + raises: True to raise a SynapseError if the check fails. + Returns: + True if the content passes the template. Returns False if the check + fails and raises=False. + Raises: + SynapseError if the check fails and raises=True. + """ + # recursively call to inspect each layer + err_msg = self._check_json(content, self.get_content_template()) + if err_msg: + if raises: + raise SynapseError(400, err_msg, Codes.BAD_JSON) + else: + return False + else: + return True + + def _check_json(self, content, template): + """Check content and template matches. + + If the template is a dict, each key in the dict will be validated with + the content, else it will just compare the types of content and + template. This basic type check is required because this function will + be recursively called and could be called with just strs or ints. + + Args: + content: The content to validate. + template: The validation template. + Returns: + str: An error message if the validation fails, else None. + """ + if type(content) != type(template): + return "Mismatched types: %s" % template + + if type(template) == dict: + for key in template: + if key not in content: + return "Missing %s key" % key + + if type(content[key]) != type(template[key]): + return "Key %s is of the wrong type." % key + + if type(content[key]) == dict: + # we must go deeper + msg = self._check_json(content[key], template[key]) + if msg: + return msg + elif type(content[key]) == list: + # make sure each item type in content matches the template + for entry in content[key]: + msg = self._check_json(entry, template[key][0]) + if msg: + return msg diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py new file mode 100644 index 0000000000..ea7afa234e --- /dev/null +++ b/synapse/api/events/factory.py @@ -0,0 +1,50 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.events.room import ( + RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent, + InviteJoinEvent, RoomConfigEvent +) + +from synapse.util.stringutils import random_string + + +class EventFactory(object): + + _event_classes = [ + RoomTopicEvent, + MessageEvent, + RoomMemberEvent, + FeedbackEvent, + InviteJoinEvent, + RoomConfigEvent + ] + + def __init__(self): + self._event_list = {} # dict of TYPE to event class + for event_class in EventFactory._event_classes: + self._event_list[event_class.TYPE] = event_class + + def create_event(self, etype=None, **kwargs): + kwargs["type"] = etype + if "event_id" not in kwargs: + kwargs["event_id"] = random_string(10) + + try: + handler = self._event_list[etype] + except KeyError: # unknown event type + # TODO allow custom event types. + raise NotImplementedError("Unknown etype=%s" % etype) + + return handler(**kwargs) diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py new file mode 100644 index 0000000000..b31cd19f4b --- /dev/null +++ b/synapse/api/events/room.py @@ -0,0 +1,99 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from . import SynapseEvent + + +class RoomTopicEvent(SynapseEvent): + TYPE = "m.room.topic" + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + super(RoomTopicEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"topic": u"string"} + + +class RoomMemberEvent(SynapseEvent): + TYPE = "m.room.member" + + valid_keys = SynapseEvent.valid_keys + [ + "target_user_id", # target + "membership", # action + ] + + def __init__(self, **kwargs): + if "target_user_id" in kwargs: + kwargs["state_key"] = kwargs["target_user_id"] + super(RoomMemberEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"membership": u"string"} + + +class MessageEvent(SynapseEvent): + TYPE = "m.room.message" + + valid_keys = SynapseEvent.valid_keys + [ + "msg_id", # unique per room + user combo + ] + + def __init__(self, **kwargs): + super(MessageEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {"msgtype": u"string"} + + +class FeedbackEvent(SynapseEvent): + TYPE = "m.room.message.feedback" + + valid_keys = SynapseEvent.valid_keys + [ + "msg_id", # the message ID being acknowledged + "msg_sender_id", # person who is sending the feedback is 'user_id' + "feedback_type", # the type of feedback (delivery, read, etc) + ] + + def __init__(self, **kwargs): + super(FeedbackEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {} + + +class InviteJoinEvent(SynapseEvent): + TYPE = "m.room.invite_join" + + valid_keys = SynapseEvent.valid_keys + [ + "target_user_id", + "target_host", + ] + + def __init__(self, **kwargs): + super(InviteJoinEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {} + + +class RoomConfigEvent(SynapseEvent): + TYPE = "m.room.config" + + def __init__(self, **kwargs): + kwargs["state_key"] = "" + super(RoomConfigEvent, self).__init__(**kwargs) + + def get_content_template(self): + return {} diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py new file mode 100644 index 0000000000..974f7f0ba0 --- /dev/null +++ b/synapse/api/notifier.py @@ -0,0 +1,186 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.constants import Membership +from synapse.api.events.room import RoomMemberEvent + +from twisted.internet import defer +from twisted.internet import reactor + +import logging + +logger = logging.getLogger(__name__) + + +class Notifier(object): + + def __init__(self, hs): + self.store = hs.get_datastore() + self.hs = hs + self.stored_event_listeners = {} + + @defer.inlineCallbacks + def on_new_room_event(self, event, store_id): + """Called when there is a new room event which may potentially be sent + down listening users' event streams. + + This function looks for interested *users* who may want to be notified + for this event. This is different to users requesting from the event + stream which looks for interested *events* for this user. + + Args: + event (SynapseEvent): The new event, which must have a room_id + store_id (int): The ID of this event after it was stored with the + data store. + '""" + member_list = yield self.store.get_room_members(room_id=event.room_id, + membership="join") + if not member_list: + member_list = [] + + member_list = [u.user_id for u in member_list] + + # invites MUST prod the person being invited, who won't be in the room. + if (event.type == RoomMemberEvent.TYPE and + event.content["membership"] == Membership.INVITE): + member_list.append(event.target_user_id) + + for user_id in member_list: + if user_id in self.stored_event_listeners: + self._notify_and_callback( + user_id=user_id, + event_data=event.get_dict(), + stream_type=event.type, + store_id=store_id) + + def on_new_user_event(self, user_id, event_data, stream_type, store_id): + if user_id in self.stored_event_listeners: + self._notify_and_callback( + user_id=user_id, + event_data=event_data, + stream_type=stream_type, + store_id=store_id + ) + + def _notify_and_callback(self, user_id, event_data, stream_type, store_id): + logger.debug( + "Notifying %s of a new event.", + user_id + ) + + stream_ids = list(self.stored_event_listeners[user_id]) + for stream_id in stream_ids: + self._notify_and_callback_stream(user_id, stream_id, event_data, + stream_type, store_id) + + if not self.stored_event_listeners[user_id]: + del self.stored_event_listeners[user_id] + + def _notify_and_callback_stream(self, user_id, stream_id, event_data, + stream_type, store_id): + + event_listener = self.stored_event_listeners[user_id].pop(stream_id) + return_event_object = { + k: event_listener[k] for k in ["start", "chunk", "end"] + } + + # work out the new end token + token = event_listener["start"] + end = self._next_token(stream_type, store_id, token) + return_event_object["end"] = end + + # add the event to the chunk + chunk = event_listener["chunk"] + chunk.append(event_data) + + # callback the defer. We know this can't have been resolved before as + # we always remove the event_listener from the map before resolving. + event_listener["defer"].callback(return_event_object) + + def _next_token(self, stream_type, store_id, current_token): + stream_handler = self.hs.get_handlers().event_stream_handler + return stream_handler.get_event_stream_token( + stream_type, + store_id, + current_token + ) + + def store_events_for(self, user_id=None, stream_id=None, from_tok=None): + """Store all incoming events for this user. This should be paired with + get_events_for to return chunked data. + + Args: + user_id (str): The user to monitor incoming events for. + stream (object): The stream that is receiving events + from_tok (str): The token to monitor incoming events from. + """ + event_listener = { + "start": from_tok, + "chunk": [], + "end": from_tok, + "defer": defer.Deferred(), + } + + if user_id not in self.stored_event_listeners: + self.stored_event_listeners[user_id] = {stream_id: event_listener} + else: + self.stored_event_listeners[user_id][stream_id] = event_listener + + def purge_events_for(self, user_id=None, stream_id=None): + """Purges any stored events for this user. + + Args: + user_id (str): The user to purge stored events for. + """ + try: + del self.stored_event_listeners[user_id][stream_id] + if not self.stored_event_listeners[user_id]: + del self.stored_event_listeners[user_id] + except KeyError: + pass + + def get_events_for(self, user_id=None, stream_id=None, timeout=0): + """Retrieve stored events for this user, waiting if necessary. + + It is advisable to wrap this call in a maybeDeferred. + + Args: + user_id (str): The user to get events for. + timeout (int): The time in seconds to wait before giving up. + Returns: + A Deferred or a dict containing the chunk data, depending on if + there was data to return yet. The Deferred callback may be None if + there were no events before the timeout expired. + """ + logger.debug("%s is listening for events.", user_id) + + if len(self.stored_event_listeners[user_id][stream_id]["chunk"]) > 0: + logger.debug("%s returning existing chunk.", user_id) + return self.stored_event_listeners[user_id][stream_id] + + reactor.callLater( + (timeout / 1000.0), self._timeout, user_id, stream_id + ) + return self.stored_event_listeners[user_id][stream_id]["defer"] + + def _timeout(self, user_id, stream_id): + try: + # We remove the event_listener from the map so that we can't + # resolve the deferred twice. + event_listeners = self.stored_event_listeners[user_id] + event_listener = event_listeners.pop(stream_id) + event_listener["defer"].callback(None) + logger.debug("%s event listening timed out.", user_id) + except KeyError: + pass diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py new file mode 100644 index 0000000000..08137c1e79 --- /dev/null +++ b/synapse/api/streams/__init__.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.api.errors import SynapseError + + +class PaginationConfig(object): + + """A configuration object which stores pagination parameters.""" + + def __init__(self, from_tok=None, to_tok=None, limit=0): + self.from_tok = from_tok + self.to_tok = to_tok + self.limit = limit + + @classmethod + def from_request(cls, request, raise_invalid_params=True): + params = { + "from_tok": PaginationStream.TOK_START, + "to_tok": PaginationStream.TOK_END, + "limit": 0 + } + + query_param_mappings = [ # 3-tuple of qp_key, attribute, rules + ("from", "from_tok", lambda x: type(x) == str), + ("to", "to_tok", lambda x: type(x) == str), + ("limit", "limit", lambda x: x.isdigit()) + ] + + for qp, attr, is_valid in query_param_mappings: + if qp in request.args: + if is_valid(request.args[qp][0]): + params[attr] = request.args[qp][0] + elif raise_invalid_params: + raise SynapseError(400, "%s parameter is invalid." % qp) + + return PaginationConfig(**params) + + +class PaginationStream(object): + + """ An interface for streaming data as chunks. """ + + TOK_START = "START" + TOK_END = "END" + + def get_chunk(self, config=None): + """ Return the next chunk in the stream. + + Args: + config (PaginationConfig): The config to aid which chunk to get. + Returns: + A dict containing the new start token "start", the new end token + "end" and the data "chunk" as a list. + """ + raise NotImplementedError() + + +class StreamData(object): + + """ An interface for obtaining streaming data from a table. """ + + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + + def get_rows(self, user_id, from_pkey, to_pkey, limit): + """ Get event stream data between the specified pkeys. + + Args: + user_id : The user's ID + from_pkey : The starting pkey. + to_pkey : The end pkey. May be -1 to mean "latest". + limit: The max number of results to return. + Returns: + A tuple containing the list of event stream data and the last pkey. + """ + raise NotImplementedError() + + def max_token(self): + """ Get the latest currently-valid token. + + Returns: + The latest token.""" + raise NotImplementedError() diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py new file mode 100644 index 0000000000..0cc1a3e36a --- /dev/null +++ b/synapse/api/streams/event.py @@ -0,0 +1,247 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 matrix.org +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""This module contains classes for streaming from the event stream: /events. +""" +from twisted.internet import defer + +from synapse.api.errors import EventStreamError +from synapse.api.events.room import ( + RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent +) +from synapse.api.streams import PaginationStream, StreamData + +import logging + +logger = logging.getLogger(__name__) + + +class MessagesStreamData(StreamData): + EVENT_TYPE = MessageEvent.TYPE + + def __init__(self, hs, room_id=None, feedback=False): + super(MessagesStreamData, self).__init__(hs) + self.room_id = room_id + self.with_feedback = feedback + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_message_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id, + with_feedback=self.with_feedback + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_message_id() + defer.returnValue(val) + + +class RoomMemberStreamData(StreamData): + EVENT_TYPE = RoomMemberEvent.TYPE + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_room_member_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key + ) + + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_room_member_id() + defer.returnValue(val) + + +class FeedbackStreamData(StreamData): + EVENT_TYPE = FeedbackEvent.TYPE + + def __init__(self, hs, room_id=None): + super(FeedbackStreamData, self).__init__(hs) + self.room_id = room_id + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_feedback_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_feedback_id() + defer.returnValue(val) + + +class RoomDataStreamData(StreamData): + EVENT_TYPE = RoomTopicEvent.TYPE # TODO need multiple event types + + def __init__(self, hs, room_id=None): + super(RoomDataStreamData, self).__init__(hs) + self.room_id = room_id + + @defer.inlineCallbacks + def get_rows(self, user_id, from_key, to_key, limit): + (data, latest_ver) = yield self.store.get_room_data_stream( + user_id=user_id, + from_key=from_key, + to_key=to_key, + limit=limit, + room_id=self.room_id + ) + defer.returnValue((data, latest_ver)) + + @defer.inlineCallbacks + def max_token(self): + val = yield self.store.get_max_room_data_id() + defer.returnValue(val) + + +class EventStream(PaginationStream): + + SEPARATOR = '_' + + def __init__(self, user_id, stream_data_list): + super(EventStream, self).__init__() + self.user_id = user_id + self.stream_data = stream_data_list + + @defer.inlineCallbacks + def fix_tokens(self, pagination_config): + pagination_config.from_tok = yield self.fix_token( + pagination_config.from_tok) + pagination_config.to_tok = yield self.fix_token( + pagination_config.to_tok) + defer.returnValue(pagination_config) + + @defer.inlineCallbacks + def fix_token(self, token): + """Fixes unknown values in a token to known values. + + Args: + token (str): The token to fix up. + Returns: + The fixed-up token, which may == token. + """ + # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 depending. + replacements = [ + (PaginationStream.TOK_START, "0"), + (PaginationStream.TOK_END, "-1") + ] + for magic_token, key in replacements: + if magic_token == token: + token = EventStream.SEPARATOR.join( + [key] * len(self.stream_data) + ) + + # replace -1 values with an actual pkey + token_segments = self._split_token(token) + for i, tok in enumerate(token_segments): + if tok == -1: + # add 1 to the max token because results are EXCLUSIVE from the + # latest version. + token_segments[i] = 1 + (yield self.stream_data[i].max_token()) + defer.returnValue(EventStream.SEPARATOR.join( + str(x) for x in token_segments + )) + + @defer.inlineCallbacks + def get_chunk(self, config=None): + # no support for limit on >1 streams, makes no sense. + if config.limit and len(self.stream_data) > 1: + raise EventStreamError( + 400, "Limit not supported on multiplexed streams." + ) + + (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok, + config.to_tok, + config.limit) + + defer.returnValue({ + "chunk": chunk_data, + "start": config.from_tok, + "end": next_tok + }) + + @defer.inlineCallbacks + def _get_chunk_data(self, from_tok, to_tok, limit): + """ Get event data between the two tokens. + + Tokens are SEPARATOR separated values representing pkey values of + certain tables, and the position determines the StreamData invoked + according to the STREAM_DATA list. + + The magic value '-1' can be used to get the latest value. + + Args: + from_tok - The token to start from. + to_tok - The token to end at. Must have values > from_tok or be -1. + Returns: + A list of event data. + Raises: + EventStreamError if something went wrong. + """ + # sanity check + if (from_tok.count(EventStream.SEPARATOR) != + to_tok.count(EventStream.SEPARATOR) or + (from_tok.count(EventStream.SEPARATOR) + 1) != + len(self.stream_data)): + raise EventStreamError(400, "Token lengths don't match.") + + chunk = [] + next_ver = [] + for i, (from_pkey, to_pkey) in enumerate(zip( + self._split_token(from_tok), + self._split_token(to_tok) + )): + if from_pkey == to_pkey: + # tokens are the same, we have nothing to do. + next_ver.append(str(to_pkey)) + continue + + (event_chunk, max_pkey) = yield self.stream_data[i].get_rows( + self.user_id, from_pkey, to_pkey, limit + ) + + chunk += event_chunk + next_ver.append(str(max_pkey)) + + defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver))) + + def _split_token(self, token): + """Splits the given token into a list of pkeys. + + Args: + token (str): The token with SEPARATOR values. + Returns: + A list of ints. + """ + segments = token.split(EventStream.SEPARATOR) + try: + int_segments = [int(x) for x in segments] + except ValueError: + raise EventStreamError(400, "Bad token: %s" % token) + return int_segments |