summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py22
-rw-r--r--synapse/handlers/_base.py368
-rw-r--r--synapse/handlers/appservice.py15
-rw-r--r--synapse/handlers/auth.py23
-rw-r--r--synapse/handlers/directory.py3
-rw-r--r--synapse/handlers/events.py2
-rw-r--r--synapse/handlers/federation.py31
-rw-r--r--synapse/handlers/message.py224
-rw-r--r--synapse/handlers/presence.py214
-rw-r--r--synapse/handlers/receipts.py11
-rw-r--r--synapse/handlers/register.py58
-rw-r--r--synapse/handlers/room.py66
-rw-r--r--synapse/handlers/room_member.py51
-rw-r--r--synapse/handlers/search.py17
-rw-r--r--synapse/handlers/sync.py1151
-rw-r--r--synapse/handlers/typing.py143
16 files changed, 1241 insertions, 1158 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index f4dbf47c1d..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,24 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
-    RoomCreationHandler, RoomListHandler, RoomContextHandler,
+    RoomCreationHandler, RoomContextHandler,
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
 from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
-from .presence import PresenceHandler
 from .directory import DirectoryHandler
-from .typing import TypingNotificationHandler
 from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
-from .sync import SyncHandler
-from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
 from .search import SearchHandler
@@ -53,22 +46,9 @@ class Handlers(object):
         self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
-        self.presence_handler = PresenceHandler(hs)
-        self.room_list_handler = RoomListHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
-        self.typing_notification_handler = TypingNotificationHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.receipts_handler = ReceiptsHandler(hs)
-        asapi = ApplicationServiceApi(hs)
-        self.appservice_handler = ApplicationServicesHandler(
-            hs, asapi, AppServiceScheduler(
-                clock=hs.get_clock(),
-                store=hs.get_datastore(),
-                as_api=asapi
-            )
-        )
-        self.sync_handler = SyncHandler(hs)
-        self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 13a675b208..c904c6c500 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,13 +15,10 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError, SynapseError, AuthError
-from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.api.errors import LimitExceededError
 from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias, Requester
-from synapse.push.action_generator import ActionGenerator
+from synapse.types import UserID, Requester
 
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 
 import logging
 
@@ -29,23 +26,6 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-VISIBILITY_PRIORITY = (
-    "world_readable",
-    "shared",
-    "invited",
-    "joined",
-)
-
-
-MEMBERSHIP_PRIORITY = (
-    Membership.JOIN,
-    Membership.INVITE,
-    Membership.KNOCK,
-    Membership.LEAVE,
-    Membership.BAN,
-)
-
-
 class BaseHandler(object):
     """
     Common base class for the event handlers.
@@ -65,161 +45,10 @@ class BaseHandler(object):
         self.clock = hs.get_clock()
         self.hs = hs
 
-        self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
 
         self.event_builder_factory = hs.get_event_builder_factory()
 
-    @defer.inlineCallbacks
-    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
-        """ Returns dict of user_id -> list of events that user is allowed to
-        see.
-
-        Args:
-            user_tuples (str, bool): (user id, is_peeking) for each user to be
-                checked. is_peeking should be true if:
-                * the user is not currently a member of the room, and:
-                * the user has not been a member of the room since the
-                given events
-            events ([synapse.events.EventBase]): list of events to filter
-        """
-        forgotten = yield defer.gatherResults([
-            self.store.who_forgot_in_room(
-                room_id,
-            )
-            for room_id in frozenset(e.room_id for e in events)
-        ], consumeErrors=True)
-
-        # Set of membership event_ids that have been forgotten
-        event_id_forgotten = frozenset(
-            row["event_id"] for rows in forgotten for row in rows
-        )
-
-        def allowed(event, user_id, is_peeking):
-            """
-            Args:
-                event (synapse.events.EventBase): event to check
-                user_id (str)
-                is_peeking (bool)
-            """
-            state = event_id_to_state[event.event_id]
-
-            # get the room_visibility at the time of the event.
-            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
-            if visibility_event:
-                visibility = visibility_event.content.get("history_visibility", "shared")
-            else:
-                visibility = "shared"
-
-            if visibility not in VISIBILITY_PRIORITY:
-                visibility = "shared"
-
-            # if it was world_readable, it's easy: everyone can read it
-            if visibility == "world_readable":
-                return True
-
-            # Always allow history visibility events on boundaries. This is done
-            # by setting the effective visibility to the least restrictive
-            # of the old vs new.
-            if event.type == EventTypes.RoomHistoryVisibility:
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_visibility = prev_content.get("history_visibility", None)
-
-                if prev_visibility not in VISIBILITY_PRIORITY:
-                    prev_visibility = "shared"
-
-                new_priority = VISIBILITY_PRIORITY.index(visibility)
-                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
-                if old_priority < new_priority:
-                    visibility = prev_visibility
-
-            # likewise, if the event is the user's own membership event, use
-            # the 'most joined' membership
-            membership = None
-            if event.type == EventTypes.Member and event.state_key == user_id:
-                membership = event.content.get("membership", None)
-                if membership not in MEMBERSHIP_PRIORITY:
-                    membership = "leave"
-
-                prev_content = event.unsigned.get("prev_content", {})
-                prev_membership = prev_content.get("membership", None)
-                if prev_membership not in MEMBERSHIP_PRIORITY:
-                    prev_membership = "leave"
-
-                new_priority = MEMBERSHIP_PRIORITY.index(membership)
-                old_priority = MEMBERSHIP_PRIORITY.index(prev_membership)
-                if old_priority < new_priority:
-                    membership = prev_membership
-
-            # otherwise, get the user's membership at the time of the event.
-            if membership is None:
-                membership_event = state.get((EventTypes.Member, user_id), None)
-                if membership_event:
-                    if membership_event.event_id not in event_id_forgotten:
-                        membership = membership_event.membership
-
-            # if the user was a member of the room at the time of the event,
-            # they can see it.
-            if membership == Membership.JOIN:
-                return True
-
-            if visibility == "joined":
-                # we weren't a member at the time of the event, so we can't
-                # see this event.
-                return False
-
-            elif visibility == "invited":
-                # user can also see the event if they were *invited* at the time
-                # of the event.
-                return membership == Membership.INVITE
-
-            else:
-                # visibility is shared: user can also see the event if they have
-                # become a member since the event
-                #
-                # XXX: if the user has subsequently joined and then left again,
-                # ideally we would share history up to the point they left. But
-                # we don't know when they left.
-                return not is_peeking
-
-        defer.returnValue({
-            user_id: [
-                event
-                for event in events
-                if allowed(event, user_id, is_peeking)
-            ]
-            for user_id, is_peeking in user_tuples
-        })
-
-    @defer.inlineCallbacks
-    def _filter_events_for_client(self, user_id, events, is_peeking=False):
-        """
-        Check which events a user is allowed to see
-
-        Args:
-            user_id(str): user id to be checked
-            events([synapse.events.EventBase]): list of events to be checked
-            is_peeking(bool): should be True if:
-              * the user is not currently a member of the room, and:
-              * the user has not been a member of the room since the given
-                events
-
-        Returns:
-            [synapse.events.EventBase]
-        """
-        types = (
-            (EventTypes.RoomHistoryVisibility, ""),
-            (EventTypes.Member, user_id),
-        )
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-        res = yield self.filter_events_for_clients(
-            [(user_id, is_peeking)], events, event_id_to_state
-        )
-        defer.returnValue(res.get(user_id, []))
-
     def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
@@ -232,56 +61,6 @@ class BaseHandler(object):
                 retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
-    @defer.inlineCallbacks
-    def _create_new_client_event(self, builder, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
-            else:
-                depth = 1
-
-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
-
-        builder.prev_events = prev_events
-        builder.depth = depth
-
-        state_handler = self.state_handler
-
-        context = yield state_handler.compute_event_context(builder)
-
-        if builder.is_state():
-            builder.prev_state = yield self.store.add_event_hashes(
-                context.prev_state_events
-            )
-
-        yield self.auth.add_auth_events(builder, context)
-
-        add_hashes_and_signatures(
-            builder, self.server_name, self.signing_key
-        )
-
-        event = builder.build()
-
-        logger.debug(
-            "Created event %s with current state: %s",
-            event.event_id, context.current_state,
-        )
-
-        defer.returnValue(
-            (event, context,)
-        )
-
     def is_host_in_room(self, current_state):
         room_members = [
             (state_key, event.membership)
@@ -296,154 +75,13 @@ class BaseHandler(object):
                 return True
         for (state_key, membership) in room_members:
             if (
-                UserID.from_string(state_key).domain == self.hs.hostname
+                self.hs.is_mine_id(state_key)
                 and membership == Membership.JOIN
             ):
                 return True
         return False
 
     @defer.inlineCallbacks
-    def handle_new_client_event(
-        self,
-        requester,
-        event,
-        context,
-        ratelimit=True,
-        extra_users=[]
-    ):
-        # We now need to go and hit out to wherever we need to hit out to.
-
-        if ratelimit:
-            self.ratelimit(requester)
-
-        try:
-            self.auth.check(event, auth_events=context.current_state)
-        except AuthError as err:
-            logger.warn("Denying new event %r because %s", event, err)
-            raise err
-
-        yield self.maybe_kick_guest_users(event, context.current_state.values())
-
-        if event.type == EventTypes.CanonicalAlias:
-            # Check the alias is acually valid (at this time at least)
-            room_alias_str = event.content.get("alias", None)
-            if room_alias_str:
-                room_alias = RoomAlias.from_string(room_alias_str)
-                directory_handler = self.hs.get_handlers().directory_handler
-                mapping = yield directory_handler.get_association(room_alias)
-
-                if mapping["room_id"] != event.room_id:
-                    raise SynapseError(
-                        400,
-                        "Room alias %s does not point to the room" % (
-                            room_alias_str,
-                        )
-                    )
-
-        federation_handler = self.hs.get_handlers().federation_handler
-
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                def is_inviter_member_event(e):
-                    return (
-                        e.type == EventTypes.Member and
-                        e.sender == event.sender
-                    )
-
-                event.unsigned["invite_room_state"] = [
-                    {
-                        "type": e.type,
-                        "state_key": e.state_key,
-                        "content": e.content,
-                        "sender": e.sender,
-                    }
-                    for k, e in context.current_state.items()
-                    if e.type in self.hs.config.room_invite_state_types
-                    or is_inviter_member_event(e)
-                ]
-
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
-
-                    returned_invite = yield federation_handler.send_invite(
-                        invitee.domain,
-                        event,
-                    )
-
-                    event.unsigned.pop("room_state", None)
-
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(
-                        returned_invite.signatures
-                    )
-
-        if event.type == EventTypes.Redaction:
-            if self.auth.check_redaction(event, auth_events=context.current_state):
-                original_event = yield self.store.get_event(
-                    event.redacts,
-                    check_redacted=False,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=False
-                )
-                if event.user_id != original_event.user_id:
-                    raise AuthError(
-                        403,
-                        "You don't have permission to redact events"
-                    )
-
-        if event.type == EventTypes.Create and context.current_state:
-            raise AuthError(
-                403,
-                "Changing the room create event is forbidden",
-            )
-
-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
-            event, context, self
-        )
-
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
-        # this intentionally does not yield: we don't care about the result
-        # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
-            event_stream_id, max_stream_id
-        )
-
-        destinations = set()
-        for k, s in context.current_state.items():
-            try:
-                if k[0] == EventTypes.Member:
-                    if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", s.event_id
-                )
-
-        with PreserveLoggingContext():
-            # Don't block waiting on waking up all the listeners.
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
-
-        # If invite, remove room_state from unsigned before sending.
-        event.unsigned.pop("invite_room_state", None)
-
-        federation_handler.handle_new_event(
-            event, destinations=destinations,
-        )
-
-    @defer.inlineCallbacks
     def maybe_kick_guest_users(self, event, current_state):
         # Technically this function invalidates current_state by changing it.
         # Hopefully this isn't that important to the caller.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
-from synapse.types import UserID
 
 import logging
 
@@ -35,16 +34,13 @@ def log_failure(failure):
     )
 
 
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api, appservice_scheduler):
+    def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.hs = hs
-        self.appservice_api = appservice_api
-        self.scheduler = appservice_scheduler
+        self.is_mine_id = hs.is_mine_id
+        self.appservice_api = hs.get_application_service_api()
+        self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
 
     @defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
 
     @defer.inlineCallbacks
     def _is_unknown_user(self, user_id):
-        user = UserID.from_string(user_id)
-        if not self.hs.is_mine(user):
+        if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 61fe56032a..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
 
 from twisted.web.client import PartialDownloadError
@@ -521,14 +521,19 @@ class AuthHandler(BaseHandler):
         ))
         return m.serialize()
 
-    def generate_short_term_login_token(self, user_id):
+    def generate_short_term_login_token(self, user_id, duration_in_ms=(2 * 60 * 1000)):
         macaroon = self._generate_base_macaroon(user_id)
         macaroon.add_first_party_caveat("type = login")
         now = self.hs.get_clock().time_msec()
-        expiry = now + (2 * 60 * 1000)
+        expiry = now + duration_in_ms
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
 
+    def generate_delete_pusher_token(self, user_id):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = delete_pusher")
+        return macaroon.serialize()
+
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         try:
             macaroon = pymacaroons.Macaroon.deserialize(login_token)
@@ -563,7 +568,12 @@ class AuthHandler(BaseHandler):
 
         except_access_token_ids = [requester.access_token_id] if requester else []
 
-        yield self.store.user_set_password_hash(user_id, password_hash)
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
         yield self.store.user_delete_access_tokens(
             user_id, except_access_token_ids
         )
@@ -615,4 +625,7 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.hashpw(password, stored_hash) == stored_hash
+        if stored_hash:
+            return bcrypt.hashpw(password, stored_hash) == stored_hash
+        else:
+            return False
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
         super(DirectoryHandler, self).__init__(hs)
 
         self.state = hs.get_state_handler()
+        self.appservice_handler = hs.get_application_service_handler()
 
         self.federation = hs.get_replication_layer()
         self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
         )
         if not result:
             # Query AS to see if it exists
-            as_handler = self.hs.get_handlers().appservice_handler
+            as_handler = self.appservice_handler
             result = yield as_handler.query_room_alias_exists(room_alias)
         defer.returnValue(result)
 
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index f25a252523..3a3a1257d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -58,7 +58,7 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
         auth_user = UserID.from_string(auth_user_id)
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         context = yield presence_handler.user_syncing(
             auth_user_id, affect_presence=affect_presence,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d95e0b23b1..648a505e65 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -33,7 +33,7 @@ from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
 )
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
 
 from synapse.events.utils import prune_event
 
@@ -453,7 +453,7 @@ class FederationHandler(BaseHandler):
             joined_domains = {}
             for u, d in joined_users:
                 try:
-                    dom = UserID.from_string(u).domain
+                    dom = get_domain_from_id(u)
                     old_d = joined_domains.get(dom)
                     if old_d:
                         joined_domains[dom] = min(d, old_d)
@@ -682,7 +682,8 @@ class FederationHandler(BaseHandler):
         })
 
         try:
-            event, context = yield self._create_new_client_event(
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
                 builder=builder,
             )
         except AuthError as e:
@@ -743,9 +744,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.JOIN:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -915,7 +914,8 @@ class FederationHandler(BaseHandler):
             "state_key": user_id,
         })
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -970,9 +970,7 @@ class FederationHandler(BaseHandler):
             try:
                 if k[0] == EventTypes.Member:
                     if s.content["membership"] == Membership.LEAVE:
-                        destinations.add(
-                            UserID.from_string(s.state_key).domain
-                        )
+                        destinations.add(get_domain_from_id(s.state_key))
             except:
                 logger.warn(
                     "Failed to get destination from event %s", s.event_id
@@ -1115,7 +1113,7 @@ class FederationHandler(BaseHandler):
         if not event.internal_metadata.is_outlier():
             action_generator = ActionGenerator(self.hs)
             yield action_generator.handle_push_actions_for_event(
-                event, context, self
+                event, context
             )
 
         event_stream_id, max_stream_id = yield self.store.persist_event(
@@ -1692,7 +1690,10 @@ class FederationHandler(BaseHandler):
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
             builder = self.event_builder_factory.new(event_dict)
             EventValidator().validate_new(builder)
-            event, context = yield self._create_new_client_event(builder=builder)
+            message_handler = self.hs.get_handlers().message_handler
+            event, context = yield message_handler._create_new_client_event(
+                builder=builder
+            )
 
             event, context = yield self.add_display_name_to_third_party_invite(
                 event_dict, event, context
@@ -1720,7 +1721,8 @@ class FederationHandler(BaseHandler):
     def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
         builder = self.event_builder_factory.new(event_dict)
 
-        event, context = yield self._create_new_client_event(
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(
             builder=builder,
         )
 
@@ -1759,7 +1761,8 @@ class FederationHandler(BaseHandler):
         event_dict["content"]["third_party_invite"]["display_name"] = display_name
         builder = self.event_builder_factory.new(event_dict)
         EventValidator().validate_new(builder)
-        event, context = yield self._create_new_client_event(builder=builder)
+        message_handler = self.hs.get_handlers().message_handler
+        event, context = yield message_handler._create_new_client_event(builder=builder)
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index f51feda2f4..15caf1950a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,13 +17,19 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.streams.config import PaginationConfig
+from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.push.action_generator import ActionGenerator
+from synapse.streams.config import PaginationConfig
+from synapse.types import (
+    UserID, RoomAlias, RoomStreamToken, StreamToken, get_domain_from_id
+)
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
+from synapse.util.async import concurrently_execute, run_on_reactor
 from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.types import UserID, RoomStreamToken, StreamToken
+from synapse.util.logcontext import preserve_fn
+from synapse.visibility import filter_events_for_client
 
 from ._base import BaseHandler
 
@@ -123,7 +129,8 @@ class MessageHandler(BaseHandler):
                 "end": next_token.to_string(),
             })
 
-        events = yield self._filter_events_for_client(
+        events = yield filter_events_for_client(
+            self.store,
             user_id,
             events,
             is_peeking=(member_event_id is None),
@@ -229,7 +236,7 @@ class MessageHandler(BaseHandler):
         )
 
         if event.type == EventTypes.Message:
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.hs.get_presence_handler()
             yield presence.bump_presence_active_time(user)
 
     def deduplicate_state_event(self, event, context):
@@ -483,8 +490,8 @@ class MessageHandler(BaseHandler):
                     ]
                 ).addErrback(unwrapFirstError)
 
-                messages = yield self._filter_events_for_client(
-                    user_id, messages
+                messages = yield filter_events_for_client(
+                    self.store, user_id, messages
                 )
 
                 start_token = now_token.copy_and_replace("room_key", token[0])
@@ -619,8 +626,8 @@ class MessageHandler(BaseHandler):
             end_token=stream_token
         )
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking
         )
 
         start_token = StreamToken.START.copy_and_replace("room_key", token[0])
@@ -667,7 +674,7 @@ class MessageHandler(BaseHandler):
             and m.content["membership"] == Membership.JOIN
         ]
 
-        presence_handler = self.hs.get_handlers().presence_handler
+        presence_handler = self.hs.get_presence_handler()
 
         @defer.inlineCallbacks
         def get_presence():
@@ -700,8 +707,8 @@ class MessageHandler(BaseHandler):
             consumeErrors=True,
         ).addErrback(unwrapFirstError)
 
-        messages = yield self._filter_events_for_client(
-            user_id, messages, is_peeking=is_peeking,
+        messages = yield filter_events_for_client(
+            self.store, user_id, messages, is_peeking=is_peeking,
         )
 
         start_token = now_token.copy_and_replace("room_key", token[0])
@@ -724,3 +731,196 @@ class MessageHandler(BaseHandler):
             ret["membership"] = membership
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def _create_new_client_event(self, builder, prev_event_ids=None):
+        if prev_event_ids:
+            prev_events = yield self.store.add_event_hashes(prev_event_ids)
+            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+            depth = prev_max_depth + 1
+        else:
+            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                builder.room_id,
+            )
+
+            if latest_ret:
+                depth = max([d for _, _, d in latest_ret]) + 1
+            else:
+                depth = 1
+
+            prev_events = [
+                (event_id, prev_hashes)
+                for event_id, prev_hashes, _ in latest_ret
+            ]
+
+        builder.prev_events = prev_events
+        builder.depth = depth
+
+        state_handler = self.state_handler
+
+        context = yield state_handler.compute_event_context(builder)
+
+        if builder.is_state():
+            builder.prev_state = yield self.store.add_event_hashes(
+                context.prev_state_events
+            )
+
+        yield self.auth.add_auth_events(builder, context)
+
+        signing_key = self.hs.config.signing_key[0]
+        add_hashes_and_signatures(
+            builder, self.server_name, signing_key
+        )
+
+        event = builder.build()
+
+        logger.debug(
+            "Created event %s with current state: %s",
+            event.event_id, context.current_state,
+        )
+
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
+        # We now need to go and hit out to wherever we need to hit out to.
+
+        if ratelimit:
+            self.ratelimit(requester)
+
+        try:
+            self.auth.check(event, auth_events=context.current_state)
+        except AuthError as err:
+            logger.warn("Denying new event %r because %s", event, err)
+            raise err
+
+        yield self.maybe_kick_guest_users(event, context.current_state.values())
+
+        if event.type == EventTypes.CanonicalAlias:
+            # Check the alias is acually valid (at this time at least)
+            room_alias_str = event.content.get("alias", None)
+            if room_alias_str:
+                room_alias = RoomAlias.from_string(room_alias_str)
+                directory_handler = self.hs.get_handlers().directory_handler
+                mapping = yield directory_handler.get_association(room_alias)
+
+                if mapping["room_id"] != event.room_id:
+                    raise SynapseError(
+                        400,
+                        "Room alias %s does not point to the room" % (
+                            room_alias_str,
+                        )
+                    )
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if event.type == EventTypes.Member:
+            if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
+                event.unsigned["invite_room_state"] = [
+                    {
+                        "type": e.type,
+                        "state_key": e.state_key,
+                        "content": e.content,
+                        "sender": e.sender,
+                    }
+                    for k, e in context.current_state.items()
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
+                ]
+
+                invitee = UserID.from_string(event.state_key)
+                if not self.hs.is_mine(invitee):
+                    # TODO: Can we add signature from remote server in a nicer
+                    # way? If we have been invited by a remote server, we need
+                    # to get them to sign the event.
+
+                    returned_invite = yield federation_handler.send_invite(
+                        invitee.domain,
+                        event,
+                    )
+
+                    event.unsigned.pop("room_state", None)
+
+                    # TODO: Make sure the signatures actually are correct.
+                    event.signatures.update(
+                        returned_invite.signatures
+                    )
+
+        if event.type == EventTypes.Redaction:
+            if self.auth.check_redaction(event, auth_events=context.current_state):
+                original_event = yield self.store.get_event(
+                    event.redacts,
+                    check_redacted=False,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=False
+                )
+                if event.user_id != original_event.user_id:
+                    raise AuthError(
+                        403,
+                        "You don't have permission to redact events"
+                    )
+
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
+
+        action_generator = ActionGenerator(self.hs)
+        yield action_generator.handle_push_actions_for_event(
+            event, context
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
+        )
+
+        # this intentionally does not yield: we don't care about the result
+        # and don't need to wait for it.
+        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+            event_stream_id, max_stream_id
+        )
+
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(get_domain_from_id(s.state_key))
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        @defer.inlineCallbacks
+        def _notify():
+            yield run_on_reactor()
+            self.notifier.on_new_room_event(
+                event, event_stream_id, max_stream_id,
+                extra_users=extra_users
+            )
+
+        preserve_fn(_notify)()
+
+        # If invite, remove room_state from unsigned before sending.
+        event.unsigned.pop("invite_room_state", None)
+
+        federation_handler.handle_new_event(
+            event, destinations=destinations,
+        )
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 639567953a..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -33,11 +33,9 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
-from synapse.types import UserID
+from synapse.types import UserID, get_domain_from_id
 import synapse.metrics
 
-from ._base import BaseHandler
-
 import logging
 
 
@@ -52,6 +50,8 @@ timers_fired_counter = metrics.register_counter("timers_fired")
 federation_presence_counter = metrics.register_counter("federation_presence")
 bump_active_time_counter = metrics.register_counter("bump_active_time")
 
+get_updates_counter = metrics.register_counter("get_updates", labels=["type"])
+
 
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
@@ -70,14 +70,18 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
 # How often to resend presence to remote servers
 FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
-class PresenceHandler(BaseHandler):
+class PresenceHandler(object):
 
     def __init__(self, hs):
-        super(PresenceHandler, self).__init__(hs)
-        self.hs = hs
+        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.wheel_timer = WheelTimer()
@@ -138,7 +142,7 @@ class PresenceHandler(BaseHandler):
                 obj=state.user_id,
                 then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
             )
-            if self.hs.is_mine_id(state.user_id):
+            if self.is_mine_id(state.user_id):
                 self.wheel_timer.insert(
                     now=now,
                     obj=state.user_id,
@@ -160,15 +164,26 @@ class PresenceHandler(BaseHandler):
         self.serial_to_user = {}
         self._next_serial = 1
 
-        # Keeps track of the number of *ongoing* syncs. While this is non zero
-        # a user will never go offline.
+        # Keeps track of the number of *ongoing* syncs on this process. While
+        # this is non zero a user will never go offline.
         self.user_to_num_current_syncs = {}
 
+        # Keeps track of the number of *ongoing* syncs on other processes.
+        # While any sync is ongoing on another process the user will never
+        # go offline.
+        # Each process has a unique identifier and an update frequency. If
+        # no update is received from that process within the update period then
+        # we assume that all the sync requests on that process have stopped.
+        # Stored as a dict from process_id to set of user_id, and a dict of
+        # process_id to millisecond timestamp last updated.
+        self.external_process_to_current_syncs = {}
+        self.external_process_last_updated_ms = {}
+
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            30 * 1000,
+            30,
             self.clock.looping_call,
             self._handle_timeouts,
             5000,
@@ -228,7 +243,7 @@ class PresenceHandler(BaseHandler):
 
                 new_state, should_notify, should_ping = handle_update(
                     prev_state, new_state,
-                    is_mine=self.hs.is_mine_id(user_id),
+                    is_mine=self.is_mine_id(user_id),
                     wheel_timer=self.wheel_timer,
                     now=now
                 )
@@ -268,31 +283,48 @@ class PresenceHandler(BaseHandler):
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """
+        logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        with Measure(self.clock, "presence_handle_timeouts"):
-            # Fetch the list of users that *may* have timed out. Things may have
-            # changed since the timeout was set, so we won't necessarily have to
-            # take any action.
-            users_to_check = self.wheel_timer.fetch(now)
+        try:
+            with Measure(self.clock, "presence_handle_timeouts"):
+                # Fetch the list of users that *may* have timed out. Things may have
+                # changed since the timeout was set, so we won't necessarily have to
+                # take any action.
+                users_to_check = set(self.wheel_timer.fetch(now))
+
+                # Check whether the lists of syncing processes from an external
+                # process have expired.
+                expired_process_ids = [
+                    process_id for process_id, last_update
+                    in self.external_process_last_updated_ms.items()
+                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
+                ]
+                for process_id in expired_process_ids:
+                    users_to_check.update(
+                        self.external_process_last_updated_ms.pop(process_id, ())
+                    )
+                    self.external_process_last_update.pop(process_id)
 
-            states = [
-                self.user_to_current_state.get(
-                    user_id, UserPresenceState.default(user_id)
-                )
-                for user_id in set(users_to_check)
-            ]
+                states = [
+                    self.user_to_current_state.get(
+                        user_id, UserPresenceState.default(user_id)
+                    )
+                    for user_id in users_to_check
+                ]
 
-            timers_fired_counter.inc_by(len(states))
+                timers_fired_counter.inc_by(len(states))
 
-            changes = handle_timeouts(
-                states,
-                is_mine_fn=self.hs.is_mine_id,
-                user_to_num_current_syncs=self.user_to_num_current_syncs,
-                now=now,
-            )
+                changes = handle_timeouts(
+                    states,
+                    is_mine_fn=self.is_mine_id,
+                    syncing_user_ids=self.get_currently_syncing_users(),
+                    now=now,
+                )
 
-        preserve_fn(self._update_states)(changes)
+            preserve_fn(self._update_states)(changes)
+        except:
+            logger.exception("Exception in _handle_timeouts loop")
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
@@ -365,6 +397,74 @@ class PresenceHandler(BaseHandler):
 
         defer.returnValue(_user_syncing())
 
+    def get_currently_syncing_users(self):
+        """Get the set of user ids that are currently syncing on this HS.
+        Returns:
+            set(str): A set of user_id strings.
+        """
+        syncing_user_ids = {
+            user_id for user_id, count in self.user_to_num_current_syncs.items()
+            if count
+        }
+        for user_ids in self.external_process_to_current_syncs.values():
+            syncing_user_ids.update(user_ids)
+        return syncing_user_ids
+
+    @defer.inlineCallbacks
+    def update_external_syncs(self, process_id, syncing_user_ids):
+        """Update the syncing users for an external process
+
+        Args:
+            process_id(str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            syncing_user_ids(set(str)): The set of user_ids that are
+                currently syncing on that server.
+        """
+
+        # Grab the previous list of user_ids that were syncing on that process
+        prev_syncing_user_ids = (
+            self.external_process_to_current_syncs.get(process_id, set())
+        )
+        # Grab the current presence state for both the users that are syncing
+        # now and the users that were syncing before this update.
+        prev_states = yield self.current_state_for_users(
+            syncing_user_ids | prev_syncing_user_ids
+        )
+        updates = []
+        time_now_ms = self.clock.time_msec()
+
+        # For each new user that is syncing check if we need to mark them as
+        # being online.
+        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+            prev_state = prev_states[new_user_id]
+            if prev_state.state == PresenceState.OFFLINE:
+                updates.append(prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=time_now_ms,
+                    last_user_sync_ts=time_now_ms,
+                ))
+            else:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                ))
+
+        # For each user that is still syncing or stopped syncing update the
+        # last sync time so that we will correctly apply the grace period when
+        # they stop syncing.
+        for old_user_id in prev_syncing_user_ids:
+            prev_state = prev_states[old_user_id]
+            updates.append(prev_state.copy_and_replace(
+                last_user_sync_ts=time_now_ms,
+            ))
+
+        yield self._update_states(updates)
+
+        # Update the last updated time for the process. We expire the entries
+        # if we don't receive an update in the given timeframe.
+        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+        self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
     @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
@@ -427,7 +527,7 @@ class PresenceHandler(BaseHandler):
 
         hosts_to_states = {}
         for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
@@ -436,11 +536,11 @@ class PresenceHandler(BaseHandler):
                 hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
-            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
             if not local_states:
                 continue
 
-            host = UserID.from_string(user_id).domain
+            host = get_domain_from_id(user_id)
             hosts_to_states.setdefault(host, []).extend(local_states)
 
         # TODO: de-dup hosts_to_states, as a single host might have multiple
@@ -611,14 +711,14 @@ class PresenceHandler(BaseHandler):
         # don't need to send to local clients here, as that is done as part
         # of the event stream/sync.
         # TODO: Only send to servers not already in the room.
-        if self.hs.is_mine(user):
+        if self.is_mine(user):
             state = yield self.current_state_for_user(user.to_string())
 
             hosts = yield self.store.get_joined_hosts_for_room(room_id)
             self._push_to_remotes({host: (state,) for host in hosts})
         else:
             user_ids = yield self.store.get_users_in_room(room_id)
-            user_ids = filter(self.hs.is_mine_id, user_ids)
+            user_ids = filter(self.is_mine_id, user_ids)
 
             states = yield self.current_state_for_users(user_ids)
 
@@ -628,7 +728,7 @@ class PresenceHandler(BaseHandler):
     def get_presence_list(self, observer_user, accepted=None):
         """Returns the presence for all users in their presence list.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         presence_list = yield self.store.get_presence_list(
@@ -659,7 +759,7 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        if self.hs.is_mine(observed_user):
+        if self.is_mine(observed_user):
             yield self.invite_presence(observed_user, observer_user)
         else:
             yield self.federation.send_edu(
@@ -675,11 +775,11 @@ class PresenceHandler(BaseHandler):
     def invite_presence(self, observed_user, observer_user):
         """Handles new presence invites.
         """
-        if not self.hs.is_mine(observed_user):
+        if not self.is_mine(observed_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         # TODO: Don't auto accept
-        if self.hs.is_mine(observer_user):
+        if self.is_mine(observer_user):
             yield self.accept_presence(observed_user, observer_user)
         else:
             self.federation.send_edu(
@@ -742,7 +842,7 @@ class PresenceHandler(BaseHandler):
         Returns:
             A Deferred.
         """
-        if not self.hs.is_mine(observer_user):
+        if not self.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
         yield self.store.del_presence_list(
@@ -834,7 +934,11 @@ def _format_user_presence_state(state, now):
 
 class PresenceEventSource(object):
     def __init__(self, hs):
-        self.hs = hs
+        # We can't call get_presence_handler here because there's a cycle:
+        #
+        #   Presence -> Notifier -> PresenceEventSource -> Presence
+        #
+        self.get_presence_handler = hs.get_presence_handler
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
@@ -860,7 +964,7 @@ class PresenceEventSource(object):
                 from_key = int(from_key)
             room_ids = room_ids or []
 
-            presence = self.hs.get_handlers().presence_handler
+            presence = self.get_presence_handler()
             stream_change_cache = self.store.presence_stream_cache
 
             if not room_ids:
@@ -877,13 +981,13 @@ class PresenceEventSource(object):
 
             user_ids_changed = set()
             changed = None
-            if from_key and max_token - from_key < 100:
-                # For small deltas, its quicker to get all changes and then
-                # work out if we share a room or they're in our presence list
+            if from_key:
                 changed = stream_change_cache.get_all_entities_changed(from_key)
 
-            # get_all_entities_changed can return None
-            if changed is not None:
+            if changed is not None and len(changed) < 500:
+                # For small deltas, its quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                get_updates_counter.inc("stream")
                 for other_user_id in changed:
                     if other_user_id in friends:
                         user_ids_changed.add(other_user_id)
@@ -895,6 +999,8 @@ class PresenceEventSource(object):
             else:
                 # Too many possible updates. Find all users we can see and check
                 # if any of them have changed.
+                get_updates_counter.inc("full")
+
                 user_ids_to_check = set()
                 for room_id in room_ids:
                     users = yield self.store.get_users_in_room(room_id)
@@ -933,15 +1039,14 @@ class PresenceEventSource(object):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
 
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
         user_states(list): List of UserPresenceState's to check.
         is_mine_fn (fn): Function that returns if a user_id is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -952,21 +1057,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
     for state in user_states:
         is_mine = is_mine_fn(state.user_id)
 
-        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
         if new_state:
             changes[state.user_id] = new_state
 
     return changes.values()
 
 
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
         state (UserPresenceState)
         is_mine (bool): Whether the user is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -1000,7 +1104,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
 
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
-        if not user_to_num_current_syncs.get(user_id, 0):
+        if user_id not in syncing_user_ids:
             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index a390a1b8bd..e62722d78d 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -29,6 +29,8 @@ class ReceiptsHandler(BaseHandler):
     def __init__(self, hs):
         super(ReceiptsHandler, self).__init__(hs)
 
+        self.server_name = hs.config.server_name
+        self.store = hs.get_datastore()
         self.hs = hs
         self.federation = hs.get_replication_layer()
         self.federation.register_edu_handler(
@@ -131,12 +133,9 @@ class ReceiptsHandler(BaseHandler):
             event_ids = receipt["event_ids"]
             data = receipt["data"]
 
-            remotedomains = set()
-
-            rm_handler = self.hs.get_handlers().room_member_handler
-            yield rm_handler.fetch_room_distributions_into(
-                room_id, localusers=None, remotedomains=remotedomains
-            )
+            remotedomains = yield self.store.get_joined_hosts_for_room(room_id)
+            remotedomains = remotedomains.copy()
+            remotedomains.discard(self.server_name)
 
             logger.debug("Sending receipt to: %r", remotedomains)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index b0862067e1..bbc07b045e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -16,7 +16,7 @@
 """Contains functions for registering clients."""
 from twisted.internet import defer
 
-from synapse.types import UserID
+from synapse.types import UserID, Requester
 from synapse.api.errors import (
     AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
@@ -358,8 +358,62 @@ class RegistrationHandler(BaseHandler):
         )
         defer.returnValue(data)
 
+    @defer.inlineCallbacks
+    def get_or_create_user(self, localpart, displayname, duration_seconds):
+        """Creates a new user if the user does not exist,
+        else revokes all previous access tokens and generates a new one.
+
+        Args:
+            localpart : The local part of the user ID to register. If None,
+              one will be randomly generated.
+        Returns:
+            A tuple of (user_id, access_token).
+        Raises:
+            RegistrationError if there was a problem registering.
+        """
+        yield run_on_reactor()
+
+        if localpart is None:
+            raise SynapseError(400, "Request must include user id")
+
+        need_register = True
+
+        try:
+            yield self.check_username(localpart)
+        except SynapseError as e:
+            if e.errcode == Codes.USER_IN_USE:
+                need_register = False
+            else:
+                raise
+
+        user = UserID(localpart, self.hs.hostname)
+        user_id = user.to_string()
+        auth_handler = self.hs.get_handlers().auth_handler
+        token = auth_handler.generate_short_term_login_token(user_id, duration_seconds)
+
+        if need_register:
+            yield self.store.register(
+                user_id=user_id,
+                token=token,
+                password_hash=None
+            )
+
+            yield registered_user(self.distributor, user)
+        else:
+            yield self.store.user_delete_access_tokens(user_id=user_id)
+            yield self.store.add_access_token_to_user(user_id=user_id, token=token)
+
+        if displayname is not None:
+            logger.info("setting user display name: %s -> %s", user_id, displayname)
+            profile_handler = self.hs.get_handlers().profile_handler
+            yield profile_handler.set_displayname(
+                user, Requester(user, token, False), displayname
+            )
+
+        defer.returnValue((user_id, token))
+
     def auth_handler(self):
-        return self.hs.get_handlers().auth_handler
+        return self.hs.get_auth_handler()
 
     @defer.inlineCallbacks
     def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index dd9c18df84..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -26,6 +26,7 @@ from synapse.api.errors import AuthError, StoreError, SynapseError
 from synapse.util import stringutils
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
+from synapse.visibility import filter_events_for_client
 
 from collections import OrderedDict
 
@@ -35,6 +36,8 @@ import string
 
 logger = logging.getLogger(__name__)
 
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
+
 id_server_scheme = "https://"
 
 
@@ -343,8 +346,14 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.response_cache = ResponseCache()
+        self.remote_list_request_cache = ResponseCache()
+        self.remote_list_cache = {}
+        self.fetch_looping_call = hs.get_clock().looping_call(
+            self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
+        )
+        self.fetch_all_remote_lists()
 
-    def get_public_room_list(self):
+    def get_local_public_room_list(self):
         result = self.response_cache.get(())
         if not result:
             result = self.response_cache.set((), self._get_public_room_list())
@@ -426,6 +435,55 @@ class RoomListHandler(BaseHandler):
         # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": results})
 
+    @defer.inlineCallbacks
+    def fetch_all_remote_lists(self):
+        deferred = self.hs.get_replication_layer().get_public_rooms(
+            self.hs.config.secondary_directory_servers
+        )
+        self.remote_list_request_cache.set((), deferred)
+        self.remote_list_cache = yield deferred
+
+    @defer.inlineCallbacks
+    def get_aggregated_public_room_list(self):
+        """
+        Get the public room list from this server and the servers
+        specified in the secondary_directory_servers config option.
+        XXX: Pagination...
+        """
+        # We return the results from out cache which is updated by a looping call,
+        # unless we're missing a cache entry, in which case wait for the result
+        # of the fetch if there's one in progress. If not, omit that server.
+        wait = False
+        for s in self.hs.config.secondary_directory_servers:
+            if s not in self.remote_list_cache:
+                logger.warn("No cached room list from %s: waiting for fetch", s)
+                wait = True
+                break
+
+        if wait and self.remote_list_request_cache.get(()):
+            yield self.remote_list_request_cache.get(())
+
+        public_rooms = yield self.get_local_public_room_list()
+
+        # keep track of which room IDs we've seen so we can de-dup
+        room_ids = set()
+
+        # tag all the ones in our list with our server name.
+        # Also add the them to the de-deping set
+        for room in public_rooms['chunk']:
+            room["server_name"] = self.hs.hostname
+            room_ids.add(room["room_id"])
+
+        # Now add the results from federation
+        for server_name, server_result in self.remote_list_cache.items():
+            for room in server_result["chunk"]:
+                if room["room_id"] not in room_ids:
+                    room["server_name"] = server_name
+                    public_rooms["chunk"].append(room)
+                    room_ids.add(room["room_id"])
+
+        defer.returnValue(public_rooms)
+
 
 class RoomContextHandler(BaseHandler):
     @defer.inlineCallbacks
@@ -449,10 +507,12 @@ class RoomContextHandler(BaseHandler):
         now_token = yield self.hs.get_event_sources().get_current_token()
 
         def filter_evts(events):
-            return self._filter_events_for_client(
+            return filter_events_for_client(
+                self.store,
                 user.to_string(),
                 events,
-                is_peeking=is_guest)
+                is_peeking=is_guest
+            )
 
         event = yield self.store.get_event(event_id, get_prev_content=True,
                                            allow_none=True)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed2cda837f..7e616f44fd 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -56,35 +56,6 @@ class RoomMemberHandler(BaseHandler):
         self.distributor.declare("user_left_room")
 
     @defer.inlineCallbacks
-    def get_room_members(self, room_id):
-        users = yield self.store.get_users_in_room(room_id)
-
-        defer.returnValue([UserID.from_string(u) for u in users])
-
-    @defer.inlineCallbacks
-    def fetch_room_distributions_into(self, room_id, localusers=None,
-                                      remotedomains=None, ignore_user=None):
-        """Fetch the distribution of a room, adding elements to either
-        'localusers' or 'remotedomains', which should be a set() if supplied.
-        If ignore_user is set, ignore that user.
-
-        This function returns nothing; its result is performed by the
-        side-effect on the two passed sets. This allows easy accumulation of
-        member lists of multiple rooms at once if required.
-        """
-        members = yield self.get_room_members(room_id)
-        for member in members:
-            if ignore_user is not None and member == ignore_user:
-                continue
-
-            if self.hs.is_mine(member):
-                if localusers is not None:
-                    localusers.add(member)
-            else:
-                if remotedomains is not None:
-                    remotedomains.add(member.domain)
-
-    @defer.inlineCallbacks
     def _local_membership_update(
         self, requester, target, room_id, membership,
         prev_event_ids,
@@ -113,7 +84,7 @@ class RoomMemberHandler(BaseHandler):
             prev_event_ids=prev_event_ids,
         )
 
-        yield self.handle_new_client_event(
+        yield msg_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -357,7 +328,7 @@ class RoomMemberHandler(BaseHandler):
                 # so don't really fit into the general auth process.
                 raise AuthError(403, "Guest access not allowed")
 
-        yield self.handle_new_client_event(
+        yield message_handler.handle_new_client_event(
             requester,
             event,
             context,
@@ -427,21 +398,6 @@ class RoomMemberHandler(BaseHandler):
             defer.returnValue(UserID.from_string(invite.sender))
 
     @defer.inlineCallbacks
-    def get_joined_rooms_for_user(self, user):
-        """Returns a list of roomids that the user has any of the given
-        membership states in."""
-
-        rooms = yield self.store.get_rooms_for_user(
-            user.to_string(),
-        )
-
-        # For some reason the list of events contains duplicates
-        # TODO(paul): work out why because I really don't think it should
-        room_ids = set(r.room_id for r in rooms)
-
-        defer.returnValue(room_ids)
-
-    @defer.inlineCallbacks
     def do_3pid_invite(
             self,
             room_id,
@@ -457,8 +413,7 @@ class RoomMemberHandler(BaseHandler):
         )
 
         if invitee:
-            handler = self.hs.get_handlers().room_member_handler
-            yield handler.update_membership(
+            yield self.update_membership(
                 requester,
                 UserID.from_string(invitee),
                 room_id,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9937d8dd7f..df75d70fac 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -21,6 +21,7 @@ from synapse.api.constants import Membership, EventTypes
 from synapse.api.filtering import Filter
 from synapse.api.errors import SynapseError
 from synapse.events.utils import serialize_event
+from synapse.visibility import filter_events_for_client
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -172,8 +173,8 @@ class SearchHandler(BaseHandler):
 
             filtered_events = search_filter.filter([r["event"] for r in results])
 
-            events = yield self._filter_events_for_client(
-                user.to_string(), filtered_events
+            events = yield filter_events_for_client(
+                self.store, user.to_string(), filtered_events
             )
 
             events.sort(key=lambda e: -rank_map[e.event_id])
@@ -223,8 +224,8 @@ class SearchHandler(BaseHandler):
                     r["event"] for r in results
                 ])
 
-                events = yield self._filter_events_for_client(
-                    user.to_string(), filtered_events
+                events = yield filter_events_for_client(
+                    self.store, user.to_string(), filtered_events
                 )
 
                 room_events.extend(events)
@@ -281,12 +282,12 @@ class SearchHandler(BaseHandler):
                     event.room_id, event.event_id, before_limit, after_limit
                 )
 
-                res["events_before"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_before"]
+                res["events_before"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_before"]
                 )
 
-                res["events_after"] = yield self._filter_events_for_client(
-                    user.to_string(), res["events_after"]
+                res["events_after"] = yield filter_events_for_client(
+                    self.store, user.to_string(), res["events_after"]
                 )
 
                 res["start"] = now_token.copy_and_replace(
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 231140b655..be26a491ff 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,15 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import BaseHandler
-
-from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
+from synapse.visibility import filter_events_for_client
 
 from twisted.internet import defer
 
@@ -132,10 +130,12 @@ class SyncResult(collections.namedtuple("SyncResult", [
         )
 
 
-class SyncHandler(BaseHandler):
+class SyncHandler(object):
 
     def __init__(self, hs):
-        super(SyncHandler, self).__init__(hs)
+        self.store = hs.get_datastore()
+        self.notifier = hs.get_notifier()
+        self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.response_cache = ResponseCache()
@@ -193,189 +193,15 @@ class SyncHandler(BaseHandler):
         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None or full_state:
-            return self.full_state_sync(sync_config, since_token)
-        else:
-            return self.incremental_sync_with_gap(sync_config, since_token)
-
-    @defer.inlineCallbacks
-    def full_state_sync(self, sync_config, timeline_since_token):
-        """Get a sync for a client which is starting without any state.
-
-        If a 'message_since_token' is given, only timeline events which have
-        happened since that token will be returned.
-
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
-        presence_stream = self.event_sources.sources["presence"]
-        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
-        # UP to the present rather than after the present?
-        pagination_config = PaginationConfig(from_token=now_token)
-        presence, _ = yield presence_stream.get_pagination_rows(
-            user=sync_config.user,
-            pagination_config=pagination_config.get_source_config("presence"),
-            key=None
-        )
-
-        membership_list = (
-            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
-        )
-
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=sync_config.user.to_string(),
-            membership_list=membership_list
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(
-                sync_config.user.to_string()
-            )
-        )
-
-        account_data['m.push_rules'] = yield self.push_rules_for_user(
-            sync_config.user
-        )
-
-        tags_by_room = yield self.store.get_tags_for_user(
-            sync_config.user.to_string()
-        )
-
-        joined = []
-        invited = []
-        archived = []
-
-        user_id = sync_config.user.to_string()
-
-        @defer.inlineCallbacks
-        def _generate_room_entry(event):
-            if event.membership == Membership.JOIN:
-                room_result = yield self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                joined.append(room_result)
-            elif event.membership == Membership.INVITE:
-                invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
-            elif event.membership in (Membership.LEAVE, Membership.BAN):
-                # Always send down rooms we were banned or kicked from.
-                if not sync_config.filter_collection.include_leave:
-                    if event.membership == Membership.LEAVE:
-                        if user_id == event.sender:
-                            return
-
-                leave_token = now_token.copy_and_replace(
-                    "room_key", "s%d" % (event.stream_ordering,)
-                )
-                room_result = yield self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                archived.append(room_result)
-
-        yield concurrently_execute(_generate_room_entry, room_list, 10)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
-    @defer.inlineCallbacks
-    def full_state_sync_for_joined_room(self, room_id, sync_config,
-                                        now_token, timeline_since_token,
-                                        ephemeral_by_room, tags_by_room,
-                                        account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred JoinedSyncResult.
-        """
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token=timeline_since_token
-        )
-
-        room_sync = yield self.incremental_sync_with_gap_for_room(
-            room_id, sync_config,
-            now_token=now_token,
-            since_token=timeline_since_token,
-            ephemeral_by_room=ephemeral_by_room,
-            tags_by_room=tags_by_room,
-            account_data_by_room=account_data_by_room,
-            batch=batch,
-            full_state=True,
-        )
-
-        defer.returnValue(room_sync)
+        return self.generate_sync_result(sync_config, since_token, full_state)
 
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
         user_id = user.to_string()
-        rawrules = yield self.store.get_push_rules_for_user(user_id)
-        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
-        rules = format_push_rules_for_user(user, rawrules, enabled_map)
+        rules = yield self.store.get_push_rules_for_user(user_id)
+        rules = format_push_rules_for_user(user, rules)
         defer.returnValue(rules)
 
-    def account_data_for_user(self, account_data):
-        account_data_events = []
-
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
-    def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
-        account_data_events = []
-        tags = tags_by_room.get(room_id)
-        if tags is not None:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
-
-        account_data = account_data_by_room.get(room_id, {})
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
     @defer.inlineCallbacks
     def ephemeral_by_room(self, sync_config, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
@@ -438,256 +264,44 @@ class SyncHandler(BaseHandler):
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    def full_state_sync_for_archived_room(self, room_id, sync_config,
-                                          leave_event_id, leave_token,
-                                          timeline_since_token, tags_by_room,
-                                          account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred ArchivedSyncResult.
-        """
-
-        return self.incremental_sync_for_archived_room(
-            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
-            account_data_by_room, full_state=True, leave_token=leave_token,
-        )
-
-    @defer.inlineCallbacks
-    def incremental_sync_with_gap(self, sync_config, since_token):
-        """ Get the incremental delta needed to bring the client up to
-        date with the server.
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        presence_source = self.event_sources.sources["presence"]
-        presence, presence_key = yield presence_source.get_new_events(
-            user=sync_config.user,
-            from_key=since_token.presence_key,
-            limit=sync_config.filter_collection.presence_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("presence_key", presence_key)
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token, since_token
-        )
-
-        rm_handler = self.hs.get_handlers().room_member_handler
-        app_service = yield self.store.get_app_service_by_user_id(
-            sync_config.user.to_string()
-        )
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            joined_room_ids = yield rm_handler.get_joined_rooms_for_user(
-                sync_config.user
-            )
-
-        user_id = sync_config.user.to_string()
-
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-
-        tags_by_room = yield self.store.get_updated_tags(
-            user_id,
-            since_token.account_data_key,
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_updated_account_data_for_user(
-                user_id,
-                since_token.account_data_key,
-            )
-        )
-
-        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
-            user_id, int(since_token.push_rules_key)
-        )
-
-        if push_rules_changed:
-            account_data["m.push_rules"] = yield self.push_rules_for_user(
-                sync_config.user
-            )
-
-        # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        mem_change_events_by_room_id = {}
-        for event in rooms_changed:
-            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
-        newly_joined_rooms = []
-        archived = []
-        invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
-            non_joins = [e for e in events if e.membership != Membership.JOIN]
-            has_join = len(non_joins) != len(events)
-
-            # We want to figure out if we joined the room at some point since
-            # the last sync (even if we have since left). This is to make sure
-            # we do send down the room, and with full state, where necessary
-            if room_id in joined_room_ids or has_join:
-                old_state = yield self.get_state_at(room_id, since_token)
-                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
-                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
-                        newly_joined_rooms.append(room_id)
-
-                if room_id in joined_room_ids:
-                    continue
-
-            if not non_joins:
-                continue
-
-            # Only bother if we're still currently invited
-            should_invite = non_joins[-1].membership == Membership.INVITE
-            if should_invite:
-                room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
-                if room_sync:
-                    invited.append(room_sync)
-
-            # Always include leave/ban events. Just take the last one.
-            # TODO: How do we handle ban -> leave in same batch?
-            leave_events = [
-                e for e in non_joins
-                if e.membership in (Membership.LEAVE, Membership.BAN)
-            ]
-
-            if leave_events:
-                leave_event = leave_events[-1]
-                room_sync = yield self.incremental_sync_for_archived_room(
-                    sync_config, room_id, leave_event.event_id, since_token,
-                    tags_by_room, account_data_by_room,
-                    full_state=room_id in newly_joined_rooms
-                )
-                if room_sync:
-                    archived.append(room_sync)
-
-        # Get all events for rooms we're currently joined to.
-        room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
-
-        joined = []
-        # We loop through all room ids, even if there are no new events, in case
-        # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
-            room_entry = room_to_events.get(room_id, None)
-
-            if room_entry:
-                events, start_key = room_entry
-
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
-                newly_joined_room = room_id in newly_joined_rooms
-                full_state = newly_joined_room
-
-                batch = yield self.load_filtered_recents(
-                    room_id, sync_config, prev_batch_token,
-                    since_token=since_token,
-                    recents=events,
-                    newly_joined_room=newly_joined_room,
-                )
-            else:
-                batch = TimelineBatch(
-                    events=[],
-                    prev_batch=since_token,
-                    limited=False,
-                )
-                full_state = False
-
-            room_sync = yield self.incremental_sync_with_gap_for_room(
-                room_id=room_id,
-                sync_config=sync_config,
-                since_token=since_token,
-                now_token=now_token,
-                ephemeral_by_room=ephemeral_by_room,
-                tags_by_room=tags_by_room,
-                account_data_by_room=account_data_by_room,
-                batch=batch,
-                full_state=full_state,
-            )
-            if room_sync:
-                joined.append(room_sync)
-
-        # For each newly joined room, we want to send down presence of
-        # existing users.
-        presence_handler = self.hs.get_handlers().presence_handler
-        extra_presence_users = set()
-        for room_id in newly_joined_rooms:
-            users = yield self.store.get_users_in_room(event.room_id)
-            extra_presence_users.update(users)
-
-        # For each new member, send down presence.
-        for joined_sync in joined:
-            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
-            for event in it:
-                if event.type == EventTypes.Member:
-                    if event.membership == Membership.JOIN:
-                        extra_presence_users.add(event.state_key)
-
-        states = yield presence_handler.get_states(
-            [u for u in extra_presence_users if u != user_id],
-            as_event=True,
-        )
-        presence.extend(states)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
     @defer.inlineCallbacks
-    def load_filtered_recents(self, room_id, sync_config, now_token,
-                              since_token=None, recents=None, newly_joined_room=False):
+    def _load_filtered_recents(self, room_id, sync_config, now_token,
+                               since_token=None, recents=None, newly_joined_room=False):
         """
         Returns:
             a Deferred TimelineBatch
         """
         with Measure(self.clock, "load_filtered_recents"):
-            filtering_factor = 2
             timeline_limit = sync_config.filter_collection.timeline_limit()
-            load_limit = max(timeline_limit * filtering_factor, 10)
-            max_repeat = 5  # Only try a few times per room, otherwise
-            room_key = now_token.room_key
-            end_key = room_key
 
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
                 limited = False
 
-            if recents is not None:
+            if recents:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
-                recents = yield self._filter_events_for_client(
+                recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     recents,
                 )
             else:
                 recents = []
 
+            if not limited:
+                defer.returnValue(TimelineBatch(
+                    events=recents,
+                    prev_batch=now_token,
+                    limited=False
+                ))
+
+            filtering_factor = 2
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
             since_key = None
             if since_token and not newly_joined_room:
                 since_key = since_token.room_key
@@ -702,7 +316,8 @@ class SyncHandler(BaseHandler):
                 loaded_recents = sync_config.filter_collection.filter_room_timeline(
                     events
                 )
-                loaded_recents = yield self._filter_events_for_client(
+                loaded_recents = yield filter_events_for_client(
+                    self.store,
                     sync_config.user.to_string(),
                     loaded_recents,
                 )
@@ -730,103 +345,6 @@ class SyncHandler(BaseHandler):
         ))
 
     @defer.inlineCallbacks
-    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
-                                           since_token, now_token,
-                                           ephemeral_by_room, tags_by_room,
-                                           account_data_by_room,
-                                           batch, full_state=False):
-        state = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-            ephemeral_by_room.get(room_id, [])
-        )
-
-        unread_notifications = {}
-        room_sync = JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state,
-            ephemeral=ephemeral,
-            account_data=account_data,
-            unread_notifications=unread_notifications,
-        )
-
-        if room_sync:
-            notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config
-            )
-
-            if notifs is not None:
-                unread_notifications["notification_count"] = notifs["notify_count"]
-                unread_notifications["highlight_count"] = notifs["highlight_count"]
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
-                                           since_token, tags_by_room,
-                                           account_data_by_room, full_state,
-                                           leave_token=None):
-        """ Get the incremental delta needed to bring the client up to date for
-        the archived room.
-        Returns:
-            A Deferred ArchivedSyncResult
-        """
-
-        if not leave_token:
-            stream_token = yield self.store.get_stream_token_for_event(
-                leave_event_id
-            )
-
-            leave_token = since_token.copy_and_replace("room_key", stream_token)
-
-        if since_token and since_token.is_after(leave_token):
-            defer.returnValue(None)
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token,
-        )
-
-        logger.debug("Recents %r", batch)
-
-        state_events_delta = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, leave_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        room_sync = ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state_events_delta,
-            account_data=account_data,
-        )
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
     def get_state_after_event(self, event):
         """
         Get the room state after the given event
@@ -951,26 +469,6 @@ class SyncHandler(BaseHandler):
                 for e in sync_config.filter_collection.filter_room_state(state.values())
             })
 
-    def check_joined_room(self, sync_config, state_delta):
-        """
-        Check if the user has just joined the given room (so should
-        be given the full state)
-
-        Args:
-            sync_config(synapse.handlers.sync.SyncConfig):
-            state_delta(dict[(str,str), synapse.events.FrozenEvent]): the
-                difference in state since the last sync
-
-        Returns:
-             A deferred Tuple (state_delta, limited)
-        """
-        join_event = state_delta.get((
-            EventTypes.Member, sync_config.user.to_string()), None)
-        if join_event is not None:
-            if join_event.content["membership"] == Membership.JOIN:
-                return True
-        return False
-
     @defer.inlineCallbacks
     def unread_notifs_for_room_id(self, room_id, sync_config):
         with Measure(self.clock, "unread_notifs_for_room_id"):
@@ -991,6 +489,551 @@ class SyncHandler(BaseHandler):
             # count is whatever it was last time.
             defer.returnValue(None)
 
+    @defer.inlineCallbacks
+    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+        """Generates a sync result.
+
+        Args:
+            sync_config (SyncConfig)
+            since_token (StreamToken)
+            full_state (bool)
+
+        Returns:
+            Deferred(SyncResult)
+        """
+
+        # NB: The now_token gets changed by some of the generate_sync_* methods,
+        # this is due to some of the underlying streams not supporting the ability
+        # to query up to a given point.
+        # Always use the `now_token` in `SyncResultBuilder`
+        now_token = yield self.event_sources.get_current_token()
+
+        sync_result_builder = SyncResultBuilder(
+            sync_config, full_state,
+            since_token=since_token,
+            now_token=now_token,
+        )
+
+        account_data_by_room = yield self._generate_sync_entry_for_account_data(
+            sync_result_builder
+        )
+
+        res = yield self._generate_sync_entry_for_rooms(
+            sync_result_builder, account_data_by_room
+        )
+        newly_joined_rooms, newly_joined_users = res
+
+        yield self._generate_sync_entry_for_presence(
+            sync_result_builder, newly_joined_rooms, newly_joined_users
+        )
+
+        defer.returnValue(SyncResult(
+            presence=sync_result_builder.presence,
+            account_data=sync_result_builder.account_data,
+            joined=sync_result_builder.joined,
+            invited=sync_result_builder.invited,
+            archived=sync_result_builder.archived,
+            next_batch=sync_result_builder.now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_account_data(self, sync_result_builder):
+        """Generates the account data portion of the sync response. Populates
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+
+        Returns:
+            Deferred(dict): A dictionary containing the per room account data.
+        """
+        sync_config = sync_result_builder.sync_config
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+
+        if since_token and not sync_result_builder.full_state:
+            account_data, account_data_by_room = (
+                yield self.store.get_updated_account_data_for_user(
+                    user_id,
+                    since_token.account_data_key,
+                )
+            )
+
+            push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+                user_id, int(since_token.push_rules_key)
+            )
+
+            if push_rules_changed:
+                account_data["m.push_rules"] = yield self.push_rules_for_user(
+                    sync_config.user
+                )
+        else:
+            account_data, account_data_by_room = (
+                yield self.store.get_account_data_for_user(
+                    sync_config.user.to_string()
+                )
+            )
+
+            account_data['m.push_rules'] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
+        account_data_for_user = sync_config.filter_collection.filter_account_data([
+            {"type": account_data_type, "content": content}
+            for account_data_type, content in account_data.items()
+        ])
+
+        sync_result_builder.account_data = account_data_for_user
+
+        defer.returnValue(account_data_by_room)
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
+                                          newly_joined_users):
+        """Generates the presence portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            newly_joined_rooms(list): List of rooms that the user has joined
+                since the last sync (or empty if an initial sync)
+            newly_joined_users(list): List of users that have joined rooms
+                since the last sync (or empty if an initial sync)
+        """
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+        user = sync_result_builder.sync_config.user
+
+        presence_source = self.event_sources.sources["presence"]
+
+        since_token = sync_result_builder.since_token
+        if since_token and not sync_result_builder.full_state:
+            presence_key = since_token.presence_key
+            include_offline = True
+        else:
+            presence_key = None
+            include_offline = False
+
+        presence, presence_key = yield presence_source.get_new_events(
+            user=user,
+            from_key=presence_key,
+            is_guest=sync_config.is_guest,
+            include_offline=include_offline,
+        )
+        sync_result_builder.now_token = now_token.copy_and_replace(
+            "presence_key", presence_key
+        )
+
+        extra_users_ids = set(newly_joined_users)
+        for room_id in newly_joined_rooms:
+            users = yield self.store.get_users_in_room(room_id)
+            extra_users_ids.update(users)
+        extra_users_ids.discard(user.to_string())
+
+        states = yield self.presence_handler.get_states(
+            extra_users_ids,
+            as_event=True,
+        )
+        presence.extend(states)
+
+        # Deduplicate the presence entries so that there's at most one per user
+        presence = {p["content"]["user_id"]: p for p in presence}.values()
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
+        sync_result_builder.presence = presence
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+        """Generates the rooms portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            account_data_by_room(dict): Dictionary of per room account data
+
+        Returns:
+            Deferred(tuple): Returns a 2-tuple of
+            `(newly_joined_rooms, newly_joined_users)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_result_builder.sync_config,
+            now_token=sync_result_builder.now_token,
+            since_token=sync_result_builder.since_token,
+        )
+        sync_result_builder.now_token = now_token
+
+        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", user_id=user_id,
+        )
+
+        if ignored_account_data:
+            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+        else:
+            ignored_users = frozenset()
+
+        if sync_result_builder.since_token:
+            res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_updated_tags(
+                user_id,
+                sync_result_builder.since_token.account_data_key,
+            )
+        else:
+            res = yield self._get_all_rooms(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+        def handle_room_entries(room_entry):
+            return self._generate_room_entry(
+                sync_result_builder,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builder.full_state,
+            )
+
+        yield concurrently_execute(handle_room_entries, room_entries, 10)
+
+        sync_result_builder.invited.extend(invited)
+
+        # Now we want to get any newly joined users
+        newly_joined_users = set()
+        if sync_result_builder.since_token:
+            for joined_sync in sync_result_builder.joined:
+                it = itertools.chain(
+                    joined_sync.timeline.events, joined_sync.state.values()
+                )
+                for event in it:
+                    if event.type == EventTypes.Member:
+                        if event.membership == Membership.JOIN:
+                            newly_joined_users.add(event.state_key)
+
+        defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+    @defer.inlineCallbacks
+    def _get_rooms_changed(self, sync_result_builder, ignored_users):
+        """Gets the the changes that have happened since the last sync.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        assert since_token
+
+        app_service = yield self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            joined_room_ids = set(r.room_id for r in rooms)
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+        newly_joined_rooms = []
+        room_entries = []
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                    newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
+
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                if event.sender not in ignored_users:
+                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                    if room_sync:
+                        invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                leave_stream_token = yield self.store.get_stream_token_for_event(
+                    leave_event.event_id
+                )
+                leave_token = since_token.copy_and_replace(
+                    "room_key", leave_stream_token
+                )
+
+                if since_token and since_token.is_after(leave_token):
+                    continue
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        timeline_limit = sync_config.filter_collection.timeline_limit()
+
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
+
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non room events taht we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
+
+            if room_entry:
+                events, start_key = room_entry
+
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=events,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    upto_token=prev_batch_token,
+                ))
+            else:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=[],
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=since_token,
+                ))
+
+        defer.returnValue((room_entries, invited, newly_joined_rooms))
+
+    @defer.inlineCallbacks
+    def _get_all_rooms(self, sync_result_builder, ignored_users):
+        """Returns entries for all rooms for the user.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
+        """
+
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        membership_list = (
+            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+        )
+
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user_id,
+            membership_list=membership_list
+        )
+
+        room_entries = []
+        invited = []
+
+        for event in room_list:
+            if event.membership == Membership.JOIN:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="joined",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=now_token,
+                ))
+            elif event.membership == Membership.INVITE:
+                if event.sender in ignored_users:
+                    continue
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            continue
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        defer.returnValue((room_entries, invited, []))
+
+    @defer.inlineCallbacks
+    def _generate_room_entry(self, sync_result_builder, ignored_users,
+                             room_builder, ephemeral, tags, account_data,
+                             always_include=False):
+        """Populates the `joined` and `archived` section of `sync_result_builder`
+        based on the `room_builder`.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+            room_builder(RoomSyncResultBuilder)
+            ephemeral(list): List of new ephemeral events for room
+            tags(list): List of *all* tags for room, or None if there has been
+                no change.
+            account_data(list): List of new account data for room
+            always_include(bool): Always include this room in the sync response,
+                even if empty.
+        """
+        newly_joined = room_builder.newly_joined
+        full_state = (
+            room_builder.full_state
+            or newly_joined
+            or sync_result_builder.full_state
+        )
+        events = room_builder.events
+
+        # We want to shortcut out as early as possible.
+        if not (always_include or account_data or ephemeral or full_state):
+            if events == [] and tags is None:
+                return
+
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        room_id = room_builder.room_id
+        since_token = room_builder.since_token
+        upto_token = room_builder.upto_token
+
+        batch = yield self._load_filtered_recents(
+            room_id, sync_config,
+            now_token=upto_token,
+            since_token=since_token,
+            recents=events,
+            newly_joined_room=newly_joined,
+        )
+
+        account_data_events = []
+        if tags is not None:
+            account_data_events.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data_events
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+        if not (always_include or batch or account_data or ephemeral or full_state):
+            return
+
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
+
+        if room_builder.rtype == "joined":
+            unread_notifications = {}
+            room_sync = JoinedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                ephemeral=ephemeral,
+                account_data=account_data_events,
+                unread_notifications=unread_notifications,
+            )
+
+            if room_sync or always_include:
+                notifs = yield self.unread_notifs_for_room_id(
+                    room_id, sync_config
+                )
+
+                if notifs is not None:
+                    unread_notifications["notification_count"] = notifs["notify_count"]
+                    unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+                sync_result_builder.joined.append(room_sync)
+        elif room_builder.rtype == "archived":
+            room_sync = ArchivedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                account_data=account_data,
+            )
+            if room_sync or always_include:
+                sync_result_builder.archived.append(room_sync)
+        else:
+            raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1038,3 +1081,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         (e.type, e.state_key): e
         for e in evs
     }
+
+
+class SyncResultBuilder(object):
+    "Used to help build up a new SyncResult for a user"
+    def __init__(self, sync_config, full_state, since_token, now_token):
+        """
+        Args:
+            sync_config(SyncConfig)
+            full_state(bool): The full_state flag as specified by user
+            since_token(StreamToken): The token supplied by user, or None.
+            now_token(StreamToken): The token to sync up to.
+        """
+        self.sync_config = sync_config
+        self.full_state = full_state
+        self.since_token = since_token
+        self.now_token = now_token
+
+        self.presence = []
+        self.account_data = []
+        self.joined = []
+        self.invited = []
+        self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+    """Stores information needed to create either a `JoinedSyncResult` or
+    `ArchivedSyncResult`.
+    """
+    def __init__(self, room_id, rtype, events, newly_joined, full_state,
+                 since_token, upto_token):
+        """
+        Args:
+            room_id(str)
+            rtype(str): One of `"joined"` or `"archived"`
+            events(list): List of events to include in the room, (more events
+                may be added when generating result).
+            newly_joined(bool): If the user has newly joined the room
+            full_state(bool): Whether the full state should be sent in result
+            since_token(StreamToken): Earliest point to return events from, or None
+            upto_token(StreamToken): Latest point to return events from.
+        """
+        self.room_id = room_id
+        self.rtype = rtype
+        self.events = events
+        self.newly_joined = newly_joined
+        self.full_state = full_state
+        self.since_token = since_token
+        self.upto_token = upto_token
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 8ce27f49ec..861b8f7989 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,8 +15,6 @@
 
 from twisted.internet import defer
 
-from ._base import BaseHandler
-
 from synapse.api.errors import SynapseError, AuthError
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.metrics import Measure
@@ -32,14 +30,16 @@ logger = logging.getLogger(__name__)
 
 # A tiny object useful for storing a user's membership in a room, as a mapping
 # key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
 
 
-class TypingNotificationHandler(BaseHandler):
+class TypingHandler(object):
     def __init__(self, hs):
-        super(TypingNotificationHandler, self).__init__(hs)
-
-        self.homeserver = hs
+        self.store = hs.get_datastore()
+        self.server_name = hs.config.server_name
+        self.auth = hs.get_auth()
+        self.is_mine_id = hs.is_mine_id
+        self.notifier = hs.get_notifier()
 
         self.clock = hs.get_clock()
 
@@ -67,20 +67,23 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
-        if not self.hs.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has started typing in %s", target_user.to_string(), room_id
+            "%s has started typing in %s", target_user_id, room_id
         )
 
         until = self.clock.time_msec() + timeout
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         was_present = member in self._member_typing_until
 
@@ -104,25 +107,28 @@ class TypingNotificationHandler(BaseHandler):
 
         yield self._push_update(
             room_id=room_id,
-            user=target_user,
+            user_id=target_user_id,
             typing=True,
         )
 
     @defer.inlineCallbacks
     def stopped_typing(self, target_user, auth_user, room_id):
-        if not self.hs.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has stopped typing in %s", target_user.to_string(), room_id
+            "%s has stopped typing in %s", target_user_id, room_id
         )
 
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         if member in self._member_typing_timer:
             self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingNotificationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
-        if self.hs.is_mine(user):
-            member = RoomMember(room_id=room_id, user=user)
+        user_id = user.to_string()
+        if self.is_mine_id(user_id):
+            member = RoomMember(room_id=room_id, user_id=user_id)
             yield self._stopped_typing(member)
 
     @defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingNotificationHandler(BaseHandler):
 
         yield self._push_update(
             room_id=member.room_id,
-            user=member.user,
+            user_id=member.user_id,
             typing=False,
         )
 
@@ -156,61 +163,53 @@ class TypingNotificationHandler(BaseHandler):
             del self._member_typing_timer[member]
 
     @defer.inlineCallbacks
-    def _push_update(self, room_id, user, typing):
-        localusers = set()
-        remotedomains = set()
-
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers, remotedomains=remotedomains
-        )
-
-        if localusers:
-            self._push_update_local(
-                room_id=room_id,
-                user=user,
-                typing=typing
-            )
+    def _push_update(self, room_id, user_id, typing):
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
         deferreds = []
-        for domain in remotedomains:
-            deferreds.append(self.federation.send_edu(
-                destination=domain,
-                edu_type="m.typing",
-                content={
-                    "room_id": room_id,
-                    "user_id": user.to_string(),
-                    "typing": typing,
-                },
-            ))
+        for domain in domains:
+            if domain == self.server_name:
+                self._push_update_local(
+                    room_id=room_id,
+                    user_id=user_id,
+                    typing=typing
+                )
+            else:
+                deferreds.append(self.federation.send_edu(
+                    destination=domain,
+                    edu_type="m.typing",
+                    content={
+                        "room_id": room_id,
+                        "user_id": user_id,
+                        "typing": typing,
+                    },
+                ))
 
         yield defer.DeferredList(deferreds, consumeErrors=True)
 
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
         room_id = content["room_id"]
-        user = UserID.from_string(content["user_id"])
+        user_id = content["user_id"]
 
-        localusers = set()
+        # Check that the string is a valid user id
+        UserID.from_string(user_id)
 
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        yield rm_handler.fetch_room_distributions_into(
-            room_id, localusers=localusers
-        )
+        domains = yield self.store.get_joined_hosts_for_room(room_id)
 
-        if localusers:
+        if self.server_name in domains:
             self._push_update_local(
                 room_id=room_id,
-                user=user,
+                user_id=user_id,
                 typing=content["typing"]
             )
 
-    def _push_update_local(self, room_id, user, typing):
+    def _push_update_local(self, room_id, user_id, typing):
         room_set = self._room_typing.setdefault(room_id, set())
         if typing:
-            room_set.add(user)
+            room_set.add(user_id)
         else:
-            room_set.discard(user)
+            room_set.discard(user_id)
 
         self._latest_room_serial += 1
         self._room_serials[room_id] = self._latest_room_serial
@@ -226,9 +225,7 @@ class TypingNotificationHandler(BaseHandler):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps([
-                    u.to_string() for u in typing
-                ], ensure_ascii=False)
+                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
                 rows.append((serial, room_id, typing_bytes))
         rows.sort()
         return rows
@@ -238,34 +235,26 @@ class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
-        self._handler = None
-        self._room_member_handler = None
-
-    def handler(self):
-        # Avoid cyclic dependency in handler setup
-        if not self._handler:
-            self._handler = self.hs.get_handlers().typing_notification_handler
-        return self._handler
-
-    def room_member_handler(self):
-        if not self._room_member_handler:
-            self._room_member_handler = self.hs.get_handlers().room_member_handler
-        return self._room_member_handler
+        # We can't call get_typing_handler here because there's a cycle:
+        #
+        #   Typing -> Notifier -> TypingNotificationEventSource -> Typing
+        #
+        self.get_typing_handler = hs.get_typing_handler
 
     def _make_event_for(self, room_id):
-        typing = self.handler()._room_typing[room_id]
+        typing = self.get_typing_handler()._room_typing[room_id]
         return {
             "type": "m.typing",
             "room_id": room_id,
             "content": {
-                "user_ids": [u.to_string() for u in typing],
+                "user_ids": list(typing),
             },
         }
 
     def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
-            handler = self.handler()
+            handler = self.get_typing_handler()
 
             events = []
             for room_id in room_ids:
@@ -279,7 +268,7 @@ class TypingNotificationEventSource(object):
             return events, handler._latest_room_serial
 
     def get_current_key(self):
-        return self.handler()._latest_room_serial
+        return self.get_typing_handler()._latest_room_serial
 
     def get_pagination_rows(self, user, pagination_config, key):
         return ([], pagination_config.from_key)