summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py57
-rwxr-xr-xsynapse/app/homeserver.py6
-rw-r--r--synapse/config/repository.py3
-rw-r--r--synapse/federation/federation_server.py15
-rw-r--r--synapse/federation/transport/server.py12
-rw-r--r--synapse/handlers/_base.py48
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/directory.py47
-rw-r--r--synapse/handlers/federation.py97
-rw-r--r--synapse/handlers/message.py8
-rw-r--r--synapse/handlers/presence.py116
-rw-r--r--synapse/handlers/profile.py17
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/register.py16
-rw-r--r--synapse/handlers/room.py178
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/handlers/typing.py14
-rw-r--r--synapse/notifier.py48
-rw-r--r--synapse/push/baserules.py57
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/replication/__init__.py14
-rw-r--r--synapse/replication/resource.py320
-rw-r--r--synapse/rest/client/v1/directory.py9
-rw-r--r--synapse/rest/client/v1/login.py6
-rw-r--r--synapse/rest/client/v1/profile.py4
-rw-r--r--synapse/rest/client/v1/push_rule.py41
-rw-r--r--synapse/rest/client/v1/room.py24
-rw-r--r--synapse/storage/__init__.py14
-rw-r--r--synapse/storage/account_data.py44
-rw-r--r--synapse/storage/appservice.py34
-rw-r--r--synapse/storage/directory.py15
-rw-r--r--synapse/storage/engines/__init__.py5
-rw-r--r--synapse/storage/engines/postgres.py5
-rw-r--r--synapse/storage/engines/sqlite3.py5
-rw-r--r--synapse/storage/events.py51
-rw-r--r--synapse/storage/prepare_database.py13
-rw-r--r--synapse/storage/presence.py31
-rw-r--r--synapse/storage/push_rule.py29
-rw-r--r--synapse/storage/pusher.py2
-rw-r--r--synapse/storage/receipts.py20
-rw-r--r--synapse/storage/registration.py50
-rw-r--r--synapse/storage/schema/delta/30/alias_creator.sql16
-rw-r--r--synapse/storage/schema/delta/30/as_users.py59
-rw-r--r--synapse/storage/schema/delta/30/threepid_guest_access_tokens.sql24
-rw-r--r--synapse/storage/state.py2
-rw-r--r--synapse/storage/tags.py61
-rw-r--r--synapse/storage/transactions.py2
-rw-r--r--synapse/storage/util/id_generators.py69
-rw-r--r--synapse/util/caches/descriptors.py6
-rw-r--r--synapse/util/caches/expiringcache.py16
-rw-r--r--synapse/util/caches/stream_change_cache.py22
51 files changed, 1458 insertions, 306 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index e2f84c4d57..183245443c 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -434,31 +434,46 @@ class Auth(object):
 
         if event.user_id != invite_event.user_id:
             return False
-        try:
-            public_key = invite_event.content["public_key"]
-            if signed["mxid"] != event.state_key:
-                return False
-            if signed["token"] != token:
-                return False
-            for server, signature_block in signed["signatures"].items():
-                for key_name, encoded_signature in signature_block.items():
-                    if not key_name.startswith("ed25519:"):
-                        return False
-                    verify_key = decode_verify_key_bytes(
-                        key_name,
-                        decode_base64(public_key)
-                    )
-                    verify_signed_json(signed, server, verify_key)
 
-                    # We got the public key from the invite, so we know that the
-                    # correct server signed the signed bundle.
-                    # The caller is responsible for checking that the signing
-                    # server has not revoked that public key.
-                    return True
+        if signed["mxid"] != event.state_key:
             return False
-        except (KeyError, SignatureVerifyException,):
+        if signed["token"] != token:
             return False
 
+        for public_key_object in self.get_public_keys(invite_event):
+            public_key = public_key_object["public_key"]
+            try:
+                for server, signature_block in signed["signatures"].items():
+                    for key_name, encoded_signature in signature_block.items():
+                        if not key_name.startswith("ed25519:"):
+                            continue
+                        verify_key = decode_verify_key_bytes(
+                            key_name,
+                            decode_base64(public_key)
+                        )
+                        verify_signed_json(signed, server, verify_key)
+
+                        # We got the public key from the invite, so we know that the
+                        # correct server signed the signed bundle.
+                        # The caller is responsible for checking that the signing
+                        # server has not revoked that public key.
+                        return True
+            except (KeyError, SignatureVerifyException,):
+                continue
+        return False
+
+    def get_public_keys(self, invite_event):
+        public_keys = []
+        if "public_key" in invite_event.content:
+            o = {
+                "public_key": invite_event.content["public_key"],
+            }
+            if "key_validity_url" in invite_event.content:
+                o["key_validity_url"] = invite_event.content["key_validity_url"]
+            public_keys.append(o)
+        public_keys.extend(invite_event.content.get("public_keys", []))
+        return public_keys
+
     def _get_power_level_event(self, auth_events):
         key = (EventTypes.PowerLevels, "", )
         return auth_events.get(key)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b4be7bdd0..021dc1d610 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -63,6 +63,7 @@ from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.util.logcontext import LoggingContext
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
 from synapse.federation.transport.server import TransportLayerServer
 
 from synapse import events
@@ -169,6 +170,9 @@ class SynapseHomeServer(HomeServer):
                 if name == "metrics" and self.get_config().enable_metrics:
                     resources[METRICS_PREFIX] = MetricsResource(self)
 
+                if name == "replication":
+                    resources[REPLICATION_PREFIX] = ReplicationResource(self)
+
         root_resource = create_resource_tree(resources)
         if tls:
             reactor.listenSSL(
@@ -382,7 +386,7 @@ def setup(config_options):
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
 
-    database_engine = create_engine(config.database_config["name"])
+    database_engine = create_engine(config)
     config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
 
     hs = SynapseHomeServer(
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 2fcf872449..2e96c09013 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -97,4 +97,7 @@ class ContentRepositoryConfig(Config):
         - width: 640
           height: 480
           method: scale
+        - width: 800
+          height: 600
+          method: scale
         """ % locals()
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 90718192dd..e8bfbe7cb5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -543,8 +543,19 @@ class FederationServer(FederationBase):
         return event
 
     @defer.inlineCallbacks
-    def exchange_third_party_invite(self, invite):
-        ret = yield self.handler.exchange_third_party_invite(invite)
+    def exchange_third_party_invite(
+            self,
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+    ):
+        ret = yield self.handler.exchange_third_party_invite(
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+        )
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 65e054f7dd..6e92e2f8f4 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -425,7 +425,17 @@ class On3pidBindServlet(BaseFederationServlet):
             last_exception = None
             for invite in content["invites"]:
                 try:
-                    yield self.handler.exchange_third_party_invite(invite)
+                    if "signed" not in invite or "token" not in invite["signed"]:
+                        message = ("Rejecting received notification of third-"
+                                   "party invite without signed: %s" % (invite,))
+                        logger.info(message)
+                        raise SynapseError(400, message)
+                    yield self.handler.exchange_third_party_invite(
+                        invite["sender"],
+                        invite["mxid"],
+                        invite["room_id"],
+                        invite["signed"],
+                    )
                 except Exception as e:
                     last_exception = e
             if last_exception:
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 5b27ec1362..4d9787c1a8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -160,10 +160,10 @@ class BaseHandler(object):
         )
         defer.returnValue(res.get(user_id, []))
 
-    def ratelimit(self, user_id):
+    def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
-            user_id, time_now,
+            requester.user.to_string(), time_now,
             msg_rate_hz=self.hs.config.rc_messages_per_second,
             burst_count=self.hs.config.rc_message_burst_count,
         )
@@ -199,8 +199,7 @@ class BaseHandler(object):
         # events in the room, because we don't know enough about the graph
         # fragment we received to treat it like a graph, so the above returned
         # no relevant events. It may have returned some events (if we have
-        # joined and left the room), but not useful ones, like the invite. So we
-        # forcibly set our context to the invite we received over federation.
+        # joined and left the room), but not useful ones, like the invite.
         if (
             not self.is_host_in_room(context.current_state) and
             builder.type == EventTypes.Member
@@ -208,7 +207,27 @@ class BaseHandler(object):
             prev_member_event = yield self.store.get_room_member(
                 builder.sender, builder.room_id
             )
-            if prev_member_event:
+
+            # The prev_member_event may already be in context.current_state,
+            # despite us not being present in the room; in particular, if
+            # inviting user, and all other local users, have already left.
+            #
+            # In that case, we have all the information we need, and we don't
+            # want to drop "context" - not least because we may need to handle
+            # the invite locally, which will require us to have the whole
+            # context (not just prev_member_event) to auth it.
+            #
+            context_event_ids = (
+                e.event_id for e in context.current_state.values()
+            )
+
+            if (
+                prev_member_event and
+                prev_member_event.event_id not in context_event_ids
+            ):
+                # The prev_member_event is missing from context, so it must
+                # have arrived over federation and is an outlier. We forcibly
+                # set our context to the invite we received over federation
                 builder.prev_events = (
                     prev_member_event.event_id,
                     prev_member_event.prev_events
@@ -263,11 +282,18 @@ class BaseHandler(object):
         return False
 
     @defer.inlineCallbacks
-    def handle_new_client_event(self, event, context, ratelimit=True, extra_users=[]):
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
         # We now need to go and hit out to wherever we need to hit out to.
 
         if ratelimit:
-            self.ratelimit(event.sender)
+            self.ratelimit(requester)
 
         self.auth.check(event, auth_events=context.current_state)
 
@@ -293,6 +319,12 @@ class BaseHandler(object):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
                 event.unsigned["invite_room_state"] = [
                     {
                         "type": e.type,
@@ -306,7 +338,7 @@ class BaseHandler(object):
                         EventTypes.CanonicalAlias,
                         EventTypes.RoomAvatar,
                         EventTypes.Name,
-                    )
+                    ) or is_inviter_member_event(e)
                 ]
 
                 invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 62e82a2570..7a4afe446d 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -477,4 +477,4 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.checkpw(password, stored_hash)
+        return bcrypt.hashpw(password, stored_hash) == stored_hash
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index e0a778e7ff..c4aaa11918 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,9 +17,9 @@
 from twisted.internet import defer
 from ._base import BaseHandler
 
-from synapse.api.errors import SynapseError, Codes, CodeMessageException
+from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError
 from synapse.api.constants import EventTypes
-from synapse.types import RoomAlias
+from synapse.types import RoomAlias, UserID
 
 import logging
 import string
@@ -38,7 +38,7 @@ class DirectoryHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _create_association(self, room_alias, room_id, servers=None):
+    def _create_association(self, room_alias, room_id, servers=None, creator=None):
         # general association creation for both human users and app services
 
         for wchar in string.whitespace:
@@ -60,7 +60,8 @@ class DirectoryHandler(BaseHandler):
         yield self.store.create_room_alias_association(
             room_alias,
             room_id,
-            servers
+            servers,
+            creator=creator,
         )
 
     @defer.inlineCallbacks
@@ -77,7 +78,7 @@ class DirectoryHandler(BaseHandler):
                 400, "This alias is reserved by an application service.",
                 errcode=Codes.EXCLUSIVE
             )
-        yield self._create_association(room_alias, room_id, servers)
+        yield self._create_association(room_alias, room_id, servers, creator=user_id)
 
     @defer.inlineCallbacks
     def create_appservice_association(self, service, room_alias, room_id,
@@ -95,7 +96,11 @@ class DirectoryHandler(BaseHandler):
     def delete_association(self, user_id, room_alias):
         # association deletion for human users
 
-        # TODO Check if server admin
+        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        if not can_delete:
+            raise AuthError(
+                403, "You don't have permission to delete the alias.",
+            )
 
         can_delete = yield self.can_modify_alias(
             room_alias,
@@ -212,17 +217,21 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, user_id, room_id):
+    def send_room_alias_update_event(self, requester, user_id, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_nonmember_event({
-            "type": EventTypes.Aliases,
-            "state_key": self.hs.hostname,
-            "room_id": room_id,
-            "sender": user_id,
-            "content": {"aliases": aliases},
-        }, ratelimit=False)
+        yield msg_handler.create_and_send_nonmember_event(
+            requester,
+            {
+                "type": EventTypes.Aliases,
+                "state_key": self.hs.hostname,
+                "room_id": room_id,
+                "sender": user_id,
+                "content": {"aliases": aliases},
+            },
+            ratelimit=False
+        )
 
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
@@ -257,3 +266,13 @@ class DirectoryHandler(BaseHandler):
                 return
         # either no interested services, or no service with an exclusive lock
         defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _user_can_delete_alias(self, alias, user_id):
+        creator = yield self.store.get_room_alias_creator(alias.to_string())
+
+        if creator and creator == user_id:
+            defer.returnValue(True)
+
+        is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
+        defer.returnValue(is_admin)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ac15f9e5dd..6e50b0963e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
 
@@ -1620,19 +1623,15 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def exchange_third_party_invite(self, invite):
-        sender = invite["sender"]
-        room_id = invite["room_id"]
-
-        if "signed" not in invite or "token" not in invite["signed"]:
-            logger.info(
-                "Discarding received notification of third party invite "
-                "without signed: %s" % (invite,)
-            )
-            return
-
+    def exchange_third_party_invite(
+            self,
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+    ):
         third_party_invite = {
-            "signed": invite["signed"],
+            "signed": signed,
         }
 
         event_dict = {
@@ -1642,8 +1641,8 @@ class FederationHandler(BaseHandler):
                 "third_party_invite": third_party_invite,
             },
             "room_id": room_id,
-            "sender": sender,
-            "state_key": invite["mxid"],
+            "sender": sender_user_id,
+            "state_key": target_user_id,
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
@@ -1656,11 +1655,11 @@ class FederationHandler(BaseHandler):
             )
 
             self.auth.check(event, context.current_state)
-            yield self._validate_keyserver(event, auth_events=context.current_state)
+            yield self._check_signature(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.send_membership_event(event, context, from_client=False)
+            yield member_handler.send_membership_event(None, event, context)
         else:
-            destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+            destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
             yield self.replication_layer.forward_third_party_invite(
                 destinations,
                 room_id,
@@ -1681,13 +1680,13 @@ class FederationHandler(BaseHandler):
         )
 
         self.auth.check(event, auth_events=context.current_state)
-        yield self._validate_keyserver(event, auth_events=context.current_state)
+        yield self._check_signature(event, auth_events=context.current_state)
 
         returned_invite = yield self.send_invite(origin, event)
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
         member_handler = self.hs.get_handlers().room_member_handler
-        yield member_handler.send_membership_event(event, context, from_client=False)
+        yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
     def add_display_name_to_third_party_invite(self, event_dict, event, context):
@@ -1711,17 +1710,69 @@ class FederationHandler(BaseHandler):
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
-    def _validate_keyserver(self, event, auth_events):
-        token = event.content["third_party_invite"]["signed"]["token"]
+    def _check_signature(self, event, auth_events):
+        """
+        Checks that the signature in the event is consistent with its invite.
+        :param event (Event): The m.room.member event to check
+        :param auth_events (dict<(event type, state_key), event>)
+
+        :raises
+            AuthError if signature didn't match any keys, or key has been
+                revoked,
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
+        signed = event.content["third_party_invite"]["signed"]
+        token = signed["token"]
 
         invite_event = auth_events.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
+        if not invite_event:
+            raise AuthError(403, "Could not find invite")
+
+        last_exception = None
+        for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
+            try:
+                for server, signature_block in signed["signatures"].items():
+                    for key_name, encoded_signature in signature_block.items():
+                        if not key_name.startswith("ed25519:"):
+                            continue
+
+                        public_key = public_key_object["public_key"]
+                        verify_key = decode_verify_key_bytes(
+                            key_name,
+                            decode_base64(public_key)
+                        )
+                        verify_signed_json(signed, server, verify_key)
+                        if "key_validity_url" in public_key_object:
+                            yield self._check_key_revocation(
+                                public_key,
+                                public_key_object["key_validity_url"]
+                            )
+                        return
+            except Exception as e:
+                last_exception = e
+        raise last_exception
+
+    @defer.inlineCallbacks
+    def _check_key_revocation(self, public_key, url):
+        """
+        Checks whether public_key has been revoked.
+
+        :param public_key (str): base-64 encoded public key.
+        :param url (str): Key revocation URL.
+
+        :raises
+            AuthError if they key has been revoked.
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
         try:
             response = yield self.hs.get_simple_http_client().get_json(
-                invite_event.content["key_validity_url"],
-                {"public_key": invite_event.content["public_key"]}
+                url,
+                {"public_key": public_key}
             )
         except Exception:
             raise SynapseError(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index afa7c9c36c..cace1cb82a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -215,7 +215,7 @@ class MessageHandler(BaseHandler):
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
-    def send_nonmember_event(self, event, context, ratelimit=True):
+    def send_nonmember_event(self, requester, event, context, ratelimit=True):
         """
         Persists and notifies local clients and federation of an event.
 
@@ -241,6 +241,7 @@ class MessageHandler(BaseHandler):
                 defer.returnValue(prev_state)
 
         yield self.handle_new_client_event(
+            requester=requester,
             event=event,
             context=context,
             ratelimit=ratelimit,
@@ -268,9 +269,9 @@ class MessageHandler(BaseHandler):
     @defer.inlineCallbacks
     def create_and_send_nonmember_event(
         self,
+        requester,
         event_dict,
         ratelimit=True,
-        token_id=None,
         txn_id=None
     ):
         """
@@ -280,10 +281,11 @@ class MessageHandler(BaseHandler):
         """
         event, context = yield self.create_event(
             event_dict,
-            token_id=token_id,
+            token_id=requester.access_token_id,
             txn_id=txn_id
         )
         yield self.send_nonmember_event(
+            requester,
             event,
             context,
             ratelimit=ratelimit,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index aed640450f..f6cf343174 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -46,6 +46,7 @@ logger = logging.getLogger(__name__)
 metrics = synapse.metrics.get_metrics_for(__name__)
 
 notified_presence_counter = metrics.register_counter("notified_presence")
+federation_presence_out_counter = metrics.register_counter("federation_presence_out")
 presence_updates_counter = metrics.register_counter("presence_updates")
 timers_fired_counter = metrics.register_counter("timers_fired")
 federation_presence_counter = metrics.register_counter("federation_presence")
@@ -129,6 +130,10 @@ class PresenceHandler(BaseHandler):
             for state in active_presence
         }
 
+        metrics.register_callback(
+            "user_to_current_state_size", lambda: len(self.user_to_current_state)
+        )
+
         now = self.clock.time_msec()
         for state in active_presence:
             self.wheel_timer.insert(
@@ -259,6 +264,8 @@ class PresenceHandler(BaseHandler):
                 if user_id not in to_notify
             }
             if to_federation_ping:
+                federation_presence_out_counter.inc_by(len(to_federation_ping))
+
                 _, _, hosts_to_states = yield self._get_interested_parties(
                     to_federation_ping.values()
                 )
@@ -522,6 +529,7 @@ class PresenceHandler(BaseHandler):
                 new_fields["last_active_ts"] = now - last_active_ago
 
             new_fields["status_msg"] = push.get("status_msg", None)
+            new_fields["currently_active"] = push.get("currently_active", False)
 
             prev_state = yield self.current_state_for_user(user_id)
             updates.append(prev_state.copy_and_replace(**new_fields))
@@ -770,6 +778,25 @@ class PresenceHandler(BaseHandler):
 
         defer.returnValue(observer_user.to_string() in accepted_observers)
 
+    @defer.inlineCallbacks
+    def get_all_presence_updates(self, last_id, current_id):
+        """
+        Gets a list of presence update rows from between the given stream ids.
+        Each row has:
+        - stream_id(str)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(int)
+        - currently_active(int)
+        """
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
+
 
 def should_notify(old_state, new_state):
     """Decides if a presence state change should be sent to interested parties.
@@ -835,39 +862,66 @@ class PresenceEventSource(object):
         # We don't try and limit the presence updates by the current token, as
         # sending down the rare duplicate is not a concern.
 
-        user_id = user.to_string()
-        if from_key is not None:
-            from_key = int(from_key)
-        room_ids = room_ids or []
-
-        presence = self.hs.get_handlers().presence_handler
+        with Measure(self.clock, "presence.get_new_events"):
+            user_id = user.to_string()
+            if from_key is not None:
+                from_key = int(from_key)
+            room_ids = room_ids or []
 
-        if not room_ids:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(e.room_id for e in rooms)
+            presence = self.hs.get_handlers().presence_handler
+            stream_change_cache = self.store.presence_stream_cache
 
-        user_ids_to_check = set()
-        for room_id in room_ids:
-            users = yield self.store.get_users_in_room(room_id)
-            user_ids_to_check.update(users)
-
-        plist = yield self.store.get_presence_list_accepted(user.localpart)
-        user_ids_to_check.update([row["observed_user_id"] for row in plist])
-
-        # Always include yourself. Only really matters for when the user is
-        # not in any rooms, but still.
-        user_ids_to_check.add(user_id)
-
-        max_token = self.store.get_current_presence_token()
-
-        if from_key:
-            user_ids_changed = self.store.presence_stream_cache.get_entities_changed(
-                user_ids_to_check, from_key,
-            )
-        else:
-            user_ids_changed = user_ids_to_check
-
-        updates = yield presence.current_state_for_users(user_ids_changed)
+            if not room_ids:
+                rooms = yield self.store.get_rooms_for_user(user_id)
+                room_ids = set(e.room_id for e in rooms)
+            else:
+                room_ids = set(room_ids)
+
+            max_token = self.store.get_current_presence_token()
+
+            plist = yield self.store.get_presence_list_accepted(user.localpart)
+            friends = set(row["observed_user_id"] for row in plist)
+            friends.add(user_id)  # So that we receive our own presence
+
+            user_ids_changed = set()
+            changed = None
+            if from_key and max_token - from_key < 100:
+                # For small deltas, its quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                changed = stream_change_cache.get_all_entities_changed(from_key)
+
+            # get_all_entities_changed can return None
+            if changed is not None:
+                for other_user_id in changed:
+                    if other_user_id in friends:
+                        user_ids_changed.add(other_user_id)
+                        continue
+                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+                    if room_ids.intersection(e.room_id for e in other_rooms):
+                        user_ids_changed.add(other_user_id)
+                        continue
+            else:
+                # Too many possible updates. Find all users we can see and check
+                # if any of them have changed.
+                user_ids_to_check = set()
+                for room_id in room_ids:
+                    users = yield self.store.get_users_in_room(room_id)
+                    user_ids_to_check.update(users)
+
+                user_ids_to_check.update(friends)
+
+                # Always include yourself. Only really matters for when the user is
+                # not in any rooms, but still.
+                user_ids_to_check.add(user_id)
+
+                if from_key:
+                    user_ids_changed = stream_change_cache.get_entities_changed(
+                        user_ids_to_check, from_key,
+                    )
+                else:
+                    user_ids_changed = user_ids_to_check
+
+            updates = yield presence.current_state_for_users(user_ids_changed)
 
         now = self.clock.time_msec()
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9ad5944e6..b45eafbb49 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -89,13 +89,13 @@ class ProfileHandler(BaseHandler):
                 defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
-    def set_displayname(self, target_user, auth_user, new_displayname):
+    def set_displayname(self, target_user, requester, new_displayname):
         """target_user is the user whose displayname is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
         if new_displayname == '':
@@ -109,7 +109,7 @@ class ProfileHandler(BaseHandler):
             "displayname": new_displayname,
         })
 
-        yield self._update_join_states(target_user)
+        yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
@@ -139,13 +139,13 @@ class ProfileHandler(BaseHandler):
             defer.returnValue(result["avatar_url"])
 
     @defer.inlineCallbacks
-    def set_avatar_url(self, target_user, auth_user, new_avatar_url):
+    def set_avatar_url(self, target_user, requester, new_avatar_url):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
         yield self.store.set_profile_avatar_url(
@@ -156,7 +156,7 @@ class ProfileHandler(BaseHandler):
             "avatar_url": new_avatar_url,
         })
 
-        yield self._update_join_states(target_user)
+        yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
     def collect_presencelike_data(self, user, state):
@@ -199,11 +199,12 @@ class ProfileHandler(BaseHandler):
         defer.returnValue(response)
 
     @defer.inlineCallbacks
-    def _update_join_states(self, user):
+    def _update_join_states(self, requester):
+        user = requester.user
         if not self.hs.is_mine(user):
             return
 
-        self.ratelimit(user.to_string())
+        self.ratelimit(requester)
 
         joins = yield self.store.get_rooms_for_user(
             user.to_string(),
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index de4c694714..935c339707 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -36,8 +36,6 @@ class ReceiptsHandler(BaseHandler):
         )
         self.clock = self.hs.get_clock()
 
-        self._receipt_cache = None
-
     @defer.inlineCallbacks
     def received_client_receipt(self, room_id, receipt_type, user_id,
                                 event_id):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index f8959e5d82..c5e5b28811 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -157,6 +157,7 @@ class RegistrationHandler(BaseHandler):
                     )
                 except SynapseError:
                     # if user id is taken, just generate another
+                    user = None
                     user_id = None
                     token = None
                     attempts += 1
@@ -349,3 +350,18 @@ class RegistrationHandler(BaseHandler):
 
     def auth_handler(self):
         return self.hs.get_handlers().auth_handler
+
+    @defer.inlineCallbacks
+    def guest_access_token_for(self, medium, address, inviter_user_id):
+        access_token = yield self.store.get_3pid_guest_access_token(medium, address)
+        if access_token:
+            defer.returnValue(access_token)
+
+        _, access_token = yield self.register(
+            generate_token=True,
+            make_guest=True
+        )
+        access_token = yield self.store.save_or_get_3pid_guest_access_token(
+            medium, address, access_token, inviter_user_id
+        )
+        defer.returnValue(access_token)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index b00cac4bd4..0cb6c521c4 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
-from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
@@ -90,7 +90,7 @@ class RoomCreationHandler(BaseHandler):
         """
         user_id = requester.user.to_string()
 
-        self.ratelimit(user_id)
+        self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
@@ -185,26 +185,32 @@ class RoomCreationHandler(BaseHandler):
 
         if "name" in config:
             name = config["name"]
-            yield msg_handler.create_and_send_nonmember_event({
-                "type": EventTypes.Name,
-                "room_id": room_id,
-                "sender": user_id,
-                "state_key": "",
-                "content": {"name": name},
-            }, ratelimit=False)
+            yield msg_handler.create_and_send_nonmember_event(
+                requester,
+                {
+                    "type": EventTypes.Name,
+                    "room_id": room_id,
+                    "sender": user_id,
+                    "state_key": "",
+                    "content": {"name": name},
+                },
+                ratelimit=False)
 
         if "topic" in config:
             topic = config["topic"]
-            yield msg_handler.create_and_send_nonmember_event({
-                "type": EventTypes.Topic,
-                "room_id": room_id,
-                "sender": user_id,
-                "state_key": "",
-                "content": {"topic": topic},
-            }, ratelimit=False)
+            yield msg_handler.create_and_send_nonmember_event(
+                requester,
+                {
+                    "type": EventTypes.Topic,
+                    "room_id": room_id,
+                    "sender": user_id,
+                    "state_key": "",
+                    "content": {"topic": topic},
+                },
+                ratelimit=False)
 
         for invitee in invite_list:
-            room_member_handler.update_membership(
+            yield room_member_handler.update_membership(
                 requester,
                 UserID.from_string(invitee),
                 room_id,
@@ -231,7 +237,7 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                user_id, room_id
+                requester, user_id, room_id
             )
 
         defer.returnValue(result)
@@ -263,7 +269,11 @@ class RoomCreationHandler(BaseHandler):
         @defer.inlineCallbacks
         def send(etype, content, **kwargs):
             event = create(etype, content, **kwargs)
-            yield msg_handler.create_and_send_nonmember_event(event, ratelimit=False)
+            yield msg_handler.create_and_send_nonmember_event(
+                creator,
+                event,
+                ratelimit=False
+            )
 
         config = RoomCreationHandler.PRESETS_DICT[preset_config]
 
@@ -398,6 +408,7 @@ class RoomMemberHandler(BaseHandler):
             action,
             txn_id=None,
             remote_room_hosts=None,
+            third_party_signed=None,
             ratelimit=True,
     ):
         effective_membership_state = action
@@ -406,6 +417,15 @@ class RoomMemberHandler(BaseHandler):
         elif action == "forget":
             effective_membership_state = "leave"
 
+        if third_party_signed is not None:
+            replication = self.hs.get_replication_layer()
+            yield replication.exchange_third_party_invite(
+                third_party_signed["sender"],
+                target.to_string(),
+                room_id,
+                third_party_signed,
+            )
+
         msg_handler = self.hs.get_handlers().message_handler
 
         content = {"membership": effective_membership_state}
@@ -444,12 +464,11 @@ class RoomMemberHandler(BaseHandler):
 
         member_handler = self.hs.get_handlers().room_member_handler
         yield member_handler.send_membership_event(
+            requester,
             event,
             context,
-            is_guest=requester.is_guest,
             ratelimit=ratelimit,
             remote_room_hosts=remote_room_hosts,
-            from_client=True,
         )
 
         if action == "forget":
@@ -458,17 +477,19 @@ class RoomMemberHandler(BaseHandler):
     @defer.inlineCallbacks
     def send_membership_event(
             self,
+            requester,
             event,
             context,
-            is_guest=False,
             remote_room_hosts=None,
             ratelimit=True,
-            from_client=True,
     ):
         """
         Change the membership status of a user in a room.
 
         Args:
+            requester (Requester): The local user who requested the membership
+                event. If None, certain checks, like whether this homeserver can
+                act as the sender, will be skipped.
             event (SynapseEvent): The membership event.
             context: The context of the event.
             is_guest (bool): Whether the sender is a guest.
@@ -476,19 +497,23 @@ class RoomMemberHandler(BaseHandler):
                 the room, and could be danced with in order to join this
                 homeserver for the first time.
             ratelimit (bool): Whether to rate limit this request.
-            from_client (bool): Whether this request is the result of a local
-                client request (rather than over federation). If so, we will
-                perform extra checks, like that this homeserver can act as this
-                client.
         Raises:
             SynapseError if there was a problem changing the membership.
         """
+        remote_room_hosts = remote_room_hosts or []
+
         target_user = UserID.from_string(event.state_key)
         room_id = event.room_id
 
-        if from_client:
+        if requester is not None:
             sender = UserID.from_string(event.sender)
+            assert sender == requester.user, (
+                "Sender (%s) must be same as requester (%s)" %
+                (sender, requester.user)
+            )
             assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
+        else:
+            requester = Requester(target_user, None, False)
 
         message_handler = self.hs.get_handlers().message_handler
         prev_event = message_handler.deduplicate_state_event(event, context)
@@ -498,7 +523,7 @@ class RoomMemberHandler(BaseHandler):
         action = "send"
 
         if event.membership == Membership.JOIN:
-            if is_guest and not self._can_guest_join(context.current_state):
+            if requester.is_guest and not self._can_guest_join(context.current_state):
                 # This should be an auth check, but guests are a local concept,
                 # so don't really fit into the general auth process.
                 raise AuthError(403, "Guest access not allowed")
@@ -511,8 +536,24 @@ class RoomMemberHandler(BaseHandler):
                 action = "remote_join"
         elif event.membership == Membership.LEAVE:
             is_host_in_room = self.is_host_in_room(context.current_state)
+
             if not is_host_in_room:
-                action = "remote_reject"
+                # perhaps we've been invited
+                inviter = self.get_inviter(target_user.to_string(), context.current_state)
+                if not inviter:
+                    raise SynapseError(404, "Not a known room")
+
+                if self.hs.is_mine(inviter):
+                    # the inviter was on our server, but has now left. Carry on
+                    # with the normal rejection codepath.
+                    #
+                    # This is a bit of a hack, because the room might still be
+                    # active on other servers.
+                    pass
+                else:
+                    # send the rejection to the inviter's HS.
+                    remote_room_hosts = remote_room_hosts + [inviter.domain]
+                    action = "remote_reject"
 
         federation_handler = self.hs.get_handlers().federation_handler
 
@@ -531,16 +572,14 @@ class RoomMemberHandler(BaseHandler):
                 event.content,
             )
         elif action == "remote_reject":
-            inviter = self.get_inviter(target_user.to_string(), context.current_state)
-            if not inviter:
-                raise SynapseError(404, "No known servers")
             yield federation_handler.do_remotely_reject_invite(
-                [inviter.domain],
+                remote_room_hosts,
                 room_id,
                 event.user_id
             )
         else:
             yield self.handle_new_client_event(
+                requester,
                 event,
                 context,
                 extra_users=[target_user],
@@ -659,12 +698,12 @@ class RoomMemberHandler(BaseHandler):
             )
         else:
             yield self._make_and_store_3pid_invite(
+                requester,
                 id_server,
                 medium,
                 address,
                 room_id,
                 inviter,
-                requester.access_token_id,
                 txn_id=txn_id
             )
 
@@ -722,12 +761,12 @@ class RoomMemberHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
             self,
+            requester,
             id_server,
             medium,
             address,
             room_id,
             user,
-            token_id,
             txn_id
     ):
         room_state = yield self.hs.get_state_handler().get_current_state(room_id)
@@ -759,7 +798,7 @@ class RoomMemberHandler(BaseHandler):
         if room_avatar_event:
             room_avatar_url = room_avatar_event.content.get("url", "")
 
-        token, public_key, key_validity_url, display_name = (
+        token, public_keys, fallback_public_key, display_name = (
             yield self._ask_id_server_for_third_party_invite(
                 id_server=id_server,
                 medium=medium,
@@ -774,20 +813,24 @@ class RoomMemberHandler(BaseHandler):
                 inviter_avatar_url=inviter_avatar_url
             )
         )
+
         msg_handler = self.hs.get_handlers().message_handler
         yield msg_handler.create_and_send_nonmember_event(
+            requester,
             {
                 "type": EventTypes.ThirdPartyInvite,
                 "content": {
                     "display_name": display_name,
-                    "key_validity_url": key_validity_url,
-                    "public_key": public_key,
+                    "public_keys": public_keys,
+
+                    # For backwards compatibility:
+                    "key_validity_url": fallback_public_key["key_validity_url"],
+                    "public_key": fallback_public_key["public_key"],
                 },
                 "room_id": room_id,
                 "sender": user.to_string(),
                 "state_key": token,
             },
-            token_id=token_id,
             txn_id=txn_id,
         )
 
@@ -806,6 +849,41 @@ class RoomMemberHandler(BaseHandler):
             inviter_display_name,
             inviter_avatar_url
     ):
+        """
+        Asks an identity server for a third party invite.
+
+        :param id_server (str): hostname + optional port for the identity server.
+        :param medium (str): The literal string "email".
+        :param address (str): The third party address being invited.
+        :param room_id (str): The ID of the room to which the user is invited.
+        :param inviter_user_id (str): The user ID of the inviter.
+        :param room_alias (str): An alias for the room, for cosmetic
+            notifications.
+        :param room_avatar_url (str): The URL of the room's avatar, for cosmetic
+            notifications.
+        :param room_join_rules (str): The join rules of the email
+            (e.g. "public").
+        :param room_name (str): The m.room.name of the room.
+        :param inviter_display_name (str): The current display name of the
+            inviter.
+        :param inviter_avatar_url (str): The URL of the inviter's avatar.
+
+        :return: A deferred tuple containing:
+            token (str): The token which must be signed to prove authenticity.
+            public_keys ([{"public_key": str, "key_validity_url": str}]):
+                public_key is a base64-encoded ed25519 public key.
+            fallback_public_key: One element from public_keys.
+            display_name (str): A user-friendly name to represent the invited
+                user.
+        """
+
+        registration_handler = self.hs.get_handlers().registration_handler
+        guest_access_token = yield registration_handler.guest_access_token_for(
+            medium=medium,
+            address=address,
+            inviter_user_id=inviter_user_id,
+        )
+
         is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
             id_server_scheme, id_server,
         )
@@ -822,16 +900,26 @@ class RoomMemberHandler(BaseHandler):
                 "sender": inviter_user_id,
                 "sender_display_name": inviter_display_name,
                 "sender_avatar_url": inviter_avatar_url,
+                "guest_access_token": guest_access_token,
             }
         )
         # TODO: Check for success
         token = data["token"]
-        public_key = data["public_key"]
+        public_keys = data.get("public_keys", [])
+        if "public_key" in data:
+            fallback_public_key = {
+                "public_key": data["public_key"],
+                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                    id_server_scheme, id_server,
+                ),
+            }
+        else:
+            fallback_public_key = public_keys[0]
+
+        if not public_keys:
+            public_keys.append(fallback_public_key)
         display_name = data["display_name"]
-        key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
-            id_server_scheme, id_server,
-        )
-        defer.returnValue((token, public_key, key_validity_url, display_name))
+        defer.returnValue((token, public_keys, fallback_public_key, display_name))
 
     def forget(self, user, room_id):
         return self.store.forget(user.to_string(), room_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c87ff75c05..fded6e4009 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -121,7 +121,11 @@ class SyncResult(collections.namedtuple("SyncResult", [
         events.
         """
         return bool(
-            self.presence or self.joined or self.invited or self.archived
+            self.presence or
+            self.joined or
+            self.invited or
+            self.archived or
+            self.account_data
         )
 
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index b16d0017df..8ce27f49ec 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -25,6 +25,7 @@ from synapse.types import UserID
 import logging
 
 from collections import namedtuple
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -219,6 +220,19 @@ class TypingNotificationHandler(BaseHandler):
                 "typing_key", self._latest_room_serial, rooms=[room_id]
             )
 
+    def get_all_typing_updates(self, last_id, current_id):
+        # TODO: Work out a way to do this without scanning the entire state.
+        rows = []
+        for room_id, serial in self._room_serials.items():
+            if last_id < serial and serial <= current_id:
+                typing = self._room_typing[room_id]
+                typing_bytes = json.dumps([
+                    u.to_string() for u in typing
+                ], ensure_ascii=False)
+                rows.append((serial, room_id, typing_bytes))
+        rows.sort()
+        return rows
+
 
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 560866b26e..3c36a20868 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -159,6 +159,8 @@ class Notifier(object):
             self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
         )
 
+        self.replication_deferred = ObservableDeferred(defer.Deferred())
+
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
@@ -207,6 +209,8 @@ class Notifier(object):
             ))
             self._notify_pending_new_room_events(max_room_stream_id)
 
+            self.notify_replication()
+
     def _notify_pending_new_room_events(self, max_room_stream_id):
         """Notify for the room events that were queued waiting for a previous
         event to be persisted.
@@ -276,6 +280,8 @@ class Notifier(object):
                 except:
                     logger.exception("Failed to notify listener")
 
+            self.notify_replication()
+
     @defer.inlineCallbacks
     def wait_for_events(self, user_id, timeout, callback, room_ids=None,
                         from_token=StreamToken("s0", "0", "0", "0", "0")):
@@ -479,3 +485,45 @@ class Notifier(object):
             room_streams = self.room_to_user_streams.setdefault(room_id, set())
             room_streams.add(new_user_stream)
             new_user_stream.rooms.add(room_id)
+
+    def notify_replication(self):
+        """Notify the any replication listeners that there's a new event"""
+        with PreserveLoggingContext():
+            deferred = self.replication_deferred
+            self.replication_deferred = ObservableDeferred(defer.Deferred())
+            deferred.callback(None)
+
+    @defer.inlineCallbacks
+    def wait_for_replication(self, callback, timeout):
+        """Wait for an event to happen.
+
+        :param callback:
+            Gets called whenever an event happens. If this returns a truthy
+            value then ``wait_for_replication`` returns, otherwise it waits
+            for another event.
+        :param int timeout:
+            How many milliseconds to wait for callback return a truthy value.
+        :returns:
+            A deferred that resolves with the value returned by the callback.
+        """
+        listener = _NotificationListener(None)
+
+        def timed_out():
+            listener.deferred.cancel()
+
+        timer = self.clock.call_later(timeout / 1000., timed_out)
+        while True:
+            listener.deferred = self.replication_deferred.observe()
+            result = yield callback()
+            if result:
+                break
+
+            try:
+                with PreserveLoggingContext():
+                    yield listener.deferred
+            except defer.CancelledError:
+                break
+
+        self.clock.cancel_call_later(timer, ignore_errs=True)
+
+        defer.returnValue(result)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 0832c77cb4..86a2998bcc 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -13,46 +13,67 @@
 # limitations under the License.
 
 from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
+import copy
 
 
 def list_with_base_rules(rawrules):
+    """Combine the list of rules set by the user with the default push rules
+
+    :param list rawrules: The rules the user has modified or set.
+    :returns: A new list with the rules set by the user combined with the
+        defaults.
+    """
     ruleslist = []
 
+    # Grab the base rules that the user has modified.
+    # The modified base rules have a priority_class of -1.
+    modified_base_rules = {
+        r['rule_id']: r for r in rawrules if r['priority_class'] < 0
+    }
+
+    # Remove the modified base rules from the list, They'll be added back
+    # in the default postions in the list.
+    rawrules = [r for r in rawrules if r['priority_class'] >= 0]
+
     # shove the server default rules for each kind onto the end of each
     current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
 
     ruleslist.extend(make_base_prepend_rules(
-        PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+        PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
     ))
 
     for r in rawrules:
         if r['priority_class'] < current_prio_class:
             while r['priority_class'] < current_prio_class:
                 ruleslist.extend(make_base_append_rules(
-                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+                    modified_base_rules,
                 ))
                 current_prio_class -= 1
                 if current_prio_class > 0:
                     ruleslist.extend(make_base_prepend_rules(
-                        PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+                        PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+                        modified_base_rules,
                     ))
 
         ruleslist.append(r)
 
     while current_prio_class > 0:
         ruleslist.extend(make_base_append_rules(
-            PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+            PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+            modified_base_rules,
         ))
         current_prio_class -= 1
         if current_prio_class > 0:
             ruleslist.extend(make_base_prepend_rules(
-                PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+                PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
+                modified_base_rules,
             ))
 
     return ruleslist
 
 
-def make_base_append_rules(kind):
+def make_base_append_rules(kind, modified_base_rules):
     rules = []
 
     if kind == 'override':
@@ -62,15 +83,31 @@ def make_base_append_rules(kind):
     elif kind == 'content':
         rules = BASE_APPEND_CONTENT_RULES
 
+    # Copy the rules before modifying them
+    rules = copy.deepcopy(rules)
+    for r in rules:
+        # Only modify the actions, keep the conditions the same.
+        modified = modified_base_rules.get(r['rule_id'])
+        if modified:
+            r['actions'] = modified['actions']
+
     return rules
 
 
-def make_base_prepend_rules(kind):
+def make_base_prepend_rules(kind, modified_base_rules):
     rules = []
 
     if kind == 'override':
         rules = BASE_PREPEND_OVERRIDE_RULES
 
+    # Copy the rules before modifying them
+    rules = copy.deepcopy(rules)
+    for r in rules:
+        # Only modify the actions, keep the conditions the same.
+        modified = modified_base_rules.get(r['rule_id'])
+        if modified:
+            r['actions'] = modified['actions']
+
     return rules
 
 
@@ -263,18 +300,24 @@ BASE_APPEND_UNDERRIDE_RULES = [
 ]
 
 
+BASE_RULE_IDS = set()
+
 for r in BASE_APPEND_CONTENT_RULES:
     r['priority_class'] = PRIORITY_CLASS_MAP['content']
     r['default'] = True
+    BASE_RULE_IDS.add(r['rule_id'])
 
 for r in BASE_PREPEND_OVERRIDE_RULES:
     r['priority_class'] = PRIORITY_CLASS_MAP['override']
     r['default'] = True
+    BASE_RULE_IDS.add(r['rule_id'])
 
 for r in BASE_APPEND_OVRRIDE_RULES:
     r['priority_class'] = PRIORITY_CLASS_MAP['override']
     r['default'] = True
+    BASE_RULE_IDS.add(r['rule_id'])
 
 for r in BASE_APPEND_UNDERRIDE_RULES:
     r['priority_class'] = PRIORITY_CLASS_MAP['underride']
     r['default'] = True
+    BASE_RULE_IDS.add(r['rule_id'])
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 75bf3d13aa..35933324a4 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
 
 REQUIREMENTS = {
     "frozendict>=0.4": ["frozendict"],
-    "unpaddedbase64>=1.0.1": ["unpaddedbase64>=1.0.1"],
+    "unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
     "canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
     "signedjson>=1.0.0": ["signedjson>=1.0.0"],
     "pynacl==0.3.0": ["nacl==0.3.0", "nacl.bindings"],
diff --git a/synapse/replication/__init__.py b/synapse/replication/__init__.py
new file mode 100644
index 0000000000..b7df13c9ee
--- /dev/null
+++ b/synapse/replication/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
new file mode 100644
index 0000000000..e0d039518d
--- /dev/null
+++ b/synapse/replication/resource.py
@@ -0,0 +1,320 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.http.servlet import parse_integer, parse_string
+from synapse.http.server import request_handler, finish_request
+
+from twisted.web.resource import Resource
+from twisted.web.server import NOT_DONE_YET
+from twisted.internet import defer
+
+import ujson as json
+
+import collections
+import logging
+
+logger = logging.getLogger(__name__)
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+STREAM_NAMES = (
+    ("events",),
+    ("presence",),
+    ("typing",),
+    ("receipts",),
+    ("user_account_data", "room_account_data", "tag_account_data",),
+    ("backfill",),
+)
+
+
+class ReplicationResource(Resource):
+    """
+    HTTP endpoint for extracting data from synapse.
+
+    The streams of data returned by the endpoint are controlled by the
+    parameters given to the API. To return a given stream pass a query
+    parameter with a position in the stream to return data from or the
+    special value "-1" to return data from the start of the stream.
+
+    If there is no data for any of the supplied streams after the given
+    position then the request will block until there is data for one
+    of the streams. This allows clients to long-poll this API.
+
+    The possible streams are:
+
+    * "streams": A special stream returing the positions of other streams.
+    * "events": The new events seen on the server.
+    * "presence": Presence updates.
+    * "typing": Typing updates.
+    * "receipts": Receipt updates.
+    * "user_account_data": Top-level per user account data.
+    * "room_account_data: Per room per user account data.
+    * "tag_account_data": Per room per user tags.
+    * "backfill": Old events that have been backfilled from other servers.
+
+    The API takes two additional query parameters:
+
+    * "timeout": How long to wait before returning an empty response.
+    * "limit": The maximum number of rows to return for the selected streams.
+
+    The response is a JSON object with keys for each stream with updates. Under
+    each key is a JSON object with:
+
+    * "postion": The current position of the stream.
+    * "field_names": The names of the fields in each row.
+    * "rows": The updates as an array of arrays.
+
+    There are a number of ways this API could be used:
+
+    1) To replicate the contents of the backing database to another database.
+    2) To be notified when the contents of a shared backing database changes.
+    3) To "tail" the activity happening on a server for debugging.
+
+    In the first case the client would track all of the streams and store it's
+    own copy of the data.
+
+    In the second case the client might theoretically just be able to follow
+    the "streams" stream to track where the other streams are. However in
+    practise it will probably need to get the contents of the streams in
+    order to expire the any in-memory caches. Whether it gets the contents
+    of the streams from this replication API or directly from the backing
+    store is a matter of taste.
+
+    In the third case the client would use the "streams" stream to find what
+    streams are available and their current positions. Then it can start
+    long-polling this replication API for new data on those streams.
+    """
+
+    isLeaf = True
+
+    def __init__(self, hs):
+        Resource.__init__(self)  # Resource is old-style, so no super()
+
+        self.version_string = hs.version_string
+        self.store = hs.get_datastore()
+        self.sources = hs.get_event_sources()
+        self.presence_handler = hs.get_handlers().presence_handler
+        self.typing_handler = hs.get_handlers().typing_notification_handler
+        self.notifier = hs.notifier
+
+    def render_GET(self, request):
+        self._async_render_GET(request)
+        return NOT_DONE_YET
+
+    @defer.inlineCallbacks
+    def current_replication_token(self):
+        stream_token = yield self.sources.get_current_token()
+        backfill_token = yield self.store.get_current_backfill_token()
+
+        defer.returnValue(_ReplicationToken(
+            stream_token.room_stream_id,
+            int(stream_token.presence_key),
+            int(stream_token.typing_key),
+            int(stream_token.receipt_key),
+            int(stream_token.account_data_key),
+            backfill_token,
+        ))
+
+    @request_handler
+    @defer.inlineCallbacks
+    def _async_render_GET(self, request):
+        limit = parse_integer(request, "limit", 100)
+        timeout = parse_integer(request, "timeout", 10 * 1000)
+
+        request.setHeader(b"Content-Type", b"application/json")
+        writer = _Writer(request)
+
+        @defer.inlineCallbacks
+        def replicate():
+            current_token = yield self.current_replication_token()
+            logger.info("Replicating up to %r", current_token)
+
+            yield self.account_data(writer, current_token, limit)
+            yield self.events(writer, current_token, limit)
+            yield self.presence(writer, current_token)  # TODO: implement limit
+            yield self.typing(writer, current_token)  # TODO: implement limit
+            yield self.receipts(writer, current_token, limit)
+            self.streams(writer, current_token)
+
+            logger.info("Replicated %d rows", writer.total)
+            defer.returnValue(writer.total)
+
+        yield self.notifier.wait_for_replication(replicate, timeout)
+
+        writer.finish()
+
+    def streams(self, writer, current_token):
+        request_token = parse_string(writer.request, "streams")
+
+        streams = []
+
+        if request_token is not None:
+            if request_token == "-1":
+                for names, position in zip(STREAM_NAMES, current_token):
+                    streams.extend((name, position) for name in names)
+            else:
+                items = zip(
+                    STREAM_NAMES,
+                    current_token,
+                    _ReplicationToken(request_token)
+                )
+                for names, current_id, last_id in items:
+                    if last_id < current_id:
+                        streams.extend((name, current_id) for name in names)
+
+            if streams:
+                writer.write_header_and_rows(
+                    "streams", streams, ("name", "position"),
+                    position=str(current_token)
+                )
+
+    @defer.inlineCallbacks
+    def events(self, writer, current_token, limit):
+        request_events = parse_integer(writer.request, "events")
+        request_backfill = parse_integer(writer.request, "backfill")
+
+        if request_events is not None or request_backfill is not None:
+            if request_events is None:
+                request_events = current_token.events
+            if request_backfill is None:
+                request_backfill = current_token.backfill
+            events_rows, backfill_rows = yield self.store.get_all_new_events(
+                request_backfill, request_events,
+                current_token.backfill, current_token.events,
+                limit
+            )
+            writer.write_header_and_rows(
+                "events", events_rows, ("position", "internal", "json")
+            )
+            writer.write_header_and_rows(
+                "backfill", backfill_rows, ("position", "internal", "json")
+            )
+
+    @defer.inlineCallbacks
+    def presence(self, writer, current_token):
+        current_position = current_token.presence
+
+        request_presence = parse_integer(writer.request, "presence")
+
+        if request_presence is not None:
+            presence_rows = yield self.presence_handler.get_all_presence_updates(
+                request_presence, current_position
+            )
+            writer.write_header_and_rows("presence", presence_rows, (
+                "position", "user_id", "state", "last_active_ts",
+                "last_federation_update_ts", "last_user_sync_ts",
+                "status_msg", "currently_active",
+            ))
+
+    @defer.inlineCallbacks
+    def typing(self, writer, current_token):
+        current_position = current_token.presence
+
+        request_typing = parse_integer(writer.request, "typing")
+
+        if request_typing is not None:
+            typing_rows = yield self.typing_handler.get_all_typing_updates(
+                request_typing, current_position
+            )
+            writer.write_header_and_rows("typing", typing_rows, (
+                "position", "room_id", "typing"
+            ))
+
+    @defer.inlineCallbacks
+    def receipts(self, writer, current_token, limit):
+        current_position = current_token.receipts
+
+        request_receipts = parse_integer(writer.request, "receipts")
+
+        if request_receipts is not None:
+            receipts_rows = yield self.store.get_all_updated_receipts(
+                request_receipts, current_position, limit
+            )
+            writer.write_header_and_rows("receipts", receipts_rows, (
+                "position", "room_id", "receipt_type", "user_id", "event_id", "data"
+            ))
+
+    @defer.inlineCallbacks
+    def account_data(self, writer, current_token, limit):
+        current_position = current_token.account_data
+
+        user_account_data = parse_integer(writer.request, "user_account_data")
+        room_account_data = parse_integer(writer.request, "room_account_data")
+        tag_account_data = parse_integer(writer.request, "tag_account_data")
+
+        if user_account_data is not None or room_account_data is not None:
+            if user_account_data is None:
+                user_account_data = current_position
+            if room_account_data is None:
+                room_account_data = current_position
+            user_rows, room_rows = yield self.store.get_all_updated_account_data(
+                user_account_data, room_account_data, current_position, limit
+            )
+            writer.write_header_and_rows("user_account_data", user_rows, (
+                "position", "user_id", "type", "content"
+            ))
+            writer.write_header_and_rows("room_account_data", room_rows, (
+                "position", "user_id", "room_id", "type", "content"
+            ))
+
+        if tag_account_data is not None:
+            tag_rows = yield self.store.get_all_updated_tags(
+                tag_account_data, current_position, limit
+            )
+            writer.write_header_and_rows("tag_account_data", tag_rows, (
+                "position", "user_id", "room_id", "tags"
+            ))
+
+
+class _Writer(object):
+    """Writes the streams as a JSON object as the response to the request"""
+    def __init__(self, request):
+        self.streams = {}
+        self.request = request
+        self.total = 0
+
+    def write_header_and_rows(self, name, rows, fields, position=None):
+        if not rows:
+            return
+
+        if position is None:
+            position = rows[-1][0]
+
+        self.streams[name] = {
+            "position": str(position),
+            "field_names": fields,
+            "rows": rows,
+        }
+
+        self.total += len(rows)
+
+    def finish(self):
+        self.request.write(json.dumps(self.streams, ensure_ascii=False))
+        finish_request(self.request)
+
+
+class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
+    "events", "presence", "typing", "receipts", "account_data", "backfill",
+))):
+    __slots__ = []
+
+    def __new__(cls, *args):
+        if len(args) == 1:
+            return cls(*(int(value) for value in args[0].split("_")))
+        else:
+            return super(_ReplicationToken, cls).__new__(cls, *args)
+
+    def __str__(self):
+        return "_".join(str(value) for value in self)
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 74ec1e50e0..8bfe9fdea8 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -75,7 +75,11 @@ class ClientDirectoryServer(ClientV1RestServlet):
                 yield dir_handler.create_association(
                     user_id, room_alias, room_id, servers
                 )
-                yield dir_handler.send_room_alias_update_event(user_id, room_id)
+                yield dir_handler.send_room_alias_update_event(
+                    requester,
+                    user_id,
+                    room_id
+                )
             except SynapseError as e:
                 raise e
             except:
@@ -118,9 +122,6 @@ class ClientDirectoryServer(ClientV1RestServlet):
 
         requester = yield self.auth.get_user_by_req(request)
         user = requester.user
-        is_admin = yield self.auth.is_server_admin(user)
-        if not is_admin:
-            raise AuthError(403, "You need to be a server admin")
 
         room_alias = RoomAlias.from_string(room_alias)
 
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 79101106ac..f13272da8e 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -404,10 +404,12 @@ def _parse_json(request):
     try:
         content = json.loads(request.content.read())
         if type(content) != dict:
-            raise SynapseError(400, "Content must be a JSON object.")
+            raise SynapseError(
+                400, "Content must be a JSON object.", errcode=Codes.BAD_JSON
+            )
         return content
     except ValueError:
-        raise SynapseError(400, "Content not JSON.")
+        raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
 
 def register_servlets(hs, http_server):
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index 3c5a212920..953764bd8e 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -51,7 +51,7 @@ class ProfileDisplaynameRestServlet(ClientV1RestServlet):
             defer.returnValue((400, "Unable to parse name"))
 
         yield self.handlers.profile_handler.set_displayname(
-            user, requester.user, new_name)
+            user, requester, new_name)
 
         defer.returnValue((200, {}))
 
@@ -88,7 +88,7 @@ class ProfileAvatarURLRestServlet(ClientV1RestServlet):
             defer.returnValue((400, "Unable to parse name"))
 
         yield self.handlers.profile_handler.set_avatar_url(
-            user, requester.user, new_name)
+            user, requester, new_name)
 
         defer.returnValue((200, {}))
 
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 5db2805d68..970a019223 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -22,7 +22,7 @@ from .base import ClientV1RestServlet, client_path_patterns
 from synapse.storage.push_rule import (
     InconsistentRuleException, RuleNotFoundException
 )
-import synapse.push.baserules as baserules
+from synapse.push.baserules import list_with_base_rules, BASE_RULE_IDS
 from synapse.push.rulekinds import (
     PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
 )
@@ -55,6 +55,10 @@ class PushRuleRestServlet(ClientV1RestServlet):
             yield self.set_rule_attr(requester.user.to_string(), spec, content)
             defer.returnValue((200, {}))
 
+        if spec['rule_id'].startswith('.'):
+            # Rule ids starting with '.' are reserved for server default rules.
+            raise SynapseError(400, "cannot add new rule_ids that start with '.'")
+
         try:
             (conditions, actions) = _rule_tuple_from_request_object(
                 spec['template'],
@@ -128,7 +132,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
             ruleslist.append(rule)
 
         # We're going to be mutating this a lot, so do a deep copy
-        ruleslist = copy.deepcopy(baserules.list_with_base_rules(ruleslist))
+        ruleslist = copy.deepcopy(list_with_base_rules(ruleslist))
 
         rules = {'global': {}, 'device': {}}
 
@@ -197,13 +201,17 @@ class PushRuleRestServlet(ClientV1RestServlet):
             return self.hs.get_datastore().set_push_rule_enabled(
                 user_id, namespaced_rule_id, val
             )
-        else:
-            raise UnrecognizedRequestError()
-
-    def get_rule_attr(self, user_id, namespaced_rule_id, attr):
-        if attr == 'enabled':
-            return self.hs.get_datastore().get_push_rule_enabled_by_user_rule_id(
-                user_id, namespaced_rule_id
+        elif spec['attr'] == 'actions':
+            actions = val.get('actions')
+            _check_actions(actions)
+            namespaced_rule_id = _namespaced_rule_id_from_spec(spec)
+            rule_id = spec['rule_id']
+            is_default_rule = rule_id.startswith(".")
+            if is_default_rule:
+                if namespaced_rule_id not in BASE_RULE_IDS:
+                    raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,))
+            return self.hs.get_datastore().set_push_rule_actions(
+                user_id, namespaced_rule_id, actions, is_default_rule
             )
         else:
             raise UnrecognizedRequestError()
@@ -282,6 +290,15 @@ def _rule_tuple_from_request_object(rule_template, rule_id, req_obj):
         raise InvalidRuleException("No actions found")
     actions = req_obj['actions']
 
+    _check_actions(actions)
+
+    return conditions, actions
+
+
+def _check_actions(actions):
+    if not isinstance(actions, list):
+        raise InvalidRuleException("No actions found")
+
     for a in actions:
         if a in ['notify', 'dont_notify', 'coalesce']:
             pass
@@ -290,8 +307,6 @@ def _rule_tuple_from_request_object(rule_template, rule_id, req_obj):
         else:
             raise InvalidRuleException("Unrecognised action")
 
-    return conditions, actions
-
 
 def _add_empty_priority_class_arrays(d):
     for pc in PRIORITY_CLASS_MAP.keys():
@@ -332,7 +347,9 @@ def _filter_ruleset_with_path(ruleset, path):
 
     attr = path[0]
     if attr in the_rule:
-        return the_rule[attr]
+        # Make sure we return a JSON object as the attribute may be a
+        # JSON value.
+        return {attr: the_rule[attr]}
     else:
         raise UnrecognizedRequestError()
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index e6f5c5614a..cbf3673eff 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -158,12 +158,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
 
         if event_type == EventTypes.Member:
             yield self.handlers.room_member_handler.send_membership_event(
+                requester,
                 event,
                 context,
-                is_guest=requester.is_guest,
             )
         else:
-            yield msg_handler.send_nonmember_event(event, context)
+            yield msg_handler.send_nonmember_event(requester, event, context)
 
         defer.returnValue((200, {"event_id": event.event_id}))
 
@@ -183,13 +183,13 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 
         msg_handler = self.handlers.message_handler
         event = yield msg_handler.create_and_send_nonmember_event(
+            requester,
             {
                 "type": event_type,
                 "content": content,
                 "room_id": room_id,
                 "sender": requester.user.to_string(),
             },
-            token_id=requester.access_token_id,
             txn_id=txn_id,
         )
 
@@ -228,6 +228,13 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
             allow_guest=True,
         )
 
+        try:
+            content = _parse_json(request)
+        except:
+            # Turns out we used to ignore the body entirely, and some clients
+            # cheekily send invalid bodies.
+            content = {}
+
         if RoomID.is_valid(room_identifier):
             room_id = room_identifier
             remote_room_hosts = None
@@ -248,6 +255,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
             action="join",
             txn_id=txn_id,
             remote_room_hosts=remote_room_hosts,
+            third_party_signed=content.get("third_party_signed", None),
         )
 
         defer.returnValue((200, {"room_id": room_id}))
@@ -424,7 +432,12 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
         }:
             raise AuthError(403, "Guest access not allowed")
 
-        content = _parse_json(request)
+        try:
+            content = _parse_json(request)
+        except:
+            # Turns out we used to ignore the body entirely, and some clients
+            # cheekily send invalid bodies.
+            content = {}
 
         if membership_action == "invite" and self._has_3pid_invite_keys(content):
             yield self.handlers.room_member_handler.do_3pid_invite(
@@ -451,6 +464,7 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
             room_id=room_id,
             action=membership_action,
             txn_id=txn_id,
+            third_party_signed=content.get("third_party_signed", None),
         )
 
         defer.returnValue((200, {}))
@@ -490,6 +504,7 @@ class RoomRedactEventRestServlet(ClientV1RestServlet):
 
         msg_handler = self.handlers.message_handler
         event = yield msg_handler.create_and_send_nonmember_event(
+            requester,
             {
                 "type": EventTypes.Redaction,
                 "content": content,
@@ -497,7 +512,6 @@ class RoomRedactEventRestServlet(ClientV1RestServlet):
                 "sender": requester.user.to_string(),
                 "redacts": event_id,
             },
-            token_id=requester.access_token_id,
             txn_id=txn_id,
         )
 
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 9be1d12fac..f257721ea3 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -115,13 +115,13 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "presence_stream", "stream_id"
         )
 
-        self._transaction_id_gen = IdGenerator("sent_transactions", "id", self)
-        self._state_groups_id_gen = IdGenerator("state_groups", "id", self)
-        self._access_tokens_id_gen = IdGenerator("access_tokens", "id", self)
-        self._refresh_tokens_id_gen = IdGenerator("refresh_tokens", "id", self)
-        self._pushers_id_gen = IdGenerator("pushers", "id", self)
-        self._push_rule_id_gen = IdGenerator("push_rules", "id", self)
-        self._push_rules_enable_id_gen = IdGenerator("push_rules_enable", "id", self)
+        self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
+        self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
+        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
+        self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
+        self._pushers_id_gen = IdGenerator(db_conn, "pushers", "id")
+        self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
+        self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
 
         events_max = self._stream_id_gen.get_max_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index 91cbf399b6..faddefe219 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -83,8 +83,40 @@ class AccountDataStore(SQLBaseStore):
             "get_account_data_for_room", get_account_data_for_room_txn
         )
 
-    def get_updated_account_data_for_user(self, user_id, stream_id, room_ids=None):
-        """Get all the client account_data for a that's changed.
+    def get_all_updated_account_data(self, last_global_id, last_room_id,
+                                     current_id, limit):
+        """Get all the client account_data that has changed on the server
+        Args:
+            last_global_id(int): The position to fetch from for top level data
+            last_room_id(int): The position to fetch from for per room data
+            current_id(int): The position to fetch up to.
+        Returns:
+            A deferred pair of lists of tuples of stream_id int, user_id string,
+            room_id string, type string, and content string.
+        """
+        def get_updated_account_data_txn(txn):
+            sql = (
+                "SELECT stream_id, user_id, account_data_type, content"
+                " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_global_id, current_id, limit))
+            global_results = txn.fetchall()
+
+            sql = (
+                "SELECT stream_id, user_id, room_id, account_data_type, content"
+                " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_room_id, current_id, limit))
+            room_results = txn.fetchall()
+            return (global_results, room_results)
+        return self.runInteraction(
+            "get_all_updated_account_data_txn", get_updated_account_data_txn
+        )
+
+    def get_updated_account_data_for_user(self, user_id, stream_id):
+        """Get all the client account_data for a that's changed for a user
 
         Args:
             user_id(str): The user to get the account_data for.
@@ -163,12 +195,12 @@ class AccountDataStore(SQLBaseStore):
             )
             self._update_max_stream_id(txn, next_id)
 
-        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+        with self._account_data_id_gen.get_next() as next_id:
             yield self.runInteraction(
                 "add_room_account_data", add_account_data_txn, next_id
             )
 
-        result = yield self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_max_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -202,12 +234,12 @@ class AccountDataStore(SQLBaseStore):
             )
             self._update_max_stream_id(txn, next_id)
 
-        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+        with self._account_data_id_gen.get_next() as next_id:
             yield self.runInteraction(
                 "add_user_account_data", add_account_data_txn, next_id
             )
 
-        result = yield self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_max_token()
         defer.returnValue(result)
 
     def _update_max_stream_id(self, txn, next_id):
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 1100c67714..371600eebb 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -34,8 +34,8 @@ class ApplicationServiceStore(SQLBaseStore):
     def __init__(self, hs):
         super(ApplicationServiceStore, self).__init__(hs)
         self.hostname = hs.hostname
-        self.services_cache = []
-        self._populate_appservice_cache(
+        self.services_cache = ApplicationServiceStore.load_appservices(
+            hs.hostname,
             hs.config.app_service_config_files
         )
 
@@ -144,21 +144,23 @@ class ApplicationServiceStore(SQLBaseStore):
 
         return rooms_for_user_matching_user_id
 
-    def _load_appservice(self, as_info):
+    @classmethod
+    def _load_appservice(cls, hostname, as_info, config_filename):
         required_string_fields = [
-            # TODO: Add id here when it's stable to release
-            "url", "as_token", "hs_token", "sender_localpart"
+            "id", "url", "as_token", "hs_token", "sender_localpart"
         ]
         for field in required_string_fields:
             if not isinstance(as_info.get(field), basestring):
-                raise KeyError("Required string field: '%s'", field)
+                raise KeyError("Required string field: '%s' (%s)" % (
+                    field, config_filename,
+                ))
 
         localpart = as_info["sender_localpart"]
         if urllib.quote(localpart) != localpart:
             raise ValueError(
                 "sender_localpart needs characters which are not URL encoded."
             )
-        user = UserID(localpart, self.hostname)
+        user = UserID(localpart, hostname)
         user_id = user.to_string()
 
         # namespace checks
@@ -188,25 +190,30 @@ class ApplicationServiceStore(SQLBaseStore):
             namespaces=as_info["namespaces"],
             hs_token=as_info["hs_token"],
             sender=user_id,
-            id=as_info["id"] if "id" in as_info else as_info["as_token"],
+            id=as_info["id"],
         )
 
-    def _populate_appservice_cache(self, config_files):
-        """Populates a cache of Application Services from the config files."""
+    @classmethod
+    def load_appservices(cls, hostname, config_files):
+        """Returns a list of Application Services from the config files."""
         if not isinstance(config_files, list):
             logger.warning(
                 "Expected %s to be a list of AS config files.", config_files
             )
-            return
+            return []
 
         # Dicts of value -> filename
         seen_as_tokens = {}
         seen_ids = {}
 
+        appservices = []
+
         for config_file in config_files:
             try:
                 with open(config_file, 'r') as f:
-                    appservice = self._load_appservice(yaml.load(f))
+                    appservice = ApplicationServiceStore._load_appservice(
+                        hostname, yaml.load(f), config_file
+                    )
                     if appservice.id in seen_ids:
                         raise ConfigError(
                             "Cannot reuse ID across application services: "
@@ -226,11 +233,12 @@ class ApplicationServiceStore(SQLBaseStore):
                         )
                     seen_as_tokens[appservice.token] = config_file
                     logger.info("Loaded application service: %s", appservice)
-                    self.services_cache.append(appservice)
+                    appservices.append(appservice)
             except Exception as e:
                 logger.error("Failed to load appservice from '%s'", config_file)
                 logger.exception(e)
                 raise
+        return appservices
 
 
 class ApplicationServiceTransactionStore(SQLBaseStore):
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 1556619d5e..012a0b414a 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -70,13 +70,14 @@ class DirectoryStore(SQLBaseStore):
         )
 
     @defer.inlineCallbacks
-    def create_room_alias_association(self, room_alias, room_id, servers):
+    def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
         """ Creates an associatin between  a room alias and room_id/servers
 
         Args:
             room_alias (RoomAlias)
             room_id (str)
             servers (list)
+            creator (str): Optional user_id of creator.
 
         Returns:
             Deferred
@@ -87,6 +88,7 @@ class DirectoryStore(SQLBaseStore):
                 {
                     "room_alias": room_alias.to_string(),
                     "room_id": room_id,
+                    "creator": creator,
                 },
                 desc="create_room_alias_association",
             )
@@ -107,6 +109,17 @@ class DirectoryStore(SQLBaseStore):
             )
         self.get_aliases_for_room.invalidate((room_id,))
 
+    def get_room_alias_creator(self, room_alias):
+        return self._simple_select_one_onecol(
+            table="room_aliases",
+            keyvalues={
+                "room_alias": room_alias,
+            },
+            retcol="creator",
+            desc="get_room_alias_creator",
+            allow_none=True
+        )
+
     @defer.inlineCallbacks
     def delete_room_alias(self, room_alias):
         room_id = yield self.runInteraction(
diff --git a/synapse/storage/engines/__init__.py b/synapse/storage/engines/__init__.py
index 4290aea83a..a48230b93f 100644
--- a/synapse/storage/engines/__init__.py
+++ b/synapse/storage/engines/__init__.py
@@ -26,12 +26,13 @@ SUPPORTED_MODULE = {
 }
 
 
-def create_engine(name):
+def create_engine(config):
+    name = config.database_config["name"]
     engine_class = SUPPORTED_MODULE.get(name, None)
 
     if engine_class:
         module = importlib.import_module(name)
-        return engine_class(module)
+        return engine_class(module, config=config)
 
     raise RuntimeError(
         "Unsupported database engine '%s'" % (name,)
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 17b7a9c077..a09685b4df 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -21,9 +21,10 @@ from ._base import IncorrectDatabaseSetup
 class PostgresEngine(object):
     single_threaded = False
 
-    def __init__(self, database_module):
+    def __init__(self, database_module, config):
         self.module = database_module
         self.module.extensions.register_type(self.module.extensions.UNICODE)
+        self.config = config
 
     def check_database(self, txn):
         txn.execute("SHOW SERVER_ENCODING")
@@ -44,7 +45,7 @@ class PostgresEngine(object):
         )
 
     def prepare_database(self, db_conn):
-        prepare_database(db_conn, self)
+        prepare_database(db_conn, self, config=self.config)
 
     def is_deadlock(self, error):
         if isinstance(error, self.module.DatabaseError):
diff --git a/synapse/storage/engines/sqlite3.py b/synapse/storage/engines/sqlite3.py
index 91fac33b8b..522b905949 100644
--- a/synapse/storage/engines/sqlite3.py
+++ b/synapse/storage/engines/sqlite3.py
@@ -23,8 +23,9 @@ import struct
 class Sqlite3Engine(object):
     single_threaded = True
 
-    def __init__(self, database_module):
+    def __init__(self, database_module, config):
         self.module = database_module
+        self.config = config
 
     def check_database(self, txn):
         pass
@@ -38,7 +39,7 @@ class Sqlite3Engine(object):
 
     def prepare_database(self, db_conn):
         prepare_sqlite3_database(db_conn)
-        prepare_database(db_conn, self)
+        prepare_database(db_conn, self, config=self.config)
 
     def is_deadlock(self, error):
         return False
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 1dd3236829..60936500d8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -75,8 +75,8 @@ class EventsStore(SQLBaseStore):
                 yield stream_orderings
             stream_ordering_manager = stream_ordering_manager()
         else:
-            stream_ordering_manager = yield self._stream_id_gen.get_next_mult(
-                self, len(events_and_contexts)
+            stream_ordering_manager = self._stream_id_gen.get_next_mult(
+                len(events_and_contexts)
             )
 
         with stream_ordering_manager as stream_orderings:
@@ -109,7 +109,7 @@ class EventsStore(SQLBaseStore):
             stream_ordering = self.min_stream_token
 
         if stream_ordering is None:
-            stream_ordering_manager = yield self._stream_id_gen.get_next(self)
+            stream_ordering_manager = self._stream_id_gen.get_next()
         else:
             @contextmanager
             def stream_ordering_manager():
@@ -1064,3 +1064,48 @@ class EventsStore(SQLBaseStore):
             yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
 
         defer.returnValue(result)
+
+    def get_current_backfill_token(self):
+        """The current minimum token that backfilled events have reached"""
+
+        # TODO: Fix race with the persit_event txn by using one of the
+        # stream id managers
+        return -self.min_stream_token
+
+    def get_all_new_events(self, last_backfill_id, last_forward_id,
+                           current_backfill_id, current_forward_id, limit):
+        """Get all the new events that have arrived at the server either as
+        new events or as backfilled events"""
+        def get_all_new_events_txn(txn):
+            sql = (
+                "SELECT e.stream_ordering, ej.internal_metadata, ej.json"
+                " FROM events as e"
+                " JOIN event_json as ej"
+                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
+                " ORDER BY e.stream_ordering ASC"
+                " LIMIT ?"
+            )
+            if last_forward_id != current_forward_id:
+                txn.execute(sql, (last_forward_id, current_forward_id, limit))
+                new_forward_events = txn.fetchall()
+            else:
+                new_forward_events = []
+
+            sql = (
+                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json"
+                " FROM events as e"
+                " JOIN event_json as ej"
+                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
+                " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
+                " ORDER BY e.stream_ordering DESC"
+                " LIMIT ?"
+            )
+            if last_backfill_id != current_backfill_id:
+                txn.execute(sql, (-last_backfill_id, -current_backfill_id, limit))
+                new_backfill_events = txn.fetchall()
+            else:
+                new_backfill_events = []
+
+            return (new_forward_events, new_backfill_events)
+        return self.runInteraction("get_all_new_events", get_all_new_events_txn)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 0fd5d497ab..3f29aad1e8 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -50,7 +50,7 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn, database_engine):
+def prepare_database(db_conn, database_engine, config):
     """Prepares a database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
     """
@@ -61,10 +61,10 @@ def prepare_database(db_conn, database_engine):
         if version_info:
             user_version, delta_files, upgraded = version_info
             _upgrade_existing_database(
-                cur, user_version, delta_files, upgraded, database_engine
+                cur, user_version, delta_files, upgraded, database_engine, config
             )
         else:
-            _setup_new_database(cur, database_engine)
+            _setup_new_database(cur, database_engine, config)
 
         # cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,))
 
@@ -75,7 +75,7 @@ def prepare_database(db_conn, database_engine):
         raise
 
 
-def _setup_new_database(cur, database_engine):
+def _setup_new_database(cur, database_engine, config):
     """Sets up the database by finding a base set of "full schemas" and then
     applying any necessary deltas.
 
@@ -148,11 +148,12 @@ def _setup_new_database(cur, database_engine):
         applied_delta_files=[],
         upgraded=False,
         database_engine=database_engine,
+        config=config,
     )
 
 
 def _upgrade_existing_database(cur, current_version, applied_delta_files,
-                               upgraded, database_engine):
+                               upgraded, database_engine, config):
     """Upgrades an existing database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
@@ -245,7 +246,7 @@ def _upgrade_existing_database(cur, current_version, applied_delta_files,
                         module_name, absolute_path, python_file
                     )
                 logger.debug("Running script %s", relative_path)
-                module.run_upgrade(cur, database_engine)
+                module.run_upgrade(cur, database_engine, config=config)
             elif ext == ".pyc":
                 # Sometimes .pyc files turn up anyway even though we've
                 # disabled their generation; e.g. from distribution package
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 70ece56548..4cec31e316 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -58,17 +58,20 @@ class UserPresenceState(namedtuple("UserPresenceState",
 class PresenceStore(SQLBaseStore):
     @defer.inlineCallbacks
     def update_presence(self, presence_states):
-        stream_id_manager = yield self._presence_id_gen.get_next(self)
-        with stream_id_manager as stream_id:
+        stream_ordering_manager = self._presence_id_gen.get_next_mult(
+            len(presence_states)
+        )
+
+        with stream_ordering_manager as stream_orderings:
             yield self.runInteraction(
                 "update_presence",
-                self._update_presence_txn, stream_id, presence_states,
+                self._update_presence_txn, stream_orderings, presence_states,
             )
 
-        defer.returnValue((stream_id, self._presence_id_gen.get_max_token()))
+        defer.returnValue((stream_orderings[-1], self._presence_id_gen.get_max_token()))
 
-    def _update_presence_txn(self, txn, stream_id, presence_states):
-        for state in presence_states:
+    def _update_presence_txn(self, txn, stream_orderings, presence_states):
+        for stream_id, state in zip(stream_orderings, presence_states):
             txn.call_after(
                 self.presence_stream_cache.entity_has_changed,
                 state.user_id, stream_id,
@@ -112,6 +115,22 @@ class PresenceStore(SQLBaseStore):
                 args
             )
 
+    def get_all_presence_updates(self, last_id, current_id):
+        def get_all_presence_updates_txn(txn):
+            sql = (
+                "SELECT stream_id, user_id, state, last_active_ts,"
+                " last_federation_update_ts, last_user_sync_ts, status_msg,"
+                " currently_active"
+                " FROM presence_stream"
+                " WHERE ? < stream_id AND stream_id <= ?"
+            )
+            txn.execute(sql, (last_id, current_id))
+            return txn.fetchall()
+
+        return self.runInteraction(
+            "get_all_presence_updates", get_all_presence_updates_txn
+        )
+
     @defer.inlineCallbacks
     def get_presence_for_users(self, user_ids):
         rows = yield self._simple_select_many_batch(
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index e19a81e41f..56e69495b1 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -226,7 +226,7 @@ class PushRuleStore(SQLBaseStore):
 
         if txn.rowcount == 0:
             # We didn't update a row with the given rule_id so insert one
-            push_rule_id = self._push_rule_id_gen.get_next_txn(txn)
+            push_rule_id = self._push_rule_id_gen.get_next()
 
             self._simple_insert_txn(
                 txn,
@@ -279,7 +279,7 @@ class PushRuleStore(SQLBaseStore):
         defer.returnValue(ret)
 
     def _set_push_rule_enabled_txn(self, txn, user_id, rule_id, enabled):
-        new_id = self._push_rules_enable_id_gen.get_next_txn(txn)
+        new_id = self._push_rules_enable_id_gen.get_next()
         self._simple_upsert_txn(
             txn,
             "push_rules_enable",
@@ -294,6 +294,31 @@ class PushRuleStore(SQLBaseStore):
             self.get_push_rules_enabled_for_user.invalidate, (user_id,)
         )
 
+    def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
+        actions_json = json.dumps(actions)
+
+        def set_push_rule_actions_txn(txn):
+            if is_default_rule:
+                # Add a dummy rule to the rules table with the user specified
+                # actions.
+                priority_class = -1
+                priority = 1
+                self._upsert_push_rule_txn(
+                    txn, user_id, rule_id, priority_class, priority,
+                    "[]", actions_json
+                )
+            else:
+                self._simple_update_one_txn(
+                    txn,
+                    "push_rules",
+                    {'user_name': user_id, 'rule_id': rule_id},
+                    {'actions': actions_json},
+                )
+
+        return self.runInteraction(
+            "set_push_rule_actions", set_push_rule_actions_txn,
+        )
+
 
 class RuleNotFoundException(Exception):
     pass
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index c23648cdbc..7693ab9082 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -84,7 +84,7 @@ class PusherStore(SQLBaseStore):
                    app_display_name, device_display_name,
                    pushkey, pushkey_ts, lang, data, profile_tag=""):
         try:
-            next_id = yield self._pushers_id_gen.get_next()
+            next_id = self._pushers_id_gen.get_next()
             yield self._simple_upsert(
                 "pushers",
                 dict(
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index a7343c97f7..dbc074d6b5 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -330,7 +330,7 @@ class ReceiptsStore(SQLBaseStore):
                 "insert_receipt_conv", graph_to_linear
             )
 
-        stream_id_manager = yield self._receipts_id_gen.get_next(self)
+        stream_id_manager = self._receipts_id_gen.get_next()
         with stream_id_manager as stream_id:
             have_persisted = yield self.runInteraction(
                 "insert_linearized_receipt",
@@ -347,7 +347,7 @@ class ReceiptsStore(SQLBaseStore):
             room_id, receipt_type, user_id, event_ids, data
         )
 
-        max_persisted_id = yield self._stream_id_gen.get_max_token()
+        max_persisted_id = self._stream_id_gen.get_max_token()
 
         defer.returnValue((stream_id, max_persisted_id))
 
@@ -390,3 +390,19 @@ class ReceiptsStore(SQLBaseStore):
                 "data": json.dumps(data),
             }
         )
+
+    def get_all_updated_receipts(self, last_id, current_id, limit):
+        def get_all_updated_receipts_txn(txn):
+            sql = (
+                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
+                " FROM receipts_linearized"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC"
+                " LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+
+            return txn.fetchall()
+        return self.runInteraction(
+            "get_all_updated_receipts", get_all_updated_receipts_txn
+        )
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 967c732bda..ad1157f979 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -40,7 +40,7 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        next_id = yield self._access_tokens_id_gen.get_next()
+        next_id = self._access_tokens_id_gen.get_next()
 
         yield self._simple_insert(
             "access_tokens",
@@ -62,7 +62,7 @@ class RegistrationStore(SQLBaseStore):
         Raises:
             StoreError if there was a problem adding this.
         """
-        next_id = yield self._refresh_tokens_id_gen.get_next()
+        next_id = self._refresh_tokens_id_gen.get_next()
 
         yield self._simple_insert(
             "refresh_tokens",
@@ -99,7 +99,7 @@ class RegistrationStore(SQLBaseStore):
     def _register(self, txn, user_id, token, password_hash, was_guest, make_guest):
         now = int(self.clock.time())
 
-        next_id = self._access_tokens_id_gen.get_next_txn(txn)
+        next_id = self._access_tokens_id_gen.get_next()
 
         try:
             if was_guest:
@@ -387,3 +387,47 @@ class RegistrationStore(SQLBaseStore):
             "find_next_generated_user_id",
             _find_next_generated_user_id
         )))
+
+    @defer.inlineCallbacks
+    def get_3pid_guest_access_token(self, medium, address):
+        ret = yield self._simple_select_one(
+            "threepid_guest_access_tokens",
+            {
+                "medium": medium,
+                "address": address
+            },
+            ["guest_access_token"], True, 'get_3pid_guest_access_token'
+        )
+        if ret:
+            defer.returnValue(ret["guest_access_token"])
+        defer.returnValue(None)
+
+    @defer.inlineCallbacks
+    def save_or_get_3pid_guest_access_token(
+            self, medium, address, access_token, inviter_user_id
+    ):
+        """
+        Gets the 3pid's guest access token if exists, else saves access_token.
+
+        :param medium (str): Medium of the 3pid. Must be "email".
+        :param address (str): 3pid address.
+        :param access_token (str): The access token to persist if none is
+            already persisted.
+        :param inviter_user_id (str): User ID of the inviter.
+        :return (deferred str): Whichever access token is persisted at the end
+            of this function call.
+        """
+        def insert(txn):
+            txn.execute(
+                "INSERT INTO threepid_guest_access_tokens "
+                "(medium, address, guest_access_token, first_inviter) "
+                "VALUES (?, ?, ?, ?)",
+                (medium, address, access_token, inviter_user_id)
+            )
+
+        try:
+            yield self.runInteraction("save_3pid_guest_access_token", insert)
+            defer.returnValue(access_token)
+        except self.database_engine.module.IntegrityError:
+            ret = yield self.get_3pid_guest_access_token(medium, address)
+            defer.returnValue(ret)
diff --git a/synapse/storage/schema/delta/30/alias_creator.sql b/synapse/storage/schema/delta/30/alias_creator.sql
new file mode 100644
index 0000000000..c9d0dde638
--- /dev/null
+++ b/synapse/storage/schema/delta/30/alias_creator.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE room_aliases ADD COLUMN creator TEXT;
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
new file mode 100644
index 0000000000..4cf4dd0917
--- /dev/null
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -0,0 +1,59 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from synapse.storage.appservice import ApplicationServiceStore
+
+
+logger = logging.getLogger(__name__)
+
+
+def run_upgrade(cur, database_engine, config, *args, **kwargs):
+    # NULL indicates user was not registered by an appservice.
+    cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
+
+    cur.execute("SELECT name FROM users")
+    rows = cur.fetchall()
+
+    config_files = []
+    try:
+        config_files = config.app_service_config_files
+    except AttributeError:
+        logger.warning("Could not get app_service_config_files from config")
+        pass
+
+    appservices = ApplicationServiceStore.load_appservices(
+        config.server_name, config_files
+    )
+
+    owned = {}
+
+    for row in rows:
+        user_id = row[0]
+        for appservice in appservices:
+            if appservice.is_exclusive_user(user_id):
+                if user_id in owned.keys():
+                    logger.error(
+                        "user_id %s was owned by more than one application"
+                        " service (IDs %s and %s); assigning arbitrarily to %s" %
+                        (user_id, owned[user_id], appservice.id, owned[user_id])
+                    )
+                owned[user_id] = appservice.id
+
+    for user_id, as_id in owned.items():
+        cur.execute(
+            database_engine.convert_param_style(
+                "UPDATE users SET appservice_id = ? WHERE name = ?"
+            ),
+            (as_id, user_id)
+        )
diff --git a/synapse/storage/schema/delta/30/threepid_guest_access_tokens.sql b/synapse/storage/schema/delta/30/threepid_guest_access_tokens.sql
new file mode 100644
index 0000000000..0dd2f1360c
--- /dev/null
+++ b/synapse/storage/schema/delta/30/threepid_guest_access_tokens.sql
@@ -0,0 +1,24 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stores guest account access tokens generated for unbound 3pids.
+CREATE TABLE threepid_guest_access_tokens(
+    medium TEXT, -- The medium of the 3pid. Must be "email".
+    address TEXT, -- The 3pid address.
+    guest_access_token TEXT, -- The access token for a guest user for this 3pid.
+    first_inviter TEXT -- User ID of the first user to invite this 3pid to a room.
+);
+
+CREATE UNIQUE INDEX threepid_guest_access_tokens_index ON threepid_guest_access_tokens(medium, address);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 372b540002..8ed8a21b0a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -83,7 +83,7 @@ class StateStore(SQLBaseStore):
             if event.is_state():
                 state_events[(event.type, event.state_key)] = event
 
-            state_group = self._state_groups_id_gen.get_next_txn(txn)
+            state_group = self._state_groups_id_gen.get_next()
             self._simple_insert_txn(
                 txn,
                 table="state_groups",
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 9551aa9739..a0e6b42b30 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -59,6 +59,59 @@ class TagsStore(SQLBaseStore):
         return deferred
 
     @defer.inlineCallbacks
+    def get_all_updated_tags(self, last_id, current_id, limit):
+        """Get all the client tags that have changed on the server
+        Args:
+            last_id(int): The position to fetch from.
+            current_id(int): The position to fetch up to.
+        Returns:
+            A deferred list of tuples of stream_id int, user_id string,
+            room_id string, tag string and content string.
+        """
+        def get_all_updated_tags_txn(txn):
+            sql = (
+                "SELECT stream_id, user_id, room_id"
+                " FROM room_tags_revisions as r"
+                " WHERE ? < stream_id AND stream_id <= ?"
+                " ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, current_id, limit))
+            return txn.fetchall()
+
+        tag_ids = yield self.runInteraction(
+            "get_all_updated_tags", get_all_updated_tags_txn
+        )
+
+        def get_tag_content(txn, tag_ids):
+            sql = (
+                "SELECT tag, content"
+                " FROM room_tags"
+                " WHERE user_id=? AND room_id=?"
+            )
+            results = []
+            for stream_id, user_id, room_id in tag_ids:
+                txn.execute(sql, (user_id, room_id))
+                tags = []
+                for tag, content in txn.fetchall():
+                    tags.append(json.dumps(tag) + ":" + content)
+                tag_json = "{" + ",".join(tags) + "}"
+                results.append((stream_id, user_id, room_id, tag_json))
+
+            return results
+
+        batch_size = 50
+        results = []
+        for i in xrange(0, len(tag_ids), batch_size):
+            tags = yield self.runInteraction(
+                "get_all_updated_tag_content",
+                get_tag_content,
+                tag_ids[i:i + batch_size],
+            )
+            results.extend(tags)
+
+        defer.returnValue(results)
+
+    @defer.inlineCallbacks
     def get_updated_tags(self, user_id, stream_id):
         """Get all the tags for the rooms where the tags have changed since the
         given version
@@ -142,12 +195,12 @@ class TagsStore(SQLBaseStore):
             )
             self._update_revision_txn(txn, user_id, room_id, next_id)
 
-        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+        with self._account_data_id_gen.get_next() as next_id:
             yield self.runInteraction("add_tag", add_tag_txn, next_id)
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = yield self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_max_token()
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -164,12 +217,12 @@ class TagsStore(SQLBaseStore):
             txn.execute(sql, (user_id, room_id, tag))
             self._update_revision_txn(txn, user_id, room_id, next_id)
 
-        with (yield self._account_data_id_gen.get_next(self)) as next_id:
+        with self._account_data_id_gen.get_next() as next_id:
             yield self.runInteraction("remove_tag", remove_tag_txn, next_id)
 
         self.get_tags_for_user.invalidate((user_id,))
 
-        result = yield self._account_data_id_gen.get_max_token()
+        result = self._account_data_id_gen.get_max_token()
         defer.returnValue(result)
 
     def _update_revision_txn(self, txn, user_id, room_id, next_id):
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 4475c451c1..d338dfcf0a 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -117,7 +117,7 @@ class TransactionStore(SQLBaseStore):
     def _prep_send_transaction(self, txn, transaction_id, destination,
                                origin_server_ts):
 
-        next_id = self._transaction_id_gen.get_next_txn(txn)
+        next_id = self._transaction_id_gen.get_next()
 
         # First we find out what the prev_txns should be.
         # Since we know that we are only sending one transaction at a time,
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index ef5e4a4668..efe3f68e6e 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -13,51 +13,30 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 from collections import deque
 import contextlib
 import threading
 
 
 class IdGenerator(object):
-    def __init__(self, table, column, store):
+    def __init__(self, db_conn, table, column):
         self.table = table
         self.column = column
-        self.store = store
         self._lock = threading.Lock()
-        self._next_id = None
+        cur = db_conn.cursor()
+        self._next_id = self._load_next_id(cur)
+        cur.close()
 
-    @defer.inlineCallbacks
-    def get_next(self):
-        if self._next_id is None:
-            yield self.store.runInteraction(
-                "IdGenerator_%s" % (self.table,),
-                self.get_next_txn,
-            )
+    def _load_next_id(self, txn):
+        txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table,))
+        val, = txn.fetchone()
+        return val + 1 if val else 1
 
+    def get_next(self):
         with self._lock:
             i = self._next_id
             self._next_id += 1
-            defer.returnValue(i)
-
-    def get_next_txn(self, txn):
-        with self._lock:
-            if self._next_id:
-                i = self._next_id
-                self._next_id += 1
-                return i
-            else:
-                txn.execute(
-                    "SELECT MAX(%s) FROM %s" % (self.column, self.table,)
-                )
-
-                val, = txn.fetchone()
-                cur = val or 0
-                cur += 1
-                self._next_id = cur + 1
-
-                return cur
+            return i
 
 
 class StreamIdGenerator(object):
@@ -69,7 +48,7 @@ class StreamIdGenerator(object):
     persistence of events can complete out of order.
 
     Usage:
-        with stream_id_gen.get_next_txn(txn) as stream_id:
+        with stream_id_gen.get_next() as stream_id:
             # ... persist event ...
     """
     def __init__(self, db_conn, table, column):
@@ -79,15 +58,21 @@ class StreamIdGenerator(object):
         self._lock = threading.Lock()
 
         cur = db_conn.cursor()
-        self._current_max = self._get_or_compute_current_max(cur)
+        self._current_max = self._load_current_max(cur)
         cur.close()
 
         self._unfinished_ids = deque()
 
-    def get_next(self, store):
+    def _load_current_max(self, txn):
+        txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table))
+        rows = txn.fetchall()
+        val, = rows[0]
+        return int(val) if val else 1
+
+    def get_next(self):
         """
         Usage:
-            with yield stream_id_gen.get_next as stream_id:
+            with stream_id_gen.get_next() as stream_id:
                 # ... persist event ...
         """
         with self._lock:
@@ -106,10 +91,10 @@ class StreamIdGenerator(object):
 
         return manager()
 
-    def get_next_mult(self, store, n):
+    def get_next_mult(self, n):
         """
         Usage:
-            with yield stream_id_gen.get_next(store, n) as stream_ids:
+            with stream_id_gen.get_next(n) as stream_ids:
                 # ... persist events ...
         """
         with self._lock:
@@ -139,13 +124,3 @@ class StreamIdGenerator(object):
                 return self._unfinished_ids[0] - 1
 
             return self._current_max
-
-    def _get_or_compute_current_max(self, txn):
-        with self._lock:
-            txn.execute("SELECT MAX(%s) FROM %s" % (self.column, self.table))
-            rows = txn.fetchall()
-            val, = rows[0]
-
-            self._current_max = int(val) if val else 1
-
-            return self._current_max
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 277854ccbc..35544b19fd 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -28,6 +28,7 @@ from twisted.internet import defer
 
 from collections import OrderedDict
 
+import os
 import functools
 import inspect
 import threading
@@ -38,6 +39,9 @@ logger = logging.getLogger(__name__)
 _CacheSentinel = object()
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class Cache(object):
 
     def __init__(self, name, max_entries=1000, keylen=1, lru=True, tree=False):
@@ -140,6 +144,8 @@ class CacheDescriptor(object):
     """
     def __init__(self, orig, max_entries=1000, num_args=1, lru=True, tree=False,
                  inlineCallbacks=False):
+        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
+
         self.orig = orig
 
         if inlineCallbacks:
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 62cae99649..e863a8f8a9 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from synapse.util.caches import cache_counter, caches_by_name
+
 import logging
 
 
@@ -47,6 +49,8 @@ class ExpiringCache(object):
 
         self._cache = {}
 
+        caches_by_name[cache_name] = self._cache
+
     def start(self):
         if not self._expiry_ms:
             # Don't bother starting the loop if things never expire
@@ -72,7 +76,12 @@ class ExpiringCache(object):
                 self._cache.pop(k)
 
     def __getitem__(self, key):
-        entry = self._cache[key]
+        try:
+            entry = self._cache[key]
+            cache_counter.inc_hits(self._cache_name)
+        except KeyError:
+            cache_counter.inc_misses(self._cache_name)
+            raise
 
         if self._reset_expiry_on_get:
             entry.time = self._clock.time_msec()
@@ -105,9 +114,12 @@ class ExpiringCache(object):
 
         logger.debug(
             "[%s] _prune_cache before: %d, after len: %d",
-            self._cache_name, begin_length, len(self._cache.keys())
+            self._cache_name, begin_length, len(self._cache)
         )
 
+    def __len__(self):
+        return len(self._cache)
+
 
 class _CacheEntry(object):
     def __init__(self, time, value):
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b37f1c0725..a1aec7aa55 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -18,11 +18,15 @@ from synapse.util.caches import cache_counter, caches_by_name
 
 from blist import sorteddict
 import logging
+import os
 
 
 logger = logging.getLogger(__name__)
 
 
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
 class StreamChangeCache(object):
     """Keeps track of the stream positions of the latest change in a set of entities.
 
@@ -33,7 +37,7 @@ class StreamChangeCache(object):
     old then the cache will simply return all given entities.
     """
     def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
-        self._max_size = max_size
+        self._max_size = int(max_size * CACHE_SIZE_FACTOR)
         self._entity_to_key = {}
         self._cache = sorteddict()
         self._earliest_known_stream_pos = current_stream_pos
@@ -85,6 +89,22 @@ class StreamChangeCache(object):
 
         return result
 
+    def get_all_entities_changed(self, stream_pos):
+        """Returns all entites that have had new things since the given
+        position. If the position is too old it will return None.
+        """
+        assert type(stream_pos) is int
+
+        if stream_pos >= self._earliest_known_stream_pos:
+            keys = self._cache.keys()
+            i = keys.bisect_right(stream_pos)
+
+            return (
+                self._cache[k] for k in keys[i:]
+            )
+        else:
+            return None
+
     def entity_has_changed(self, entity, stream_pos):
         """Informs the cache that the entity has been changed at the given
         position.