Diffstat (limited to 'synapse/api')
-rw-r--r--  synapse/api/__init__.py          |  14
-rw-r--r--  synapse/api/auth.py              | 164
-rw-r--r--  synapse/api/constants.py         |  42
-rw-r--r--  synapse/api/errors.py            | 114
-rw-r--r--  synapse/api/events/__init__.py   | 152
-rw-r--r--  synapse/api/events/factory.py    |  50
-rw-r--r--  synapse/api/events/room.py       |  99
-rw-r--r--  synapse/api/notifier.py          | 186
-rw-r--r--  synapse/api/streams/__init__.py  |  96
-rw-r--r--  synapse/api/streams/event.py     | 247
10 files changed, 1164 insertions(+), 0 deletions(-)
diff --git a/synapse/api/__init__.py b/synapse/api/__init__.py
new file mode 100644
index 0000000000..fe8a073cd3
--- /dev/null
+++ b/synapse/api/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
new file mode 100644
index 0000000000..5c66a7261f
--- /dev/null
+++ b/synapse/api/auth.py
@@ -0,0 +1,164 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This module contains classes for authenticating the user."""
+from twisted.internet import defer
+
+from synapse.api.constants import Membership
+from synapse.api.errors import AuthError, StoreError
+from synapse.api.events.room import (RoomTopicEvent, RoomMemberEvent,
+                                     MessageEvent, FeedbackEvent)
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class Auth(object):
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def check(self, event, raises=False):
+        """ Checks if this event is correctly authed.
+
+        Returns:
+            True if the auth checks pass.
+        Raises:
+            AuthError if there was a problem authorising this event. This will
+            be raised only if raises=True.
+        """
+        try:
+            if event.type in [RoomTopicEvent.TYPE, MessageEvent.TYPE,
+                              FeedbackEvent.TYPE]:
+                yield self.check_joined_room(event.room_id, event.user_id)
+                defer.returnValue(True)
+            elif event.type == RoomMemberEvent.TYPE:
+                allowed = yield self.is_membership_change_allowed(event)
+                defer.returnValue(allowed)
+            else:
+                raise AuthError(500, "Unknown event type %s" % event.type)
+        except AuthError as e:
+            logger.info("Event auth check failed on event %s with msg: %s",
+                        event, e.msg)
+            if raises:
+                raise e
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def check_joined_room(self, room_id, user_id):
+        try:
+            member = yield self.store.get_room_member(
+                room_id=room_id,
+                user_id=user_id
+            )
+            if not member or member.membership != Membership.JOIN:
+                raise AuthError(403, "User %s not in room %s" %
+                                (user_id, room_id))
+            defer.returnValue(member)
+        except AttributeError:
+            pass
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def is_membership_change_allowed(self, event):
+        # does this room even exist
+        room = yield self.store.get_room(event.room_id)
+        if not room:
+            raise AuthError(403, "Room does not exist")
+
+        # get info about the caller
+        try:
+            caller = yield self.store.get_room_member(
+                user_id=event.user_id,
+                room_id=event.room_id)
+        except:
+            caller = None
+        caller_in_room = caller and caller.membership == Membership.JOIN
+
+        # get info about the target
+        try:
+            target = yield self.store.get_room_member(
+                user_id=event.target_user_id,
+                room_id=event.room_id)
+        except:
+            target = None
+        target_in_room = target and target.membership == Membership.JOIN
+
+        membership = event.content["membership"]
+
+        if Membership.INVITE == membership:
+            # Invites are valid iff caller is in the room and target isn't.
+            if not caller_in_room:  # caller isn't joined
+                raise AuthError(403, "You are not in room %s." % event.room_id)
+            elif target_in_room:  # the target is already in the room.
+                raise AuthError(403, "%s is already in the room." %
+                                     event.target_user_id)
+        elif Membership.JOIN == membership:
+            # Joins are valid iff caller == target and they were:
+            # invited: They are accepting the invitation
+            # joined: It's a NOOP
+            if event.user_id != event.target_user_id:
+                raise AuthError(403, "Cannot force another user to join.")
+            elif room.is_public:
+                pass  # anyone can join public rooms.
+            elif (not caller or caller.membership not in
+                    [Membership.INVITE, Membership.JOIN]):
+                raise AuthError(403, "You are not invited to this room.")
+        elif Membership.LEAVE == membership:
+            if not caller_in_room:  # trying to leave a room you aren't in
+                raise AuthError(403, "You are not in room %s." % event.room_id)
+            elif event.target_user_id != event.user_id:
+                # trying to force another user to leave
+                raise AuthError(403, "Cannot force %s to leave." %
+                                event.target_user_id)
+        else:
+            raise AuthError(500, "Unknown membership %s" % membership)
+
+        defer.returnValue(True)
+
+    def get_user_by_req(self, request):
+        """ Get a registered user's ID.
+
+        Args:
+            request - An HTTP request with an access_token query parameter.
+        Returns:
+            UserID : User ID object of the user making the request
+        Raises:
+            AuthError if no user by that token exists or the token is invalid.
+        """
+        # Can optionally look elsewhere in the request (e.g. headers)
+        try:
+            return self.get_user_by_token(request.args["access_token"][0])
+        except KeyError:
+            raise AuthError(403, "Missing access token.")
+
+    @defer.inlineCallbacks
+    def get_user_by_token(self, token):
+        """ Get a registered user's ID.
+
+        Args:
+            token (str): The access token to get the user by.
+        Returns:
+            UserID : User ID object of the user who has that access token.
+        Raises:
+            AuthError if no user by that token exists or the token is invalid.
+        """
+        try:
+            user_id = yield self.store.get_user_by_token(token=token)
+            defer.returnValue(self.hs.parse_userid(user_id))
+        except StoreError:
+            raise AuthError(403, "Unrecognised access token.")
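
For illustration only (not part of this changeset), a minimal sketch of how Auth.check() dispatches on event type. The Stub* classes are hypothetical stand-ins for the homeserver, datastore and event objects, stubbing only the attributes and calls that Auth actually uses.

from twisted.internet import defer

from synapse.api.auth import Auth
from synapse.api.constants import Membership


class StubMember(object):
    membership = Membership.JOIN


class StubStore(object):
    def get_room_member(self, room_id, user_id):
        # Auth expects a Deferred from the datastore.
        return defer.succeed(StubMember())


class StubHomeserver(object):
    def get_datastore(self):
        return StubStore()


class StubEvent(object):
    type = "m.room.message"        # MessageEvent.TYPE
    room_id = "!abc:localhost"
    user_id = "@alice:localhost"


auth = Auth(StubHomeserver())
results = []
auth.check(StubEvent(), raises=True).addCallback(results.append)
# results == [True]: the message is allowed because the stubbed datastore says
# @alice:localhost has membership "join" in the room.
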
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
new file mode 100644
index 0000000000..37bf41bfb3
--- /dev/null
+++ b/synapse/api/constants.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains constants from the specification."""
+
+
+class Membership(object):
+
+    """Represents the membership states of a user in a room."""
+    INVITE = u"invite"
+    JOIN = u"join"
+    KNOCK = u"knock"
+    LEAVE = u"leave"
+
+
+class Feedback(object):
+
+    """Represents the types of feedback a user can send in response to a
+    message."""
+
+    DELIVERED = u"d"
+    READ = u"r"
+    LIST = (DELIVERED, READ)
+
+
+class PresenceState(object):
+    """Represents the presence state of a user."""
+    OFFLINE = 0
+    BUSY = 1
+    ONLINE = 2
+    FREE_FOR_CHAT = 3
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
new file mode 100644
index 0000000000..7ad4d636c2
--- /dev/null
+++ b/synapse/api/errors.py
@@ -0,0 +1,114 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains exceptions and error codes."""
+
+import logging
+
+
+class Codes(object):
+    FORBIDDEN = "M_FORBIDDEN"
+    BAD_JSON = "M_BAD_JSON"
+    NOT_JSON = "M_NOT_JSON"
+    USER_IN_USE = "M_USER_IN_USE"
+    ROOM_IN_USE = "M_ROOM_IN_USE"
+    BAD_PAGINATION = "M_BAD_PAGINATION"
+    UNKNOWN = "M_UNKNOWN"
+    NOT_FOUND = "M_NOT_FOUND"
+
+
+class CodeMessageException(Exception):
+    """An exception with integer code and message string attributes."""
+
+    def __init__(self, code, msg):
+        logging.error("%s: %s, %s", type(self).__name__, code, msg)
+        super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
+        self.code = code
+        self.msg = msg
+
+
+class SynapseError(CodeMessageException):
+    """A base error which can be caught for all synapse events."""
+    def __init__(self, code, msg, errcode=""):
+        """Constructs a synapse error.
+
+        Args:
+            code (int): The integer error code (typically an HTTP response code)
+            msg (str): The human-readable error message.
+            errcode (str): The error code, e.g. 'M_FORBIDDEN'.
+        """
+        super(SynapseError, self).__init__(code, msg)
+        self.errcode = errcode
+
+
+class RoomError(SynapseError):
+    """An error raised when a room event fails."""
+    pass
+
+
+class RegistrationError(SynapseError):
+    """An error raised when a registration event fails."""
+    pass
+
+
+class AuthError(SynapseError):
+    """An error raised when there was a problem authorising an event."""
+
+    def __init__(self, *args, **kwargs):
+        if "errcode" not in kwargs:
+            kwargs["errcode"] = Codes.FORBIDDEN
+        super(AuthError, self).__init__(*args, **kwargs)
+
+
+class EventStreamError(SynapseError):
+    """An error raised when there is a problem with the event stream."""
+    pass
+
+
+class LoginError(SynapseError):
+    """An error raised when there was a problem logging in."""
+    pass
+
+
+class StoreError(SynapseError):
+    """An error raised when there was a problem storing some data."""
+    pass
+
+
+def cs_exception(exception):
+    if isinstance(exception, SynapseError):
+        return cs_error(
+            exception.msg,
+            Codes.UNKNOWN if not exception.errcode else exception.errcode)
+    elif isinstance(exception, CodeMessageException):
+        return cs_error(exception.msg)
+    else:
+        logging.error("Unknown exception type: %s", type(exception))
+
+
+def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
+    """ Utility method for constructing an error response for client-server
+    interactions.
+
+    Args:
+        msg (str): The error message.
+        code (str): The error code, e.g. 'M_UNKNOWN'.
+        kwargs : Additional keys to add to the response.
+    Returns:
+        A dict representing the error response JSON.
+    """
+    err = {"error": msg, "errcode": code}
+    for key, value in kwargs.iteritems():
+        err[key] = value
+    return err
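
For illustration (not part of this changeset), the error-to-JSON path: an AuthError defaults its errcode to M_FORBIDDEN, and cs_exception() turns it into the dict a client-server error response would carry.

from synapse.api.errors import AuthError, cs_exception

try:
    raise AuthError(403, "You are not in room !abc:localhost.")
except AuthError as e:
    err_dict = cs_exception(e)
    # err_dict == {"error": "You are not in room !abc:localhost.",
    #              "errcode": "M_FORBIDDEN"}
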
diff --git a/synapse/api/events/__init__.py b/synapse/api/events/__init__.py
new file mode 100644
index 0000000000..bc2daf3361
--- /dev/null
+++ b/synapse/api/events/__init__.py
@@ -0,0 +1,152 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.api.errors import SynapseError, Codes
+from synapse.util.jsonobject import JsonEncodedObject
+
+
+class SynapseEvent(JsonEncodedObject):
+
+    """Base class for Synapse events. These are JSON objects which must abide
+    by a certain well-defined structure.
+    """
+
+    # Attributes that are currently assumed by the federation side:
+    # Mandatory:
+    # - event_id
+    # - room_id
+    # - type
+    # - is_state
+    #
+    # Optional:
+    # - state_key (mandatory when is_state is True)
+    # - prev_events (these can be filled out by the federation layer itself.)
+    # - prev_state
+
+    valid_keys = [
+        "event_id",
+        "type",
+        "room_id",
+        "user_id",  # sender/initiator
+        "content",  # HTTP body, JSON
+    ]
+
+    internal_keys = [
+        "is_state",
+        "state_key",
+        "prev_events",
+        "prev_state",
+        "depth",
+        "destinations",
+        "origin",
+    ]
+
+    required_keys = [
+        "event_id",
+        "room_id",
+        "content",
+    ]
+
+    def __init__(self, raises=True, **kwargs):
+        super(SynapseEvent, self).__init__(**kwargs)
+        if "content" in kwargs:
+            self.check_json(self.content, raises=raises)
+
+    def get_content_template(self):
+        """ Retrieve the JSON template for this event as a dict.
+
+        The template must be a dict representing the JSON to match. Only
+        required keys should be present. The values of the keys in the template
+        are checked via type() against the values of the same keys in the actual
+        event JSON.
+
+        NB: If loading content via json.loads, you MUST define strings as
+        unicode.
+
+        For example:
+            Content:
+                {
+                    "name": u"bob",
+                    "age": 18,
+                    "friends": [u"mike", u"jill"]
+                }
+            Template:
+                {
+                    "name": u"string",
+                    "age": 0,
+                    "friends": [u"string"]
+                }
+            The values "string" and 0 could be anything, so long as the types
+            are the same as the content.
+        """
+        raise NotImplementedError("get_content_template not implemented.")
+
+    def check_json(self, content, raises=True):
+        """Checks that the given JSON content abides by the rules of the template.
+
+        Args:
+            content : A JSON object to check.
+            raises: True to raise a SynapseError if the check fails.
+        Returns:
+            True if the content passes the template. Returns False if the check
+            fails and raises=False.
+        Raises:
+            SynapseError if the check fails and raises=True.
+        """
+        # recursively call to inspect each layer
+        err_msg = self._check_json(content, self.get_content_template())
+        if err_msg:
+            if raises:
+                raise SynapseError(400, err_msg, Codes.BAD_JSON)
+            else:
+                return False
+        else:
+            return True
+
+    def _check_json(self, content, template):
+        """Check content and template matches.
+
+        If the template is a dict, each key in the dict will be validated with
+        the content, else it will just compare the types of content and
+        template. This basic type check is required because this function will
+        be recursively called and could be called with just strs or ints.
+
+        Args:
+            content: The content to validate.
+            template: The validation template.
+        Returns:
+            str: An error message if the validation fails, else None.
+        """
+        if type(content) != type(template):
+            return "Mismatched types: %s" % template
+
+        if type(template) == dict:
+            for key in template:
+                if key not in content:
+                    return "Missing %s key" % key
+
+                if type(content[key]) != type(template[key]):
+                    return "Key %s is of the wrong type." % key
+
+                if type(content[key]) == dict:
+                    # we must go deeper
+                    msg = self._check_json(content[key], template[key])
+                    if msg:
+                        return msg
+                elif type(content[key]) == list:
+                    # make sure each item type in content matches the template
+                    for entry in content[key]:
+                        msg = self._check_json(entry, template[key][0])
+                        if msg:
+                            return msg
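
An illustrative sketch of the content-template mechanism described above; ExampleEvent and its "example.event" type are hypothetical, and the sketch assumes JsonEncodedObject (defined in synapse.util.jsonobject, not shown in this diff) exposes the supplied kwargs as attributes.

from synapse.api.events import SynapseEvent


class ExampleEvent(SynapseEvent):
    TYPE = "example.event"  # hypothetical event type

    def get_content_template(self):
        # Content must contain a unicode "name" and a list of unicode "tags".
        return {"name": u"string", "tags": [u"string"]}


event = ExampleEvent(
    event_id="abc",
    room_id="!abc:localhost",
    user_id="@alice:localhost",
    content={"name": u"bob", "tags": [u"a", u"b"]},
)
assert event.check_json(event.content, raises=False)

# A mistyped value fails: with raises=False this returns False, with raises=True
# it raises SynapseError(400, "Key name is of the wrong type.", Codes.BAD_JSON).
assert not event.check_json({"name": 123, "tags": [u"a"]}, raises=False)
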
diff --git a/synapse/api/events/factory.py b/synapse/api/events/factory.py
new file mode 100644
index 0000000000..ea7afa234e
--- /dev/null
+++ b/synapse/api/events/factory.py
@@ -0,0 +1,50 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.api.events.room import (
+    RoomTopicEvent, MessageEvent, RoomMemberEvent, FeedbackEvent,
+    InviteJoinEvent, RoomConfigEvent
+)
+
+from synapse.util.stringutils import random_string
+
+
+class EventFactory(object):
+
+    _event_classes = [
+        RoomTopicEvent,
+        MessageEvent,
+        RoomMemberEvent,
+        FeedbackEvent,
+        InviteJoinEvent,
+        RoomConfigEvent
+    ]
+
+    def __init__(self):
+        self._event_list = {}  # dict of TYPE to event class
+        for event_class in EventFactory._event_classes:
+            self._event_list[event_class.TYPE] = event_class
+
+    def create_event(self, etype=None, **kwargs):
+        kwargs["type"] = etype
+        if "event_id" not in kwargs:
+            kwargs["event_id"] = random_string(10)
+
+        try:
+            handler = self._event_list[etype]
+        except KeyError:  # unknown event type
+            # TODO allow custom event types.
+            raise NotImplementedError("Unknown etype=%s" % etype)
+
+        return handler(**kwargs)
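
A usage sketch, not part of this changeset: creating an m.room.message event through the factory. The "body" key is an illustrative message field; only "msgtype" is enforced by MessageEvent's content template, and event_id is filled in with a random string when omitted.

from synapse.api.events.factory import EventFactory
from synapse.api.events.room import MessageEvent

factory = EventFactory()
event = factory.create_event(
    etype=MessageEvent.TYPE,
    room_id="!abc:localhost",
    user_id="@alice:localhost",
    content={"msgtype": u"m.text", "body": u"hello"},
)

# Unknown event types are rejected for now (see the TODO above):
#     factory.create_event(etype="com.example.custom")  -> NotImplementedError
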
diff --git a/synapse/api/events/room.py b/synapse/api/events/room.py
new file mode 100644
index 0000000000..b31cd19f4b
--- /dev/null
+++ b/synapse/api/events/room.py
@@ -0,0 +1,99 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from . import SynapseEvent
+
+
+class RoomTopicEvent(SynapseEvent):
+    TYPE = "m.room.topic"
+
+    def __init__(self, **kwargs):
+        kwargs["state_key"] = ""
+        super(RoomTopicEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {"topic": u"string"}
+
+
+class RoomMemberEvent(SynapseEvent):
+    TYPE = "m.room.member"
+
+    valid_keys = SynapseEvent.valid_keys + [
+        "target_user_id",  # target
+        "membership",  # action
+    ]
+
+    def __init__(self, **kwargs):
+        if "target_user_id" in kwargs:
+            kwargs["state_key"] = kwargs["target_user_id"]
+        super(RoomMemberEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {"membership": u"string"}
+
+
+class MessageEvent(SynapseEvent):
+    TYPE = "m.room.message"
+
+    valid_keys = SynapseEvent.valid_keys + [
+        "msg_id",  # unique per room + user combo
+    ]
+
+    def __init__(self, **kwargs):
+        super(MessageEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {"msgtype": u"string"}
+
+
+class FeedbackEvent(SynapseEvent):
+    TYPE = "m.room.message.feedback"
+
+    valid_keys = SynapseEvent.valid_keys + [
+        "msg_id",  # the message ID being acknowledged
+        "msg_sender_id",  # sender of the original message ('user_id' sends the feedback)
+        "feedback_type",  # the type of feedback (delivery, read, etc)
+    ]
+
+    def __init__(self, **kwargs):
+        super(FeedbackEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {}
+
+
+class InviteJoinEvent(SynapseEvent):
+    TYPE = "m.room.invite_join"
+
+    valid_keys = SynapseEvent.valid_keys + [
+        "target_user_id",
+        "target_host",
+    ]
+
+    def __init__(self, **kwargs):
+        super(InviteJoinEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {}
+
+
+class RoomConfigEvent(SynapseEvent):
+    TYPE = "m.room.config"
+
+    def __init__(self, **kwargs):
+        kwargs["state_key"] = ""
+        super(RoomConfigEvent, self).__init__(**kwargs)
+
+    def get_content_template(self):
+        return {}
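
For illustration only: RoomMemberEvent copies target_user_id into state_key, so membership is stored as per-target room state. The sketch assumes JsonEncodedObject accepts the internal state_key key, as the other event classes here rely on.

from synapse.api.constants import Membership
from synapse.api.events.room import RoomMemberEvent

invite = RoomMemberEvent(
    event_id="abc",
    room_id="!abc:localhost",
    user_id="@alice:localhost",         # the inviter
    target_user_id="@bob:localhost",    # becomes the state_key
    content={"membership": Membership.INVITE},
)
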
diff --git a/synapse/api/notifier.py b/synapse/api/notifier.py
new file mode 100644
index 0000000000..974f7f0ba0
--- /dev/null
+++ b/synapse/api/notifier.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.api.constants import Membership
+from synapse.api.events.room import RoomMemberEvent
+
+from twisted.internet import defer
+from twisted.internet import reactor
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class Notifier(object):
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.stored_event_listeners = {}
+
+    @defer.inlineCallbacks
+    def on_new_room_event(self, event, store_id):
+        """Called when there is a new room event which may potentially be sent
+        down listening users' event streams.
+
+        This function looks for interested *users* who may want to be notified
+        for this event. This is different to users requesting from the event
+        stream which looks for interested *events* for this user.
+
+        Args:
+            event (SynapseEvent): The new event, which must have a room_id.
+            store_id (int): The ID of this event after it was stored with the
+                data store.
+        """
+        member_list = yield self.store.get_room_members(room_id=event.room_id,
+                                                        membership="join")
+        if not member_list:
+            member_list = []
+
+        member_list = [u.user_id for u in member_list]
+
+        # invites MUST prod the person being invited, who won't be in the room.
+        if (event.type == RoomMemberEvent.TYPE and
+                event.content["membership"] == Membership.INVITE):
+            member_list.append(event.target_user_id)
+
+        for user_id in member_list:
+            if user_id in self.stored_event_listeners:
+                self._notify_and_callback(
+                    user_id=user_id,
+                    event_data=event.get_dict(),
+                    stream_type=event.type,
+                    store_id=store_id)
+
+    def on_new_user_event(self, user_id, event_data, stream_type, store_id):
+        if user_id in self.stored_event_listeners:
+            self._notify_and_callback(
+                user_id=user_id,
+                event_data=event_data,
+                stream_type=stream_type,
+                store_id=store_id
+            )
+
+    def _notify_and_callback(self, user_id, event_data, stream_type, store_id):
+        logger.debug(
+            "Notifying %s of a new event.",
+            user_id
+        )
+
+        stream_ids = list(self.stored_event_listeners[user_id])
+        for stream_id in stream_ids:
+            self._notify_and_callback_stream(user_id, stream_id, event_data,
+                                             stream_type, store_id)
+
+        if not self.stored_event_listeners[user_id]:
+            del self.stored_event_listeners[user_id]
+
+    def _notify_and_callback_stream(self, user_id, stream_id, event_data,
+                                    stream_type, store_id):
+
+        event_listener = self.stored_event_listeners[user_id].pop(stream_id)
+        return_event_object = {
+            k: event_listener[k] for k in ["start", "chunk", "end"]
+        }
+
+        # work out the new end token
+        token = event_listener["start"]
+        end = self._next_token(stream_type, store_id, token)
+        return_event_object["end"] = end
+
+        # add the event to the chunk
+        chunk = event_listener["chunk"]
+        chunk.append(event_data)
+
+        # callback the defer. We know this can't have been resolved before as
+        # we always remove the event_listener from the map before resolving.
+        event_listener["defer"].callback(return_event_object)
+
+    def _next_token(self, stream_type, store_id, current_token):
+        stream_handler = self.hs.get_handlers().event_stream_handler
+        return stream_handler.get_event_stream_token(
+            stream_type,
+            store_id,
+            current_token
+        )
+
+    def store_events_for(self, user_id=None, stream_id=None, from_tok=None):
+        """Store all incoming events for this user. This should be paired with
+        get_events_for to return chunked data.
+
+        Args:
+            user_id (str): The user to monitor incoming events for.
+            stream_id: The ID of the stream that is receiving events.
+            from_tok (str): The token to monitor incoming events from.
+        """
+        event_listener = {
+            "start": from_tok,
+            "chunk": [],
+            "end": from_tok,
+            "defer": defer.Deferred(),
+        }
+
+        if user_id not in self.stored_event_listeners:
+            self.stored_event_listeners[user_id] = {stream_id: event_listener}
+        else:
+            self.stored_event_listeners[user_id][stream_id] = event_listener
+
+    def purge_events_for(self, user_id=None, stream_id=None):
+        """Purges any stored events for this user.
+
+        Args:
+            user_id (str): The user to purge stored events for.
+        """
+        try:
+            del self.stored_event_listeners[user_id][stream_id]
+            if not self.stored_event_listeners[user_id]:
+                del self.stored_event_listeners[user_id]
+        except KeyError:
+            pass
+
+    def get_events_for(self, user_id=None, stream_id=None, timeout=0):
+        """Retrieve stored events for this user, waiting if necessary.
+
+        It is advisable to wrap this call in a maybeDeferred.
+
+        Args:
+            user_id (str): The user to get events for.
+            timeout (int): The time in milliseconds to wait before giving up.
+        Returns:
+            A Deferred or a dict containing the chunk data, depending on whether
+            there was data to return yet. The Deferred callback may be None if
+            there were no events before the timeout expired.
+        """
+        logger.debug("%s is listening for events.", user_id)
+
+        if len(self.stored_event_listeners[user_id][stream_id]["chunk"]) > 0:
+            logger.debug("%s returning existing chunk.", user_id)
+            return self.stored_event_listeners[user_id][stream_id]
+
+        reactor.callLater(
+            (timeout / 1000.0), self._timeout, user_id, stream_id
+        )
+        return self.stored_event_listeners[user_id][stream_id]["defer"]
+
+    def _timeout(self, user_id, stream_id):
+        try:
+            # We remove the event_listener from the map so that we can't
+            # resolve the deferred twice.
+            event_listeners = self.stored_event_listeners[user_id]
+            event_listener = event_listeners.pop(stream_id)
+            event_listener["defer"].callback(None)
+            logger.debug("%s event listening timed out.", user_id)
+        except KeyError:
+            pass
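
A sketch of the intended pairing between store_events_for() and get_events_for(), not part of this changeset. StubHomeserver is a hypothetical stand-in providing only get_datastore(), which this code path does not actually use.

from synapse.api.notifier import Notifier


class StubHomeserver(object):
    def get_datastore(self):
        return None  # not touched on this path


notifier = Notifier(StubHomeserver())

# 1. Register a listener for @alice's event stream, starting at token "0_0".
notifier.store_events_for(user_id="@alice:localhost", stream_id="s1",
                          from_tok="0_0")

# 2. Ask for events. With nothing in the chunk yet this returns the stored
#    Deferred, which fires when on_new_room_event()/on_new_user_event() delivers
#    data, or with None once the 5000 ms timeout expires.
d = notifier.get_events_for(user_id="@alice:localhost", stream_id="s1",
                            timeout=5000)
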
diff --git a/synapse/api/streams/__init__.py b/synapse/api/streams/__init__.py
new file mode 100644
index 0000000000..08137c1e79
--- /dev/null
+++ b/synapse/api/streams/__init__.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.api.errors import SynapseError
+
+
+class PaginationConfig(object):
+
+    """A configuration object which stores pagination parameters."""
+
+    def __init__(self, from_tok=None, to_tok=None, limit=0):
+        self.from_tok = from_tok
+        self.to_tok = to_tok
+        self.limit = limit
+
+    @classmethod
+    def from_request(cls, request, raise_invalid_params=True):
+        params = {
+            "from_tok": PaginationStream.TOK_START,
+            "to_tok": PaginationStream.TOK_END,
+            "limit": 0
+        }
+
+        query_param_mappings = [  # 3-tuple of qp_key, attribute, rules
+            ("from", "from_tok", lambda x: type(x) == str),
+            ("to", "to_tok", lambda x: type(x) == str),
+            ("limit", "limit", lambda x: x.isdigit())
+        ]
+
+        for qp, attr, is_valid in query_param_mappings:
+            if qp in request.args:
+                if is_valid(request.args[qp][0]):
+                    params[attr] = request.args[qp][0]
+                elif raise_invalid_params:
+                    raise SynapseError(400, "%s parameter is invalid." % qp)
+
+        return PaginationConfig(**params)
+
+
+class PaginationStream(object):
+
+    """ An interface for streaming data as chunks. """
+
+    TOK_START = "START"
+    TOK_END = "END"
+
+    def get_chunk(self, config=None):
+        """ Return the next chunk in the stream.
+
+        Args:
+            config (PaginationConfig): The config to aid which chunk to get.
+        Returns:
+            A dict containing the new start token "start", the new end token
+            "end" and the data "chunk" as a list.
+        """
+        raise NotImplementedError()
+
+
+class StreamData(object):
+
+    """ An interface for obtaining streaming data from a table. """
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+
+    def get_rows(self, user_id, from_pkey, to_pkey, limit):
+        """ Get event stream data between the specified pkeys.
+
+        Args:
+            user_id : The user's ID
+            from_pkey : The starting pkey.
+            to_pkey : The end pkey. May be -1 to mean "latest".
+            limit: The max number of results to return.
+        Returns:
+            A tuple containing the list of event stream data and the last pkey.
+        """
+        raise NotImplementedError()
+
+    def max_token(self):
+        """ Get the latest currently-valid token.
+
+        Returns:
+            The latest token."""
+        raise NotImplementedError()
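
An illustrative sketch of PaginationConfig.from_request(); StubRequest is a hypothetical stand-in for a twisted.web request, whose args attribute maps query-parameter names to lists of values.

from synapse.api.streams import PaginationConfig, PaginationStream


class StubRequest(object):
    args = {"from": ["START"], "limit": ["10"]}


config = PaginationConfig.from_request(StubRequest())
assert config.from_tok == PaginationStream.TOK_START   # "START"
assert config.to_tok == PaginationStream.TOK_END       # defaulted to "END"
assert config.limit == "10"   # validated with isdigit(), but kept as a string
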
diff --git a/synapse/api/streams/event.py b/synapse/api/streams/event.py
new file mode 100644
index 0000000000..0cc1a3e36a
--- /dev/null
+++ b/synapse/api/streams/event.py
@@ -0,0 +1,247 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""This module contains classes for streaming from the event stream: /events.
+"""
+from twisted.internet import defer
+
+from synapse.api.errors import EventStreamError
+from synapse.api.events.room import (
+    RoomMemberEvent, MessageEvent, FeedbackEvent, RoomTopicEvent
+)
+from synapse.api.streams import PaginationStream, StreamData
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class MessagesStreamData(StreamData):
+    EVENT_TYPE = MessageEvent.TYPE
+
+    def __init__(self, hs, room_id=None, feedback=False):
+        super(MessagesStreamData, self).__init__(hs)
+        self.room_id = room_id
+        self.with_feedback = feedback
+
+    @defer.inlineCallbacks
+    def get_rows(self, user_id, from_key, to_key, limit):
+        (data, latest_ver) = yield self.store.get_message_stream(
+            user_id=user_id,
+            from_key=from_key,
+            to_key=to_key,
+            limit=limit,
+            room_id=self.room_id,
+            with_feedback=self.with_feedback
+        )
+        defer.returnValue((data, latest_ver))
+
+    @defer.inlineCallbacks
+    def max_token(self):
+        val = yield self.store.get_max_message_id()
+        defer.returnValue(val)
+
+
+class RoomMemberStreamData(StreamData):
+    EVENT_TYPE = RoomMemberEvent.TYPE
+
+    @defer.inlineCallbacks
+    def get_rows(self, user_id, from_key, to_key, limit):
+        (data, latest_ver) = yield self.store.get_room_member_stream(
+            user_id=user_id,
+            from_key=from_key,
+            to_key=to_key
+        )
+
+        defer.returnValue((data, latest_ver))
+
+    @defer.inlineCallbacks
+    def max_token(self):
+        val = yield self.store.get_max_room_member_id()
+        defer.returnValue(val)
+
+
+class FeedbackStreamData(StreamData):
+    EVENT_TYPE = FeedbackEvent.TYPE
+
+    def __init__(self, hs, room_id=None):
+        super(FeedbackStreamData, self).__init__(hs)
+        self.room_id = room_id
+
+    @defer.inlineCallbacks
+    def get_rows(self, user_id, from_key, to_key, limit):
+        (data, latest_ver) = yield self.store.get_feedback_stream(
+            user_id=user_id,
+            from_key=from_key,
+            to_key=to_key,
+            limit=limit,
+            room_id=self.room_id
+        )
+        defer.returnValue((data, latest_ver))
+
+    @defer.inlineCallbacks
+    def max_token(self):
+        val = yield self.store.get_max_feedback_id()
+        defer.returnValue(val)
+
+
+class RoomDataStreamData(StreamData):
+    EVENT_TYPE = RoomTopicEvent.TYPE  # TODO need multiple event types
+
+    def __init__(self, hs, room_id=None):
+        super(RoomDataStreamData, self).__init__(hs)
+        self.room_id = room_id
+
+    @defer.inlineCallbacks
+    def get_rows(self, user_id, from_key, to_key, limit):
+        (data, latest_ver) = yield self.store.get_room_data_stream(
+            user_id=user_id,
+            from_key=from_key,
+            to_key=to_key,
+            limit=limit,
+            room_id=self.room_id
+        )
+        defer.returnValue((data, latest_ver))
+
+    @defer.inlineCallbacks
+    def max_token(self):
+        val = yield self.store.get_max_room_data_id()
+        defer.returnValue(val)
+
+
+class EventStream(PaginationStream):
+
+    SEPARATOR = '_'
+
+    def __init__(self, user_id, stream_data_list):
+        super(EventStream, self).__init__()
+        self.user_id = user_id
+        self.stream_data = stream_data_list
+
+    @defer.inlineCallbacks
+    def fix_tokens(self, pagination_config):
+        pagination_config.from_tok = yield self.fix_token(
+            pagination_config.from_tok)
+        pagination_config.to_tok = yield self.fix_token(
+            pagination_config.to_tok)
+        defer.returnValue(pagination_config)
+
+    @defer.inlineCallbacks
+    def fix_token(self, token):
+        """Fixes unknown values in a token to known values.
+
+        Args:
+            token (str): The token to fix up.
+        Returns:
+            The fixed-up token, which may == token.
+        """
+        # replace TOK_START and TOK_END with 0_0_0 or -1_-1_-1 respectively.
+        replacements = [
+            (PaginationStream.TOK_START, "0"),
+            (PaginationStream.TOK_END, "-1")
+        ]
+        for magic_token, key in replacements:
+            if magic_token == token:
+                token = EventStream.SEPARATOR.join(
+                    [key] * len(self.stream_data)
+                )
+
+        # replace -1 values with an actual pkey
+        token_segments = self._split_token(token)
+        for i, tok in enumerate(token_segments):
+            if tok == -1:
+                # add 1 to the max token because results are EXCLUSIVE from the
+                # latest version.
+                token_segments[i] = 1 + (yield self.stream_data[i].max_token())
+        defer.returnValue(EventStream.SEPARATOR.join(
+            str(x) for x in token_segments
+        ))
+
+    @defer.inlineCallbacks
+    def get_chunk(self, config=None):
+        # no support for limit on >1 streams, makes no sense.
+        if config.limit and len(self.stream_data) > 1:
+            raise EventStreamError(
+                400, "Limit not supported on multiplexed streams."
+            )
+
+        (chunk_data, next_tok) = yield self._get_chunk_data(config.from_tok,
+                                                            config.to_tok,
+                                                            config.limit)
+
+        defer.returnValue({
+            "chunk": chunk_data,
+            "start": config.from_tok,
+            "end": next_tok
+        })
+
+    @defer.inlineCallbacks
+    def _get_chunk_data(self, from_tok, to_tok, limit):
+        """ Get event data between the two tokens.
+
+        Tokens are SEPARATOR separated values representing pkey values of
+        certain tables, and the position determines the StreamData invoked
+        according to this stream's stream_data list.
+
+        The magic value '-1' can be used to get the latest value.
+
+        Args:
+            from_tok - The token to start from.
+            to_tok - The token to end at. Must have values > from_tok or be -1.
+        Returns:
+            A list of event data.
+        Raises:
+            EventStreamError if something went wrong.
+        """
+        # sanity check
+        if (from_tok.count(EventStream.SEPARATOR) !=
+                to_tok.count(EventStream.SEPARATOR) or
+                (from_tok.count(EventStream.SEPARATOR) + 1) !=
+                len(self.stream_data)):
+            raise EventStreamError(400, "Token lengths don't match.")
+
+        chunk = []
+        next_ver = []
+        for i, (from_pkey, to_pkey) in enumerate(zip(
+            self._split_token(from_tok),
+            self._split_token(to_tok)
+        )):
+            if from_pkey == to_pkey:
+                # tokens are the same, we have nothing to do.
+                next_ver.append(str(to_pkey))
+                continue
+
+            (event_chunk, max_pkey) = yield self.stream_data[i].get_rows(
+                self.user_id, from_pkey, to_pkey, limit
+            )
+
+            chunk += event_chunk
+            next_ver.append(str(max_pkey))
+
+        defer.returnValue((chunk, EventStream.SEPARATOR.join(next_ver)))
+
+    def _split_token(self, token):
+        """Splits the given token into a list of pkeys.
+
+        Args:
+            token (str): The token with SEPARATOR values.
+        Returns:
+            A list of ints.
+        """
+        segments = token.split(EventStream.SEPARATOR)
+        try:
+            int_segments = [int(x) for x in segments]
+        except ValueError:
+            raise EventStreamError(400, "Bad token: %s" % token)
+        return int_segments
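
Finally, a sketch (not part of this changeset) of the multiplexed token format used by EventStream: one integer pkey per StreamData source, joined with '_'. StubStreamData is a hypothetical stand-in whose latest pkeys are 10 and 7; fix_token() expands the magic START/END tokens accordingly.

from twisted.internet import defer

from synapse.api.streams import PaginationStream
from synapse.api.streams.event import EventStream


class StubStreamData(object):
    def __init__(self, latest):
        self.latest = latest

    def max_token(self):
        return defer.succeed(self.latest)


stream = EventStream("@alice:localhost",
                     [StubStreamData(10), StubStreamData(7)])

stream.fix_token(PaginationStream.TOK_START)  # Deferred firing with "0_0"
stream.fix_token(PaginationStream.TOK_END)    # Deferred firing with "11_8":
                                              # max pkey + 1, since the end of
                                              # the range is exclusive
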