summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py50
-rw-r--r--synapse/app/pusher.py23
-rw-r--r--synapse/appservice/scheduler.py14
-rw-r--r--synapse/config/server.py10
-rw-r--r--synapse/federation/federation_client.py20
-rw-r--r--synapse/federation/transport/client.py12
-rw-r--r--synapse/federation/transport/server.py50
-rw-r--r--synapse/handlers/__init__.py16
-rw-r--r--synapse/handlers/appservice.py15
-rw-r--r--synapse/handlers/auth.py14
-rw-r--r--synapse/handlers/directory.py3
-rw-r--r--synapse/handlers/presence.py119
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/room.py59
-rw-r--r--synapse/handlers/sync.py1147
-rw-r--r--synapse/handlers/typing.py64
-rw-r--r--synapse/notifier.py10
-rw-r--r--synapse/push/emailpusher.py12
-rw-r--r--synapse/push/mailer.py32
-rw-r--r--synapse/replication/presence_resource.py59
-rw-r--r--synapse/replication/resource.py2
-rw-r--r--synapse/rest/client/v1/login.py11
-rw-r--r--synapse/rest/client/v1/pusher.py57
-rw-r--r--synapse/rest/client/v1/room.py5
-rw-r--r--synapse/rest/client/v2_alpha/account.py4
-rw-r--r--synapse/rest/client/v2_alpha/auth.py2
-rw-r--r--synapse/rest/client/v2_alpha/register.py2
-rw-r--r--synapse/rest/client/v2_alpha/tokenrefresh.py2
-rw-r--r--synapse/server.py25
-rw-r--r--synapse/storage/__init__.py16
-rw-r--r--synapse/storage/_base.py2
-rw-r--r--synapse/storage/event_push_actions.py95
-rw-r--r--synapse/storage/push_rule.py13
-rw-r--r--synapse/storage/receipts.py2
-rw-r--r--synapse/storage/schema/delta/30/as_users.py4
-rw-r--r--synapse/storage/signatures.py25
36 files changed, 1318 insertions, 680 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2474a1453b..31e1abb964 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""This module contains classes for authenticating the user."""
 from canonicaljson import encode_canonical_json
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json, SignatureVerifyException
@@ -42,13 +41,20 @@ AuthEventTypes = (
 
 
 class Auth(object):
-
+    """
+    FIXME: This class contains a mix of functions for authenticating users
+    of our client-server API and authenticating events added to room graphs.
+    """
     def __init__(self, hs):
         self.hs = hs
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.state = hs.get_state_handler()
         self.TOKEN_NOT_FOUND_HTTP_STATUS = 401
+        # Docs for these currently lives at
+        # https://github.com/matrix-org/matrix-doc/blob/master/drafts/macaroons_caveats.rst
+        # In addition, we have type == delete_pusher which grants access only to
+        # delete pushers.
         self._KNOWN_CAVEAT_PREFIXES = set([
             "gen = ",
             "guest = ",
@@ -120,6 +126,24 @@ class Auth(object):
                 return allowed
 
             self.check_event_sender_in_room(event, auth_events)
+
+            # Special case to allow m.room.third_party_invite events wherever
+            # a user is allowed to issue invites.  Fixes
+            # https://github.com/vector-im/vector-web/issues/1208 hopefully
+            if event.type == EventTypes.ThirdPartyInvite:
+                user_level = self._get_user_power_level(event.user_id, auth_events)
+                invite_level = self._get_named_level(auth_events, "invite", 0)
+
+                if user_level < invite_level:
+                    raise AuthError(
+                        403, (
+                            "You cannot issue a third party invite for %s." %
+                            (event.content.display_name,)
+                        )
+                    )
+                else:
+                    return True
+
             self._can_send_event(event, auth_events)
 
             if event.type == EventTypes.PowerLevels:
@@ -507,7 +531,7 @@ class Auth(object):
             return default
 
     @defer.inlineCallbacks
-    def get_user_by_req(self, request, allow_guest=False):
+    def get_user_by_req(self, request, allow_guest=False, rights="access"):
         """ Get a registered user's ID.
 
         Args:
@@ -529,7 +553,7 @@ class Auth(object):
                 )
 
             access_token = request.args["access_token"][0]
-            user_info = yield self.get_user_by_access_token(access_token)
+            user_info = yield self.get_user_by_access_token(access_token, rights)
             user = user_info["user"]
             token_id = user_info["token_id"]
             is_guest = user_info["is_guest"]
@@ -590,7 +614,7 @@ class Auth(object):
         defer.returnValue(user_id)
 
     @defer.inlineCallbacks
-    def get_user_by_access_token(self, token):
+    def get_user_by_access_token(self, token, rights="access"):
         """ Get a registered user's ID.
 
         Args:
@@ -601,7 +625,7 @@ class Auth(object):
             AuthError if no user by that token exists or the token is invalid.
         """
         try:
-            ret = yield self.get_user_from_macaroon(token)
+            ret = yield self.get_user_from_macaroon(token, rights)
         except AuthError:
             # TODO(daniel): Remove this fallback when all existing access tokens
             # have been re-issued as macaroons.
@@ -609,11 +633,11 @@ class Auth(object):
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
-    def get_user_from_macaroon(self, macaroon_str):
+    def get_user_from_macaroon(self, macaroon_str, rights="access"):
         try:
             macaroon = pymacaroons.Macaroon.deserialize(macaroon_str)
 
-            self.validate_macaroon(macaroon, "access", self.hs.config.expire_access_token)
+            self.validate_macaroon(macaroon, rights, self.hs.config.expire_access_token)
 
             user_prefix = "user_id = "
             user = None
@@ -636,6 +660,13 @@ class Auth(object):
                     "is_guest": True,
                     "token_id": None,
                 }
+            elif rights == "delete_pusher":
+                # We don't store these tokens in the database
+                ret = {
+                    "user": user,
+                    "is_guest": False,
+                    "token_id": None,
+                }
             else:
                 # This codepath exists so that we can actually return a
                 # token ID, because we use token IDs in place of device
@@ -667,7 +698,8 @@ class Auth(object):
 
         Args:
             macaroon(pymacaroons.Macaroon): The macaroon to validate
-            type_string(str): The kind of token this is (e.g. "access", "refresh")
+            type_string(str): The kind of token required (e.g. "access", "refresh",
+                              "delete_pusher")
             verify_expiry(bool): Whether to verify whether the macaroon has expired.
                 This should really always be True, but no clients currently implement
                 token refresh, so we can't enforce expiry yet.
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 135dd58c15..f1de1e7ce9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -21,6 +21,7 @@ from synapse.config._base import ConfigError
 from synapse.config.database import DatabaseConfig
 from synapse.config.logger import LoggingConfig
 from synapse.config.emailconfig import EmailConfig
+from synapse.config.key import KeyConfig
 from synapse.http.site import SynapseSite
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.storage.roommember import RoomMemberStore
@@ -63,6 +64,26 @@ class SlaveConfig(DatabaseConfig):
         self.pid_file = self.abspath(config.get("pid_file"))
         self.public_baseurl = config["public_baseurl"]
 
+        # some things used by the auth handler but not actually used in the
+        # pusher codebase
+        self.bcrypt_rounds = None
+        self.ldap_enabled = None
+        self.ldap_server = None
+        self.ldap_port = None
+        self.ldap_tls = None
+        self.ldap_search_base = None
+        self.ldap_search_property = None
+        self.ldap_email_property = None
+        self.ldap_full_name_property = None
+
+        # We would otherwise try to use the registration shared secret as the
+        # macaroon shared secret if there was no macaroon_shared_secret, but
+        # that means pulling in RegistrationConfig too. We don't need to be
+        # backwards compaitible in the pusher codebase so just make people set
+        # macaroon_shared_secret. We set this to None to prevent it referencing
+        # an undefined key.
+        self.registration_shared_secret = None
+
     def default_config(self, server_name, **kwargs):
         pid_file = self.abspath("pusher.pid")
         return """\
@@ -95,7 +116,7 @@ class SlaveConfig(DatabaseConfig):
         """ % locals()
 
 
-class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig):
+class PusherSlaveConfig(SlaveConfig, LoggingConfig, EmailConfig, KeyConfig):
     pass
 
 
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 47a4e9f864..9afc8fd754 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -56,22 +56,22 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class AppServiceScheduler(object):
+class ApplicationServiceScheduler(object):
     """ Public facing API for this module. Does the required DI to tie the
     components together. This also serves as the "event_pool", which in this
     case is a simple array.
     """
 
-    def __init__(self, clock, store, as_api):
-        self.clock = clock
-        self.store = store
-        self.as_api = as_api
+    def __init__(self, hs):
+        self.clock = hs.get_clock()
+        self.store = hs.get_datastore()
+        self.as_api = hs.get_application_service_api()
 
         def create_recoverer(service, callback):
-            return _Recoverer(clock, store, as_api, service, callback)
+            return _Recoverer(self.clock, self.store, self.as_api, service, callback)
 
         self.txn_ctrl = _TransactionController(
-            clock, store, as_api, create_recoverer
+            self.clock, self.store, self.as_api, create_recoverer
         )
         self.queuer = _ServiceQueuer(self.txn_ctrl)
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0b5f462e44..c2d8f8a52f 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,6 +29,7 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", True)
         self.public_baseurl = config.get("public_baseurl")
+        self.secondary_directory_servers = config.get("secondary_directory_servers", [])
 
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
@@ -156,6 +157,15 @@ class ServerConfig(Config):
         # hard limit.
         soft_file_limit: 0
 
+        # A list of other Home Servers to fetch the public room directory from
+        # and include in the public room directory of this home server
+        # This is a temporary stopgap solution to populate new server with a
+        # list of rooms until there exists a good solution of a decentralized
+        # room directory.
+        # secondary_directory_servers:
+        #     - matrix.org
+        #     - vector.im
+
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 37ee469fa2..d835c1b038 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,6 +24,7 @@ from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
 from synapse.util import unwrapFirstError
+from synapse.util.async import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
@@ -551,6 +552,25 @@ class FederationClient(FederationBase):
         raise RuntimeError("Failed to send to any server.")
 
     @defer.inlineCallbacks
+    def get_public_rooms(self, destinations):
+        results_by_server = {}
+
+        @defer.inlineCallbacks
+        def _get_result(s):
+            if s == self.server_name:
+                defer.returnValue()
+
+            try:
+                result = yield self.transport_layer.get_public_rooms(s)
+                results_by_server[s] = result
+            except:
+                logger.exception("Error getting room list from server %r", s)
+
+        yield concurrently_execute(_get_result, destinations, 3)
+
+        defer.returnValue(results_by_server)
+
+    @defer.inlineCallbacks
     def query_auth(self, destination, room_id, event_id, local_auth):
         """
         Params:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index cd2841c4db..ebb698e278 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -226,6 +226,18 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
+    def get_public_rooms(self, remote_server):
+        path = PREFIX + "/publicRooms"
+
+        response = yield self.client.get_json(
+            destination=remote_server,
+            path=path,
+        )
+
+        defer.returnValue(response)
+
+    @defer.inlineCallbacks
+    @log_function
     def exchange_third_party_invite(self, destination, room_id, event_dict):
         path = PREFIX + "/exchange_third_party_invite/%s" % (room_id,)
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 5b6c7d11dd..a1a334955f 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -134,10 +134,12 @@ class Authenticator(object):
 
 
 class BaseFederationServlet(object):
-    def __init__(self, handler, authenticator, ratelimiter, server_name):
+    def __init__(self, handler, authenticator, ratelimiter, server_name,
+                 room_list_handler):
         self.handler = handler
         self.authenticator = authenticator
         self.ratelimiter = ratelimiter
+        self.room_list_handler = room_list_handler
 
     def _wrap(self, code):
         authenticator = self.authenticator
@@ -492,6 +494,50 @@ class OpenIdUserInfo(BaseFederationServlet):
         return code
 
 
+class PublicRoomList(BaseFederationServlet):
+    """
+    Fetch the public room list for this server.
+
+    This API returns information in the same format as /publicRooms on the
+    client API, but will only ever include local public rooms and hence is
+    intended for consumption by other home servers.
+
+    GET /publicRooms HTTP/1.1
+
+    HTTP/1.1 200 OK
+    Content-Type: application/json
+
+    {
+        "chunk": [
+            {
+                "aliases": [
+                    "#test:localhost"
+                ],
+                "guest_can_join": false,
+                "name": "test room",
+                "num_joined_members": 3,
+                "room_id": "!whkydVegtvatLfXmPN:localhost",
+                "world_readable": false
+            }
+        ],
+        "end": "END",
+        "start": "START"
+    }
+    """
+
+    PATH = "/publicRooms"
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        data = yield self.room_list_handler.get_local_public_room_list()
+        defer.returnValue((200, data))
+
+    # Avoid doing remote HS authorization checks which are done by default by
+    # BaseFederationServlet.
+    def _wrap(self, code):
+        return code
+
+
 SERVLET_CLASSES = (
     FederationSendServlet,
     FederationPullServlet,
@@ -513,6 +559,7 @@ SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     OpenIdUserInfo,
+    PublicRoomList,
 )
 
 
@@ -523,4 +570,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
             authenticator=authenticator,
             ratelimiter=ratelimiter,
             server_name=hs.hostname,
+            room_list_handler=hs.get_room_list_handler(),
         ).register(resource)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 9442ae6f1d..d28e07f0d9 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -13,11 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.appservice.scheduler import AppServiceScheduler
-from synapse.appservice.api import ApplicationServiceApi
 from .register import RegistrationHandler
 from .room import (
-    RoomCreationHandler, RoomListHandler, RoomContextHandler,
+    RoomCreationHandler, RoomContextHandler,
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
@@ -26,8 +24,6 @@ from .federation import FederationHandler
 from .profile import ProfileHandler
 from .directory import DirectoryHandler
 from .admin import AdminHandler
-from .appservice import ApplicationServicesHandler
-from .auth import AuthHandler
 from .identity import IdentityHandler
 from .receipts import ReceiptsHandler
 from .search import SearchHandler
@@ -50,19 +46,9 @@ class Handlers(object):
         self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
-        self.room_list_handler = RoomListHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
         self.admin_handler = AdminHandler(hs)
         self.receipts_handler = ReceiptsHandler(hs)
-        asapi = ApplicationServiceApi(hs)
-        self.appservice_handler = ApplicationServicesHandler(
-            hs, asapi, AppServiceScheduler(
-                clock=hs.get_clock(),
-                store=hs.get_datastore(),
-                as_api=asapi
-            )
-        )
-        self.auth_handler = AuthHandler(hs)
         self.identity_handler = IdentityHandler(hs)
         self.search_handler = SearchHandler(hs)
         self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 75fc74c797..051ccdb380 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,6 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.appservice import ApplicationService
-from synapse.types import UserID
 
 import logging
 
@@ -35,16 +34,13 @@ def log_failure(failure):
     )
 
 
-# NB: Purposefully not inheriting BaseHandler since that contains way too much
-# setup code which this handler does not need or use. This makes testing a lot
-# easier.
 class ApplicationServicesHandler(object):
 
-    def __init__(self, hs, appservice_api, appservice_scheduler):
+    def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.hs = hs
-        self.appservice_api = appservice_api
-        self.scheduler = appservice_scheduler
+        self.is_mine_id = hs.is_mine_id
+        self.appservice_api = hs.get_application_service_api()
+        self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
 
     @defer.inlineCallbacks
@@ -169,8 +165,7 @@ class ApplicationServicesHandler(object):
 
     @defer.inlineCallbacks
     def _is_unknown_user(self, user_id):
-        user = UserID.from_string(user_id)
-        if not self.hs.is_mine(user):
+        if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
             defer.returnValue(False)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 68d0d78fc6..200793b5ed 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from ._base import BaseHandler
 from synapse.api.constants import LoginType
 from synapse.types import UserID
-from synapse.api.errors import AuthError, LoginError, Codes
+from synapse.api.errors import AuthError, LoginError, Codes, StoreError, SynapseError
 from synapse.util.async import run_on_reactor
 
 from twisted.web.client import PartialDownloadError
@@ -529,6 +529,11 @@ class AuthHandler(BaseHandler):
         macaroon.add_first_party_caveat("time < %d" % (expiry,))
         return macaroon.serialize()
 
+    def generate_delete_pusher_token(self, user_id):
+        macaroon = self._generate_base_macaroon(user_id)
+        macaroon.add_first_party_caveat("type = delete_pusher")
+        return macaroon.serialize()
+
     def validate_short_term_login_token_and_get_user_id(self, login_token):
         try:
             macaroon = pymacaroons.Macaroon.deserialize(login_token)
@@ -563,7 +568,12 @@ class AuthHandler(BaseHandler):
 
         except_access_token_ids = [requester.access_token_id] if requester else []
 
-        yield self.store.user_set_password_hash(user_id, password_hash)
+        try:
+            yield self.store.user_set_password_hash(user_id, password_hash)
+        except StoreError as e:
+            if e.code == 404:
+                raise SynapseError(404, "Unknown user", Codes.NOT_FOUND)
+            raise e
         yield self.store.user_delete_access_tokens(
             user_id, except_access_token_ids
         )
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8eeb225811..4bea7f2b19 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -33,6 +33,7 @@ class DirectoryHandler(BaseHandler):
         super(DirectoryHandler, self).__init__(hs)
 
         self.state = hs.get_state_handler()
+        self.appservice_handler = hs.get_application_service_handler()
 
         self.federation = hs.get_replication_layer()
         self.federation.register_query_handler(
@@ -281,7 +282,7 @@ class DirectoryHandler(BaseHandler):
         )
         if not result:
             # Query AS to see if it exists
-            as_handler = self.hs.get_handlers().appservice_handler
+            as_handler = self.appservice_handler
             result = yield as_handler.query_room_alias_exists(room_alias)
         defer.returnValue(result)
 
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 37f57301fb..fc8538b41e 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -68,6 +68,10 @@ FEDERATION_TIMEOUT = 30 * 60 * 1000
 # How often to resend presence to remote servers
 FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 
+# How long we will wait before assuming that the syncs from an external process
+# are dead.
+EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
+
 assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
@@ -158,10 +162,21 @@ class PresenceHandler(object):
         self.serial_to_user = {}
         self._next_serial = 1
 
-        # Keeps track of the number of *ongoing* syncs. While this is non zero
-        # a user will never go offline.
+        # Keeps track of the number of *ongoing* syncs on this process. While
+        # this is non zero a user will never go offline.
         self.user_to_num_current_syncs = {}
 
+        # Keeps track of the number of *ongoing* syncs on other processes.
+        # While any sync is ongoing on another process the user will never
+        # go offline.
+        # Each process has a unique identifier and an update frequency. If
+        # no update is received from that process within the update period then
+        # we assume that all the sync requests on that process have stopped.
+        # Stored as a dict from process_id to set of user_id, and a dict of
+        # process_id to millisecond timestamp last updated.
+        self.external_process_to_current_syncs = {}
+        self.external_process_last_updated_ms = {}
+
         # Start a LoopingCall in 30s that fires every 5s.
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
@@ -272,13 +287,26 @@ class PresenceHandler(object):
             # Fetch the list of users that *may* have timed out. Things may have
             # changed since the timeout was set, so we won't necessarily have to
             # take any action.
-            users_to_check = self.wheel_timer.fetch(now)
+            users_to_check = set(self.wheel_timer.fetch(now))
+
+            # Check whether the lists of syncing processes from an external
+            # process have expired.
+            expired_process_ids = [
+                process_id for process_id, last_update
+                in self.external_process_last_update.items()
+                if now - last_update > EXTERNAL_PROCESS_EXPIRY
+            ]
+            for process_id in expired_process_ids:
+                users_to_check.update(
+                    self.external_process_to_current_syncs.pop(process_id, ())
+                )
+                self.external_process_last_update.pop(process_id)
 
             states = [
                 self.user_to_current_state.get(
                     user_id, UserPresenceState.default(user_id)
                 )
-                for user_id in set(users_to_check)
+                for user_id in users_to_check
             ]
 
             timers_fired_counter.inc_by(len(states))
@@ -286,7 +314,7 @@ class PresenceHandler(object):
             changes = handle_timeouts(
                 states,
                 is_mine_fn=self.is_mine_id,
-                user_to_num_current_syncs=self.user_to_num_current_syncs,
+                syncing_users=self.get_syncing_users(),
                 now=now,
             )
 
@@ -363,6 +391,73 @@ class PresenceHandler(object):
 
         defer.returnValue(_user_syncing())
 
+    def get_currently_syncing_users(self):
+        """Get the set of user ids that are currently syncing on this HS.
+        Returns:
+            set(str): A set of user_id strings.
+        """
+        syncing_user_ids = {
+            user_id for user_id, count in self.user_to_num_current_syncs.items()
+            if count
+        }
+        syncing_user_ids.update(self.external_process_to_current_syncs.values())
+        return syncing_user_ids
+
+    @defer.inlineCallbacks
+    def update_external_syncs(self, process_id, syncing_user_ids):
+        """Update the syncing users for an external process
+
+        Args:
+            process_id(str): An identifier for the process the users are
+                syncing against. This allows synapse to process updates
+                as user start and stop syncing against a given process.
+            syncing_user_ids(set(str)): The set of user_ids that are
+                currently syncing on that server.
+        """
+
+        # Grab the previous list of user_ids that were syncing on that process
+        prev_syncing_user_ids = (
+            self.external_process_to_current_syncs.get(process_id, set())
+        )
+        # Grab the current presence state for both the users that are syncing
+        # now and the users that were syncing before this update.
+        prev_states = yield self.current_state_for_users(
+            syncing_user_ids | prev_syncing_user_ids
+        )
+        updates = []
+        time_now_ms = self.clock.time_msec()
+
+        # For each new user that is syncing check if we need to mark them as
+        # being online.
+        for new_user_id in syncing_user_ids - prev_syncing_user_ids:
+            prev_state = prev_states[new_user_id]
+            if prev_state.state == PresenceState.OFFLINE:
+                updates.append(prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=time_now_ms,
+                    last_user_sync_ts=time_now_ms,
+                ))
+            else:
+                updates.append(prev_state.copy_and_replace(
+                    last_user_sync_ts=time_now_ms,
+                ))
+
+        # For each user that is still syncing or stopped syncing update the
+        # last sync time so that we will correctly apply the grace period when
+        # they stop syncing.
+        for old_user_id in prev_syncing_user_ids:
+            prev_state = prev_states[old_user_id]
+            updates.append(prev_state.copy_and_replace(
+                last_user_sync_ts=time_now_ms,
+            ))
+
+        yield self._update_states(updates)
+
+        # Update the last updated time for the process. We expire the entries
+        # if we don't receive an update in the given timeframe.
+        self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
+        self.external_process_to_current_syncs[process_id] = syncing_user_ids
+
     @defer.inlineCallbacks
     def current_state_for_user(self, user_id):
         """Get the current presence state for a user.
@@ -935,15 +1030,14 @@ class PresenceEventSource(object):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
 
-def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
         user_states(list): List of UserPresenceState's to check.
         is_mine_fn (fn): Function that returns if a user_id is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -954,21 +1048,20 @@ def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
     for state in user_states:
         is_mine = is_mine_fn(state.user_id)
 
-        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
         if new_state:
             changes[state.user_id] = new_state
 
     return changes.values()
 
 
-def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+def handle_timeout(state, is_mine, syncing_user_ids, now):
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
         state (UserPresenceState)
         is_mine (bool): Whether the user is ours
-        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
-            active syncs.
+        syncing_user_ids (set): Set of user_ids with active syncs.
         now (int): Current time in ms.
 
     Returns:
@@ -1002,7 +1095,7 @@ def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
 
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
-        if not user_to_num_current_syncs.get(user_id, 0):
+        if user_id not in syncing_user_ids:
             if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 16f33f8371..bbc07b045e 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -413,7 +413,7 @@ class RegistrationHandler(BaseHandler):
         defer.returnValue((user_id, token))
 
     def auth_handler(self):
-        return self.hs.get_handlers().auth_handler
+        return self.hs.get_auth_handler()
 
     @defer.inlineCallbacks
     def guest_access_token_for(self, medium, address, inviter_user_id):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 3d63b3c513..9fd34588dd 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -36,6 +36,8 @@ import string
 
 logger = logging.getLogger(__name__)
 
+REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
+
 id_server_scheme = "https://"
 
 
@@ -344,8 +346,14 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.response_cache = ResponseCache()
+        self.remote_list_request_cache = ResponseCache()
+        self.remote_list_cache = {}
+        self.fetch_looping_call = hs.get_clock().looping_call(
+            self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
+        )
+        self.fetch_all_remote_lists()
 
-    def get_public_room_list(self):
+    def get_local_public_room_list(self):
         result = self.response_cache.get(())
         if not result:
             result = self.response_cache.set((), self._get_public_room_list())
@@ -427,6 +435,55 @@ class RoomListHandler(BaseHandler):
         # FIXME (erikj): START is no longer a valid value
         defer.returnValue({"start": "START", "end": "END", "chunk": results})
 
+    @defer.inlineCallbacks
+    def fetch_all_remote_lists(self):
+        deferred = self.hs.get_replication_layer().get_public_rooms(
+            self.hs.config.secondary_directory_servers
+        )
+        self.remote_list_request_cache.set((), deferred)
+        self.remote_list_cache = yield deferred
+
+    @defer.inlineCallbacks
+    def get_aggregated_public_room_list(self):
+        """
+        Get the public room list from this server and the servers
+        specified in the secondary_directory_servers config option.
+        XXX: Pagination...
+        """
+        # We return the results from out cache which is updated by a looping call,
+        # unless we're missing a cache entry, in which case wait for the result
+        # of the fetch if there's one in progress. If not, omit that server.
+        wait = False
+        for s in self.hs.config.secondary_directory_servers:
+            if s not in self.remote_list_cache:
+                logger.warn("No cached room list from %s: waiting for fetch", s)
+                wait = True
+                break
+
+        if wait and self.remote_list_request_cache.get(()):
+            yield self.remote_list_request_cache.get(())
+
+        public_rooms = yield self.get_local_public_room_list()
+
+        # keep track of which room IDs we've seen so we can de-dup
+        room_ids = set()
+
+        # tag all the ones in our list with our server name.
+        # Also add the them to the de-deping set
+        for room in public_rooms['chunk']:
+            room["server_name"] = self.hs.hostname
+            room_ids.add(room["room_id"])
+
+        # Now add the results from federation
+        for server_name, server_result in self.remote_list_cache.items():
+            for room in server_result["chunk"]:
+                if room["room_id"] not in room_ids:
+                    room["server_name"] = server_name
+                    public_rooms["chunk"].append(room)
+                    room_ids.add(room["room_id"])
+
+        defer.returnValue(public_rooms)
+
 
 class RoomContextHandler(BaseHandler):
     @defer.inlineCallbacks
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9ebfccc8bf..5307b62b85 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util.async import concurrently_execute
 from synapse.util.logcontext import LoggingContext
@@ -194,157 +193,7 @@ class SyncHandler(object):
         Returns:
             A Deferred SyncResult.
         """
-        if since_token is None or full_state:
-            return self.full_state_sync(sync_config, since_token)
-        else:
-            return self.incremental_sync_with_gap(sync_config, since_token)
-
-    @defer.inlineCallbacks
-    def full_state_sync(self, sync_config, timeline_since_token):
-        """Get a sync for a client which is starting without any state.
-
-        If a 'message_since_token' is given, only timeline events which have
-        happened since that token will be returned.
-
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
-        presence_stream = self.event_sources.sources["presence"]
-        # TODO (mjark): This looks wrong, shouldn't we be getting the presence
-        # UP to the present rather than after the present?
-        pagination_config = PaginationConfig(from_token=now_token)
-        presence, _ = yield presence_stream.get_pagination_rows(
-            user=sync_config.user,
-            pagination_config=pagination_config.get_source_config("presence"),
-            key=None
-        )
-
-        membership_list = (
-            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
-        )
-
-        room_list = yield self.store.get_rooms_for_user_where_membership_is(
-            user_id=sync_config.user.to_string(),
-            membership_list=membership_list
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_account_data_for_user(
-                sync_config.user.to_string()
-            )
-        )
-
-        account_data['m.push_rules'] = yield self.push_rules_for_user(
-            sync_config.user
-        )
-
-        tags_by_room = yield self.store.get_tags_for_user(
-            sync_config.user.to_string()
-        )
-
-        ignored_users = account_data.get(
-            "m.ignored_user_list", {}
-        ).get("ignored_users", {}).keys()
-
-        joined = []
-        invited = []
-        archived = []
-
-        user_id = sync_config.user.to_string()
-
-        @defer.inlineCallbacks
-        def _generate_room_entry(event):
-            if event.membership == Membership.JOIN:
-                room_result = yield self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                joined.append(room_result)
-            elif event.membership == Membership.INVITE:
-                if event.sender in ignored_users:
-                    return
-                invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
-            elif event.membership in (Membership.LEAVE, Membership.BAN):
-                # Always send down rooms we were banned or kicked from.
-                if not sync_config.filter_collection.include_leave:
-                    if event.membership == Membership.LEAVE:
-                        if user_id == event.sender:
-                            return
-
-                leave_token = now_token.copy_and_replace(
-                    "room_key", "s%d" % (event.stream_ordering,)
-                )
-                room_result = yield self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                archived.append(room_result)
-
-        yield concurrently_execute(_generate_room_entry, room_list, 10)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
-    @defer.inlineCallbacks
-    def full_state_sync_for_joined_room(self, room_id, sync_config,
-                                        now_token, timeline_since_token,
-                                        ephemeral_by_room, tags_by_room,
-                                        account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred JoinedSyncResult.
-        """
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token=timeline_since_token
-        )
-
-        room_sync = yield self.incremental_sync_with_gap_for_room(
-            room_id, sync_config,
-            now_token=now_token,
-            since_token=timeline_since_token,
-            ephemeral_by_room=ephemeral_by_room,
-            tags_by_room=tags_by_room,
-            account_data_by_room=account_data_by_room,
-            batch=batch,
-            full_state=True,
-        )
-
-        defer.returnValue(room_sync)
+        return self.generate_sync_result(sync_config, since_token, full_state)
 
     @defer.inlineCallbacks
     def push_rules_for_user(self, user):
@@ -354,35 +203,6 @@ class SyncHandler(object):
         rules = format_push_rules_for_user(user, rawrules, enabled_map)
         defer.returnValue(rules)
 
-    def account_data_for_user(self, account_data):
-        account_data_events = []
-
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
-    def account_data_for_room(self, room_id, tags_by_room, account_data_by_room):
-        account_data_events = []
-        tags = tags_by_room.get(room_id)
-        if tags is not None:
-            account_data_events.append({
-                "type": "m.tag",
-                "content": {"tags": tags},
-            })
-
-        account_data = account_data_by_room.get(room_id, {})
-        for account_data_type, content in account_data.items():
-            account_data_events.append({
-                "type": account_data_type,
-                "content": content,
-            })
-
-        return account_data_events
-
     @defer.inlineCallbacks
     def ephemeral_by_room(self, sync_config, now_token, since_token=None):
         """Get the ephemeral events for each room the user is in
@@ -445,258 +265,22 @@ class SyncHandler(object):
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    def full_state_sync_for_archived_room(self, room_id, sync_config,
-                                          leave_event_id, leave_token,
-                                          timeline_since_token, tags_by_room,
-                                          account_data_by_room):
-        """Sync a room for a client which is starting without any state
-        Returns:
-            A Deferred ArchivedSyncResult.
-        """
-
-        return self.incremental_sync_for_archived_room(
-            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
-            account_data_by_room, full_state=True, leave_token=leave_token,
-        )
-
-    @defer.inlineCallbacks
-    def incremental_sync_with_gap(self, sync_config, since_token):
-        """ Get the incremental delta needed to bring the client up to
-        date with the server.
-        Returns:
-            A Deferred SyncResult.
-        """
-        now_token = yield self.event_sources.get_current_token()
-
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
-
-        presence_source = self.event_sources.sources["presence"]
-        presence, presence_key = yield presence_source.get_new_events(
-            user=sync_config.user,
-            from_key=since_token.presence_key,
-            limit=sync_config.filter_collection.presence_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("presence_key", presence_key)
-
-        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token, since_token
-        )
-
-        app_service = yield self.store.get_app_service_by_user_id(
-            sync_config.user.to_string()
-        )
-        if app_service:
-            rooms = yield self.store.get_app_service_rooms(app_service)
-            joined_room_ids = set(r.room_id for r in rooms)
-        else:
-            rooms = yield self.store.get_rooms_for_user(
-                sync_config.user.to_string()
-            )
-            joined_room_ids = set(r.room_id for r in rooms)
-
-        user_id = sync_config.user.to_string()
-
-        timeline_limit = sync_config.filter_collection.timeline_limit()
-
-        tags_by_room = yield self.store.get_updated_tags(
-            user_id,
-            since_token.account_data_key,
-        )
-
-        account_data, account_data_by_room = (
-            yield self.store.get_updated_account_data_for_user(
-                user_id,
-                since_token.account_data_key,
-            )
-        )
-
-        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
-            user_id, int(since_token.push_rules_key)
-        )
-
-        if push_rules_changed:
-            account_data["m.push_rules"] = yield self.push_rules_for_user(
-                sync_config.user
-            )
-
-        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", user_id=user_id,
-        )
-
-        if ignored_account_data:
-            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
-        else:
-            ignored_users = frozenset()
-
-        # Get a list of membership change events that have happened.
-        rooms_changed = yield self.store.get_membership_changes_for_user(
-            user_id, since_token.room_key, now_token.room_key
-        )
-
-        mem_change_events_by_room_id = {}
-        for event in rooms_changed:
-            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
-
-        newly_joined_rooms = []
-        archived = []
-        invited = []
-        for room_id, events in mem_change_events_by_room_id.items():
-            non_joins = [e for e in events if e.membership != Membership.JOIN]
-            has_join = len(non_joins) != len(events)
-
-            # We want to figure out if we joined the room at some point since
-            # the last sync (even if we have since left). This is to make sure
-            # we do send down the room, and with full state, where necessary
-            if room_id in joined_room_ids or has_join:
-                old_state = yield self.get_state_at(room_id, since_token)
-                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
-                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
-                        newly_joined_rooms.append(room_id)
-
-                if room_id in joined_room_ids:
-                    continue
-
-            if not non_joins:
-                continue
-
-            # Only bother if we're still currently invited
-            should_invite = non_joins[-1].membership == Membership.INVITE
-            if should_invite:
-                if event.sender not in ignored_users:
-                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
-                    if room_sync:
-                        invited.append(room_sync)
-
-            # Always include leave/ban events. Just take the last one.
-            # TODO: How do we handle ban -> leave in same batch?
-            leave_events = [
-                e for e in non_joins
-                if e.membership in (Membership.LEAVE, Membership.BAN)
-            ]
-
-            if leave_events:
-                leave_event = leave_events[-1]
-                room_sync = yield self.incremental_sync_for_archived_room(
-                    sync_config, room_id, leave_event.event_id, since_token,
-                    tags_by_room, account_data_by_room,
-                    full_state=room_id in newly_joined_rooms
-                )
-                if room_sync:
-                    archived.append(room_sync)
-
-        # Get all events for rooms we're currently joined to.
-        room_to_events = yield self.store.get_room_events_stream_for_rooms(
-            room_ids=joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
-
-        joined = []
-        # We loop through all room ids, even if there are no new events, in case
-        # there are non room events taht we need to notify about.
-        for room_id in joined_room_ids:
-            room_entry = room_to_events.get(room_id, None)
-
-            if room_entry:
-                events, start_key = room_entry
-
-                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
-
-                newly_joined_room = room_id in newly_joined_rooms
-                full_state = newly_joined_room
-
-                batch = yield self.load_filtered_recents(
-                    room_id, sync_config, prev_batch_token,
-                    since_token=since_token,
-                    recents=events,
-                    newly_joined_room=newly_joined_room,
-                )
-            else:
-                batch = TimelineBatch(
-                    events=[],
-                    prev_batch=since_token,
-                    limited=False,
-                )
-                full_state = False
-
-            room_sync = yield self.incremental_sync_with_gap_for_room(
-                room_id=room_id,
-                sync_config=sync_config,
-                since_token=since_token,
-                now_token=now_token,
-                ephemeral_by_room=ephemeral_by_room,
-                tags_by_room=tags_by_room,
-                account_data_by_room=account_data_by_room,
-                batch=batch,
-                full_state=full_state,
-            )
-            if room_sync:
-                joined.append(room_sync)
-
-        # For each newly joined room, we want to send down presence of
-        # existing users.
-        presence_handler = self.presence_handler
-        extra_presence_users = set()
-        for room_id in newly_joined_rooms:
-            users = yield self.store.get_users_in_room(event.room_id)
-            extra_presence_users.update(users)
-
-        # For each new member, send down presence.
-        for joined_sync in joined:
-            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
-            for event in it:
-                if event.type == EventTypes.Member:
-                    if event.membership == Membership.JOIN:
-                        extra_presence_users.add(event.state_key)
-
-        states = yield presence_handler.get_states(
-            [u for u in extra_presence_users if u != user_id],
-            as_event=True,
-        )
-        presence.extend(states)
-
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
-            self.account_data_for_user(account_data)
-        )
-
-        presence = sync_config.filter_collection.filter_presence(
-            presence
-        )
-
-        defer.returnValue(SyncResult(
-            presence=presence,
-            account_data=account_data_for_user,
-            joined=joined,
-            invited=invited,
-            archived=archived,
-            next_batch=now_token,
-        ))
-
     @defer.inlineCallbacks
-    def load_filtered_recents(self, room_id, sync_config, now_token,
-                              since_token=None, recents=None, newly_joined_room=False):
+    def _load_filtered_recents(self, room_id, sync_config, now_token,
+                               since_token=None, recents=None, newly_joined_room=False):
         """
         Returns:
             a Deferred TimelineBatch
         """
         with Measure(self.clock, "load_filtered_recents"):
-            filtering_factor = 2
             timeline_limit = sync_config.filter_collection.timeline_limit()
-            load_limit = max(timeline_limit * filtering_factor, 10)
-            max_repeat = 5  # Only try a few times per room, otherwise
-            room_key = now_token.room_key
-            end_key = room_key
 
             if recents is None or newly_joined_room or timeline_limit < len(recents):
                 limited = True
             else:
                 limited = False
 
-            if recents is not None:
+            if recents:
                 recents = sync_config.filter_collection.filter_room_timeline(recents)
                 recents = yield filter_events_for_client(
                     self.store,
@@ -706,6 +290,19 @@ class SyncHandler(object):
             else:
                 recents = []
 
+            if not limited:
+                defer.returnValue(TimelineBatch(
+                    events=recents,
+                    prev_batch=now_token,
+                    limited=False
+                ))
+
+            filtering_factor = 2
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise
+            room_key = now_token.room_key
+            end_key = room_key
+
             since_key = None
             if since_token and not newly_joined_room:
                 since_key = since_token.room_key
@@ -749,103 +346,6 @@ class SyncHandler(object):
         ))
 
     @defer.inlineCallbacks
-    def incremental_sync_with_gap_for_room(self, room_id, sync_config,
-                                           since_token, now_token,
-                                           ephemeral_by_room, tags_by_room,
-                                           account_data_by_room,
-                                           batch, full_state=False):
-        state = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, now_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
-            ephemeral_by_room.get(room_id, [])
-        )
-
-        unread_notifications = {}
-        room_sync = JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state,
-            ephemeral=ephemeral,
-            account_data=account_data,
-            unread_notifications=unread_notifications,
-        )
-
-        if room_sync:
-            notifs = yield self.unread_notifs_for_room_id(
-                room_id, sync_config
-            )
-
-            if notifs is not None:
-                unread_notifications["notification_count"] = notifs["notify_count"]
-                unread_notifications["highlight_count"] = notifs["highlight_count"]
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
-                                           since_token, tags_by_room,
-                                           account_data_by_room, full_state,
-                                           leave_token=None):
-        """ Get the incremental delta needed to bring the client up to date for
-        the archived room.
-        Returns:
-            A Deferred ArchivedSyncResult
-        """
-
-        if not leave_token:
-            stream_token = yield self.store.get_stream_token_for_event(
-                leave_event_id
-            )
-
-            leave_token = since_token.copy_and_replace("room_key", stream_token)
-
-        if since_token and since_token.is_after(leave_token):
-            defer.returnValue(None)
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token,
-        )
-
-        logger.debug("Recents %r", batch)
-
-        state_events_delta = yield self.compute_state_delta(
-            room_id, batch, sync_config, since_token, leave_token,
-            full_state=full_state
-        )
-
-        account_data = self.account_data_for_room(
-            room_id, tags_by_room, account_data_by_room
-        )
-
-        account_data = sync_config.filter_collection.filter_room_account_data(
-            account_data
-        )
-
-        room_sync = ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=state_events_delta,
-            account_data=account_data,
-        )
-
-        logger.debug("Room sync: %r", room_sync)
-
-        defer.returnValue(room_sync)
-
-    @defer.inlineCallbacks
     def get_state_after_event(self, event):
         """
         Get the room state after the given event
@@ -970,26 +470,6 @@ class SyncHandler(object):
                 for e in sync_config.filter_collection.filter_room_state(state.values())
             })
 
-    def check_joined_room(self, sync_config, state_delta):
-        """
-        Check if the user has just joined the given room (so should
-        be given the full state)
-
-        Args:
-            sync_config(synapse.handlers.sync.SyncConfig):
-            state_delta(dict[(str,str), synapse.events.FrozenEvent]): the
-                difference in state since the last sync
-
-        Returns:
-             A deferred Tuple (state_delta, limited)
-        """
-        join_event = state_delta.get((
-            EventTypes.Member, sync_config.user.to_string()), None)
-        if join_event is not None:
-            if join_event.content["membership"] == Membership.JOIN:
-                return True
-        return False
-
     @defer.inlineCallbacks
     def unread_notifs_for_room_id(self, room_id, sync_config):
         with Measure(self.clock, "unread_notifs_for_room_id"):
@@ -1010,6 +490,551 @@ class SyncHandler(object):
             # count is whatever it was last time.
             defer.returnValue(None)
 
+    @defer.inlineCallbacks
+    def generate_sync_result(self, sync_config, since_token=None, full_state=False):
+        """Generates a sync result.
+
+        Args:
+            sync_config (SyncConfig)
+            since_token (StreamToken)
+            full_state (bool)
+
+        Returns:
+            Deferred(SyncResult)
+        """
+
+        # NB: The now_token gets changed by some of the generate_sync_* methods,
+        # this is due to some of the underlying streams not supporting the ability
+        # to query up to a given point.
+        # Always use the `now_token` in `SyncResultBuilder`
+        now_token = yield self.event_sources.get_current_token()
+
+        sync_result_builder = SyncResultBuilder(
+            sync_config, full_state,
+            since_token=since_token,
+            now_token=now_token,
+        )
+
+        account_data_by_room = yield self._generate_sync_entry_for_account_data(
+            sync_result_builder
+        )
+
+        res = yield self._generate_sync_entry_for_rooms(
+            sync_result_builder, account_data_by_room
+        )
+        newly_joined_rooms, newly_joined_users = res
+
+        yield self._generate_sync_entry_for_presence(
+            sync_result_builder, newly_joined_rooms, newly_joined_users
+        )
+
+        defer.returnValue(SyncResult(
+            presence=sync_result_builder.presence,
+            account_data=sync_result_builder.account_data,
+            joined=sync_result_builder.joined,
+            invited=sync_result_builder.invited,
+            archived=sync_result_builder.archived,
+            next_batch=sync_result_builder.now_token,
+        ))
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_account_data(self, sync_result_builder):
+        """Generates the account data portion of the sync response. Populates
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+
+        Returns:
+            Deferred(dict): A dictionary containing the per room account data.
+        """
+        sync_config = sync_result_builder.sync_config
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+
+        if since_token and not sync_result_builder.full_state:
+            account_data, account_data_by_room = (
+                yield self.store.get_updated_account_data_for_user(
+                    user_id,
+                    since_token.account_data_key,
+                )
+            )
+
+            push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+                user_id, int(since_token.push_rules_key)
+            )
+
+            if push_rules_changed:
+                account_data["m.push_rules"] = yield self.push_rules_for_user(
+                    sync_config.user
+                )
+        else:
+            account_data, account_data_by_room = (
+                yield self.store.get_account_data_for_user(
+                    sync_config.user.to_string()
+                )
+            )
+
+            account_data['m.push_rules'] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
+        account_data_for_user = sync_config.filter_collection.filter_account_data([
+            {"type": account_data_type, "content": content}
+            for account_data_type, content in account_data.items()
+        ])
+
+        sync_result_builder.account_data = account_data_for_user
+
+        defer.returnValue(account_data_by_room)
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_presence(self, sync_result_builder, newly_joined_rooms,
+                                          newly_joined_users):
+        """Generates the presence portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            newly_joined_rooms(list): List of rooms that the user has joined
+                since the last sync (or empty if an initial sync)
+            newly_joined_users(list): List of users that have joined rooms
+                since the last sync (or empty if an initial sync)
+        """
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+        user = sync_result_builder.sync_config.user
+
+        presence_source = self.event_sources.sources["presence"]
+
+        since_token = sync_result_builder.since_token
+        if since_token and not sync_result_builder.full_state:
+            presence_key = since_token.presence_key
+            include_offline = True
+        else:
+            presence_key = None
+            include_offline = False
+
+        presence, presence_key = yield presence_source.get_new_events(
+            user=user,
+            from_key=presence_key,
+            is_guest=sync_config.is_guest,
+            include_offline=include_offline,
+        )
+        sync_result_builder.now_token = now_token.copy_and_replace(
+            "presence_key", presence_key
+        )
+
+        extra_users_ids = set(newly_joined_users)
+        for room_id in newly_joined_rooms:
+            users = yield self.store.get_users_in_room(room_id)
+            extra_users_ids.update(users)
+        extra_users_ids.discard(user.to_string())
+
+        states = yield self.presence_handler.get_states(
+            extra_users_ids,
+            as_event=True,
+        )
+        presence.extend(states)
+
+        # Deduplicate the presence entries so that there's at most one per user
+        presence = {p["content"]["user_id"]: p for p in presence}.values()
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
+
+        sync_result_builder.presence = presence
+
+    @defer.inlineCallbacks
+    def _generate_sync_entry_for_rooms(self, sync_result_builder, account_data_by_room):
+        """Generates the rooms portion of the sync response. Populates the
+        `sync_result_builder` with the result.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            account_data_by_room(dict): Dictionary of per room account data
+
+        Returns:
+            Deferred(tuple): Returns a 2-tuple of
+            `(newly_joined_rooms, newly_joined_users)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+
+        now_token, ephemeral_by_room = yield self.ephemeral_by_room(
+            sync_result_builder.sync_config,
+            now_token=sync_result_builder.now_token,
+            since_token=sync_result_builder.since_token,
+        )
+        sync_result_builder.now_token = now_token
+
+        ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
+            "m.ignored_user_list", user_id=user_id,
+        )
+
+        if ignored_account_data:
+            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
+        else:
+            ignored_users = frozenset()
+
+        if sync_result_builder.since_token:
+            res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_updated_tags(
+                user_id,
+                sync_result_builder.since_token.account_data_key,
+            )
+        else:
+            res = yield self._get_all_rooms(sync_result_builder, ignored_users)
+            room_entries, invited, newly_joined_rooms = res
+
+            tags_by_room = yield self.store.get_tags_for_user(user_id)
+
+        def handle_room_entries(room_entry):
+            return self._generate_room_entry(
+                sync_result_builder,
+                ignored_users,
+                room_entry,
+                ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
+                tags=tags_by_room.get(room_entry.room_id),
+                account_data=account_data_by_room.get(room_entry.room_id, {}),
+                always_include=sync_result_builder.full_state,
+            )
+
+        yield concurrently_execute(handle_room_entries, room_entries, 10)
+
+        sync_result_builder.invited.extend(invited)
+
+        # Now we want to get any newly joined users
+        newly_joined_users = set()
+        if sync_result_builder.since_token:
+            for joined_sync in sync_result_builder.joined:
+                it = itertools.chain(
+                    joined_sync.timeline.events, joined_sync.state.values()
+                )
+                for event in it:
+                    if event.type == EventTypes.Member:
+                        if event.membership == Membership.JOIN:
+                            newly_joined_users.add(event.state_key)
+
+        defer.returnValue((newly_joined_rooms, newly_joined_users))
+
+    @defer.inlineCallbacks
+    def _get_rooms_changed(self, sync_result_builder, ignored_users):
+        """Gets the the changes that have happened since the last sync.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], newly_joined_rooms)`
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        assert since_token
+
+        app_service = yield self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            rooms = yield self.store.get_rooms_for_user(user_id)
+            joined_room_ids = set(r.room_id for r in rooms)
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+        newly_joined_rooms = []
+        room_entries = []
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                    newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
+
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                if event.sender not in ignored_users:
+                    room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                    if room_sync:
+                        invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                leave_stream_token = yield self.store.get_stream_token_for_event(
+                    leave_event.event_id
+                )
+                leave_token = since_token.copy_and_replace(
+                    "room_key", leave_stream_token
+                )
+
+                if since_token and since_token.is_after(leave_token):
+                    continue
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        timeline_limit = sync_config.filter_collection.timeline_limit()
+
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
+
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non room events taht we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
+
+            if room_entry:
+                events, start_key = room_entry
+
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
+
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=events,
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=None if room_id in newly_joined_rooms else since_token,
+                    upto_token=prev_batch_token,
+                ))
+            else:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=room_id,
+                    rtype="joined",
+                    events=[],
+                    newly_joined=room_id in newly_joined_rooms,
+                    full_state=False,
+                    since_token=since_token,
+                    upto_token=since_token,
+                ))
+
+        defer.returnValue((room_entries, invited, newly_joined_rooms))
+
+    @defer.inlineCallbacks
+    def _get_all_rooms(self, sync_result_builder, ignored_users):
+        """Returns entries for all rooms for the user.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+
+        Returns:
+            Deferred(tuple): Returns a tuple of the form:
+            `([RoomSyncResultBuilder], [InvitedSyncResult], [])`
+        """
+
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        membership_list = (
+            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+        )
+
+        room_list = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id=user_id,
+            membership_list=membership_list
+        )
+
+        room_entries = []
+        invited = []
+
+        for event in room_list:
+            if event.membership == Membership.JOIN:
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="joined",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=now_token,
+                ))
+            elif event.membership == Membership.INVITE:
+                if event.sender in ignored_users:
+                    continue
+                invite = yield self.store.get_event(event.event_id)
+                invited.append(InvitedSyncResult(
+                    room_id=event.room_id,
+                    invite=invite,
+                ))
+            elif event.membership in (Membership.LEAVE, Membership.BAN):
+                # Always send down rooms we were banned or kicked from.
+                if not sync_config.filter_collection.include_leave:
+                    if event.membership == Membership.LEAVE:
+                        if user_id == event.sender:
+                            continue
+
+                leave_token = now_token.copy_and_replace(
+                    "room_key", "s%d" % (event.stream_ordering,)
+                )
+                room_entries.append(RoomSyncResultBuilder(
+                    room_id=event.room_id,
+                    rtype="archived",
+                    events=None,
+                    newly_joined=False,
+                    full_state=True,
+                    since_token=since_token,
+                    upto_token=leave_token,
+                ))
+
+        defer.returnValue((room_entries, invited, []))
+
+    @defer.inlineCallbacks
+    def _generate_room_entry(self, sync_result_builder, ignored_users,
+                             room_builder, ephemeral, tags, account_data,
+                             always_include=False):
+        """Populates the `joined` and `archived` section of `sync_result_builder`
+        based on the `room_builder`.
+
+        Args:
+            sync_result_builder(SyncResultBuilder)
+            ignored_users(set(str)): Set of users ignored by user.
+            room_builder(RoomSyncResultBuilder)
+            ephemeral(list): List of new ephemeral events for room
+            tags(list): List of *all* tags for room, or None if there has been
+                no change.
+            account_data(list): List of new account data for room
+            always_include(bool): Always include this room in the sync response,
+                even if empty.
+        """
+        newly_joined = room_builder.newly_joined
+        full_state = (
+            room_builder.full_state
+            or newly_joined
+            or sync_result_builder.full_state
+        )
+        events = room_builder.events
+
+        # We want to shortcut out as early as possible.
+        if not (always_include or account_data or ephemeral or full_state):
+            if events == [] and tags is None:
+                return
+
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+        sync_config = sync_result_builder.sync_config
+
+        room_id = room_builder.room_id
+        since_token = room_builder.since_token
+        upto_token = room_builder.upto_token
+
+        batch = yield self._load_filtered_recents(
+            room_id, sync_config,
+            now_token=upto_token,
+            since_token=since_token,
+            recents=events,
+            newly_joined_room=newly_joined,
+        )
+
+        account_data_events = []
+        if tags is not None:
+            account_data_events.append({
+                "type": "m.tag",
+                "content": {"tags": tags},
+            })
+
+        for account_data_type, content in account_data.items():
+            account_data_events.append({
+                "type": account_data_type,
+                "content": content,
+            })
+
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data_events
+        )
+
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+
+        if not (always_include or batch or account_data or ephemeral or full_state):
+            return
+
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
+        )
+
+        if room_builder.rtype == "joined":
+            unread_notifications = {}
+            room_sync = JoinedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                ephemeral=ephemeral,
+                account_data=account_data_events,
+                unread_notifications=unread_notifications,
+            )
+
+            if room_sync or always_include:
+                notifs = yield self.unread_notifs_for_room_id(
+                    room_id, sync_config
+                )
+
+                if notifs is not None:
+                    unread_notifications["notification_count"] = notifs["notify_count"]
+                    unread_notifications["highlight_count"] = notifs["highlight_count"]
+
+                sync_result_builder.joined.append(room_sync)
+        elif room_builder.rtype == "archived":
+            room_sync = ArchivedSyncResult(
+                room_id=room_id,
+                timeline=batch,
+                state=state,
+                account_data=account_data,
+            )
+            if room_sync or always_include:
+                sync_result_builder.archived.append(room_sync)
+        else:
+            raise Exception("Unrecognized rtype: %r", room_builder.rtype)
+
 
 def _action_has_highlight(actions):
     for action in actions:
@@ -1057,3 +1082,51 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
         (e.type, e.state_key): e
         for e in evs
     }
+
+
+class SyncResultBuilder(object):
+    "Used to help build up a new SyncResult for a user"
+    def __init__(self, sync_config, full_state, since_token, now_token):
+        """
+        Args:
+            sync_config(SyncConfig)
+            full_state(bool): The full_state flag as specified by user
+            since_token(StreamToken): The token supplied by user, or None.
+            now_token(StreamToken): The token to sync up to.
+        """
+        self.sync_config = sync_config
+        self.full_state = full_state
+        self.since_token = since_token
+        self.now_token = now_token
+
+        self.presence = []
+        self.account_data = []
+        self.joined = []
+        self.invited = []
+        self.archived = []
+
+
+class RoomSyncResultBuilder(object):
+    """Stores information needed to create either a `JoinedSyncResult` or
+    `ArchivedSyncResult`.
+    """
+    def __init__(self, room_id, rtype, events, newly_joined, full_state,
+                 since_token, upto_token):
+        """
+        Args:
+            room_id(str)
+            rtype(str): One of `"joined"` or `"archived"`
+            events(list): List of events to include in the room, (more events
+                may be added when generating result).
+            newly_joined(bool): If the user has newly joined the room
+            full_state(bool): Whether the full state should be sent in result
+            since_token(StreamToken): Earliest point to return events from, or None
+            upto_token(StreamToken): Latest point to return events from.
+        """
+        self.room_id = room_id
+        self.rtype = rtype
+        self.events = events
+        self.newly_joined = newly_joined
+        self.full_state = full_state
+        self.since_token = since_token
+        self.upto_token = upto_token
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d46f05f426..3c54307bed 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
 
 # A tiny object useful for storing a user's membership in a room, as a mapping
 # key
-RoomMember = namedtuple("RoomMember", ("room_id", "user"))
+RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
 
 
 class TypingHandler(object):
@@ -38,7 +38,7 @@ class TypingHandler(object):
         self.store = hs.get_datastore()
         self.server_name = hs.config.server_name
         self.auth = hs.get_auth()
-        self.is_mine = hs.is_mine
+        self.is_mine_id = hs.is_mine_id
         self.notifier = hs.get_notifier()
 
         self.clock = hs.get_clock()
@@ -67,20 +67,23 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def started_typing(self, target_user, auth_user, room_id, timeout):
-        if not self.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has started typing in %s", target_user.to_string(), room_id
+            "%s has started typing in %s", target_user_id, room_id
         )
 
         until = self.clock.time_msec() + timeout
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         was_present = member in self._member_typing_until
 
@@ -104,25 +107,28 @@ class TypingHandler(object):
 
         yield self._push_update(
             room_id=room_id,
-            user=target_user,
+            user_id=target_user_id,
             typing=True,
         )
 
     @defer.inlineCallbacks
     def stopped_typing(self, target_user, auth_user, room_id):
-        if not self.is_mine(target_user):
+        target_user_id = target_user.to_string()
+        auth_user_id = auth_user.to_string()
+
+        if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_joined_room(room_id, target_user.to_string())
+        yield self.auth.check_joined_room(room_id, target_user_id)
 
         logger.debug(
-            "%s has stopped typing in %s", target_user.to_string(), room_id
+            "%s has stopped typing in %s", target_user_id, room_id
         )
 
-        member = RoomMember(room_id=room_id, user=target_user)
+        member = RoomMember(room_id=room_id, user_id=target_user_id)
 
         if member in self._member_typing_timer:
             self.clock.cancel_call_later(self._member_typing_timer[member])
@@ -132,8 +138,9 @@ class TypingHandler(object):
 
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
-        if self.is_mine(user):
-            member = RoomMember(room_id=room_id, user=user)
+        user_id = user.to_string()
+        if self.is_mine_id(user_id):
+            member = RoomMember(room_id=room_id, user=user_id)
             yield self._stopped_typing(member)
 
     @defer.inlineCallbacks
@@ -144,7 +151,7 @@ class TypingHandler(object):
 
         yield self._push_update(
             room_id=member.room_id,
-            user=member.user,
+            user_id=member.user_id,
             typing=False,
         )
 
@@ -156,7 +163,7 @@ class TypingHandler(object):
             del self._member_typing_timer[member]
 
     @defer.inlineCallbacks
-    def _push_update(self, room_id, user, typing):
+    def _push_update(self, room_id, user_id, typing):
         domains = yield self.store.get_joined_hosts_for_room(room_id)
 
         deferreds = []
@@ -164,7 +171,7 @@ class TypingHandler(object):
             if domain == self.server_name:
                 self._push_update_local(
                     room_id=room_id,
-                    user=user,
+                    user_id=user_id,
                     typing=typing
                 )
             else:
@@ -173,7 +180,7 @@ class TypingHandler(object):
                     edu_type="m.typing",
                     content={
                         "room_id": room_id,
-                        "user_id": user.to_string(),
+                        "user_id": user_id,
                         "typing": typing,
                     },
                 ))
@@ -183,23 +190,26 @@ class TypingHandler(object):
     @defer.inlineCallbacks
     def _recv_edu(self, origin, content):
         room_id = content["room_id"]
-        user = UserID.from_string(content["user_id"])
+        user_id = content["user_id"]
+
+        # Check that the string is a valid user id
+        UserID.from_string(user_id)
 
         domains = yield self.store.get_joined_hosts_for_room(room_id)
 
         if self.server_name in domains:
             self._push_update_local(
                 room_id=room_id,
-                user=user,
+                user_id=user_id,
                 typing=content["typing"]
             )
 
-    def _push_update_local(self, room_id, user, typing):
+    def _push_update_local(self, room_id, user_id, typing):
         room_set = self._room_typing.setdefault(room_id, set())
         if typing:
-            room_set.add(user)
+            room_set.add(user_id)
         else:
-            room_set.discard(user)
+            room_set.discard(user_id)
 
         self._latest_room_serial += 1
         self._room_serials[room_id] = self._latest_room_serial
@@ -215,9 +225,7 @@ class TypingHandler(object):
         for room_id, serial in self._room_serials.items():
             if last_id < serial and serial <= current_id:
                 typing = self._room_typing[room_id]
-                typing_bytes = json.dumps([
-                    u.to_string() for u in typing
-                ], ensure_ascii=False)
+                typing_bytes = json.dumps(list(typing), ensure_ascii=False)
                 rows.append((serial, room_id, typing_bytes))
         rows.sort()
         return rows
@@ -239,7 +247,7 @@ class TypingNotificationEventSource(object):
             "type": "m.typing",
             "room_id": room_id,
             "content": {
-                "user_ids": [u.to_string() for u in typing],
+                "user_ids": list(typing),
             },
         }
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 33b79c0ec7..cbec4d30ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -140,8 +140,6 @@ class Notifier(object):
     UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
 
     def __init__(self, hs):
-        self.hs = hs
-
         self.user_to_user_stream = {}
         self.room_to_user_streams = {}
         self.appservice_to_user_streams = {}
@@ -151,6 +149,8 @@ class Notifier(object):
         self.pending_new_room_events = []
 
         self.clock = hs.get_clock()
+        self.appservice_handler = hs.get_application_service_handler()
+        self.state_handler = hs.get_state_handler()
 
         hs.get_distributor().observe(
             "user_joined_room", self._user_joined_room
@@ -232,9 +232,7 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        self.hs.get_handlers().appservice_handler.notify_interested_services(
-            event
-        )
+        self.appservice_handler.notify_interested_services(event)
 
         app_streams = set()
 
@@ -449,7 +447,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def _is_world_readable(self, room_id):
-        state = yield self.hs.get_state_handler().get_current_state(
+        state = yield self.state_handler.get_current_state(
             room_id,
             EventTypes.RoomHistoryVisibility
         )
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index a72cba8306..12a3ec7fd8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -44,7 +44,8 @@ THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
 
 # does each email include all unread notifs, or just the ones which have happened
 # since the last mail?
-INCLUDE_ALL_UNREAD_NOTIFS = True
+# XXX: this is currently broken as it includes ones from parted rooms(!)
+INCLUDE_ALL_UNREAD_NOTIFS = False
 
 
 class EmailPusher(object):
@@ -72,7 +73,12 @@ class EmailPusher(object):
         self.processing = False
 
         if self.hs.config.email_enable_notifs:
-            self.mailer = Mailer(self.hs)
+            if 'data' in pusherdict and 'brand' in pusherdict['data']:
+                app_name = pusherdict['data']['brand']
+            else:
+                app_name = self.hs.config.email_app_name
+
+            self.mailer = Mailer(self.hs, app_name)
         else:
             self.mailer = None
 
@@ -273,5 +279,5 @@ class EmailPusher(object):
         logger.info("Sending notif email for user %r", self.user_id)
 
         yield self.mailer.send_notification_mail(
-            self.user_id, self.email, push_actions, reason
+            self.app_id, self.user_id, self.email, push_actions, reason
         )
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 3ae92d1574..c3431cdbf2 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -41,7 +41,7 @@ logger = logging.getLogger(__name__)
 
 
 MESSAGE_FROM_PERSON_IN_ROOM = "You have a message on %(app)s from %(person)s " \
-                              "in the %s room..."
+                              "in the %(room)s room..."
 MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..."
 MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..."
 MESSAGES_IN_ROOM = "You have messages on %(app)s in the %(room)s room..."
@@ -78,12 +78,14 @@ ALLOWED_ATTRS = {
 
 
 class Mailer(object):
-    def __init__(self, hs):
+    def __init__(self, hs, app_name):
         self.hs = hs
         self.store = self.hs.get_datastore()
+        self.auth_handler = self.hs.get_auth_handler()
         self.state_handler = self.hs.get_state_handler()
         loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir)
-        self.app_name = self.hs.config.email_app_name
+        self.app_name = app_name
+        logger.info("Created Mailer for app_name %s" % app_name)
         env = jinja2.Environment(loader=loader)
         env.filters["format_ts"] = format_ts_filter
         env.filters["mxc_to_http"] = self.mxc_to_http_filter
@@ -95,7 +97,8 @@ class Mailer(object):
         )
 
     @defer.inlineCallbacks
-    def send_notification_mail(self, user_id, email_address, push_actions, reason):
+    def send_notification_mail(self, app_id, user_id, email_address,
+                               push_actions, reason):
         raw_from = email.utils.parseaddr(self.hs.config.email_notif_from)[1]
         raw_to = email.utils.parseaddr(email_address)[1]
 
@@ -122,6 +125,8 @@ class Mailer(object):
             user_display_name = yield self.store.get_profile_displayname(
                 UserID.from_string(user_id).localpart
             )
+            if user_display_name is None:
+                user_display_name = user_id
         except StoreError:
             user_display_name = user_id
 
@@ -157,7 +162,9 @@ class Mailer(object):
 
         template_vars = {
             "user_display_name": user_display_name,
-            "unsubscribe_link": self.make_unsubscribe_link(),
+            "unsubscribe_link": self.make_unsubscribe_link(
+                user_id, app_id, email_address
+            ),
             "summary_text": summary_text,
             "app_name": self.app_name,
             "rooms": rooms,
@@ -423,9 +430,18 @@ class Mailer(object):
                 notif['room_id'], notif['event_id']
             )
 
-    def make_unsubscribe_link(self):
-        # XXX: matrix.to
-        return "https://vector.im/#/settings"
+    def make_unsubscribe_link(self, user_id, app_id, email_address):
+        params = {
+            "access_token": self.auth_handler.generate_delete_pusher_token(user_id),
+            "app_id": app_id,
+            "pushkey": email_address,
+        }
+
+        # XXX: make r0 once API is stable
+        return "%s_matrix/client/unstable/pushers/remove?%s" % (
+            self.hs.config.public_baseurl,
+            urllib.urlencode(params),
+        )
 
     def mxc_to_http_filter(self, value, width, height, resize_method="crop"):
         if value[0:6] != "mxc://":
diff --git a/synapse/replication/presence_resource.py b/synapse/replication/presence_resource.py
new file mode 100644
index 0000000000..fc18130ab4
--- /dev/null
+++ b/synapse/replication/presence_resource.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.server import respond_with_json_bytes, request_handler
+from synapse.http.servlet import parse_json_object_from_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+
+class PresenceResource(Resource):
+    """
+    HTTP endpoint for marking users as syncing.
+
+    POST /_synapse/replication/presence HTTP/1.1
+    Content-Type: application/json
+
+    {
+        "process_id": "<process_id>",
+        "syncing_users": ["<user_id>"]
+    }
+    """
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.version_string = hs.version_string
+        self.clock = hs.get_clock()
+        self.presence_handler = hs.get_presence_handler()
+
+    def render_POST(self, request):
+        self._async_render_POST(request)
+        return NOT_DONE_YET
+
+    @request_handler()
+    @defer.inlineCallbacks
+    def _async_render_POST(self, request):
+        content = parse_json_object_from_request(request)
+
+        process_id = content["process_id"]
+        syncing_user_ids = content["syncing_users"]
+
+        yield self.presence_handler.update_external_syncs(
+            process_id, set(syncing_user_ids)
+        )
+
+        respond_with_json_bytes(request, 200, "{}")
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 847f212a3d..8c2d487ff4 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -16,6 +16,7 @@
 from synapse.http.servlet import parse_integer, parse_string
 from synapse.http.server import request_handler, finish_request
 from synapse.replication.pusher_resource import PusherResource
+from synapse.replication.presence_resource import PresenceResource
 
 from twisted.web.resource import Resource
 from twisted.web.server import NOT_DONE_YET
@@ -115,6 +116,7 @@ class ReplicationResource(Resource):
         self.clock = hs.get_clock()
 
         self.putChild("remove_pushers", PusherResource(hs))
+        self.putChild("syncing_users", PresenceResource(hs))
 
     def render_GET(self, request):
         self._async_render_GET(request)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 3b5544851b..8df9d10efa 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -58,6 +58,7 @@ class LoginRestServlet(ClientV1RestServlet):
         self.cas_required_attributes = hs.config.cas_required_attributes
         self.servername = hs.config.server_name
         self.http_client = hs.get_simple_http_client()
+        self.auth_handler = self.hs.get_auth_handler()
 
     def on_GET(self, request):
         flows = []
@@ -143,7 +144,7 @@ class LoginRestServlet(ClientV1RestServlet):
                 user_id, self.hs.hostname
             ).to_string()
 
-        auth_handler = self.handlers.auth_handler
+        auth_handler = self.auth_handler
         user_id, access_token, refresh_token = yield auth_handler.login_with_password(
             user_id=user_id,
             password=login_submission["password"])
@@ -160,7 +161,7 @@ class LoginRestServlet(ClientV1RestServlet):
     @defer.inlineCallbacks
     def do_token_login(self, login_submission):
         token = login_submission['token']
-        auth_handler = self.handlers.auth_handler
+        auth_handler = self.auth_handler
         user_id = (
             yield auth_handler.validate_short_term_login_token_and_get_user_id(token)
         )
@@ -194,7 +195,7 @@ class LoginRestServlet(ClientV1RestServlet):
                     raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID.create(user, self.hs.hostname).to_string()
-        auth_handler = self.handlers.auth_handler
+        auth_handler = self.auth_handler
         user_exists = yield auth_handler.does_user_exist(user_id)
         if user_exists:
             user_id, access_token, refresh_token = (
@@ -243,7 +244,7 @@ class LoginRestServlet(ClientV1RestServlet):
             raise LoginError(401, "Invalid JWT", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID.create(user, self.hs.hostname).to_string()
-        auth_handler = self.handlers.auth_handler
+        auth_handler = self.auth_handler
         user_exists = yield auth_handler.does_user_exist(user_id)
         if user_exists:
             user_id, access_token, refresh_token = (
@@ -412,7 +413,7 @@ class CasTicketServlet(ClientV1RestServlet):
                     raise LoginError(401, "Unauthorized", errcode=Codes.UNAUTHORIZED)
 
         user_id = UserID.create(user, self.hs.hostname).to_string()
-        auth_handler = self.handlers.auth_handler
+        auth_handler = self.auth_handler
         user_exists = yield auth_handler.does_user_exist(user_id)
         if not user_exists:
             user_id, _ = (
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index ab928a16da..9a2ed6ed88 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -17,7 +17,11 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, Codes
 from synapse.push import PusherConfigException
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import (
+    parse_json_object_from_request, parse_string, RestServlet
+)
+from synapse.http.server import finish_request
+from synapse.api.errors import StoreError
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -136,6 +140,57 @@ class PushersSetRestServlet(ClientV1RestServlet):
         return 200, {}
 
 
+class PushersRemoveRestServlet(RestServlet):
+    """
+    To allow pusher to be delete by clicking a link (ie. GET request)
+    """
+    PATTERNS = client_path_patterns("/pushers/remove$")
+    SUCCESS_HTML = "<html><body>You have been unsubscribed</body><html>"
+
+    def __init__(self, hs):
+        super(RestServlet, self).__init__()
+        self.hs = hs
+        self.notifier = hs.get_notifier()
+        self.auth = hs.get_v1auth()
+
+    @defer.inlineCallbacks
+    def on_GET(self, request):
+        requester = yield self.auth.get_user_by_req(request, rights="delete_pusher")
+        user = requester.user
+
+        app_id = parse_string(request, "app_id", required=True)
+        pushkey = parse_string(request, "pushkey", required=True)
+
+        pusher_pool = self.hs.get_pusherpool()
+
+        try:
+            yield pusher_pool.remove_pusher(
+                app_id=app_id,
+                pushkey=pushkey,
+                user_id=user.to_string(),
+            )
+        except StoreError as se:
+            if se.code != 404:
+                # This is fine: they're already unsubscribed
+                raise
+
+        self.notifier.on_new_replication_data()
+
+        request.setResponseCode(200)
+        request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+        request.setHeader(b"Server", self.hs.version_string)
+        request.setHeader(b"Content-Length", b"%d" % (
+            len(PushersRemoveRestServlet.SUCCESS_HTML),
+        ))
+        request.write(PushersRemoveRestServlet.SUCCESS_HTML)
+        finish_request(request)
+        defer.returnValue(None)
+
+    def on_OPTIONS(self, _):
+        return 200, {}
+
+
 def register_servlets(hs, http_server):
     PushersRestServlet(hs).register(http_server)
     PushersSetRestServlet(hs).register(http_server)
+    PushersRemoveRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 644aa4e513..db52a1fc39 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -279,8 +279,9 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, request):
-        handler = self.handlers.room_list_handler
-        data = yield handler.get_public_room_list()
+        handler = self.hs.get_room_list_handler()
+        data = yield handler.get_aggregated_public_room_list()
+
         defer.returnValue((200, data))
 
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index c88c270537..9a84873a5f 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -35,7 +35,7 @@ class PasswordRestServlet(RestServlet):
         super(PasswordRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
-        self.auth_handler = hs.get_handlers().auth_handler
+        self.auth_handler = hs.get_auth_handler()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -97,7 +97,7 @@ class ThreepidRestServlet(RestServlet):
         self.hs = hs
         self.identity_handler = hs.get_handlers().identity_handler
         self.auth = hs.get_auth()
-        self.auth_handler = hs.get_handlers().auth_handler
+        self.auth_handler = hs.get_auth_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 78181b7b18..58d3cad6a1 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -104,7 +104,7 @@ class AuthRestServlet(RestServlet):
         super(AuthRestServlet, self).__init__()
         self.hs = hs
         self.auth = hs.get_auth()
-        self.auth_handler = hs.get_handlers().auth_handler
+        self.auth_handler = hs.get_auth_handler()
         self.registration_handler = hs.get_handlers().registration_handler
 
     @defer.inlineCallbacks
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1ecc02d94d..2088c316d1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -49,7 +49,7 @@ class RegisterRestServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
-        self.auth_handler = hs.get_handlers().auth_handler
+        self.auth_handler = hs.get_auth_handler()
         self.registration_handler = hs.get_handlers().registration_handler
         self.identity_handler = hs.get_handlers().identity_handler
 
diff --git a/synapse/rest/client/v2_alpha/tokenrefresh.py b/synapse/rest/client/v2_alpha/tokenrefresh.py
index a158c2209a..8270e8787f 100644
--- a/synapse/rest/client/v2_alpha/tokenrefresh.py
+++ b/synapse/rest/client/v2_alpha/tokenrefresh.py
@@ -38,7 +38,7 @@ class TokenRefreshRestServlet(RestServlet):
         body = parse_json_object_from_request(request)
         try:
             old_refresh_token = body["refresh_token"]
-            auth_handler = self.hs.get_handlers().auth_handler
+            auth_handler = self.hs.get_auth_handler()
             (user_id, new_refresh_token) = yield self.store.exchange_refresh_token(
                 old_refresh_token, auth_handler.generate_refresh_token)
             new_access_token = yield auth_handler.issue_access_token(user_id)
diff --git a/synapse/server.py b/synapse/server.py
index 01f828819f..dd4b81c658 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -22,6 +22,8 @@
 from twisted.web.client import BrowserLikePolicyForHTTPS
 from twisted.enterprise import adbapi
 
+from synapse.appservice.scheduler import ApplicationServiceScheduler
+from synapse.appservice.api import ApplicationServiceApi
 from synapse.federation import initialize_http_replication
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.notifier import Notifier
@@ -30,6 +32,9 @@ from synapse.handlers import Handlers
 from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
+from synapse.handlers.room import RoomListHandler
+from synapse.handlers.auth import AuthHandler
+from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.state import StateHandler
 from synapse.storage import DataStore
 from synapse.util import Clock
@@ -84,6 +89,11 @@ class HomeServer(object):
         'presence_handler',
         'sync_handler',
         'typing_handler',
+        'room_list_handler',
+        'auth_handler',
+        'application_service_api',
+        'application_service_scheduler',
+        'application_service_handler',
         'notifier',
         'distributor',
         'client_resource',
@@ -179,6 +189,21 @@ class HomeServer(object):
     def build_sync_handler(self):
         return SyncHandler(self)
 
+    def build_room_list_handler(self):
+        return RoomListHandler(self)
+
+    def build_auth_handler(self):
+        return AuthHandler(self)
+
+    def build_application_service_api(self):
+        return ApplicationServiceApi(self)
+
+    def build_application_service_scheduler(self):
+        return ApplicationServiceScheduler(self)
+
+    def build_application_service_handler(self):
+        return ApplicationServicesHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d970fde9e8..8581796b7e 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
 from .appservice import (
     ApplicationServiceStore, ApplicationServiceTransactionStore
 )
-from ._base import Cache
+from ._base import Cache, LoggingTransaction
 from .directory import DirectoryStore
 from .events import EventsStore
 from .presence import PresenceStore, UserPresenceState
@@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore,
 
     def __init__(self, db_conn, hs):
         self.hs = hs
+        self._clock = hs.get_clock()
         self.database_engine = hs.database_engine
 
         self.client_ip_last_seen = Cache(
@@ -173,6 +174,19 @@ class DataStore(RoomMemberStore, RoomStore,
             prefilled_cache=push_rules_prefill,
         )
 
+        cur = LoggingTransaction(
+            db_conn.cursor(),
+            name="_find_stream_orderings_for_times_txn",
+            database_engine=self.database_engine,
+            after_callbacks=[]
+        )
+        self._find_stream_orderings_for_times_txn(cur)
+        cur.close()
+
+        self.find_stream_orderings_looping_call = self._clock.looping_call(
+            self._find_stream_orderings_for_times, 60 * 60 * 1000
+        )
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e0d7098692..32c6677d47 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -152,8 +152,8 @@ class SQLBaseStore(object):
 
     def __init__(self, hs):
         self.hs = hs
-        self._db_pool = hs.get_db_pool()
         self._clock = hs.get_clock()
+        self._db_pool = hs.get_db_pool()
 
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..940e11d7a2 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,6 +24,10 @@ logger = logging.getLogger(__name__)
 
 
 class EventPushActionsStore(SQLBaseStore):
+    def __init__(self, hs):
+        self.stream_ordering_month_ago = None
+        super(EventPushActionsStore, self).__init__(hs)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         Args:
@@ -115,7 +119,8 @@ class EventPushActionsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_unread_push_actions_for_user_in_range(self, user_id,
                                                   min_stream_ordering,
-                                                  max_stream_ordering=None):
+                                                  max_stream_ordering=None,
+                                                  limit=20):
         def get_after_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, "
@@ -147,7 +152,8 @@ class EventPushActionsStore(SQLBaseStore):
             if max_stream_ordering is not None:
                 sql += " AND ep.stream_ordering <= ?"
                 args.append(max_stream_ordering)
-            sql += " ORDER BY ep.stream_ordering ASC"
+            sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+            args.append(limit)
             txn.execute(sql, args)
             return txn.fetchall()
         after_read_receipt = yield self.runInteraction(
@@ -224,18 +230,93 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
-    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
-                                        topological_ordering):
+    def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+                                            topological_ordering):
+        """
+        Purges old, stale push actions for a user and room before a given
+        topological_ordering
+        Args:
+            txn: The transcation
+            room_id: Room ID to delete from
+            user_id: user ID to delete for
+            topological_ordering: The lowest topological ordering which will
+                                  not be deleted.
+        """
         txn.call_after(
             self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
             (room_id, user_id, )
         )
+
+        # We need to join on the events table to get the received_ts for
+        # event_push_actions and sqlite won't let us use a join in a delete so
+        # we can't just delete where received_ts < x. Furthermore we can
+        # only identify event_push_actions by a tuple of room_id, event_id
+        # we we can't use a subquery.
+        # Instead, we look up the stream ordering for the last event in that
+        # room received before the threshold time and delete event_push_actions
+        # in the room with a stream_odering before that.
         txn.execute(
-            "DELETE FROM event_push_actions"
-            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
-            (room_id, user_id, topological_ordering,)
+            "DELETE FROM event_push_actions "
+            " WHERE user_id = ? AND room_id = ? AND "
+            " topological_ordering < ? AND stream_ordering < ?",
+            (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
+        )
+
+    @defer.inlineCallbacks
+    def _find_stream_orderings_for_times(self):
+        yield self.runInteraction(
+            "_find_stream_orderings_for_times",
+            self._find_stream_orderings_for_times_txn
+        )
+
+    def _find_stream_orderings_for_times_txn(self, txn):
+        logger.info("Searching for stream ordering 1 month ago")
+        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 month ago: it's %d",
+            self.stream_ordering_month_ago
         )
 
+    def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
+        """
+        Find the stream_ordering of the first event that was received after
+        a given timestamp. This is relatively slow as there is no index on
+        received_ts but we can then use this to delete push actions before
+        this.
+
+        received_ts must necessarily be in the same order as stream_ordering
+        and stream_ordering is indexed, so we manually binary search using
+        stream_ordering
+        """
+        txn.execute("SELECT MAX(stream_ordering) FROM events")
+        max_stream_ordering = txn.fetchone()[0]
+
+        if max_stream_ordering is None:
+            return 0
+
+        range_start = 0
+        range_end = max_stream_ordering
+
+        sql = (
+            "SELECT received_ts FROM events"
+            " WHERE stream_ordering > ?"
+            " ORDER BY stream_ordering"
+            " LIMIT 1"
+        )
+
+        while range_end - range_start > 1:
+            middle = int((range_end + range_start) / 2)
+            txn.execute(sql, (middle,))
+            middle_ts = txn.fetchone()[0]
+            if ts > middle_ts:
+                range_start = middle
+            else:
+                range_end = middle
+
+        return range_end
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index f285f59afd..ebb97c8474 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -66,7 +66,10 @@ class PushRuleStore(SQLBaseStore):
         if not user_ids:
             defer.returnValue({})
 
-        results = {}
+        results = {
+            user_id: []
+            for user_id in user_ids
+        }
 
         rows = yield self._simple_select_many_batch(
             table="push_rules",
@@ -90,7 +93,10 @@ class PushRuleStore(SQLBaseStore):
         if not user_ids:
             defer.returnValue({})
 
-        results = {}
+        results = {
+            user_id: {}
+            for user_id in user_ids
+        }
 
         rows = yield self._simple_select_many_batch(
             table="push_rules_enable",
@@ -100,7 +106,8 @@ class PushRuleStore(SQLBaseStore):
             desc="bulk_get_push_rules_enabled",
         )
         for row in rows:
-            results.setdefault(row['user_name'], {})[row['rule_id']] = row['enabled']
+            enabled = bool(row['enabled'])
+            results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
         defer.returnValue(results)
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 964f30dff7..8c26f39fbb 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -321,7 +321,7 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         if receipt_type == "m.read" and topological_ordering:
-            self._remove_push_actions_before_txn(
+            self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
                 user_id=user_id,
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index b417e3ac08..5b7d8d1ab5 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from synapse.storage.appservice import ApplicationServiceStore
+from synapse.config.appservice import load_appservices
 
 
 logger = logging.getLogger(__name__)
@@ -38,7 +38,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
         logger.warning("Could not get app_service_config_files from config")
         pass
 
-    appservices = ApplicationServiceStore.load_appservices(
+    appservices = load_appservices(
         config.server_name, config_files
     )
 
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index b10f2a5787..ea6823f18d 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -19,17 +19,24 @@ from ._base import SQLBaseStore
 
 from unpaddedbase64 import encode_base64
 from synapse.crypto.event_signing import compute_event_reference_hash
+from synapse.util.caches.descriptors import cached, cachedList
 
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
 
+    @cached(lru=True)
+    def get_event_reference_hash(self, event_id):
+        return self._get_event_reference_hashes_txn(event_id)
+
+    @cachedList(cached_method_name="get_event_reference_hash",
+                list_name="event_ids", num_args=1)
     def get_event_reference_hashes(self, event_ids):
         def f(txn):
-            return [
-                self._get_event_reference_hashes_txn(txn, ev)
-                for ev in event_ids
-            ]
+            return {
+                event_id: self._get_event_reference_hashes_txn(txn, event_id)
+                for event_id in event_ids
+            }
 
         return self.runInteraction(
             "get_event_reference_hashes",
@@ -41,15 +48,15 @@ class SignatureStore(SQLBaseStore):
         hashes = yield self.get_event_reference_hashes(
             event_ids
         )
-        hashes = [
-            {
+        hashes = {
+            e_id: {
                 k: encode_base64(v) for k, v in h.items()
                 if k == "sha256"
             }
-            for h in hashes
-        ]
+            for e_id, h in hashes.items()
+        }
 
-        defer.returnValue(zip(event_ids, hashes))
+        defer.returnValue(hashes.items())
 
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.