summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-06-09 14:21:23 +0100
committerErik Johnston <erik@matrix.org>2016-06-09 14:21:23 +0100
commitba0406d10da32ebebf4185f01841f236371e0ae8 (patch)
tree70f492094b7fb962a8161bd2304c6846b3ac3f40 /synapse/handlers
parentMerge pull request #801 from ruma/readme-history-storage (diff)
parentChange CHANGELOG (diff)
downloadsynapse-ba0406d10da32ebebf4185f01841f236371e0ae8.tar.xz
Merge branch 'release-v0.16.0' of github.com:matrix-org/synapse v0.16.0
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py23
-rw-r--r--synapse/handlers/_base.py369
-rw-r--r--synapse/handlers/appservice.py15
-rw-r--r--synapse/handlers/auth.py142
-rw-r--r--synapse/handlers/directory.py3
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py233
-rw-r--r--synapse/handlers/message.py277
-rw-r--r--synapse/handlers/presence.py214
-rw-r--r--synapse/handlers/profile.py43
-rw-r--r--synapse/handlers/receipts.py33
-rw-r--r--synapse/handlers/register.py63
-rw-r--r--synapse/handlers/room.py709
-rw-r--r--synapse/handlers/room_member.py677
-rw-r--r--synapse/handlers/search.py17
-rw-r--r--synapse/handlers/sync.py1216
-rw-r--r--synapse/handlers/typing.py143
17 files changed, 2234 insertions, 1945 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 66d2c01123..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,23 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
-    RoomCreationHandler, RoomMemberHandler, RoomListHandler, RoomContextHandler,
+    RoomCreationHandler, RoomContextHandler,
 )
+from .room_member import RoomMemberHandler
 from .message import MessageHandler
 from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
-from .presence import PresenceHandler
 from .directory import DirectoryHandler
-from .typing import TypingNotificationHandler
 from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
-from .sync import SyncHandler
-from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
 from .search import SearchHandler
@@ -52,22 +46,9 @@ class Handlers(object):
         self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
-        self.presence_handler = PresenceHandler(hs)
-        self.room_list_handler = RoomListHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
-        self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.receipts_handler = ReceiptsHandler(hs)
-        asapi = ApplicationServiceApi(hs)
-        self.appservice_handler = ApplicationServicesHandler(
-            hs, asapi, AppServiceScheduler(
-                clock=hs.get_clock(),
-                store=hs.get_datastore(),
-                as_api=asapi
-            )
-        )
-        self.sync_handler = SyncHandler(hs)
-        self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 90eabb6eb7..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
 from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
 
-from synapse.util.logcontext import PreserveLoggingContext
 
 import logging
 
@@ -29,20 +26,13 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-VISIBILITY_PRIORITY = (
-    "world_readable",
-    "shared",
-    "invited",
-    "joined",
-)
-
-
 class BaseHandler(object):
     """
     Common base class for the event handlers.
 
-    :type store: synapse.storage.events.StateStore
-    :type state_handler: synapse.state.StateHandler
+    Attributes:
+        store (synapse.storage.events.StateStore):
+        state_handler (synapse.state.StateHandler):
     """
 
     def __init__(self, hs):
@@ -55,137 +45,10 @@ class BaseHandler(object):
         self.clock = hs.get_clock()
         self.hs = hs
 
-        self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    @defer.inlineCallbacks
-    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
-        """ Returns dict of user_id -> list of events that user is allowed to
-        see.
-
-        :param (str, bool) user_tuples: (user id, is_peeking) for each
-            user to be checked. is_peeking should be true if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-        """
-        forgotten = yield defer.gatherResults([
-            self.store.who_forgot_in_room(
-                room_id,
-            )
-            for room_id in frozenset(e.room_id for e in events)
-        ], consumeErrors=True)
-
-        # Set of membership event_ids that have been forgotten
-        event_id_forgotten = frozenset(
-            row["event_id"] for rows in forgotten for row in rows
-        )
-
-        def allowed(event, user_id, is_peeking):
-            state = event_id_to_state[event.event_id]
-
-            # get the room_visibility at the time of the event.
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility not in VISIBILITY_PRIORITY:
-                visibility = "shared"
-
-            # if it was world_readable, it's easy: everyone can read it
-            if visibility == "world_readable":
-                return True
-
-            # Always allow history visibility events on boundaries. This is done
-            # by setting the effective visibility to the least restrictive
-            # of the old vs new.
-            if event.type == EventTypes.RoomHistoryVisibility:
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_visibility = prev_content.get("history_visibility", None)
-
-                if prev_visibility not in VISIBILITY_PRIORITY:
-                    prev_visibility = "shared"
-
-                new_priority = VISIBILITY_PRIORITY.index(visibility)
-                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-                if old_priority < new_priority:
-                    visibility = prev_visibility
-
-            # get the user's membership at the time of the event. (or rather,
-            # just *after* the event. Which means that people can see their
-            # own join events, but not (currently) their own leave events.)
-            membership_event = state.get((EventTypes.Member, user_id), None)
-            if membership_event:
-                if membership_event.event_id in event_id_forgotten:
-                    membership = None
-                else:
-                    membership = membership_event.membership
-            else:
-                membership = None
-
-            # if the user was a member of the room at the time of the event,
-            # they can see it.
-            if membership == Membership.JOIN:
-                return True
-
-            if visibility == "joined":
-                # we weren't a member at the time of the event, so we can't
-                # see this event.
-                return False
-
-            elif visibility == "invited":
-                # user can also see the event if they were *invited* at the time
-                # of the event.
-                return membership == Membership.INVITE
-
-            else:
-                # visibility is shared: user can also see the event if they have
-                # become a member since the event
-                #
-                # XXX: if the user has subsequently joined and then left again,
-                # ideally we would share history up to the point they left. But
-                # we don't know when they left.
-                return not is_peeking
-
-        defer.returnValue({
-            user_id: [
-                event
-                for event in events
-                if allowed(event, user_id, is_peeking)
-            ]
-            for user_id, is_peeking in user_tuples
-        })
-
-    @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, events, is_peeking=False):
-        """
-        Check which events a user is allowed to see
-
-        :param str user_id: user id to be checked
-        :param [synapse.events.EventBase] events: list of events to be checked
-        :param bool is_peeking should be True if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-        :rtype [synapse.events.EventBase]
-        """
-        types = (
-            (EventTypes.RoomHistoryVisibility, ""),
-            (EventTypes.Member, user_id),
-        )
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-        res = yield self.filter_events_for_clients(
-            [(user_id, is_peeking)], events, event_id_to_state
-        )
-        defer.returnValue(res.get(user_id, []))
-
     def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -198,95 +61,6 @@ class BaseHandler(object):
                 retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder):
-        latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-            builder.room_id,
-        )
-
-        if latest_ret:
-            depth = max([d for _, _, d in latest_ret]) + 1
-        else:
-            depth = 1
-
-        prev_events = [
-            (event_id, prev_hashes)
-            for event_id, prev_hashes, _ in latest_ret
-        ]
-
-        builder.prev_events = prev_events
-        builder.depth = depth
-
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
-
-        # If we've received an invite over federation, there are no latest
-        # events in the room, because we don't know enough about the graph
-        # fragment we received to treat it like a graph, so the above returned
-        # no relevant events. It may have returned some events (if we have
-        # joined and left the room), but not useful ones, like the invite.
-        if (
-            not self.is_host_in_room(context.current_state) and
-            builder.type == EventTypes.Member
-        ):
-            prev_member_event = yield self.store.get_room_member(
-                builder.sender, builder.room_id
-            )
-
-            # The prev_member_event may already be in context.current_state,
-            # despite us not being present in the room; in particular, if
-            # inviting user, and all other local users, have already left.
-            #
-            # In that case, we have all the information we need, and we don't
-            # want to drop "context" - not least because we may need to handle
-            # the invite locally, which will require us to have the whole
-            # context (not just prev_member_event) to auth it.
-            #
-            context_event_ids = (
-                e.event_id for e in context.current_state.values()
-            )
-
-            if (
-                prev_member_event and
-                prev_member_event.event_id not in context_event_ids
-            ):
-                # The prev_member_event is missing from context, so it must
-                # have arrived over federation and is an outlier. We forcibly
-                # set our context to the invite we received over federation
-                builder.prev_events = (
-                    prev_member_event.event_id,
-                    prev_member_event.prev_events
-                )
-
-                context = yield state_handler.compute_event_context(
-                    builder,
-                    old_state=(prev_member_event,),
-                    outlier=True
-                )
-
-        if builder.is_state():
-            builder.prev_state = yield self.store.add_event_hashes(
-                context.prev_state_events
-            )
-
-        yield self.auth.add_auth_events(builder, context)
-
-        add_hashes_and_signatures(
-            builder, self.server_name, self.signing_key
-        )
-
-        event = builder.build()
-
-        logger.debug(
-            "Created event %s with current state: %s",
-            event.event_id, context.current_state,
-        )
-
-        defer.returnValue(
-            (event, context,)
-        )
-
     def is_host_in_room(self, current_state):
         room_members = [
             (state_key, event.membership)
@@ -301,144 +75,13 @@ class BaseHandler(object):
                 return True
         for (state_key, membership) in room_members:
             if (
-                UserID.from_string(state_key).domain == self.hs.hostname
+                self.hs.is_mine_id(state_key)
                 and membership == Membership.JOIN
             ):
                 return True
         return False
 
     @defer.inlineCallbacks
-    def handle_new_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[]
-    ):
-        # We now need to go and hit out to wherever we need to hit out to.
-
-        if ratelimit:
-            self.ratelimit(requester)
-
-        self.auth.check(event, auth_events=context.current_state)
-
-        yield self.maybe_kick_guest_users(event, context.current_state.values())
-
-        if event.type == EventTypes.CanonicalAlias:
-            # Check the alias is acually valid (at this time at least)
-            room_alias_str = event.content.get("alias", None)
-            if room_alias_str:
-                room_alias = RoomAlias.from_string(room_alias_str)
-                directory_handler = self.hs.get_handlers().directory_handler
-                mapping = yield directory_handler.get_association(room_alias)
-
-                if mapping["room_id"] != event.room_id:
-                    raise SynapseError(
-                        400,
-                        "Room alias %s does not point to the room" % (
-                            room_alias_str,
-                        )
-                    )
-
-        federation_handler = self.hs.get_handlers().federation_handler
-
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                def is_inviter_member_event(e):
-                    return (
-                        e.type == EventTypes.Member and
-                        e.sender == event.sender
-                    )
-
-                event.unsigned["invite_room_state"] = [
-                    {
-                        "type": e.type,
-                        "state_key": e.state_key,
-                        "content": e.content,
-                        "sender": e.sender,
-                    }
-                    for k, e in context.current_state.items()
-                    if e.type in self.hs.config.room_invite_state_types
-                    or is_inviter_member_event(e)
-                ]
-
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
-
-                    returned_invite = yield federation_handler.send_invite(
-                        invitee.domain,
-                        event,
-                    )
-
-                    event.unsigned.pop("room_state", None)
-
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(
-                        returned_invite.signatures
-                    )
-
-        if event.type == EventTypes.Redaction:
-            if self.auth.check_redaction(event, auth_events=context.current_state):
-                original_event = yield self.store.get_event(
-                    event.redacts,
-                    check_redacted=False,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=False
-                )
-                if event.user_id != original_event.user_id:
-                    raise AuthError(
-                        403,
-                        "You don't have permission to redact events"
-                    )
-
-        if event.type == EventTypes.Create and context.current_state:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
-
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
-            event, context, self
-        )
-
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
-        destinations = set()
-        for k, s in context.current_state.items():
-            try:
-                if k[0] == EventTypes.Member:
-                    if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", s.event_id
-                )
-
-        with PreserveLoggingContext():
-            # Don't block waiting on waking up all the listeners.
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
-
-        # If invite, remove room_state from unsigned before sending.
-        event.unsigned.pop("invite_room_state", None)
-
-        federation_handler.handle_new_event(
-            event, destinations=destinations,
-        )
-
-    @defer.inlineCallbacks
     def maybe_kick_guest_users(self, event, current_state):
         # Technically this function invalidates current_state by changing it.
         # Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
-from synapse.types import UserID
 
 import logging
 
@@ -35,16 +34,13 @@ def log_failure(failure):
     )
 
 
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api, appservice_scheduler):
+    def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.hs = hs
-        self.appservice_api = appservice_api
-        self.scheduler = appservice_scheduler
+        self.is_mine_id = hs.is_mine_id
+        self.appservice_api = hs.get_application_service_api()
+        self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
 
     @defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
 
     @defer.inlineCallbacks
     def _is_unknown_user(self, user_id):
-        user = UserID.from_string(user_id)
-        if not self.hs.is_mine(user):
+        if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 82d458b424..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
 
 from twisted.web.client import PartialDownloadError
@@ -49,6 +49,21 @@ class AuthHandler(BaseHandler):
         self.sessions = {}
         self.INVALID_TOKEN_HTTP_STATUS = 401
 
+        self.ldap_enabled = hs.config.ldap_enabled
+        self.ldap_server = hs.config.ldap_server
+        self.ldap_port = hs.config.ldap_port
+        self.ldap_tls = hs.config.ldap_tls
+        self.ldap_search_base = hs.config.ldap_search_base
+        self.ldap_search_property = hs.config.ldap_search_property
+        self.ldap_email_property = hs.config.ldap_email_property
+        self.ldap_full_name_property = hs.config.ldap_full_name_property
+
+        if self.ldap_enabled is True:
+            import ldap
+            logger.info("Import ldap version: %s", ldap.__version__)
+
+        self.hs = hs  # FIXME better possibility to access registrationHandler later?
+
     @defer.inlineCallbacks
     def check_auth(self, flows, clientdict, clientip):
         """
@@ -163,9 +178,13 @@ class AuthHandler(BaseHandler):
     def get_session_id(self, clientdict):
         """
         Gets the session ID for a client given the client dictionary
-        :param clientdict: The dictionary sent by the client in the request
-        :return: The string session ID the client sent. If the client did not
-                 send a session ID, returns None.
+
+        Args:
+            clientdict: The dictionary sent by the client in the request
+
+        Returns:
+            str|None: The string session ID the client sent. If the client did
+                not send a session ID, returns None.
         """
         sid = None
         if clientdict and 'auth' in clientdict:
@@ -179,9 +198,11 @@ class AuthHandler(BaseHandler):
         Store a key-value pair into the sessions data associated with this
         request. This data is stored server-side and cannot be modified by
         the client.
-        :param session_id: (string) The ID of this session as returned from check_auth
-        :param key: (string) The key to store the data under
-        :param value: (any) The data to store
+
+        Args:
+            session_id (string): The ID of this session as returned from check_auth
+            key (string): The key to store the data under
+            value (any): The data to store
         """
         sess = self._get_session_info(session_id)
         sess.setdefault('serverdict', {})[key] = value
@@ -190,9 +211,11 @@ class AuthHandler(BaseHandler):
     def get_session_data(self, session_id, key, default=None):
         """
         Retrieve data stored with set_session_data
-        :param session_id: (string) The ID of this session as returned from check_auth
-        :param key: (string) The key to store the data under
-        :param default: (any) Value to return if the key has not been set
+
+        Args:
+            session_id (string): The ID of this session as returned from check_auth
+            key (string): The key to store the data under
+            default (any): Value to return if the key has not been set
         """
         sess = self._get_session_info(session_id)
         return sess.setdefault('serverdict', {}).get(key, default)
@@ -207,8 +230,10 @@ class AuthHandler(BaseHandler):
         if not user_id.startswith('@'):
             user_id = UserID.create(user_id, self.hs.hostname).to_string()
 
-        user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
-        self._check_password(user_id, password, password_hash)
+        if not (yield self._check_password(user_id, password)):
+            logger.warn("Failed password login for user %s", user_id)
+            raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+
         defer.returnValue(user_id)
 
     @defer.inlineCallbacks
@@ -332,8 +357,10 @@ class AuthHandler(BaseHandler):
             StoreError if there was a problem storing the token.
             LoginError if there was an authentication problem.
         """
-        user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
-        self._check_password(user_id, password, password_hash)
+
+        if not (yield self._check_password(user_id, password)):
+            logger.warn("Failed password login for user %s", user_id)
+            raise LoginError(403, "", errcode=Codes.FORBIDDEN)
 
         logger.info("Logging in user %s", user_id)
         access_token = yield self.issue_access_token(user_id)
@@ -399,11 +426,67 @@ class AuthHandler(BaseHandler):
         else:
             defer.returnValue(user_infos.popitem())
 
-    def _check_password(self, user_id, password, stored_hash):
-        """Checks that user_id has passed password, raises LoginError if not."""
-        if not self.validate_hash(password, stored_hash):
-            logger.warn("Failed password login for user %s", user_id)
-            raise LoginError(403, "", errcode=Codes.FORBIDDEN)
+    @defer.inlineCallbacks
+    def _check_password(self, user_id, password):
+        """
+        Returns:
+            True if the user_id successfully authenticated
+        """
+        valid_ldap = yield self._check_ldap_password(user_id, password)
+        if valid_ldap:
+            defer.returnValue(True)
+
+        valid_local_password = yield self._check_local_password(user_id, password)
+        if valid_local_password:
+            defer.returnValue(True)
+
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _check_local_password(self, user_id, password):
+        try:
+            user_id, password_hash = yield self._find_user_id_and_pwd_hash(user_id)
+            defer.returnValue(self.validate_hash(password, password_hash))
+        except LoginError:
+            defer.returnValue(False)
+
+    @defer.inlineCallbacks
+    def _check_ldap_password(self, user_id, password):
+        if not self.ldap_enabled:
+            logger.debug("LDAP not configured")
+            defer.returnValue(False)
+
+        import ldap
+
+        logger.info("Authenticating %s with LDAP" % user_id)
+        try:
+            ldap_url = "%s:%s" % (self.ldap_server, self.ldap_port)
+            logger.debug("Connecting LDAP server at %s" % ldap_url)
+            l = ldap.initialize(ldap_url)
+            if self.ldap_tls:
+                logger.debug("Initiating TLS")
+                self._connection.start_tls_s()
+
+            local_name = UserID.from_string(user_id).localpart
+
+            dn = "%s=%s, %s" % (
+                self.ldap_search_property,
+                local_name,
+                self.ldap_search_base)
+            logger.debug("DN for LDAP authentication: %s" % dn)
+
+            l.simple_bind_s(dn.encode('utf-8'), password.encode('utf-8'))
+
+            if not (yield self.does_user_exist(user_id)):
+                handler = self.hs.get_handlers().registration_handler
+                user_id, access_token = (
+                    yield handler.register(localpart=local_name)
+                )
+
+            defer.returnValue(True)
+        except ldap.LDAPError, e:
+            logger.warn("LDAP error: %s", e)
+            defer.returnValue(False)
 
     @defer.inlineCallbacks
     def issue_access_token(self, user_id):
@@ -438,14 +521,19 @@ class AuthHandler(BaseHandler):
         ))
         return m.serialize()
 
-    def generate_short_term_login_token(self, user_id):
+    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
         now = self.hs.get_clock().time_msec()
-        expiry = now + (2 * 60 * 1000)
+        expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
 
+    def generate_delete_pusher_token(self, user_id):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = delete_pusher")
+        return macaroon.serialize()
+
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         try:
             macaroon = pymacaroons.Macaroon.deserialize(login_token)
@@ -480,7 +568,12 @@ class AuthHandler(BaseHandler):
 
         except_access_token_ids = [requester.access_token_id] if requester else []
 
-        yield self.store.user_set_password_hash(user_id, password_hash)
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
         yield self.store.user_delete_access_tokens(
             user_id, except_access_token_ids
         )
@@ -532,4 +625,7 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.hashpw(password, stored_hash) == stored_hash
+        if stored_hash:
+            return bcrypt.hashpw(password, stored_hash) == stored_hash
+        else:
+            return False
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
         super(DirectoryHandler, self).__init__(hs)
 
         self.state = hs.get_state_handler()
+        self.appservice_handler = hs.get_application_service_handler()
 
         self.federation = hs.get_replication_layer()
         self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
         )
         if not result:
             # Query AS to see if it exists
-            as_handler = self.hs.get_handlers().appservice_handler
+            as_handler = self.appservice_handler
             result = yield as_handler.query_room_alias_exists(room_alias)
         defer.returnValue(result)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f25a252523..3a3a1257d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
         auth_user = UserID.from_string(auth_user_id)
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         context = yield presence_handler.user_syncing(
             auth_user_id, affect_presence=affect_presence,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 267fedf114..ff83c608e7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,20 +26,21 @@ from synapse.api.errors import (
 from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.async import run_on_reactor
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
 )
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
 
 from synapse.events.utils import prune_event
 
 from synapse.util.retryutils import NotRetryingDestination
 
 from synapse.push.action_generator import ActionGenerator
+from synapse.util.distributor import user_joined_room
 
 from twisted.internet import defer
 
@@ -49,10 +50,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user, room_id)
-
-
 class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
@@ -69,10 +66,6 @@ class FederationHandler(BaseHandler):
 
         self.hs = hs
 
-        self.distributor.observe("user_joined_room", self.user_joined_room)
-
-        self.waiting_for_join_list = {}
-
         self.store = hs.get_datastore()
         self.replication_layer = hs.get_replication_layer()
         self.state_handler = hs.get_state_handler()
@@ -102,8 +95,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, state=None,
-                       auth_chain=None):
+    def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
         """ Called by the ReplicationLayer when we have a new pdu. We need to
         do auth checks and put it through the StateHandler.
         """
@@ -174,11 +166,7 @@ class FederationHandler(BaseHandler):
                     })
                     seen_ids.add(e.event_id)
 
-                yield self._handle_new_events(
-                    origin,
-                    event_infos,
-                    outliers=True
-                )
+                yield self._handle_new_events(origin, event_infos)
 
             try:
                 context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -288,7 +276,14 @@ class FederationHandler(BaseHandler):
     @defer.inlineCallbacks
     def backfill(self, dest, room_id, limit, extremities=[]):
         """ Trigger a backfill request to `dest` for the given `room_id`
+
+        This will attempt to get more events from the remote. This may return
+        be successfull and still return no events if the other side has no new
+        events to offer.
         """
+        if dest == self.server_name:
+            raise SynapseError(400, "Can't backfill from self.")
+
         if not extremities:
             extremities = yield self.store.get_oldest_events_in_room(room_id)
 
@@ -299,6 +294,16 @@ class FederationHandler(BaseHandler):
             extremities=extremities,
         )
 
+        # Don't bother processing events we already have.
+        seen_events = yield self.store.have_events_in_timeline(
+            set(e.event_id for e in events)
+        )
+
+        events = [e for e in events if e.event_id not in seen_events]
+
+        if not events:
+            defer.returnValue([])
+
         event_map = {e.event_id: e for e in events}
 
         event_ids = set(e.event_id for e in events)
@@ -358,6 +363,7 @@ class FederationHandler(BaseHandler):
         for a in auth_events.values():
             if a.event_id in seen_events:
                 continue
+            a.internal_metadata.outlier = True
             ev_infos.append({
                 "event": a,
                 "auth_events": {
@@ -378,20 +384,23 @@ class FederationHandler(BaseHandler):
                 }
             })
 
+        yield self._handle_new_events(
+            dest, ev_infos,
+            backfilled=True,
+        )
+
         events.sort(key=lambda e: e.depth)
 
         for event in events:
             if event in events_to_state:
                 continue
 
-            ev_infos.append({
-                "event": event,
-            })
-
-        yield self._handle_new_events(
-            dest, ev_infos,
-            backfilled=True,
-        )
+            # We store these one at a time since each event depends on the
+            # previous to work out the state.
+            # TODO: We can probably do something more clever here.
+            yield self._handle_new_event(
+                dest, event
+            )
 
         defer.returnValue(events)
 
@@ -440,7 +449,7 @@ class FederationHandler(BaseHandler):
             joined_domains = {}
             for u, d in joined_users:
                 try:
-                    dom = UserID.from_string(u).domain
+                    dom = get_domain_from_id(u)
                     old_d = joined_domains.get(dom)
                     if old_d:
                         joined_domains[dom] = min(d, old_d)
@@ -455,7 +464,7 @@ class FederationHandler(BaseHandler):
 
         likely_domains = [
             domain for domain, depth in curr_domains
-            if domain is not self.server_name
+            if domain != self.server_name
         ]
 
         @defer.inlineCallbacks
@@ -463,11 +472,15 @@ class FederationHandler(BaseHandler):
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
                 try:
-                    events = yield self.backfill(
+                    yield self.backfill(
                         dom, room_id,
                         limit=100,
                         extremities=[e for e in extremities.keys()]
                     )
+                    # If this succeeded then we probably already have the
+                    # appropriate stuff.
+                    # TODO: We can probably do something more intelligent here.
+                    defer.returnValue(True)
                 except SynapseError as e:
                     logger.info(
                         "Failed to backfill from %s because %s",
@@ -493,8 +506,6 @@ class FederationHandler(BaseHandler):
                     )
                     continue
 
-                if events:
-                    defer.returnValue(True)
             defer.returnValue(False)
 
         success = yield try_backfill(likely_domains)
@@ -666,9 +677,14 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
-            builder=builder,
-        )
+        try:
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
+                builder=builder,
+            )
+        except AuthError as e:
+            logger.warn("Failed to create join %r because %s", event, e)
+            raise e
 
         self.auth.check(event, auth_events=context.current_state)
 
@@ -724,9 +740,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -761,6 +775,7 @@ class FederationHandler(BaseHandler):
         event = pdu
 
         event.internal_metadata.outlier = True
+        event.internal_metadata.invite_from_remote = True
 
         event.signatures.update(
             compute_event_signature(
@@ -788,13 +803,19 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id):
-        origin, event = yield self._make_and_verify_event(
-            target_hosts,
-            room_id,
-            user_id,
-            "leave"
-        )
-        signed_event = self._sign_event(event)
+        try:
+            origin, event = yield self._make_and_verify_event(
+                target_hosts,
+                room_id,
+                user_id,
+                "leave"
+            )
+            signed_event = self._sign_event(event)
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         # Try the host we successfully got a response to /make_join/
         # request first.
@@ -804,10 +825,16 @@ class FederationHandler(BaseHandler):
         except ValueError:
             pass
 
-        yield self.replication_layer.send_leave(
-            target_hosts,
-            signed_event
-        )
+        try:
+            yield self.replication_layer.send_leave(
+                target_hosts,
+                signed_event
+            )
+        except SynapseError:
+            raise
+        except CodeMessageException as e:
+            logger.warn("Failed to reject invite: %s", e)
+            raise SynapseError(500, "Failed to reject invite")
 
         context = yield self.state_handler.compute_event_context(event)
 
@@ -883,11 +910,16 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
-        self.auth.check(event, auth_events=context.current_state)
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as e:
+            logger.warn("Failed to create new leave %r because %s", event, e)
+            raise e
 
         defer.returnValue(event)
 
@@ -934,9 +966,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.LEAVE:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -1057,21 +1087,10 @@ class FederationHandler(BaseHandler):
     def get_min_depth_for_context(self, context):
         return self.store.get_min_depth(context)
 
-    @log_function
-    def user_joined_room(self, user, room_id):
-        waiters = self.waiting_for_join_list.get(
-            (user.to_string(), room_id),
-            []
-        )
-        while waiters:
-            waiters.pop().callback(None)
-
     @defer.inlineCallbacks
     @log_function
-    def _handle_new_event(self, origin, event, state=None, auth_events=None):
-
-        outlier = event.internal_metadata.is_outlier()
-
+    def _handle_new_event(self, origin, event, state=None, auth_events=None,
+                          backfilled=False):
         context = yield self._prep_event(
             origin, event,
             state=state,
@@ -1081,20 +1100,30 @@ class FederationHandler(BaseHandler):
         if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
-                event, context, self
+                event, context
             )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
-            is_new_state=not outlier,
+            backfilled=backfilled,
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
         )
 
         defer.returnValue((context, event_stream_id, max_stream_id))
 
     @defer.inlineCallbacks
-    def _handle_new_events(self, origin, event_infos, backfilled=False,
-                           outliers=False):
+    def _handle_new_events(self, origin, event_infos, backfilled=False):
+        """Creates the appropriate contexts and persists events. The events
+        should not depend on one another, e.g. this should be used to persist
+        a bunch of outliers, but not a chunk of individual events that depend
+        on each other for state calculations.
+        """
         contexts = yield defer.gatherResults(
             [
                 self._prep_event(
@@ -1113,7 +1142,6 @@ class FederationHandler(BaseHandler):
                 for ev_info, context in itertools.izip(event_infos, contexts)
             ],
             backfilled=backfilled,
-            is_new_state=(not outliers and not backfilled),
         )
 
     @defer.inlineCallbacks
@@ -1128,11 +1156,9 @@ class FederationHandler(BaseHandler):
         """
         events_to_context = {}
         for e in itertools.chain(auth_events, state):
-            ctx = yield self.state_handler.compute_event_context(
-                e, outlier=True,
-            )
-            events_to_context[e.event_id] = ctx
             e.internal_metadata.outlier = True
+            ctx = yield self.state_handler.compute_event_context(e)
+            events_to_context[e.event_id] = ctx
 
         event_map = {
             e.event_id: e
@@ -1176,16 +1202,14 @@ class FederationHandler(BaseHandler):
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
             ],
-            is_new_state=False,
         )
 
         new_event_context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=False,
+            event, old_state=state
         )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event, new_event_context,
-            is_new_state=True,
             current_state=state,
         )
 
@@ -1193,10 +1217,9 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
-        outlier = event.internal_metadata.is_outlier()
 
         context = yield self.state_handler.compute_event_context(
-            event, old_state=state, outlier=outlier,
+            event, old_state=state,
         )
 
         if not auth_events:
@@ -1482,8 +1505,9 @@ class FederationHandler(BaseHandler):
 
         try:
             self.auth.check(event, auth_events=auth_events)
-        except AuthError:
-            raise
+        except AuthError as e:
+            logger.warn("Failed auth resolution for %r because %s", event, e)
+            raise e
 
     @defer.inlineCallbacks
     def construct_auth_difference(self, local_auth, remote_auth):
@@ -1653,13 +1677,21 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            event, context = yield self._create_new_client_event(builder=builder)
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
+                builder=builder
+            )
 
             event, context = yield self.add_display_name_to_third_party_invite(
                 event_dict, event, context
             )
 
-            self.auth.check(event, context.current_state)
+            try:
+                self.auth.check(event, context.current_state)
+            except AuthError as e:
+                logger.warn("Denying new third party invite %r because %s", event, e)
+                raise e
+
             yield self._check_signature(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
             yield member_handler.send_membership_event(None, event, context)
@@ -1676,7 +1708,8 @@ class FederationHandler(BaseHandler):
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
         builder = self.event_builder_factory.new(event_dict)
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -1684,7 +1717,11 @@ class FederationHandler(BaseHandler):
             event_dict, event, context
         )
 
-        self.auth.check(event, auth_events=context.current_state)
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as e:
+            logger.warn("Denying third party invite %r because %s", event, e)
+            raise e
         yield self._check_signature(event, auth_events=context.current_state)
 
         returned_invite = yield self.send_invite(origin, event)
@@ -1711,20 +1748,23 @@ class FederationHandler(BaseHandler):
         event_dict["content"]["third_party_invite"]["display_name"] = display_name
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        event, context = yield self._create_new_client_event(builder=builder)
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(builder=builder)
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
     def _check_signature(self, event, auth_events):
         """
         Checks that the signature in the event is consistent with its invite.
-        :param event (Event): The m.room.member event to check
-        :param auth_events (dict<(event type, state_key), event>)
 
-        :raises
-            AuthError if signature didn't match any keys, or key has been
+        Args:
+            event (Event): The m.room.member event to check
+            auth_events (dict<(event type, state_key), event>):
+
+        Raises:
+            AuthError: if signature didn't match any keys, or key has been
                 revoked,
-            SynapseError if a transient error meant a key couldn't be checked
+            SynapseError: if a transient error meant a key couldn't be checked
                 for revocation.
         """
         signed = event.content["third_party_invite"]["signed"]
@@ -1766,12 +1806,13 @@ class FederationHandler(BaseHandler):
         """
         Checks whether public_key has been revoked.
 
-        :param public_key (str): base-64 encoded public key.
-        :param url (str): Key revocation URL.
+        Args:
+            public_key (str): base-64 encoded public key.
+            url (str): Key revocation URL.
 
-        :raises
-            AuthError if they key has been revoked.
-            SynapseError if a transient error meant a key couldn't be checked
+        Raises:
+            AuthError: if they key has been revoked.
+            SynapseError: if a transient error meant a key couldn't be checked
                 for revocation.
         """
         try:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 5c50c611ba..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,12 +17,19 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+)
 from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute, run_on_reactor
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -33,10 +40,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def collect_presencelike_data(distributor, user, content):
-    return distributor.fire("collect_presencelike_data", user, content)
-
-
 class MessageHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -48,35 +51,6 @@ class MessageHandler(BaseHandler):
         self.snapshot_cache = SnapshotCache()
 
     @defer.inlineCallbacks
-    def get_message(self, msg_id=None, room_id=None, sender_id=None,
-                    user_id=None):
-        """ Retrieve a message.
-
-        Args:
-            msg_id (str): The message ID to obtain.
-            room_id (str): The room where the message resides.
-            sender_id (str): The user ID of the user who sent the message.
-            user_id (str): The user ID of the user making this request.
-        Returns:
-            The message, or None if no message exists.
-        Raises:
-            SynapseError if something went wrong.
-        """
-        yield self.auth.check_joined_room(room_id, user_id)
-
-        # Pull out the message from the db
-#        msg = yield self.store.get_message(
-#            room_id=room_id,
-#            msg_id=msg_id,
-#            user_id=sender_id
-#        )
-
-        # TODO (erikj): Once we work out the correct c-s api we need to think
-        # on how to do this.
-
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def get_messages(self, requester, room_id=None, pagin_config=None,
                      as_client_event=True):
         """Get messages in a room.
@@ -155,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -175,7 +150,7 @@ class MessageHandler(BaseHandler):
         defer.returnValue(chunk)
 
     @defer.inlineCallbacks
-    def create_event(self, event_dict, token_id=None, txn_id=None):
+    def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None):
         """
         Given a dict from a client, create a new event.
 
@@ -186,6 +161,9 @@ class MessageHandler(BaseHandler):
 
         Args:
             event_dict (dict): An entire event
+            token_id (str)
+            txn_id (str)
+            prev_event_ids (list): The prev event ids to use when creating the event
 
         Returns:
             Tuple of created event (FrozenEvent), Context
@@ -198,12 +176,8 @@ class MessageHandler(BaseHandler):
             membership = builder.content.get("membership", None)
             target = UserID.from_string(builder.state_key)
 
-            if membership == Membership.JOIN:
+            if membership in {Membership.JOIN, Membership.INVITE}:
                 # If event doesn't include a display name, add one.
-                yield collect_presencelike_data(
-                    self.distributor, target, builder.content
-                )
-            elif membership == Membership.INVITE:
                 profile = self.hs.get_handlers().profile_handler
                 content = builder.content
 
@@ -224,6 +198,7 @@ class MessageHandler(BaseHandler):
 
         event, context = yield self._create_new_client_event(
             builder=builder,
+            prev_event_ids=prev_event_ids,
         )
         defer.returnValue((event, context))
 
@@ -261,7 +236,7 @@ class MessageHandler(BaseHandler):
         )
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.hs.get_presence_handler()
             yield presence.bump_presence_active_time(user)
 
     def deduplicate_state_event(self, event, context):
@@ -515,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self._filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -556,14 +531,7 @@ class MessageHandler(BaseHandler):
             except:
                 logger.exception("Failed to get snapshot")
 
-        # Only do N rooms at once
-        n = 5
-        d_list = [handle_room(e) for e in room_list]
-        for i in range(0, len(d_list), n):
-            yield defer.gatherResults(
-                d_list[i:i + n],
-                consumeErrors=True
-            ).addErrback(unwrapFirstError)
+        yield concurrently_execute(handle_room, room_list, 10)
 
         account_data_events = []
         for account_data_type, content in account_data.items():
@@ -658,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -706,7 +674,7 @@ class MessageHandler(BaseHandler):
             and m.content["membership"] == Membership.JOIN
         ]
 
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         @defer.inlineCallbacks
         def get_presence():
@@ -739,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -763,3 +731,196 @@ class MessageHandler(BaseHandler):
             ret["membership"] = membership
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _create_new_client_event(self, builder, prev_event_ids=None):
+        if prev_event_ids:
+            prev_events = yield self.store.add_event_hashes(prev_event_ids)
+            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+            depth = prev_max_depth + 1
+        else:
+            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                builder.room_id,
+            )
+
+            if latest_ret:
+                depth = max([d for _, _, d in latest_ret]) + 1
+            else:
+                depth = 1
+
+            prev_events = [
+                (event_id, prev_hashes)
+                for event_id, prev_hashes, _ in latest_ret
+            ]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
+
+        state_handler = self.state_handler
+
+        context = yield state_handler.compute_event_context(builder)
+
+        if builder.is_state():
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
+
+        yield self.auth.add_auth_events(builder, context)
+
+        signing_key = self.hs.config.signing_key[0]
+        add_hashes_and_signatures(
+            builder, self.server_name, signing_key
+        )
+
+        event = builder.build()
+
+        logger.debug(
+            "Created event %s with current state: %s",
+            event.event_id, context.current_state,
+        )
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
+        # We now need to go and hit out to wherever we need to hit out to.
+
+        if ratelimit:
+            self.ratelimit(requester)
+
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as err:
+            logger.warn("Denying new event %r because %s", event, err)
+            raise err
+
+        yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+        if event.type == EventTypes.CanonicalAlias:
+            # Check the alias is acually valid (at this time at least)
+            room_alias_str = event.content.get("alias", None)
+            if room_alias_str:
+                room_alias = RoomAlias.from_string(room_alias_str)
+                directory_handler = self.hs.get_handlers().directory_handler
+                mapping = yield directory_handler.get_association(room_alias)
+
+                if mapping["room_id"] != event.room_id:
+                    raise SynapseError(
+                        400,
+                        "Room alias %s does not point to the room" % (
+                            room_alias_str,
+                        )
+                    )
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
+                event.unsigned["invite_room_state"] = [
+                    {
+                        "type": e.type,
+                        "state_key": e.state_key,
+                        "content": e.content,
+                        "sender": e.sender,
+                    }
+                    for k, e in context.current_state.items()
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
+                ]
+
+                invitee = UserID.from_string(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    # TODO: Can we add signature from remote server in a nicer
+                    # way? If we have been invited by a remote server, we need
+                    # to get them to sign the event.
+
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+
+                    event.unsigned.pop("room_state", None)
+
+                    # TODO: Make sure the signatures actually are correct.
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
+
+        if event.type == EventTypes.Redaction:
+            if self.auth.check_redaction(event, auth_events=context.current_state):
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=False
+                )
+                if event.user_id != original_event.user_id:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to redact events"
+                    )
+
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
+
+        action_generator = ActionGenerator(self.hs)
+        yield action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
+        )
+
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(get_domain_from_id(s.state_key))
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        @defer.inlineCallbacks
+        def _notify():
+            yield run_on_reactor()
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
+
+        preserve_fn(_notify)()
+
+        # If invite, remove room_state from unsigned before sending.
+        event.unsigned.pop("invite_room_state", None)
+
+        federation_handler.handle_new_event(
+            event, destinations=destinations,
+        )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d0c8f1328b..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
 import synapse.metrics
 
-from ._base import BaseHandler
-
 import logging
 
 
@@ -52,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
 federation_presence_counter = metrics.register_counter("federation_presence")
 bump_active_time_counter = metrics.register_counter("bump_active_time")
 
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
@@ -70,14 +70,18 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
 # How often to resend presence to remote servers
 FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
 
     def __init__(self, hs):
-        super(PresenceHandler, self).__init__(hs)
-        self.hs = hs
+        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -138,7 +142,7 @@ class PresenceHandler(BaseHandler):
                 obj=state.user_id,
                 then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
-            if self.hs.is_mine_id(state.user_id):
+            if self.is_mine_id(state.user_id):
                 self.wheel_timer.insert(
                     now=now,
                     obj=state.user_id,
@@ -160,15 +164,26 @@ class PresenceHandler(BaseHandler):
         self.serial_to_user = {}
         self._next_serial = 1
 
-        # Keeps track of the number of *ongoing* syncs. While this is non zero
-        # a user will never go offline.
+        # Keeps track of the number of *ongoing* syncs on this process. While
+        # this is non zero a user will never go offline.
         self.user_to_num_current_syncs = {}
 
+        # Keeps track of the number of *ongoing* syncs on other processes.
+        # While any sync is ongoing on another process the user will never
+        # go offline.
+        # Each process has a unique identifier and an update frequency. If
+        # no update is received from that process within the update period then
+        # we assume that all the sync requests on that process have stopped.
+        # Stored as a dict from process_id to set of user_id, and a dict of
+        # process_id to millisecond timestamp last updated.
+        self.external_process_to_current_syncs = {}
+        self.external_process_last_updated_ms = {}
+
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            0 * 1000,
+            30,
             self.clock.looping_call,
             self._handle_timeouts,
             5000,
@@ -228,7 +243,7 @@ class PresenceHandler(BaseHandler):
 
                 new_state, should_notify, should_ping = handle_update(
                     prev_state, new_state,
-                    is_mine=self.hs.is_mine_id(user_id),
+                    is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
                     now=now
                 )
@@ -268,31 +283,48 @@ class PresenceHandler(BaseHandler):
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """
+        logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        with Measure(self.clock, "presence_handle_timeouts"):
-            # Fetch the list of users that *may* have timed out. Things may have
-            # changed since the timeout was set, so we won't necessarily have to
-            # take any action.
-            users_to_check = self.wheel_timer.fetch(now)
+        try:
+            with Measure(self.clock, "presence_handle_timeouts"):
+                # Fetch the list of users that *may* have timed out. Things may have
+                # changed since the timeout was set, so we won't necessarily have to
+                # take any action.
+                users_to_check = set(self.wheel_timer.fetch(now))
+
+                # Check whether the lists of syncing processes from an external
+                # process have expired.
+                expired_process_ids = [
+                    process_id for process_id, last_update
+                    in self.external_process_last_updated_ms.items()
+                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
+                ]
+                for process_id in expired_process_ids:
+                    users_to_check.update(
+                        self.external_process_last_updated_ms.pop(process_id, ())
+                    )
+                    self.external_process_last_update.pop(process_id)
 
-            states = [
-                self.user_to_current_state.get(
-                    user_id, UserPresenceState.default(user_id)
-                )
-                for user_id in set(users_to_check)
-            ]
+                states = [
+                    self.user_to_current_state.get(
+                        user_id, UserPresenceState.default(user_id)
+                    )
+                    for user_id in users_to_check
+                ]
 
-            timers_fired_counter.inc_by(len(states))
+                timers_fired_counter.inc_by(len(states))
 
-            changes = handle_timeouts(
-                states,
-                is_mine_fn=self.hs.is_mine_id,
-                user_to_num_current_syncs=self.user_to_num_current_syncs,
-                now=now,
-            )
+                changes = handle_timeouts(
+                    states,
+                    is_mine_fn=self.is_mine_id,
+                    syncing_user_ids=self.get_currently_syncing_users(),
+                    now=now,
+                )
 
-        preserve_fn(self._update_states)(changes)
+            preserve_fn(self._update_states)(changes)
+        except:
+            logger.exception("Exception in _handle_timeouts loop")
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -365,6 +397,74 @@ class PresenceHandler(BaseHandler):
 
         defer.returnValue(_user_syncing())
 
+    def get_currently_syncing_users(self):
+        """Get the set of user ids that are currently syncing on this HS.
+        Returns:
+            set(str): A set of user_id strings.
+        """
+        syncing_user_ids = {
+            user_id for user_id, count in self.user_to_num_current_syncs.items()
+            if count
+        }
+        for user_ids in self.external_process_to_current_syncs.values():
+            syncing_user_ids.update(user_ids)
+        return syncing_user_ids
+
+    @defer.inlineCallbacks
+    def update_external_syncs(self, process_id, syncing_user_ids):
+        """Update the syncing users for an external process
+
+        Args:
+            process_id(str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            syncing_user_ids(set(str)): The set of user_ids that are
+                currently syncing on that server.
+        """
+
+        # Grab the previous list of user_ids that were syncing on that process
+        prev_syncing_user_ids = (
+            self.external_process_to_current_syncs.get(process_id, set())
+        )
+        # Grab the current presence state for both the users that are syncing
+        # now and the users that were syncing before this update.
+        prev_states = yield self.current_state_for_users(
+            syncing_user_ids | prev_syncing_user_ids
+        )
+        updates = []
+        time_now_ms = self.clock.time_msec()
+
+        # For each new user that is syncing check if we need to mark them as
+        # being online.
+        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+            prev_state = prev_states[new_user_id]
+            if prev_state.state == PresenceState.OFFLINE:
+                updates.append(prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=time_now_ms,
+                    last_user_sync_ts=time_now_ms,
+                ))
+            else:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                ))
+
+        # For each user that is still syncing or stopped syncing update the
+        # last sync time so that we will correctly apply the grace period when
+        # they stop syncing.
+        for old_user_id in prev_syncing_user_ids:
+            prev_state = prev_states[old_user_id]
+            updates.append(prev_state.copy_and_replace(
+                last_user_sync_ts=time_now_ms,
+            ))
+
+        yield self._update_states(updates)
+
+        # Update the last updated time for the process. We expire the entries
+        # if we don't receive an update in the given timeframe.
+        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+        self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
     @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
@@ -427,7 +527,7 @@ class PresenceHandler(BaseHandler):
 
         hosts_to_states = {}
         for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
@@ -436,11 +536,11 @@ class PresenceHandler(BaseHandler):
                 hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
-            host = UserID.from_string(user_id).domain
+            host = get_domain_from_id(user_id)
             hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +711,14 @@ class PresenceHandler(BaseHandler):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        if self.hs.is_mine(user):
+        if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             self._push_to_remotes({host: (state,) for host in hosts})
         else:
             user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = filter(self.hs.is_mine_id, user_ids)
+            user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
@@ -628,7 +728,7 @@ class PresenceHandler(BaseHandler):
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         presence_list = yield self.store.get_presence_list(
@@ -659,7 +759,7 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        if self.hs.is_mine(observed_user):
+        if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
             yield self.federation.send_edu(
@@ -675,11 +775,11 @@ class PresenceHandler(BaseHandler):
     def invite_presence(self, observed_user, observer_user):
         """Handles new presence invites.
         """
-        if not self.hs.is_mine(observed_user):
+        if not self.is_mine(observed_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         # TODO: Don't auto accept
-        if self.hs.is_mine(observer_user):
+        if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
             self.federation.send_edu(
@@ -742,7 +842,7 @@ class PresenceHandler(BaseHandler):
         Returns:
             A Deferred.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.del_presence_list(
@@ -834,7 +934,11 @@ def _format_user_presence_state(state, now):
 
 class PresenceEventSource(object):
     def __init__(self, hs):
-        self.hs = hs
+        # We can't call get_presence_handler here because there's a cycle:
+        #
+        #   Presence -> Notifier -> PresenceEventSource -> Presence
+        #
+        self.get_presence_handler = hs.get_presence_handler
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
@@ -860,7 +964,7 @@ class PresenceEventSource(object):
                 from_key = int(from_key)
             room_ids = room_ids or []
 
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
             if not room_ids:
@@ -877,13 +981,13 @@ class PresenceEventSource(object):
 
             user_ids_changed = set()
             changed = None
-            if from_key and max_token - from_key < 100:
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
+            if from_key:
                 changed = stream_change_cache.get_all_entities_changed(from_key)
 
-            # get_all_entities_changed can return None
-            if changed is not None:
+            if changed is not None and len(changed) < 500:
+                # For small deltas, its quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                get_updates_counter.inc("stream")
                 for other_user_id in changed:
                     if other_user_id in friends:
                         user_ids_changed.add(other_user_id)
@@ -895,6 +999,8 @@ class PresenceEventSource(object):
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
+                get_updates_counter.inc("full")
+
                 user_ids_to_check = set()
                 for room_id in room_ids:
                     users = yield self.store.get_users_in_room(room_id)
@@ -933,15 +1039,14 @@ class PresenceEventSource(object):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
 
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
         user_states(list): List of UserPresenceState's to check.
         is_mine_fn (fn): Function that returns if a user_id is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -952,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
     for state in user_states:
         is_mine = is_mine_fn(state.user_id)
 
-        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
         if new_state:
             changes[state.user_id] = new_state
 
     return changes.values()
 
 
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
         state (UserPresenceState)
         is_mine (bool): Whether the user is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -1000,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
 
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
-        if not user_to_num_current_syncs.get(user_id, 0):
+        if user_id not in syncing_user_ids:
             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index b45eafbb49..e37409170d 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
 from synapse.types import UserID, Requester
-from synapse.util import unwrapFirstError
 
 from ._base import BaseHandler
 
@@ -27,14 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-def changed_presencelike_data(distributor, user, state):
-    return distributor.fire("changed_presencelike_data", user, state)
-
-
-def collect_presencelike_data(distributor, user, content):
-    return distributor.fire("collect_presencelike_data", user, content)
-
-
 class ProfileHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -46,17 +37,9 @@ class ProfileHandler(BaseHandler):
         )
 
         distributor = hs.get_distributor()
-        self.distributor = distributor
-
-        distributor.declare("collect_presencelike_data")
-        distributor.declare("changed_presencelike_data")
 
         distributor.observe("registered_user", self.registered_user)
 
-        distributor.observe(
-            "collect_presencelike_data", self.collect_presencelike_data
-        )
-
     def registered_user(self, user):
         return self.store.create_profile(user.localpart)
 
@@ -105,10 +88,6 @@ class ProfileHandler(BaseHandler):
             target_user.localpart, new_displayname
         )
 
-        yield changed_presencelike_data(self.distributor, target_user, {
-            "displayname": new_displayname,
-        })
-
         yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
@@ -152,31 +131,9 @@ class ProfileHandler(BaseHandler):
             target_user.localpart, new_avatar_url
         )
 
-        yield changed_presencelike_data(self.distributor, target_user, {
-            "avatar_url": new_avatar_url,
-        })
-
         yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
-    def collect_presencelike_data(self, user, state):
-        if not self.hs.is_mine(user):
-            defer.returnValue(None)
-
-        (displayname, avatar_url) = yield defer.gatherResults(
-            [
-                self.store.get_profile_displayname(user.localpart),
-                self.store.get_profile_avatar_url(user.localpart),
-            ],
-            consumeErrors=True
-        ).addErrback(unwrapFirstError)
-
-        state["displayname"] = displayname
-        state["avatar_url"] = avatar_url
-
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
     def on_profile_query(self, args):
         user = UserID.from_string(args["user_id"])
         if not self.hs.is_mine(user):
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 935c339707..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
     def __init__(self, hs):
         super(ReceiptsHandler, self).__init__(hs)
 
+        self.server_name = hs.config.server_name
+        self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_replication_layer()
         self.federation.register_edu_handler(
@@ -80,6 +82,9 @@ class ReceiptsHandler(BaseHandler):
     def _handle_new_receipts(self, receipts):
         """Takes a list of receipts, stores them and informs the notifier.
         """
+        min_batch_id = None
+        max_batch_id = None
+
         for receipt in receipts:
             room_id = receipt["room_id"]
             receipt_type = receipt["receipt_type"]
@@ -97,10 +102,21 @@ class ReceiptsHandler(BaseHandler):
 
             stream_id, max_persisted_id = res
 
-            with PreserveLoggingContext():
-                self.notifier.on_new_event(
-                    "receipt_key", max_persisted_id, rooms=[room_id]
-                )
+            if min_batch_id is None or stream_id < min_batch_id:
+                min_batch_id = stream_id
+            if max_batch_id is None or max_persisted_id > max_batch_id:
+                max_batch_id = max_persisted_id
+
+        affected_room_ids = list(set([r["room_id"] for r in receipts]))
+
+        with PreserveLoggingContext():
+            self.notifier.on_new_event(
+                "receipt_key", max_batch_id, rooms=affected_room_ids
+            )
+            # Note that the min here shouldn't be relied upon to be accurate.
+            self.hs.get_pusherpool().on_new_receipts(
+                min_batch_id, max_batch_id, affected_room_ids
+            )
 
             defer.returnValue(True)
 
@@ -117,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
             event_ids = receipt["event_ids"]
             data = receipt["data"]
 
-            remotedomains = set()
-
-            rm_handler = self.hs.get_handlers().room_member_handler
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=None, remotedomains=remotedomains
-            )
+            remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+            remotedomains = remotedomains.copy()
+            remotedomains.discard(self.server_name)
 
             logger.debug("Sending receipt to: %r", remotedomains)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f287ee247b..bbc07b045e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -16,13 +16,14 @@
 """Contains functions for registering clients."""
 from twisted.internet import defer
 
-from synapse.types import UserID
+from synapse.types import UserID, Requester
 from synapse.api.errors import (
     AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
 from ._base import BaseHandler
 from synapse.util.async import run_on_reactor
 from synapse.http.client import CaptchaServerHttpClient
+from synapse.util.distributor import registered_user
 
 import logging
 import urllib
@@ -30,10 +31,6 @@ import urllib
 logger = logging.getLogger(__name__)
 
 
-def registered_user(distributor, user):
-    return distributor.fire("registered_user", user)
-
-
 class RegistrationHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -361,8 +358,62 @@ class RegistrationHandler(BaseHandler):
         )
         defer.returnValue(data)
 
+    @defer.inlineCallbacks
+    def get_or_create_user(self, localpart, displayname, duration_seconds):
+        """Creates a new user if the user does not exist,
+        else revokes all previous access tokens and generates a new one.
+
+        Args:
+            localpart : The local part of the user ID to register. If None,
+              one will be randomly generated.
+        Returns:
+            A tuple of (user_id, access_token).
+        Raises:
+            RegistrationError if there was a problem registering.
+        """
+        yield run_on_reactor()
+
+        if localpart is None:
+            raise SynapseError(400, "Request must include user id")
+
+        need_register = True
+
+        try:
+            yield self.check_username(localpart)
+        except SynapseError as e:
+            if e.errcode == Codes.USER_IN_USE:
+                need_register = False
+            else:
+                raise
+
+        user = UserID(localpart, self.hs.hostname)
+        user_id = user.to_string()
+        auth_handler = self.hs.get_handlers().auth_handler
+        token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+
+        if need_register:
+            yield self.store.register(
+                user_id=user_id,
+                token=token,
+                password_hash=None
+            )
+
+            yield registered_user(self.distributor, user)
+        else:
+            yield self.store.user_delete_access_tokens(user_id=user_id)
+            yield self.store.add_access_token_to_user(user_id=user_id, token=token)
+
+        if displayname is not None:
+            logger.info("setting user display name: %s -> %s", user_id, displayname)
+            profile_handler = self.hs.get_handlers().profile_handler
+            yield profile_handler.set_displayname(
+                user, Requester(user, token, False), displayname
+            )
+
+        defer.returnValue((user_id, token))
+
     def auth_handler(self):
-        return self.hs.get_handlers().auth_handler
+        return self.hs.get_auth_handler()
 
     @defer.inlineCallbacks
     def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d5c56ce0d6..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,19 +18,17 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
 from synapse.api.constants import (
-    EventTypes, Membership, JoinRules, RoomCreationPreset,
+    EventTypes, JoinRules, RoomCreationPreset,
 )
-from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
-from synapse.util import stringutils, unwrapFirstError
-from synapse.util.logcontext import preserve_context_over_fn
-
-from signedjson.sign import verify_signed_json
-from signedjson.key import decode_verify_key_bytes
+from synapse.api.errors import AuthError, StoreError, SynapseError
+from synapse.util import stringutils
+from synapse.util.async import concurrently_execute
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from collections import OrderedDict
-from unpaddedbase64 import decode_base64
 
 import logging
 import math
@@ -38,21 +36,9 @@ import string
 
 logger = logging.getLogger(__name__)
 
-id_server_scheme = "https://"
-
-
-def user_left_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_left_room", user=user, room_id=room_id
-    )
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
-
-def user_joined_room(distributor, user, room_id):
-    return preserve_context_over_fn(
-        distributor.fire,
-        "user_joined_room", user=user, room_id=room_id
-    )
+id_server_scheme = "https://"
 
 
 class RoomCreationHandler(BaseHandler):
@@ -356,598 +342,31 @@ class RoomCreationHandler(BaseHandler):
             )
 
 
-class RoomMemberHandler(BaseHandler):
-    # TODO(paul): This handler currently contains a messy conflation of
-    #   low-level API that works on UserID objects and so on, and REST-level
-    #   API that takes ID strings and returns pagination chunks. These concerns
-    #   ought to be separated out a lot better.
-
+class RoomListHandler(BaseHandler):
     def __init__(self, hs):
-        super(RoomMemberHandler, self).__init__(hs)
-
-        self.clock = hs.get_clock()
-
-        self.distributor = hs.get_distributor()
-        self.distributor.declare("user_joined_room")
-        self.distributor.declare("user_left_room")
-
-    @defer.inlineCallbacks
-    def get_room_members(self, room_id):
-        users = yield self.store.get_users_in_room(room_id)
-
-        defer.returnValue([UserID.from_string(u) for u in users])
-
-    @defer.inlineCallbacks
-    def fetch_room_distributions_into(self, room_id, localusers=None,
-                                      remotedomains=None, ignore_user=None):
-        """Fetch the distribution of a room, adding elements to either
-        'localusers' or 'remotedomains', which should be a set() if supplied.
-        If ignore_user is set, ignore that user.
-
-        This function returns nothing; its result is performed by the
-        side-effect on the two passed sets. This allows easy accumulation of
-        member lists of multiple rooms at once if required.
-        """
-        members = yield self.get_room_members(room_id)
-        for member in members:
-            if ignore_user is not None and member == ignore_user:
-                continue
-
-            if self.hs.is_mine(member):
-                if localusers is not None:
-                    localusers.add(member)
-            else:
-                if remotedomains is not None:
-                    remotedomains.add(member.domain)
-
-    @defer.inlineCallbacks
-    def update_membership(
-            self,
-            requester,
-            target,
-            room_id,
-            action,
-            txn_id=None,
-            remote_room_hosts=None,
-            third_party_signed=None,
-            ratelimit=True,
-    ):
-        effective_membership_state = action
-        if action in ["kick", "unban"]:
-            effective_membership_state = "leave"
-        elif action == "forget":
-            effective_membership_state = "leave"
-
-        if third_party_signed is not None:
-            replication = self.hs.get_replication_layer()
-            yield replication.exchange_third_party_invite(
-                third_party_signed["sender"],
-                target.to_string(),
-                room_id,
-                third_party_signed,
-            )
-
-        msg_handler = self.hs.get_handlers().message_handler
-
-        content = {"membership": effective_membership_state}
-        if requester.is_guest:
-            content["kind"] = "guest"
-
-        event, context = yield msg_handler.create_event(
-            {
-                "type": EventTypes.Member,
-                "content": content,
-                "room_id": room_id,
-                "sender": requester.user.to_string(),
-                "state_key": target.to_string(),
-
-                # For backwards compatibility:
-                "membership": effective_membership_state,
-            },
-            token_id=requester.access_token_id,
-            txn_id=txn_id,
-        )
-
-        old_state = context.current_state.get((EventTypes.Member, event.state_key))
-        old_membership = old_state.content.get("membership") if old_state else None
-        if action == "unban" and old_membership != "ban":
-            raise SynapseError(
-                403,
-                "Cannot unban user who was not banned (membership=%s)" % old_membership,
-                errcode=Codes.BAD_STATE
-            )
-        if old_membership == "ban" and action != "unban":
-            raise SynapseError(
-                403,
-                "Cannot %s user who was is banned" % (action,),
-                errcode=Codes.BAD_STATE
-            )
-
-        member_handler = self.hs.get_handlers().room_member_handler
-        yield member_handler.send_membership_event(
-            requester,
-            event,
-            context,
-            ratelimit=ratelimit,
-            remote_room_hosts=remote_room_hosts,
-        )
-
-        if action == "forget":
-            yield self.forget(requester.user, room_id)
-
-    @defer.inlineCallbacks
-    def send_membership_event(
-            self,
-            requester,
-            event,
-            context,
-            remote_room_hosts=None,
-            ratelimit=True,
-    ):
-        """
-        Change the membership status of a user in a room.
-
-        Args:
-            requester (Requester): The local user who requested the membership
-                event. If None, certain checks, like whether this homeserver can
-                act as the sender, will be skipped.
-            event (SynapseEvent): The membership event.
-            context: The context of the event.
-            is_guest (bool): Whether the sender is a guest.
-            room_hosts ([str]): Homeservers which are likely to already be in
-                the room, and could be danced with in order to join this
-                homeserver for the first time.
-            ratelimit (bool): Whether to rate limit this request.
-        Raises:
-            SynapseError if there was a problem changing the membership.
-        """
-        remote_room_hosts = remote_room_hosts or []
-
-        target_user = UserID.from_string(event.state_key)
-        room_id = event.room_id
-
-        if requester is not None:
-            sender = UserID.from_string(event.sender)
-            assert sender == requester.user, (
-                "Sender (%s) must be same as requester (%s)" %
-                (sender, requester.user)
-            )
-            assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
-        else:
-            requester = Requester(target_user, None, False)
-
-        message_handler = self.hs.get_handlers().message_handler
-        prev_event = message_handler.deduplicate_state_event(event, context)
-        if prev_event is not None:
-            return
-
-        action = "send"
-
-        if event.membership == Membership.JOIN:
-            if requester.is_guest and not self._can_guest_join(context.current_state):
-                # This should be an auth check, but guests are a local concept,
-                # so don't really fit into the general auth process.
-                raise AuthError(403, "Guest access not allowed")
-            do_remote_join_dance, remote_room_hosts = self._should_do_dance(
-                context,
-                (self.get_inviter(event.state_key, context.current_state)),
-                remote_room_hosts,
-            )
-            if do_remote_join_dance:
-                action = "remote_join"
-        elif event.membership == Membership.LEAVE:
-            is_host_in_room = self.is_host_in_room(context.current_state)
-
-            if not is_host_in_room:
-                # perhaps we've been invited
-                inviter = self.get_inviter(target_user.to_string(), context.current_state)
-                if not inviter:
-                    raise SynapseError(404, "Not a known room")
-
-                if self.hs.is_mine(inviter):
-                    # the inviter was on our server, but has now left. Carry on
-                    # with the normal rejection codepath.
-                    #
-                    # This is a bit of a hack, because the room might still be
-                    # active on other servers.
-                    pass
-                else:
-                    # send the rejection to the inviter's HS.
-                    remote_room_hosts = remote_room_hosts + [inviter.domain]
-                    action = "remote_reject"
-
-        federation_handler = self.hs.get_handlers().federation_handler
-
-        if action == "remote_join":
-            if len(remote_room_hosts) == 0:
-                raise SynapseError(404, "No known servers")
-
-            # We don't do an auth check if we are doing an invite
-            # join dance for now, since we're kinda implicitly checking
-            # that we are allowed to join when we decide whether or not we
-            # need to do the invite/join dance.
-            yield federation_handler.do_invite_join(
-                remote_room_hosts,
-                event.room_id,
-                event.user_id,
-                event.content,
-            )
-        elif action == "remote_reject":
-            yield federation_handler.do_remotely_reject_invite(
-                remote_room_hosts,
-                room_id,
-                event.user_id
-            )
-        else:
-            yield self.handle_new_client_event(
-                requester,
-                event,
-                context,
-                extra_users=[target_user],
-                ratelimit=ratelimit,
-            )
-
-        prev_member_event = context.current_state.get(
-            (EventTypes.Member, target_user.to_string()),
-            None
-        )
-
-        if event.membership == Membership.JOIN:
-            if not prev_member_event or prev_member_event.membership != Membership.JOIN:
-                # Only fire user_joined_room if the user has acutally joined the
-                # room. Don't bother if the user is just changing their profile
-                # info.
-                yield user_joined_room(self.distributor, target_user, room_id)
-        elif event.membership == Membership.LEAVE:
-            if prev_member_event and prev_member_event.membership == Membership.JOIN:
-                user_left_room(self.distributor, target_user, room_id)
-
-    def _can_guest_join(self, current_state):
-        """
-        Returns whether a guest can join a room based on its current state.
-        """
-        guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
-        return (
-            guest_access
-            and guest_access.content
-            and "guest_access" in guest_access.content
-            and guest_access.content["guest_access"] == "can_join"
-        )
-
-    def _should_do_dance(self, context, inviter, room_hosts=None):
-        # TODO: Shouldn't this be remote_room_host?
-        room_hosts = room_hosts or []
-
-        is_host_in_room = self.is_host_in_room(context.current_state)
-        if is_host_in_room:
-            return False, room_hosts
-
-        if inviter and not self.hs.is_mine(inviter):
-            room_hosts.append(inviter.domain)
-
-        return True, room_hosts
-
-    @defer.inlineCallbacks
-    def lookup_room_alias(self, room_alias):
-        """
-        Get the room ID associated with a room alias.
-
-        Args:
-            room_alias (RoomAlias): The alias to look up.
-        Returns:
-            A tuple of:
-                The room ID as a RoomID object.
-                Hosts likely to be participating in the room ([str]).
-        Raises:
-            SynapseError if room alias could not be found.
-        """
-        directory_handler = self.hs.get_handlers().directory_handler
-        mapping = yield directory_handler.get_association(room_alias)
-
-        if not mapping:
-            raise SynapseError(404, "No such room alias")
-
-        room_id = mapping["room_id"]
-        servers = mapping["servers"]
-
-        defer.returnValue((RoomID.from_string(room_id), servers))
-
-    def get_inviter(self, user_id, current_state):
-        prev_state = current_state.get((EventTypes.Member, user_id))
-        if prev_state and prev_state.membership == Membership.INVITE:
-            return UserID.from_string(prev_state.user_id)
-        return None
-
-    @defer.inlineCallbacks
-    def get_joined_rooms_for_user(self, user):
-        """Returns a list of roomids that the user has any of the given
-        membership states in."""
-
-        rooms = yield self.store.get_rooms_for_user(
-            user.to_string(),
-        )
-
-        # For some reason the list of events contains duplicates
-        # TODO(paul): work out why because I really don't think it should
-        room_ids = set(r.room_id for r in rooms)
-
-        defer.returnValue(room_ids)
-
-    @defer.inlineCallbacks
-    def do_3pid_invite(
-            self,
-            room_id,
-            inviter,
-            medium,
-            address,
-            id_server,
-            requester,
-            txn_id
-    ):
-        invitee = yield self._lookup_3pid(
-            id_server, medium, address
-        )
-
-        if invitee:
-            handler = self.hs.get_handlers().room_member_handler
-            yield handler.update_membership(
-                requester,
-                UserID.from_string(invitee),
-                room_id,
-                "invite",
-                txn_id=txn_id,
-            )
-        else:
-            yield self._make_and_store_3pid_invite(
-                requester,
-                id_server,
-                medium,
-                address,
-                room_id,
-                inviter,
-                txn_id=txn_id
-            )
-
-    @defer.inlineCallbacks
-    def _lookup_3pid(self, id_server, medium, address):
-        """Looks up a 3pid in the passed identity server.
-
-        Args:
-            id_server (str): The server name (including port, if required)
-                of the identity server to use.
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
-
-        Returns:
-            (str) the matrix ID of the 3pid, or None if it is not recognized.
-        """
-        try:
-            data = yield self.hs.get_simple_http_client().get_json(
-                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
-                {
-                    "medium": medium,
-                    "address": address,
-                }
-            )
-
-            if "mxid" in data:
-                if "signatures" not in data:
-                    raise AuthError(401, "No signatures on 3pid binding")
-                self.verify_any_signature(data, id_server)
-                defer.returnValue(data["mxid"])
-
-        except IOError as e:
-            logger.warn("Error from identity server lookup: %s" % (e,))
-            defer.returnValue(None)
-
-    @defer.inlineCallbacks
-    def verify_any_signature(self, data, server_hostname):
-        if server_hostname not in data["signatures"]:
-            raise AuthError(401, "No signature from server %s" % (server_hostname,))
-        for key_name, signature in data["signatures"][server_hostname].items():
-            key_data = yield self.hs.get_simple_http_client().get_json(
-                "%s%s/_matrix/identity/api/v1/pubkey/%s" %
-                (id_server_scheme, server_hostname, key_name,),
-            )
-            if "public_key" not in key_data:
-                raise AuthError(401, "No public key named %s from %s" %
-                                (key_name, server_hostname,))
-            verify_signed_json(
-                data,
-                server_hostname,
-                decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
-            )
-            return
-
-    @defer.inlineCallbacks
-    def _make_and_store_3pid_invite(
-            self,
-            requester,
-            id_server,
-            medium,
-            address,
-            room_id,
-            user,
-            txn_id
-    ):
-        room_state = yield self.hs.get_state_handler().get_current_state(room_id)
-
-        inviter_display_name = ""
-        inviter_avatar_url = ""
-        member_event = room_state.get((EventTypes.Member, user.to_string()))
-        if member_event:
-            inviter_display_name = member_event.content.get("displayname", "")
-            inviter_avatar_url = member_event.content.get("avatar_url", "")
-
-        canonical_room_alias = ""
-        canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
-        if canonical_alias_event:
-            canonical_room_alias = canonical_alias_event.content.get("alias", "")
-
-        room_name = ""
-        room_name_event = room_state.get((EventTypes.Name, ""))
-        if room_name_event:
-            room_name = room_name_event.content.get("name", "")
-
-        room_join_rules = ""
-        join_rules_event = room_state.get((EventTypes.JoinRules, ""))
-        if join_rules_event:
-            room_join_rules = join_rules_event.content.get("join_rule", "")
-
-        room_avatar_url = ""
-        room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
-        if room_avatar_event:
-            room_avatar_url = room_avatar_event.content.get("url", "")
-
-        token, public_keys, fallback_public_key, display_name = (
-            yield self._ask_id_server_for_third_party_invite(
-                id_server=id_server,
-                medium=medium,
-                address=address,
-                room_id=room_id,
-                inviter_user_id=user.to_string(),
-                room_alias=canonical_room_alias,
-                room_avatar_url=room_avatar_url,
-                room_join_rules=room_join_rules,
-                room_name=room_name,
-                inviter_display_name=inviter_display_name,
-                inviter_avatar_url=inviter_avatar_url
-            )
-        )
-
-        msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_nonmember_event(
-            requester,
-            {
-                "type": EventTypes.ThirdPartyInvite,
-                "content": {
-                    "display_name": display_name,
-                    "public_keys": public_keys,
-
-                    # For backwards compatibility:
-                    "key_validity_url": fallback_public_key["key_validity_url"],
-                    "public_key": fallback_public_key["public_key"],
-                },
-                "room_id": room_id,
-                "sender": user.to_string(),
-                "state_key": token,
-            },
-            txn_id=txn_id,
-        )
-
-    @defer.inlineCallbacks
-    def _ask_id_server_for_third_party_invite(
-            self,
-            id_server,
-            medium,
-            address,
-            room_id,
-            inviter_user_id,
-            room_alias,
-            room_avatar_url,
-            room_join_rules,
-            room_name,
-            inviter_display_name,
-            inviter_avatar_url
-    ):
-        """
-        Asks an identity server for a third party invite.
-
-        :param id_server (str): hostname + optional port for the identity server.
-        :param medium (str): The literal string "email".
-        :param address (str): The third party address being invited.
-        :param room_id (str): The ID of the room to which the user is invited.
-        :param inviter_user_id (str): The user ID of the inviter.
-        :param room_alias (str): An alias for the room, for cosmetic
-            notifications.
-        :param room_avatar_url (str): The URL of the room's avatar, for cosmetic
-            notifications.
-        :param room_join_rules (str): The join rules of the email
-            (e.g. "public").
-        :param room_name (str): The m.room.name of the room.
-        :param inviter_display_name (str): The current display name of the
-            inviter.
-        :param inviter_avatar_url (str): The URL of the inviter's avatar.
-
-        :return: A deferred tuple containing:
-            token (str): The token which must be signed to prove authenticity.
-            public_keys ([{"public_key": str, "key_validity_url": str}]):
-                public_key is a base64-encoded ed25519 public key.
-            fallback_public_key: One element from public_keys.
-            display_name (str): A user-friendly name to represent the invited
-                user.
-        """
-
-        is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
-            id_server_scheme, id_server,
+        super(RoomListHandler, self).__init__(hs)
+        self.response_cache = ResponseCache()
+        self.remote_list_request_cache = ResponseCache()
+        self.remote_list_cache = {}
+        self.fetch_looping_call = hs.get_clock().looping_call(
+            self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
         )
+        self.fetch_all_remote_lists()
 
-        invite_config = {
-            "medium": medium,
-            "address": address,
-            "room_id": room_id,
-            "room_alias": room_alias,
-            "room_avatar_url": room_avatar_url,
-            "room_join_rules": room_join_rules,
-            "room_name": room_name,
-            "sender": inviter_user_id,
-            "sender_display_name": inviter_display_name,
-            "sender_avatar_url": inviter_avatar_url,
-        }
-
-        if self.hs.config.invite_3pid_guest:
-            registration_handler = self.hs.get_handlers().registration_handler
-            guest_access_token = yield registration_handler.guest_access_token_for(
-                medium=medium,
-                address=address,
-                inviter_user_id=inviter_user_id,
-            )
-
-            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
-                guest_access_token
-            )
-
-            invite_config.update({
-                "guest_access_token": guest_access_token,
-                "guest_user_id": guest_user_info["user"].to_string(),
-            })
-
-        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
-            is_url,
-            invite_config
-        )
-        # TODO: Check for success
-        token = data["token"]
-        public_keys = data.get("public_keys", [])
-        if "public_key" in data:
-            fallback_public_key = {
-                "public_key": data["public_key"],
-                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
-                    id_server_scheme, id_server,
-                ),
-            }
-        else:
-            fallback_public_key = public_keys[0]
-
-        if not public_keys:
-            public_keys.append(fallback_public_key)
-        display_name = data["display_name"]
-        defer.returnValue((token, public_keys, fallback_public_key, display_name))
-
-    def forget(self, user, room_id):
-        return self.store.forget(user.to_string(), room_id)
-
-
-class RoomListHandler(BaseHandler):
+    def get_local_public_room_list(self):
+        result = self.response_cache.get(())
+        if not result:
+            result = self.response_cache.set((), self._get_public_room_list())
+        return result
 
     @defer.inlineCallbacks
-    def get_public_room_list(self):
+    def _get_public_room_list(self):
         room_ids = yield self.store.get_public_room_ids()
 
+        results = []
+
         @defer.inlineCallbacks
         def handle_room(room_id):
-            aliases = yield self.store.get_aliases_for_room(room_id)
-
             # We pull each bit of state out indvidually to avoid pulling the
             # full state into memory. Due to how the caching works this should
             # be fairly quick, even if not originally in the cache.
@@ -962,6 +381,14 @@ class RoomListHandler(BaseHandler):
                     defer.returnValue(None)
 
             result = {"room_id": room_id}
+
+            joined_users = yield self.store.get_users_in_room(room_id)
+            if len(joined_users) == 0:
+                return
+
+            result["num_joined_members"] = len(joined_users)
+
+            aliases = yield self.store.get_aliases_for_room(room_id)
             if aliases:
                 result["aliases"] = aliases
 
@@ -1001,21 +428,61 @@ class RoomListHandler(BaseHandler):
                 if avatar_url:
                     result["avatar_url"] = avatar_url
 
-            joined_users = yield self.store.get_users_in_room(room_id)
-            result["num_joined_members"] = len(joined_users)
-
-            defer.returnValue(result)
+            results.append(result)
 
-        result = []
-        for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
-            chunk_result = yield defer.gatherResults([
-                handle_room(room_id)
-                for room_id in chunk
-            ], consumeErrors=True).addErrback(unwrapFirstError)
-            result.extend(v for v in chunk_result if v)
+        yield concurrently_execute(handle_room, room_ids, 10)
 
         # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": result})
+        defer.returnValue({"start": "START", "end": "END", "chunk": results})
+
+    @defer.inlineCallbacks
+    def fetch_all_remote_lists(self):
+        deferred = self.hs.get_replication_layer().get_public_rooms(
+            self.hs.config.secondary_directory_servers
+        )
+        self.remote_list_request_cache.set((), deferred)
+        self.remote_list_cache = yield deferred
+
+    @defer.inlineCallbacks
+    def get_aggregated_public_room_list(self):
+        """
+        Get the public room list from this server and the servers
+        specified in the secondary_directory_servers config option.
+        XXX: Pagination...
+        """
+        # We return the results from out cache which is updated by a looping call,
+        # unless we're missing a cache entry, in which case wait for the result
+        # of the fetch if there's one in progress. If not, omit that server.
+        wait = False
+        for s in self.hs.config.secondary_directory_servers:
+            if s not in self.remote_list_cache:
+                logger.warn("No cached room list from %s: waiting for fetch", s)
+                wait = True
+                break
+
+        if wait and self.remote_list_request_cache.get(()):
+            yield self.remote_list_request_cache.get(())
+
+        public_rooms = yield self.get_local_public_room_list()
+
+        # keep track of which room IDs we've seen so we can de-dup
+        room_ids = set()
+
+        # tag all the ones in our list with our server name.
+        # Also add the them to the de-deping set
+        for room in public_rooms['chunk']:
+            room["server_name"] = self.hs.hostname
+            room_ids.add(room["room_id"])
+
+        # Now add the results from federation
+        for server_name, server_result in self.remote_list_cache.items():
+            for room in server_result["chunk"]:
+                if room["room_id"] not in room_ids:
+                    room["server_name"] = server_name
+                    public_rooms["chunk"].append(room)
+                    room_ids.add(room["room_id"])
+
+        defer.returnValue(public_rooms)
 
 
 class RoomContextHandler(BaseHandler):
@@ -1040,10 +507,12 @@ class RoomContextHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         def filter_evts(events):
-            return self._filter_events_for_client(
+            return filter_events_for_client(
+                self.store,
                 user.to_string(),
                 events,
-                is_peeking=is_guest)
+                is_peeking=is_guest
+            )
 
         event = yield self.store.get_event(event_id, get_prev_content=True,
                                            allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
new file mode 100644
index 0000000000..7e616f44fd
--- /dev/null
+++ b/synapse/handlers/room_member.py
@@ -0,0 +1,677 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from ._base import BaseHandler
+
+from synapse.types import UserID, RoomID, Requester
+from synapse.api.constants import (
+    EventTypes, Membership,
+)
+from synapse.api.errors import AuthError, SynapseError, Codes
+from synapse.util.async import Linearizer
+from synapse.util.distributor import user_left_room, user_joined_room
+
+from signedjson.sign import verify_signed_json
+from signedjson.key import decode_verify_key_bytes
+
+from unpaddedbase64 import decode_base64
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+id_server_scheme = "https://"
+
+
+class RoomMemberHandler(BaseHandler):
+    # TODO(paul): This handler currently contains a messy conflation of
+    #   low-level API that works on UserID objects and so on, and REST-level
+    #   API that takes ID strings and returns pagination chunks. These concerns
+    #   ought to be separated out a lot better.
+
+    def __init__(self, hs):
+        super(RoomMemberHandler, self).__init__(hs)
+
+        self.member_linearizer = Linearizer()
+
+        self.clock = hs.get_clock()
+
+        self.distributor = hs.get_distributor()
+        self.distributor.declare("user_joined_room")
+        self.distributor.declare("user_left_room")
+
+    @defer.inlineCallbacks
+    def _local_membership_update(
+        self, requester, target, room_id, membership,
+        prev_event_ids,
+        txn_id=None,
+        ratelimit=True,
+    ):
+        msg_handler = self.hs.get_handlers().message_handler
+
+        content = {"membership": membership}
+        if requester.is_guest:
+            content["kind"] = "guest"
+
+        event, context = yield msg_handler.create_event(
+            {
+                "type": EventTypes.Member,
+                "content": content,
+                "room_id": room_id,
+                "sender": requester.user.to_string(),
+                "state_key": target.to_string(),
+
+                # For backwards compatibility:
+                "membership": membership,
+            },
+            token_id=requester.access_token_id,
+            txn_id=txn_id,
+            prev_event_ids=prev_event_ids,
+        )
+
+        yield msg_handler.handle_new_client_event(
+            requester,
+            event,
+            context,
+            extra_users=[target],
+            ratelimit=ratelimit,
+        )
+
+        prev_member_event = context.current_state.get(
+            (EventTypes.Member, target.to_string()),
+            None
+        )
+
+        if event.membership == Membership.JOIN:
+            if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+                # Only fire user_joined_room if the user has acutally joined the
+                # room. Don't bother if the user is just changing their profile
+                # info.
+                yield user_joined_room(self.distributor, target, room_id)
+        elif event.membership == Membership.LEAVE:
+            if prev_member_event and prev_member_event.membership == Membership.JOIN:
+                user_left_room(self.distributor, target, room_id)
+
+    @defer.inlineCallbacks
+    def remote_join(self, remote_room_hosts, room_id, user, content):
+        if len(remote_room_hosts) == 0:
+            raise SynapseError(404, "No known servers")
+
+        # We don't do an auth check if we are doing an invite
+        # join dance for now, since we're kinda implicitly checking
+        # that we are allowed to join when we decide whether or not we
+        # need to do the invite/join dance.
+        yield self.hs.get_handlers().federation_handler.do_invite_join(
+            remote_room_hosts,
+            room_id,
+            user.to_string(),
+            content,
+        )
+        yield user_joined_room(self.distributor, user, room_id)
+
+    def reject_remote_invite(self, user_id, room_id, remote_room_hosts):
+        return self.hs.get_handlers().federation_handler.do_remotely_reject_invite(
+            remote_room_hosts,
+            room_id,
+            user_id
+        )
+
+    @defer.inlineCallbacks
+    def update_membership(
+            self,
+            requester,
+            target,
+            room_id,
+            action,
+            txn_id=None,
+            remote_room_hosts=None,
+            third_party_signed=None,
+            ratelimit=True,
+    ):
+        key = (target, room_id,)
+
+        with (yield self.member_linearizer.queue(key)):
+            result = yield self._update_membership(
+                requester,
+                target,
+                room_id,
+                action,
+                txn_id=txn_id,
+                remote_room_hosts=remote_room_hosts,
+                third_party_signed=third_party_signed,
+                ratelimit=ratelimit,
+            )
+
+        defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def _update_membership(
+            self,
+            requester,
+            target,
+            room_id,
+            action,
+            txn_id=None,
+            remote_room_hosts=None,
+            third_party_signed=None,
+            ratelimit=True,
+    ):
+        effective_membership_state = action
+        if action in ["kick", "unban"]:
+            effective_membership_state = "leave"
+
+        if third_party_signed is not None:
+            replication = self.hs.get_replication_layer()
+            yield replication.exchange_third_party_invite(
+                third_party_signed["sender"],
+                target.to_string(),
+                room_id,
+                third_party_signed,
+            )
+
+        if not remote_room_hosts:
+            remote_room_hosts = []
+
+        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+        current_state = yield self.state_handler.get_current_state(
+            room_id, latest_event_ids=latest_event_ids,
+        )
+
+        old_state = current_state.get((EventTypes.Member, target.to_string()))
+        old_membership = old_state.content.get("membership") if old_state else None
+        if action == "unban" and old_membership != "ban":
+            raise SynapseError(
+                403,
+                "Cannot unban user who was not banned (membership=%s)" % old_membership,
+                errcode=Codes.BAD_STATE
+            )
+        if old_membership == "ban" and action != "unban":
+            raise SynapseError(
+                403,
+                "Cannot %s user who was banned" % (action,),
+                errcode=Codes.BAD_STATE
+            )
+
+        is_host_in_room = self.is_host_in_room(current_state)
+
+        if effective_membership_state == Membership.JOIN:
+            if requester.is_guest and not self._can_guest_join(current_state):
+                # This should be an auth check, but guests are a local concept,
+                # so don't really fit into the general auth process.
+                raise AuthError(403, "Guest access not allowed")
+
+            if not is_host_in_room:
+                inviter = yield self.get_inviter(target.to_string(), room_id)
+                if inviter and not self.hs.is_mine(inviter):
+                    remote_room_hosts.append(inviter.domain)
+
+                content = {"membership": Membership.JOIN}
+
+                profile = self.hs.get_handlers().profile_handler
+                content["displayname"] = yield profile.get_displayname(target)
+                content["avatar_url"] = yield profile.get_avatar_url(target)
+
+                if requester.is_guest:
+                    content["kind"] = "guest"
+
+                ret = yield self.remote_join(
+                    remote_room_hosts, room_id, target, content
+                )
+                defer.returnValue(ret)
+
+        elif effective_membership_state == Membership.LEAVE:
+            if not is_host_in_room:
+                # perhaps we've been invited
+                inviter = yield self.get_inviter(target.to_string(), room_id)
+                if not inviter:
+                    raise SynapseError(404, "Not a known room")
+
+                if self.hs.is_mine(inviter):
+                    # the inviter was on our server, but has now left. Carry on
+                    # with the normal rejection codepath.
+                    #
+                    # This is a bit of a hack, because the room might still be
+                    # active on other servers.
+                    pass
+                else:
+                    # send the rejection to the inviter's HS.
+                    remote_room_hosts = remote_room_hosts + [inviter.domain]
+
+                    try:
+                        ret = yield self.reject_remote_invite(
+                            target.to_string(), room_id, remote_room_hosts
+                        )
+                        defer.returnValue(ret)
+                    except SynapseError as e:
+                        logger.warn("Failed to reject invite: %s", e)
+
+                        yield self.store.locally_reject_invite(
+                            target.to_string(), room_id
+                        )
+
+                        defer.returnValue({})
+
+        yield self._local_membership_update(
+            requester=requester,
+            target=target,
+            room_id=room_id,
+            membership=effective_membership_state,
+            txn_id=txn_id,
+            ratelimit=ratelimit,
+            prev_event_ids=latest_event_ids,
+        )
+
+    @defer.inlineCallbacks
+    def send_membership_event(
+            self,
+            requester,
+            event,
+            context,
+            remote_room_hosts=None,
+            ratelimit=True,
+    ):
+        """
+        Change the membership status of a user in a room.
+
+        Args:
+            requester (Requester): The local user who requested the membership
+                event. If None, certain checks, like whether this homeserver can
+                act as the sender, will be skipped.
+            event (SynapseEvent): The membership event.
+            context: The context of the event.
+            is_guest (bool): Whether the sender is a guest.
+            room_hosts ([str]): Homeservers which are likely to already be in
+                the room, and could be danced with in order to join this
+                homeserver for the first time.
+            ratelimit (bool): Whether to rate limit this request.
+        Raises:
+            SynapseError if there was a problem changing the membership.
+        """
+        remote_room_hosts = remote_room_hosts or []
+
+        target_user = UserID.from_string(event.state_key)
+        room_id = event.room_id
+
+        if requester is not None:
+            sender = UserID.from_string(event.sender)
+            assert sender == requester.user, (
+                "Sender (%s) must be same as requester (%s)" %
+                (sender, requester.user)
+            )
+            assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
+        else:
+            requester = Requester(target_user, None, False)
+
+        message_handler = self.hs.get_handlers().message_handler
+        prev_event = message_handler.deduplicate_state_event(event, context)
+        if prev_event is not None:
+            return
+
+        if event.membership == Membership.JOIN:
+            if requester.is_guest and not self._can_guest_join(context.current_state):
+                # This should be an auth check, but guests are a local concept,
+                # so don't really fit into the general auth process.
+                raise AuthError(403, "Guest access not allowed")
+
+        yield message_handler.handle_new_client_event(
+            requester,
+            event,
+            context,
+            extra_users=[target_user],
+            ratelimit=ratelimit,
+        )
+
+        prev_member_event = context.current_state.get(
+            (EventTypes.Member, target_user.to_string()),
+            None
+        )
+
+        if event.membership == Membership.JOIN:
+            if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+                # Only fire user_joined_room if the user has acutally joined the
+                # room. Don't bother if the user is just changing their profile
+                # info.
+                yield user_joined_room(self.distributor, target_user, room_id)
+        elif event.membership == Membership.LEAVE:
+            if prev_member_event and prev_member_event.membership == Membership.JOIN:
+                user_left_room(self.distributor, target_user, room_id)
+
+    def _can_guest_join(self, current_state):
+        """
+        Returns whether a guest can join a room based on its current state.
+        """
+        guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
+        return (
+            guest_access
+            and guest_access.content
+            and "guest_access" in guest_access.content
+            and guest_access.content["guest_access"] == "can_join"
+        )
+
+    @defer.inlineCallbacks
+    def lookup_room_alias(self, room_alias):
+        """
+        Get the room ID associated with a room alias.
+
+        Args:
+            room_alias (RoomAlias): The alias to look up.
+        Returns:
+            A tuple of:
+                The room ID as a RoomID object.
+                Hosts likely to be participating in the room ([str]).
+        Raises:
+            SynapseError if room alias could not be found.
+        """
+        directory_handler = self.hs.get_handlers().directory_handler
+        mapping = yield directory_handler.get_association(room_alias)
+
+        if not mapping:
+            raise SynapseError(404, "No such room alias")
+
+        room_id = mapping["room_id"]
+        servers = mapping["servers"]
+
+        defer.returnValue((RoomID.from_string(room_id), servers))
+
+    @defer.inlineCallbacks
+    def get_inviter(self, user_id, room_id):
+        invite = yield self.store.get_invite_for_user_in_room(
+            user_id=user_id,
+            room_id=room_id,
+        )
+        if invite:
+            defer.returnValue(UserID.from_string(invite.sender))
+
+    @defer.inlineCallbacks
+    def do_3pid_invite(
+            self,
+            room_id,
+            inviter,
+            medium,
+            address,
+            id_server,
+            requester,
+            txn_id
+    ):
+        invitee = yield self._lookup_3pid(
+            id_server, medium, address
+        )
+
+        if invitee:
+            yield self.update_membership(
+                requester,
+                UserID.from_string(invitee),
+                room_id,
+                "invite",
+                txn_id=txn_id,
+            )
+        else:
+            yield self._make_and_store_3pid_invite(
+                requester,
+                id_server,
+                medium,
+                address,
+                room_id,
+                inviter,
+                txn_id=txn_id
+            )
+
+    @defer.inlineCallbacks
+    def _lookup_3pid(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            str: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        try:
+            data = yield self.hs.get_simple_http_client().get_json(
+                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server,),
+                {
+                    "medium": medium,
+                    "address": address,
+                }
+            )
+
+            if "mxid" in data:
+                if "signatures" not in data:
+                    raise AuthError(401, "No signatures on 3pid binding")
+                self.verify_any_signature(data, id_server)
+                defer.returnValue(data["mxid"])
+
+        except IOError as e:
+            logger.warn("Error from identity server lookup: %s" % (e,))
+            defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def verify_any_signature(self, data, server_hostname):
+        if server_hostname not in data["signatures"]:
+            raise AuthError(401, "No signature from server %s" % (server_hostname,))
+        for key_name, signature in data["signatures"][server_hostname].items():
+            key_data = yield self.hs.get_simple_http_client().get_json(
+                "%s%s/_matrix/identity/api/v1/pubkey/%s" %
+                (id_server_scheme, server_hostname, key_name,),
+            )
+            if "public_key" not in key_data:
+                raise AuthError(401, "No public key named %s from %s" %
+                                (key_name, server_hostname,))
+            verify_signed_json(
+                data,
+                server_hostname,
+                decode_verify_key_bytes(key_name, decode_base64(key_data["public_key"]))
+            )
+            return
+
+    @defer.inlineCallbacks
+    def _make_and_store_3pid_invite(
+            self,
+            requester,
+            id_server,
+            medium,
+            address,
+            room_id,
+            user,
+            txn_id
+    ):
+        room_state = yield self.hs.get_state_handler().get_current_state(room_id)
+
+        inviter_display_name = ""
+        inviter_avatar_url = ""
+        member_event = room_state.get((EventTypes.Member, user.to_string()))
+        if member_event:
+            inviter_display_name = member_event.content.get("displayname", "")
+            inviter_avatar_url = member_event.content.get("avatar_url", "")
+
+        canonical_room_alias = ""
+        canonical_alias_event = room_state.get((EventTypes.CanonicalAlias, ""))
+        if canonical_alias_event:
+            canonical_room_alias = canonical_alias_event.content.get("alias", "")
+
+        room_name = ""
+        room_name_event = room_state.get((EventTypes.Name, ""))
+        if room_name_event:
+            room_name = room_name_event.content.get("name", "")
+
+        room_join_rules = ""
+        join_rules_event = room_state.get((EventTypes.JoinRules, ""))
+        if join_rules_event:
+            room_join_rules = join_rules_event.content.get("join_rule", "")
+
+        room_avatar_url = ""
+        room_avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+        if room_avatar_event:
+            room_avatar_url = room_avatar_event.content.get("url", "")
+
+        token, public_keys, fallback_public_key, display_name = (
+            yield self._ask_id_server_for_third_party_invite(
+                id_server=id_server,
+                medium=medium,
+                address=address,
+                room_id=room_id,
+                inviter_user_id=user.to_string(),
+                room_alias=canonical_room_alias,
+                room_avatar_url=room_avatar_url,
+                room_join_rules=room_join_rules,
+                room_name=room_name,
+                inviter_display_name=inviter_display_name,
+                inviter_avatar_url=inviter_avatar_url
+            )
+        )
+
+        msg_handler = self.hs.get_handlers().message_handler
+        yield msg_handler.create_and_send_nonmember_event(
+            requester,
+            {
+                "type": EventTypes.ThirdPartyInvite,
+                "content": {
+                    "display_name": display_name,
+                    "public_keys": public_keys,
+
+                    # For backwards compatibility:
+                    "key_validity_url": fallback_public_key["key_validity_url"],
+                    "public_key": fallback_public_key["public_key"],
+                },
+                "room_id": room_id,
+                "sender": user.to_string(),
+                "state_key": token,
+            },
+            txn_id=txn_id,
+        )
+
+    @defer.inlineCallbacks
+    def _ask_id_server_for_third_party_invite(
+            self,
+            id_server,
+            medium,
+            address,
+            room_id,
+            inviter_user_id,
+            room_alias,
+            room_avatar_url,
+            room_join_rules,
+            room_name,
+            inviter_display_name,
+            inviter_avatar_url
+    ):
+        """
+        Asks an identity server for a third party invite.
+
+        Args:
+            id_server (str): hostname + optional port for the identity server.
+            medium (str): The literal string "email".
+            address (str): The third party address being invited.
+            room_id (str): The ID of the room to which the user is invited.
+            inviter_user_id (str): The user ID of the inviter.
+            room_alias (str): An alias for the room, for cosmetic notifications.
+            room_avatar_url (str): The URL of the room's avatar, for cosmetic
+                notifications.
+            room_join_rules (str): The join rules of the email (e.g. "public").
+            room_name (str): The m.room.name of the room.
+            inviter_display_name (str): The current display name of the
+                inviter.
+            inviter_avatar_url (str): The URL of the inviter's avatar.
+
+        Returns:
+            A deferred tuple containing:
+                token (str): The token which must be signed to prove authenticity.
+                public_keys ([{"public_key": str, "key_validity_url": str}]):
+                    public_key is a base64-encoded ed25519 public key.
+                fallback_public_key: One element from public_keys.
+                display_name (str): A user-friendly name to represent the invited
+                    user.
+        """
+
+        is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
+            id_server_scheme, id_server,
+        )
+
+        invite_config = {
+            "medium": medium,
+            "address": address,
+            "room_id": room_id,
+            "room_alias": room_alias,
+            "room_avatar_url": room_avatar_url,
+            "room_join_rules": room_join_rules,
+            "room_name": room_name,
+            "sender": inviter_user_id,
+            "sender_display_name": inviter_display_name,
+            "sender_avatar_url": inviter_avatar_url,
+        }
+
+        if self.hs.config.invite_3pid_guest:
+            registration_handler = self.hs.get_handlers().registration_handler
+            guest_access_token = yield registration_handler.guest_access_token_for(
+                medium=medium,
+                address=address,
+                inviter_user_id=inviter_user_id,
+            )
+
+            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+                guest_access_token
+            )
+
+            invite_config.update({
+                "guest_access_token": guest_access_token,
+                "guest_user_id": guest_user_info["user"].to_string(),
+            })
+
+        data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
+            is_url,
+            invite_config
+        )
+        # TODO: Check for success
+        token = data["token"]
+        public_keys = data.get("public_keys", [])
+        if "public_key" in data:
+            fallback_public_key = {
+                "public_key": data["public_key"],
+                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                    id_server_scheme, id_server,
+                ),
+            }
+        else:
+            fallback_public_key = public_keys[0]
+
+        if not public_keys:
+            public_keys.append(fallback_public_key)
+        display_name = data["display_name"]
+        defer.returnValue((token, public_keys, fallback_public_key, display_name))
+
+    @defer.inlineCallbacks
+    def forget(self, user, room_id):
+        user_id = user.to_string()
+
+        member = yield self.state_handler.get_current_state(
+            room_id=room_id,
+            event_type=EventTypes.Member,
+            state_key=user_id
+        )
+        membership = member.membership if member else None
+
+        if membership is not None and membership != Membership.LEAVE:
+            raise SynapseError(400, "User %s in room %s" % (
+                user_id, room_id
+            ))
+
+        if membership:
+            yield self.store.forget(user_id, room_id)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.api.filtering import Filter
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
 
             filtered_events = search_filter.filter([r["event"] for r in results])
 
-            events = yield self._filter_events_for_client(
-                user.to_string(), filtered_events
+            events = yield filter_events_for_client(
+                self.store, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
                     r["event"] for r in results
                 ])
 
-                events = yield self._filter_events_for_client(
-                    user.to_string(), filtered_events
+                events = yield filter_events_for_client(
+                    self.store, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
-                res["events_before"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_before"]
+                res["events_before"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_before"]
                 )
 
-                res["events_after"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_after"]
+                res["events_after"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1f6fde8e8a..be26a491ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseHandler
-
-from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.async import concurrently_execute
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
+from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
 
 from twisted.internet import defer
 
@@ -35,6 +34,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
     "filter_collection",
     "is_guest",
+    "request_key",
 ])
 
 
@@ -130,14 +130,16 @@ class SyncResult(collections.namedtuple("SyncResult", [
         )
 
 
-class SyncHandler(BaseHandler):
+class SyncHandler(object):
 
     def __init__(self, hs):
-        super(SyncHandler, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
+        self.response_cache = ResponseCache()
 
-    @defer.inlineCallbacks
     def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
                                full_state=False):
         """Get the sync for a client if we have new data for it now. Otherwise
@@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred SyncResult.
         """
+        result = self.response_cache.get(sync_config.request_key)
+        if not result:
+            result = self.response_cache.set(
+                sync_config.request_key,
+                self._wait_for_sync_for_user(
+                    sync_config, since_token, timeout, full_state
+                )
+            )
+        return result
 
+    @defer.inlineCallbacks
+    def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+                                full_state):
         context = LoggingContext.current_context()
         if context:
             if since_token is None:
@@ -179,197 +193,15 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None or full_state:
-            return self.full_state_sync(sync_config, since_token)
-        else:
-            return self.incremental_sync_with_gap(sync_config, since_token)
-
-    @defer.inlineCallbacks
-    def full_state_sync(self, sync_config, timeline_since_token):
-        """Get a sync for a client which is starting without any state.
-
-        If a 'message_since_token' is given, only timeline events which have
-        happened since that token will be returned.
-
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
-        presence_stream = self.event_sources.sources["presence"]
-        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
-        # UP to the present rather than after the present?
-        pagination_config = PaginationConfig(from_token=now_token)
-        presence, _ = yield presence_stream.get_pagination_rows(
-            user=sync_config.user,
-            pagination_config=pagination_config.get_source_config("presence"),
-            key=None
-        )
-
-        membership_list = (
-            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
-        )
-
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=sync_config.user.to_string(),
-            membership_list=membership_list
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(
-                sync_config.user.to_string()
-            )
-        )
-
-        account_data['m.push_rules'] = yield self.push_rules_for_user(
-            sync_config.user
-        )
-
-        tags_by_room = yield self.store.get_tags_for_user(
-            sync_config.user.to_string()
-        )
-
-        joined = []
-        invited = []
-        archived = []
-        deferreds = []
-
-        room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
-        for room_list_chunk in room_list_chunks:
-            for event in room_list_chunk:
-                if event.membership == Membership.JOIN:
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_joined_room
-                    )(
-                        room_id=event.room_id,
-                        sync_config=sync_config,
-                        now_token=now_token,
-                        timeline_since_token=timeline_since_token,
-                        ephemeral_by_room=ephemeral_by_room,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(joined.append)
-                    deferreds.append(room_sync_deferred)
-                elif event.membership == Membership.INVITE:
-                    invite = yield self.store.get_event(event.event_id)
-                    invited.append(InvitedSyncResult(
-                        room_id=event.room_id,
-                        invite=invite,
-                    ))
-                elif event.membership in (Membership.LEAVE, Membership.BAN):
-                    # Always send down rooms we were banned or kicked from.
-                    if not sync_config.filter_collection.include_leave:
-                        if event.membership == Membership.LEAVE:
-                            if sync_config.user.to_string() == event.sender:
-                                continue
-
-                    leave_token = now_token.copy_and_replace(
-                        "room_key", "s%d" % (event.stream_ordering,)
-                    )
-                    room_sync_deferred = preserve_fn(
-                        self.full_state_sync_for_archived_room
-                    )(
-                        sync_config=sync_config,
-                        room_id=event.room_id,
-                        leave_event_id=event.event_id,
-                        leave_token=leave_token,
-                        timeline_since_token=timeline_since_token,
-                        tags_by_room=tags_by_room,
-                        account_data_by_room=account_data_by_room,
-                    )
-                    room_sync_deferred.addCallback(archived.append)
-                    deferreds.append(room_sync_deferred)
-
-            yield defer.gatherResults(
-                deferreds, consumeErrors=True
-            ).addErrback(unwrapFirstError)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
-    @defer.inlineCallbacks
-    def full_state_sync_for_joined_room(self, room_id, sync_config,
-                                        now_token, timeline_since_token,
-                                        ephemeral_by_room, tags_by_room,
-                                        account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred JoinedSyncResult.
-        """
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token=timeline_since_token
-        )
-
-        room_sync = yield self.incremental_sync_with_gap_for_room(
-            room_id, sync_config,
-            now_token=now_token,
-            since_token=timeline_since_token,
-            ephemeral_by_room=ephemeral_by_room,
-            tags_by_room=tags_by_room,
-            account_data_by_room=account_data_by_room,
-            batch=batch,
-            full_state=True,
-        )
-
-        defer.returnValue(room_sync)
+        return self.generate_sync_result(sync_config, since_token, full_state)
 
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
         user_id = user.to_string()
-        rawrules = yield self.store.get_push_rules_for_user(user_id)
-        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-        rules = format_push_rules_for_user(user, rawrules, enabled_map)
+        rules = yield self.store.get_push_rules_for_user(user_id)
+        rules = format_push_rules_for_user(user, rules)
         defer.returnValue(rules)
 
-    def account_data_for_user(self, account_data):
-        account_data_events = []
-
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
-    def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
-        account_data_events = []
-        tags = tags_by_room.get(room_id)
-        if tags is not None:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
-
-        account_data = account_data_by_room.get(room_id, {})
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
     @defer.inlineCallbacks
     def ephemeral_by_room(self, sync_config, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
@@ -432,255 +264,44 @@ class SyncHandler(BaseHandler):
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    def full_state_sync_for_archived_room(self, room_id, sync_config,
-                                          leave_event_id, leave_token,
-                                          timeline_since_token, tags_by_room,
-                                          account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred ArchivedSyncResult.
-        """
-
-        return self.incremental_sync_for_archived_room(
-            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
-            account_data_by_room, full_state=True, leave_token=leave_token,
-        )
-
     @defer.inlineCallbacks
-    def incremental_sync_with_gap(self, sync_config, since_token):
-        """ Get the incremental delta needed to bring the client up to
-        date with the server.
-        Returns:
-            A Deferred SyncResult.
+    def _load_filtered_recents(self, room_id, sync_config, now_token,
+                               since_token=None, recents=None, newly_joined_room=False):
         """
-        now_token = yield self.event_sources.get_current_token()
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        presence_source = self.event_sources.sources["presence"]
-        presence, presence_key = yield presence_source.get_new_events(
-            user=sync_config.user,
-            from_key=since_token.presence_key,
-            limit=sync_config.filter_collection.presence_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("presence_key", presence_key)
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token, since_token
-        )
-
-        rm_handler = self.hs.get_handlers().room_member_handler
-        app_service = yield self.store.get_app_service_by_user_id(
-            sync_config.user.to_string()
-        )
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
-                sync_config.user
-            )
-
-        user_id = sync_config.user.to_string()
-
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-
-        tags_by_room = yield self.store.get_updated_tags(
-            user_id,
-            since_token.account_data_key,
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_updated_account_data_for_user(
-                user_id,
-                since_token.account_data_key,
-            )
-        )
-
-        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
-            user_id, int(since_token.push_rules_key)
-        )
-
-        if push_rules_changed:
-            account_data["m.push_rules"] = yield self.push_rules_for_user(
-                sync_config.user
-            )
-
-        # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        mem_change_events_by_room_id = {}
-        for event in rooms_changed:
-            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
-        newly_joined_rooms = []
-        archived = []
-        invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
-            non_joins = [e for e in events if e.membership != Membership.JOIN]
-            has_join = len(non_joins) != len(events)
-
-            # We want to figure out if we joined the room at some point since
-            # the last sync (even if we have since left). This is to make sure
-            # we do send down the room, and with full state, where necessary
-            if room_id in joined_room_ids or has_join:
-                old_state = yield self.get_state_at(room_id, since_token)
-                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
-                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
-                        newly_joined_rooms.append(room_id)
-
-                if room_id in joined_room_ids:
-                    continue
-
-            if not non_joins:
-                continue
-
-            # Only bother if we're still currently invited
-            should_invite = non_joins[-1].membership == Membership.INVITE
-            if should_invite:
-                room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
-                if room_sync:
-                    invited.append(room_sync)
-
-            # Always include leave/ban events. Just take the last one.
-            # TODO: How do we handle ban -> leave in same batch?
-            leave_events = [
-                e for e in non_joins
-                if e.membership in (Membership.LEAVE, Membership.BAN)
-            ]
-
-            if leave_events:
-                leave_event = leave_events[-1]
-                room_sync = yield self.incremental_sync_for_archived_room(
-                    sync_config, room_id, leave_event.event_id, since_token,
-                    tags_by_room, account_data_by_room,
-                    full_state=room_id in newly_joined_rooms
-                )
-                if room_sync:
-                    archived.append(room_sync)
-
-        # Get all events for rooms we're currently joined to.
-        room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
-
-        joined = []
-        # We loop through all room ids, even if there are no new events, in case
-        # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
-            room_entry = room_to_events.get(room_id, None)
-
-            if room_entry:
-                events, start_key = room_entry
-
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
-                newly_joined_room = room_id in newly_joined_rooms
-                full_state = newly_joined_room
-
-                batch = yield self.load_filtered_recents(
-                    room_id, sync_config, prev_batch_token,
-                    since_token=since_token,
-                    recents=events,
-                    newly_joined_room=newly_joined_room,
-                )
-            else:
-                batch = TimelineBatch(
-                    events=[],
-                    prev_batch=since_token,
-                    limited=False,
-                )
-                full_state = False
-
-            room_sync = yield self.incremental_sync_with_gap_for_room(
-                room_id=room_id,
-                sync_config=sync_config,
-                since_token=since_token,
-                now_token=now_token,
-                ephemeral_by_room=ephemeral_by_room,
-                tags_by_room=tags_by_room,
-                account_data_by_room=account_data_by_room,
-                batch=batch,
-                full_state=full_state,
-            )
-            if room_sync:
-                joined.append(room_sync)
-
-        # For each newly joined room, we want to send down presence of
-        # existing users.
-        presence_handler = self.hs.get_handlers().presence_handler
-        extra_presence_users = set()
-        for room_id in newly_joined_rooms:
-            users = yield self.store.get_users_in_room(event.room_id)
-            extra_presence_users.update(users)
-
-        # For each new member, send down presence.
-        for joined_sync in joined:
-            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
-            for event in it:
-                if event.type == EventTypes.Member:
-                    if event.membership == Membership.JOIN:
-                        extra_presence_users.add(event.state_key)
-
-        states = yield presence_handler.get_states(
-            [u for u in extra_presence_users if u != user_id],
-            as_event=True,
-        )
-        presence.extend(states)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
-    @defer.inlineCallbacks
-    def load_filtered_recents(self, room_id, sync_config, now_token,
-                              since_token=None, recents=None, newly_joined_room=False):
-        """
-        :returns a Deferred TimelineBatch
+        Returns:
+            a Deferred TimelineBatch
         """
         with Measure(self.clock, "load_filtered_recents"):
-            filtering_factor = 2
             timeline_limit = sync_config.filter_collection.timeline_limit()
-            load_limit = max(timeline_limit * filtering_factor, 10)
-            max_repeat = 5  # Only try a few times per room, otherwise
-            room_key = now_token.room_key
-            end_key = room_key
 
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
                 limited = False
 
-            if recents is not None:
+            if recents:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
-                recents = yield self._filter_events_for_client(
+                recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     recents,
                 )
             else:
                 recents = []
 
+            if not limited:
+                defer.returnValue(TimelineBatch(
+                    events=recents,
+                    prev_batch=now_token,
+                    limited=False
+                ))
+
+            filtering_factor = 2
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
             since_key = None
             if since_token and not newly_joined_room:
                 since_key = since_token.room_key
@@ -695,7 +316,8 @@ class SyncHandler(BaseHandler):
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
-                loaded_recents = yield self._filter_events_for_client(
+                loaded_recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
                 )
@@ -723,109 +345,15 @@ class SyncHandler(BaseHandler):
         ))
 
     @defer.inlineCallbacks
-    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
-                                           since_token, now_token,
-                                           ephemeral_by_room, tags_by_room,
-                                           account_data_by_room,
-                                           batch, full_state=False):
-        state = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-            ephemeral_by_room.get(room_id, [])
-        )
-
-        unread_notifications = {}
-        room_sync = JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state,
-            ephemeral=ephemeral,
-            account_data=account_data,
-            unread_notifications=unread_notifications,
-        )
-
-        if room_sync:
-            notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config
-            )
-
-            if notifs is not None:
-                unread_notifications["notification_count"] = notifs["notify_count"]
-                unread_notifications["highlight_count"] = notifs["highlight_count"]
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
-                                           since_token, tags_by_room,
-                                           account_data_by_room, full_state,
-                                           leave_token=None):
-        """ Get the incremental delta needed to bring the client up to date for
-        the archived room.
-        Returns:
-            A Deferred ArchivedSyncResult
-        """
-
-        if not leave_token:
-            stream_token = yield self.store.get_stream_token_for_event(
-                leave_event_id
-            )
-
-            leave_token = since_token.copy_and_replace("room_key", stream_token)
-
-        if since_token and since_token.is_after(leave_token):
-            defer.returnValue(None)
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token,
-        )
-
-        logger.debug("Recents %r", batch)
-
-        state_events_delta = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, leave_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        room_sync = ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state_events_delta,
-            account_data=account_data,
-        )
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
     def get_state_after_event(self, event):
         """
         Get the room state after the given event
 
-        :param synapse.events.EventBase event: event of interest
-        :return: A Deferred map from ((type, state_key)->Event)
+        Args:
+            event(synapse.events.EventBase): event of interest
+
+        Returns:
+            A Deferred map from ((type, state_key)->Event)
         """
         state = yield self.store.get_state_for_event(event.event_id)
         if event.is_state():
@@ -836,9 +364,13 @@ class SyncHandler(BaseHandler):
     @defer.inlineCallbacks
     def get_state_at(self, room_id, stream_position):
         """ Get the room state at a particular stream position
-        :param str room_id: room for which to get state
-        :param StreamToken stream_position: point at which to get state
-        :returns: A Deferred map from ((type, state_key)->Event)
+
+        Args:
+            room_id(str): room for which to get state
+            stream_position(StreamToken): point at which to get state
+
+        Returns:
+            A Deferred map from ((type, state_key)->Event)
         """
         last_events, token = yield self.store.get_recent_events_for_room(
             room_id, end_token=stream_position.room_key, limit=1,
@@ -859,15 +391,18 @@ class SyncHandler(BaseHandler):
         """ Works out the differnce in state between the start of the timeline
         and the previous sync.
 
-        :param str room_id
-        :param TimelineBatch batch: The timeline batch for the room that will
-            be sent to the user.
-        :param sync_config
-        :param str since_token: Token of the end of the previous batch. May be None.
-        :param str now_token: Token of the end of the current batch.
-        :param bool full_state: Whether to force returning the full state.
+        Args:
+            room_id(str):
+            batch(synapse.handlers.sync.TimelineBatch): The timeline batch for
+                the room that will be sent to the user.
+            sync_config(synapse.handlers.sync.SyncConfig):
+            since_token(str|None): Token of the end of the previous batch. May
+                be None.
+            now_token(str): Token of the end of the current batch.
+            full_state(bool): Whether to force returning the full state.
 
-        :returns A new event dictionary
+        Returns:
+             A deferred new event dictionary
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -934,24 +469,6 @@ class SyncHandler(BaseHandler):
                 for e in sync_config.filter_collection.filter_room_state(state.values())
             })
 
-    def check_joined_room(self, sync_config, state_delta):
-        """
-        Check if the user has just joined the given room (so should
-        be given the full state)
-
-        :param sync_config:
-        :param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
-           difference in state since the last sync
-
-        :returns A deferred Tuple (state_delta, limited)
-        """
-        join_event = state_delta.get((
-            EventTypes.Member, sync_config.user.to_string()), None)
-        if join_event is not None:
-            if join_event.content["membership"] == Membership.JOIN:
-                return True
-        return False
-
     @defer.inlineCallbacks
     def unread_notifs_for_room_id(self, room_id, sync_config):
         with Measure(self.clock, "unread_notifs_for_room_id"):
@@ -972,6 +489,551 @@ class SyncHandler(BaseHandler):
             # count is whatever it was last time.
             defer.returnValue(None)
 
+    @defer.inlineCallbacks
+    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+        """Generates a sync result.
+
+        Args:
+            sync_config (SyncConfig)
+            since_token (StreamToken)
+            full_state (bool)
+
+        Returns:
+            Deferred(SyncResult)
+        """
+
+        # NB: The now_token gets changed by some of the generate_sync_* methods,
+        # this is due to some of the underlying streams not supporting the ability
+        # to query up to a given point.
+        # Always use the `now_token` in `SyncResultBuilder`
+        now_token = yield self.event_sources.get_current_token()
+
+        sync_result_builder = SyncResultBuilder(
+            sync_config, full_state,
+            since_token=since_token,
+            now_token=now_token,
+        )
+
+        account_data_by_room = yield self._generate_sync_entry_for_account_data(
+            sync_result_builder
+        )
+
+        res = yield self._generate_sync_entry_for_rooms(
+            sync_result_builder, account_data_by_room
+        )
+        newly_joined_rooms, newly_joined_users = res
+
+        yield self._generate_sync_entry_for_presence(
+            sync_result_builder, newly_joined_rooms, newly_joined_users
+        )
+
+        defer.returnValue(SyncResult(
+            presence=sync_result_builder.presence,
+            account_data=sync_result_builder.account_data,
+            joined=sync_result_builder.joined,
+            invited=sync_result_builder.invited,
+            archived=sync_result_builder.archived,
+            next_batch=sync_result_builder.now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_account_data(self, sync_result_builder):
+        """Generates the account data portion of the sync response. Populates
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+
+        Returns:
+            Deferred(dict): A dictionary containing the per room account data.
+        """
+        sync_config = sync_result_builder.sync_config
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+
+        if since_token and not sync_result_builder.full_state:
+            account_data, account_data_by_room = (
+                yield self.store.get_updated_account_data_for_user(
+                    user_id,
+                    since_token.account_data_key,
+                )
+            )
+
+            push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+                user_id, int(since_token.push_rules_key)
+            )
+
+            if push_rules_changed:
+                account_data["m.push_rules"] = yield self.push_rules_for_user(
+                    sync_config.user
+                )
+        else:
+            account_data, account_data_by_room = (
+                yield self.store.get_account_data_for_user(
+                    sync_config.user.to_string()
+                )
+            )
+
+            account_data['m.push_rules'] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
+        account_data_for_user = sync_config.filter_collection.filter_account_data([
+            {"type": account_data_type, "content": content}
+            for account_data_type, content in account_data.items()
+        ])
+
+        sync_result_builder.account_data = account_data_for_user
+
+        defer.returnValue(account_data_by_room)
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
+                                          newly_joined_users):
+        """Generates the presence portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            newly_joined_rooms(list): List of rooms that the user has joined
+                since the last sync (or empty if an initial sync)
+            newly_joined_users(list): List of users that have joined rooms
+                since the last sync (or empty if an initial sync)
+        """
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+        user = sync_result_builder.sync_config.user
+
+        presence_source = self.event_sources.sources["presence"]
+
+        since_token = sync_result_builder.since_token
+        if since_token and not sync_result_builder.full_state:
+            presence_key = since_token.presence_key
+            include_offline = True
+        else:
+            presence_key = None
+            include_offline = False
+
+        presence, presence_key = yield presence_source.get_new_events(
+            user=user,
+            from_key=presence_key,
+            is_guest=sync_config.is_guest,
+            include_offline=include_offline,
+        )
+        sync_result_builder.now_token = now_token.copy_and_replace(
+            "presence_key", presence_key
+        )
+
+        extra_users_ids = set(newly_joined_users)
+        for room_id in newly_joined_rooms:
+            users = yield self.store.get_users_in_room(room_id)
+            extra_users_ids.update(users)
+        extra_users_ids.discard(user.to_string())
+
+        states = yield self.presence_handler.get_states(
+            extra_users_ids,
+            as_event=True,
+        )
+        presence.extend(states)
+
+        # Deduplicate the presence entries so that there's at most one per user
+        presence = {p["content"]["user_id"]: p for p in presence}.values()
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
+        sync_result_builder.presence = presence
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+        """Generates the rooms portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            account_data_by_room(dict): Dictionary of per room account data
+
+        Returns:
+            Deferred(tuple): Returns a 2-tuple of
+            `(newly_joined_rooms, newly_joined_users)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_result_builder.sync_config,
+            now_token=sync_result_builder.now_token,
+            since_token=sync_result_builder.since_token,
+        )
+        sync_result_builder.now_token = now_token
+
+        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", user_id=user_id,
+        )
+
+        if ignored_account_data:
+            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+        else:
+            ignored_users = frozenset()
+
+        if sync_result_builder.since_token:
+            res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_updated_tags(
+                user_id,
+                sync_result_builder.since_token.account_data_key,
+            )
+        else:
+            res = yield self._get_all_rooms(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+        def handle_room_entries(room_entry):
+            return self._generate_room_entry(
+                sync_result_builder,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builder.full_state,
+            )
+
+        yield concurrently_execute(handle_room_entries, room_entries, 10)
+
+        sync_result_builder.invited.extend(invited)
+
+        # Now we want to get any newly joined users
+        newly_joined_users = set()
+        if sync_result_builder.since_token:
+            for joined_sync in sync_result_builder.joined:
+                it = itertools.chain(
+                    joined_sync.timeline.events, joined_sync.state.values()
+                )
+                for event in it:
+                    if event.type == EventTypes.Member:
+                        if event.membership == Membership.JOIN:
+                            newly_joined_users.add(event.state_key)
+
+        defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+    @defer.inlineCallbacks
+    def _get_rooms_changed(self, sync_result_builder, ignored_users):
+        """Gets the the changes that have happened since the last sync.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        assert since_token
+
+        app_service = yield self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            joined_room_ids = set(r.room_id for r in rooms)
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+        newly_joined_rooms = []
+        room_entries = []
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                    newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
+
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                if event.sender not in ignored_users:
+                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                    if room_sync:
+                        invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                leave_stream_token = yield self.store.get_stream_token_for_event(
+                    leave_event.event_id
+                )
+                leave_token = since_token.copy_and_replace(
+                    "room_key", leave_stream_token
+                )
+
+                if since_token and since_token.is_after(leave_token):
+                    continue
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        timeline_limit = sync_config.filter_collection.timeline_limit()
+
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
+
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non room events taht we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
+
+            if room_entry:
+                events, start_key = room_entry
+
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=events,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    upto_token=prev_batch_token,
+                ))
+            else:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=[],
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=since_token,
+                ))
+
+        defer.returnValue((room_entries, invited, newly_joined_rooms))
+
+    @defer.inlineCallbacks
+    def _get_all_rooms(self, sync_result_builder, ignored_users):
+        """Returns entries for all rooms for the user.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
+        """
+
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        membership_list = (
+            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+        )
+
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user_id,
+            membership_list=membership_list
+        )
+
+        room_entries = []
+        invited = []
+
+        for event in room_list:
+            if event.membership == Membership.JOIN:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="joined",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=now_token,
+                ))
+            elif event.membership == Membership.INVITE:
+                if event.sender in ignored_users:
+                    continue
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            continue
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        defer.returnValue((room_entries, invited, []))
+
+    @defer.inlineCallbacks
+    def _generate_room_entry(self, sync_result_builder, ignored_users,
+                             room_builder, ephemeral, tags, account_data,
+                             always_include=False):
+        """Populates the `joined` and `archived` section of `sync_result_builder`
+        based on the `room_builder`.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+            room_builder(RoomSyncResultBuilder)
+            ephemeral(list): List of new ephemeral events for room
+            tags(list): List of *all* tags for room, or None if there has been
+                no change.
+            account_data(list): List of new account data for room
+            always_include(bool): Always include this room in the sync response,
+                even if empty.
+        """
+        newly_joined = room_builder.newly_joined
+        full_state = (
+            room_builder.full_state
+            or newly_joined
+            or sync_result_builder.full_state
+        )
+        events = room_builder.events
+
+        # We want to shortcut out as early as possible.
+        if not (always_include or account_data or ephemeral or full_state):
+            if events == [] and tags is None:
+                return
+
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        room_id = room_builder.room_id
+        since_token = room_builder.since_token
+        upto_token = room_builder.upto_token
+
+        batch = yield self._load_filtered_recents(
+            room_id, sync_config,
+            now_token=upto_token,
+            since_token=since_token,
+            recents=events,
+            newly_joined_room=newly_joined,
+        )
+
+        account_data_events = []
+        if tags is not None:
+            account_data_events.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data_events
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+        if not (always_include or batch or account_data or ephemeral or full_state):
+            return
+
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
+
+        if room_builder.rtype == "joined":
+            unread_notifications = {}
+            room_sync = JoinedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                ephemeral=ephemeral,
+                account_data=account_data_events,
+                unread_notifications=unread_notifications,
+            )
+
+            if room_sync or always_include:
+                notifs = yield self.unread_notifs_for_room_id(
+                    room_id, sync_config
+                )
+
+                if notifs is not None:
+                    unread_notifications["notification_count"] = notifs["notify_count"]
+                    unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+                sync_result_builder.joined.append(room_sync)
+        elif room_builder.rtype == "archived":
+            room_sync = ArchivedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                account_data=account_data,
+            )
+            if room_sync or always_include:
+                sync_result_builder.archived.append(room_sync)
+        else:
+            raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1019,3 +1081,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         (e.type, e.state_key): e
         for e in evs
     }
+
+
+class SyncResultBuilder(object):
+    "Used to help build up a new SyncResult for a user"
+    def __init__(self, sync_config, full_state, since_token, now_token):
+        """
+        Args:
+            sync_config(SyncConfig)
+            full_state(bool): The full_state flag as specified by user
+            since_token(StreamToken): The token supplied by user, or None.
+            now_token(StreamToken): The token to sync up to.
+        """
+        self.sync_config = sync_config
+        self.full_state = full_state
+        self.since_token = since_token
+        self.now_token = now_token
+
+        self.presence = []
+        self.account_data = []
+        self.joined = []
+        self.invited = []
+        self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+    """Stores information needed to create either a `JoinedSyncResult` or
+    `ArchivedSyncResult`.
+    """
+    def __init__(self, room_id, rtype, events, newly_joined, full_state,
+                 since_token, upto_token):
+        """
+        Args:
+            room_id(str)
+            rtype(str): One of `"joined"` or `"archived"`
+            events(list): List of events to include in the room, (more events
+                may be added when generating result).
+            newly_joined(bool): If the user has newly joined the room
+            full_state(bool): Whether the full state should be sent in result
+            since_token(StreamToken): Earliest point to return events from, or None
+            upto_token(StreamToken): Latest point to return events from.
+        """
+        self.room_id = room_id
+        self.rtype = rtype
+        self.events = events
+        self.newly_joined = newly_joined
+        self.full_state = full_state
+        self.since_token = since_token
+        self.upto_token = upto_token
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8ce27f49ec..861b8f7989 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from ._base import BaseHandler
-
 from synapse.api.errors import SynapseError, AuthError
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.metrics import Measure
@@ -32,14 +30,16 @@ logger = logging.getLogger(__name__)
 
 # A tiny object useful for storing a user's membership in a room, as a mapping
 # key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
 
 
-class TypingNotificationHandler(BaseHandler):
+class TypingHandler(object):
     def __init__(self, hs):
-        super(TypingNotificationHandler, self).__init__(hs)
-
-        self.homeserver = hs
+        self.store = hs.get_datastore()
+        self.server_name = hs.config.server_name
+        self.auth = hs.get_auth()
+        self.is_mine_id = hs.is_mine_id
+        self.notifier = hs.get_notifier()
 
         self.clock = hs.get_clock()
 
@@ -67,20 +67,23 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
-        if not self.hs.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has started typing in %s", target_user.to_string(), room_id
+            "%s has started typing in %s", target_user_id, room_id
         )
 
         until = self.clock.time_msec() + timeout
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         was_present = member in self._member_typing_until
 
@@ -104,25 +107,28 @@ class TypingNotificationHandler(BaseHandler):
 
         yield self._push_update(
             room_id=room_id,
-            user=target_user,
+            user_id=target_user_id,
             typing=True,
         )
 
     @defer.inlineCallbacks
     def stopped_typing(self, target_user, auth_user, room_id):
-        if not self.hs.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has stopped typing in %s", target_user.to_string(), room_id
+            "%s has stopped typing in %s", target_user_id, room_id
         )
 
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         if member in self._member_typing_timer:
             self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
-        if self.hs.is_mine(user):
-            member = RoomMember(room_id=room_id, user=user)
+        user_id = user.to_string()
+        if self.is_mine_id(user_id):
+            member = RoomMember(room_id=room_id, user_id=user_id)
             yield self._stopped_typing(member)
 
     @defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingNotificationHandler(BaseHandler):
 
         yield self._push_update(
             room_id=member.room_id,
-            user=member.user,
+            user_id=member.user_id,
             typing=False,
         )
 
@@ -156,61 +163,53 @@ class TypingNotificationHandler(BaseHandler):
             del self._member_typing_timer[member]
 
     @defer.inlineCallbacks
-    def _push_update(self, room_id, user, typing):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers, remotedomains=remotedomains
-        )
-
-        if localusers:
-            self._push_update_local(
-                room_id=room_id,
-                user=user,
-                typing=typing
-            )
+    def _push_update(self, room_id, user_id, typing):
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
         deferreds = []
-        for domain in remotedomains:
-            deferreds.append(self.federation.send_edu(
-                destination=domain,
-                edu_type="m.typing",
-                content={
-                    "room_id": room_id,
-                    "user_id": user.to_string(),
-                    "typing": typing,
-                },
-            ))
+        for domain in domains:
+            if domain == self.server_name:
+                self._push_update_local(
+                    room_id=room_id,
+                    user_id=user_id,
+                    typing=typing
+                )
+            else:
+                deferreds.append(self.federation.send_edu(
+                    destination=domain,
+                    edu_type="m.typing",
+                    content={
+                        "room_id": room_id,
+                        "user_id": user_id,
+                        "typing": typing,
+                    },
+                ))
 
         yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
         room_id = content["room_id"]
-        user = UserID.from_string(content["user_id"])
+        user_id = content["user_id"]
 
-        localusers = set()
+        # Check that the string is a valid user id
+        UserID.from_string(user_id)
 
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers
-        )
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
-        if localusers:
+        if self.server_name in domains:
             self._push_update_local(
                 room_id=room_id,
-                user=user,
+                user_id=user_id,
                 typing=content["typing"]
             )
 
-    def _push_update_local(self, room_id, user, typing):
+    def _push_update_local(self, room_id, user_id, typing):
         room_set = self._room_typing.setdefault(room_id, set())
         if typing:
-            room_set.add(user)
+            room_set.add(user_id)
         else:
-            room_set.discard(user)
+            room_set.discard(user_id)
 
         self._latest_room_serial += 1
         self._room_serials[room_id] = self._latest_room_serial
@@ -226,9 +225,7 @@ class TypingNotificationHandler(BaseHandler):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps([
-                    u.to_string() for u in typing
-                ], ensure_ascii=False)
+                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
                 rows.append((serial, room_id, typing_bytes))
         rows.sort()
         return rows
@@ -238,34 +235,26 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
-        self._handler = None
-        self._room_member_handler = None
-
-    def handler(self):
-        # Avoid cyclic dependency in handler setup
-        if not self._handler:
-            self._handler = self.hs.get_handlers().typing_notification_handler
-        return self._handler
-
-    def room_member_handler(self):
-        if not self._room_member_handler:
-            self._room_member_handler = self.hs.get_handlers().room_member_handler
-        return self._room_member_handler
+        # We can't call get_typing_handler here because there's a cycle:
+        #
+        #   Typing -> Notifier -> TypingNotificationEventSource -> Typing
+        #
+        self.get_typing_handler = hs.get_typing_handler
 
     def _make_event_for(self, room_id):
-        typing = self.handler()._room_typing[room_id]
+        typing = self.get_typing_handler()._room_typing[room_id]
         return {
             "type": "m.typing",
             "room_id": room_id,
             "content": {
-                "user_ids": [u.to_string() for u in typing],
+                "user_ids": list(typing),
             },
         }
 
     def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
-            handler = self.handler()
+            handler = self.get_typing_handler()
 
             events = []
             for room_id in room_ids:
@@ -279,7 +268,7 @@ class TypingNotificationEventSource(object):
             return events, handler._latest_room_serial
 
     def get_current_key(self):
-        return self.handler()._latest_room_serial
+        return self.get_typing_handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
         return ([], pagination_config.from_key)