summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authormatrix.org <matrix@matrix.org>2014-08-12 15:10:52 +0100
committermatrix.org <matrix@matrix.org>2014-08-12 15:10:52 +0100
commit4f475c7697722e946e39e42f38f3dd03a95d8765 (patch)
tree076d96d3809fb836c7245fd9f7960e7b75888a77 /synapse/handlers
downloadsynapse-4f475c7697722e946e39e42f38f3dd03a95d8765.tar.xz
Reference Matrix Home Server
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py46
-rw-r--r--synapse/handlers/_base.py26
-rw-r--r--synapse/handlers/directory.py100
-rw-r--r--synapse/handlers/events.py149
-rw-r--r--synapse/handlers/federation.py74
-rw-r--r--synapse/handlers/login.py64
-rw-r--r--synapse/handlers/presence.py697
-rw-r--r--synapse/handlers/profile.py169
-rw-r--r--synapse/handlers/register.py100
-rw-r--r--synapse/handlers/room.py808
10 files changed, 2233 insertions, 0 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
new file mode 100644
index 0000000000..5688b68e49
--- /dev/null
+++ b/synapse/handlers/__init__.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from .register import RegistrationHandler
+from .room import (
+    MessageHandler, RoomCreationHandler, RoomMemberHandler, RoomListHandler
+)
+from .events import EventStreamHandler
+from .federation import FederationHandler
+from .login import LoginHandler
+from .profile import ProfileHandler
+from .presence import PresenceHandler
+from .directory import DirectoryHandler
+
+
+class Handlers(object):
+
+    """ A collection of all the event handlers.
+
+    There's no need to lazily create these; we'll just make them all eagerly
+    at construction time.
+    """
+
+    def __init__(self, hs):
+        self.registration_handler = RegistrationHandler(hs)
+        self.message_handler = MessageHandler(hs)
+        self.room_creation_handler = RoomCreationHandler(hs)
+        self.room_member_handler = RoomMemberHandler(hs)
+        self.event_stream_handler = EventStreamHandler(hs)
+        self.federation_handler = FederationHandler(hs)
+        self.profile_handler = ProfileHandler(hs)
+        self.presence_handler = PresenceHandler(hs)
+        self.room_list_handler = RoomListHandler(hs)
+        self.login_handler = LoginHandler(hs)
+        self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
new file mode 100644
index 0000000000..87a392dd77
--- /dev/null
+++ b/synapse/handlers/_base.py
@@ -0,0 +1,26 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseHandler(object):
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.event_factory = hs.get_event_factory()
+        self.auth = hs.get_auth()
+        self.notifier = hs.get_notifier()
+        self.room_lock = hs.get_room_lock_manager()
+        self.state_handler = hs.get_state_handler()
+        self.hs = hs
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
new file mode 100644
index 0000000000..456007c71d
--- /dev/null
+++ b/synapse/handlers/directory.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from ._base import BaseHandler
+
+from synapse.api.errors import SynapseError
+
+import logging
+import json
+import urllib
+
+
+logger = logging.getLogger(__name__)
+
+
+# TODO(erikj): This needs to be factored out somewere
+PREFIX = "/matrix/client/api/v1"
+
+
+class DirectoryHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(DirectoryHandler, self).__init__(hs)
+        self.hs = hs
+        self.http_client = hs.get_http_client()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def create_association(self, room_alias, room_id, servers):
+        # TODO(erikj): Do auth.
+
+        if not room_alias.is_mine:
+            raise SynapseError(400, "Room alias must be local")
+            # TODO(erikj): Change this.
+
+        # TODO(erikj): Add transactions.
+
+        # TODO(erikj): Check if there is a current association.
+
+        yield self.store.create_room_alias_association(
+            room_alias,
+            room_id,
+            servers
+        )
+
+    @defer.inlineCallbacks
+    def get_association(self, room_alias, local_only=False):
+        # TODO(erikj): Do auth
+
+        room_id = None
+        if room_alias.is_mine:
+            result = yield self.store.get_association_from_room_alias(
+                room_alias
+            )
+
+            if result:
+                room_id = result.room_id
+                servers = result.servers
+        elif not local_only:
+            path = "%s/ds/room/%s?local_only=1" % (
+                PREFIX,
+                urllib.quote(room_alias.to_string())
+            )
+
+            result = None
+            try:
+                result = yield self.http_client.get_json(
+                    destination=room_alias.domain,
+                    path=path,
+                )
+            except:
+                # TODO(erikj): Handle this better?
+                logger.exception("Failed to get remote room alias")
+
+            if result and "room_id" in result and "servers" in result:
+                room_id = result["room_id"]
+                servers = result["servers"]
+
+        if not room_id:
+            defer.returnValue({})
+            return
+
+        defer.returnValue({
+            "room_id": room_id,
+            "servers": servers,
+        })
+        return
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
new file mode 100644
index 0000000000..79742a4e1c
--- /dev/null
+++ b/synapse/handlers/events.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.streams.event import (
+    EventStream, MessagesStreamData, RoomMemberStreamData, FeedbackStreamData,
+    RoomDataStreamData
+)
+from synapse.handlers.presence import PresenceStreamData
+
+
+class EventStreamHandler(BaseHandler):
+
+    stream_data_classes = [
+        MessagesStreamData,
+        RoomMemberStreamData,
+        FeedbackStreamData,
+        RoomDataStreamData,
+        PresenceStreamData,
+    ]
+
+    def __init__(self, hs):
+        super(EventStreamHandler, self).__init__(hs)
+
+        # Count of active streams per user
+        self._streams_per_user = {}
+        # Grace timers per user to delay the "stopped" signal
+        self._stop_timer_per_user = {}
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("started_user_eventstream")
+        self.distributor.declare("stopped_user_eventstream")
+
+        self.clock = hs.get_clock()
+
+    def get_event_stream_token(self, stream_type, store_id, start_token):
+        """Return the next token after this event.
+
+        Args:
+            stream_type (str): The StreamData.EVENT_TYPE
+            store_id (int): The new storage ID assigned from the data store.
+            start_token (str): The token the user started with.
+        Returns:
+            str: The end token.
+        """
+        for i, stream_cls in enumerate(EventStreamHandler.stream_data_classes):
+            if stream_cls.EVENT_TYPE == stream_type:
+                # this is the stream for this event, so replace this part of
+                # the token
+                store_ids = start_token.split(EventStream.SEPARATOR)
+                store_ids[i] = str(store_id)
+                return EventStream.SEPARATOR.join(store_ids)
+        raise RuntimeError("Didn't find a stream type %s" % stream_type)
+
+    @defer.inlineCallbacks
+    def get_stream(self, auth_user_id, pagin_config, timeout=0):
+        """Gets events as an event stream for this user.
+
+        This function looks for interesting *events* for this user. This is
+        different from the notifier, which looks for interested *users* who may
+        want to know about a single event.
+
+        Args:
+            auth_user_id (str): The user requesting their event stream.
+            pagin_config (synapse.api.streams.PaginationConfig): The config to
+            use when obtaining the stream.
+            timeout (int): The max time to wait for an incoming event in ms.
+        Returns:
+            A pagination stream API dict
+        """
+        auth_user = self.hs.parse_userid(auth_user_id)
+
+        stream_id = object()
+
+        try:
+            if auth_user not in self._streams_per_user:
+                self._streams_per_user[auth_user] = 0
+                if auth_user in self._stop_timer_per_user:
+                    self.clock.cancel_call_later(
+                        self._stop_timer_per_user.pop(auth_user))
+                else:
+                    self.distributor.fire(
+                        "started_user_eventstream", auth_user
+                    )
+            self._streams_per_user[auth_user] += 1
+
+            # construct an event stream with the correct data ordering
+            stream_data_list = []
+            for stream_class in EventStreamHandler.stream_data_classes:
+                stream_data_list.append(stream_class(self.hs))
+            event_stream = EventStream(auth_user_id, stream_data_list)
+
+            # fix unknown tokens to known tokens
+            pagin_config = yield event_stream.fix_tokens(pagin_config)
+
+            # register interest in receiving new events
+            self.notifier.store_events_for(user_id=auth_user_id,
+                                           stream_id=stream_id,
+                                           from_tok=pagin_config.from_tok)
+
+            # see if we can grab a chunk now
+            data_chunk = yield event_stream.get_chunk(config=pagin_config)
+
+            # if there are previous events, return those. If not, wait on the
+            # new events for 'timeout' seconds.
+            if len(data_chunk["chunk"]) == 0 and timeout != 0:
+                results = yield defer.maybeDeferred(
+                    self.notifier.get_events_for,
+                    user_id=auth_user_id,
+                    stream_id=stream_id,
+                    timeout=timeout
+                )
+                if results:
+                    defer.returnValue(results)
+
+            defer.returnValue(data_chunk)
+        finally:
+            # cleanup
+            self.notifier.purge_events_for(user_id=auth_user_id,
+                                           stream_id=stream_id)
+
+            self._streams_per_user[auth_user] -= 1
+            if not self._streams_per_user[auth_user]:
+                del self._streams_per_user[auth_user]
+
+                # 10 seconds of grace to allow the client to reconnect again
+                #   before we think they're gone
+                def _later():
+                    self.distributor.fire(
+                        "stopped_user_eventstream", auth_user
+                    )
+                    del self._stop_timer_per_user[auth_user]
+
+                self._stop_timer_per_user[auth_user] = (
+                    self.clock.call_later(5, _later)
+                )
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
new file mode 100644
index 0000000000..12e7afca4c
--- /dev/null
+++ b/synapse/handlers/federation.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains handlers for federation events."""
+
+from ._base import BaseHandler
+
+from synapse.api.events.room import InviteJoinEvent, RoomMemberEvent
+from synapse.api.constants import Membership
+from synapse.util.logutils import log_function
+
+from twisted.internet import defer
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+class FederationHandler(BaseHandler):
+
+    """Handles events that originated from federation."""
+
+    @log_function
+    @defer.inlineCallbacks
+    def on_receive(self, event, is_new_state):
+        if hasattr(event, "state_key") and not is_new_state:
+            logger.debug("Ignoring old state.")
+            return
+
+        target_is_mine = False
+        if hasattr(event, "target_host"):
+            target_is_mine = event.target_host == self.hs.hostname
+
+        if event.type == InviteJoinEvent.TYPE:
+            if not target_is_mine:
+                logger.debug("Ignoring invite/join event %s", event)
+                return
+
+            # If we receive an invite/join event then we need to join the
+            # sender to the given room.
+            # TODO: We should probably auth this or some such
+            content = event.content
+            content.update({"membership": Membership.JOIN})
+            new_event = self.event_factory.create_event(
+                etype=RoomMemberEvent.TYPE,
+                target_user_id=event.user_id,
+                room_id=event.room_id,
+                user_id=event.user_id,
+                membership=Membership.JOIN,
+                content=content
+            )
+
+            yield self.hs.get_handlers().room_member_handler.change_membership(
+                new_event,
+                True
+            )
+
+        else:
+            with (yield self.room_lock.lock(event.room_id)):
+                store_id = yield self.store.persist_event(event)
+
+            yield self.notifier.on_new_room_event(event, store_id)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
new file mode 100644
index 0000000000..5a1acd7102
--- /dev/null
+++ b/synapse/handlers/login.py
@@ -0,0 +1,64 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.errors import LoginError
+
+import bcrypt
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class LoginHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(LoginHandler, self).__init__(hs)
+        self.hs = hs
+
+    @defer.inlineCallbacks
+    def login(self, user, password):
+        """Login as the specified user with the specified password.
+
+        Args:
+            user (str): The user ID.
+            password (str): The password.
+        Returns:
+            The newly allocated access token.
+        Raises:
+            StoreError if there was a problem storing the token.
+            LoginError if there was an authentication problem.
+        """
+        # TODO do this better, it can't go in __init__ else it cyclic loops
+        if not hasattr(self, "reg_handler"):
+            self.reg_handler = self.hs.get_handlers().registration_handler
+
+        # pull out the hash for this user if they exist
+        user_info = yield self.store.get_user_by_id(user_id=user)
+        if not user_info:
+            logger.warn("Attempted to login as %s but they do not exist.", user)
+            raise LoginError(403, "")
+
+        stored_hash = user_info[0]["password_hash"]
+        if bcrypt.checkpw(password, stored_hash):
+            # generate an access token and store it.
+            token = self.reg_handler._generate_token(user)
+            logger.info("Adding token %s for user %s", token, user)
+            yield self.store.add_access_token_to_user(user, token)
+            defer.returnValue(token)
+        else:
+            logger.warn("Failed password login for user %s", user)
+            raise LoginError(403, "")
\ No newline at end of file
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
new file mode 100644
index 0000000000..38db4b1d67
--- /dev/null
+++ b/synapse/handlers/presence.py
@@ -0,0 +1,697 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, AuthError
+from synapse.api.constants import PresenceState
+from synapse.api.streams import StreamData
+
+from ._base import BaseHandler
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+# TODO(paul): Maybe there's one of these I can steal from somewhere
+def partition(l, func):
+    """Partition the list by the result of func applied to each element."""
+    ret = {}
+
+    for x in l:
+        key = func(x)
+        if key not in ret:
+            ret[key] = []
+        ret[key].append(x)
+
+    return ret
+
+
+def partitionbool(l, func):
+    def boolfunc(x):
+        return bool(func(x))
+
+    ret = partition(l, boolfunc)
+    return ret.get(True, []), ret.get(False, [])
+
+
+class PresenceHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(PresenceHandler, self).__init__(hs)
+
+        self.homeserver = hs
+
+        distributor = hs.get_distributor()
+        distributor.observe("registered_user", self.registered_user)
+
+        distributor.observe(
+            "started_user_eventstream", self.started_user_eventstream
+        )
+        distributor.observe(
+            "stopped_user_eventstream", self.stopped_user_eventstream
+        )
+
+        distributor.observe("user_joined_room",
+            self.user_joined_room
+        )
+
+        distributor.declare("collect_presencelike_data")
+
+        distributor.declare("changed_presencelike_data")
+        distributor.observe(
+            "changed_presencelike_data", self.changed_presencelike_data
+        )
+
+        self.distributor = distributor
+
+        self.federation = hs.get_replication_layer()
+
+        self.federation.register_edu_handler(
+            "m.presence", self.incoming_presence
+        )
+        self.federation.register_edu_handler(
+            "m.presence_invite",
+            lambda origin, content: self.invite_presence(
+                observed_user=hs.parse_userid(content["observed_user"]),
+                observer_user=hs.parse_userid(content["observer_user"]),
+            )
+        )
+        self.federation.register_edu_handler(
+            "m.presence_accept",
+            lambda origin, content: self.accept_presence(
+                observed_user=hs.parse_userid(content["observed_user"]),
+                observer_user=hs.parse_userid(content["observer_user"]),
+            )
+        )
+        self.federation.register_edu_handler(
+            "m.presence_deny",
+            lambda origin, content: self.deny_presence(
+                observed_user=hs.parse_userid(content["observed_user"]),
+                observer_user=hs.parse_userid(content["observer_user"]),
+            )
+        )
+
+        # IN-MEMORY store, mapping local userparts to sets of local users to
+        # be informed of state changes.
+        self._local_pushmap = {}
+        # map local users to sets of remote /domain names/ who are interested
+        # in them
+        self._remote_sendmap = {}
+        # map remote users to sets of local users who're interested in them
+        self._remote_recvmap = {}
+
+        # map any user to a UserPresenceCache
+        self._user_cachemap = {}
+        self._user_cachemap_latest_serial = 0
+
+    def _get_or_make_usercache(self, user):
+        """If the cache entry doesn't exist, initialise a new one."""
+        if user not in self._user_cachemap:
+            self._user_cachemap[user] = UserPresenceCache()
+        return self._user_cachemap[user]
+
+    def _get_or_offline_usercache(self, user):
+        """If the cache entry doesn't exist, return an OFFLINE one but do not
+        store it into the cache."""
+        if user in self._user_cachemap:
+            return self._user_cachemap[user]
+        else:
+            statuscache = UserPresenceCache()
+            statuscache.update({"state": PresenceState.OFFLINE}, user)
+            return statuscache
+
+    def registered_user(self, user):
+        self.store.create_presence(user.localpart)
+
+    @defer.inlineCallbacks
+    def is_presence_visible(self, observer_user, observed_user):
+        assert(observed_user.is_mine)
+
+        if observer_user == observed_user:
+            defer.returnValue(True)
+
+        allowed_by_subscription = yield self.store.is_presence_visible(
+            observed_localpart=observed_user.localpart,
+            observer_userid=observer_user.to_string(),
+        )
+
+        if allowed_by_subscription:
+            defer.returnValue(True)
+
+        # TODO(paul): Check same channel
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def get_state(self, target_user, auth_user):
+        if target_user.is_mine:
+            visible = yield self.is_presence_visible(observer_user=auth_user,
+                observed_user=target_user
+            )
+
+            if visible:
+                state = yield self.store.get_presence_state(
+                    target_user.localpart
+                )
+                defer.returnValue(state)
+            else:
+                raise SynapseError(404, "Presence information not visible")
+        else:
+            # TODO(paul): Have remote server send us permissions set
+            defer.returnValue(
+                    self._get_or_offline_usercache(target_user).get_state()
+            )
+
+    @defer.inlineCallbacks
+    def set_state(self, target_user, auth_user, state):
+        if not target_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        if target_user != auth_user:
+            raise AuthError(400, "Cannot set another user's displayname")
+
+        # TODO(paul): Sanity-check 'state'
+        if "status_msg" not in state:
+            state["status_msg"] = None
+
+        for k in state.keys():
+            if k not in ("state", "status_msg"):
+                raise SynapseError(
+                    400, "Unexpected presence state key '%s'" % (k,)
+                )
+
+        logger.debug("Updating presence state of %s to %s",
+                     target_user.localpart, state["state"])
+
+        state_to_store = dict(state)
+
+        yield defer.DeferredList([
+            self.store.set_presence_state(
+                target_user.localpart, state_to_store
+            ),
+            self.distributor.fire(
+                "collect_presencelike_data", target_user, state
+            ),
+        ])
+
+        now_online = state["state"] != PresenceState.OFFLINE
+        was_polling = target_user in self._user_cachemap
+
+        if now_online and not was_polling:
+            self.start_polling_presence(target_user, state=state)
+        elif not now_online and was_polling:
+            self.stop_polling_presence(target_user)
+
+        # TODO(paul): perform a presence push as part of start/stop poll so
+        #   we don't have to do this all the time
+        self.changed_presencelike_data(target_user, state)
+
+        if not now_online:
+            del self._user_cachemap[target_user]
+
+    def changed_presencelike_data(self, user, state):
+        statuscache = self._get_or_make_usercache(user)
+
+        self._user_cachemap_latest_serial += 1
+        statuscache.update(state, serial=self._user_cachemap_latest_serial)
+
+        self.push_presence(user, statuscache=statuscache)
+
+    def started_user_eventstream(self, user):
+        # TODO(paul): Use "last online" state
+        self.set_state(user, user, {"state": PresenceState.ONLINE})
+
+    def stopped_user_eventstream(self, user):
+        # TODO(paul): Save current state as "last online" state
+        self.set_state(user, user, {"state": PresenceState.OFFLINE})
+
+    @defer.inlineCallbacks
+    def user_joined_room(self, user, room_id):
+        localusers = set()
+        remotedomains = set()
+
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        yield rm_handler.fetch_room_distributions_into(room_id,
+                localusers=localusers, remotedomains=remotedomains,
+                ignore_user=user)
+
+        if user.is_mine:
+            yield self._send_presence_to_distribution(srcuser=user,
+                localusers=localusers, remotedomains=remotedomains,
+                statuscache=self._get_or_offline_usercache(user),
+            )
+
+        for srcuser in localusers:
+            yield self._send_presence(srcuser=srcuser, destuser=user,
+                statuscache=self._get_or_offline_usercache(srcuser),
+            )
+
+    @defer.inlineCallbacks
+    def send_invite(self, observer_user, observed_user):
+        if not observer_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        yield self.store.add_presence_list_pending(
+            observer_user.localpart, observed_user.to_string()
+        )
+
+        if observed_user.is_mine:
+            yield self.invite_presence(observed_user, observer_user)
+        else:
+            yield self.federation.send_edu(
+                destination=observed_user.domain,
+                edu_type="m.presence_invite",
+                content={
+                    "observed_user": observed_user.to_string(),
+                    "observer_user": observer_user.to_string(),
+                }
+            )
+
+    @defer.inlineCallbacks
+    def _should_accept_invite(self, observed_user, observer_user):
+        if not observed_user.is_mine:
+            defer.returnValue(False)
+
+        row = yield self.store.has_presence_state(observed_user.localpart)
+        if not row:
+            defer.returnValue(False)
+
+        # TODO(paul): Eventually we'll ask the user's permission for this
+        # before accepting. For now just accept any invite request
+        defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def invite_presence(self, observed_user, observer_user):
+        accept = yield self._should_accept_invite(observed_user, observer_user)
+
+        if accept:
+            yield self.store.allow_presence_visible(
+                observed_user.localpart, observer_user.to_string()
+            )
+
+        if observer_user.is_mine:
+            if accept:
+                yield self.accept_presence(observed_user, observer_user)
+            else:
+                yield self.deny_presence(observed_user, observer_user)
+        else:
+            edu_type = "m.presence_accept" if accept else "m.presence_deny"
+
+            yield self.federation.send_edu(
+                destination=observer_user.domain,
+                edu_type=edu_type,
+                content={
+                    "observed_user": observed_user.to_string(),
+                    "observer_user": observer_user.to_string(),
+                }
+            )
+
+    @defer.inlineCallbacks
+    def accept_presence(self, observed_user, observer_user):
+        yield self.store.set_presence_list_accepted(
+            observer_user.localpart, observed_user.to_string()
+        )
+
+        self.start_polling_presence(observer_user, target_user=observed_user)
+
+    @defer.inlineCallbacks
+    def deny_presence(self, observed_user, observer_user):
+        yield self.store.del_presence_list(
+            observer_user.localpart, observed_user.to_string()
+        )
+
+        # TODO(paul): Inform the user somehow?
+
+    @defer.inlineCallbacks
+    def drop(self, observed_user, observer_user):
+        if not observer_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        yield self.store.del_presence_list(
+            observer_user.localpart, observed_user.to_string()
+        )
+
+        self.stop_polling_presence(observer_user, target_user=observed_user)
+
+    @defer.inlineCallbacks
+    def get_presence_list(self, observer_user, accepted=None):
+        if not observer_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        presence = yield self.store.get_presence_list(
+            observer_user.localpart, accepted=accepted
+        )
+
+        for p in presence:
+            observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
+            p["observed_user"] = observed_user
+            p.update(self._get_or_offline_usercache(observed_user).get_state())
+
+        defer.returnValue(presence)
+
+    @defer.inlineCallbacks
+    def start_polling_presence(self, user, target_user=None, state=None):
+        logger.debug("Start polling for presence from %s", user)
+
+        if target_user:
+            target_users = [target_user]
+        else:
+            presence = yield self.store.get_presence_list(
+                user.localpart, accepted=True
+            )
+            target_users = [
+                self.hs.parse_userid(x["observed_user_id"]) for x in presence
+            ]
+
+        if state is None:
+            state = yield self.store.get_presence_state(user.localpart)
+
+        localusers, remoteusers = partitionbool(
+            target_users,
+            lambda u: u.is_mine
+        )
+
+        for target_user in localusers:
+            self._start_polling_local(user, target_user)
+
+        deferreds = []
+        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+        for domain in remoteusers_by_domain:
+            remoteusers = remoteusers_by_domain[domain]
+
+            deferreds.append(self._start_polling_remote(
+                user, domain, remoteusers
+            ))
+
+        yield defer.DeferredList(deferreds)
+
+    def _start_polling_local(self, user, target_user):
+        target_localpart = target_user.localpart
+
+        if not self.is_presence_visible(observer_user=user,
+            observed_user=target_user):
+            return
+
+        if target_localpart not in self._local_pushmap:
+            self._local_pushmap[target_localpart] = set()
+
+        self._local_pushmap[target_localpart].add(user)
+
+        self.push_update_to_clients(
+            observer_user=user,
+            observed_user=target_user,
+            statuscache=self._get_or_offline_usercache(target_user),
+        )
+
+    def _start_polling_remote(self, user, domain, remoteusers):
+        for u in remoteusers:
+            if u not in self._remote_recvmap:
+                self._remote_recvmap[u] = set()
+
+            self._remote_recvmap[u].add(user)
+
+        return self.federation.send_edu(
+            destination=domain,
+            edu_type="m.presence",
+            content={"poll": [u.to_string() for u in remoteusers]}
+        )
+
+    def stop_polling_presence(self, user, target_user=None):
+        logger.debug("Stop polling for presence from %s", user)
+
+        if not target_user or target_user.is_mine:
+            self._stop_polling_local(user, target_user=target_user)
+
+        deferreds = []
+
+        if target_user:
+            raise NotImplementedError("TODO: remove one user")
+
+        remoteusers = [u for u in self._remote_recvmap
+                       if user in self._remote_recvmap[u]]
+        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+
+        for domain in remoteusers_by_domain:
+            remoteusers = remoteusers_by_domain[domain]
+
+            deferreds.append(
+                self._stop_polling_remote(user, domain, remoteusers)
+            )
+
+        return defer.DeferredList(deferreds)
+
+    def _stop_polling_local(self, user, target_user):
+        for localpart in self._local_pushmap.keys():
+            if target_user and localpart != target_user.localpart:
+                continue
+
+            if user in self._local_pushmap[localpart]:
+                self._local_pushmap[localpart].remove(user)
+
+            if not self._local_pushmap[localpart]:
+                del self._local_pushmap[localpart]
+
+    def _stop_polling_remote(self, user, domain, remoteusers):
+        for u in remoteusers:
+            self._remote_recvmap[u].remove(user)
+
+            if not self._remote_recvmap[u]:
+                del self._remote_recvmap[u]
+
+        return self.federation.send_edu(
+            destination=domain,
+            edu_type="m.presence",
+            content={"unpoll": [u.to_string() for u in remoteusers]}
+        )
+
+    @defer.inlineCallbacks
+    def push_presence(self, user, statuscache):
+        assert(user.is_mine)
+
+        logger.debug("Pushing presence update from %s", user)
+
+        localusers = set(self._local_pushmap.get(user.localpart, set()))
+        remotedomains = set(self._remote_sendmap.get(user.localpart, set()))
+
+        # Reflect users' status changes back to themselves, so UIs look nice
+        # and also user is informed of server-forced pushes
+        localusers.add(user)
+
+        rm_handler = self.homeserver.get_handlers().room_member_handler
+        room_ids = yield rm_handler.get_rooms_for_user(user)
+
+        for room_id in room_ids:
+            yield rm_handler.fetch_room_distributions_into(
+                room_id, localusers=localusers, remotedomains=remotedomains,
+                ignore_user=user,
+            )
+
+        if not localusers and not remotedomains:
+            defer.returnValue(None)
+
+        yield self._send_presence_to_distribution(user,
+            localusers=localusers, remotedomains=remotedomains,
+            statuscache=statuscache
+        )
+
+    def _send_presence(self, srcuser, destuser, statuscache):
+        if destuser.is_mine:
+            self.push_update_to_clients(
+                observer_user=destuser,
+                observed_user=srcuser,
+                statuscache=statuscache)
+            return defer.succeed(None)
+        else:
+            return self._push_presence_remote(srcuser, destuser.domain,
+                state=statuscache.get_state()
+            )
+
+    @defer.inlineCallbacks
+    def _send_presence_to_distribution(self, srcuser, localusers=set(),
+            remotedomains=set(), statuscache=None):
+
+        for u in localusers:
+            logger.debug(" | push to local user %s", u)
+            self.push_update_to_clients(
+                observer_user=u,
+                observed_user=srcuser,
+                statuscache=statuscache,
+            )
+
+        deferreds = []
+        for domain in remotedomains:
+            logger.debug(" | push to remote domain %s", domain)
+            deferreds.append(self._push_presence_remote(srcuser, domain,
+                state=statuscache.get_state())
+            )
+
+        yield defer.DeferredList(deferreds)
+
+    @defer.inlineCallbacks
+    def _push_presence_remote(self, user, destination, state=None):
+        if state is None:
+            state = yield self.store.get_presence_state(user.localpart)
+            yield self.distributor.fire(
+                "collect_presencelike_data", user, state
+            )
+
+        yield self.federation.send_edu(
+            destination=destination,
+            edu_type="m.presence",
+            content={
+                "push": [
+                    dict(user_id=user.to_string(), **state),
+                ],
+            }
+        )
+
+    @defer.inlineCallbacks
+    def incoming_presence(self, origin, content):
+        deferreds = []
+
+        for push in content.get("push", []):
+            user = self.hs.parse_userid(push["user_id"])
+
+            logger.debug("Incoming presence update from %s", user)
+
+            observers = set(self._remote_recvmap.get(user, set()))
+
+            rm_handler = self.homeserver.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(user)
+
+            for room_id in room_ids:
+                yield rm_handler.fetch_room_distributions_into(
+                    room_id, localusers=observers, ignore_user=user
+                )
+
+            if not observers:
+                break
+
+            state = dict(push)
+            del state["user_id"]
+
+            statuscache = self._get_or_make_usercache(user)
+
+            self._user_cachemap_latest_serial += 1
+            statuscache.update(state, serial=self._user_cachemap_latest_serial)
+
+            for observer_user in observers:
+                self.push_update_to_clients(
+                    observer_user=observer_user,
+                    observed_user=user,
+                    statuscache=statuscache,
+                )
+
+            if state["state"] == PresenceState.OFFLINE:
+                del self._user_cachemap[user]
+
+        for poll in content.get("poll", []):
+            user = self.hs.parse_userid(poll)
+
+            if not user.is_mine:
+                continue
+
+            # TODO(paul) permissions checks
+
+            if not user in self._remote_sendmap:
+                self._remote_sendmap[user] = set()
+
+            self._remote_sendmap[user].add(origin)
+
+            deferreds.append(self._push_presence_remote(user, origin))
+
+        for unpoll in content.get("unpoll", []):
+            user = self.hs.parse_userid(unpoll)
+
+            if not user.is_mine:
+                continue
+
+            if user in self._remote_sendmap:
+                self._remote_sendmap[user].remove(origin)
+
+                if not self._remote_sendmap[user]:
+                    del self._remote_sendmap[user]
+
+        yield defer.DeferredList(deferreds)
+
+    def push_update_to_clients(self, observer_user, observed_user,
+                               statuscache):
+        self.notifier.on_new_user_event(
+            observer_user.to_string(),
+            event_data=statuscache.make_event(user=observed_user),
+            stream_type=PresenceStreamData,
+            store_id=statuscache.serial
+        )
+
+
+class PresenceStreamData(StreamData):
+    def __init__(self, hs):
+        super(PresenceStreamData, self).__init__(hs)
+        self.presence = hs.get_handlers().presence_handler
+
+    def get_rows(self, user_id, from_key, to_key, limit):
+        cachemap = self.presence._user_cachemap
+
+        # TODO(paul): limit, and filter by visibility
+        updates = [(k, cachemap[k]) for k in cachemap
+                   if from_key < cachemap[k].serial <= to_key]
+
+        if updates:
+            latest_serial = max([x[1].serial for x in updates])
+            data = [x[1].make_event(user=x[0]) for x in updates]
+            return ((data, latest_serial))
+        else:
+            return (([], self.presence._user_cachemap_latest_serial))
+
+    def max_token(self):
+        return self.presence._user_cachemap_latest_serial
+
+PresenceStreamData.EVENT_TYPE = PresenceStreamData
+
+
+class UserPresenceCache(object):
+    """Store an observed user's state and status message.
+
+    Includes the update timestamp.
+    """
+    def __init__(self):
+        self.state = {}
+        self.serial = None
+
+    def update(self, state, serial):
+        self.state.update(state)
+        # Delete keys that are now 'None'
+        for k in self.state.keys():
+            if self.state[k] is None:
+                del self.state[k]
+
+        self.serial = serial
+
+        if "status_msg" in state:
+            self.status_msg = state["status_msg"]
+        else:
+            self.status_msg = None
+
+    def get_state(self):
+        # clone it so caller can't break our cache
+        return dict(self.state)
+
+    def make_event(self, user):
+        content = self.get_state()
+        content["user_id"] = user.to_string()
+
+        return {"type": "m.presence", "content": content}
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
new file mode 100644
index 0000000000..a27206b002
--- /dev/null
+++ b/synapse/handlers/profile.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.internet import defer
+
+from synapse.api.errors import SynapseError, AuthError
+
+from synapse.api.errors import CodeMessageException
+
+from ._base import BaseHandler
+
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+PREFIX = "/matrix/client/api/v1"
+
+
+class ProfileHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(ProfileHandler, self).__init__(hs)
+
+        self.client = hs.get_http_client()
+
+        distributor = hs.get_distributor()
+        self.distributor = distributor
+
+        distributor.observe("registered_user", self.registered_user)
+
+        distributor.observe(
+            "collect_presencelike_data", self.collect_presencelike_data
+        )
+
+    def registered_user(self, user):
+        self.store.create_profile(user.localpart)
+
+    @defer.inlineCallbacks
+    def get_displayname(self, target_user, local_only=False):
+        if target_user.is_mine:
+            displayname = yield self.store.get_profile_displayname(
+                target_user.localpart
+            )
+
+            defer.returnValue(displayname)
+        elif not local_only:
+            # TODO(paul): This should use the server-server API to ask another
+            # HS. For now we'll just have it use the http client to talk to the
+            # other HS's REST client API
+            path = PREFIX + "/profile/%s/displayname?local_only=1" % (
+                target_user.to_string()
+            )
+
+            try:
+                result = yield self.client.get_json(
+                    destination=target_user.domain,
+                    path=path
+                )
+            except CodeMessageException as e:
+                if e.code != 404:
+                    logger.exception("Failed to get displayname")
+
+                raise
+            except:
+                logger.exception("Failed to get displayname")
+
+            defer.returnValue(result["displayname"])
+        else:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+    @defer.inlineCallbacks
+    def set_displayname(self, target_user, auth_user, new_displayname):
+        """target_user is the user whose displayname is to be changed;
+        auth_user is the user attempting to make this change."""
+        if not target_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        if target_user != auth_user:
+            raise AuthError(400, "Cannot set another user's displayname")
+
+        yield self.store.set_profile_displayname(
+            target_user.localpart, new_displayname
+        )
+
+        yield self.distributor.fire(
+            "changed_presencelike_data", target_user, {
+                "displayname": new_displayname,
+            }
+        )
+
+    @defer.inlineCallbacks
+    def get_avatar_url(self, target_user, local_only=False):
+        if target_user.is_mine:
+            avatar_url = yield self.store.get_profile_avatar_url(
+                target_user.localpart
+            )
+
+            defer.returnValue(avatar_url)
+        elif not local_only:
+            # TODO(paul): This should use the server-server API to ask another
+            # HS. For now we'll just have it use the http client to talk to the
+            # other HS's REST client API
+            destination = target_user.domain
+            path = PREFIX + "/profile/%s/avatar_url?local_only=1" % (
+                target_user.to_string(),
+            )
+
+            try:
+                result = yield self.client.get_json(
+                    destination=destination,
+                    path=path
+                )
+            except CodeMessageException as e:
+                if e.code != 404:
+                    logger.exception("Failed to get avatar_url")
+                raise
+            except:
+                logger.exception("Failed to get avatar_url")
+
+            defer.returnValue(result["avatar_url"])
+        else:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+    @defer.inlineCallbacks
+    def set_avatar_url(self, target_user, auth_user, new_avatar_url):
+        """target_user is the user whose avatar_url is to be changed;
+        auth_user is the user attempting to make this change."""
+        if not target_user.is_mine:
+            raise SynapseError(400, "User is not hosted on this Home Server")
+
+        if target_user != auth_user:
+            raise AuthError(400, "Cannot set another user's avatar_url")
+
+        yield self.store.set_profile_avatar_url(
+            target_user.localpart, new_avatar_url
+        )
+
+        yield self.distributor.fire(
+            "changed_presencelike_data", target_user, {
+                "avatar_url": new_avatar_url,
+            }
+        )
+
+    @defer.inlineCallbacks
+    def collect_presencelike_data(self, user, state):
+        if not user.is_mine:
+            defer.returnValue(None)
+
+        (displayname, avatar_url) = yield defer.gatherResults([
+            self.store.get_profile_displayname(user.localpart),
+            self.store.get_profile_avatar_url(user.localpart),
+        ])
+
+        state["displayname"] = displayname
+        state["avatar_url"] = avatar_url
+
+        defer.returnValue(None)
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
new file mode 100644
index 0000000000..246c1f6530
--- /dev/null
+++ b/synapse/handlers/register.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains functions for registering clients."""
+from twisted.internet import defer
+
+from synapse.types import UserID
+from synapse.api.errors import SynapseError, RegistrationError
+from ._base import BaseHandler
+import synapse.util.stringutils as stringutils
+
+import base64
+import bcrypt
+
+
+class RegistrationHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(RegistrationHandler, self).__init__(hs)
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("registered_user")
+
+    @defer.inlineCallbacks
+    def register(self, localpart=None, password=None):
+        """Registers a new client on the server.
+
+        Args:
+            localpart : The local part of the user ID to register. If None,
+              one will be randomly generated.
+            password (str) : The password to assign to this user so they can
+            login again.
+        Returns:
+            A tuple of (user_id, access_token).
+        Raises:
+            RegistrationError if there was a problem registering.
+        """
+        password_hash = None
+        if password:
+            password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
+
+        if localpart:
+            user = UserID(localpart, self.hs.hostname, True)
+            user_id = user.to_string()
+
+            token = self._generate_token(user_id)
+            yield self.store.register(user_id=user_id,
+                token=token,
+                password_hash=password_hash)
+
+            self.distributor.fire("registered_user", user)
+            defer.returnValue((user_id, token))
+        else:
+            # autogen a random user ID
+            attempts = 0
+            user_id = None
+            token = None
+            while not user_id and not token:
+                try:
+                    localpart = self._generate_user_id()
+                    user = UserID(localpart, self.hs.hostname, True)
+                    user_id = user.to_string()
+
+                    token = self._generate_token(user_id)
+                    yield self.store.register(
+                        user_id=user_id,
+                        token=token,
+                        password_hash=password_hash)
+
+                    self.distributor.fire("registered_user", user)
+                    defer.returnValue((user_id, token))
+                except SynapseError:
+                    # if user id is taken, just generate another
+                    user_id = None
+                    token = None
+                    attempts += 1
+                    if attempts > 5:
+                        raise RegistrationError(
+                            500, "Cannot generate user ID.")
+
+    def _generate_token(self, user_id):
+        # urlsafe variant uses _ and - so use . as the separator and replace
+        # all =s with .s so http clients don't quote =s when it is used as
+        # query params.
+        return (base64.urlsafe_b64encode(user_id).replace('=', '.') + '.' +
+                stringutils.random_string(18))
+
+    def _generate_user_id(self):
+        return "-" + stringutils.random_string(18)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
new file mode 100644
index 0000000000..4d82b33993
--- /dev/null
+++ b/synapse/handlers/room.py
@@ -0,0 +1,808 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 matrix.org
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Contains functions for performing events on rooms."""
+from twisted.internet import defer
+
+from synapse.types import UserID, RoomAlias, RoomID
+from synapse.api.constants import Membership
+from synapse.api.errors import RoomError, StoreError, SynapseError
+from synapse.api.events.room import (
+    RoomTopicEvent, MessageEvent, InviteJoinEvent, RoomMemberEvent,
+    RoomConfigEvent
+)
+from synapse.api.streams.event import EventStream, MessagesStreamData
+from synapse.util import stringutils
+from ._base import BaseHandler
+
+import logging
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class MessageHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(MessageHandler, self).__init__(hs)
+        self.hs = hs
+        self.clock = hs.get_clock()
+        self.event_factory = hs.get_event_factory()
+
+    @defer.inlineCallbacks
+    def get_message(self, msg_id=None, room_id=None, sender_id=None,
+                    user_id=None):
+        """ Retrieve a message.
+
+        Args:
+            msg_id (str): The message ID to obtain.
+            room_id (str): The room where the message resides.
+            sender_id (str): The user ID of the user who sent the message.
+            user_id (str): The user ID of the user making this request.
+        Returns:
+            The message, or None if no message exists.
+        Raises:
+            SynapseError if something went wrong.
+        """
+        yield self.auth.check_joined_room(room_id, user_id)
+
+        # Pull out the message from the db
+        msg = yield self.store.get_message(room_id=room_id,
+                                           msg_id=msg_id,
+                                           user_id=sender_id)
+
+        if msg:
+            defer.returnValue(msg)
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def send_message(self, event=None, suppress_auth=False, stamp_event=True):
+        """ Send a message.
+
+        Args:
+            event : The message event to store.
+            suppress_auth (bool) : True to suppress auth for this message. This
+            is primarily so the home server can inject messages into rooms at
+            will.
+            stamp_event (bool) : True to stamp event content with server keys.
+        Raises:
+            SynapseError if something went wrong.
+        """
+        if stamp_event:
+            event.content["hsob_ts"] = int(self.clock.time_msec())
+
+        with (yield self.room_lock.lock(event.room_id)):
+            if not suppress_auth:
+                yield self.auth.check(event, raises=True)
+
+            # store message in db
+            store_id = yield self.store.persist_event(event)
+
+            event.destinations = yield self.store.get_joined_hosts_for_room(
+                event.room_id
+            )
+
+            yield self.hs.get_federation().handle_new_event(event)
+
+            self.notifier.on_new_room_event(event, store_id)
+
+    @defer.inlineCallbacks
+    def get_messages(self, user_id=None, room_id=None, pagin_config=None,
+                     feedback=False):
+        """Get messages in a room.
+
+        Args:
+            user_id (str): The user requesting messages.
+            room_id (str): The room they want messages from.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+            config rules to apply, if any.
+            feedback (bool): True to get compressed feedback with the messages
+        Returns:
+            dict: Pagination API results
+        """
+        yield self.auth.check_joined_room(room_id, user_id)
+
+        data_source = [MessagesStreamData(self.hs, room_id=room_id,
+                                          feedback=feedback)]
+        event_stream = EventStream(user_id, data_source)
+        pagin_config = yield event_stream.fix_tokens(pagin_config)
+        data_chunk = yield event_stream.get_chunk(config=pagin_config)
+        defer.returnValue(data_chunk)
+
+    @defer.inlineCallbacks
+    def store_room_data(self, event=None, stamp_event=True):
+        """ Stores data for a room.
+
+        Args:
+            event : The room path event
+            stamp_event (bool) : True to stamp event content with server keys.
+        Raises:
+            SynapseError if something went wrong.
+        """
+
+        with (yield self.room_lock.lock(event.room_id)):
+            yield self.auth.check(event, raises=True)
+
+            if stamp_event:
+                event.content["hsob_ts"] = int(self.clock.time_msec())
+
+            yield self.state_handler.handle_new_event(event)
+
+            # store in db
+            store_id = yield self.store.store_room_data(
+                room_id=event.room_id,
+                etype=event.type,
+                state_key=event.state_key,
+                content=json.dumps(event.content)
+            )
+
+            event.destinations = yield self.store.get_joined_hosts_for_room(
+                event.room_id
+            )
+            self.notifier.on_new_room_event(event, store_id)
+
+        yield self.hs.get_federation().handle_new_event(event)
+
+    @defer.inlineCallbacks
+    def get_room_data(self, user_id=None, room_id=None,
+                      event_type=None, state_key="",
+                      public_room_rules=[],
+                      private_room_rules=["join"]):
+        """ Get data from a room.
+
+        Args:
+            event : The room path event
+            public_room_rules : A list of membership states the user can be in,
+            in order to read this data IN A PUBLIC ROOM. An empty list means
+            'any state'.
+            private_room_rules : A list of membership states the user can be
+            in, in order to read this data IN A PRIVATE ROOM. An empty list
+            means 'any state'.
+        Returns:
+            The path data content.
+        Raises:
+            SynapseError if something went wrong.
+        """
+        if event_type == RoomTopicEvent.TYPE:
+            # anyone invited/joined can read the topic
+            private_room_rules = ["invite", "join"]
+
+        # does this room exist
+        room = yield self.store.get_room(room_id)
+        if not room:
+            raise RoomError(403, "Room does not exist.")
+
+        # does this user exist in this room
+        member = yield self.store.get_room_member(
+            room_id=room_id,
+            user_id="" if not user_id else user_id)
+
+        member_state = member.membership if member else None
+
+        if room.is_public and public_room_rules:
+            # make sure the user meets public room rules
+            if member_state not in public_room_rules:
+                raise RoomError(403, "Member does not meet public room rules.")
+        elif not room.is_public and private_room_rules:
+            # make sure the user meets private room rules
+            if member_state not in private_room_rules:
+                raise RoomError(
+                    403, "Member does not meet private room rules.")
+
+        data = yield self.store.get_room_data(room_id, event_type, state_key)
+        defer.returnValue(data)
+
+    @defer.inlineCallbacks
+    def get_feedback(self, room_id=None, msg_sender_id=None, msg_id=None,
+                     user_id=None, fb_sender_id=None, fb_type=None):
+        yield self.auth.check_joined_room(room_id, user_id)
+
+        # Pull out the feedback from the db
+        fb = yield self.store.get_feedback(
+            room_id=room_id, msg_id=msg_id, msg_sender_id=msg_sender_id,
+            fb_sender_id=fb_sender_id, fb_type=fb_type
+        )
+
+        if fb:
+            defer.returnValue(fb)
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def send_feedback(self, event, stamp_event=True):
+        if stamp_event:
+            event.content["hsob_ts"] = int(self.clock.time_msec())
+
+        with (yield self.room_lock.lock(event.room_id)):
+            yield self.auth.check(event, raises=True)
+
+            # store message in db
+            store_id = yield self.store.persist_event(event)
+
+            event.destinations = yield self.store.get_joined_hosts_for_room(
+                event.room_id
+            )
+        yield self.hs.get_federation().handle_new_event(event)
+
+        self.notifier.on_new_room_event(event, store_id)
+
+    @defer.inlineCallbacks
+    def snapshot_all_rooms(self, user_id=None, pagin_config=None,
+                           feedback=False):
+        """Retrieve a snapshot of all rooms the user is invited or has joined.
+
+        This snapshot may include messages for all rooms where the user is
+        joined, depending on the pagination config.
+
+        Args:
+            user_id (str): The ID of the user making the request.
+            pagin_config (synapse.api.streams.PaginationConfig): The pagination
+            config used to determine how many messages *PER ROOM* to return.
+            feedback (bool): True to get feedback along with these messages.
+        Returns:
+            A list of dicts with "room_id" and "membership" keys for all rooms
+            the user is currently invited or joined in on. Rooms where the user
+            is joined on, may return a "messages" key with messages, depending
+            on the specified PaginationConfig.
+        """
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user_id,
+            membership_list=[Membership.INVITE, Membership.JOIN]
+        )
+        for room_info in room_list:
+            if room_info["membership"] != Membership.JOIN:
+                continue
+            try:
+                event_chunk = yield self.get_messages(
+                    user_id=user_id,
+                    pagin_config=pagin_config,
+                    feedback=feedback,
+                    room_id=room_info["room_id"]
+                )
+                room_info["messages"] = event_chunk
+            except:
+                pass
+        defer.returnValue(room_list)
+
+
+class RoomCreationHandler(BaseHandler):
+
+    @defer.inlineCallbacks
+    def create_room(self, user_id, room_id, config):
+        """ Creates a new room.
+
+        Args:
+            user_id (str): The ID of the user creating the new room.
+            room_id (str): The proposed ID for the new room. Can be None, in
+            which case one will be created for you.
+            config (dict) : A dict of configuration options.
+        Returns:
+            The new room ID.
+        Raises:
+            SynapseError if the room ID was taken, couldn't be stored, or
+            something went horribly wrong.
+        """
+
+        if "room_alias_name" in config:
+            room_alias = RoomAlias.create_local(
+                config["room_alias_name"],
+                self.hs
+            )
+            mapping = yield self.store.get_association_from_room_alias(
+                room_alias
+            )
+
+            if mapping:
+                raise SynapseError(400, "Room alias already taken")
+        else:
+            room_alias = None
+
+        if room_id:
+            # Ensure room_id is the correct type
+            room_id_obj = RoomID.from_string(room_id, self.hs)
+            if not room_id_obj.is_mine:
+                raise SynapseError(400, "Room id must be local")
+
+            yield self.store.store_room(
+                room_id=room_id,
+                room_creator_user_id=user_id,
+                is_public=config["visibility"] == "public"
+            )
+        else:
+            # autogen room IDs and try to create it. We may clash, so just
+            # try a few times till one goes through, giving up eventually.
+            attempts = 0
+            room_id = None
+            while attempts < 5:
+                try:
+                    random_string = stringutils.random_string(18)
+                    gen_room_id = RoomID.create_local(random_string, self.hs)
+                    yield self.store.store_room(
+                        room_id=gen_room_id.to_string(),
+                        room_creator_user_id=user_id,
+                        is_public=config["visibility"] == "public"
+                    )
+                    room_id = gen_room_id.to_string()
+                    break
+                except StoreError:
+                    attempts += 1
+            if not room_id:
+                raise StoreError(500, "Couldn't generate a room ID.")
+
+        config_event = self.event_factory.create_event(
+            etype=RoomConfigEvent.TYPE,
+            room_id=room_id,
+            user_id=user_id,
+            content=config,
+        )
+
+        if room_alias:
+            yield self.store.create_room_alias_association(
+                room_id=room_id,
+                room_alias=room_alias,
+                servers=[self.hs.hostname],
+            )
+
+        yield self.state_handler.handle_new_event(config_event)
+        # store_id = persist...
+
+        yield self.hs.get_federation().handle_new_event(config_event)
+        # self.notifier.on_new_room_event(event, store_id)
+
+        content = {"membership": Membership.JOIN}
+        join_event = self.event_factory.create_event(
+            etype=RoomMemberEvent.TYPE,
+            target_user_id=user_id,
+            room_id=room_id,
+            user_id=user_id,
+            membership=Membership.JOIN,
+            content=content
+        )
+
+        yield self.hs.get_handlers().room_member_handler.change_membership(
+            join_event,
+            broadcast_msg=True,
+            do_auth=False
+        )
+
+        result = {"room_id": room_id}
+        if room_alias:
+            result["room_alias"] = room_alias.to_string()
+
+        defer.returnValue(result)
+
+
+class RoomMemberHandler(BaseHandler):
+    # TODO(paul): This handler currently contains a messy conflation of
+    #   low-level API that works on UserID objects and so on, and REST-level
+    #   API that takes ID strings and returns pagination chunks. These concerns
+    #   ought to be separated out a lot better.
+
+    def __init__(self, hs):
+        super(RoomMemberHandler, self).__init__(hs)
+
+        self.clock = hs.get_clock()
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("user_joined_room")
+
+    @defer.inlineCallbacks
+    def get_room_members(self, room_id, membership=Membership.JOIN):
+        hs = self.hs
+
+        memberships = yield self.store.get_room_members(
+            room_id=room_id, membership=membership
+        )
+
+        defer.returnValue([hs.parse_userid(m.user_id) for m in memberships])
+
+    @defer.inlineCallbacks
+    def fetch_room_distributions_into(self, room_id, localusers=None,
+                                      remotedomains=None, ignore_user=None):
+        """Fetch the distribution of a room, adding elements to either
+        'localusers' or 'remotedomains', which should be a set() if supplied.
+        If ignore_user is set, ignore that user.
+
+        This function returns nothing; its result is performed by the
+        side-effect on the two passed sets. This allows easy accumulation of
+        member lists of multiple rooms at once if required.
+        """
+        members = yield self.get_room_members(room_id)
+        for member in members:
+            if ignore_user is not None and member == ignore_user:
+                continue
+
+            if member.is_mine:
+                if localusers is not None:
+                    localusers.add(member)
+            else:
+                if remotedomains is not None:
+                    remotedomains.add(member.domain)
+
+    @defer.inlineCallbacks
+    def get_room_members_as_pagination_chunk(self, room_id=None, user_id=None,
+                                             limit=0, start_tok=None,
+                                             end_tok=None):
+        """Retrieve a list of room members in the room.
+
+        Args:
+            room_id (str): The room to get the member list for.
+            user_id (str): The ID of the user making the request.
+            limit (int): The max number of members to return.
+            start_tok (str): Optional. The start token if known.
+            end_tok (str): Optional. The end token if known.
+        Returns:
+            dict: A Pagination streamable dict.
+        Raises:
+            SynapseError if something goes wrong.
+        """
+        yield self.auth.check_joined_room(room_id, user_id)
+
+        member_list = yield self.store.get_room_members(room_id=room_id)
+        event_list = [
+            entry.as_event(self.event_factory).get_dict()
+            for entry in member_list
+        ]
+        chunk_data = {
+            "start": "START",
+            "end": "END",
+            "chunk": event_list
+        }
+        # TODO honor Pagination stream params
+        # TODO snapshot this list to return on subsequent requests when
+        # paginating
+        defer.returnValue(chunk_data)
+
+    @defer.inlineCallbacks
+    def get_room_member(self, room_id, member_user_id, auth_user_id):
+        """Retrieve a room member from a room.
+
+        Args:
+            room_id : The room the member is in.
+            member_user_id : The member's user ID
+            auth_user_id : The user ID of the user making this request.
+        Returns:
+            The room member, or None if this member does not exist.
+        Raises:
+            SynapseError if something goes wrong.
+        """
+        yield self.auth.check_joined_room(room_id, auth_user_id)
+
+        member = yield self.store.get_room_member(user_id=member_user_id,
+                                                  room_id=room_id)
+        defer.returnValue(member)
+
+    @defer.inlineCallbacks
+    def change_membership(self, event=None, broadcast_msg=False, do_auth=True):
+        """ Change the membership status of a user in a room.
+
+        Args:
+            event (SynapseEvent): The membership event
+            broadcast_msg (bool): True to inject a membership message into this
+                room on success.
+        Raises:
+            SynapseError if there was a problem changing the membership.
+        """
+
+        #broadcast_msg = False
+
+        prev_state = yield self.store.get_room_member(
+            event.target_user_id, event.room_id
+        )
+
+        if prev_state and prev_state.membership == event.membership:
+            # treat this event as a NOOP.
+            if do_auth:  # This is mainly to fix a unit test.
+                yield self.auth.check(event, raises=True)
+            defer.returnValue({})
+            return
+
+        room_id = event.room_id
+
+        # If we're trying to join a room then we have to do this differently
+        # if this HS is not currently in the room, i.e. we have to do the
+        # invite/join dance.
+        if event.membership == Membership.JOIN:
+            yield self._do_join(
+                event, do_auth=do_auth, broadcast_msg=broadcast_msg
+            )
+        else:
+            # This is not a JOIN, so we can handle it normally.
+            if do_auth:
+                yield self.auth.check(event, raises=True)
+
+            prev_state = yield self.store.get_room_member(
+                event.target_user_id, event.room_id
+            )
+            if prev_state and prev_state.membership == event.membership:
+                # double same action, treat this event as a NOOP.
+                defer.returnValue({})
+                return
+
+            yield self.state_handler.handle_new_event(event)
+            yield self._do_local_membership_update(
+                event,
+                membership=event.content["membership"],
+                broadcast_msg=broadcast_msg,
+            )
+
+        defer.returnValue({"room_id": room_id})
+
+    @defer.inlineCallbacks
+    def join_room_alias(self, joinee, room_alias, do_auth=True, content={}):
+        directory_handler = self.hs.get_handlers().directory_handler
+        mapping = yield directory_handler.get_association(room_alias)
+
+        if not mapping:
+            raise SynapseError(404, "No such room alias")
+
+        room_id = mapping["room_id"]
+        hosts = mapping["servers"]
+        if not hosts:
+            raise SynapseError(404, "No known servers")
+
+        host = hosts[0]
+
+        content.update({"membership": Membership.JOIN})
+        new_event = self.event_factory.create_event(
+            etype=RoomMemberEvent.TYPE,
+            target_user_id=joinee.to_string(),
+            room_id=room_id,
+            user_id=joinee.to_string(),
+            membership=Membership.JOIN,
+            content=content,
+        )
+
+        yield self._do_join(new_event, room_host=host, do_auth=True)
+
+        defer.returnValue({"room_id": room_id})
+
+    @defer.inlineCallbacks
+    def _do_join(self, event, room_host=None, do_auth=True, broadcast_msg=True):
+        joinee = self.hs.parse_userid(event.target_user_id)
+        # room_id = RoomID.from_string(event.room_id, self.hs)
+        room_id = event.room_id
+
+        # If event doesn't include a display name, add one.
+        yield self._fill_out_join_content(
+            joinee, event.content
+        )
+
+        # XXX: We don't do an auth check if we are doing an invite
+        # join dance for now, since we're kinda implicitly checking
+        # that we are allowed to join when we decide whether or not we
+        # need to do the invite/join dance.
+
+        room = yield self.store.get_room(room_id)
+
+        if room:
+            should_do_dance = False
+        elif room_host:
+            should_do_dance = True
+        else:
+            prev_state = yield self.store.get_room_member(
+                joinee.to_string(), room_id
+            )
+
+            if prev_state and prev_state.membership == Membership.INVITE:
+                room = yield self.store.get_room(room_id)
+                inviter = UserID.from_string(
+                    prev_state.sender, self.hs
+                )
+
+                should_do_dance = not inviter.is_mine and not room
+                room_host = inviter.domain
+            else:
+                should_do_dance = False
+
+        # We want to do the _do_update inside the room lock.
+        if not should_do_dance:
+            logger.debug("Doing normal join")
+
+            if do_auth:
+                yield self.auth.check(event, raises=True)
+
+            yield self.state_handler.handle_new_event(event)
+            yield self._do_local_membership_update(
+                event,
+                membership=event.content["membership"],
+                broadcast_msg=broadcast_msg,
+            )
+
+
+        if should_do_dance:
+            yield self._do_invite_join_dance(
+                room_id=room_id,
+                joinee=event.user_id,
+                target_host=room_host,
+                content=event.content,
+            )
+
+        user = self.hs.parse_userid(event.user_id)
+        self.distributor.fire(
+            "user_joined_room", user=user, room_id=room_id
+        )
+
+    @defer.inlineCallbacks
+    def _fill_out_join_content(self, user_id, content):
+        # If event doesn't include a display name, add one.
+        profile_handler = self.hs.get_handlers().profile_handler
+        if "displayname" not in content:
+            try:
+                display_name = yield profile_handler.get_displayname(
+                    user_id
+                )
+
+                if display_name:
+                    content["displayname"] = display_name
+            except:
+                logger.exception("Failed to set display_name")
+
+        if "avatar_url" not in content:
+            try:
+                avatar_url = yield profile_handler.get_avatar_url(
+                    user_id
+                )
+
+                if avatar_url:
+                    content["avatar_url"] = avatar_url
+            except:
+                logger.exception("Failed to set display_name")
+
+    @defer.inlineCallbacks
+    def _should_invite_join(self, room_id, prev_state, do_auth):
+        logger.debug("_should_invite_join: room_id: %s", room_id)
+
+        # XXX: We don't do an auth check if we are doing an invite
+        # join dance for now, since we're kinda implicitly checking
+        # that we are allowed to join when we decide whether or not we
+        # need to do the invite/join dance.
+
+        # Only do an invite join dance if a) we were invited,
+        # b) the person inviting was from a differnt HS and c) we are
+        # not currently in the room
+        room_host = None
+        if prev_state and prev_state.membership == Membership.INVITE:
+            room = yield self.store.get_room(room_id)
+            inviter = UserID.from_string(
+                prev_state.sender, self.hs
+            )
+
+            is_remote_invite_join = not inviter.is_mine and not room
+            room_host = inviter.domain
+        else:
+            is_remote_invite_join = False
+
+        defer.returnValue((is_remote_invite_join, room_host))
+
+    @defer.inlineCallbacks
+    def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
+        """Returns a list of roomids that the user has any of the given
+        membership states in."""
+        rooms = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user.to_string(), membership_list=membership_list
+        )
+
+        defer.returnValue([r["room_id"] for r in rooms])
+
+    @defer.inlineCallbacks
+    def _do_local_membership_update(self, event, membership, broadcast_msg):
+        # store membership
+        store_id = yield self.store.store_room_member(
+            user_id=event.target_user_id,
+            sender=event.user_id,
+            room_id=event.room_id,
+            content=event.content,
+            membership=membership
+        )
+
+        # Send a PDU to all hosts who have joined the room.
+        destinations = yield self.store.get_joined_hosts_for_room(
+            event.room_id
+        )
+
+        # If we're inviting someone, then we should also send it to that
+        # HS.
+        if membership == Membership.INVITE:
+            host = UserID.from_string(
+                event.target_user_id, self.hs
+            ).domain
+            destinations.append(host)
+
+        # If we are joining a remote HS, include that.
+        if membership == Membership.JOIN:
+            host = UserID.from_string(
+                event.target_user_id, self.hs
+            ).domain
+            destinations.append(host)
+
+        event.destinations = list(set(destinations))
+
+        yield self.hs.get_federation().handle_new_event(event)
+        self.notifier.on_new_room_event(event, store_id)
+
+        if broadcast_msg:
+            yield self._inject_membership_msg(
+                source=event.user_id,
+                target=event.target_user_id,
+                room_id=event.room_id,
+                membership=event.content["membership"]
+            )
+
+    @defer.inlineCallbacks
+    def _do_invite_join_dance(self, room_id, joinee, target_host, content):
+        logger.debug("Doing remote join dance")
+
+        # do invite join dance
+        federation = self.hs.get_federation()
+        new_event = self.event_factory.create_event(
+            etype=InviteJoinEvent.TYPE,
+            target_host=target_host,
+            room_id=room_id,
+            user_id=joinee,
+            content=content
+        )
+
+        new_event.destinations = [target_host]
+
+        yield self.store.store_room(
+            room_id, "", is_public=False
+        )
+
+        #yield self.state_handler.handle_new_event(event)
+        yield federation.handle_new_event(new_event)
+        yield federation.get_state_for_room(
+            target_host, room_id
+        )
+
+    @defer.inlineCallbacks
+    def _inject_membership_msg(self, room_id=None, source=None, target=None,
+                               membership=None):
+        # TODO this should be a different type of message, not m.text
+        if membership == Membership.INVITE:
+            body = "%s invited %s to the room." % (source, target)
+        elif membership == Membership.JOIN:
+            body = "%s joined the room." % (target)
+        elif membership == Membership.LEAVE:
+            body = "%s left the room." % (target)
+        else:
+            raise RoomError(500, "Unknown membership value %s" % membership)
+
+        membership_json = {
+            "msgtype": u"m.text",
+            "body": body,
+            "membership_source": source,
+            "membership_target": target,
+            "membership": membership,
+        }
+
+        msg_id = "m%s" % int(self.clock.time_msec())
+
+        event = self.event_factory.create_event(
+            etype=MessageEvent.TYPE,
+            room_id=room_id,
+            user_id="_homeserver_",
+            msg_id=msg_id,
+            content=membership_json
+        )
+
+        handler = self.hs.get_handlers().message_handler
+        yield handler.send_message(event, suppress_auth=True)
+
+
+class RoomListHandler(BaseHandler):
+
+    @defer.inlineCallbacks
+    def get_public_room_list(self):
+        chunk = yield self.store.get_rooms(is_public=True, with_topics=True)
+        defer.returnValue({"start": "START", "end": "END", "chunk": chunk})