summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py12
-rw-r--r--synapse/handlers/_base.py13
-rw-r--r--synapse/handlers/appservice.py69
-rw-r--r--synapse/handlers/auth.py277
-rw-r--r--synapse/handlers/federation.py48
-rw-r--r--synapse/handlers/identity.py119
-rw-r--r--synapse/handlers/login.py63
-rw-r--r--synapse/handlers/message.py31
-rw-r--r--synapse/handlers/presence.py67
-rw-r--r--synapse/handlers/register.py118
-rw-r--r--synapse/handlers/room.py48
-rw-r--r--synapse/handlers/typing.py16
12 files changed, 654 insertions, 227 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 8d345bf936..685792dbdc 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.appservice.scheduler import AppServiceScheduler
 from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
@@ -29,6 +30,8 @@ from .typing import TypingNotificationHandler
 from .admin import AdminHandler
 from .appservice import ApplicationServicesHandler
 from .sync import SyncHandler
+from .auth import AuthHandler
+from .identity import IdentityHandler
 
 
 class Handlers(object):
@@ -54,7 +57,14 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        asapi = ApplicationServiceApi(hs)
         self.appservice_handler = ApplicationServicesHandler(
-            hs, ApplicationServiceApi(hs)
+            hs, asapi, AppServiceScheduler(
+                clock=hs.get_clock(),
+                store=hs.get_datastore(),
+                as_api=asapi
+            )
         )
         self.sync_handler = SyncHandler(hs)
+        self.auth_handler = AuthHandler(hs)
+        self.identity_handler = IdentityHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 48816a242d..4b3f4eadab 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -16,7 +16,6 @@
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError, SynapseError
-from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
 from synapse.types import UserID
@@ -58,8 +57,6 @@ class BaseHandler(object):
 
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder):
-        yield run_on_reactor()
-
         latest_ret = yield self.store.get_latest_events_in_room(
             builder.room_id,
         )
@@ -101,8 +98,6 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def handle_new_client_event(self, event, context, extra_destinations=[],
                                 extra_users=[], suppress_auth=False):
-        yield run_on_reactor()
-
         # We now need to go and hit out to wherever we need to hit out to.
 
         if not suppress_auth:
@@ -143,7 +138,9 @@ class BaseHandler(object):
                 )
 
         # Don't block waiting on waking up all the listeners.
-        d = self.notifier.on_new_room_event(event, extra_users=extra_users)
+        notify_d = self.notifier.on_new_room_event(
+            event, extra_users=extra_users
+        )
 
         def log_failure(f):
             logger.warn(
@@ -151,8 +148,8 @@ class BaseHandler(object):
                 event.event_id, f.value
             )
 
-        d.addErrback(log_failure)
+        notify_d.addErrback(log_failure)
 
-        yield federation_handler.handle_new_event(
+        federation_handler.handle_new_event(
             event, destinations=destinations,
         )
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 2c488a46f6..355ab317df 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -16,57 +16,36 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import Codes, StoreError, SynapseError
 from synapse.appservice import ApplicationService
 from synapse.types import UserID
-import synapse.util.stringutils as stringutils
 
 import logging
 
 logger = logging.getLogger(__name__)
 
 
+def log_failure(failure):
+    logger.error(
+        "Application Services Failure",
+        exc_info=(
+            failure.type,
+            failure.value,
+            failure.getTracebackObject()
+        )
+    )
+
+
 # NB: Purposefully not inheriting BaseHandler since that contains way too much
 # setup code which this handler does not need or use. This makes testing a lot
 # easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api):
+    def __init__(self, hs, appservice_api, appservice_scheduler):
         self.store = hs.get_datastore()
         self.hs = hs
         self.appservice_api = appservice_api
-
-    @defer.inlineCallbacks
-    def register(self, app_service):
-        logger.info("Register -> %s", app_service)
-        # check the token is recognised
-        try:
-            stored_service = yield self.store.get_app_service_by_token(
-                app_service.token
-            )
-            if not stored_service:
-                raise StoreError(404, "Application service not found")
-        except StoreError:
-            raise SynapseError(
-                403, "Unrecognised application services token. "
-                "Consult the home server admin.",
-                errcode=Codes.FORBIDDEN
-            )
-
-        app_service.hs_token = self._generate_hs_token()
-
-        # create a sender for this application service which is used when
-        # creating rooms, etc..
-        account = yield self.hs.get_handlers().registration_handler.register()
-        app_service.sender = account[0]
-
-        yield self.store.update_app_service(app_service)
-        defer.returnValue(app_service)
-
-    @defer.inlineCallbacks
-    def unregister(self, token):
-        logger.info("Unregister as_token=%s", token)
-        yield self.store.unregister_app_service(token)
+        self.scheduler = appservice_scheduler
+        self.started_scheduler = False
 
     @defer.inlineCallbacks
     def notify_interested_services(self, event):
@@ -90,9 +69,13 @@ class ApplicationServicesHandler(object):
         if event.type == EventTypes.Member:
             yield self._check_user_exists(event.state_key)
 
-        # Fork off pushes to these services - XXX First cut, best effort
+        if not self.started_scheduler:
+            self.scheduler.start().addErrback(log_failure)
+            self.started_scheduler = True
+
+        # Fork off pushes to these services
         for service in services:
-            self.appservice_api.push(service, event)
+            self.scheduler.submit_event_for_as(service, event)
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
@@ -197,7 +180,14 @@ class ApplicationServicesHandler(object):
             return
 
         user_info = yield self.store.get_user_by_id(user_id)
-        defer.returnValue(len(user_info) == 0)
+        if not user_info:
+            defer.returnValue(False)
+            return
+
+        # user not found; could be the AS though, so check.
+        services = yield self.store.get_app_services()
+        service_list = [s for s in services if s.sender == user_id]
+        defer.returnValue(len(service_list) == 0)
 
     @defer.inlineCallbacks
     def _check_user_exists(self, user_id):
@@ -206,6 +196,3 @@ class ApplicationServicesHandler(object):
             exists = yield self.query_user_exists(user_id)
             defer.returnValue(exists)
         defer.returnValue(True)
-
-    def _generate_hs_token(self):
-        return stringutils.random_string(24)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
new file mode 100644
index 0000000000..4e2e50345e
--- /dev/null
+++ b/synapse/handlers/auth.py
@@ -0,0 +1,277 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014, 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+from synapse.api.constants import LoginType
+from synapse.types import UserID
+from synapse.api.errors import LoginError, Codes
+from synapse.http.client import SimpleHttpClient
+from synapse.util.async import run_on_reactor
+
+from twisted.web.client import PartialDownloadError
+
+import logging
+import bcrypt
+import simplejson
+
+import synapse.util.stringutils as stringutils
+
+
+logger = logging.getLogger(__name__)
+
+
+class AuthHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(AuthHandler, self).__init__(hs)
+        self.checkers = {
+            LoginType.PASSWORD: self._check_password_auth,
+            LoginType.RECAPTCHA: self._check_recaptcha,
+            LoginType.EMAIL_IDENTITY: self._check_email_identity,
+            LoginType.DUMMY: self._check_dummy_auth,
+        }
+        self.sessions = {}
+
+    @defer.inlineCallbacks
+    def check_auth(self, flows, clientdict, clientip=None):
+        """
+        Takes a dictionary sent by the client in the login / registration
+        protocol and handles the login flow.
+
+        Args:
+            flows: list of list of stages
+            authdict: The dictionary from the client root level, not the
+                      'auth' key: this method prompts for auth if none is sent.
+        Returns:
+            A tuple of authed, dict, dict where authed is true if the client
+            has successfully completed an auth flow. If it is true, the first
+            dict contains the authenticated credentials of each stage.
+
+            If authed is false, the first dictionary is the server response to
+            the login request and should be passed back to the client.
+
+            In either case, the second dict contains the parameters for this
+            request (which may have been given only in a previous call).
+        """
+
+        authdict = None
+        sid = None
+        if clientdict and 'auth' in clientdict:
+            authdict = clientdict['auth']
+            del clientdict['auth']
+            if 'session' in authdict:
+                sid = authdict['session']
+        sess = self._get_session_info(sid)
+
+        if len(clientdict) > 0:
+            # This was designed to allow the client to omit the parameters
+            # and just supply the session in subsequent calls so it split
+            # auth between devices by just sharing the session, (eg. so you
+            # could continue registration from your phone having clicked the
+            # email auth link on there). It's probably too open to abuse
+            # because it lets unauthenticated clients store arbitrary objects
+            # on a home server.
+            # sess['clientdict'] = clientdict
+            # self._save_session(sess)
+            pass
+        elif 'clientdict' in sess:
+            clientdict = sess['clientdict']
+
+        if not authdict:
+            defer.returnValue(
+                (False, self._auth_dict_for_flows(flows, sess), clientdict)
+            )
+
+        if 'creds' not in sess:
+            sess['creds'] = {}
+        creds = sess['creds']
+
+        # check auth type currently being presented
+        if 'type' in authdict:
+            if authdict['type'] not in self.checkers:
+                raise LoginError(400, "", Codes.UNRECOGNIZED)
+            result = yield self.checkers[authdict['type']](authdict, clientip)
+            if result:
+                creds[authdict['type']] = result
+                self._save_session(sess)
+
+        for f in flows:
+            if len(set(f) - set(creds.keys())) == 0:
+                logger.info("Auth completed with creds: %r", creds)
+                self._remove_session(sess)
+                defer.returnValue((True, creds, clientdict))
+
+        ret = self._auth_dict_for_flows(flows, sess)
+        ret['completed'] = creds.keys()
+        defer.returnValue((False, ret, clientdict))
+
+    @defer.inlineCallbacks
+    def add_oob_auth(self, stagetype, authdict, clientip):
+        """
+        Adds the result of out-of-band authentication into an existing auth
+        session. Currently used for adding the result of fallback auth.
+        """
+        if stagetype not in self.checkers:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+        if 'session' not in authdict:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+
+        sess = self._get_session_info(
+            authdict['session']
+        )
+        if 'creds' not in sess:
+            sess['creds'] = {}
+        creds = sess['creds']
+
+        result = yield self.checkers[stagetype](authdict, clientip)
+        if result:
+            creds[stagetype] = result
+            self._save_session(sess)
+            defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _check_password_auth(self, authdict, _):
+        if "user" not in authdict or "password" not in authdict:
+            raise LoginError(400, "", Codes.MISSING_PARAM)
+
+        user = authdict["user"]
+        password = authdict["password"]
+        if not user.startswith('@'):
+            user = UserID.create(user, self.hs.hostname).to_string()
+
+        user_info = yield self.store.get_user_by_id(user_id=user)
+        if not user_info:
+            logger.warn("Attempted to login as %s but they do not exist", user)
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+        stored_hash = user_info["password_hash"]
+        if bcrypt.checkpw(password, stored_hash):
+            defer.returnValue(user)
+        else:
+            logger.warn("Failed password login for user %s", user)
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+    @defer.inlineCallbacks
+    def _check_recaptcha(self, authdict, clientip):
+        try:
+            user_response = authdict["response"]
+        except KeyError:
+            # Client tried to provide captcha but didn't give the parameter:
+            # bad request.
+            raise LoginError(
+                400, "Captcha response is required",
+                errcode=Codes.CAPTCHA_NEEDED
+            )
+
+        logger.info(
+            "Submitting recaptcha response %s with remoteip %s",
+            user_response, clientip
+        )
+
+        # TODO: get this from the homeserver rather than creating a new one for
+        # each request
+        try:
+            client = SimpleHttpClient(self.hs)
+            data = yield client.post_urlencoded_get_json(
+                "https://www.google.com/recaptcha/api/siteverify",
+                args={
+                    'secret': self.hs.config.recaptcha_private_key,
+                    'response': user_response,
+                    'remoteip': clientip,
+                }
+            )
+        except PartialDownloadError as pde:
+            # Twisted is silly
+            data = pde.response
+        resp_body = simplejson.loads(data)
+        if 'success' in resp_body and resp_body['success']:
+            defer.returnValue(True)
+        raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+    @defer.inlineCallbacks
+    def _check_email_identity(self, authdict, _):
+        yield run_on_reactor()
+
+        if 'threepid_creds' not in authdict:
+            raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
+
+        threepid_creds = authdict['threepid_creds']
+        identity_handler = self.hs.get_handlers().identity_handler
+
+        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+        threepid = yield identity_handler.threepid_from_creds(threepid_creds)
+
+        if not threepid:
+            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+
+        threepid['threepid_creds'] = authdict['threepid_creds']
+
+        defer.returnValue(threepid)
+
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
+    def _get_params_recaptcha(self):
+        return {"public_key": self.hs.config.recaptcha_public_key}
+
+    def _auth_dict_for_flows(self, flows, session):
+        public_flows = []
+        for f in flows:
+            public_flows.append(f)
+
+        get_params = {
+            LoginType.RECAPTCHA: self._get_params_recaptcha,
+        }
+
+        params = {}
+
+        for f in public_flows:
+            for stage in f:
+                if stage in get_params and stage not in params:
+                    params[stage] = get_params[stage]()
+
+        return {
+            "session": session['id'],
+            "flows": [{"stages": f} for f in public_flows],
+            "params": params
+        }
+
+    def _get_session_info(self, session_id):
+        if session_id not in self.sessions:
+            session_id = None
+
+        if not session_id:
+            # create a new session
+            while session_id is None or session_id in self.sessions:
+                session_id = stringutils.random_string(24)
+            self.sessions[session_id] = {
+                "id": session_id,
+            }
+
+        return self.sessions[session_id]
+
+    def _save_session(self, session):
+        # TODO: Persistent storage
+        logger.debug("Saving session %s", session)
+        self.sessions[session["id"]] = session
+
+    def _remove_session(self, session):
+        logger.debug("Removing session %s", session)
+        del self.sessions[session["id"]]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 15ba417e06..85e2757227 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -73,8 +73,6 @@ class FederationHandler(BaseHandler):
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
 
-    @log_function
-    @defer.inlineCallbacks
     def handle_new_event(self, event, destinations):
         """ Takes in an event from the client to server side, that has already
         been authed and handled by the state module, and sends it to any
@@ -89,9 +87,7 @@ class FederationHandler(BaseHandler):
             processing.
         """
 
-        yield run_on_reactor()
-
-        self.replication_layer.send_pdu(event, destinations)
+        return self.replication_layer.send_pdu(event, destinations)
 
     @log_function
     @defer.inlineCallbacks
@@ -179,7 +175,7 @@ class FederationHandler(BaseHandler):
         # it's probably a good idea to mark it as not in retry-state
         # for sending (although this is a bit of a leap)
         retry_timings = yield self.store.get_destination_retry_timings(origin)
-        if (retry_timings and retry_timings.retry_last_ts):
+        if retry_timings and retry_timings["retry_last_ts"]:
             self.store.set_destination_retry_timings(origin, 0, 0)
 
         room = yield self.store.get_room(event.room_id)
@@ -201,10 +197,18 @@ class FederationHandler(BaseHandler):
                 target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
-            yield self.notifier.on_new_room_event(
+            d = self.notifier.on_new_room_event(
                 event, extra_users=extra_users
             )
 
+            def log_failure(f):
+                logger.warn(
+                    "Failed to notify about %s: %s",
+                    event.event_id, f.value
+                )
+
+            d.addErrback(log_failure)
+
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -427,10 +431,18 @@ class FederationHandler(BaseHandler):
                 auth_events=auth_events,
             )
 
-            yield self.notifier.on_new_room_event(
+            d = self.notifier.on_new_room_event(
                 new_event, extra_users=[joinee]
             )
 
+            def log_failure(f):
+                logger.warn(
+                    "Failed to notify about %s: %s",
+                    new_event.event_id, f.value
+                )
+
+            d.addErrback(log_failure)
+
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -500,10 +512,18 @@ class FederationHandler(BaseHandler):
             target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
-        yield self.notifier.on_new_room_event(
+        d = self.notifier.on_new_room_event(
             event, extra_users=extra_users
         )
 
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -574,10 +594,18 @@ class FederationHandler(BaseHandler):
         )
 
         target_user = UserID.from_string(event.state_key)
-        yield self.notifier.on_new_room_event(
+        d = self.notifier.on_new_room_event(
             event, extra_users=[target_user],
         )
 
+        def log_failure(f):
+            logger.warn(
+                "Failed to notify about %s: %s",
+                event.event_id, f.value
+            )
+
+        d.addErrback(log_failure)
+
         defer.returnValue(event)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
new file mode 100644
index 0000000000..6200e10775
--- /dev/null
+++ b/synapse/handlers/identity.py
@@ -0,0 +1,119 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for interacting with Identity Servers"""
+from twisted.internet import defer
+
+from synapse.api.errors import (
+    CodeMessageException
+)
+from ._base import BaseHandler
+from synapse.http.client import SimpleHttpClient
+from synapse.util.async import run_on_reactor
+from synapse.api.errors import SynapseError
+
+import json
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class IdentityHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(IdentityHandler, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def threepid_from_creds(self, creds):
+        yield run_on_reactor()
+
+        # TODO: get this from the homeserver rather than creating a new one for
+        # each request
+        http_client = SimpleHttpClient(self.hs)
+        # XXX: make this configurable!
+        # trustedIdServers = ['matrix.org', 'localhost:8090']
+        trustedIdServers = ['matrix.org']
+
+        if 'id_server' in creds:
+            id_server = creds['id_server']
+        elif 'idServer' in creds:
+            id_server = creds['idServer']
+        else:
+            raise SynapseError(400, "No id_server in creds")
+
+        if 'client_secret' in creds:
+            client_secret = creds['client_secret']
+        elif 'clientSecret' in creds:
+            client_secret = creds['clientSecret']
+        else:
+            raise SynapseError(400, "No client_secret in creds")
+
+        if id_server not in trustedIdServers:
+            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+                        'credentials', id_server)
+            defer.returnValue(None)
+
+        data = {}
+        try:
+            data = yield http_client.get_json(
+                "https://%s%s" % (
+                    id_server,
+                    "/_matrix/identity/api/v1/3pid/getValidated3pid"
+                ),
+                {'sid': creds['sid'], 'client_secret': client_secret}
+            )
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
+
+        if 'medium' in data:
+            defer.returnValue(data)
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def bind_threepid(self, creds, mxid):
+        yield run_on_reactor()
+        logger.debug("binding threepid %r to %s", creds, mxid)
+        http_client = SimpleHttpClient(self.hs)
+        data = None
+
+        if 'id_server' in creds:
+            id_server = creds['id_server']
+        elif 'idServer' in creds:
+            id_server = creds['idServer']
+        else:
+            raise SynapseError(400, "No id_server in creds")
+
+        if 'client_secret' in creds:
+            client_secret = creds['client_secret']
+        elif 'clientSecret' in creds:
+            client_secret = creds['clientSecret']
+        else:
+            raise SynapseError(400, "No client_secret in creds")
+
+        try:
+            data = yield http_client.post_urlencoded_get_json(
+                "https://%s%s" % (
+                    id_server, "/_matrix/identity/api/v1/3pid/bind"
+                ),
+                {
+                    'sid': creds['sid'],
+                    'client_secret': client_secret,
+                    'mxid': mxid,
+                }
+            )
+            logger.debug("bound threepid %r to %s", creds, mxid)
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
+        defer.returnValue(data)
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index 7447800460..91d87d503d 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -16,13 +16,9 @@
 from twisted.internet import defer
 
 from ._base import BaseHandler
-from synapse.api.errors import LoginError, Codes, CodeMessageException
-from synapse.http.client import SimpleHttpClient
-from synapse.util.emailutils import EmailException
-import synapse.util.emailutils as emailutils
+from synapse.api.errors import LoginError, Codes
 
 import bcrypt
-import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -57,7 +53,7 @@ class LoginHandler(BaseHandler):
             logger.warn("Attempted to login as %s but they do not exist", user)
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
-        stored_hash = user_info[0]["password_hash"]
+        stored_hash = user_info["password_hash"]
         if bcrypt.checkpw(password, stored_hash):
             # generate an access token and store it.
             token = self.reg_handler._generate_token(user)
@@ -69,48 +65,19 @@ class LoginHandler(BaseHandler):
             raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
     @defer.inlineCallbacks
-    def reset_password(self, user_id, email):
-        is_valid = yield self._check_valid_association(user_id, email)
-        logger.info("reset_password user=%s email=%s valid=%s", user_id, email,
-                    is_valid)
-        if is_valid:
-            try:
-                # send an email out
-                emailutils.send_email(
-                    smtp_server=self.hs.config.email_smtp_server,
-                    from_addr=self.hs.config.email_from_address,
-                    to_addr=email,
-                    subject="Password Reset",
-                    body="TODO."
-                )
-            except EmailException as e:
-                logger.exception(e)
+    def set_password(self, user_id, newpassword, token_id=None):
+        password_hash = bcrypt.hashpw(newpassword, bcrypt.gensalt())
 
-    @defer.inlineCallbacks
-    def _check_valid_association(self, user_id, email):
-        identity = yield self._query_email(email)
-        if identity and "mxid" in identity:
-            if identity["mxid"] == user_id:
-                defer.returnValue(True)
-                return
-        defer.returnValue(False)
+        yield self.store.user_set_password_hash(user_id, password_hash)
+        yield self.store.user_delete_access_tokens_apart_from(user_id, token_id)
+        yield self.hs.get_pusherpool().remove_pushers_by_user_access_token(
+            user_id, token_id
+        )
+        yield self.store.flush_user(user_id)
 
     @defer.inlineCallbacks
-    def _query_email(self, email):
-        http_client = SimpleHttpClient(self.hs)
-        try:
-            data = yield http_client.get_json(
-                # TODO FIXME This should be configurable.
-                # XXX: ID servers need to use HTTPS
-                "http://%s%s" % (
-                    "matrix.org:8090", "/_matrix/identity/api/v1/lookup"
-                ),
-                {
-                    'medium': 'email',
-                    'address': email
-                }
-            )
-            defer.returnValue(data)
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-            defer.returnValue(data)
+    def add_threepid(self, user_id, medium, address, validated_at):
+        yield self.store.user_add_threepid(
+            user_id, medium, address, validated_at,
+            self.hs.get_clock().time_msec()
+        )
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9685be7f..22e19af17f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -267,14 +267,14 @@ class MessageHandler(BaseHandler):
             user, pagination_config.get_source_config("presence"), None
         )
 
-        public_rooms = yield self.store.get_rooms(is_public=True)
-        public_room_ids = [r["room_id"] for r in public_rooms]
+        public_room_ids = yield self.store.get_public_room_ids()
 
         limit = pagin_config.limit
         if limit is None:
             limit = 10
 
-        for event in room_list:
+        @defer.inlineCallbacks
+        def handle_room(event):
             d = {
                 "room_id": event.room_id,
                 "membership": event.membership,
@@ -290,12 +290,19 @@ class MessageHandler(BaseHandler):
             rooms_ret.append(d)
 
             if event.membership != Membership.JOIN:
-                continue
+                return
             try:
-                messages, token = yield self.store.get_recent_events_for_room(
-                    event.room_id,
-                    limit=limit,
-                    end_token=now_token.room_key,
+                (messages, token), current_state = yield defer.gatherResults(
+                    [
+                        self.store.get_recent_events_for_room(
+                            event.room_id,
+                            limit=limit,
+                            end_token=now_token.room_key,
+                        ),
+                        self.state_handler.get_current_state(
+                            event.room_id
+                        ),
+                    ]
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -311,9 +318,6 @@ class MessageHandler(BaseHandler):
                     "end": end_token.to_string(),
                 }
 
-                current_state = yield self.state_handler.get_current_state(
-                    event.room_id
-                )
                 d["state"] = [
                     serialize_event(c, time_now, as_client_event)
                     for c in current_state.values()
@@ -321,6 +325,11 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
+        yield defer.gatherResults(
+            [handle_room(e) for e in room_list],
+            consumeErrors=True
+        )
+
         ret = {
             "rooms": rooms_ret,
             "presence": presence,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 731df00648..9e15610401 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,6 +33,13 @@ logger = logging.getLogger(__name__)
 metrics = synapse.metrics.get_metrics_for(__name__)
 
 
+# Don't bother bumping "last active" time if it differs by less than 60 seconds
+LAST_ACTIVE_GRANULARITY = 60*1000
+
+# Keep no more than this number of offline serial revisions
+MAX_OFFLINE_SERIALS = 1000
+
+
 # TODO(paul): Maybe there's one of these I can steal from somewhere
 def partition(l, func):
     """Partition the list by the result of func applied to each element."""
@@ -131,6 +138,9 @@ class PresenceHandler(BaseHandler):
         self._remote_sendmap = {}
         # map remote users to sets of local users who're interested in them
         self._remote_recvmap = {}
+        # list of (serial, set of(userids)) tuples, ordered by serial, latest
+        # first
+        self._remote_offline_serials = []
 
         # map any user to a UserPresenceCache
         self._user_cachemap = {}
@@ -282,6 +292,10 @@ class PresenceHandler(BaseHandler):
         if now is None:
             now = self.clock.time_msec()
 
+        prev_state = self._get_or_make_usercache(user)
+        if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
+            return
+
         self.changed_presencelike_data(user, {"last_active": now})
 
     def changed_presencelike_data(self, user, state):
@@ -706,8 +720,24 @@ class PresenceHandler(BaseHandler):
                 statuscache=statuscache,
             )
 
+            user_id = user.to_string()
+
             if state["presence"] == PresenceState.OFFLINE:
+                self._remote_offline_serials.insert(
+                    0,
+                    (self._user_cachemap_latest_serial, set([user_id]))
+                )
+                while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS:
+                    self._remote_offline_serials.pop()  # remove the oldest
                 del self._user_cachemap[user]
+            else:
+                # Remove the user from remote_offline_serials now that they're
+                # no longer offline
+                for idx, elem in enumerate(self._remote_offline_serials):
+                    (_, user_ids) = elem
+                    user_ids.discard(user_id)
+                    if not user_ids:
+                        self._remote_offline_serials.pop(idx)
 
         for poll in content.get("poll", []):
             user = UserID.from_string(poll)
@@ -829,26 +859,47 @@ class PresenceEventSource(object):
         presence = self.hs.get_handlers().presence_handler
         cachemap = presence._user_cachemap
 
+        max_serial = presence._user_cachemap_latest_serial
+
+        clock = self.clock
+        latest_serial = 0
+
         updates = []
         # TODO(paul): use a DeferredList ? How to limit concurrency.
         for observed_user in cachemap.keys():
             cached = cachemap[observed_user]
 
-            if cached.serial <= from_key:
+            if cached.serial <= from_key or cached.serial > max_serial:
                 continue
 
-            if (yield self.is_visible(observer_user, observed_user)):
-                updates.append((observed_user, cached))
+            if not (yield self.is_visible(observer_user, observed_user)):
+                continue
+
+            latest_serial = max(cached.serial, latest_serial)
+            updates.append(cached.make_event(user=observed_user, clock=clock))
 
         # TODO(paul): limit
 
-        if updates:
-            clock = self.clock
+        for serial, user_ids in presence._remote_offline_serials:
+            if serial <= from_key:
+                break
 
-            latest_serial = max([x[1].serial for x in updates])
-            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
+            if serial > max_serial:
+                continue
 
-            defer.returnValue((data, latest_serial))
+            latest_serial = max(latest_serial, serial)
+            for u in user_ids:
+                updates.append({
+                    "type": "m.presence",
+                    "content": {"user_id": u, "presence": PresenceState.OFFLINE},
+                })
+        # TODO(paul): For the v2 API we want to tell the client their from_key
+        #   is too old if we fell off the end of the _remote_offline_serials
+        #   list, and get them to invalidate+resync. In v1 we have no such
+        #   concept so this is a best-effort result.
+
+        if updates:
+            defer.returnValue((updates, latest_serial))
         else:
             defer.returnValue(([], presence._user_cachemap_latest_serial))
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c25e321099..7b68585a17 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,18 +18,15 @@ from twisted.internet import defer
 
 from synapse.types import UserID
 from synapse.api.errors import (
-    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError,
-    CodeMessageException
+    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
 from ._base import BaseHandler
 import synapse.util.stringutils as stringutils
 from synapse.util.async import run_on_reactor
-from synapse.http.client import SimpleHttpClient
 from synapse.http.client import CaptchaServerHttpClient
 
 import base64
 import bcrypt
-import json
 import logging
 import urllib
 
@@ -45,6 +42,30 @@ class RegistrationHandler(BaseHandler):
         self.distributor.declare("registered_user")
 
     @defer.inlineCallbacks
+    def check_username(self, localpart):
+        yield run_on_reactor()
+
+        if urllib.quote(localpart) != localpart:
+            raise SynapseError(
+                400,
+                "User ID must only contain characters which do not"
+                " require URL encoding."
+            )
+
+        user = UserID(localpart, self.hs.hostname)
+        user_id = user.to_string()
+
+        yield self.check_user_id_is_valid(user_id)
+
+        u = yield self.store.get_user_by_id(user_id)
+        if u:
+            raise SynapseError(
+                400,
+                "User ID already taken.",
+                errcode=Codes.USER_IN_USE,
+            )
+
+    @defer.inlineCallbacks
     def register(self, localpart=None, password=None):
         """Registers a new client on the server.
 
@@ -64,18 +85,11 @@ class RegistrationHandler(BaseHandler):
             password_hash = bcrypt.hashpw(password, bcrypt.gensalt())
 
         if localpart:
-            if localpart and urllib.quote(localpart) != localpart:
-                raise SynapseError(
-                    400,
-                    "User ID must only contain characters which do not"
-                    " require URL encoding."
-                )
+            yield self.check_username(localpart)
 
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
-            yield self.check_user_id_is_valid(user_id)
-
             token = self._generate_token(user_id)
             yield self.store.register(
                 user_id=user_id,
@@ -157,7 +171,11 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def check_recaptcha(self, ip, private_key, challenge, response):
-        """Checks a recaptcha is correct."""
+        """
+        Checks a recaptcha is correct.
+
+        Used only by c/s api v1
+        """
 
         captcha_response = yield self._validate_captcha(
             ip,
@@ -176,13 +194,18 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def register_email(self, threepidCreds):
-        """Registers emails with an identity server."""
+        """
+        Registers emails with an identity server.
+
+        Used only by c/s api v1
+        """
 
         for c in threepidCreds:
             logger.info("validating theeepidcred sid %s on id server %s",
                         c['sid'], c['idServer'])
             try:
-                threepid = yield self._threepid_from_creds(c)
+                identity_handler = self.hs.get_handlers().identity_handler
+                threepid = yield identity_handler.threepid_from_creds(c)
             except:
                 logger.exception("Couldn't validate 3pid")
                 raise RegistrationError(400, "Couldn't validate 3pid")
@@ -194,12 +217,16 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def bind_emails(self, user_id, threepidCreds):
-        """Links emails with a user ID and informs an identity server."""
+        """Links emails with a user ID and informs an identity server.
+
+        Used only by c/s api v1
+        """
 
         # Now we have a matrix ID, bind it to the threepids we were given
         for c in threepidCreds:
+            identity_handler = self.hs.get_handlers().identity_handler
             # XXX: This should be a deferred list, shouldn't it?
-            yield self._bind_threepid(c, user_id)
+            yield identity_handler.bind_threepid(c, user_id)
 
     @defer.inlineCallbacks
     def check_user_id_is_valid(self, user_id):
@@ -227,61 +254,11 @@ class RegistrationHandler(BaseHandler):
         return "-" + stringutils.random_string(18)
 
     @defer.inlineCallbacks
-    def _threepid_from_creds(self, creds):
-        # TODO: get this from the homeserver rather than creating a new one for
-        # each request
-        http_client = SimpleHttpClient(self.hs)
-        # XXX: make this configurable!
-        trustedIdServers = ['matrix.org:8090', 'matrix.org']
-        if not creds['idServer'] in trustedIdServers:
-            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
-                        'credentials', creds['idServer'])
-            defer.returnValue(None)
-
-        data = {}
-        try:
-            data = yield http_client.get_json(
-                # XXX: This should be HTTPS
-                "http://%s%s" % (
-                    creds['idServer'],
-                    "/_matrix/identity/api/v1/3pid/getValidated3pid"
-                ),
-                {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
-            )
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-
-        if 'medium' in data:
-            defer.returnValue(data)
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
-    def _bind_threepid(self, creds, mxid):
-        yield
-        logger.debug("binding threepid")
-        http_client = SimpleHttpClient(self.hs)
-        data = None
-        try:
-            data = yield http_client.post_urlencoded_get_json(
-                # XXX: Change when ID servers are all HTTPS
-                "http://%s%s" % (
-                    creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"
-                ),
-                {
-                    'sid': creds['sid'],
-                    'clientSecret': creds['clientSecret'],
-                    'mxid': mxid,
-                }
-            )
-            logger.debug("bound threepid")
-        except CodeMessageException as e:
-            data = json.loads(e.msg)
-        defer.returnValue(data)
-
-    @defer.inlineCallbacks
     def _validate_captcha(self, ip_addr, private_key, challenge, response):
         """Validates the captcha provided.
 
+        Used only by c/s api v1
+
         Returns:
             dict: Containing 'valid'(bool) and 'error_url'(str) if invalid.
 
@@ -299,6 +276,9 @@ class RegistrationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _submit_captcha(self, ip_addr, private_key, challenge, response):
+        """
+        Used only by c/s api v1
+        """
         # TODO: get this from the homeserver rather than creating a new one for
         # each request
         client = CaptchaServerHttpClient(self.hs)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 823affc380..cfa2e38ed2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -124,7 +124,7 @@ class RoomCreationHandler(BaseHandler):
         msg_handler = self.hs.get_handlers().message_handler
 
         for event in creation_events:
-            yield msg_handler.create_and_send_event(event)
+            yield msg_handler.create_and_send_event(event, ratelimit=False)
 
         if "name" in config:
             name = config["name"]
@@ -134,7 +134,7 @@ class RoomCreationHandler(BaseHandler):
                 "sender": user_id,
                 "state_key": "",
                 "content": {"name": name},
-            })
+            }, ratelimit=False)
 
         if "topic" in config:
             topic = config["topic"]
@@ -144,7 +144,7 @@ class RoomCreationHandler(BaseHandler):
                 "sender": user_id,
                 "state_key": "",
                 "content": {"topic": topic},
-            })
+            }, ratelimit=False)
 
         for invitee in invite_list:
             yield msg_handler.create_and_send_event({
@@ -153,7 +153,7 @@ class RoomCreationHandler(BaseHandler):
                 "room_id": room_id,
                 "sender": user_id,
                 "content": {"membership": Membership.INVITE},
-            })
+            }, ratelimit=False)
 
         result = {"room_id": room_id}
 
@@ -213,7 +213,8 @@ class RoomCreationHandler(BaseHandler):
                 "state_default": 50,
                 "ban": 50,
                 "kick": 50,
-                "redact": 50
+                "redact": 50,
+                "invite": 0,
             },
         )
 
@@ -311,25 +312,6 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(chunk_data)
 
     @defer.inlineCallbacks
-    def get_room_member(self, room_id, member_user_id, auth_user_id):
-        """Retrieve a room member from a room.
-
-        Args:
-            room_id : The room the member is in.
-            member_user_id : The member's user ID
-            auth_user_id : The user ID of the user making this request.
-        Returns:
-            The room member, or None if this member does not exist.
-        Raises:
-            SynapseError if something goes wrong.
-        """
-        yield self.auth.check_joined_room(room_id, auth_user_id)
-
-        member = yield self.store.get_room_member(user_id=member_user_id,
-                                                  room_id=room_id)
-        defer.returnValue(member)
-
-    @defer.inlineCallbacks
     def change_membership(self, event, context, do_auth=True):
         """ Change the membership status of a user in a room.
 
@@ -547,11 +529,19 @@ class RoomListHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_public_room_list(self):
         chunk = yield self.store.get_rooms(is_public=True)
-        for room in chunk:
-            joined_users = yield self.store.get_users_in_room(
-                room_id=room["room_id"],
-            )
-            room["num_joined_members"] = len(joined_users)
+        results = yield defer.gatherResults(
+            [
+                self.store.get_users_in_room(
+                    room_id=room["room_id"],
+                )
+                for room in chunk
+            ],
+            consumeErrors=True,
+        )
+
+        for i, room in enumerate(chunk):
+            room["num_joined_members"] = len(results[i])
+
         # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c2762f92c7..c0b2bd7db0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -223,6 +223,7 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
         self._handler = None
+        self._room_member_handler = None
 
     def handler(self):
         # Avoid cyclic dependency in handler setup
@@ -230,6 +231,11 @@ class TypingNotificationEventSource(object):
             self._handler = self.hs.get_handlers().typing_notification_handler
         return self._handler
 
+    def room_member_handler(self):
+        if not self._room_member_handler:
+            self._room_member_handler = self.hs.get_handlers().room_member_handler
+        return self._room_member_handler
+
     def _make_event_for(self, room_id):
         typing = self.handler()._room_typing[room_id]
         return {
@@ -240,19 +246,25 @@ class TypingNotificationEventSource(object):
             },
         }
 
+    @defer.inlineCallbacks
     def get_new_events_for_user(self, user, from_key, limit):
         from_key = int(from_key)
         handler = self.handler()
 
+        joined_room_ids = (
+            yield self.room_member_handler().get_joined_rooms_for_user(user)
+        )
+
         events = []
         for room_id in handler._room_serials:
+            if room_id not in joined_room_ids:
+                continue
             if handler._room_serials[room_id] <= from_key:
                 continue
 
-            # TODO: check if user is in room
             events.append(self._make_event_for(room_id))
 
-        return (events, handler._latest_room_serial)
+        defer.returnValue((events, handler._latest_room_serial))
 
     def get_current_key(self):
         return self.handler()._latest_room_serial