summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-03-09 13:29:41 +0000
committerErik Johnston <erik@matrix.org>2015-03-09 13:29:41 +0000
commitf31e65ca8b3a056b81c9ee1c8e5be298e36ed495 (patch)
tree414d6a488f090cea0aff41ef8ca7346f47567a62 /synapse/handlers
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj-perf (diff)
parentD'oh: underscore, not hyphen (diff)
downloadsynapse-f31e65ca8b3a056b81c9ee1c8e5be298e36ed495.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj-perf
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py7
-rw-r--r--synapse/handlers/_base.py5
-rw-r--r--synapse/handlers/appservice.py211
-rw-r--r--synapse/handlers/directory.py122
-rw-r--r--synapse/handlers/events.py86
-rw-r--r--synapse/handlers/federation.py637
-rw-r--r--synapse/handlers/login.py33
-rw-r--r--synapse/handlers/message.py77
-rw-r--r--synapse/handlers/presence.py48
-rw-r--r--synapse/handlers/profile.py23
-rw-r--r--synapse/handlers/register.py118
-rw-r--r--synapse/handlers/room.py71
-rw-r--r--synapse/handlers/sync.py439
-rw-r--r--synapse/handlers/typing.py5
14 files changed, 1580 insertions, 302 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index fe071a4bc2..8d345bf936 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
     RoomCreationHandler, RoomMemberHandler, RoomListHandler
@@ -26,6 +27,8 @@ from .presence import PresenceHandler
 from .directory import DirectoryHandler
 from .typing import TypingNotificationHandler
 from .admin import AdminHandler
+from .appservice import ApplicationServicesHandler
+from .sync import SyncHandler
 
 
 class Handlers(object):
@@ -51,3 +54,7 @@ class Handlers(object):
         self.directory_handler = DirectoryHandler(hs)
         self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
+        self.appservice_handler = ApplicationServicesHandler(
+            hs, ApplicationServiceApi(hs)
+        )
+        self.sync_handler = SyncHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index f33d17a31e..1773fa20aa 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -19,6 +19,7 @@ from synapse.api.errors import LimitExceededError, SynapseError
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
+from synapse.types import UserID
 
 import logging
 
@@ -113,7 +114,7 @@ class BaseHandler(object):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
-                invitee = self.hs.parse_userid(event.state_key)
+                invitee = UserID.from_string(event.state_key)
                 if not self.hs.is_mine(invitee):
                     # TODO: Can we add signature from remote server in a nicer
                     # way? If we have been invited by a remote server, we need
@@ -134,7 +135,7 @@ class BaseHandler(object):
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
-                            self.hs.parse_userid(s.state_key).domain
+                            UserID.from_string(s.state_key).domain
                         )
             except SynapseError:
                 logger.warn(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
new file mode 100644
index 0000000000..2c488a46f6
--- /dev/null
+++ b/synapse/handlers/appservice.py
@@ -0,0 +1,211 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import Codes, StoreError, SynapseError
+from synapse.appservice import ApplicationService
+from synapse.types import UserID
+import synapse.util.stringutils as stringutils
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+# NB: Purposefully not inheriting BaseHandler since that contains way too much
+# setup code which this handler does not need or use. This makes testing a lot
+# easier.
+class ApplicationServicesHandler(object):
+
+    def __init__(self, hs, appservice_api):
+        self.store = hs.get_datastore()
+        self.hs = hs
+        self.appservice_api = appservice_api
+
+    @defer.inlineCallbacks
+    def register(self, app_service):
+        logger.info("Register -> %s", app_service)
+        # check the token is recognised
+        try:
+            stored_service = yield self.store.get_app_service_by_token(
+                app_service.token
+            )
+            if not stored_service:
+                raise StoreError(404, "Application service not found")
+        except StoreError:
+            raise SynapseError(
+                403, "Unrecognised application services token. "
+                "Consult the home server admin.",
+                errcode=Codes.FORBIDDEN
+            )
+
+        app_service.hs_token = self._generate_hs_token()
+
+        # create a sender for this application service which is used when
+        # creating rooms, etc..
+        account = yield self.hs.get_handlers().registration_handler.register()
+        app_service.sender = account[0]
+
+        yield self.store.update_app_service(app_service)
+        defer.returnValue(app_service)
+
+    @defer.inlineCallbacks
+    def unregister(self, token):
+        logger.info("Unregister as_token=%s", token)
+        yield self.store.unregister_app_service(token)
+
+    @defer.inlineCallbacks
+    def notify_interested_services(self, event):
+        """Notifies (pushes) all application services interested in this event.
+
+        Pushing is done asynchronously, so this method won't block for any
+        prolonged length of time.
+
+        Args:
+            event(Event): The event to push out to interested services.
+        """
+        # Gather interested services
+        services = yield self._get_services_for_event(event)
+        if len(services) == 0:
+            return  # no services need notifying
+
+        # Do we know this user exists? If not, poke the user query API for
+        # all services which match that user regex. This needs to block as these
+        # user queries need to be made BEFORE pushing the event.
+        yield self._check_user_exists(event.sender)
+        if event.type == EventTypes.Member:
+            yield self._check_user_exists(event.state_key)
+
+        # Fork off pushes to these services - XXX First cut, best effort
+        for service in services:
+            self.appservice_api.push(service, event)
+
+    @defer.inlineCallbacks
+    def query_user_exists(self, user_id):
+        """Check if any application service knows this user_id exists.
+
+        Args:
+            user_id(str): The user to query if they exist on any AS.
+        Returns:
+            True if this user exists on at least one application service.
+        """
+        user_query_services = yield self._get_services_for_user(
+            user_id=user_id
+        )
+        for user_service in user_query_services:
+            is_known_user = yield self.appservice_api.query_user(
+                user_service, user_id
+            )
+            if is_known_user:
+                defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def query_room_alias_exists(self, room_alias):
+        """Check if an application service knows this room alias exists.
+
+        Args:
+            room_alias(RoomAlias): The room alias to query.
+        Returns:
+            namedtuple: with keys "room_id" and "servers" or None if no
+            association can be found.
+        """
+        room_alias_str = room_alias.to_string()
+        alias_query_services = yield self._get_services_for_event(
+            event=None,
+            restrict_to=ApplicationService.NS_ALIASES,
+            alias_list=[room_alias_str]
+        )
+        for alias_service in alias_query_services:
+            is_known_alias = yield self.appservice_api.query_alias(
+                alias_service, room_alias_str
+            )
+            if is_known_alias:
+                # the alias exists now so don't query more ASes.
+                result = yield self.store.get_association_from_room_alias(
+                    room_alias
+                )
+                defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _get_services_for_event(self, event, restrict_to="", alias_list=None):
+        """Retrieve a list of application services interested in this event.
+
+        Args:
+            event(Event): The event to check. Can be None if alias_list is not.
+            restrict_to(str): The namespace to restrict regex tests to.
+            alias_list: A list of aliases to get services for. If None, this
+            list is obtained from the database.
+        Returns:
+            list<ApplicationService>: A list of services interested in this
+            event based on the service regex.
+        """
+        member_list = None
+        if hasattr(event, "room_id"):
+            # We need to know the aliases associated with this event.room_id,
+            # if any.
+            if not alias_list:
+                alias_list = yield self.store.get_aliases_for_room(
+                    event.room_id
+                )
+            # We need to know the members associated with this event.room_id,
+            # if any.
+            member_list = yield self.store.get_room_members(
+                room_id=event.room_id,
+                membership=Membership.JOIN
+            )
+
+        services = yield self.store.get_app_services()
+        interested_list = [
+            s for s in services if (
+                s.is_interested(event, restrict_to, alias_list, member_list)
+            )
+        ]
+        defer.returnValue(interested_list)
+
+    @defer.inlineCallbacks
+    def _get_services_for_user(self, user_id):
+        services = yield self.store.get_app_services()
+        interested_list = [
+            s for s in services if (
+                s.is_interested_in_user(user_id)
+            )
+        ]
+        defer.returnValue(interested_list)
+
+    @defer.inlineCallbacks
+    def _is_unknown_user(self, user_id):
+        user = UserID.from_string(user_id)
+        if not self.hs.is_mine(user):
+            # we don't know if they are unknown or not since it isn't one of our
+            # users. We can't poke ASes.
+            defer.returnValue(False)
+            return
+
+        user_info = yield self.store.get_user_by_id(user_id)
+        defer.returnValue(len(user_info) == 0)
+
+    @defer.inlineCallbacks
+    def _check_user_exists(self, user_id):
+        unknown_user = yield self._is_unknown_user(user_id)
+        if unknown_user:
+            exists = yield self.query_user_exists(user_id)
+            defer.returnValue(exists)
+        defer.returnValue(True)
+
+    def _generate_hs_token(self):
+        return stringutils.random_string(24)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 91fceda2ac..f76febee8f 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -19,6 +19,7 @@ from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, Codes, CodeMessageException
 from synapse.api.constants import EventTypes
+from synapse.types import RoomAlias
 
 import logging
 
@@ -36,18 +37,15 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def create_association(self, user_id, room_alias, room_id, servers=None):
-
-        # TODO(erikj): Do auth.
+    def _create_association(self, room_alias, room_id, servers=None):
+        # general association creation for both human users and app services
 
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
             # TODO(erikj): Change this.
 
         # TODO(erikj): Add transactions.
-
         # TODO(erikj): Check if there is a current association.
-
         if not servers:
             servers = yield self.store.get_joined_hosts_for_room(room_id)
 
@@ -61,22 +59,77 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
+    def create_association(self, user_id, room_alias, room_id, servers=None):
+        # association creation for human users
+        # TODO(erikj): Do user auth.
+
+        can_create = yield self.can_modify_alias(
+            room_alias,
+            user_id=user_id
+        )
+        if not can_create:
+            raise SynapseError(
+                400, "This alias is reserved by an application service.",
+                errcode=Codes.EXCLUSIVE
+            )
+        yield self._create_association(room_alias, room_id, servers)
+
+    @defer.inlineCallbacks
+    def create_appservice_association(self, service, room_alias, room_id,
+                                      servers=None):
+        if not service.is_interested_in_alias(room_alias.to_string()):
+            raise SynapseError(
+                400, "This application service has not reserved"
+                " this kind of alias.", errcode=Codes.EXCLUSIVE
+            )
+
+        # association creation for app services
+        yield self._create_association(room_alias, room_id, servers)
+
+    @defer.inlineCallbacks
     def delete_association(self, user_id, room_alias):
+        # association deletion for human users
+
         # TODO Check if server admin
 
+        can_delete = yield self.can_modify_alias(
+            room_alias,
+            user_id=user_id
+        )
+        if not can_delete:
+            raise SynapseError(
+                400, "This alias is reserved by an application service.",
+                errcode=Codes.EXCLUSIVE
+            )
+
+        yield self._delete_association(room_alias)
+
+    @defer.inlineCallbacks
+    def delete_appservice_association(self, service, room_alias):
+        if not service.is_interested_in_alias(room_alias.to_string()):
+            raise SynapseError(
+                400,
+                "This application service has not reserved this kind of alias",
+                errcode=Codes.EXCLUSIVE
+            )
+        yield self._delete_association(room_alias)
+
+    @defer.inlineCallbacks
+    def _delete_association(self, room_alias):
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
-        room_id = yield self.store.delete_room_alias(room_alias)
+        yield self.store.delete_room_alias(room_alias)
 
-        if room_id:
-            yield self._update_room_alias_events(user_id, room_id)
+        # TODO - Looks like _update_room_alias_event has never been implemented
+        # if room_id:
+        #    yield self._update_room_alias_events(user_id, room_id)
 
     @defer.inlineCallbacks
     def get_association(self, room_alias):
         room_id = None
         if self.hs.is_mine(room_alias):
-            result = yield self.store.get_association_from_room_alias(
+            result = yield self.get_association_from_room_alias(
                 room_alias
             )
 
@@ -107,12 +160,21 @@ class DirectoryHandler(BaseHandler):
         if not room_id:
             raise SynapseError(
                 404,
-                "Room alias %r not found" % (room_alias.to_string(),),
+                "Room alias %s not found" % (room_alias.to_string(),),
                 Codes.NOT_FOUND
             )
 
         extra_servers = yield self.store.get_joined_hosts_for_room(room_id)
-        servers = list(set(extra_servers) | set(servers))
+        servers = set(extra_servers) | set(servers)
+
+        # If this server is in the list of servers, return it first.
+        if self.server_name in servers:
+            servers = (
+                [self.server_name]
+                + [s for s in servers if s != self.server_name]
+            )
+        else:
+            servers = list(servers)
 
         defer.returnValue({
             "room_id": room_id,
@@ -122,13 +184,13 @@ class DirectoryHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_directory_query(self, args):
-        room_alias = self.hs.parse_roomalias(args["room_alias"])
+        room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
             raise SynapseError(
                 400, "Room Alias is not hosted on this Home Server"
             )
 
-        result = yield self.store.get_association_from_room_alias(
+        result = yield self.get_association_from_room_alias(
             room_alias
         )
 
@@ -156,3 +218,37 @@ class DirectoryHandler(BaseHandler):
             "sender": user_id,
             "content": {"aliases": aliases},
         }, ratelimit=False)
+
+    @defer.inlineCallbacks
+    def get_association_from_room_alias(self, room_alias):
+        result = yield self.store.get_association_from_room_alias(
+            room_alias
+        )
+        if not result:
+            # Query AS to see if it exists
+            as_handler = self.hs.get_handlers().appservice_handler
+            result = yield as_handler.query_room_alias_exists(room_alias)
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def can_modify_alias(self, alias, user_id=None):
+        # Any application service "interested" in an alias they are regexing on
+        # can modify the alias.
+        # Users can only modify the alias if ALL the interested services have
+        # non-exclusive locks on the alias (or there are no interested services)
+        services = yield self.store.get_app_services()
+        interested_services = [
+            s for s in services if s.is_interested_in_alias(alias.to_string())
+        ]
+
+        for service in interested_services:
+            if user_id == service.sender:
+                # this user IS the app service so they can do whatever they like
+                defer.returnValue(True)
+                return
+            elif service.is_exclusive_alias(alias.to_string()):
+                # another service has an exclusive lock on this alias.
+                defer.returnValue(False)
+                return
+        # either no interested services, or no service with an exclusive lock
+        defer.returnValue(True)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 103bc67c42..d3297b7292 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -17,10 +17,13 @@ from twisted.internet import defer
 
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
+from synapse.types import UserID
+from synapse.events.utils import serialize_event
 
 from ._base import BaseHandler
 
 import logging
+import random
 
 
 logger = logging.getLogger(__name__)
@@ -47,38 +50,46 @@ class EventStreamHandler(BaseHandler):
     @defer.inlineCallbacks
     @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0,
-                   as_client_event=True):
-        auth_user = self.hs.parse_userid(auth_user_id)
+                   as_client_event=True, affect_presence=True):
+        auth_user = UserID.from_string(auth_user_id)
 
         try:
-            if auth_user not in self._streams_per_user:
-                self._streams_per_user[auth_user] = 0
-                if auth_user in self._stop_timer_per_user:
-                    try:
-                        self.clock.cancel_call_later(
-                            self._stop_timer_per_user.pop(auth_user)
+            if affect_presence:
+                if auth_user not in self._streams_per_user:
+                    self._streams_per_user[auth_user] = 0
+                    if auth_user in self._stop_timer_per_user:
+                        try:
+                            self.clock.cancel_call_later(
+                                self._stop_timer_per_user.pop(auth_user)
+                            )
+                        except:
+                            logger.exception("Failed to cancel event timer")
+                    else:
+                        yield self.distributor.fire(
+                            "started_user_eventstream", auth_user
                         )
-                    except:
-                        logger.exception("Failed to cancel event timer")
-                else:
-                    yield self.distributor.fire(
-                        "started_user_eventstream", auth_user
-                    )
-            self._streams_per_user[auth_user] += 1
-
-            if pagin_config.from_token is None:
-                pagin_config.from_token = None
+                self._streams_per_user[auth_user] += 1
 
             rm_handler = self.hs.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(auth_user)
 
+            if timeout:
+                # If they've set a timeout set a minimum limit.
+                timeout = max(timeout, 500)
+
+                # Add some randomness to this value to try and mitigate against
+                # thundering herds on restart.
+                timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+
             with PreserveLoggingContext():
                 events, tokens = yield self.notifier.get_events_for(
                     auth_user, room_ids, pagin_config, timeout
                 )
 
+            time_now = self.clock.time_msec()
+
             chunks = [
-                self.hs.serialize_event(e, as_client_event) for e in events
+                serialize_event(e, time_now, as_client_event) for e in events
             ]
 
             chunk = {
@@ -90,27 +101,28 @@ class EventStreamHandler(BaseHandler):
             defer.returnValue(chunk)
 
         finally:
-            self._streams_per_user[auth_user] -= 1
-            if not self._streams_per_user[auth_user]:
-                del self._streams_per_user[auth_user]
-
-                # 10 seconds of grace to allow the client to reconnect again
-                #   before we think they're gone
-                def _later():
-                    logger.debug(
-                        "_later stopped_user_eventstream %s", auth_user
-                    )
+            if affect_presence:
+                self._streams_per_user[auth_user] -= 1
+                if not self._streams_per_user[auth_user]:
+                    del self._streams_per_user[auth_user]
+
+                    # 10 seconds of grace to allow the client to reconnect again
+                    #   before we think they're gone
+                    def _later():
+                        logger.debug(
+                            "_later stopped_user_eventstream %s", auth_user
+                        )
 
-                    self._stop_timer_per_user.pop(auth_user, None)
+                        self._stop_timer_per_user.pop(auth_user, None)
 
-                    yield self.distributor.fire(
-                        "stopped_user_eventstream", auth_user
-                    )
+                        return self.distributor.fire(
+                            "stopped_user_eventstream", auth_user
+                        )
 
-                logger.debug("Scheduling _later: for %s", auth_user)
-                self._stop_timer_per_user[auth_user] = (
-                    self.clock.call_later(30, _later)
-                )
+                    logger.debug("Scheduling _later: for %s", auth_user)
+                    self._stop_timer_per_user[auth_user] = (
+                        self.clock.call_later(30, _later)
+                    )
 
 
 class EventHandler(BaseHandler):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 81203bf1a3..ae4e9b316d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,21 +17,21 @@
 
 from ._base import BaseHandler
 
-from synapse.events.utils import prune_event
 from synapse.api.errors import (
-    AuthError, FederationError, SynapseError, StoreError,
+    AuthError, FederationError, StoreError,
 )
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
+from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
-    compute_event_signature, check_event_content_hash,
-    add_hashes_and_signatures,
+    compute_event_signature, add_hashes_and_signatures,
 )
-from syutil.jsonutil import encode_canonical_json
+from synapse.types import UserID
 
 from twisted.internet import defer
 
+import itertools
 import logging
 
 
@@ -112,33 +112,6 @@ class FederationHandler(BaseHandler):
 
         logger.debug("Processing event: %s", event.event_id)
 
-        redacted_event = prune_event(event)
-
-        redacted_pdu_json = redacted_event.get_pdu_json()
-        try:
-            yield self.keyring.verify_json_for_server(
-                event.origin, redacted_pdu_json
-            )
-        except SynapseError as e:
-            logger.warn(
-                "Signature check failed for %s redacted to %s",
-                encode_canonical_json(pdu.get_pdu_json()),
-                encode_canonical_json(redacted_pdu_json),
-            )
-            raise FederationError(
-                "ERROR",
-                e.code,
-                e.msg,
-                affected=event.event_id,
-            )
-
-        if not check_event_content_hash(event):
-            logger.warn(
-                "Event content has been tampered, redacting %s, %s",
-                event.event_id, encode_canonical_json(event.get_dict())
-            )
-            event = redacted_event
-
         logger.debug("Event: %s", event)
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
@@ -148,41 +121,38 @@ class FederationHandler(BaseHandler):
             event.room_id,
             self.server_name
         )
-        if not is_in_room and not event.internal_metadata.outlier:
+        if not is_in_room and not event.internal_metadata.is_outlier():
             logger.debug("Got event for room we're not in.")
+            current_state = state
 
-            replication = self.replication_layer
+        event_ids = set()
+        if state:
+            event_ids |= {e.event_id for e in state}
+        if auth_chain:
+            event_ids |= {e.event_id for e in auth_chain}
 
-            if not state:
-                state, auth_chain = yield replication.get_state_for_room(
-                    origin, context=event.room_id, event_id=event.event_id,
-                )
+        seen_ids = set(
+            (yield self.store.have_events(event_ids)).keys()
+        )
 
-            if not auth_chain:
-                auth_chain = yield replication.get_event_auth(
-                    origin,
-                    context=event.room_id,
-                    event_id=event.event_id,
-                )
+        if state and auth_chain is not None:
+            # If we have any state or auth_chain given to us by the replication
+            # layer, then we should handle them (if we haven't before.)
+            for e in itertools.chain(auth_chain, state):
+                if e.event_id in seen_ids:
+                    continue
 
-            for e in auth_chain:
                 e.internal_metadata.outlier = True
                 try:
-                    yield self._handle_new_event(e, fetch_auth_from=origin)
-                except:
-                    logger.exception(
-                        "Failed to handle auth event %s",
-                        e.event_id,
+                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_chain
+                        if e.event_id in auth_ids
+                    }
+                    yield self._handle_new_event(
+                        origin, e, auth_events=auth
                     )
-
-            current_state = state
-
-        if state:
-            for e in state:
-                logging.info("A :) %r", e)
-                e.internal_metadata.outlier = True
-                try:
-                    yield self._handle_new_event(e)
+                    seen_ids.add(e.event_id)
                 except:
                     logger.exception(
                         "Failed to handle state event %s",
@@ -191,6 +161,7 @@ class FederationHandler(BaseHandler):
 
         try:
             yield self._handle_new_event(
+                origin,
                 event,
                 state=state,
                 backfilled=backfilled,
@@ -227,7 +198,7 @@ class FederationHandler(BaseHandler):
             extra_users = []
             if event.type == EventTypes.Member:
                 target_user_id = event.state_key
-                target_user = self.hs.parse_userid(target_user_id)
+                target_user = UserID.from_string(target_user_id)
                 extra_users.append(target_user)
 
             yield self.notifier.on_new_room_event(
@@ -236,7 +207,7 @@ class FederationHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
-                user = self.hs.parse_userid(event.state_key)
+                user = UserID.from_string(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
@@ -305,7 +276,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def do_invite_join(self, target_host, room_id, joinee, content, snapshot):
+    def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot):
         """ Attempts to join the `joinee` to the room `room_id` via the
         server `target_host`.
 
@@ -319,8 +290,8 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        pdu = yield self.replication_layer.make_join(
-            target_host,
+        origin, pdu = yield self.replication_layer.make_join(
+            target_hosts,
             room_id,
             joinee
         )
@@ -341,7 +312,7 @@ class FederationHandler(BaseHandler):
         self.room_queues[room_id] = []
 
         builder = self.event_builder_factory.new(
-            event.get_pdu_json()
+            unfreeze(event.get_pdu_json())
         )
 
         handled_events = set()
@@ -362,11 +333,20 @@ class FederationHandler(BaseHandler):
 
             new_event = builder.build()
 
+            # Try the host we successfully got a response to /make_join/
+            # request first.
+            try:
+                target_hosts.remove(origin)
+                target_hosts.insert(0, origin)
+            except ValueError:
+                pass
+
             ret = yield self.replication_layer.send_join(
-                target_host,
+                target_hosts,
                 new_event
             )
 
+            origin = ret["origin"]
             state = ret["state"]
             auth_chain = ret["auth_chain"]
             auth_chain.sort(key=lambda e: e.depth)
@@ -392,8 +372,19 @@ class FederationHandler(BaseHandler):
 
             for e in auth_chain:
                 e.internal_metadata.outlier = True
+
+                if e.event_id == event.event_id:
+                    continue
+
                 try:
-                    yield self._handle_new_event(e)
+                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_chain
+                        if e.event_id in auth_ids
+                    }
+                    yield self._handle_new_event(
+                        origin, e, auth_events=auth
+                    )
                 except:
                     logger.exception(
                         "Failed to handle auth event %s",
@@ -401,11 +392,18 @@ class FederationHandler(BaseHandler):
                     )
 
             for e in state:
-                # FIXME: Auth these.
+                if e.event_id == event.event_id:
+                    continue
+
                 e.internal_metadata.outlier = True
                 try:
+                    auth_ids = [e_id for e_id, _ in e.auth_events]
+                    auth = {
+                        (e.type, e.state_key): e for e in auth_chain
+                        if e.event_id in auth_ids
+                    }
                     yield self._handle_new_event(
-                        e, fetch_auth_from=target_host
+                        origin, e, auth_events=auth
                     )
                 except:
                     logger.exception(
@@ -413,10 +411,18 @@ class FederationHandler(BaseHandler):
                         e.event_id,
                     )
 
+            auth_ids = [e_id for e_id, _ in event.auth_events]
+            auth_events = {
+                (e.type, e.state_key): e for e in auth_chain
+                if e.event_id in auth_ids
+            }
+
             yield self._handle_new_event(
+                origin,
                 new_event,
                 state=state,
                 current_state=state,
+                auth_events=auth_events,
             )
 
             yield self.notifier.on_new_room_event(
@@ -480,7 +486,7 @@ class FederationHandler(BaseHandler):
 
         event.internal_metadata.outlier = False
 
-        context = yield self._handle_new_event(event)
+        context = yield self._handle_new_event(origin, event)
 
         logger.debug(
             "on_send_join_request: After _handle_new_event: %s, sigs: %s",
@@ -491,7 +497,7 @@ class FederationHandler(BaseHandler):
         extra_users = []
         if event.type == EventTypes.Member:
             target_user_id = event.state_key
-            target_user = self.hs.parse_userid(target_user_id)
+            target_user = UserID.from_string(target_user_id)
             extra_users.append(target_user)
 
         yield self.notifier.on_new_room_event(
@@ -500,7 +506,7 @@ class FederationHandler(BaseHandler):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
-                user = self.hs.parse_userid(event.state_key)
+                user = UserID.from_string(event.state_key)
                 yield self.distributor.fire(
                     "user_joined_room", user=user, room_id=event.room_id
                 )
@@ -514,13 +520,15 @@ class FederationHandler(BaseHandler):
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
                         destinations.add(
-                            self.hs.parse_userid(s.state_key).domain
+                            UserID.from_string(s.state_key).domain
                         )
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
                 )
 
+        destinations.discard(origin)
+
         logger.debug(
             "on_send_join_request: Sending event: %s, signatures: %s",
             event.event_id,
@@ -565,7 +573,7 @@ class FederationHandler(BaseHandler):
             backfilled=False,
         )
 
-        target_user = self.hs.parse_userid(event.state_key)
+        target_user = UserID.from_string(event.state_key)
         yield self.notifier.on_new_room_event(
             event, extra_users=[target_user],
         )
@@ -573,12 +581,13 @@ class FederationHandler(BaseHandler):
         defer.returnValue(event)
 
     @defer.inlineCallbacks
-    def get_state_for_pdu(self, origin, room_id, event_id):
+    def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True):
         yield run_on_reactor()
 
-        in_room = yield self.auth.check_host_in_room(room_id, origin)
-        if not in_room:
-            raise AuthError(403, "Host not in room.")
+        if do_auth:
+            in_room = yield self.auth.check_host_in_room(room_id, origin)
+            if not in_room:
+                raise AuthError(403, "Host not in room.")
 
         state_groups = yield self.store.get_state_groups(
             [event_id]
@@ -641,6 +650,7 @@ class FederationHandler(BaseHandler):
         event = yield self.store.get_event(
             event_id,
             allow_none=True,
+            allow_rejected=True,
         )
 
         if event:
@@ -681,11 +691,12 @@ class FederationHandler(BaseHandler):
             waiters.pop().callback(None)
 
     @defer.inlineCallbacks
-    def _handle_new_event(self, event, state=None, backfilled=False,
-                          current_state=None, fetch_auth_from=None):
+    @log_function
+    def _handle_new_event(self, origin, event, state=None, backfilled=False,
+                          current_state=None, auth_events=None):
 
         logger.debug(
-            "_handle_new_event: Before annotate: %s, sigs: %s",
+            "_handle_new_event: %s, sigs: %s",
             event.event_id, event.signatures,
         )
 
@@ -693,65 +704,46 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
+        if not auth_events:
+            auth_events = context.auth_events
+
         logger.debug(
-            "_handle_new_event: Before auth fetch: %s, sigs: %s",
-            event.event_id, event.signatures,
+            "_handle_new_event: %s, auth_events: %s",
+            event.event_id, auth_events,
         )
 
         is_new_state = not event.internal_metadata.is_outlier()
 
-        known_ids = set(
-            [s.event_id for s in context.auth_events.values()]
-        )
-
-        for e_id, _ in event.auth_events:
-            if e_id not in known_ids:
-                e = yield self.store.get_event(e_id, allow_none=True)
-
-                if not e and fetch_auth_from is not None:
-                    # Grab the auth_chain over federation if we are missing
-                    # auth events.
-                    auth_chain = yield self.replication_layer.get_event_auth(
-                        fetch_auth_from, event.event_id, event.room_id
-                    )
-                    for auth_event in auth_chain:
-                        yield self._handle_new_event(auth_event)
-                    e = yield self.store.get_event(e_id, allow_none=True)
-
-                if not e:
-                    # TODO: Do some conflict res to make sure that we're
-                    # not the ones who are wrong.
-                    logger.info(
-                        "Rejecting %s as %s not in db or %s",
-                        event.event_id, e_id, known_ids,
-                    )
-                    # FIXME: How does raising AuthError work with federation?
-                    raise AuthError(403, "Cannot find auth event")
-
-                context.auth_events[(e.type, e.state_key)] = e
-
-        logger.debug(
-            "_handle_new_event: Before hack: %s, sigs: %s",
-            event.event_id, event.signatures,
-        )
-
+        # This is a hack to fix some old rooms where the initial join event
+        # didn't reference the create event in its auth events.
         if event.type == EventTypes.Member and not event.auth_events:
             if len(event.prev_events) == 1:
                 c = yield self.store.get_event(event.prev_events[0][0])
                 if c.type == EventTypes.Create:
-                    context.auth_events[(c.type, c.state_key)] = c
+                    auth_events[(c.type, c.state_key)] = c
 
-        logger.debug(
-            "_handle_new_event: Before auth check: %s, sigs: %s",
-            event.event_id, event.signatures,
-        )
+        try:
+            yield self.do_auth(
+                origin, event, context, auth_events=auth_events
+            )
+        except AuthError as e:
+            logger.warn(
+                "Rejecting %s because %s",
+                event.event_id, e.msg
+            )
 
-        self.auth.check(event, auth_events=context.auth_events)
+            context.rejected = RejectedReason.AUTH_ERROR
 
-        logger.debug(
-            "_handle_new_event: Before persist_event: %s, sigs: %s",
-            event.event_id, event.signatures,
-        )
+            # FIXME: Don't store as rejected with AUTH_ERROR if we haven't
+            # seen all the auth events.
+            yield self.store.persist_event(
+                event,
+                context=context,
+                backfilled=backfilled,
+                is_new_state=False,
+                current_state=current_state,
+            )
+            raise
 
         yield self.store.persist_event(
             event,
@@ -761,9 +753,388 @@ class FederationHandler(BaseHandler):
             current_state=current_state,
         )
 
-        logger.debug(
-            "_handle_new_event: After persist_event: %s, sigs: %s",
-            event.event_id, event.signatures,
+        defer.returnValue(context)
+
+    @defer.inlineCallbacks
+    def on_query_auth(self, origin, event_id, remote_auth_chain, rejects,
+                      missing):
+        # Just go through and process each event in `remote_auth_chain`. We
+        # don't want to fall into the trap of `missing` being wrong.
+        for e in remote_auth_chain:
+            try:
+                yield self._handle_new_event(origin, e)
+            except AuthError:
+                pass
+
+        # Now get the current auth_chain for the event.
+        local_auth_chain = yield self.store.get_auth_chain([event_id])
+
+        # TODO: Check if we would now reject event_id. If so we need to tell
+        # everyone.
+
+        ret = yield self.construct_auth_difference(
+            local_auth_chain, remote_auth_chain
         )
 
-        defer.returnValue(context)
+        for event in ret["auth_chain"]:
+            event.signatures.update(
+                compute_event_signature(
+                    event,
+                    self.hs.hostname,
+                    self.hs.config.signing_key[0]
+                )
+            )
+
+        logger.debug("on_query_auth returning: %s", ret)
+
+        defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def on_get_missing_events(self, origin, room_id, earliest_events,
+                              latest_events, limit, min_depth):
+        in_room = yield self.auth.check_host_in_room(
+            room_id,
+            origin
+        )
+        if not in_room:
+            raise AuthError(403, "Host not in room.")
+
+        limit = min(limit, 20)
+        min_depth = max(min_depth, 0)
+
+        missing_events = yield self.store.get_missing_events(
+            room_id=room_id,
+            earliest_events=earliest_events,
+            latest_events=latest_events,
+            limit=limit,
+            min_depth=min_depth,
+        )
+
+        defer.returnValue(missing_events)
+
+    @defer.inlineCallbacks
+    @log_function
+    def do_auth(self, origin, event, context, auth_events):
+        # Check if we have all the auth events.
+        have_events = yield self.store.have_events(
+            [e_id for e_id, _ in event.auth_events]
+        )
+
+        event_auth_events = set(e_id for e_id, _ in event.auth_events)
+        seen_events = set(have_events.keys())
+
+        missing_auth = event_auth_events - seen_events
+
+        if missing_auth:
+            logger.info("Missing auth: %s", missing_auth)
+            # If we don't have all the auth events, we need to get them.
+            try:
+                remote_auth_chain = yield self.replication_layer.get_event_auth(
+                    origin, event.room_id, event.event_id
+                )
+
+                seen_remotes = yield self.store.have_events(
+                    [e.event_id for e in remote_auth_chain]
+                )
+
+                for e in remote_auth_chain:
+                    if e.event_id in seen_remotes.keys():
+                        continue
+
+                    if e.event_id == event.event_id:
+                        continue
+
+                    try:
+                        auth_ids = [e_id for e_id, _ in e.auth_events]
+                        auth = {
+                            (e.type, e.state_key): e for e in remote_auth_chain
+                            if e.event_id in auth_ids
+                        }
+                        e.internal_metadata.outlier = True
+
+                        logger.debug(
+                            "do_auth %s missing_auth: %s",
+                            event.event_id, e.event_id
+                        )
+                        yield self._handle_new_event(
+                            origin, e, auth_events=auth
+                        )
+
+                        if e.event_id in event_auth_events:
+                            auth_events[(e.type, e.state_key)] = e
+                    except AuthError:
+                        pass
+
+                have_events = yield self.store.have_events(
+                    [e_id for e_id, _ in event.auth_events]
+                )
+                seen_events = set(have_events.keys())
+            except:
+                # FIXME:
+                logger.exception("Failed to get auth chain")
+
+        # FIXME: Assumes we have and stored all the state for all the
+        # prev_events
+        current_state = set(e.event_id for e in auth_events.values())
+        different_auth = event_auth_events - current_state
+
+        if different_auth and not event.internal_metadata.is_outlier():
+            # Do auth conflict res.
+            logger.info("Different auth: %s", different_auth)
+
+            different_events = yield defer.gatherResults(
+                [
+                    self.store.get_event(
+                        d,
+                        allow_none=True,
+                        allow_rejected=False,
+                    )
+                    for d in different_auth
+                    if d in have_events and not have_events[d]
+                ],
+                consumeErrors=True
+            )
+
+            if different_events:
+                local_view = dict(auth_events)
+                remote_view = dict(auth_events)
+                remote_view.update({
+                    (d.type, d.state_key): d for d in different_events
+                })
+
+                new_state, prev_state = self.state_handler.resolve_events(
+                    [local_view.values(), remote_view.values()],
+                    event
+                )
+
+                auth_events.update(new_state)
+
+                current_state = set(e.event_id for e in auth_events.values())
+                different_auth = event_auth_events - current_state
+
+                context.current_state.update(auth_events)
+                context.state_group = None
+
+        if different_auth and not event.internal_metadata.is_outlier():
+            logger.info("Different auth after resolution: %s", different_auth)
+
+            # Only do auth resolution if we have something new to say.
+            # We can't rove an auth failure.
+            do_resolution = False
+
+            provable = [
+                RejectedReason.NOT_ANCESTOR, RejectedReason.NOT_ANCESTOR,
+            ]
+
+            for e_id in different_auth:
+                if e_id in have_events:
+                    if have_events[e_id] in provable:
+                        do_resolution = True
+                        break
+
+            if do_resolution:
+                # 1. Get what we think is the auth chain.
+                auth_ids = self.auth.compute_auth_events(
+                    event, context.current_state
+                )
+                local_auth_chain = yield self.store.get_auth_chain(auth_ids)
+
+                try:
+                    # 2. Get remote difference.
+                    result = yield self.replication_layer.query_auth(
+                        origin,
+                        event.room_id,
+                        event.event_id,
+                        local_auth_chain,
+                    )
+
+                    seen_remotes = yield self.store.have_events(
+                        [e.event_id for e in result["auth_chain"]]
+                    )
+
+                    # 3. Process any remote auth chain events we haven't seen.
+                    for ev in result["auth_chain"]:
+                        if ev.event_id in seen_remotes.keys():
+                            continue
+
+                        if ev.event_id == event.event_id:
+                            continue
+
+                        try:
+                            auth_ids = [e_id for e_id, _ in ev.auth_events]
+                            auth = {
+                                (e.type, e.state_key): e
+                                for e in result["auth_chain"]
+                                if e.event_id in auth_ids
+                            }
+                            ev.internal_metadata.outlier = True
+
+                            logger.debug(
+                                "do_auth %s different_auth: %s",
+                                event.event_id, e.event_id
+                            )
+
+                            yield self._handle_new_event(
+                                origin, ev, auth_events=auth
+                            )
+
+                            if ev.event_id in event_auth_events:
+                                auth_events[(ev.type, ev.state_key)] = ev
+                        except AuthError:
+                            pass
+
+                except:
+                    # FIXME:
+                    logger.exception("Failed to query auth chain")
+
+                # 4. Look at rejects and their proofs.
+                # TODO.
+
+                context.current_state.update(auth_events)
+                context.state_group = None
+
+        try:
+            self.auth.check(event, auth_events=auth_events)
+        except AuthError:
+            raise
+
+    @defer.inlineCallbacks
+    def construct_auth_difference(self, local_auth, remote_auth):
+        """ Given a local and remote auth chain, find the differences. This
+        assumes that we have already processed all events in remote_auth
+
+        Params:
+            local_auth (list)
+            remote_auth (list)
+
+        Returns:
+            dict
+        """
+
+        logger.debug("construct_auth_difference Start!")
+
+        # TODO: Make sure we are OK with local_auth or remote_auth having more
+        # auth events in them than strictly necessary.
+
+        def sort_fun(ev):
+            return ev.depth, ev.event_id
+
+        logger.debug("construct_auth_difference after sort_fun!")
+
+        # We find the differences by starting at the "bottom" of each list
+        # and iterating up on both lists. The lists are ordered by depth and
+        # then event_id, we iterate up both lists until we find the event ids
+        # don't match. Then we look at depth/event_id to see which side is
+        # missing that event, and iterate only up that list. Repeat.
+
+        remote_list = list(remote_auth)
+        remote_list.sort(key=sort_fun)
+
+        local_list = list(local_auth)
+        local_list.sort(key=sort_fun)
+
+        local_iter = iter(local_list)
+        remote_iter = iter(remote_list)
+
+        logger.debug("construct_auth_difference before get_next!")
+
+        def get_next(it, opt=None):
+            try:
+                return it.next()
+            except:
+                return opt
+
+        current_local = get_next(local_iter)
+        current_remote = get_next(remote_iter)
+
+        logger.debug("construct_auth_difference before while")
+
+        missing_remotes = []
+        missing_locals = []
+        while current_local or current_remote:
+            if current_remote is None:
+                missing_locals.append(current_local)
+                current_local = get_next(local_iter)
+                continue
+
+            if current_local is None:
+                missing_remotes.append(current_remote)
+                current_remote = get_next(remote_iter)
+                continue
+
+            if current_local.event_id == current_remote.event_id:
+                current_local = get_next(local_iter)
+                current_remote = get_next(remote_iter)
+                continue
+
+            if current_local.depth < current_remote.depth:
+                missing_locals.append(current_local)
+                current_local = get_next(local_iter)
+                continue
+
+            if current_local.depth > current_remote.depth:
+                missing_remotes.append(current_remote)
+                current_remote = get_next(remote_iter)
+                continue
+
+            # They have the same depth, so we fall back to the event_id order
+            if current_local.event_id < current_remote.event_id:
+                missing_locals.append(current_local)
+                current_local = get_next(local_iter)
+
+            if current_local.event_id > current_remote.event_id:
+                missing_remotes.append(current_remote)
+                current_remote = get_next(remote_iter)
+                continue
+
+        logger.debug("construct_auth_difference after while")
+
+        # missing locals should be sent to the server
+        # We should find why we are missing remotes, as they will have been
+        # rejected.
+
+        # Remove events from missing_remotes if they are referencing a missing
+        # remote. We only care about the "root" rejected ones.
+        missing_remote_ids = [e.event_id for e in missing_remotes]
+        base_remote_rejected = list(missing_remotes)
+        for e in missing_remotes:
+            for e_id, _ in e.auth_events:
+                if e_id in missing_remote_ids:
+                    try:
+                        base_remote_rejected.remove(e)
+                    except ValueError:
+                        pass
+
+        reason_map = {}
+
+        for e in base_remote_rejected:
+            reason = yield self.store.get_rejection_reason(e.event_id)
+            if reason is None:
+                # TODO: e is not in the current state, so we should
+                # construct some proof of that.
+                continue
+
+            reason_map[e.event_id] = reason
+
+            if reason == RejectedReason.AUTH_ERROR:
+                pass
+            elif reason == RejectedReason.REPLACED:
+                # TODO: Get proof
+                pass
+            elif reason == RejectedReason.NOT_ANCESTOR:
+                # TODO: Get proof.
+                pass
+
+        logger.debug("construct_auth_difference returning")
+
+        defer.returnValue({
+            "auth_chain": local_auth,
+            "rejects": {
+                e.event_id: {
+                    "reason": reason_map[e.event_id],
+                    "proof": None,
+                }
+                for e in base_remote_rejected
+            },
+            "missing": [e.event_id for e in missing_locals],
+        })
diff --git a/synapse/handlers/login.py b/synapse/handlers/login.py
index d297d71c03..7447800460 100644
--- a/synapse/handlers/login.py
+++ b/synapse/handlers/login.py
@@ -16,12 +16,13 @@
 from twisted.internet import defer
 
 from ._base import BaseHandler
-from synapse.api.errors import LoginError, Codes
+from synapse.api.errors import LoginError, Codes, CodeMessageException
 from synapse.http.client import SimpleHttpClient
 from synapse.util.emailutils import EmailException
 import synapse.util.emailutils as emailutils
 
 import bcrypt
+import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -96,16 +97,20 @@ class LoginHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _query_email(self, email):
-        httpCli = SimpleHttpClient(self.hs)
-        data = yield httpCli.get_json(
-            # TODO FIXME This should be configurable.
-            # XXX: ID servers need to use HTTPS
-            "http://%s%s" % (
-                "matrix.org:8090", "/_matrix/identity/api/v1/lookup"
-            ),
-            {
-                'medium': 'email',
-                'address': email
-            }
-        )
-        defer.returnValue(data)
+        http_client = SimpleHttpClient(self.hs)
+        try:
+            data = yield http_client.get_json(
+                # TODO FIXME This should be configurable.
+                # XXX: ID servers need to use HTTPS
+                "http://%s%s" % (
+                    "matrix.org:8090", "/_matrix/identity/api/v1/lookup"
+                ),
+                {
+                    'medium': 'email',
+                    'address': email
+                }
+            )
+            defer.returnValue(data)
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
+            defer.returnValue(data)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f2a2f16933..7b9685be7f 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,10 +16,12 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import RoomError
+from synapse.api.errors import RoomError, SynapseError
 from synapse.streams.config import PaginationConfig
+from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -33,6 +35,7 @@ class MessageHandler(BaseHandler):
     def __init__(self, hs):
         super(MessageHandler, self).__init__(hs)
         self.hs = hs
+        self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
 
@@ -89,7 +92,7 @@ class MessageHandler(BaseHandler):
                 yield self.hs.get_event_sources().get_current_token()
             )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
 
         events, next_key = yield data_source.get_pagination_rows(
             user, pagin_config.get_source_config("room"), room_id
@@ -99,9 +102,11 @@ class MessageHandler(BaseHandler):
             "room_key", next_key
         )
 
+        time_now = self.clock.time_msec()
+
         chunk = {
             "chunk": [
-                self.hs.serialize_event(e, as_client_event) for e in events
+                serialize_event(e, time_now, as_client_event) for e in events
             ],
             "start": pagin_config.from_token.to_string(),
             "end": next_token.to_string(),
@@ -110,7 +115,8 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def create_and_send_event(self, event_dict, ratelimit=True):
+    def create_and_send_event(self, event_dict, ratelimit=True,
+                              client=None, txn_id=None):
         """ Given a dict from a client, create and handle a new event.
 
         Creates an FrozenEvent object, filling out auth_events, prev_events,
@@ -130,13 +136,13 @@ class MessageHandler(BaseHandler):
         if ratelimit:
             self.ratelimit(builder.user_id)
         # TODO(paul): Why does 'event' not have a 'user' object?
-        user = self.hs.parse_userid(builder.user_id)
+        user = UserID.from_string(builder.user_id)
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                joinee = self.hs.parse_userid(builder.state_key)
+                joinee = UserID.from_string(builder.state_key)
                 # If event doesn't include a display name, add one.
                 yield self.distributor.fire(
                     "collect_presencelike_data",
@@ -144,6 +150,15 @@ class MessageHandler(BaseHandler):
                     builder.content
                 )
 
+        if client is not None:
+            if client.token_id is not None:
+                builder.internal_metadata.token_id = client.token_id
+            if client.device_id is not None:
+                builder.internal_metadata.device_id = client.device_id
+
+        if txn_id is not None:
+            builder.internal_metadata.txn_id = txn_id
+
         event, context = yield self._create_new_client_event(
             builder=builder,
         )
@@ -210,7 +225,10 @@ class MessageHandler(BaseHandler):
 
         # TODO: This is duplicating logic from snapshot_all_rooms
         current_state = yield self.state_handler.get_current_state(room_id)
-        defer.returnValue([self.hs.serialize_event(c) for c in current_state])
+        now = self.clock.time_msec()
+        defer.returnValue(
+            [serialize_event(c, now) for c in current_state.values()]
+        )
 
     @defer.inlineCallbacks
     def snapshot_all_rooms(self, user_id=None, pagin_config=None,
@@ -237,7 +255,7 @@ class MessageHandler(BaseHandler):
             membership_list=[Membership.INVITE, Membership.JOIN]
         )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
 
         rooms_ret = []
 
@@ -282,10 +300,11 @@ class MessageHandler(BaseHandler):
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
                 end_token = now_token.copy_and_replace("room_key", token[1])
+                time_now = self.clock.time_msec()
 
                 d["messages"] = {
                     "chunk": [
-                        self.hs.serialize_event(m, as_client_event)
+                        serialize_event(m, time_now, as_client_event)
                         for m in messages
                     ],
                     "start": start_token.to_string(),
@@ -296,7 +315,8 @@ class MessageHandler(BaseHandler):
                     event.room_id
                 )
                 d["state"] = [
-                    self.hs.serialize_event(c) for c in current_state
+                    serialize_event(c, time_now, as_client_event)
+                    for c in current_state.values()
                 ]
             except:
                 logger.exception("Failed to get snapshot")
@@ -312,20 +332,27 @@ class MessageHandler(BaseHandler):
     @defer.inlineCallbacks
     def room_initial_sync(self, user_id, room_id, pagin_config=None,
                           feedback=False):
-        yield self.auth.check_joined_room(room_id, user_id)
+        current_state = yield self.state.get_current_state(
+            room_id=room_id,
+        )
+
+        yield self.auth.check_joined_room(
+            room_id, user_id,
+            current_state=current_state
+        )
 
         # TODO(paul): I wish I was called with user objects not user_id
         #   strings...
-        auth_user = self.hs.parse_userid(user_id)
+        auth_user = UserID.from_string(user_id)
 
         # TODO: These concurrently
-        state_tuples = yield self.state_handler.get_current_state(room_id)
-        state = [self.hs.serialize_event(x) for x in state_tuples]
+        time_now = self.clock.time_msec()
+        state = [
+            serialize_event(x, time_now)
+            for x in current_state.values()
+        ]
 
-        member_event = (yield self.store.get_room_member(
-            user_id=user_id,
-            room_id=room_id
-        ))
+        member_event = current_state.get((EventTypes.Member, user_id,))
 
         now_token = yield self.hs.get_event_sources().get_current_token()
 
@@ -342,28 +369,34 @@ class MessageHandler(BaseHandler):
         start_token = now_token.copy_and_replace("room_key", token[0])
         end_token = now_token.copy_and_replace("room_key", token[1])
 
-        room_members = yield self.store.get_room_members(room_id)
+        room_members = [
+            m for m in current_state.values()
+            if m.type == EventTypes.Member
+            and m.content["membership"] == Membership.JOIN
+        ]
 
         presence_handler = self.hs.get_handlers().presence_handler
         presence = []
         for m in room_members:
             try:
                 member_presence = yield presence_handler.get_state(
-                    target_user=self.hs.parse_userid(m.user_id),
+                    target_user=UserID.from_string(m.user_id),
                     auth_user=auth_user,
                     as_event=True,
                 )
                 presence.append(member_presence)
-            except Exception:
+            except SynapseError:
                 logger.exception(
                     "Failed to get member presence of %r", m.user_id
                 )
 
+        time_now = self.clock.time_msec()
+
         defer.returnValue({
             "membership": member_event.membership,
             "room_id": room_id,
             "messages": {
-                "chunk": [self.hs.serialize_event(m) for m in messages],
+                "chunk": [serialize_event(m, time_now) for m in messages],
                 "start": start_token.to_string(),
                 "end": end_token.to_string(),
             },
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 8aeed99274..8ef248ecf2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -20,6 +20,7 @@ from synapse.api.constants import PresenceState
 
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -86,6 +87,10 @@ class PresenceHandler(BaseHandler):
             "changed_presencelike_data", self.changed_presencelike_data
         )
 
+        # outbound signal from the presence module to advertise when a user's
+        # presence has changed
+        distributor.declare("user_presence_changed")
+
         self.distributor = distributor
 
         self.federation = hs.get_replication_layer()
@@ -96,22 +101,22 @@ class PresenceHandler(BaseHandler):
         self.federation.register_edu_handler(
             "m.presence_invite",
             lambda origin, content: self.invite_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
         self.federation.register_edu_handler(
             "m.presence_accept",
             lambda origin, content: self.accept_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
         self.federation.register_edu_handler(
             "m.presence_deny",
             lambda origin, content: self.deny_presence(
-                observed_user=hs.parse_userid(content["observed_user"]),
-                observer_user=hs.parse_userid(content["observer_user"]),
+                observed_user=UserID.from_string(content["observed_user"]),
+                observer_user=UserID.from_string(content["observer_user"]),
             )
         )
 
@@ -418,7 +423,7 @@ class PresenceHandler(BaseHandler):
         )
 
         for p in presence:
-            observed_user = self.hs.parse_userid(p.pop("observed_user_id"))
+            observed_user = UserID.from_string(p.pop("observed_user_id"))
             p["observed_user"] = observed_user
             p.update(self._get_or_offline_usercache(observed_user).get_state())
             if "last_active" in p:
@@ -441,7 +446,7 @@ class PresenceHandler(BaseHandler):
                 user.localpart, accepted=True
             )
             target_users = set([
-                self.hs.parse_userid(x["observed_user_id"]) for x in presence
+                UserID.from_string(x["observed_user_id"]) for x in presence
             ])
 
             # Also include people in all my rooms
@@ -452,9 +457,9 @@ class PresenceHandler(BaseHandler):
         if state is None:
             state = yield self.store.get_presence_state(user.localpart)
         else:
-#            statuscache = self._get_or_make_usercache(user)
-#            self._user_cachemap_latest_serial += 1
-#            statuscache.update(state, self._user_cachemap_latest_serial)
+            # statuscache = self._get_or_make_usercache(user)
+            # self._user_cachemap_latest_serial += 1
+            # statuscache.update(state, self._user_cachemap_latest_serial)
             pass
 
         yield self.push_update_to_local_and_remote(
@@ -487,7 +492,7 @@ class PresenceHandler(BaseHandler):
                 user, domain, remoteusers
             ))
 
-        yield defer.DeferredList(deferreds)
+        yield defer.DeferredList(deferreds, consumeErrors=True)
 
     def _start_polling_local(self, user, target_user):
         target_localpart = target_user.localpart
@@ -543,7 +548,7 @@ class PresenceHandler(BaseHandler):
                 self._stop_polling_remote(user, domain, remoteusers)
             )
 
-        return defer.DeferredList(deferreds)
+        return defer.DeferredList(deferreds, consumeErrors=True)
 
     def _stop_polling_local(self, user, target_user):
         for localpart in self._local_pushmap.keys():
@@ -603,6 +608,7 @@ class PresenceHandler(BaseHandler):
             room_ids=room_ids,
             statuscache=statuscache,
         )
+        yield self.distributor.fire("user_presence_changed", user, statuscache)
 
     @defer.inlineCallbacks
     def _push_presence_remote(self, user, destination, state=None):
@@ -646,13 +652,15 @@ class PresenceHandler(BaseHandler):
         deferreds = []
 
         for push in content.get("push", []):
-            user = self.hs.parse_userid(push["user_id"])
+            user = UserID.from_string(push["user_id"])
 
             logger.debug("Incoming presence update from %s", user)
 
             observers = set(self._remote_recvmap.get(user, set()))
             if observers:
-                logger.debug(" | %d interested local observers %r", len(observers), observers)
+                logger.debug(
+                    " | %d interested local observers %r", len(observers), observers
+                )
 
             rm_handler = self.homeserver.get_handlers().room_member_handler
             room_ids = yield rm_handler.get_rooms_for_user(user)
@@ -694,14 +702,14 @@ class PresenceHandler(BaseHandler):
                 del self._user_cachemap[user]
 
         for poll in content.get("poll", []):
-            user = self.hs.parse_userid(poll)
+            user = UserID.from_string(poll)
 
             if not self.hs.is_mine(user):
                 continue
 
             # TODO(paul) permissions checks
 
-            if not user in self._remote_sendmap:
+            if user not in self._remote_sendmap:
                 self._remote_sendmap[user] = set()
 
             self._remote_sendmap[user].add(origin)
@@ -709,7 +717,7 @@ class PresenceHandler(BaseHandler):
             deferreds.append(self._push_presence_remote(user, origin))
 
         for unpoll in content.get("unpoll", []):
-            user = self.hs.parse_userid(unpoll)
+            user = UserID.from_string(unpoll)
 
             if not self.hs.is_mine(user):
                 continue
@@ -721,7 +729,7 @@ class PresenceHandler(BaseHandler):
                     del self._remote_sendmap[user]
 
         with PreserveLoggingContext():
-            yield defer.DeferredList(deferreds)
+            yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def push_update_to_local_and_remote(self, observed_user, statuscache,
@@ -760,7 +768,7 @@ class PresenceHandler(BaseHandler):
                 )
             )
 
-        yield defer.DeferredList(deferreds)
+        yield defer.DeferredList(deferreds, consumeErrors=True)
 
         defer.returnValue((localusers, remote_domains))
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 7777d3cc94..2ddf9d5378 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.api.constants import EventTypes, Membership
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.types import UserID
 
 from ._base import BaseHandler
 
@@ -169,7 +170,7 @@ class ProfileHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def on_profile_query(self, args):
-        user = self.hs.parse_userid(args["user_id"])
+        user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
@@ -211,10 +212,16 @@ class ProfileHandler(BaseHandler):
             )
 
             msg_handler = self.hs.get_handlers().message_handler
-            yield msg_handler.create_and_send_event({
-                "type": EventTypes.Member,
-                "room_id": j.room_id,
-                "state_key": user.to_string(),
-                "content": content,
-                "sender": user.to_string()
-            }, ratelimit=False)
+            try:
+                yield msg_handler.create_and_send_event({
+                    "type": EventTypes.Member,
+                    "room_id": j.room_id,
+                    "state_key": user.to_string(),
+                    "content": content,
+                    "sender": user.to_string()
+                }, ratelimit=False)
+            except Exception as e:
+                logger.warn(
+                    "Failed to update join event for room %s - %s",
+                    j.room_id, str(e.message)
+                )
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 732652c228..cda4a8502a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -18,7 +18,8 @@ from twisted.internet import defer
 
 from synapse.types import UserID
 from synapse.api.errors import (
-    SynapseError, RegistrationError, InvalidCaptchaError
+    AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError,
+    CodeMessageException
 )
 from ._base import BaseHandler
 import synapse.util.stringutils as stringutils
@@ -28,6 +29,7 @@ from synapse.http.client import CaptchaServerHttpClient
 
 import base64
 import bcrypt
+import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -64,6 +66,8 @@ class RegistrationHandler(BaseHandler):
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
+            yield self.check_user_id_is_valid(user_id)
+
             token = self._generate_token(user_id)
             yield self.store.register(
                 user_id=user_id,
@@ -82,6 +86,7 @@ class RegistrationHandler(BaseHandler):
                     localpart = self._generate_user_id()
                     user = UserID(localpart, self.hs.hostname)
                     user_id = user.to_string()
+                    yield self.check_user_id_is_valid(user_id)
 
                     token = self._generate_token(user_id)
                     yield self.store.register(
@@ -99,6 +104,47 @@ class RegistrationHandler(BaseHandler):
                         raise RegistrationError(
                             500, "Cannot generate user ID.")
 
+        # create a default avatar for the user
+        # XXX: ideally clients would explicitly specify one, but given they don't
+        # and we want consistent and pretty identicons for random users, we'll
+        # do it here.
+        try:
+            auth_user = UserID.from_string(user_id)
+            media_repository = self.hs.get_resource_for_media_repository()
+            identicon_resource = media_repository.getChildWithDefault("identicon", None)
+            upload_resource = media_repository.getChildWithDefault("upload", None)
+            identicon_bytes = identicon_resource.generate_identicon(user_id, 320, 320)
+            content_uri = yield upload_resource.create_content(
+                "image/png", None, identicon_bytes, len(identicon_bytes), auth_user
+            )
+            profile_handler = self.hs.get_handlers().profile_handler
+            profile_handler.set_avatar_url(
+                auth_user, auth_user, ("%s#auto" % (content_uri,))
+            )
+        except NotImplementedError:
+            pass  # make tests pass without messing around creating default avatars
+
+        defer.returnValue((user_id, token))
+
+    @defer.inlineCallbacks
+    def appservice_register(self, user_localpart, as_token):
+        user = UserID(user_localpart, self.hs.hostname)
+        user_id = user.to_string()
+        service = yield self.store.get_app_service_by_token(as_token)
+        if not service:
+            raise AuthError(403, "Invalid application service token.")
+        if not service.is_interested_in_user(user_id):
+            raise SynapseError(
+                400, "Invalid user localpart for this application service.",
+                errcode=Codes.EXCLUSIVE
+            )
+        token = self._generate_token(user_id)
+        yield self.store.register(
+            user_id=user_id,
+            token=token,
+            password_hash=""
+        )
+        self.distributor.fire("registered_user", user)
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -147,6 +193,21 @@ class RegistrationHandler(BaseHandler):
             # XXX: This should be a deferred list, shouldn't it?
             yield self._bind_threepid(c, user_id)
 
+    @defer.inlineCallbacks
+    def check_user_id_is_valid(self, user_id):
+        # valid user IDs must not clash with any user ID namespaces claimed by
+        # application services.
+        services = yield self.store.get_app_services()
+        interested_services = [
+            s for s in services if s.is_interested_in_user(user_id)
+        ]
+        for service in interested_services:
+            if service.is_exclusive_user(user_id):
+                raise SynapseError(
+                    400, "This user ID is reserved by an application service.",
+                    errcode=Codes.EXCLUSIVE
+                )
+
     def _generate_token(self, user_id):
         # urlsafe variant uses _ and - so use . as the separator and replace
         # all =s with .s so http clients don't quote =s when it is used as
@@ -161,21 +222,26 @@ class RegistrationHandler(BaseHandler):
     def _threepid_from_creds(self, creds):
         # TODO: get this from the homeserver rather than creating a new one for
         # each request
-        httpCli = SimpleHttpClient(self.hs)
+        http_client = SimpleHttpClient(self.hs)
         # XXX: make this configurable!
-        trustedIdServers = ['matrix.org:8090']
+        trustedIdServers = ['matrix.org:8090', 'matrix.org']
         if not creds['idServer'] in trustedIdServers:
             logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
                         'credentials', creds['idServer'])
             defer.returnValue(None)
-        data = yield httpCli.get_json(
-            # XXX: This should be HTTPS
-            "http://%s%s" % (
-                creds['idServer'],
-                "/_matrix/identity/api/v1/3pid/getValidated3pid"
-            ),
-            {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
-        )
+
+        data = {}
+        try:
+            data = yield http_client.get_json(
+                # XXX: This should be HTTPS
+                "http://%s%s" % (
+                    creds['idServer'],
+                    "/_matrix/identity/api/v1/3pid/getValidated3pid"
+                ),
+                {'sid': creds['sid'], 'clientSecret': creds['clientSecret']}
+            )
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
 
         if 'medium' in data:
             defer.returnValue(data)
@@ -185,19 +251,23 @@ class RegistrationHandler(BaseHandler):
     def _bind_threepid(self, creds, mxid):
         yield
         logger.debug("binding threepid")
-        httpCli = SimpleHttpClient(self.hs)
-        data = yield httpCli.post_urlencoded_get_json(
-            # XXX: Change when ID servers are all HTTPS
-            "http://%s%s" % (
-                creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"
-            ),
-            {
-                'sid': creds['sid'],
-                'clientSecret': creds['clientSecret'],
-                'mxid': mxid,
-            }
-        )
-        logger.debug("bound threepid")
+        http_client = SimpleHttpClient(self.hs)
+        data = None
+        try:
+            data = yield http_client.post_urlencoded_get_json(
+                # XXX: Change when ID servers are all HTTPS
+                "http://%s%s" % (
+                    creds['idServer'], "/_matrix/identity/api/v1/3pid/bind"
+                ),
+                {
+                    'sid': creds['sid'],
+                    'clientSecret': creds['clientSecret'],
+                    'mxid': mxid,
+                }
+            )
+            logger.debug("bound threepid")
+        except CodeMessageException as e:
+            data = json.loads(e.msg)
         defer.returnValue(data)
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6d0db18e51..80f7ee3f12 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -16,12 +16,14 @@
 """Contains functions for performing events on rooms."""
 from twisted.internet import defer
 
+from ._base import BaseHandler
+
 from synapse.types import UserID, RoomAlias, RoomID
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import run_on_reactor
-from ._base import BaseHandler
+from synapse.events.utils import serialize_event
 
 import logging
 
@@ -64,7 +66,7 @@ class RoomCreationHandler(BaseHandler):
         invite_list = config.get("invite", [])
         for i in invite_list:
             try:
-                self.hs.parse_userid(i)
+                UserID.from_string(i)
             except:
                 raise SynapseError(400, "Invalid user_id: %s" % (i,))
 
@@ -114,7 +116,7 @@ class RoomCreationHandler(BaseHandler):
                 servers=[self.hs.hostname],
             )
 
-        user = self.hs.parse_userid(user_id)
+        user = UserID.from_string(user_id)
         creation_events = self._create_events_for_new_room(
             user, room_id, is_public=is_public
         )
@@ -246,11 +248,9 @@ class RoomMemberHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_room_members(self, room_id):
-        hs = self.hs
-
         users = yield self.store.get_users_in_room(room_id)
 
-        defer.returnValue([hs.parse_userid(u) for u in users])
+        defer.returnValue([UserID.from_string(u) for u in users])
 
     @defer.inlineCallbacks
     def fetch_room_distributions_into(self, room_id, localusers=None,
@@ -295,8 +295,9 @@ class RoomMemberHandler(BaseHandler):
         yield self.auth.check_joined_room(room_id, user_id)
 
         member_list = yield self.store.get_room_members(room_id=room_id)
+        time_now = self.clock.time_msec()
         event_list = [
-            self.hs.serialize_event(entry)
+            serialize_event(entry, time_now)
             for entry in member_list
         ]
         chunk_data = {
@@ -368,7 +369,7 @@ class RoomMemberHandler(BaseHandler):
             )
 
             if prev_state and prev_state.membership == Membership.JOIN:
-                user = self.hs.parse_userid(event.user_id)
+                user = UserID.from_string(event.user_id)
                 self.distributor.fire(
                     "user_left_room", user=user, room_id=event.room_id
                 )
@@ -388,8 +389,6 @@ class RoomMemberHandler(BaseHandler):
         if not hosts:
             raise SynapseError(404, "No known servers")
 
-        host = hosts[0]
-
         # If event doesn't include a display name, add one.
         yield self.distributor.fire(
             "collect_presencelike_data", joinee, content
@@ -406,13 +405,13 @@ class RoomMemberHandler(BaseHandler):
         })
         event, context = yield self._create_new_client_event(builder)
 
-        yield self._do_join(event, context, room_host=host, do_auth=True)
+        yield self._do_join(event, context, room_hosts=hosts, do_auth=True)
 
         defer.returnValue({"room_id": room_id})
 
     @defer.inlineCallbacks
-    def _do_join(self, event, context, room_host=None, do_auth=True):
-        joinee = self.hs.parse_userid(event.state_key)
+    def _do_join(self, event, context, room_hosts=None, do_auth=True):
+        joinee = UserID.from_string(event.state_key)
         # room_id = RoomID.from_string(event.room_id, self.hs)
         room_id = event.room_id
 
@@ -440,7 +439,7 @@ class RoomMemberHandler(BaseHandler):
 
         if is_host_in_room:
             should_do_dance = False
-        elif room_host:  # TODO: Shouldn't this be remote_room_host?
+        elif room_hosts:  # TODO: Shouldn't this be remote_room_host?
             should_do_dance = True
         else:
             # TODO(markjh): get prev_state from snapshot
@@ -452,7 +451,7 @@ class RoomMemberHandler(BaseHandler):
                 inviter = UserID.from_string(prev_state.user_id)
 
                 should_do_dance = not self.hs.is_mine(inviter)
-                room_host = inviter.domain
+                room_hosts = [inviter.domain]
             else:
                 # return the same error as join_room_alias does
                 raise SynapseError(404, "No known servers")
@@ -460,10 +459,10 @@ class RoomMemberHandler(BaseHandler):
         if should_do_dance:
             handler = self.hs.get_handlers().federation_handler
             yield handler.do_invite_join(
-                room_host,
+                room_hosts,
                 room_id,
                 event.user_id,
-                event.get_dict()["content"],  # FIXME To get a non-frozen dict
+                event.content,  # FIXME To get a non-frozen dict
                 context
             )
         else:
@@ -476,7 +475,7 @@ class RoomMemberHandler(BaseHandler):
                 do_auth=do_auth,
             )
 
-        user = self.hs.parse_userid(event.user_id)
+        user = UserID.from_string(event.user_id)
         yield self.distributor.fire(
             "user_joined_room", user=user, room_id=room_id
         )
@@ -511,9 +510,16 @@ class RoomMemberHandler(BaseHandler):
     def get_rooms_for_user(self, user, membership_list=[Membership.JOIN]):
         """Returns a list of roomids that the user has any of the given
         membership states in."""
-        rooms = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=user.to_string(), membership_list=membership_list
+
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
         )
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+        else:
+            rooms = yield self.store.get_rooms_for_user_where_membership_is(
+                user_id=user.to_string(), membership_list=membership_list
+            )
 
         # For some reason the list of events contains duplicates
         # TODO(paul): work out why because I really don't think it should
@@ -526,7 +532,7 @@ class RoomMemberHandler(BaseHandler):
                                     do_auth):
         yield run_on_reactor()
 
-        target_user = self.hs.parse_userid(event.state_key)
+        target_user = UserID.from_string(event.state_key)
 
         yield self.handle_new_client_event(
             event,
@@ -560,13 +566,24 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
-        events, end_key = yield self.store.get_room_events_stream(
-            user_id=user.to_string(),
-            from_key=from_key,
-            to_key=to_key,
-            room_id=None,
-            limit=limit,
+        app_service = yield self.store.get_app_service_by_user_id(
+            user.to_string()
         )
+        if app_service:
+            events, end_key = yield self.store.get_appservice_room_stream(
+                service=app_service,
+                from_key=from_key,
+                to_key=to_key,
+                limit=limit,
+            )
+        else:
+            events, end_key = yield self.store.get_room_events_stream(
+                user_id=user.to_string(),
+                from_key=from_key,
+                to_key=to_key,
+                room_id=None,
+                limit=limit,
+            )
 
         defer.returnValue((events, end_key))
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
new file mode 100644
index 0000000000..7883bbd834
--- /dev/null
+++ b/synapse/handlers/sync.py
@@ -0,0 +1,439 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import BaseHandler
+
+from synapse.streams.config import PaginationConfig
+from synapse.api.constants import Membership, EventTypes
+
+from twisted.internet import defer
+
+import collections
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+SyncConfig = collections.namedtuple("SyncConfig", [
+    "user",
+    "client_info",
+    "limit",
+    "gap",
+    "sort",
+    "backfill",
+    "filter",
+])
+
+
+class RoomSyncResult(collections.namedtuple("RoomSyncResult", [
+    "room_id",
+    "limited",
+    "published",
+    "events",
+    "state",
+    "prev_batch",
+    "ephemeral",
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        """Make the result appear empty if there are no updates. This is used
+        to tell if room needs to be part of the sync result.
+        """
+        return bool(self.events or self.state or self.ephemeral)
+
+
+class SyncResult(collections.namedtuple("SyncResult", [
+    "next_batch",  # Token for the next sync
+    "private_user_data",  # List of private events for the user.
+    "public_user_data",  # List of public events for all users.
+    "rooms",  # RoomSyncResult for each room.
+])):
+    __slots__ = []
+
+    def __nonzero__(self):
+        """Make the result appear empty if there are no updates. This is used
+        to tell if the notifier needs to wait for more events when polling for
+        events.
+        """
+        return bool(
+            self.private_user_data or self.public_user_data or self.rooms
+        )
+
+
+class SyncHandler(BaseHandler):
+
+    def __init__(self, hs):
+        super(SyncHandler, self).__init__(hs)
+        self.event_sources = hs.get_event_sources()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0):
+        """Get the sync for a client if we have new data for it now. Otherwise
+        wait for new data to arrive on the server. If the timeout expires, then
+        return an empty sync result.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if timeout == 0 or since_token is None:
+            result = yield self.current_sync_for_user(sync_config, since_token)
+            defer.returnValue(result)
+        else:
+            def current_sync_callback():
+                return self.current_sync_for_user(sync_config, since_token)
+
+            rm_handler = self.hs.get_handlers().room_member_handler
+            room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+            result = yield self.notifier.wait_for_events(
+                sync_config.user, room_ids,
+                sync_config.filter, timeout, current_sync_callback
+            )
+            defer.returnValue(result)
+
+    def current_sync_for_user(self, sync_config, since_token=None):
+        """Get the sync for client needed to match what the server has now.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if since_token is None:
+            return self.initial_sync(sync_config)
+        else:
+            if sync_config.gap:
+                return self.incremental_sync_with_gap(sync_config, since_token)
+            else:
+                # TODO(mjark): Handle gapless sync
+                raise NotImplementedError()
+
+    @defer.inlineCallbacks
+    def initial_sync(self, sync_config):
+        """Get a sync for a client which is starting without any state
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_stream = self.event_sources.sources["presence"]
+        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
+        # UP to the present rather than after the present?
+        pagination_config = PaginationConfig(from_token=now_token)
+        presence, _ = yield presence_stream.get_pagination_rows(
+            user=sync_config.user,
+            pagination_config=pagination_config.get_source_config("presence"),
+            key=None
+        )
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=sync_config.user.to_string(),
+            membership_list=[Membership.INVITE, Membership.JOIN]
+        )
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        rooms = []
+        for event in room_list:
+            room_sync = yield self.initial_sync_for_room(
+                event.room_id, sync_config, now_token, published_room_ids
+            )
+            rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def initial_sync_for_room(self, room_id, sync_config, now_token,
+                              published_room_ids):
+        """Sync a room for a client which is starting without any state
+        Returns:
+            A Deferred RoomSyncResult.
+        """
+
+        recents, prev_batch_token, limited = yield self.load_filtered_recents(
+            room_id, sync_config, now_token,
+        )
+
+        current_state = yield self.state_handler.get_current_state(
+            room_id
+        )
+        current_state_events = current_state.values()
+
+        defer.returnValue(RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recents,
+            prev_batch=prev_batch_token,
+            state=current_state_events,
+            limited=limited,
+            ephemeral=[],
+        ))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap(self, sync_config, since_token):
+        """ Get the incremental delta needed to bring the client up to
+        date with the server.
+        Returns:
+            A Deferred SyncResult.
+        """
+        if sync_config.sort == "timeline,desc":
+            # TODO(mjark): Handle going through events in reverse order?.
+            # What does "most recent events" mean when applying the limits mean
+            # in this case?
+            raise NotImplementedError()
+
+        now_token = yield self.event_sources.get_current_token()
+
+        presence_source = self.event_sources.sources["presence"]
+        presence, presence_key = yield presence_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=since_token.presence_key,
+            limit=sync_config.limit,
+        )
+        now_token = now_token.copy_and_replace("presence_key", presence_key)
+
+        typing_source = self.event_sources.sources["typing"]
+        typing, typing_key = yield typing_source.get_new_events_for_user(
+            user=sync_config.user,
+            from_key=since_token.typing_key,
+            limit=sync_config.limit,
+        )
+        now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+        typing_by_room = {event["room_id"]: [event] for event in typing}
+        for event in typing:
+            event.pop("room_id")
+        logger.debug("Typing %r", typing_by_room)
+
+        rm_handler = self.hs.get_handlers().room_member_handler
+        room_ids = yield rm_handler.get_rooms_for_user(sync_config.user)
+
+        # TODO (mjark): Does public mean "published"?
+        published_rooms = yield self.store.get_rooms(is_public=True)
+        published_room_ids = set(r["room_id"] for r in published_rooms)
+
+        room_events, _ = yield self.store.get_room_events_stream(
+            sync_config.user.to_string(),
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            room_id=None,
+            limit=sync_config.limit + 1,
+        )
+
+        rooms = []
+        if len(room_events) <= sync_config.limit:
+            # There is no gap in any of the rooms. Therefore we can just
+            # partition the new events by room and return them.
+            events_by_room_id = {}
+            for event in room_events:
+                events_by_room_id.setdefault(event.room_id, []).append(event)
+
+            for room_id in room_ids:
+                recents = events_by_room_id.get(room_id, [])
+                state = [event for event in recents if event.is_state()]
+                if recents:
+                    prev_batch = now_token.copy_and_replace(
+                        "room_key", recents[0].internal_metadata.before
+                    )
+                else:
+                    prev_batch = now_token
+
+                state = yield self.check_joined_room(
+                    sync_config, room_id, state
+                )
+
+                room_sync = RoomSyncResult(
+                    room_id=room_id,
+                    published=room_id in published_room_ids,
+                    events=recents,
+                    prev_batch=prev_batch,
+                    state=state,
+                    limited=False,
+                    ephemeral=typing_by_room.get(room_id, [])
+                )
+                if room_sync:
+                    rooms.append(room_sync)
+        else:
+            for room_id in room_ids:
+                room_sync = yield self.incremental_sync_with_gap_for_room(
+                    room_id, sync_config, since_token, now_token,
+                    published_room_ids, typing_by_room
+                )
+                if room_sync:
+                    rooms.append(room_sync)
+
+        defer.returnValue(SyncResult(
+            public_user_data=presence,
+            private_user_data=[],
+            rooms=rooms,
+            next_batch=now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def load_filtered_recents(self, room_id, sync_config, now_token,
+                              since_token=None):
+        limited = True
+        recents = []
+        filtering_factor = 2
+        load_limit = max(sync_config.limit * filtering_factor, 100)
+        max_repeat = 3  # Only try a few times per room, otherwise
+        room_key = now_token.room_key
+        end_key = room_key
+
+        while limited and len(recents) < sync_config.limit and max_repeat:
+            events, keys = yield self.store.get_recent_events_for_room(
+                room_id,
+                limit=load_limit + 1,
+                from_token=since_token.room_key if since_token else None,
+                end_token=end_key,
+            )
+            (room_key, _) = keys
+            end_key = "s" + room_key.split('-')[-1]
+            loaded_recents = sync_config.filter.filter_room_events(events)
+            loaded_recents.extend(recents)
+            recents = loaded_recents
+            if len(events) <= load_limit:
+                limited = False
+            max_repeat -= 1
+
+        if len(recents) > sync_config.limit:
+            recents = recents[-sync_config.limit:]
+            room_key = recents[0].internal_metadata.before
+
+        prev_batch_token = now_token.copy_and_replace(
+            "room_key", room_key
+        )
+
+        defer.returnValue((recents, prev_batch_token, limited))
+
+    @defer.inlineCallbacks
+    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
+                                           since_token, now_token,
+                                           published_room_ids, typing_by_room):
+        """ Get the incremental delta needed to bring the client up to date for
+        the room. Gives the client the most recent events and the changes to
+        state.
+        Returns:
+            A Deferred RoomSyncResult
+        """
+
+        # TODO(mjark): Check for redactions we might have missed.
+
+        recents, prev_batch_token, limited = yield self.load_filtered_recents(
+            room_id, sync_config, now_token, since_token,
+        )
+
+        logging.debug("Recents %r", recents)
+
+        # TODO(mjark): This seems racy since this isn't being passed a
+        # token to indicate what point in the stream this is
+        current_state = yield self.state_handler.get_current_state(
+            room_id
+        )
+        current_state_events = current_state.values()
+
+        state_at_previous_sync = yield self.get_state_at_previous_sync(
+            room_id, since_token=since_token
+        )
+
+        state_events_delta = yield self.compute_state_delta(
+            since_token=since_token,
+            previous_state=state_at_previous_sync,
+            current_state=current_state_events,
+        )
+
+        state_events_delta = yield self.check_joined_room(
+            sync_config, room_id, state_events_delta
+        )
+
+        room_sync = RoomSyncResult(
+            room_id=room_id,
+            published=room_id in published_room_ids,
+            events=recents,
+            prev_batch=prev_batch_token,
+            state=state_events_delta,
+            limited=limited,
+            ephemeral=typing_by_room.get(room_id, [])
+        )
+
+        logging.debug("Room sync: %r", room_sync)
+
+        defer.returnValue(room_sync)
+
+    @defer.inlineCallbacks
+    def get_state_at_previous_sync(self, room_id, since_token):
+        """ Get the room state at the previous sync the client made.
+        Returns:
+            A Deferred list of Events.
+        """
+        last_events, token = yield self.store.get_recent_events_for_room(
+            room_id, end_token=since_token.room_key, limit=1,
+        )
+
+        if last_events:
+            last_event = last_events[0]
+            last_context = yield self.state_handler.compute_event_context(
+                last_event
+            )
+            if last_event.is_state():
+                state = [last_event] + last_context.current_state.values()
+            else:
+                state = last_context.current_state.values()
+        else:
+            state = ()
+        defer.returnValue(state)
+
+    def compute_state_delta(self, since_token, previous_state, current_state):
+        """ Works out the differnce in state between the current state and the
+        state the client got when it last performed a sync.
+        Returns:
+            A list of events.
+        """
+        # TODO(mjark) Check if the state events were received by the server
+        # after the previous sync, since we need to include those state
+        # updates even if they occured logically before the previous event.
+        # TODO(mjark) Check for new redactions in the state events.
+        previous_dict = {event.event_id: event for event in previous_state}
+        state_delta = []
+        for event in current_state:
+            if event.event_id not in previous_dict:
+                state_delta.append(event)
+        return state_delta
+
+    @defer.inlineCallbacks
+    def check_joined_room(self, sync_config, room_id, state_delta):
+        joined = False
+        for event in state_delta:
+            if (
+                event.type == EventTypes.Member
+                and event.state_key == sync_config.user.to_string()
+            ):
+                if event.content["membership"] == Membership.JOIN:
+                    joined = True
+
+        if joined:
+            res = yield self.state_handler.get_current_state(room_id)
+            state_delta = res.values()
+
+        defer.returnValue(state_delta)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index cd9638dd04..c2762f92c7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
+from synapse.types import UserID
 
 import logging
 
@@ -180,12 +181,12 @@ class TypingNotificationHandler(BaseHandler):
                 },
             ))
 
-        yield defer.DeferredList(deferreds, consumeErrors=False)
+        yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
         room_id = content["room_id"]
-        user = self.homeserver.parse_userid(content["user_id"])
+        user = UserID.from_string(content["user_id"])
 
         localusers = set()