author:    Matthew Hodgson <matthew@matrix.org>  2016-03-27 22:54:42 +0100
committer: Matthew Hodgson <matthew@matrix.org>  2016-03-27 22:54:42 +0100
commit:    d9d48aad2d58deb5db422a5373a4dac9334a0618
tree:      63e51372ca9ace4971403928bd46440ff9e455e2 /synapse/handlers
parent:    initial WIP of a tentative preview_url endpoint - incomplete, untested, exper...
parent:    typo
Merge branch 'develop' into matthew/preview_urls
Diffstat (limited to 'synapse/handlers')
 -rw-r--r--  synapse/handlers/_base.py      |  253
 -rw-r--r--  synapse/handlers/auth.py       |   85
 -rw-r--r--  synapse/handlers/directory.py  |  104
 -rw-r--r--  synapse/handlers/events.py     |  108
 -rw-r--r--  synapse/handlers/federation.py |  186
 -rw-r--r--  synapse/handlers/identity.py   |   26
 -rw-r--r--  synapse/handlers/message.py    |  153
 -rw-r--r--  synapse/handlers/presence.py   | 1817
 -rw-r--r--  synapse/handlers/profile.py    |   48
 -rw-r--r--  synapse/handlers/receipts.py   |    2
 -rw-r--r--  synapse/handlers/register.py   |  115
 -rw-r--r--  synapse/handlers/room.py       |  860
 -rw-r--r--  synapse/handlers/sync.py       |  882
 -rw-r--r--  synapse/handlers/typing.py     |   37
 14 files changed, 2531 insertions(+), 2145 deletions(-)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 744a9ee507..90eabb6eb7 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -18,7 +18,7 @@ from twisted.internet import defer
 from synapse.api.errors import LimitExceededError, SynapseError, AuthError
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.constants import Membership, EventTypes
-from synapse.types import UserID, RoomAlias
+from synapse.types import UserID, RoomAlias, Requester
 from synapse.push.action_generator import ActionGenerator
 
 from synapse.util.logcontext import PreserveLoggingContext
@@ -29,6 +29,14 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+VISIBILITY_PRIORITY = (
+    "world_readable",
+    "shared",
+    "invited",
+    "joined",
+)
+
+
 class BaseHandler(object):
     """
     Common base class for the event handlers.
@@ -53,25 +61,16 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def _filter_events_for_clients(self, user_tuples, events):
+    def filter_events_for_clients(self, user_tuples, events, event_id_to_state):
         """ Returns dict of user_id -> list of events that user is allowed to
         see.
-        """
-        # If there is only one user, just get the state for that one user,
-        # otherwise just get all the state.
-        if len(user_tuples) == 1:
-            types = (
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_tuples[0][0]),
-            )
-        else:
-            types = None
-
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
 
+        :param (str, bool) user_tuples: (user id, is_peeking) for each
+            user to be checked. is_peeking should be true if:
+              * the user is not currently a member of the room, and:
+              * the user has not been a member of the room since the given
+                events
+        """
         forgotten = yield defer.gatherResults([
             self.store.who_forgot_in_room(
                 room_id,
@@ -87,18 +86,38 @@ class BaseHandler(object):
         def allowed(event, user_id, is_peeking):
             state = event_id_to_state[event.event_id]
 
+            # get the room_visibility at the time of the event.
             visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
             if visibility_event:
                 visibility = visibility_event.content.get("history_visibility", "shared")
             else:
                 visibility = "shared"
 
+            if visibility not in VISIBILITY_PRIORITY:
+                visibility = "shared"
+
+            # if it was world_readable, it's easy: everyone can read it
             if visibility == "world_readable":
                 return True
 
-            if is_peeking:
-                return False
+            # Always allow history visibility events on boundaries. This is done
+            # by setting the effective visibility to the least restrictive
+            # of the old vs new.
+            if event.type == EventTypes.RoomHistoryVisibility:
+                prev_content = event.unsigned.get("prev_content", {})
+                prev_visibility = prev_content.get("history_visibility", None)
 
+                if prev_visibility not in VISIBILITY_PRIORITY:
+                    prev_visibility = "shared"
+
+                new_priority = VISIBILITY_PRIORITY.index(visibility)
+                old_priority = VISIBILITY_PRIORITY.index(prev_visibility)
+                if old_priority < new_priority:
+                    visibility = prev_visibility
+
+            # get the user's membership at the time of the event. (or rather,
+            # just *after* the event. Which means that people can see their
+            # own join events, but not (currently) their own leave events.)
             membership_event = state.get((EventTypes.Member, user_id), None)
             if membership_event:
                 if membership_event.event_id in event_id_forgotten:
@@ -108,20 +127,29 @@ class BaseHandler(object):
             else:
                 membership = None
 
+            # if the user was a member of the room at the time of the event,
+            # they can see it.
             if membership == Membership.JOIN:
                 return True
 
-            if event.type == EventTypes.RoomHistoryVisibility:
-                return not is_peeking
+            if visibility == "joined":
+                # we weren't a member at the time of the event, so we can't
+                # see this event.
+                return False
 
-            if visibility == "shared":
-                return True
-            elif visibility == "joined":
-                return membership == Membership.JOIN
             elif visibility == "invited":
+                # user can also see the event if they were *invited* at the time
+                # of the event.
                 return membership == Membership.INVITE
 
-            return True
+            else:
+                # visibility is shared: user can also see the event if they have
+                # become a member since the event
+                #
+                # XXX: if the user has subsequently joined and then left again,
+                # ideally we would share history up to the point they left. But
+                # we don't know when they left.
+                return not is_peeking
 
         defer.returnValue({
             user_id: [
@@ -134,25 +162,45 @@ class BaseHandler(object):
 
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, events, is_peeking=False):
-        # Assumes that user has at some point joined the room if not is_guest.
-        res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
+        """
+        Check which events a user is allowed to see
+
+        :param str user_id: user id to be checked
+        :param [synapse.events.EventBase] events: list of events to be checked
+    :param bool is_peeking: should be True if:
+              * the user is not currently a member of the room, and:
+              * the user has not been a member of the room since the given
+                events
+    :rtype: [synapse.events.EventBase]
+        """
+        types = (
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.Member, user_id),
+        )
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=types
+        )
+        res = yield self.filter_events_for_clients(
+            [(user_id, is_peeking)], events, event_id_to_state
+        )
         defer.returnValue(res.get(user_id, []))
 
-    def ratelimit(self, user_id):
+    def ratelimit(self, requester):
         time_now = self.clock.time()
         allowed, time_allowed = self.ratelimiter.send_message(
-            user_id, time_now,
+            requester.user.to_string(), time_now,
             msg_rate_hz=self.hs.config.rc_messages_per_second,
             burst_count=self.hs.config.rc_message_burst_count,
         )
         if not allowed:
             raise LimitExceededError(
-                retry_after_ms=int(1000*(time_allowed - time_now)),
+                retry_after_ms=int(1000 * (time_allowed - time_now)),
             )
 
     @defer.inlineCallbacks
     def _create_new_client_event(self, builder):
-        latest_ret = yield self.store.get_latest_events_in_room(
+        latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
             builder.room_id,
         )
 
@@ -161,7 +209,10 @@ class BaseHandler(object):
         else:
             depth = 1
 
-        prev_events = [(e, h) for e, h, _ in latest_ret]
+        prev_events = [
+            (event_id, prev_hashes)
+            for event_id, prev_hashes, _ in latest_ret
+        ]
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -170,6 +221,50 @@ class BaseHandler(object):
 
         context = yield state_handler.compute_event_context(builder)
 
+        # If we've received an invite over federation, there are no latest
+        # events in the room, because we don't know enough about the graph
+        # fragment we received to treat it like a graph, so the above returned
+        # no relevant events. It may have returned some events (if we have
+        # joined and left the room), but not useful ones, like the invite.
+        if (
+            not self.is_host_in_room(context.current_state) and
+            builder.type == EventTypes.Member
+        ):
+            prev_member_event = yield self.store.get_room_member(
+                builder.sender, builder.room_id
+            )
+
+            # The prev_member_event may already be in context.current_state,
+            # despite us not being present in the room; in particular, if
+            # inviting user, and all other local users, have already left.
+            #
+            # In that case, we have all the information we need, and we don't
+            # want to drop "context" - not least because we may need to handle
+            # the invite locally, which will require us to have the whole
+            # context (not just prev_member_event) to auth it.
+            #
+            context_event_ids = (
+                e.event_id for e in context.current_state.values()
+            )
+
+            if (
+                prev_member_event and
+                prev_member_event.event_id not in context_event_ids
+            ):
+                # The prev_member_event is missing from context, so it must
+                # have arrived over federation and is an outlier. We forcibly
+                # set our context to the invite we received over federation
+                builder.prev_events = (
+                    prev_member_event.event_id,
+                    prev_member_event.prev_events
+                )
+
+                context = yield state_handler.compute_event_context(
+                    builder,
+                    old_state=(prev_member_event,),
+                    outlier=True
+                )
+
         if builder.is_state():
             builder.prev_state = yield self.store.add_event_hashes(
                 context.prev_state_events
@@ -192,10 +287,40 @@ class BaseHandler(object):
             (event, context,)
         )
 
+    def is_host_in_room(self, current_state):
+        room_members = [
+            (state_key, event.membership)
+            for ((event_type, state_key), event) in current_state.items()
+            if event_type == EventTypes.Member
+        ]
+        if len(room_members) == 0:
+            # Have we just created the room, and is this about to be the very
+            # first member event?
+            create_event = current_state.get(("m.room.create", ""))
+            if create_event:
+                return True
+        for (state_key, membership) in room_members:
+            if (
+                UserID.from_string(state_key).domain == self.hs.hostname
+                and membership == Membership.JOIN
+            ):
+                return True
+        return False
+
     @defer.inlineCallbacks
-    def handle_new_client_event(self, event, context, extra_users=[]):
+    def handle_new_client_event(
+        self,
+        requester,
+        event,
+        context,
+        ratelimit=True,
+        extra_users=[]
+    ):
         # We now need to go and hit out to wherever we need to hit out to.
 
+        if ratelimit:
+            self.ratelimit(requester)
+
         self.auth.check(event, auth_events=context.current_state)
 
         yield self.maybe_kick_guest_users(event, context.current_state.values())
@@ -220,6 +345,12 @@ class BaseHandler(object):
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
+                def is_inviter_member_event(e):
+                    return (
+                        e.type == EventTypes.Member and
+                        e.sender == event.sender
+                    )
+
                 event.unsigned["invite_room_state"] = [
                     {
                         "type": e.type,
@@ -228,12 +359,8 @@ class BaseHandler(object):
                         "sender": e.sender,
                     }
                     for k, e in context.current_state.items()
-                    if e.type in (
-                        EventTypes.JoinRules,
-                        EventTypes.CanonicalAlias,
-                        EventTypes.RoomAvatar,
-                        EventTypes.Name,
-                    )
+                    if e.type in self.hs.config.room_invite_state_types
+                    or is_inviter_member_event(e)
                 ]
 
                 invitee = UserID.from_string(event.state_key)
@@ -269,13 +396,19 @@ class BaseHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
+        if event.type == EventTypes.Create and context.current_state:
+            raise AuthError(
+                403,
+                "Changing the room create event is forbidden",
+            )
 
         action_generator = ActionGenerator(self.hs)
         yield action_generator.handle_push_actions_for_event(
-            event, self
+            event, context, self
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
         )
 
         destinations = set()
@@ -293,19 +426,11 @@ class BaseHandler(object):
 
         with PreserveLoggingContext():
             # Don't block waiting on waking up all the listeners.
-            notify_d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        notify_d.addErrback(log_failure)
-
         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)
 
@@ -329,7 +454,8 @@ class BaseHandler(object):
                 if member_event.type != EventTypes.Member:
                     continue
 
-                if not self.hs.is_mine(UserID.from_string(member_event.state_key)):
+                target_user = UserID.from_string(member_event.state_key)
+                if not self.hs.is_mine(target_user):
                     continue
 
                 if member_event.content["membership"] not in {
@@ -351,18 +477,13 @@ class BaseHandler(object):
                 # and having homeservers have their own users leave keeps more
                 # of that decision-making and control local to the guest-having
                 # homeserver.
-                message_handler = self.hs.get_handlers().message_handler
-                yield message_handler.create_and_send_event(
-                    {
-                        "type": EventTypes.Member,
-                        "state_key": member_event.state_key,
-                        "content": {
-                            "membership": Membership.LEAVE,
-                            "kind": "guest"
-                        },
-                        "room_id": member_event.room_id,
-                        "sender": member_event.state_key
-                    },
+                requester = Requester(target_user, "", True)
+                handler = self.hs.get_handlers().room_member_handler
+                yield handler.update_membership(
+                    requester,
+                    target_user,
+                    member_event.room_id,
+                    "leave",
                     ratelimit=False,
                 )
             except Exception as e:
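
Aside: the rewritten filter_events_for_clients above treats an
m.room.history_visibility event on a visibility boundary as readable under
the *less* restrictive of the old and new settings. A minimal standalone
sketch of that rule, assuming plain dicts for event content (an
illustration, not the handler's exact code path):

    VISIBILITY_PRIORITY = (
        "world_readable",
        "shared",
        "invited",
        "joined",
    )

    def boundary_visibility(new_visibility, prev_content):
        # Unknown values fall back to "shared", as in the handler above.
        if new_visibility not in VISIBILITY_PRIORITY:
            new_visibility = "shared"
        prev_visibility = prev_content.get("history_visibility")
        if prev_visibility not in VISIBILITY_PRIORITY:
            prev_visibility = "shared"
        # Lower index means less restrictive; the boundary event itself is
        # readable if either the old or the new setting would allow it.
        if (VISIBILITY_PRIORITY.index(prev_visibility) <
                VISIBILITY_PRIORITY.index(new_visibility)):
            return prev_visibility
        return new_visibility

    assert boundary_visibility("joined", {"history_visibility": "shared"}) == "shared"
    assert boundary_visibility("invited", {}) == "shared"
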
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 62e82a2570..82d458b424 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -35,6 +35,7 @@ logger = logging.getLogger(__name__)
 
 
 class AuthHandler(BaseHandler):
+    SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
 
     def __init__(self, hs):
         super(AuthHandler, self).__init__(hs)
@@ -66,15 +67,18 @@ class AuthHandler(BaseHandler):
                         'auth' key: this method prompts for auth if none is sent.
             clientip (str): The IP address of the client.
         Returns:
-            A tuple of (authed, dict, dict) where authed is true if the client
-            has successfully completed an auth flow. If it is true, the first
-            dict contains the authenticated credentials of each stage.
+            A tuple of (authed, dict, dict, session_id) where authed is true if
+            the client has successfully completed an auth flow. If it is true,
+            the first dict contains the authenticated credentials of each stage.
 
             If authed is false, the first dictionary is the server response to
             the login request and should be passed back to the client.
 
             In either case, the second dict contains the parameters for this
             request (which may have been given only in a previous call).
+
+            session_id is the ID of this session, either passed in by the client
+            or assigned by the call to check_auth
         """
 
         authdict = None
@@ -103,7 +107,10 @@ class AuthHandler(BaseHandler):
 
         if not authdict:
             defer.returnValue(
-                (False, self._auth_dict_for_flows(flows, session), clientdict)
+                (
+                    False, self._auth_dict_for_flows(flows, session),
+                    clientdict, session['id']
+                )
             )
 
         if 'creds' not in session:
@@ -122,12 +129,11 @@ class AuthHandler(BaseHandler):
         for f in flows:
             if len(set(f) - set(creds.keys())) == 0:
                 logger.info("Auth completed with creds: %r", creds)
-                self._remove_session(session)
-                defer.returnValue((True, creds, clientdict))
+                defer.returnValue((True, creds, clientdict, session['id']))
 
         ret = self._auth_dict_for_flows(flows, session)
         ret['completed'] = creds.keys()
-        defer.returnValue((False, ret, clientdict))
+        defer.returnValue((False, ret, clientdict, session['id']))
 
     @defer.inlineCallbacks
     def add_oob_auth(self, stagetype, authdict, clientip):
@@ -154,6 +160,43 @@ class AuthHandler(BaseHandler):
             defer.returnValue(True)
         defer.returnValue(False)
 
+    def get_session_id(self, clientdict):
+        """
+        Gets the session ID for a client given the client dictionary
+        :param clientdict: The dictionary sent by the client in the request
+        :return: The string session ID the client sent. If the client did not
+                 send a session ID, returns None.
+        """
+        sid = None
+        if clientdict and 'auth' in clientdict:
+            authdict = clientdict['auth']
+            if 'session' in authdict:
+                sid = authdict['session']
+        return sid
+
+    def set_session_data(self, session_id, key, value):
+        """
+        Store a key-value pair into the sessions data associated with this
+        request. This data is stored server-side and cannot be modified by
+        the client.
+        :param session_id: (string) The ID of this session as returned from check_auth
+        :param key: (string) The key to store the data under
+        :param value: (any) The data to store
+        """
+        sess = self._get_session_info(session_id)
+        sess.setdefault('serverdict', {})[key] = value
+        self._save_session(sess)
+
+    def get_session_data(self, session_id, key, default=None):
+        """
+        Retrieve data stored with set_session_data
+        :param session_id: (string) The ID of this session as returned from check_auth
+        :param key: (string) The key to store the data under
+        :param default: (any) Value to return if the key has not been set
+        """
+        sess = self._get_session_info(session_id)
+        return sess.setdefault('serverdict', {}).get(key, default)
+
     @defer.inlineCallbacks
     def _check_password_auth(self, authdict, _):
         if "user" not in authdict or "password" not in authdict:
@@ -432,13 +475,18 @@ class AuthHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def set_password(self, user_id, newpassword):
+    def set_password(self, user_id, newpassword, requester=None):
         password_hash = self.hash(newpassword)
 
+        except_access_token_ids = [requester.access_token_id] if requester else []
+
         yield self.store.user_set_password_hash(user_id, password_hash)
-        yield self.store.user_delete_access_tokens(user_id)
-        yield self.hs.get_pusherpool().remove_pushers_by_user(user_id)
-        yield self.store.flush_user(user_id)
+        yield self.store.user_delete_access_tokens(
+            user_id, except_access_token_ids
+        )
+        yield self.hs.get_pusherpool().remove_pushers_by_user(
+            user_id, except_access_token_ids
+        )
 
     @defer.inlineCallbacks
     def add_threepid(self, user_id, medium, address, validated_at):
@@ -450,11 +498,18 @@ class AuthHandler(BaseHandler):
     def _save_session(self, session):
         # TODO: Persistent storage
         logger.debug("Saving session %s", session)
+        session["last_used"] = self.hs.get_clock().time_msec()
         self.sessions[session["id"]] = session
+        self._prune_sessions()
 
-    def _remove_session(self, session):
-        logger.debug("Removing session %s", session)
-        del self.sessions[session["id"]]
+    def _prune_sessions(self):
+        for sid, sess in self.sessions.items():
+            last_used = 0
+            if 'last_used' in sess:
+                last_used = sess['last_used']
+            now = self.hs.get_clock().time_msec()
+            if last_used < now - AuthHandler.SESSION_EXPIRE_MS:
+                del self.sessions[sid]
 
     def hash(self, password):
         """Computes a secure hash of password.
@@ -477,4 +532,4 @@ class AuthHandler(BaseHandler):
         Returns:
             Whether self.hash(password) == stored_hash (bool).
         """
-        return bcrypt.checkpw(password, stored_hash)
+        return bcrypt.hashpw(password, stored_hash) == stored_hash
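
Aside: the auth.py changes above stop deleting sessions on auth completion
and instead stamp each session with last_used on save, expiring anything
idle for SESSION_EXPIRE_MS. A toy sketch of the same scheme (module-level
state and time.time() stand in for the handler and its clock):

    import time

    SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
    sessions = {}

    def now_msec():
        return int(time.time() * 1000)

    def save_session(session):
        session["last_used"] = now_msec()
        sessions[session["id"]] = session
        prune_sessions()

    def prune_sessions():
        cutoff = now_msec() - SESSION_EXPIRE_MS
        # Iterate over a copy: the Python 2 code above can delete while
        # iterating because dict.items() returns a list there.
        for sid, sess in list(sessions.items()):
            if sess.get("last_used", 0) < cutoff:
                del sessions[sid]

    save_session({"id": "abc123"})
    assert "abc123" in sessions
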
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 691564c651..6bcc5a5e2b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,9 +17,9 @@
 from twisted.internet import defer
 from ._base import BaseHandler
 
-from synapse.api.errors import SynapseError, Codes, CodeMessageException
+from synapse.api.errors import SynapseError, Codes, CodeMessageException, AuthError
 from synapse.api.constants import EventTypes
-from synapse.types import RoomAlias
+from synapse.types import RoomAlias, UserID
 
 import logging
 import string
@@ -32,13 +32,15 @@ class DirectoryHandler(BaseHandler):
     def __init__(self, hs):
         super(DirectoryHandler, self).__init__(hs)
 
+        self.state = hs.get_state_handler()
+
         self.federation = hs.get_replication_layer()
         self.federation.register_query_handler(
             "directory", self.on_directory_query
         )
 
     @defer.inlineCallbacks
-    def _create_association(self, room_alias, room_id, servers=None):
+    def _create_association(self, room_alias, room_id, servers=None, creator=None):
         # general association creation for both human users and app services
 
         for wchar in string.whitespace:
@@ -60,7 +62,8 @@ class DirectoryHandler(BaseHandler):
         yield self.store.create_room_alias_association(
             room_alias,
             room_id,
-            servers
+            servers,
+            creator=creator,
         )
 
     @defer.inlineCallbacks
@@ -77,7 +80,7 @@ class DirectoryHandler(BaseHandler):
                 400, "This alias is reserved by an application service.",
                 errcode=Codes.EXCLUSIVE
             )
-        yield self._create_association(room_alias, room_id, servers)
+        yield self._create_association(room_alias, room_id, servers, creator=user_id)
 
     @defer.inlineCallbacks
     def create_appservice_association(self, service, room_alias, room_id,
@@ -92,10 +95,14 @@ class DirectoryHandler(BaseHandler):
         yield self._create_association(room_alias, room_id, servers)
 
     @defer.inlineCallbacks
-    def delete_association(self, user_id, room_alias):
+    def delete_association(self, requester, user_id, room_alias):
         # association deletion for human users
 
-        # TODO Check if server admin
+        can_delete = yield self._user_can_delete_alias(room_alias, user_id)
+        if not can_delete:
+            raise AuthError(
+                403, "You don't have permission to delete the alias.",
+            )
 
         can_delete = yield self.can_modify_alias(
             room_alias,
@@ -107,7 +114,25 @@ class DirectoryHandler(BaseHandler):
                 errcode=Codes.EXCLUSIVE
             )
 
-        yield self._delete_association(room_alias)
+        room_id = yield self._delete_association(room_alias)
+
+        try:
+            yield self.send_room_alias_update_event(
+                requester,
+                requester.user.to_string(),
+                room_id
+            )
+
+            yield self._update_canonical_alias(
+                requester,
+                requester.user.to_string(),
+                room_id,
+                room_alias,
+            )
+        except AuthError as e:
+            logger.info("Failed to update alias events: %s", e)
+
+        defer.returnValue(room_id)
 
     @defer.inlineCallbacks
     def delete_appservice_association(self, service, room_alias):
@@ -124,11 +149,9 @@ class DirectoryHandler(BaseHandler):
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
-        yield self.store.delete_room_alias(room_alias)
+        room_id = yield self.store.delete_room_alias(room_alias)
 
-        # TODO - Looks like _update_room_alias_event has never been implemented
-        # if room_id:
-        #    yield self._update_room_alias_events(user_id, room_id)
+        defer.returnValue(room_id)
 
     @defer.inlineCallbacks
     def get_association(self, room_alias):
@@ -175,8 +198,8 @@ class DirectoryHandler(BaseHandler):
         # If this server is in the list of servers, return it first.
         if self.server_name in servers:
             servers = (
-                [self.server_name]
-                + [s for s in servers if s != self.server_name]
+                [self.server_name] +
+                [s for s in servers if s != self.server_name]
             )
         else:
             servers = list(servers)
@@ -212,17 +235,44 @@ class DirectoryHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def send_room_alias_update_event(self, user_id, room_id):
+    def send_room_alias_update_event(self, requester, user_id, room_id):
         aliases = yield self.store.get_aliases_for_room(room_id)
 
         msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_event({
-            "type": EventTypes.Aliases,
-            "state_key": self.hs.hostname,
-            "room_id": room_id,
-            "sender": user_id,
-            "content": {"aliases": aliases},
-        }, ratelimit=False)
+        yield msg_handler.create_and_send_nonmember_event(
+            requester,
+            {
+                "type": EventTypes.Aliases,
+                "state_key": self.hs.hostname,
+                "room_id": room_id,
+                "sender": user_id,
+                "content": {"aliases": aliases},
+            },
+            ratelimit=False
+        )
+
+    @defer.inlineCallbacks
+    def _update_canonical_alias(self, requester, user_id, room_id, room_alias):
+        alias_event = yield self.state.get_current_state(
+            room_id, EventTypes.CanonicalAlias, ""
+        )
+
+        alias_str = room_alias.to_string()
+        if not alias_event or alias_event.content.get("alias", "") != alias_str:
+            return
+
+        msg_handler = self.hs.get_handlers().message_handler
+        yield msg_handler.create_and_send_nonmember_event(
+            requester,
+            {
+                "type": EventTypes.CanonicalAlias,
+                "state_key": "",
+                "room_id": room_id,
+                "sender": user_id,
+                "content": {},
+            },
+            ratelimit=False
+        )
 
     @defer.inlineCallbacks
     def get_association_from_room_alias(self, room_alias):
@@ -257,3 +307,13 @@ class DirectoryHandler(BaseHandler):
                 return
         # either no interested services, or no service with an exclusive lock
         defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _user_can_delete_alias(self, alias, user_id):
+        creator = yield self.store.get_room_alias_creator(alias.to_string())
+
+        if creator and creator == user_id:
+            defer.returnValue(True)
+
+        is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
+        defer.returnValue(is_admin)
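
Aside: _user_can_delete_alias above replaces the old "TODO Check if server
admin" with a concrete rule: the alias creator, or any server admin, may
delete the alias. The rule in isolation (the admin lookup is reduced to a
boolean here; the real check is an async datastore query):

    def user_can_delete_alias(creator, user_id, is_server_admin):
        # creator is the user_id recorded when the alias was created,
        # or None for aliases created before creators were tracked.
        if creator is not None and creator == user_id:
            return True
        return is_server_admin

    assert user_can_delete_alias("@alice:hs", "@alice:hs", False)
    assert user_can_delete_alias("@alice:hs", "@bob:hs", True)
    assert not user_can_delete_alias(None, "@bob:hs", False)
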
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 254b483da6..f25a252523 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -18,6 +18,8 @@ from twisted.internet import defer
 from synapse.util.logutils import log_function
 from synapse.types import UserID
 from synapse.events.utils import serialize_event
+from synapse.api.constants import Membership, EventTypes
+from synapse.events import EventBase
 
 from ._base import BaseHandler
 
@@ -28,14 +30,6 @@ import random
 logger = logging.getLogger(__name__)
 
 
-def started_user_eventstream(distributor, user):
-    return distributor.fire("started_user_eventstream", user)
-
-
-def stopped_user_eventstream(distributor, user):
-    return distributor.fire("stopped_user_eventstream", user)
-
-
 class EventStreamHandler(BaseHandler):
 
     def __init__(self, hs):
@@ -55,61 +49,6 @@ class EventStreamHandler(BaseHandler):
         self.notifier = hs.get_notifier()
 
     @defer.inlineCallbacks
-    def started_stream(self, user):
-        """Tells the presence handler that we have started an eventstream for
-        the user:
-
-        Args:
-            user (User): The user who started a stream.
-        Returns:
-            A deferred that completes once their presence has been updated.
-        """
-        if user not in self._streams_per_user:
-            # Make sure we set the streams per user to 1 here rather than
-            # setting it to zero and incrementing the value below.
-            # Otherwise this may race with stopped_stream causing the
-            # user to be erased from the map before we have a chance
-            # to increment it.
-            self._streams_per_user[user] = 1
-            if user in self._stop_timer_per_user:
-                try:
-                    self.clock.cancel_call_later(
-                        self._stop_timer_per_user.pop(user)
-                    )
-                except:
-                    logger.exception("Failed to cancel event timer")
-            else:
-                yield started_user_eventstream(self.distributor, user)
-        else:
-            self._streams_per_user[user] += 1
-
-    def stopped_stream(self, user):
-        """If there are no streams for a user this starts a timer that will
-        notify the presence handler that we haven't got an event stream for
-        the user unless the user starts a new stream in 30 seconds.
-
-        Args:
-            user (User): The user who stopped a stream.
-        """
-        self._streams_per_user[user] -= 1
-        if not self._streams_per_user[user]:
-            del self._streams_per_user[user]
-
-            # 30 seconds of grace to allow the client to reconnect again
-            #   before we think they're gone
-            def _later():
-                logger.debug("_later stopped_user_eventstream %s", user)
-
-                self._stop_timer_per_user.pop(user, None)
-
-                return stopped_user_eventstream(self.distributor, user)
-
-            logger.debug("Scheduling _later: for %s", user)
-            self._stop_timer_per_user[user] = (
-                self.clock.call_later(30, _later)
-            )
-
-    @defer.inlineCallbacks
     @log_function
     def get_stream(self, auth_user_id, pagin_config, timeout=0,
                    as_client_event=True, affect_presence=True,
@@ -119,18 +58,19 @@ class EventStreamHandler(BaseHandler):
         If `only_keys` is not None, events from keys will be sent down.
         """
         auth_user = UserID.from_string(auth_user_id)
+        presence_handler = self.hs.get_handlers().presence_handler
 
-        try:
-            if affect_presence:
-                yield self.started_stream(auth_user)
-
+        context = yield presence_handler.user_syncing(
+            auth_user_id, affect_presence=affect_presence,
+        )
+        with context:
             if timeout:
                 # If they've set a timeout set a minimum limit.
                 timeout = max(timeout, 500)
 
                 # Add some randomness to this value to try and mitigate against
                 # thundering herds on restart.
-                timeout = random.randint(int(timeout*0.9), int(timeout*1.1))
+                timeout = random.randint(int(timeout * 0.9), int(timeout * 1.1))
 
             events, tokens = yield self.notifier.get_events_for(
                 auth_user, pagin_config, timeout,
@@ -138,6 +78,34 @@ class EventStreamHandler(BaseHandler):
                 is_guest=is_guest, explicit_room_id=room_id
             )
 
+            # When the user joins a new room, or another user joins a currently
+            # joined room, we need to send down presence for those users.
+            to_add = []
+            for event in events:
+                if not isinstance(event, EventBase):
+                    continue
+                if event.type == EventTypes.Member:
+                    if event.membership != Membership.JOIN:
+                        continue
+                    # Send down presence.
+                    if event.state_key == auth_user_id:
+                        # Send down presence for everyone in the room.
+                        users = yield self.store.get_users_in_room(event.room_id)
+                        states = yield presence_handler.get_states(
+                            users,
+                            as_event=True,
+                        )
+                        to_add.extend(states)
+                    else:
+
+                        ev = yield presence_handler.get_state(
+                            UserID.from_string(event.state_key),
+                            as_event=True,
+                        )
+                        to_add.append(ev)
+
+            events.extend(to_add)
+
             time_now = self.clock.time_msec()
 
             chunks = [
@@ -152,10 +120,6 @@ class EventStreamHandler(BaseHandler):
 
             defer.returnValue(chunk)
 
-        finally:
-            if affect_presence:
-                self.stopped_stream(auth_user)
-
 
 class EventHandler(BaseHandler):
 
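Aside: the get_stream() change above swaps the manual
started_stream/stopped_stream reference counting for a context manager
returned by presence_handler.user_syncing(), so the "stopped" side can no
longer be skipped on an exception. A minimal sketch of that pattern (names
are assumptions; the real handler also delays the stop to absorb quick
reconnects):

    import contextlib

    syncing = {}  # user_id -> number of active event streams

    @contextlib.contextmanager
    def user_syncing(user_id, affect_presence=True):
        if affect_presence:
            syncing[user_id] = syncing.get(user_id, 0) + 1
        try:
            yield
        finally:
            if affect_presence:
                syncing[user_id] -= 1
                if not syncing[user_id]:
                    del syncing[user_id]

    with user_syncing("@alice:example.org"):
        assert syncing["@alice:example.org"] == 1
    assert "@alice:example.org" not in syncing
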
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 6c19d6ae8c..f599e817aa 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,9 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from ._base import BaseHandler
 
@@ -221,19 +224,11 @@ class FederationHandler(BaseHandler):
                 extra_users.append(target_user)
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=extra_users
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.membership == Membership.JOIN:
                 prev_state = context.current_state.get((event.type, event.state_key))
@@ -244,12 +239,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-        if not backfilled and not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
-                event, self
-            )
-
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
@@ -483,7 +472,7 @@ class FederationHandler(BaseHandler):
                         limit=100,
                         extremities=[e for e in extremities.keys()]
                     )
-                except SynapseError:
+                except SynapseError as e:
                     logger.info(
                         "Failed to backfill from %s because %s",
                         dom, e,
@@ -643,19 +632,11 @@ class FederationHandler(BaseHandler):
             )
 
             with PreserveLoggingContext():
-                d = self.notifier.on_new_room_event(
+                self.notifier.on_new_room_event(
                     event, event_stream_id, max_stream_id,
                     extra_users=[joinee]
                 )
 
-            def log_failure(f):
-                logger.warn(
-                    "Failed to notify about %s: %s",
-                    event.event_id, f.value
-                )
-
-            d.addErrback(log_failure)
-
             logger.debug("Finished joining %s to %s", joinee, room_id)
         finally:
             room_queue = self.room_queues[room_id]
@@ -730,18 +711,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.JOIN:
                 user = UserID.from_string(event.state_key)
@@ -811,19 +784,11 @@ class FederationHandler(BaseHandler):
 
         target_user = UserID.from_string(event.state_key)
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id,
                 extra_users=[target_user],
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         defer.returnValue(event)
 
     @defer.inlineCallbacks
@@ -848,7 +813,22 @@ class FederationHandler(BaseHandler):
             target_hosts,
             signed_event
         )
-        defer.returnValue(None)
+
+        context = yield self.state_handler.compute_event_context(event)
+
+        event_stream_id, max_stream_id = yield self.store.persist_event(
+            event,
+            context=context,
+            backfilled=False,
+        )
+
+        target_user = UserID.from_string(event.state_key)
+        self.notifier.on_new_room_event(
+            event, event_stream_id, max_stream_id,
+            extra_users=[target_user],
+        )
+
+        defer.returnValue(event)
 
     @defer.inlineCallbacks
     def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
@@ -948,18 +928,10 @@ class FederationHandler(BaseHandler):
             extra_users.append(target_user)
 
         with PreserveLoggingContext():
-            d = self.notifier.on_new_room_event(
+            self.notifier.on_new_room_event(
                 event, event_stream_id, max_stream_id, extra_users=extra_users
             )
 
-        def log_failure(f):
-            logger.warn(
-                "Failed to notify about %s: %s",
-                event.event_id, f.value
-            )
-
-        d.addErrback(log_failure)
-
         new_pdu = event
 
         destinations = set()
@@ -1113,6 +1085,12 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        if not backfilled and not event.internal_metadata.is_outlier():
+            action_generator = ActionGenerator(self.hs)
+            yield action_generator.handle_push_actions_for_event(
+                event, context, self
+            )
+
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
@@ -1186,7 +1164,13 @@ class FederationHandler(BaseHandler):
 
             try:
                 self.auth.check(e, auth_events=auth_for_e)
-            except AuthError as err:
+            except SynapseError as err:
+                # we may get SynapseErrors here as well as AuthErrors. For
+                # instance, there are a couple of (ancient) events in some
+                # rooms whose senders do not have the correct sigil; these
+                # cause SynapseErrors in auth.check. We don't want to give up
+                # the attempt to federate altogether in such cases.
+
                 logger.warn(
                     "Rejecting %s because %s",
                     e.event_id, err.msg
@@ -1654,19 +1638,15 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     @log_function
-    def exchange_third_party_invite(self, invite):
-        sender = invite["sender"]
-        room_id = invite["room_id"]
-
-        if "signed" not in invite or "token" not in invite["signed"]:
-            logger.info(
-                "Discarding received notification of third party invite "
-                "without signed: %s" % (invite,)
-            )
-            return
-
+    def exchange_third_party_invite(
+            self,
+            sender_user_id,
+            target_user_id,
+            room_id,
+            signed,
+    ):
         third_party_invite = {
-            "signed": invite["signed"],
+            "signed": signed,
         }
 
         event_dict = {
@@ -1676,8 +1656,8 @@ class FederationHandler(BaseHandler):
                 "third_party_invite": third_party_invite,
             },
             "room_id": room_id,
-            "sender": sender,
-            "state_key": invite["mxid"],
+            "sender": sender_user_id,
+            "state_key": target_user_id,
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
@@ -1690,11 +1670,11 @@ class FederationHandler(BaseHandler):
             )
 
             self.auth.check(event, context.current_state)
-            yield self._validate_keyserver(event, auth_events=context.current_state)
+            yield self._check_signature(event, auth_events=context.current_state)
             member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.send_membership_event(event, context)
+            yield member_handler.send_membership_event(None, event, context)
         else:
-            destinations = set([x.split(":", 1)[-1] for x in (sender, room_id)])
+            destinations = set(x.split(":", 1)[-1] for x in (sender_user_id, room_id))
             yield self.replication_layer.forward_third_party_invite(
                 destinations,
                 room_id,
@@ -1715,13 +1695,13 @@ class FederationHandler(BaseHandler):
         )
 
         self.auth.check(event, auth_events=context.current_state)
-        yield self._validate_keyserver(event, auth_events=context.current_state)
+        yield self._check_signature(event, auth_events=context.current_state)
 
         returned_invite = yield self.send_invite(origin, event)
         # TODO: Make sure the signatures actually are correct.
         event.signatures.update(returned_invite.signatures)
         member_handler = self.hs.get_handlers().room_member_handler
-        yield member_handler.send_membership_event(event, context)
+        yield member_handler.send_membership_event(None, event, context)
 
     @defer.inlineCallbacks
     def add_display_name_to_third_party_invite(self, event_dict, event, context):
@@ -1745,17 +1725,69 @@ class FederationHandler(BaseHandler):
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
-    def _validate_keyserver(self, event, auth_events):
-        token = event.content["third_party_invite"]["signed"]["token"]
+    def _check_signature(self, event, auth_events):
+        """
+        Checks that the signature in the event is consistent with its invite.
+        :param event (Event): The m.room.member event to check
+        :param auth_events (dict<(event type, state_key), event>)
+
+        :raises
+            AuthError if signature didn't match any keys, or key has been
+                revoked,
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
+        signed = event.content["third_party_invite"]["signed"]
+        token = signed["token"]
 
         invite_event = auth_events.get(
             (EventTypes.ThirdPartyInvite, token,)
         )
 
+        if not invite_event:
+            raise AuthError(403, "Could not find invite")
+
+        last_exception = None
+        for public_key_object in self.hs.get_auth().get_public_keys(invite_event):
+            try:
+                for server, signature_block in signed["signatures"].items():
+                    for key_name, encoded_signature in signature_block.items():
+                        if not key_name.startswith("ed25519:"):
+                            continue
+
+                        public_key = public_key_object["public_key"]
+                        verify_key = decode_verify_key_bytes(
+                            key_name,
+                            decode_base64(public_key)
+                        )
+                        verify_signed_json(signed, server, verify_key)
+                        if "key_validity_url" in public_key_object:
+                            yield self._check_key_revocation(
+                                public_key,
+                                public_key_object["key_validity_url"]
+                            )
+                        return
+            except Exception as e:
+                last_exception = e
+        raise last_exception
+
+    @defer.inlineCallbacks
+    def _check_key_revocation(self, public_key, url):
+        """
+        Checks whether public_key has been revoked.
+
+        :param public_key (str): base-64 encoded public key.
+        :param url (str): Key revocation URL.
+
+        :raises
+            AuthError if the key has been revoked.
+            SynapseError if a transient error meant a key couldn't be checked
+                for revocation.
+        """
         try:
             response = yield self.hs.get_simple_http_client().get_json(
-                invite_event.content["key_validity_url"],
-                {"public_key": invite_event.content["public_key"]}
+                url,
+                {"public_key": public_key}
             )
         except Exception:
             raise SynapseError(
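
Aside: _check_signature above accepts the invite if any ed25519 signature
in the "signed" block verifies against any public key attached to the
m.room.third_party_invite event. The verification core, using the same
signedjson/unpaddedbase64 primitives this diff imports (error handling and
the key-revocation HTTP check are elided; public_keys is assumed to be a
list of base64 strings):

    from signedjson.key import decode_verify_key_bytes
    from signedjson.sign import verify_signed_json
    from unpaddedbase64 import decode_base64

    def check_invite_signature(signed, public_keys):
        # signed: the "signed" block from the m.room.member event content.
        # public_keys: base64-encoded keys from the third-party-invite event.
        last_exception = None
        for public_key in public_keys:
            try:
                for server, signature_block in signed["signatures"].items():
                    for key_name in signature_block:
                        if not key_name.startswith("ed25519:"):
                            continue
                        verify_key = decode_verify_key_bytes(
                            key_name, decode_base64(public_key)
                        )
                        # Raises SignatureVerifyException on mismatch.
                        verify_signed_json(signed, server, verify_key)
                        return  # one valid signature is enough
            except Exception as e:
                last_exception = e
        if last_exception is not None:
            raise last_exception
        # Unlike the handler above, guard against an empty key list.
        raise ValueError("no ed25519 signature to check")
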
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 819ec57c4f..656ce124f9 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -36,14 +36,15 @@ class IdentityHandler(BaseHandler):
 
         self.http_client = hs.get_simple_http_client()
 
+        self.trusted_id_servers = set(hs.config.trusted_third_party_id_servers)
+        self.trust_any_id_server_just_for_testing_do_not_use = (
+            hs.config.use_insecure_ssl_client_just_for_testing_do_not_use
+        )
+
     @defer.inlineCallbacks
     def threepid_from_creds(self, creds):
         yield run_on_reactor()
 
-        # XXX: make this configurable!
-        # trustedIdServers = ['matrix.org', 'localhost:8090']
-        trustedIdServers = ['matrix.org', 'vector.im']
-
         if 'id_server' in creds:
             id_server = creds['id_server']
         elif 'idServer' in creds:
@@ -58,10 +59,19 @@ class IdentityHandler(BaseHandler):
         else:
             raise SynapseError(400, "No client_secret in creds")
 
-        if id_server not in trustedIdServers:
-            logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
-                        'credentials', id_server)
-            defer.returnValue(None)
+        if id_server not in self.trusted_id_servers:
+            if self.trust_any_id_server_just_for_testing_do_not_use:
+                logger.warn(
+                    "Trusting untrustworthy ID server %r even though it isn't"
+                    " in the trusted id list for testing because"
+                    " 'use_insecure_ssl_client_just_for_testing_do_not_use'"
+                    " is set in the config",
+                    id_server,
+                )
+            else:
+                logger.warn('%s is not a trusted ID server: rejecting 3pid ' +
+                            'credentials', id_server)
+                defer.returnValue(None)
 
         data = {}
         try:
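
Aside: threepid_from_creds above replaces the hard-coded trustedIdServers
list with the trusted_third_party_id_servers config option, plus a loudly
named test-only escape hatch. The decision reduced to a predicate:

    def id_server_trusted(id_server, trusted_id_servers,
                          insecure_testing_override=False):
        if id_server in trusted_id_servers:
            return True
        # Mirrors use_insecure_ssl_client_just_for_testing_do_not_use:
        # trust anything, but only ever in test setups.
        return insecure_testing_override

    assert id_server_trusted("matrix.org", {"matrix.org", "vector.im"})
    assert not id_server_trusted("evil.example", {"matrix.org", "vector.im"})
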
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ff800f8af1..5c50c611ba 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,12 +16,11 @@
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import SynapseError, AuthError, Codes
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.streams.config import PaginationConfig
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
 from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.types import UserID, RoomStreamToken, StreamToken
 
@@ -105,8 +104,6 @@ class MessageHandler(BaseHandler):
             room_token = pagin_config.from_token.room_key
 
         room_token = RoomStreamToken.parse(room_token)
-        if room_token.topological is None:
-            raise SynapseError(400, "Invalid token")
 
         pagin_config.from_token = pagin_config.from_token.copy_and_replace(
             "room_key", str(room_token)
@@ -117,27 +114,31 @@ class MessageHandler(BaseHandler):
         membership, member_event_id = yield self._check_in_room_or_world_readable(
             room_id, user_id
         )
-        if membership == Membership.LEAVE:
-            # If they have left the room then clamp the token to be before
-            # they left the room.
-            leave_token = yield self.store.get_topological_token_for_event(
-                member_event_id
+
+        if source_config.direction == 'b':
+            # if we're going backwards, we might need to backfill. This
+            # requires that we have a topo token.
+            if room_token.topological:
+                max_topo = room_token.topological
+            else:
+                max_topo = yield self.store.get_max_topological_token_for_stream_and_room(
+                    room_id, room_token.stream
+                )
+
+            if membership == Membership.LEAVE:
+                # If they have left the room then clamp the token to be before
+                # they left the room, to save the effort of loading from the
+                # database.
+                leave_token = yield self.store.get_topological_token_for_event(
+                    member_event_id
+                )
+                leave_token = RoomStreamToken.parse(leave_token)
+                if leave_token.topological < max_topo:
+                    source_config.from_key = str(leave_token)
+
+            yield self.hs.get_handlers().federation_handler.maybe_backfill(
+                room_id, max_topo
             )
-            leave_token = RoomStreamToken.parse(leave_token)
-            if leave_token.topological < room_token.topological:
-                source_config.from_key = str(leave_token)
-
-            if source_config.direction == "f":
-                if source_config.to_key is None:
-                    source_config.to_key = str(leave_token)
-                else:
-                    to_token = RoomStreamToken.parse(source_config.to_key)
-                    if leave_token.topological < to_token.topological:
-                        source_config.to_key = str(leave_token)
-
-        yield self.hs.get_handlers().federation_handler.maybe_backfill(
-            room_id, room_token.topological
-        )
 
         events, next_key = yield data_source.get_pagination_rows(
             requester.user, source_config, room_id
@@ -195,12 +196,25 @@ class MessageHandler(BaseHandler):
 
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
+            target = UserID.from_string(builder.state_key)
+
             if membership == Membership.JOIN:
-                joinee = UserID.from_string(builder.state_key)
                 # If event doesn't include a display name, add one.
                 yield collect_presencelike_data(
-                    self.distributor, joinee, builder.content
+                    self.distributor, target, builder.content
                 )
+            elif membership == Membership.INVITE:
+                profile = self.hs.get_handlers().profile_handler
+                content = builder.content
+
+                try:
+                    content["displayname"] = yield profile.get_displayname(target)
+                    content["avatar_url"] = yield profile.get_avatar_url(target)
+                except Exception as e:
+                    logger.info(
+                        "Failed to get profile information for %r: %s",
+                        target, e
+                    )
 
         if token_id is not None:
             builder.internal_metadata.token_id = token_id
@@ -214,7 +228,7 @@ class MessageHandler(BaseHandler):
         defer.returnValue((event, context))
 
     @defer.inlineCallbacks
-    def send_event(self, event, context, ratelimit=True, is_guest=False):
+    def send_nonmember_event(self, requester, event, context, ratelimit=True):
         """
         Persists and notifies local clients and federation of an event.
 
@@ -224,55 +238,70 @@ class MessageHandler(BaseHandler):
             ratelimit (bool): Whether to rate limit this send.
-            is_guest (bool): Whether the sender is a guest.
+            requester (Requester): The requester sending this event.
         """
+        if event.type == EventTypes.Member:
+            raise SynapseError(
+                500,
+                "Tried to send member event through non-member codepath"
+            )
+
         user = UserID.from_string(event.sender)
 
         assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
 
-        if ratelimit:
-            self.ratelimit(event.sender)
-
         if event.is_state():
-            prev_state = context.current_state.get((event.type, event.state_key))
-            if prev_state and event.user_id == prev_state.user_id:
-                prev_content = encode_canonical_json(prev_state.content)
-                next_content = encode_canonical_json(event.content)
-                if prev_content == next_content:
-                    # Duplicate suppression for state updates with same sender
-                    # and content.
-                    defer.returnValue(prev_state)
-
-        if event.type == EventTypes.Member:
-            member_handler = self.hs.get_handlers().room_member_handler
-            yield member_handler.send_membership_event(event, context, is_guest=is_guest)
-        else:
-            yield self.handle_new_client_event(
-                event=event,
-                context=context,
-            )
+            prev_state = self.deduplicate_state_event(event, context)
+            if prev_state is not None:
+                defer.returnValue(prev_state)
+
+        yield self.handle_new_client_event(
+            requester=requester,
+            event=event,
+            context=context,
+            ratelimit=ratelimit,
+        )
 
         if event.type == EventTypes.Message:
             presence = self.hs.get_handlers().presence_handler
-            with PreserveLoggingContext():
-                presence.bump_presence_active_time(user)
+            yield presence.bump_presence_active_time(user)
+
+    def deduplicate_state_event(self, event, context):
+        """
+        Checks whether event is in the latest resolved state in context.
+
+        If so, returns the version of the event in context.
+        Otherwise, returns None.
+        """
+        prev_event = context.current_state.get((event.type, event.state_key))
+        if prev_event and event.user_id == prev_event.user_id:
+            prev_content = encode_canonical_json(prev_event.content)
+            next_content = encode_canonical_json(event.content)
+            if prev_content == next_content:
+                return prev_event
+        return None
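# A minimal self-contained sketch of the dedup rule above; json.dumps with
# sort_keys stands in for encode_canonical_json (an assumption for brevity):

import json

def _is_duplicate(prev_event, new_event):
    # The same sender re-submitting identical state content is a no-op.
    if prev_event is None or prev_event["sender"] != new_event["sender"]:
        return False
    canon = lambda ev: json.dumps(ev["content"], sort_keys=True)
    return canon(prev_event) == canon(new_event)

prev = {"sender": "@alice:hs", "content": {"topic": "hello"}}
assert _is_duplicate(prev, {"sender": "@alice:hs", "content": {"topic": "hello"}})
assert not _is_duplicate(prev, {"sender": "@alice:hs", "content": {"topic": "hi"}})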
 
     @defer.inlineCallbacks
-    def create_and_send_event(self, event_dict, ratelimit=True,
-                              token_id=None, txn_id=None, is_guest=False):
+    def create_and_send_nonmember_event(
+        self,
+        requester,
+        event_dict,
+        ratelimit=True,
+        txn_id=None
+    ):
         """
         Creates an event, then sends it.
 
-        See self.create_event and self.send_event.
+        See self.create_event and self.send_nonmember_event.
         """
         event, context = yield self.create_event(
             event_dict,
-            token_id=token_id,
+            token_id=requester.access_token_id,
             txn_id=txn_id
         )
-        yield self.send_event(
+        yield self.send_nonmember_event(
+            requester,
             event,
             context,
             ratelimit=ratelimit,
-            is_guest=is_guest
         )
         defer.returnValue(event)
 
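# Call-site shape for the new entry point (a sketch; the surrounding variable
# names are assumptions, not part of this change):
#
#     event = yield self.handlers.message_handler.create_and_send_nonmember_event(
#         requester,
#         {
#             "type": "m.room.message",
#             "room_id": room_id,
#             "sender": requester.user.to_string(),
#             "content": {"msgtype": "m.text", "body": "hello"},
#         },
#         txn_id=txn_id,
#     )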
@@ -633,8 +662,8 @@ class MessageHandler(BaseHandler):
             user_id, messages, is_peeking=is_peeking
         )
 
-        start_token = StreamToken(token[0], 0, 0, 0, 0)
-        end_token = StreamToken(token[1], 0, 0, 0, 0)
+        start_token = StreamToken.START.copy_and_replace("room_key", token[0])
+        end_token = StreamToken.START.copy_and_replace("room_key", token[1])
 
         time_now = self.clock.time_msec()
 
@@ -658,10 +687,6 @@ class MessageHandler(BaseHandler):
             room_id=room_id,
         )
 
-        # TODO(paul): I wish I was called with user objects not user_id
-        #   strings...
-        auth_user = UserID.from_string(user_id)
-
         # TODO: These concurrently
         time_now = self.clock.time_msec()
         state = [
@@ -686,13 +711,11 @@ class MessageHandler(BaseHandler):
         @defer.inlineCallbacks
         def get_presence():
             states = yield presence_handler.get_states(
-                target_users=[UserID.from_string(m.user_id) for m in room_members],
-                auth_user=auth_user,
+                [m.user_id for m in room_members],
                 as_event=True,
-                check_auth=False,
             )
 
-            defer.returnValue(states.values())
+            defer.returnValue(states)
 
         @defer.inlineCallbacks
         def get_receipts():
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d36eb3b8d7..d0c8f1328b 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -13,13 +13,26 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+"""This module is responsible for keeping track of presence status of local
+and remote users.
 
-from synapse.api.errors import SynapseError, AuthError
+The methods that define policy are:
+    - PresenceHandler._update_states
+    - PresenceHandler._handle_timeouts
+    - should_notify
+"""
+
+from twisted.internet import defer, reactor
+from contextlib import contextmanager
+
+from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
+from synapse.storage.presence import UserPresenceState
 
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
+from synapse.util.metrics import Measure
+from synapse.util.wheel_timer import WheelTimer
 from synapse.types import UserID
 import synapse.metrics
 
@@ -32,85 +45,43 @@ logger = logging.getLogger(__name__)
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
+notified_presence_counter = metrics.register_counter("notified_presence")
+federation_presence_out_counter = metrics.register_counter("federation_presence_out")
+presence_updates_counter = metrics.register_counter("presence_updates")
+timers_fired_counter = metrics.register_counter("timers_fired")
+federation_presence_counter = metrics.register_counter("federation_presence")
+bump_active_time_counter = metrics.register_counter("bump_active_time")
 
-# Don't bother bumping "last active" time if it differs by less than 60 seconds
-LAST_ACTIVE_GRANULARITY = 60*1000
-
-# Keep no more than this number of offline serial revisions
-MAX_OFFLINE_SERIALS = 1000
-
-
-# TODO(paul): Maybe there's one of these I can steal from somewhere
-def partition(l, func):
-    """Partition the list by the result of func applied to each element."""
-    ret = {}
-
-    for x in l:
-        key = func(x)
-        if key not in ret:
-            ret[key] = []
-        ret[key].append(x)
-
-    return ret
 
+# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
+# "currently_active"
+LAST_ACTIVE_GRANULARITY = 60 * 1000
 
-def partitionbool(l, func):
-    def boolfunc(x):
-        return bool(func(x))
+# How long to wait for a new /events or /sync request before assuming
+# the client has gone.
+SYNC_ONLINE_TIMEOUT = 30 * 1000
 
-    ret = partition(l, boolfunc)
-    return ret.get(True, []), ret.get(False, [])
+# How long to wait before marking the user as idle. Compared against the last active time.
+IDLE_TIMER = 5 * 60 * 1000
 
+# How often we expect remote servers to resend us presence.
+FEDERATION_TIMEOUT = 30 * 60 * 1000
 
-def user_presence_changed(distributor, user, statuscache):
-    return distributor.fire("user_presence_changed", user, statuscache)
+# How often to resend presence to remote servers
+FEDERATION_PING_INTERVAL = 25 * 60 * 1000
 
-
-def collect_presencelike_data(distributor, user, content):
-    return distributor.fire("collect_presencelike_data", user, content)
+assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
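# A self-contained sanity check of how these timers relate (times in ms;
# values mirror the constants above):

now, last_active_ts = 4 * 60 * 1000, 0
assert now - last_active_ts < IDLE_TIMER                 # not idle yet at 4 min
assert now - last_active_ts > LAST_ACTIVE_GRANULARITY    # worth re-bumping
assert SYNC_ONLINE_TIMEOUT < FEDERATION_PING_INTERVAL < FEDERATION_TIMEOUT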
 
 
 class PresenceHandler(BaseHandler):
 
-    STATE_LEVELS = {
-        PresenceState.OFFLINE: 0,
-        PresenceState.UNAVAILABLE: 1,
-        PresenceState.ONLINE: 2,
-        PresenceState.FREE_FOR_CHAT: 3,
-    }
-
     def __init__(self, hs):
         super(PresenceHandler, self).__init__(hs)
-
-        self.homeserver = hs
-
+        self.hs = hs
         self.clock = hs.get_clock()
-
-        distributor = hs.get_distributor()
-        distributor.observe("registered_user", self.registered_user)
-
-        distributor.observe(
-            "started_user_eventstream", self.started_user_eventstream
-        )
-        distributor.observe(
-            "stopped_user_eventstream", self.stopped_user_eventstream
-        )
-
-        distributor.observe("user_joined_room", self.user_joined_room)
-
-        distributor.declare("collect_presencelike_data")
-
-        distributor.declare("changed_presencelike_data")
-        distributor.observe(
-            "changed_presencelike_data", self.changed_presencelike_data
-        )
-
-        # outbound signal from the presence module to advertise when a user's
-        # presence has changed
-        distributor.declare("user_presence_changed")
-
-        self.distributor = distributor
-
+        self.store = hs.get_datastore()
+        self.wheel_timer = WheelTimer()
+        self.notifier = hs.get_notifier()
         self.federation = hs.get_replication_layer()
 
         self.federation.register_edu_handler(
@@ -138,346 +109,552 @@ class PresenceHandler(BaseHandler):
             )
         )
 
-        # IN-MEMORY store, mapping local userparts to sets of local users to
-        # be informed of state changes.
-        self._local_pushmap = {}
-        # map local users to sets of remote /domain names/ who are interested
-        # in them
-        self._remote_sendmap = {}
-        # map remote users to sets of local users who're interested in them
-        self._remote_recvmap = {}
-        # list of (serial, set of(userids)) tuples, ordered by serial, latest
-        # first
-        self._remote_offline_serials = []
-
-        # map any user to a UserPresenceCache
-        self._user_cachemap = {}
-        self._user_cachemap_latest_serial = 0
-
-        # map room_ids to the latest presence serial for a member of that
-        # room
-        self._room_serials = {}
+        distributor = hs.get_distributor()
+        distributor.observe("user_joined_room", self.user_joined_room)
+
+        active_presence = self.store.take_presence_startup_info()
+
+        # A dictionary of the current state of users. This is prefilled with
+        # non-offline presence from the DB. We should fetch from the DB if
+        # we can't find a user's presence in here.
+        self.user_to_current_state = {
+            state.user_id: state
+            for state in active_presence
+        }
 
         metrics.register_callback(
-            "userCachemap:size",
-            lambda: len(self._user_cachemap),
+            "user_to_current_state_size", lambda: len(self.user_to_current_state)
         )
 
-    def _get_or_make_usercache(self, user):
-        """If the cache entry doesn't exist, initialise a new one."""
-        if user not in self._user_cachemap:
-            self._user_cachemap[user] = UserPresenceCache()
-        return self._user_cachemap[user]
-
-    def _get_or_offline_usercache(self, user):
-        """If the cache entry doesn't exist, return an OFFLINE one but do not
-        store it into the cache."""
-        if user in self._user_cachemap:
-            return self._user_cachemap[user]
-        else:
-            return UserPresenceCache()
+        now = self.clock.time_msec()
+        for state in active_presence:
+            self.wheel_timer.insert(
+                now=now,
+                obj=state.user_id,
+                then=state.last_active_ts + IDLE_TIMER,
+            )
+            self.wheel_timer.insert(
+                now=now,
+                obj=state.user_id,
+                then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
+            )
+            if self.hs.is_mine_id(state.user_id):
+                self.wheel_timer.insert(
+                    now=now,
+                    obj=state.user_id,
+                    then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
+                )
+            else:
+                self.wheel_timer.insert(
+                    now=now,
+                    obj=state.user_id,
+                    then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
+                )
 
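# The WheelTimer used above is an approximate timer: insert(now, obj, then)
# files obj into a coarse bucket keyed by its deadline, and fetch(now) drains
# every bucket that has expired. A toy stand-in with the same interface (a
# sketch, not synapse's implementation, which buckets by a fixed resolution):

class _ToyWheelTimer(object):
    def __init__(self):
        self._entries = []  # list of (deadline_ms, obj) pairs

    def insert(self, now, obj, then):
        self._entries.append((then, obj))

    def fetch(self, now):
        due = [obj for then, obj in self._entries if then <= now]
        self._entries = [(t, o) for t, o in self._entries if t > now]
        return due

_timer = _ToyWheelTimer()
_timer.insert(now=0, obj="@alice:hs", then=5000)
assert _timer.fetch(now=4999) == []
assert _timer.fetch(now=5000) == ["@alice:hs"]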
-    def registered_user(self, user):
-        return self.store.create_presence(user.localpart)
+        # Set of users whose presence in `user_to_current_state` has not yet
+        # been persisted
+        self.unpersisted_users_changes = set()
 
-    @defer.inlineCallbacks
-    def is_presence_visible(self, observer_user, observed_user):
-        assert(self.hs.is_mine(observed_user))
+        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
-        if observer_user == observed_user:
-            defer.returnValue(True)
+        self.serial_to_user = {}
+        self._next_serial = 1
 
-        if (yield self.store.user_rooms_intersect(
-                [u.to_string() for u in observer_user, observed_user])):
-            defer.returnValue(True)
+        # Keeps track of the number of *ongoing* syncs. While this is non-zero
+        # a user will never go offline.
+        self.user_to_num_current_syncs = {}
 
-        if (yield self.store.is_presence_visible(
-                observed_localpart=observed_user.localpart,
-                observer_userid=observer_user.to_string())):
-            defer.returnValue(True)
+        # Start a LoopingCall in 30s that fires every 5s.
+        # The initial delay is to allow disconnected clients a chance to
+        # reconnect before we treat them as offline.
+        self.clock.call_later(
+            30 * 1000,
+            self.clock.looping_call,
+            self._handle_timeouts,
+            5000,
+        )
 
-        defer.returnValue(False)
+        metrics.register_callback("wheel_timer_size", lambda: len(self.wheel_timer))
 
     @defer.inlineCallbacks
-    def get_state(self, target_user, auth_user, as_event=False, check_auth=True):
-        """Get the current presence state of the given user.
+    def _on_shutdown(self):
+        """Gets called when shutting down. This lets us persist any updates that
+        we haven't yet persisted, e.g. updates that only change some internal
+        timers. This allows changes to persist across restarts without having to
+        persist every single change.
+
+        If this does not run it simply means that some of the timers will fire
+        earlier than they should when synapse is restarted. The effect of this
+        is some spurious presence changes that will self-correct.
+        """
+        logger.info(
+            "Performing _on_shutdown. Persisting %d unpersisted changes",
+            len(self.unpersisted_users_changes)
+        )
 
-        Args:
-            target_user (UserID): The user whose presence we want
-            auth_user (UserID): The user requesting the presence, used for
-                checking if said user is allowed to see the persence of the
-                `target_user`
-            as_event (bool): Format the return as an event or not?
-            check_auth (bool): Perform the auth checks or not?
+        if self.unpersisted_users_changes:
+            yield self.store.update_presence([
+                self.user_to_current_state[user_id]
+                for user_id in self.unpersisted_users_changes
+            ])
+        logger.info("Finished _on_shutdown")
 
-        Returns:
-            dict: The presence state of the `target_user`, whose format depends
-            on the `as_event` argument.
+    @defer.inlineCallbacks
+    def _update_states(self, new_states):
+        """Updates presence of users. Sets the appropriate timeouts. Pokes
+        the notifier and federation if and only if the changed presence state
+        should be sent to clients/servers.
         """
-        if self.hs.is_mine(target_user):
-            if check_auth:
-                visible = yield self.is_presence_visible(
-                    observer_user=auth_user,
-                    observed_user=target_user
-                )
+        now = self.clock.time_msec()
 
-                if not visible:
-                    raise SynapseError(404, "Presence information not visible")
+        with Measure(self.clock, "presence_update_states"):
 
-            if target_user in self._user_cachemap:
-                state = self._user_cachemap[target_user].get_state()
-            else:
-                state = yield self.store.get_presence_state(target_user.localpart)
-                if "mtime" in state:
-                    del state["mtime"]
-                state["presence"] = state.pop("state")
-        else:
-            # TODO(paul): Have remote server send us permissions set
-            state = self._get_or_offline_usercache(target_user).get_state()
+            # NOTE: We purposefully don't yield between now and when we've
+            # calculated what we want to do with the new states, to avoid races.
 
-        if "last_active" in state:
-            state["last_active_ago"] = int(
-                self.clock.time_msec() - state.pop("last_active")
-            )
+            to_notify = {}  # Changes we want to notify everyone about
+            to_federation_ping = {}  # These need keep-alives sent to remote servers
 
-        if as_event:
-            content = state
+            for new_state in new_states:
+                user_id = new_state.user_id
 
-            content["user_id"] = target_user.to_string()
+                # It's fine to not hit the database here, as the only states not
+                # in the current state cache are OFFLINE states, where the only
+                # field of interest is last_active_ts, which is safe enough to
+                # assume is 0 here.
+                prev_state = self.user_to_current_state.get(
+                    user_id, UserPresenceState.default(user_id)
+                )
 
-            if "last_active" in content:
-                content["last_active_ago"] = int(
-                    self._clock.time_msec() - content.pop("last_active")
+                new_state, should_notify, should_ping = handle_update(
+                    prev_state, new_state,
+                    is_mine=self.hs.is_mine_id(user_id),
+                    wheel_timer=self.wheel_timer,
+                    now=now
                 )
 
-            defer.returnValue({"type": "m.presence", "content": content})
-        else:
-            defer.returnValue(state)
+                self.user_to_current_state[user_id] = new_state
 
-    @defer.inlineCallbacks
-    def get_states(self, target_users, auth_user, as_event=False, check_auth=True):
-        """A batched version of the `get_state` method that accepts a list of
-        `target_users`
+                if should_notify:
+                    to_notify[user_id] = new_state
+                elif should_ping:
+                    to_federation_ping[user_id] = new_state
 
-        Args:
-            target_users (list): The list of UserID's whose presence we want
-            auth_user (UserID): The user requesting the presence, used for
-                checking if said user is allowed to see the persence of the
-                `target_users`
-            as_event (bool): Format the return as an event or not?
-            check_auth (bool): Perform the auth checks or not?
+            # TODO: We should probably ensure there are no races hereafter
 
-        Returns:
-            dict: A mapping from user -> presence_state
-        """
-        local_users, remote_users = partitionbool(
-            target_users,
-            lambda u: self.hs.is_mine(u)
-        )
+            presence_updates_counter.inc_by(len(new_states))
 
-        if check_auth:
-            for user in local_users:
-                visible = yield self.is_presence_visible(
-                    observer_user=auth_user,
-                    observed_user=user
+            if to_notify:
+                notified_presence_counter.inc_by(len(to_notify))
+                yield self._persist_and_notify(to_notify.values())
+
+            self.unpersisted_users_changes |= set(s.user_id for s in new_states)
+            self.unpersisted_users_changes -= set(to_notify.keys())
+
+            to_federation_ping = {
+                user_id: state for user_id, state in to_federation_ping.items()
+                if user_id not in to_notify
+            }
+            if to_federation_ping:
+                federation_presence_out_counter.inc_by(len(to_federation_ping))
+
+                _, _, hosts_to_states = yield self._get_interested_parties(
+                    to_federation_ping.values()
                 )
 
-                if not visible:
-                    raise SynapseError(404, "Presence information not visible")
+                self._push_to_remotes(hosts_to_states)
+
+    def _handle_timeouts(self):
+        """Checks the presence of users that have timed out and updates as
+        appropriate.
+        """
+        now = self.clock.time_msec()
+
+        with Measure(self.clock, "presence_handle_timeouts"):
+            # Fetch the list of users that *may* have timed out. Things may have
+            # changed since the timeout was set, so we won't necessarily have to
+            # take any action.
+            users_to_check = self.wheel_timer.fetch(now)
 
-        results = {}
-        if local_users:
-            for user in local_users:
-                if user in self._user_cachemap:
-                    results[user] = self._user_cachemap[user].get_state()
+            states = [
+                self.user_to_current_state.get(
+                    user_id, UserPresenceState.default(user_id)
+                )
+                for user_id in set(users_to_check)
+            ]
 
-            local_to_user = {u.localpart: u for u in local_users}
+            timers_fired_counter.inc_by(len(states))
 
-            states = yield self.store.get_presence_states(
-                [u.localpart for u in local_users if u not in results]
+            changes = handle_timeouts(
+                states,
+                is_mine_fn=self.hs.is_mine_id,
+                user_to_num_current_syncs=self.user_to_num_current_syncs,
+                now=now,
             )
 
-            for local_part, state in states.items():
-                if state is None:
-                    continue
-                res = {"presence": state["state"]}
-                if "status_msg" in state and state["status_msg"]:
-                    res["status_msg"] = state["status_msg"]
-                results[local_to_user[local_part]] = res
-
-        for user in remote_users:
-            # TODO(paul): Have remote server send us permissions set
-            results[user] = self._get_or_offline_usercache(user).get_state()
-
-        for state in results.values():
-            if "last_active" in state:
-                state["last_active_ago"] = int(
-                    self.clock.time_msec() - state.pop("last_active")
-                )
+        preserve_fn(self._update_states)(changes)
 
-        if as_event:
-            for user, state in results.items():
-                content = state
-                content["user_id"] = user.to_string()
+    @defer.inlineCallbacks
+    def bump_presence_active_time(self, user):
+        """We've seen the user do something that indicates they're interacting
+        with the app.
+        """
+        user_id = user.to_string()
 
-                if "last_active" in content:
-                    content["last_active_ago"] = int(
-                        self._clock.time_msec() - content.pop("last_active")
-                    )
+        bump_active_time_counter.inc()
 
-                results[user] = {"type": "m.presence", "content": content}
+        prev_state = yield self.current_state_for_user(user_id)
 
-        defer.returnValue(results)
+        new_fields = {
+            "last_active_ts": self.clock.time_msec(),
+        }
+        if prev_state.state == PresenceState.UNAVAILABLE:
+            new_fields["state"] = PresenceState.ONLINE
+
+        yield self._update_states([prev_state.copy_and_replace(**new_fields)])
 
     @defer.inlineCallbacks
-    @log_function
-    def set_state(self, target_user, auth_user, state):
-        # return
-        # TODO (erikj): Turn this back on. Why did we end up sending EDUs
-        # everywhere?
+    def user_syncing(self, user_id, affect_presence=True):
+        """Returns a context manager that should surround any stream requests
+        from the user.
 
-        if not self.hs.is_mine(target_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
+        This allows us to keep track of who is currently streaming and who isn't,
+        without having to keep timers outside of this module to avoid flickering
+        when users disconnect/reconnect.
+
+        Args:
+            user_id (str)
+            affect_presence (bool): If false this function will be a no-op.
+                Useful for streams that are not associated with an actual
+                client that is being used by a user.
+        """
+        if affect_presence:
+            curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+            self.user_to_num_current_syncs[user_id] = curr_sync + 1
+
+            prev_state = yield self.current_state_for_user(user_id)
+            if prev_state.state == PresenceState.OFFLINE:
+                # If they're currently offline then bring them online, otherwise
+                # just update the last sync times.
+                yield self._update_states([prev_state.copy_and_replace(
+                    state=PresenceState.ONLINE,
+                    last_active_ts=self.clock.time_msec(),
+                    last_user_sync_ts=self.clock.time_msec(),
+                )])
+            else:
+                yield self._update_states([prev_state.copy_and_replace(
+                    last_user_sync_ts=self.clock.time_msec(),
+                )])
 
-        if target_user != auth_user:
-            raise AuthError(400, "Cannot set another user's presence")
+        @defer.inlineCallbacks
+        def _end():
+            if affect_presence:
+                self.user_to_num_current_syncs[user_id] -= 1
 
-        if "status_msg" not in state:
-            state["status_msg"] = None
+                prev_state = yield self.current_state_for_user(user_id)
+                yield self._update_states([prev_state.copy_and_replace(
+                    last_user_sync_ts=self.clock.time_msec(),
+                )])
 
-        for k in state.keys():
-            if k not in ("presence", "status_msg"):
-                raise SynapseError(
-                    400, "Unexpected presence state key '%s'" % (k,)
-                )
+        @contextmanager
+        def _user_syncing():
+            try:
+                yield
+            finally:
+                preserve_fn(_end)()
 
-        if state["presence"] not in self.STATE_LEVELS:
-            raise SynapseError(400, "'%s' is not a valid presence state" % (
-                state["presence"],
-            ))
+        defer.returnValue(_user_syncing())
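# Call-site shape (a sketch; the sync-handler wiring shown is an assumption):
#
#     context = yield presence_handler.user_syncing(
#         user_id, affect_presence=(set_presence != "offline")
#     )
#     with context:
#         result = yield wait_for_sync_data(...)
#     # leaving the block decrements the ongoing-sync count and refreshes
#     # last_user_sync_ts, arming the SYNC_ONLINE_TIMEOUT timeout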
 
-        logger.debug("Updating presence state of %s to %s",
-                     target_user.localpart, state["presence"])
+    @defer.inlineCallbacks
+    def current_state_for_user(self, user_id):
+        """Get the current presence state for a user.
+        """
+        res = yield self.current_state_for_users([user_id])
+        defer.returnValue(res[user_id])
 
-        state_to_store = dict(state)
-        state_to_store["state"] = state_to_store.pop("presence")
+    @defer.inlineCallbacks
+    def current_state_for_users(self, user_ids):
+        """Get the current presence state for multiple users.
 
-        statuscache = self._get_or_offline_usercache(target_user)
-        was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
-        now_level = self.STATE_LEVELS[state["presence"]]
+        Returns:
+            dict: `user_id` -> `UserPresenceState`
+        """
+        states = {
+            user_id: self.user_to_current_state.get(user_id, None)
+            for user_id in user_ids
+        }
+
+        missing = [user_id for user_id, state in states.items() if not state]
+        if missing:
+            # There are users not in our in-memory cache. Let's pull them out of
+            # the database.
+            res = yield self.store.get_presence_for_users(missing)
+            states.update({state.user_id: state for state in res})
+
+            missing = [user_id for user_id, state in states.items() if not state]
+            if missing:
+                new = {
+                    user_id: UserPresenceState.default(user_id)
+                    for user_id in missing
+                }
+                states.update(new)
+                self.user_to_current_state.update(new)
 
-        yield self.store.set_presence_state(
-            target_user.localpart, state_to_store
-        )
-        yield collect_presencelike_data(self.distributor, target_user, state)
+        defer.returnValue(states)
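# The three-tier lookup above (memory, then database, then a default), as a
# self-contained sketch with plain dicts standing in for the cache and store:

def _lookup_states(user_ids, cache, db, default):
    states = dict((uid, cache.get(uid)) for uid in user_ids)
    missing = [uid for uid, s in states.items() if s is None]
    states.update((uid, db[uid]) for uid in missing if uid in db)
    for uid in user_ids:
        if states[uid] is None:
            states[uid] = cache[uid] = default(uid)
    return states

_cache, _db = {"@a:hs": "online"}, {"@b:hs": "unavailable"}
out = _lookup_states(["@a:hs", "@b:hs", "@c:hs"], _cache, _db, lambda u: "offline")
assert out == {"@a:hs": "online", "@b:hs": "unavailable", "@c:hs": "offline"}
assert _cache["@c:hs"] == "offline"  # only defaults are back-filled into cache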
+
+    @defer.inlineCallbacks
+    def _get_interested_parties(self, states):
+        """Given a list of states return which entities (rooms, users, servers)
+        are interested in the given states.
+
+        Returns:
+            3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`,
+            with each item being a dict of `entity_name` -> `[UserPresenceState]`
+        """
+        room_ids_to_states = {}
+        users_to_states = {}
+        for state in states:
+            events = yield self.store.get_rooms_for_user(state.user_id)
+            for e in events:
+                room_ids_to_states.setdefault(e.room_id, []).append(state)
+
+            plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
+            for u in plist:
+                users_to_states.setdefault(u, []).append(state)
+
+            # Always notify self
+            users_to_states.setdefault(state.user_id, []).append(state)
+
+        hosts_to_states = {}
+        for room_id, states in room_ids_to_states.items():
+            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
 
-        if now_level > was_level:
-            state["last_active"] = self.clock.time_msec()
+            hosts = yield self.store.get_joined_hosts_for_room(room_id)
+            for host in hosts:
+                hosts_to_states.setdefault(host, []).extend(local_states)
 
-        now_online = state["presence"] != PresenceState.OFFLINE
-        was_polling = target_user in self._user_cachemap
+        for user_id, states in users_to_states.items():
+            local_states = filter(lambda s: self.hs.is_mine_id(s.user_id), states)
+            if not local_states:
+                continue
 
-        if now_online and not was_polling:
-            self.start_polling_presence(target_user, state=state)
-        elif not now_online and was_polling:
-            self.stop_polling_presence(target_user)
+            host = UserID.from_string(user_id).domain
+            hosts_to_states.setdefault(host, []).extend(local_states)
 
-        # TODO(paul): perform a presence push as part of start/stop poll so
-        #   we don't have to do this all the time
-        yield self.changed_presencelike_data(target_user, state)
+        # TODO: de-dup hosts_to_states, as a single host might have multiple
+        # copies of the same presence state
 
-    def bump_presence_active_time(self, user, now=None):
-        if now is None:
-            now = self.clock.time_msec()
+        defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states))
 
-        prev_state = self._get_or_make_usercache(user)
-        if now - prev_state.state.get("last_active", 0) < LAST_ACTIVE_GRANULARITY:
-            return
+    @defer.inlineCallbacks
+    def _persist_and_notify(self, states):
+        """Persist states in the database, poke the notifier and send to
+        interested remote servers
+        """
+        stream_id, max_token = yield self.store.update_presence(states)
 
-        self.changed_presencelike_data(user, {"last_active": now})
+        parties = yield self._get_interested_parties(states)
+        room_ids_to_states, users_to_states, hosts_to_states = parties
 
-    def get_joined_rooms_for_user(self, user):
-        """Get the list of rooms a user is joined to.
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states.keys()]
+        )
+
+        self._push_to_remotes(hosts_to_states)
+
+    def _push_to_remotes(self, hosts_to_states):
+        """Sends state updates to remote servers.
 
         Args:
-            user(UserID): The user.
-        Returns:
-            A Deferred of a list of room id strings.
+            hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]`
+        """
+        now = self.clock.time_msec()
+        for host, states in hosts_to_states.items():
+            self.federation.send_edu(
+                destination=host,
+                edu_type="m.presence",
+                content={
+                    "push": [
+                        _format_user_presence_state(state, now)
+                        for state in states
+                    ]
+                }
+            )
+
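# An illustrative m.presence EDU body as assembled above (values invented):
#
#     {
#         "push": [
#             {
#                 "user_id": "@alice:example.com",
#                 "presence": "online",
#                 "last_active_ago": 5000,
#                 "currently_active": true
#             }
#         ]
#     }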
+    @defer.inlineCallbacks
+    def incoming_presence(self, origin, content):
+        """Called when we receive a `m.presence` EDU from a remote server.
         """
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        return rm_handler.get_joined_rooms_for_user(user)
+        now = self.clock.time_msec()
+        updates = []
+        for push in content.get("push", []):
+            # The "push" key contains a list of presence updates that we are
+            # probably interested in.
+            # TODO: Actually check if we're interested, rather than blindly
+            # accepting presence updates.
+            user_id = push.get("user_id", None)
+            if not user_id:
+                logger.info(
+                    "Got presence update from %r with no 'user_id': %r",
+                    origin, push,
+                )
+                continue
+
+            presence_state = push.get("presence", None)
+            if not presence_state:
+                logger.info(
+                    "Got presence update from %r with no 'presence': %r",
+                    origin, push,
+                )
+                continue
+
+            new_fields = {
+                "state": presence_state,
+                "last_federation_update_ts": now,
+            }
 
-    def get_joined_users_for_room_id(self, room_id):
-        rm_handler = self.homeserver.get_handlers().room_member_handler
-        return rm_handler.get_room_members(room_id)
+            last_active_ago = push.get("last_active_ago", None)
+            if last_active_ago is not None:
+                new_fields["last_active_ts"] = now - last_active_ago
+
+            new_fields["status_msg"] = push.get("status_msg", None)
+            new_fields["currently_active"] = push.get("currently_active", False)
+
+            prev_state = yield self.current_state_for_user(user_id)
+            updates.append(prev_state.copy_and_replace(**new_fields))
+
+        if updates:
+            federation_presence_counter.inc_by(len(updates))
+            yield self._update_states(updates)
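# The relative wire format is converted back to an absolute timestamp, as
# done above (self-contained check; times in ms):

now = 1000000
push = {"user_id": "@bob:remote", "presence": "online", "last_active_ago": 5000}
assert now - push["last_active_ago"] == 995000  # stored as last_active_ts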
 
     @defer.inlineCallbacks
-    def changed_presencelike_data(self, user, state):
-        """Updates the presence state of a local user.
+    def get_state(self, target_user, as_event=False):
+        results = yield self.get_states(
+            [target_user.to_string()],
+            as_event=as_event,
+        )
+
+        defer.returnValue(results[0])
+
+    @defer.inlineCallbacks
+    def get_states(self, target_user_ids, as_event=False):
+        """Get the presence state for users.
 
         Args:
-            user(UserID): The user being updated.
-            state(dict): The new presence state for the user.
+            target_user_ids (list)
+            as_event (bool): Whether to format it as a client event or not.
+
         Returns:
-            A Deferred
+            list
         """
-        self._user_cachemap_latest_serial += 1
-        statuscache = yield self.update_presence_cache(user, state)
-        yield self.push_presence(user, statuscache=statuscache)
 
-    @log_function
-    def started_user_eventstream(self, user):
-        # TODO(paul): Use "last online" state
-        return self.set_state(user, user, {"presence": PresenceState.ONLINE})
+        updates = yield self.current_state_for_users(target_user_ids)
+        updates = updates.values()
 
-    @log_function
-    def stopped_user_eventstream(self, user):
-        # TODO(paul): Save current state as "last online" state
-        return self.set_state(user, user, {"presence": PresenceState.OFFLINE})
+        for user_id in set(target_user_ids) - set(u.user_id for u in updates):
+            updates.append(UserPresenceState.default(user_id))
+
+        now = self.clock.time_msec()
+        if as_event:
+            defer.returnValue([
+                {
+                    "type": "m.presence",
+                    "content": _format_user_presence_state(state, now),
+                }
+                for state in updates
+            ])
+        else:
+            defer.returnValue([
+                _format_user_presence_state(state, now) for state in updates
+            ])
 
     @defer.inlineCallbacks
-    def user_joined_room(self, user, room_id):
-        """Called via the distributor whenever a user joins a room.
-        Notifies the new member of the presence of the current members.
-        Notifies the current members of the room of the new member's presence.
+    def set_state(self, target_user, state):
+        """Set the presence state of the user.
+        """
+        status_msg = state.get("status_msg", None)
+        presence = state["presence"]
 
-        Args:
-            user(UserID): The user who joined the room.
-            room_id(str): The room id the user joined.
+        valid_presence = (
+            PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE
+        )
+        if presence not in valid_presence:
+            raise SynapseError(400, "Invalid presence state")
+
+        user_id = target_user.to_string()
+
+        prev_state = yield self.current_state_for_user(user_id)
+
+        new_fields = {
+            "state": presence,
+            "status_msg": status_msg if presence != PresenceState.OFFLINE else None
+        }
+
+        if presence == PresenceState.ONLINE:
+            new_fields["last_active_ts"] = self.clock.time_msec()
+
+        yield self._update_states([prev_state.copy_and_replace(**new_fields)])
+
+    @defer.inlineCallbacks
+    def user_joined_room(self, user, room_id):
+        """Called (via the distributor) when a user joins a room. This function
+        sends presence updates to remote servers, in one of two ways:
+            1. the joining user is a local user, so we send their presence to
+               all servers in the room.
+            2. the joining user is a remote user, so we send presence for all
+               local users in the room.
         """
+        # We only need to send presence to servers that don't have it yet. We
+        # don't need to send to local clients here, as that is done as part
+        # of the event stream/sync.
+        # TODO: Only send to servers not already in the room.
         if self.hs.is_mine(user):
-            # No actual update but we need to bump the serial anyway for the
-            # event source
-            self._user_cachemap_latest_serial += 1
-            statuscache = yield self.update_presence_cache(
-                user, room_ids=[room_id]
-            )
-            self.push_update_to_local_and_remote(
-                observed_user=user,
-                room_ids=[room_id],
-                statuscache=statuscache,
-            )
+            state = yield self.current_state_for_user(user.to_string())
 
-        # We also want to tell them about current presence of people.
-        curr_users = yield self.get_joined_users_for_room_id(room_id)
+            hosts = yield self.store.get_joined_hosts_for_room(room_id)
+            self._push_to_remotes({host: (state,) for host in hosts})
+        else:
+            user_ids = yield self.store.get_users_in_room(room_id)
+            user_ids = filter(self.hs.is_mine_id, user_ids)
 
-        for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
-            statuscache = yield self.update_presence_cache(
-                local_user, room_ids=[room_id], add_to_cache=False
-            )
+            states = yield self.current_state_for_users(user_ids)
 
-            self.push_update_to_local_and_remote(
-                observed_user=local_user,
-                users_to_push=[user],
-                statuscache=statuscache,
-            )
+            self._push_to_remotes({user.domain: states.values()})
 
     @defer.inlineCallbacks
-    def send_presence_invite(self, observer_user, observed_user):
-        """Request the presence of a local or remote user for a local user"""
+    def get_presence_list(self, observer_user, accepted=None):
+        """Returns the presence for all users in the observer's presence list.
+        """
         if not self.hs.is_mine(observer_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
+        presence_list = yield self.store.get_presence_list(
+            observer_user.localpart, accepted=accepted
+        )
+
+        results = yield self.get_states(
+            target_user_ids=[row["observed_user_id"] for row in presence_list],
+            as_event=False,
+        )
+
+        is_accepted = {
+            row["observed_user_id"]: row["accepted"] for row in presence_list
+        }
+
+        for result in results:
+            result.update({
+                "accepted": is_accepted[result["user_id"]],
+            })
+
+        defer.returnValue(results)
+
+    @defer.inlineCallbacks
+    def send_presence_invite(self, observer_user, observed_user):
+        """Sends a presence invite.
+        """
         yield self.store.add_presence_list_pending(
             observer_user.localpart, observed_user.to_string()
         )
@@ -495,59 +672,40 @@ class PresenceHandler(BaseHandler):
             )
 
     @defer.inlineCallbacks
-    def _should_accept_invite(self, observed_user, observer_user):
-        if not self.hs.is_mine(observed_user):
-            defer.returnValue(False)
-
-        row = yield self.store.has_presence_state(observed_user.localpart)
-        if not row:
-            defer.returnValue(False)
-
-        # TODO(paul): Eventually we'll ask the user's permission for this
-        # before accepting. For now just accept any invite request
-        defer.returnValue(True)
-
-    @defer.inlineCallbacks
     def invite_presence(self, observed_user, observer_user):
-        """Handles a m.presence_invite EDU. A remote or local user has
-        requested presence updates for a local user. If the invite is accepted
-        then allow the local or remote user to see the presence of the local
-        user.
-
-        Args:
-            observed_user(UserID): The local user whose presence is requested.
-            observer_user(UserID): The remote or local user requesting presence.
+        """Handles new presence invites.
         """
-        accept = yield self._should_accept_invite(observed_user, observer_user)
-
-        if accept:
-            yield self.store.allow_presence_visible(
-                observed_user.localpart, observer_user.to_string()
-            )
+        if not self.hs.is_mine(observed_user):
+            raise SynapseError(400, "User is not hosted on this Home Server")
 
+        # TODO: Don't auto accept
         if self.hs.is_mine(observer_user):
-            if accept:
-                yield self.accept_presence(observed_user, observer_user)
-            else:
-                yield self.deny_presence(observed_user, observer_user)
+            yield self.accept_presence(observed_user, observer_user)
         else:
-            edu_type = "m.presence_accept" if accept else "m.presence_deny"
-
-            yield self.federation.send_edu(
+            self.federation.send_edu(
                 destination=observer_user.domain,
-                edu_type=edu_type,
+                edu_type="m.presence_accept",
                 content={
                     "observed_user": observed_user.to_string(),
                     "observer_user": observer_user.to_string(),
                 }
             )
 
+            state_dict = yield self.get_state(observed_user, as_event=False)
+
+            self.federation.send_edu(
+                destination=observer_user.domain,
+                edu_type="m.presence",
+                content={
+                    "push": [state_dict]
+                }
+            )
+
     @defer.inlineCallbacks
     def accept_presence(self, observed_user, observer_user):
         """Handles a m.presence_accept EDU. Mark a presence invite from a
         local or remote user as accepted in a local user's presence list.
         Starts polling for presence updates from the local or remote user.
-
         Args:
             observed_user(UserID): The user to update in the presence list.
             observer_user(UserID): The owner of the presence list to update.
@@ -556,15 +714,10 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        self.start_polling_presence(
-            observer_user, target_user=observed_user
-        )
-
     @defer.inlineCallbacks
     def deny_presence(self, observed_user, observer_user):
         """Handle a m.presence_deny EDU. Removes a local or remote user from a
         local user's presence list.
-
         Args:
             observed_user(UserID): The local or remote user to remove from the
                 list.
@@ -582,7 +735,6 @@ class PresenceHandler(BaseHandler):
     def drop(self, observed_user, observer_user):
         """Remove a local or remote user from a local user's presence list and
         unsubscribe the local user from updates that user.
-
         Args:
             observed_user(UserId): The local or remote user to remove from the
                 list.
@@ -597,710 +749,353 @@ class PresenceHandler(BaseHandler):
             observer_user.localpart, observed_user.to_string()
         )
 
-        self.stop_polling_presence(
-            observer_user, target_user=observed_user
-        )
-
-    @defer.inlineCallbacks
-    def get_presence_list(self, observer_user, accepted=None):
-        """Get the presence list for a local user. The retured list includes
-        the current presence state for each user listed.
-
-        Args:
-            observer_user(UserID): The local user whose presence list to fetch.
-            accepted(bool or None): If not none then only include users who
-                have or have not accepted the presence invite request.
-        Returns:
-            A Deferred list of presence state events.
-        """
-        if not self.hs.is_mine(observer_user):
-            raise SynapseError(400, "User is not hosted on this Home Server")
-
-        presence_list = yield self.store.get_presence_list(
-            observer_user.localpart, accepted=accepted
-        )
-
-        results = []
-        for row in presence_list:
-            observed_user = UserID.from_string(row["observed_user_id"])
-            result = {
-                "observed_user": observed_user, "accepted": row["accepted"]
-            }
-            result.update(
-                self._get_or_offline_usercache(observed_user).get_state()
-            )
-            if "last_active" in result:
-                result["last_active_ago"] = int(
-                    self.clock.time_msec() - result.pop("last_active")
-                )
-            results.append(result)
-
-        defer.returnValue(results)
+        # TODO: Inform the remote that we've dropped the presence list.
 
     @defer.inlineCallbacks
-    @log_function
-    def start_polling_presence(self, user, target_user=None, state=None):
-        """Subscribe a local user to presence updates from a local or remote
-        user. If no target_user is supplied then subscribe to all users stored
-        in the presence list for the local user.
-
-        Additonally this pushes the current presence state of this user to all
-        target_users. That state can be provided directly or will be read from
-        the stored state for the local user.
-
-        Also this attempts to notify the local user of the current state of
-        any local target users.
-
-        Args:
-            user(UserID): The local user that whishes for presence updates.
-            target_user(UserID): The local or remote user whose updates are
-                wanted.
-            state(dict): Optional presence state for the local user.
+    def is_visible(self, observed_user, observer_user):
+        """Returns whether a user can see another user's presence.
         """
-        logger.debug("Start polling for presence from %s", user)
-
-        if target_user:
-            target_users = set([target_user])
-            room_ids = []
-        else:
-            presence = yield self.store.get_presence_list(
-                user.localpart, accepted=True
-            )
-            target_users = set([
-                UserID.from_string(x["observed_user_id"]) for x in presence
-            ])
+        observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
+        observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
 
-            # Also include people in all my rooms
+        observer_room_ids = set(r.room_id for r in observer_rooms)
+        observed_room_ids = set(r.room_id for r in observed_rooms)
 
-            room_ids = yield self.get_joined_rooms_for_user(user)
+        if observer_room_ids & observed_room_ids:
+            defer.returnValue(True)
 
-        if state is None:
-            state = yield self.store.get_presence_state(user.localpart)
-        else:
-            # statuscache = self._get_or_make_usercache(user)
-            # self._user_cachemap_latest_serial += 1
-            # statuscache.update(state, self._user_cachemap_latest_serial)
-            pass
-
-        yield self.push_update_to_local_and_remote(
-            observed_user=user,
-            users_to_push=target_users,
-            room_ids=room_ids,
-            statuscache=self._get_or_make_usercache(user),
+        accepted_observers = yield self.store.get_presence_list_observers_accepted(
+            observed_user.to_string()
         )
 
-        for target_user in target_users:
-            if self.hs.is_mine(target_user):
-                self._start_polling_local(user, target_user)
-
-                # We want to tell the person that just came online
-                # presence state of people they are interested in?
-                self.push_update_to_clients(
-                    users_to_push=[user],
-                )
-
-        deferreds = []
-        remote_users = [u for u in target_users if not self.hs.is_mine(u)]
-        remoteusers_by_domain = partition(remote_users, lambda u: u.domain)
-        # Only poll for people in our get_presence_list
-        for domain in remoteusers_by_domain:
-            remoteusers = remoteusers_by_domain[domain]
-
-            deferreds.append(self._start_polling_remote(
-                user, domain, remoteusers
-            ))
+        defer.returnValue(observer_user.to_string() in accepted_observers)
 
-        yield defer.DeferredList(deferreds, consumeErrors=True)
-
-    def _start_polling_local(self, user, target_user):
-        """Subscribe a local user to presence updates for a local user
-
-        Args:
-            user(UserId): The local user that wishes for updates.
-            target_user(UserId): The local users whose updates are wanted.
+    @defer.inlineCallbacks
+    def get_all_presence_updates(self, last_id, current_id):
         """
-        target_localpart = target_user.localpart
-
-        if target_localpart not in self._local_pushmap:
-            self._local_pushmap[target_localpart] = set()
-
-        self._local_pushmap[target_localpart].add(user)
-
-    def _start_polling_remote(self, user, domain, remoteusers):
-        """Subscribe a local user to presence updates for remote users on a
-        given remote domain.
-
-        Args:
-            user(UserID): The local user that wishes for updates.
-            domain(str): The remote server the local user wants updates from.
-            remoteusers(UserID): The remote users that local user wants to be
-                told about.
-        Returns:
-            A Deferred.
+        Gets a list of presence update rows between the given stream ids.
+        Each row has:
+        - stream_id(int)
+        - user_id(str)
+        - state(str)
+        - last_active_ts(int)
+        - last_federation_update_ts(int)
+        - last_user_sync_ts(int)
+        - status_msg(str)
+        - currently_active(bool)
         """
-        to_poll = set()
-
-        for u in remoteusers:
-            if u not in self._remote_recvmap:
-                self._remote_recvmap[u] = set()
-                to_poll.add(u)
-
-            self._remote_recvmap[u].add(user)
+        # TODO(markjh): replicate the unpersisted changes.
+        # This could use the in-memory stores for recent changes.
+        rows = yield self.store.get_all_presence_updates(last_id, current_id)
+        defer.returnValue(rows)
 
-        if not to_poll:
-            return defer.succeed(None)
-
-        return self.federation.send_edu(
-            destination=domain,
-            edu_type="m.presence",
-            content={"poll": [u.to_string() for u in to_poll]}
-        )
-
-    @log_function
-    def stop_polling_presence(self, user, target_user=None):
-        """Unsubscribe a local user from presence updates from a local or
-        remote user. If no target user is supplied then unsubscribe the user
-        from all presence updates that the user had subscribed to.
-
-        Args:
-            user(UserID): The local user that no longer wishes for updates.
-            target_user(UserID or None): The user whose updates are no longer
-                wanted.
-        Returns:
-            A Deferred.
-        """
-        logger.debug("Stop polling for presence from %s", user)
 
-        if not target_user or self.hs.is_mine(target_user):
-            self._stop_polling_local(user, target_user=target_user)
+def should_notify(old_state, new_state):
+    """Decides if a presence state change should be sent to interested parties.
+    """
+    if old_state.status_msg != new_state.status_msg:
+        return True
 
-        deferreds = []
+    if old_state.state == PresenceState.ONLINE:
+        if new_state.state != PresenceState.ONLINE:
+            # Always notify for online -> anything
+            return True
 
-        if target_user:
-            if target_user not in self._remote_recvmap:
-                return
-            target_users = set([target_user])
-        else:
-            target_users = self._remote_recvmap.keys()
+        if new_state.currently_active != old_state.currently_active:
+            return True
 
-        remoteusers = [u for u in target_users
-                       if user in self._remote_recvmap[u]]
-        remoteusers_by_domain = partition(remoteusers, lambda u: u.domain)
+    if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+        # Notify whenever last_active advances by more than the granularity.
+        return True
 
-        for domain in remoteusers_by_domain:
-            remoteusers = remoteusers_by_domain[domain]
+    if old_state.state != new_state.state:
+        return True
 
-            deferreds.append(
-                self._stop_polling_remote(user, domain, remoteusers)
-            )
+    return False
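
As a worked example, run alongside the should_notify above: the stand-in
below fabricates two states whose only difference is a last_active_ts bump
well beyond the granularity (assumed here to be one minute, as defined
earlier in this module). FakeState is an invented substitute for the real
UserPresenceState.

    from collections import namedtuple

    # Stand-in for synapse's UserPresenceState; illustration only.
    FakeState = namedtuple(
        "FakeState", ["state", "status_msg", "currently_active", "last_active_ts"]
    )

    old = FakeState("online", None, True, 0)
    new = FakeState("online", None, True, 10 * 60 * 1000)  # ten minutes later

    # Same state, status and activity flag, but last_active_ts advanced by
    # more than the granularity, so interested parties are still notified.
    assert should_notify(old, new)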
 
-        return defer.DeferredList(deferreds, consumeErrors=True)
 
-    def _stop_polling_local(self, user, target_user):
-        """Unsubscribe a local user from presence updates from a local user on
-        this server.
+def _format_user_presence_state(state, now):
+    """Convert UserPresenceState to a format that can be sent down to clients
+    and to other servers.
+    """
+    content = {
+        "presence": state.state,
+        "user_id": state.user_id,
+    }
+    if state.last_active_ts:
+        content["last_active_ago"] = now - state.last_active_ts
+    if state.status_msg and state.state != PresenceState.OFFLINE:
+        content["status_msg"] = state.status_msg
+    if state.state == PresenceState.ONLINE:
+        content["currently_active"] = state.currently_active
 
-        Args:
-            user(UserID): The local user that no longer wishes for updates.
-            target_user(UserID): The user whose updates are no longer wanted.
-        """
-        for localpart in self._local_pushmap.keys():
-            if target_user and localpart != target_user.localpart:
-                continue
+    return content
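
For example, for an online user the helper above would produce content along
these lines (a sketch with invented values):

    # state.state == "online", state.status_msg == "at lunch",
    # state.currently_active == True, now - state.last_active_ts == 15000
    content = {
        "presence": "online",
        "user_id": "@alice:example.com",
        "last_active_ago": 15000,
        "status_msg": "at lunch",
        "currently_active": True,
    }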
 
-            if user in self._local_pushmap[localpart]:
-                self._local_pushmap[localpart].remove(user)
 
-            if not self._local_pushmap[localpart]:
-                del self._local_pushmap[localpart]
+class PresenceEventSource(object):
+    def __init__(self, hs):
+        self.hs = hs
+        self.clock = hs.get_clock()
+        self.store = hs.get_datastore()
 
+    @defer.inlineCallbacks
     @log_function
-    def _stop_polling_remote(self, user, domain, remoteusers):
-        """Unsubscribe a local user from presence updates from remote users on
-        a given domain.
-
-        Args:
-            user(UserID): The local user that no longer wishes for updates.
-            domain(str): The remote server to unsubscribe from.
-            remoteusers([UserID]): The users on that remote server that the
-                local user no longer wishes to be updated about.
-        Returns:
-            A Deferred.
-        """
-        to_unpoll = set()
-
-        for u in remoteusers:
-            self._remote_recvmap[u].remove(user)
-
-            if not self._remote_recvmap[u]:
-                del self._remote_recvmap[u]
-                to_unpoll.add(u)
+    def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
+                       **kwargs):
+        # The process for getting presence events is:
+        #  1. Get the rooms the user is in.
+        #  2. Get the list of users in those rooms.
+        #  3. Get the list of users that are in the user's presence list.
+        #  4. If there is a from_key set, cross-reference the list of users
+        #     with the `presence_stream_cache` to see which ones we actually
+        #     need to check.
+        #  5. Load current state for the users.
+        #
+        # We don't try to limit the presence updates by the current token, as
+        # sending down the rare duplicate is not a concern.
+
+        with Measure(self.clock, "presence.get_new_events"):
+            user_id = user.to_string()
+            if from_key is not None:
+                from_key = int(from_key)
+            room_ids = room_ids or []
 
-        if not to_unpoll:
-            return defer.succeed(None)
+            presence = self.hs.get_handlers().presence_handler
+            stream_change_cache = self.store.presence_stream_cache
 
-        return self.federation.send_edu(
-            destination=domain,
-            edu_type="m.presence",
-            content={"unpoll": [u.to_string() for u in to_unpoll]}
-        )
+            if not room_ids:
+                rooms = yield self.store.get_rooms_for_user(user_id)
+                room_ids = set(e.room_id for e in rooms)
+            else:
+                room_ids = set(room_ids)
+
+            max_token = self.store.get_current_presence_token()
+
+            plist = yield self.store.get_presence_list_accepted(user.localpart)
+            friends = set(row["observed_user_id"] for row in plist)
+            friends.add(user_id)  # So that we receive our own presence
+
+            user_ids_changed = set()
+            changed = None
+            if from_key and max_token - from_key < 100:
+                # For small deltas, it's quicker to get all changes and then
+                # work out if we share a room or they're in our presence list
+                changed = stream_change_cache.get_all_entities_changed(from_key)
+
+            # get_all_entities_changed can return None
+            if changed is not None:
+                for other_user_id in changed:
+                    if other_user_id in friends:
+                        user_ids_changed.add(other_user_id)
+                        continue
+                    other_rooms = yield self.store.get_rooms_for_user(other_user_id)
+                    if room_ids.intersection(e.room_id for e in other_rooms):
+                        user_ids_changed.add(other_user_id)
+                        continue
+            else:
+                # Too many possible updates. Find all users we can see and check
+                # if any of them have changed.
+                user_ids_to_check = set()
+                for room_id in room_ids:
+                    users = yield self.store.get_users_in_room(room_id)
+                    user_ids_to_check.update(users)
+
+                user_ids_to_check.update(friends)
+
+                # Always include yourself. Only really matters when the user
+                # isn't in any rooms, but still.
+                user_ids_to_check.add(user_id)
+
+                if from_key:
+                    user_ids_changed = stream_change_cache.get_entities_changed(
+                        user_ids_to_check, from_key,
+                    )
+                else:
+                    user_ids_changed = user_ids_to_check
 
-    @defer.inlineCallbacks
-    @log_function
-    def push_presence(self, user, statuscache):
-        """
-        Notify local and remote users of a change in presence of a local user.
-        Pushes the update to local clients and remote domains that are directly
-        subscribed to the presence of the local user.
-        Also pushes that update to any local user or remote domain that shares
-        a room with the local user.
+            updates = yield presence.current_state_for_users(user_ids_changed)
 
-        Args:
-            user(UserID): The local user whose presence was updated.
-            statuscache(UserPresenceCache): Cache of the user's presence state
-        Returns:
-            A Deferred.
-        """
-        assert(self.hs.is_mine(user))
+        now = self.clock.time_msec()
 
-        logger.debug("Pushing presence update from %s", user)
+        defer.returnValue(([
+            {
+                "type": "m.presence",
+                "content": _format_user_presence_state(s, now),
+            }
+            for s in updates.values()
+            if include_offline or s.state != PresenceState.OFFLINE
+        ], max_token))
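
The fast path in step 4 leans on a stream-change cache: a map from entity to
the stream position at which it last changed, so the source can ask "which of
these users changed since from_key?" without loading state for all of them.
The class below is a minimal stand-in illustrating the idea, not synapse's
actual StreamChangeCache.

    class TinyStreamChangeCache(object):
        """Minimal stand-in for a stream-change cache; illustration only."""

        def __init__(self):
            self._last_changed = {}  # entity -> stream position of last change

        def entity_has_changed(self, entity, stream_pos):
            self._last_changed[entity] = stream_pos

        def get_entities_changed(self, entities, stream_pos):
            # Of `entities`, return those that changed after `stream_pos`.
            return set(
                e for e in entities
                if self._last_changed.get(e, 0) > stream_pos
            )

    cache = TinyStreamChangeCache()
    cache.entity_has_changed("@alice:example.com", 5)
    cache.entity_has_changed("@bob:example.com", 9)
    assert cache.get_entities_changed(
        {"@alice:example.com", "@bob:example.com"}, 7
    ) == {"@bob:example.com"}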
 
-        localusers = set(self._local_pushmap.get(user.localpart, set()))
-        remotedomains = set(self._remote_sendmap.get(user.localpart, set()))
+    def get_current_key(self):
+        return self.store.get_current_presence_token()
 
-        # Reflect users' status changes back to themselves, so UIs look nice
-        # and also user is informed of server-forced pushes
-        localusers.add(user)
+    def get_pagination_rows(self, user, pagination_config, key):
+        return self.get_new_events(user, from_key=None, include_offline=False)
 
-        room_ids = yield self.get_joined_rooms_for_user(user)
 
-        if not localusers and not room_ids:
-            defer.returnValue(None)
+def handle_timeouts(user_states, is_mine_fn, user_to_num_current_syncs, now):
+    """Checks the presence of users that have timed out and updates as
+    appropriate.
 
-        yield self.push_update_to_local_and_remote(
-            observed_user=user,
-            users_to_push=localusers,
-            remote_domains=remotedomains,
-            room_ids=room_ids,
-            statuscache=statuscache,
-        )
-        yield user_presence_changed(self.distributor, user, statuscache)
+    Args:
+        user_states(list): List of UserPresenceState objects to check.
+        is_mine_fn (fn): Function that returns whether a user_id is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
 
-    @defer.inlineCallbacks
-    def incoming_presence(self, origin, content):
-        """Handle an incoming m.presence EDU.
-        For each presence update in the "push" list update our local cache and
-        notify the appropriate local clients. Only clients that share a room
-        or are directly subscribed to the presence for a user should be
-        notified of the update.
-        For each subscription request in the "poll" list start pushing presence
-        updates to the remote server.
-        For unsubscribe request in the "unpoll" list stop pushing presence
-        updates to the remote server.
+    Returns:
+        List of UserPresenceState updates
+    """
+    changes = {}  # Actual changes we need to notify people about
 
-        Args:
-            orgin(str): The source of this m.presence EDU.
-            content(dict): The content of this m.presence EDU.
-        Returns:
-            A Deferred.
-        """
-        deferreds = []
+    for state in user_states:
+        is_mine = is_mine_fn(state.user_id)
 
-        for push in content.get("push", []):
-            user = UserID.from_string(push["user_id"])
+        new_state = handle_timeout(state, is_mine, user_to_num_current_syncs, now)
+        if new_state:
+            changes[state.user_id] = new_state
 
-            logger.debug("Incoming presence update from %s", user)
+    return changes.values()
 
-            observers = set(self._remote_recvmap.get(user, set()))
-            if observers:
-                logger.debug(
-                    " | %d interested local observers %r", len(observers), observers
-                )
 
-            room_ids = yield self.get_joined_rooms_for_user(user)
-            if room_ids:
-                logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
+def handle_timeout(state, is_mine, user_to_num_current_syncs, now):
+    """Checks the presence of the user to see if any of the timers have elapsed
 
-            state = dict(push)
-            del state["user_id"]
+    Args:
+        state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        user_to_num_current_syncs (dict): Mapping of user_id to number of currently
+            active syncs.
+        now (int): Current time in ms.
 
-            if "presence" not in state:
-                logger.warning(
-                    "Received a presence 'push' EDU from %s without a"
-                    " 'presence' key", origin
+    Returns:
+        A UserPresenceState update or None if no update.
+    """
+    if state.state == PresenceState.OFFLINE:
+        # No timeouts are associated with offline states.
+        return None
+
+    changed = False
+    user_id = state.user_id
+
+    if is_mine:
+        if state.state == PresenceState.ONLINE:
+            if now - state.last_active_ts > IDLE_TIMER:
+                # Currently online, but last activity was ages ago, so
+                # automatically mark them as idle
+                state = state.copy_and_replace(
+                    state=PresenceState.UNAVAILABLE,
                 )
-                continue
-
-            if "last_active_ago" in state:
-                state["last_active"] = int(
-                    self.clock.time_msec() - state.pop("last_active_ago")
+                changed = True
+            elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+                # So that we send down a notification that we've
+                # stopped updating.
+                changed = True
+
+        if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
+            # Need to send ping to other servers to ensure they don't
+            # timeout and set us to offline
+            changed = True
+
+        # If there have been no syncs for a while (and none ongoing),
+        # set presence to offline
+        if not user_to_num_current_syncs.get(user_id, 0):
+            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+                state = state.copy_and_replace(
+                    state=PresenceState.OFFLINE,
+                    status_msg=None,
                 )
-
-            self._user_cachemap_latest_serial += 1
-            yield self.update_presence_cache(user, state, room_ids=room_ids)
-
-            if not observers and not room_ids:
-                logger.debug(" | no interested observers or room IDs")
-                continue
-
-            self.push_update_to_clients(
-                users_to_push=observers, room_ids=room_ids
+                changed = True
+    else:
+        # We expect to be poked occasionally by the other side.
+        # This is to protect against forgetful/buggy servers, so that
+        # no one gets stuck online forever.
+        if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
+            # The other side seems to have disappeared.
+            state = state.copy_and_replace(
+                state=PresenceState.OFFLINE,
+                status_msg=None,
             )
+            changed = True
 
-            user_id = user.to_string()
-
-            if state["presence"] == PresenceState.OFFLINE:
-                self._remote_offline_serials.insert(
-                    0,
-                    (self._user_cachemap_latest_serial, set([user_id]))
-                )
-                while len(self._remote_offline_serials) > MAX_OFFLINE_SERIALS:
-                    self._remote_offline_serials.pop()  # remove the oldest
-                if user in self._user_cachemap:
-                    del self._user_cachemap[user]
-            else:
-                # Remove the user from remote_offline_serials now that they're
-                # no longer offline
-                for idx, elem in enumerate(self._remote_offline_serials):
-                    (_, user_ids) = elem
-                    user_ids.discard(user_id)
-                    if not user_ids:
-                        self._remote_offline_serials.pop(idx)
-
-        for poll in content.get("poll", []):
-            user = UserID.from_string(poll)
-
-            if not self.hs.is_mine(user):
-                continue
-
-            # TODO(paul) permissions checks
-
-            if user not in self._remote_sendmap:
-                self._remote_sendmap[user] = set()
-
-            self._remote_sendmap[user].add(origin)
-
-            deferreds.append(self._push_presence_remote(user, origin))
-
-        for unpoll in content.get("unpoll", []):
-            user = UserID.from_string(unpoll)
+    return state if changed else None
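
As a worked example, run alongside the handle_timeout above: a local user who
is online, has a sync in flight, but has shown no activity for ten hours
should come back auto-idled. FakeState is an invented stand-in for
UserPresenceState, and the only assumption about the module constants is that
IDLE_TIMER is well under ten hours.

    class FakeState(object):
        """Stand-in for UserPresenceState; illustration only."""
        def __init__(self, **kwargs):
            self.__dict__.update(kwargs)

        def copy_and_replace(self, **kwargs):
            fields = dict(self.__dict__)
            fields.update(kwargs)
            return FakeState(**fields)

    now = 10 * 60 * 60 * 1000  # ten hours, in ms
    state = FakeState(
        user_id="@alice:example.com",
        state="online",
        status_msg=None,
        last_active_ts=0,               # no activity since the epoch
        last_federation_update_ts=now,  # federation was pinged just now
        last_user_sync_ts=now,
    )

    new_state = handle_timeout(
        state, is_mine=True,
        user_to_num_current_syncs={"@alice:example.com": 1}, now=now,
    )
    assert new_state.state == "unavailable"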
 
-            if not self.hs.is_mine(user):
-                continue
-
-            if user in self._remote_sendmap:
-                self._remote_sendmap[user].remove(origin)
-
-                if not self._remote_sendmap[user]:
-                    del self._remote_sendmap[user]
-
-        yield defer.DeferredList(deferreds, consumeErrors=True)
-
-    @defer.inlineCallbacks
-    def update_presence_cache(self, user, state={}, room_ids=None,
-                              add_to_cache=True):
-        """Update the presence cache for a user with a new state and bump the
-        serial to the latest value.
-
-        Args:
-            user(UserID): The user being updated
-            state(dict): The presence state being updated
-            room_ids(None or list of str): A list of room_ids to update. If
-                room_ids is None then fetch the list of room_ids the user is
-                joined to.
-            add_to_cache: Whether to add an entry to the presence cache if the
-                user isn't already in the cache.
-        Returns:
-            A Deferred UserPresenceCache for the user being updated.
-        """
-        if room_ids is None:
-            room_ids = yield self.get_joined_rooms_for_user(user)
-
-        for room_id in room_ids:
-            self._room_serials[room_id] = self._user_cachemap_latest_serial
-        if add_to_cache:
-            statuscache = self._get_or_make_usercache(user)
-        else:
-            statuscache = self._get_or_offline_usercache(user)
-        statuscache.update(state, serial=self._user_cachemap_latest_serial)
-        defer.returnValue(statuscache)
-
-    @defer.inlineCallbacks
-    def push_update_to_local_and_remote(self, observed_user, statuscache,
-                                        users_to_push=[], room_ids=[],
-                                        remote_domains=[]):
-        """Notify local clients and remote servers of a change in the presence
-        of a user.
 
-        Args:
-            observed_user(UserID): The user to push the presence state for.
-            statuscache(UserPresenceCache): The cache for the presence state to
-                push.
-            users_to_push([UserID]): A list of local and remote users to
-                notify.
-            room_ids([str]): Notify the local and remote occupants of these
-                rooms.
-            remote_domains([str]): A list of remote servers to notify in
-                addition to those implied by the users_to_push and the
-                room_ids.
-        Returns:
-            A Deferred.
-        """
-
-        localusers, remoteusers = partitionbool(
-            users_to_push,
-            lambda u: self.hs.is_mine(u)
-        )
+def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+    """Given a presence update:
+        1. Add any appropriate timers.
+        2. Check if we should notify anyone.
 
-        localusers = set(localusers)
+    Args:
+        prev_state (UserPresenceState)
+        new_state (UserPresenceState)
+        is_mine (bool): Whether the user is ours
+        wheel_timer (WheelTimer)
+        now (int): Time now in ms
 
-        self.push_update_to_clients(
-            users_to_push=localusers, room_ids=room_ids
-        )
-
-        remote_domains = set(remote_domains)
-        remote_domains |= set([r.domain for r in remoteusers])
-        for room_id in room_ids:
-            remote_domains.update(
-                (yield self.store.get_joined_hosts_for_room(room_id))
-            )
-
-        remote_domains.discard(self.hs.hostname)
-
-        deferreds = []
-        for domain in remote_domains:
-            logger.debug(" | push to remote domain %s", domain)
-            deferreds.append(
-                self._push_presence_remote(
-                    observed_user, domain, state=statuscache.get_state()
-                )
+    Returns:
+        3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
+            - new_state: is the state to actually persist
+            - persist_and_notify (bool): whether to persist and notify people
+            - federation_ping (bool): whether we should send a ping over federation
+    """
+    user_id = new_state.user_id
+
+    persist_and_notify = False
+    federation_ping = False
+
+    # If the users are ours then we want to set up a bunch of timers
+    # to time things out.
+    if is_mine:
+        if new_state.state == PresenceState.ONLINE:
+            # Idle timer
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_active_ts + IDLE_TIMER
             )
 
-        yield defer.DeferredList(deferreds, consumeErrors=True)
-
-        defer.returnValue((localusers, remote_domains))
-
-    def push_update_to_clients(self, users_to_push=[], room_ids=[]):
-        """Notify clients of a new presence event.
-
-        Args:
-            users_to_push([UserID]): List of users to notify.
-            room_ids([str]): List of room_ids to notify.
-        """
-        with PreserveLoggingContext():
-            self.notifier.on_new_event(
-                "presence_key",
-                self._user_cachemap_latest_serial,
-                users_to_push,
-                room_ids,
+            active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
+            new_state = new_state.copy_and_replace(
+                currently_active=active,
             )
 
-    @defer.inlineCallbacks
-    def _push_presence_remote(self, user, destination, state=None):
-        """Push a user's presence to a remote server. If a presence state event
-        that event is sent. Otherwise a new state event is constructed from the
-        stored presence state.
-        The last_active is replaced with last_active_ago in case the wallclock
-        time on the remote server is different to the time on this server.
-        Sends an EDU to the remote server with the current presence state.
-
-        Args:
-            user(UserID): The user to push the presence state for.
-            destination(str): The remote server to send state to.
-            state(dict): The state to push, or None to use the current stored
-                state.
-        Returns:
-            A Deferred.
-        """
-        if state is None:
-            state = yield self.store.get_presence_state(user.localpart)
-            del state["mtime"]
-            state["presence"] = state.pop("state")
-
-            if user in self._user_cachemap:
-                state["last_active"] = (
-                    self._user_cachemap[user].get_state()["last_active"]
+            if active:
+                wheel_timer.insert(
+                    now=now,
+                    obj=user_id,
+                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
                 )
 
-            yield collect_presencelike_data(self.distributor, user, state)
-
-        if "last_active" in state:
-            state = dict(state)
-            state["last_active_ago"] = int(
-                self.clock.time_msec() - state.pop("last_active")
-            )
-
-        user_state = {"user_id": user.to_string(), }
-        user_state.update(state)
-
-        yield self.federation.send_edu(
-            destination=destination,
-            edu_type="m.presence",
-            content={"push": [user_state, ], }
-        )
-
-
-class PresenceEventSource(object):
-    def __init__(self, hs):
-        self.hs = hs
-        self.clock = hs.get_clock()
-
-    @defer.inlineCallbacks
-    @log_function
-    def get_new_events(self, user, from_key, room_ids=None, **kwargs):
-        from_key = int(from_key)
-        room_ids = room_ids or []
-
-        presence = self.hs.get_handlers().presence_handler
-        cachemap = presence._user_cachemap
-
-        max_serial = presence._user_cachemap_latest_serial
-
-        clock = self.clock
-        latest_serial = 0
-
-        user_ids_to_check = {user}
-        presence_list = yield presence.store.get_presence_list(
-            user.localpart, accepted=True
-        )
-        if presence_list is not None:
-            user_ids_to_check |= set(
-                UserID.from_string(p["observed_user_id"]) for p in presence_list
+        if new_state.state != PresenceState.OFFLINE:
+            # User has stopped syncing
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
             )
-        for room_id in set(room_ids) & set(presence._room_serials):
-            if presence._room_serials[room_id] > from_key:
-                joined = yield presence.get_joined_users_for_room_id(room_id)
-                user_ids_to_check |= set(joined)
-
-        updates = []
-        for observed_user in user_ids_to_check & set(cachemap):
-            cached = cachemap[observed_user]
-
-            if cached.serial <= from_key or cached.serial > max_serial:
-                continue
-
-            latest_serial = max(cached.serial, latest_serial)
-            updates.append(cached.make_event(user=observed_user, clock=clock))
-
-        # TODO(paul): limit
-
-        for serial, user_ids in presence._remote_offline_serials:
-            if serial <= from_key:
-                break
-
-            if serial > max_serial:
-                continue
-
-            latest_serial = max(latest_serial, serial)
-            for u in user_ids:
-                updates.append({
-                    "type": "m.presence",
-                    "content": {"user_id": u, "presence": PresenceState.OFFLINE},
-                })
-        # TODO(paul): For the v2 API we want to tell the client their from_key
-        #   is too old if we fell off the end of the _remote_offline_serials
-        #   list, and get them to invalidate+resync. In v1 we have no such
-        #   concept so this is a best-effort result.
-
-        if updates:
-            defer.returnValue((updates, latest_serial))
-        else:
-            defer.returnValue(([], presence._user_cachemap_latest_serial))
-
-    def get_current_key(self):
-        presence = self.hs.get_handlers().presence_handler
-        return presence._user_cachemap_latest_serial
-
-    @defer.inlineCallbacks
-    def get_pagination_rows(self, user, pagination_config, key):
-        # TODO (erikj): Does this make sense? Ordering?
-
-        from_key = int(pagination_config.from_key)
-
-        if pagination_config.to_key:
-            to_key = int(pagination_config.to_key)
-        else:
-            to_key = -1
 
-        presence = self.hs.get_handlers().presence_handler
-        cachemap = presence._user_cachemap
+            last_federate = new_state.last_federation_update_ts
+            if now - last_federate > FEDERATION_PING_INTERVAL:
+                # Been a while since we've poked remote servers
+                new_state = new_state.copy_and_replace(
+                    last_federation_update_ts=now,
+                )
+                federation_ping = True
 
-        user_ids_to_check = {user}
-        presence_list = yield presence.store.get_presence_list(
-            user.localpart, accepted=True
+    else:
+        wheel_timer.insert(
+            now=now,
+            obj=user_id,
+            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
         )
-        if presence_list is not None:
-            user_ids_to_check |= set(
-                UserID.from_string(p["observed_user_id"]) for p in presence_list
-            )
-        room_ids = yield presence.get_joined_rooms_for_user(user)
-        for room_id in set(room_ids) & set(presence._room_serials):
-            if presence._room_serials[room_id] >= from_key:
-                joined = yield presence.get_joined_users_for_room_id(room_id)
-                user_ids_to_check |= set(joined)
-
-        updates = []
-        for observed_user in user_ids_to_check & set(cachemap):
-            if not (to_key < cachemap[observed_user].serial <= from_key):
-                continue
-
-            updates.append((observed_user, cachemap[observed_user]))
-
-        # TODO(paul): limit
-
-        if updates:
-            clock = self.clock
-
-            earliest_serial = max([x[1].serial for x in updates])
-            data = [x[1].make_event(user=x[0], clock=clock) for x in updates]
-
-            defer.returnValue((data, earliest_serial))
-        else:
-            defer.returnValue(([], 0))
 
-
-class UserPresenceCache(object):
-    """Store an observed user's state and status message.
-
-    Includes the update timestamp.
-    """
-    def __init__(self):
-        self.state = {"presence": PresenceState.OFFLINE}
-        self.serial = None
-
-    def __repr__(self):
-        return "UserPresenceCache(state=%r, serial=%r)" % (
-            self.state, self.serial
+    # Check whether the change was something worth notifying about
+    if should_notify(prev_state, new_state):
+        new_state = new_state.copy_and_replace(
+            last_federation_update_ts=now,
         )
+        persist_and_notify = True
 
-    def update(self, state, serial):
-        assert("mtime_age" not in state)
-
-        self.state.update(state)
-        # Delete keys that are now 'None'
-        for k in self.state.keys():
-            if self.state[k] is None:
-                del self.state[k]
-
-        self.serial = serial
-
-        if "status_msg" in state:
-            self.status_msg = state["status_msg"]
-        else:
-            self.status_msg = None
-
-    def get_state(self):
-        # clone it so caller can't break our cache
-        state = dict(self.state)
-        return state
-
-    def make_event(self, user, clock):
-        content = self.get_state()
-        content["user_id"] = user.to_string()
-
-        if "last_active" in content:
-            content["last_active_ago"] = int(
-                clock.time_msec() - content.pop("last_active")
-            )
-
-        return {"type": "m.presence", "content": content}
+    return new_state, persist_and_notify, federation_ping
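
The wheel timer used above amortises these timeouts by bucketing objects into
coarse time slots, so firing them is a cheap pop-from-the-front rather than a
per-user scan. A minimal sketch of the idea follows; it is not synapse's
actual WheelTimer.

    class TinyWheelTimer(object):
        """Bucketed timeout wheel; illustration only."""

        def __init__(self, bucket_size=5000):
            self.bucket_size = bucket_size  # ms of slack per bucket
            self.buckets = {}  # bucket index -> set of objects due then

        def insert(self, now, obj, then):
            bucket = max(then, now) // self.bucket_size
            self.buckets.setdefault(bucket, set()).add(obj)

        def fetch(self, now):
            # Pop every object whose bucket has elapsed.
            ready = []
            for b in sorted(self.buckets):
                if b > now // self.bucket_size:
                    break
                ready.extend(self.buckets.pop(b))
            return ready

    wheel = TinyWheelTimer()
    wheel.insert(now=0, obj="@alice:example.com", then=12000)
    assert wheel.fetch(now=5000) == []
    assert wheel.fetch(now=10000) == ["@alice:example.com"]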
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 629e6e3594..b45eafbb49 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -16,8 +16,7 @@
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError, CodeMessageException
-from synapse.api.constants import EventTypes, Membership
-from synapse.types import UserID
+from synapse.types import UserID, Requester
 from synapse.util import unwrapFirstError
 
 from ._base import BaseHandler
@@ -49,6 +48,9 @@ class ProfileHandler(BaseHandler):
         distributor = hs.get_distributor()
         self.distributor = distributor
 
+        distributor.declare("collect_presencelike_data")
+        distributor.declare("changed_presencelike_data")
+
         distributor.observe("registered_user", self.registered_user)
 
         distributor.observe(
@@ -87,13 +89,13 @@ class ProfileHandler(BaseHandler):
                 defer.returnValue(result["displayname"])
 
     @defer.inlineCallbacks
-    def set_displayname(self, target_user, auth_user, new_displayname):
+    def set_displayname(self, target_user, requester, new_displayname):
         """target_user is the user whose displayname is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's displayname")
 
         if new_displayname == '':
@@ -107,7 +109,7 @@ class ProfileHandler(BaseHandler):
             "displayname": new_displayname,
         })
 
-        yield self._update_join_states(target_user)
+        yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
     def get_avatar_url(self, target_user):
@@ -137,13 +139,13 @@ class ProfileHandler(BaseHandler):
             defer.returnValue(result["avatar_url"])
 
     @defer.inlineCallbacks
-    def set_avatar_url(self, target_user, auth_user, new_avatar_url):
+    def set_avatar_url(self, target_user, requester, new_avatar_url):
         """target_user is the user whose avatar_url is to be changed;
         auth_user is the user attempting to make this change."""
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "User is not hosted on this Home Server")
 
-        if target_user != auth_user:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
         yield self.store.set_profile_avatar_url(
@@ -154,7 +156,7 @@ class ProfileHandler(BaseHandler):
             "avatar_url": new_avatar_url,
         })
 
-        yield self._update_join_states(target_user)
+        yield self._update_join_states(requester)
 
     @defer.inlineCallbacks
     def collect_presencelike_data(self, user, state):
@@ -197,32 +199,30 @@ class ProfileHandler(BaseHandler):
         defer.returnValue(response)
 
     @defer.inlineCallbacks
-    def _update_join_states(self, user):
+    def _update_join_states(self, requester):
+        user = requester.user
         if not self.hs.is_mine(user):
             return
 
-        self.ratelimit(user.to_string())
+        self.ratelimit(requester)
 
         joins = yield self.store.get_rooms_for_user(
             user.to_string(),
         )
 
         for j in joins:
-            content = {
-                "membership": Membership.JOIN,
-            }
-
-            yield collect_presencelike_data(self.distributor, user, content)
-
-            msg_handler = self.hs.get_handlers().message_handler
+            handler = self.hs.get_handlers().room_member_handler
             try:
-                yield msg_handler.create_and_send_event({
-                    "type": EventTypes.Member,
-                    "room_id": j.room_id,
-                    "state_key": user.to_string(),
-                    "content": content,
-                    "sender": user.to_string()
-                }, ratelimit=False)
+                # Assume the user isn't a guest because we don't let guests set
+                # profile or avatar data.
+                requester = Requester(user, "", False)
+                yield handler.update_membership(
+                    requester,
+                    user,
+                    j.room_id,
+                    "join",  # We treat a profile update like a join.
+                    ratelimit=False,  # Try to hide that these events aren't atomic.
+                )
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index de4c694714..935c339707 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -36,8 +36,6 @@ class ReceiptsHandler(BaseHandler):
         )
         self.clock = self.hs.get_clock()
 
-        self._receipt_cache = None
-
     @defer.inlineCallbacks
     def received_client_receipt(self, room_id, receipt_type, user_id,
                                 event_id):
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 1e99c1303c..f287ee247b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
     AuthError, Codes, SynapseError, RegistrationError, InvalidCaptchaError
 )
 from ._base import BaseHandler
-import synapse.util.stringutils as stringutils
 from synapse.util.async import run_on_reactor
 from synapse.http.client import CaptchaServerHttpClient
 
@@ -45,21 +44,33 @@ class RegistrationHandler(BaseHandler):
         self.distributor.declare("registered_user")
         self.captcha_client = CaptchaServerHttpClient(hs)
 
+        self._next_generated_user_id = None
+
     @defer.inlineCallbacks
-    def check_username(self, localpart, guest_access_token=None):
+    def check_username(self, localpart, guest_access_token=None,
+                       assigned_user_id=None):
         yield run_on_reactor()
 
         if urllib.quote(localpart.encode('utf-8')) != localpart:
             raise SynapseError(
                 400,
-                "User ID can only contain characters a-z, 0-9, or '-./'",
+                "User ID can only contain characters a-z, 0-9, or '_-./'",
                 Codes.INVALID_USERNAME
             )
 
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
-        yield self.check_user_id_is_valid(user_id)
+        if assigned_user_id:
+            if user_id == assigned_user_id:
+                return
+            else:
+                raise SynapseError(
+                    400,
+                    "A different user ID has already been registered for this session",
+                )
+
+        yield self.check_user_id_not_appservice_exclusive(user_id)
 
         users = yield self.store.get_users_by_id_case_insensitive(user_id)
         if users:
@@ -91,7 +102,7 @@ class RegistrationHandler(BaseHandler):
 
         Args:
             localpart : The local part of the user ID to register. If None,
-              one will be randomly generated.
+              one will be generated.
             password (str) : The password to assign to this user so they can
             login again. This can be None which means they cannot login again
             via a password (e.g. the user is an application service user).
@@ -108,6 +119,18 @@ class RegistrationHandler(BaseHandler):
         if localpart:
             yield self.check_username(localpart, guest_access_token=guest_access_token)
 
+            was_guest = guest_access_token is not None
+
+            if not was_guest:
+                try:
+                    int(localpart)
+                    raise RegistrationError(
+                        400,
+                        "Numeric user IDs are reserved for guest users."
+                    )
+                except ValueError:
+                    pass
+
             user = UserID(localpart, self.hs.hostname)
             user_id = user.to_string()
 
@@ -118,38 +141,37 @@ class RegistrationHandler(BaseHandler):
                 user_id=user_id,
                 token=token,
                 password_hash=password_hash,
-                was_guest=guest_access_token is not None,
+                was_guest=was_guest,
                 make_guest=make_guest,
             )
 
             yield registered_user(self.distributor, user)
         else:
-            # autogen a random user ID
+            # autogen a sequential user ID
             attempts = 0
-            user_id = None
             token = None
-            while not user_id:
+            user = None
+            while not user:
+                localpart = yield self._generate_user_id(attempts > 0)
+                user = UserID(localpart, self.hs.hostname)
+                user_id = user.to_string()
+                yield self.check_user_id_not_appservice_exclusive(user_id)
+                if generate_token:
+                    token = self.auth_handler().generate_access_token(user_id)
                 try:
-                    localpart = self._generate_user_id()
-                    user = UserID(localpart, self.hs.hostname)
-                    user_id = user.to_string()
-                    yield self.check_user_id_is_valid(user_id)
-                    if generate_token:
-                        token = self.auth_handler().generate_access_token(user_id)
                     yield self.store.register(
                         user_id=user_id,
                         token=token,
-                        password_hash=password_hash)
-
-                    yield registered_user(self.distributor, user)
+                        password_hash=password_hash,
+                        make_guest=make_guest
+                    )
                 except SynapseError:
                     # if user id is taken, just generate another
+                    user = None
                     user_id = None
                     token = None
                     attempts += 1
-                    if attempts > 5:
-                        raise RegistrationError(
-                            500, "Cannot generate user ID.")
+            yield registered_user(self.distributor, user)
 
         # We used to generate default identicons here, but nowadays
         # we want clients to generate their own as part of their branding
@@ -169,13 +191,21 @@ class RegistrationHandler(BaseHandler):
                 400, "Invalid user localpart for this application service.",
                 errcode=Codes.EXCLUSIVE
             )
+
+        service_id = service.id if service.is_exclusive_user(user_id) else None
+
+        yield self.check_user_id_not_appservice_exclusive(
+            user_id, allowed_appservice=service
+        )
+
         token = self.auth_handler().generate_access_token(user_id)
         yield self.store.register(
             user_id=user_id,
             token=token,
-            password_hash=""
+            password_hash="",
+            appservice_id=service_id,
         )
-        registered_user(self.distributor, user)
+        yield registered_user(self.distributor, user)
         defer.returnValue((user_id, token))
 
     @defer.inlineCallbacks
@@ -211,11 +241,11 @@ class RegistrationHandler(BaseHandler):
                 400,
                 "User ID must only contain characters which do not"
                 " require URL encoding."
-                )
+            )
         user = UserID(localpart, self.hs.hostname)
         user_id = user.to_string()
 
-        yield self.check_user_id_is_valid(user_id)
+        yield self.check_user_id_not_appservice_exclusive(user_id)
         token = self.auth_handler().generate_access_token(user_id)
         try:
             yield self.store.register(
@@ -224,7 +254,7 @@ class RegistrationHandler(BaseHandler):
                 password_hash=None
             )
             yield registered_user(self.distributor, user)
-        except Exception, e:
+        except Exception as e:
             yield self.store.add_access_token_to_user(user_id, token)
             # Ignore Registration errors
             logger.exception(e)
@@ -267,12 +297,14 @@ class RegistrationHandler(BaseHandler):
             yield identity_handler.bind_threepid(c, user_id)
 
     @defer.inlineCallbacks
-    def check_user_id_is_valid(self, user_id):
+    def check_user_id_not_appservice_exclusive(self, user_id, allowed_appservice=None):
         # valid user IDs must not clash with any user ID namespaces claimed by
         # application services.
         services = yield self.store.get_app_services()
         interested_services = [
-            s for s in services if s.is_interested_in_user(user_id)
+            s for s in services
+            if s.is_interested_in_user(user_id)
+            and s != allowed_appservice
         ]
         for service in interested_services:
             if service.is_exclusive_user(user_id):
@@ -281,8 +313,16 @@ class RegistrationHandler(BaseHandler):
                     errcode=Codes.EXCLUSIVE
                 )
 
-    def _generate_user_id(self):
-        return "-" + stringutils.random_string(18)
+    @defer.inlineCallbacks
+    def _generate_user_id(self, reseed=False):
+        if reseed or self._next_generated_user_id is None:
+            self._next_generated_user_id = (
+                yield self.store.find_next_generated_user_id_localpart()
+            )
+
+        id = self._next_generated_user_id
+        self._next_generated_user_id += 1
+        defer.returnValue(str(id))
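
In effect the generator seeds itself from the database once, then hands out
consecutive numeric localparts, re-reading from the database only after a
clash. A standalone sketch of that behaviour, with the store call replaced by
a stand-in:

    # Illustration only: `taken` stands in for localparts already registered.
    taken = {"1", "2", "5"}

    class FakeIdGenerator(object):
        def __init__(self):
            self._next = None

        def _find_next_from_db(self):
            # Stand-in for store.find_next_generated_user_id_localpart().
            return max(int(t) for t in taken) + 1

        def generate(self, reseed=False):
            if reseed or self._next is None:
                self._next = self._find_next_from_db()
            localpart = str(self._next)
            self._next += 1
            return localpart

    gen = FakeIdGenerator()
    assert gen.generate() == "6"  # seeded past the highest taken localpart
    assert gen.generate() == "7"  # later calls just increment in memory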
 
     @defer.inlineCallbacks
     def _validate_captcha(self, ip_addr, private_key, challenge, response):
@@ -323,3 +363,18 @@ class RegistrationHandler(BaseHandler):
 
     def auth_handler(self):
         return self.hs.get_handlers().auth_handler
+
+    @defer.inlineCallbacks
+    def guest_access_token_for(self, medium, address, inviter_user_id):
+        access_token = yield self.store.get_3pid_guest_access_token(medium, address)
+        if access_token:
+            defer.returnValue(access_token)
+
+        _, access_token = yield self.register(
+            generate_token=True,
+            make_guest=True
+        )
+        access_token = yield self.store.save_or_get_3pid_guest_access_token(
+            medium, address, access_token, inviter_user_id
+        )
+        defer.returnValue(access_token)
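
The save-or-get step is what makes this flow safe under races: a caller that
loses the write still receives the token that won. The snippet below is a toy
illustration of that idempotency, with a dict standing in for the store (the
real store method also records the inviter, omitted here).

    db = {}

    def save_or_get(medium, address, token):
        # First writer wins; later callers get the stored value back.
        return db.setdefault((medium, address), token)

    t1 = save_or_get("email", "a@example.com", "tok1")
    t2 = save_or_get("email", "a@example.com", "tok2")
    assert t1 == t2 == "tok1"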
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 58e2d25f97..051468989f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -18,13 +18,13 @@ from twisted.internet import defer
 
 from ._base import BaseHandler
 
-from synapse.types import UserID, RoomAlias, RoomID
+from synapse.types import UserID, RoomAlias, RoomID, RoomStreamToken, Requester
 from synapse.api.constants import (
     EventTypes, Membership, JoinRules, RoomCreationPreset,
 )
 from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
 from synapse.util import stringutils, unwrapFirstError
-from synapse.util.async import run_on_reactor
+from synapse.util.logcontext import preserve_context_over_fn
 
 from signedjson.sign import verify_signed_json
 from signedjson.key import decode_verify_key_bytes
@@ -41,16 +41,18 @@ logger = logging.getLogger(__name__)
 id_server_scheme = "https://"
 
 
-def collect_presencelike_data(distributor, user, content):
-    return distributor.fire("collect_presencelike_data", user, content)
-
-
 def user_left_room(distributor, user, room_id):
-    return distributor.fire("user_left_room", user=user, room_id=room_id)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_left_room", user=user, room_id=room_id
+    )
 
 
 def user_joined_room(distributor, user, room_id):
-    return distributor.fire("user_joined_room", user=user, room_id=room_id)
+    return preserve_context_over_fn(
+        distributor.fire,
+        "user_joined_room", user=user, room_id=room_id
+    )
 
 
 class RoomCreationHandler(BaseHandler):
@@ -74,21 +76,21 @@ class RoomCreationHandler(BaseHandler):
     }
 
     @defer.inlineCallbacks
-    def create_room(self, user_id, room_id, config):
+    def create_room(self, requester, config):
         """ Creates a new room.
 
         Args:
-            user_id (str): The ID of the user creating the new room.
-            room_id (str): The proposed ID for the new room. Can be None, in
-            which case one will be created for you.
+            requester (Requester): The user who requested the room creation.
             config (dict) : A dict of configuration options.
         Returns:
             The new room ID.
         Raises:
-            SynapseError if the room ID was taken, couldn't be stored, or
-            something went horribly wrong.
+            SynapseError if the room ID couldn't be stored, or something went
+            horribly wrong.
         """
-        self.ratelimit(user_id)
+        user_id = requester.user.to_string()
+
+        self.ratelimit(requester)
 
         if "room_alias_name" in config:
             for wchar in string.whitespace:
@@ -119,40 +121,28 @@ class RoomCreationHandler(BaseHandler):
 
         is_public = config.get("visibility", None) == "public"
 
-        if room_id:
-            # Ensure room_id is the correct type
-            room_id_obj = RoomID.from_string(room_id)
-            if not self.hs.is_mine(room_id_obj):
-                raise SynapseError(400, "Room id must be local")
-
-            yield self.store.store_room(
-                room_id=room_id,
-                room_creator_user_id=user_id,
-                is_public=is_public
-            )
-        else:
-            # autogen room IDs and try to create it. We may clash, so just
-            # try a few times till one goes through, giving up eventually.
-            attempts = 0
-            room_id = None
-            while attempts < 5:
-                try:
-                    random_string = stringutils.random_string(18)
-                    gen_room_id = RoomID.create(
-                        random_string,
-                        self.hs.hostname,
-                    )
-                    yield self.store.store_room(
-                        room_id=gen_room_id.to_string(),
-                        room_creator_user_id=user_id,
-                        is_public=is_public
-                    )
-                    room_id = gen_room_id.to_string()
-                    break
-                except StoreError:
-                    attempts += 1
-            if not room_id:
-                raise StoreError(500, "Couldn't generate a room ID.")
+        # autogen room IDs and try to create it. We may clash, so just
+        # try a few times till one goes through, giving up eventually.
+        attempts = 0
+        room_id = None
+        while attempts < 5:
+            try:
+                random_string = stringutils.random_string(18)
+                gen_room_id = RoomID.create(
+                    random_string,
+                    self.hs.hostname,
+                )
+                yield self.store.store_room(
+                    room_id=gen_room_id.to_string(),
+                    room_creator_user_id=user_id,
+                    is_public=is_public
+                )
+                room_id = gen_room_id.to_string()
+                break
+            except StoreError:
+                attempts += 1
+        if not room_id:
+            raise StoreError(500, "Couldn't generate a room ID.")
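
The same retry-on-clash pattern, reduced to a standalone sketch in which the
random string generation and the store are both stand-ins:

    import random
    import string

    existing = set()  # stands in for the rooms table's uniqueness constraint

    def try_store(room_id):
        if room_id in existing:
            raise KeyError("room ID clash")  # stands in for StoreError
        existing.add(room_id)

    room_id = None
    for _ in range(5):
        candidate = "!%s:example.com" % "".join(
            random.choice(string.ascii_lowercase) for _ in range(18)
        )
        try:
            try_store(candidate)
            room_id = candidate
            break
        except KeyError:
            continue
    if room_id is None:
        raise RuntimeError("Couldn't generate a room ID after five attempts.")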
 
         if room_alias:
             directory_handler = self.hs.get_handlers().directory_handler
@@ -178,9 +168,14 @@ class RoomCreationHandler(BaseHandler):
 
         creation_content = config.get("creation_content", {})
 
-        user = UserID.from_string(user_id)
-        creation_events = self._create_events_for_new_room(
-            user, room_id,
+        msg_handler = self.hs.get_handlers().message_handler
+        room_member_handler = self.hs.get_handlers().room_member_handler
+
+        yield self._send_events_for_new_room(
+            requester,
+            room_id,
+            msg_handler,
+            room_member_handler,
             preset_config=preset_config,
             invite_list=invite_list,
             initial_state=initial_state,
@@ -188,39 +183,40 @@ class RoomCreationHandler(BaseHandler):
             room_alias=room_alias,
         )
 
-        msg_handler = self.hs.get_handlers().message_handler
-
-        for event in creation_events:
-            yield msg_handler.create_and_send_event(event, ratelimit=False)
-
         if "name" in config:
             name = config["name"]
-            yield msg_handler.create_and_send_event({
-                "type": EventTypes.Name,
-                "room_id": room_id,
-                "sender": user_id,
-                "state_key": "",
-                "content": {"name": name},
-            }, ratelimit=False)
+            yield msg_handler.create_and_send_nonmember_event(
+                requester,
+                {
+                    "type": EventTypes.Name,
+                    "room_id": room_id,
+                    "sender": user_id,
+                    "state_key": "",
+                    "content": {"name": name},
+                },
+                ratelimit=False)
 
         if "topic" in config:
             topic = config["topic"]
-            yield msg_handler.create_and_send_event({
-                "type": EventTypes.Topic,
-                "room_id": room_id,
-                "sender": user_id,
-                "state_key": "",
-                "content": {"topic": topic},
-            }, ratelimit=False)
+            yield msg_handler.create_and_send_nonmember_event(
+                requester,
+                {
+                    "type": EventTypes.Topic,
+                    "room_id": room_id,
+                    "sender": user_id,
+                    "state_key": "",
+                    "content": {"topic": topic},
+                },
+                ratelimit=False)
 
         for invitee in invite_list:
-            yield msg_handler.create_and_send_event({
-                "type": EventTypes.Member,
-                "state_key": invitee,
-                "room_id": room_id,
-                "sender": user_id,
-                "content": {"membership": Membership.INVITE},
-            }, ratelimit=False)
+            yield room_member_handler.update_membership(
+                requester,
+                UserID.from_string(invitee),
+                room_id,
+                "invite",
+                ratelimit=False,
+            )
 
         for invite_3pid in invite_3pid_list:
             id_server = invite_3pid["id_server"]
@@ -228,11 +224,11 @@ class RoomCreationHandler(BaseHandler):
             medium = invite_3pid["medium"]
             yield self.hs.get_handlers().room_member_handler.do_3pid_invite(
                 room_id,
-                user,
+                requester.user,
                 medium,
                 address,
                 id_server,
-                token_id=None,
+                requester,
                 txn_id=None,
             )
 
@@ -241,24 +237,24 @@ class RoomCreationHandler(BaseHandler):
         if room_alias:
             result["room_alias"] = room_alias.to_string()
             yield directory_handler.send_room_alias_update_event(
-                user_id, room_id
+                requester, user_id, room_id
             )
 
         defer.returnValue(result)
 
-    def _create_events_for_new_room(self, creator, room_id, preset_config,
-                                    invite_list, initial_state, creation_content,
-                                    room_alias):
-        config = RoomCreationHandler.PRESETS_DICT[preset_config]
-
-        creator_id = creator.to_string()
-
-        event_keys = {
-            "room_id": room_id,
-            "sender": creator_id,
-            "state_key": "",
-        }
-
+    @defer.inlineCallbacks
+    def _send_events_for_new_room(
+            self,
+            creator,  # A Requester object.
+            room_id,
+            msg_handler,
+            room_member_handler,
+            preset_config,
+            invite_list,
+            initial_state,
+            creation_content,
+            room_alias
+    ):
         def create(etype, content, **kwargs):
             e = {
                 "type": etype,
@@ -270,26 +266,43 @@ class RoomCreationHandler(BaseHandler):
 
             return e
 
-        creation_content.update({"creator": creator.to_string()})
-        creation_event = create(
+        @defer.inlineCallbacks
+        def send(etype, content, **kwargs):
+            event = create(etype, content, **kwargs)
+            yield msg_handler.create_and_send_nonmember_event(
+                creator,
+                event,
+                ratelimit=False
+            )
+
+        config = RoomCreationHandler.PRESETS_DICT[preset_config]
+
+        creator_id = creator.user.to_string()
+
+        event_keys = {
+            "room_id": room_id,
+            "sender": creator_id,
+            "state_key": "",
+        }
+
+        creation_content.update({"creator": creator_id})
+        yield send(
             etype=EventTypes.Create,
             content=creation_content,
         )
 
-        join_event = create(
-            etype=EventTypes.Member,
-            state_key=creator_id,
-            content={
-                "membership": Membership.JOIN,
-            },
+        yield room_member_handler.update_membership(
+            creator,
+            creator.user,
+            room_id,
+            "join",
+            ratelimit=False,
         )
 
-        returned_events = [creation_event, join_event]
-
         if (EventTypes.PowerLevels, '') not in initial_state:
             power_level_content = {
                 "users": {
-                    creator.to_string(): 100,
+                    creator_id: 100,
                 },
                 "users_default": 0,
                 "events": {
@@ -311,45 +324,35 @@ class RoomCreationHandler(BaseHandler):
                 for invitee in invite_list:
                     power_level_content["users"][invitee] = 100
 
-            power_levels_event = create(
+            yield send(
                 etype=EventTypes.PowerLevels,
                 content=power_level_content,
             )
 
-            returned_events.append(power_levels_event)
-
         if room_alias and (EventTypes.CanonicalAlias, '') not in initial_state:
-            room_alias_event = create(
+            yield send(
                 etype=EventTypes.CanonicalAlias,
                 content={"alias": room_alias.to_string()},
             )
 
-            returned_events.append(room_alias_event)
-
         if (EventTypes.JoinRules, '') not in initial_state:
-            join_rules_event = create(
+            yield send(
                 etype=EventTypes.JoinRules,
                 content={"join_rule": config["join_rules"]},
             )
 
-            returned_events.append(join_rules_event)
-
         if (EventTypes.RoomHistoryVisibility, '') not in initial_state:
-            history_event = create(
+            yield send(
                 etype=EventTypes.RoomHistoryVisibility,
                 content={"history_visibility": config["history_visibility"]}
             )
 
-            returned_events.append(history_event)
-
         for (etype, state_key), content in initial_state.items():
-            returned_events.append(create(
+            yield send(
                 etype=etype,
                 state_key=state_key,
                 content=content,
-            ))
-
-        return returned_events
+            )
 
 
 class RoomMemberHandler(BaseHandler):
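
The send() closure above commits each initial state event through the
message handler as soon as it is built, rather than collecting events into
a returned list. A minimal standalone sketch of the same pattern (the
helper name is illustrative, not a Synapse API):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def send_initial_state(msg_handler, creator, event_keys, state_events):
        # state_events is an ordered list of (etype, content) pairs. Each
        # is merged with the shared keys (room_id, sender, state_key) and
        # sent immediately, so later events are authed against earlier
        # ones (e.g. m.room.power_levels) already being in the room state.
        for etype, content in state_events:
            event = {"type": etype, "content": content}
            event.update(event_keys)
            yield msg_handler.create_and_send_nonmember_event(
                creator, event, ratelimit=False
            )
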
@@ -397,16 +400,35 @@ class RoomMemberHandler(BaseHandler):
                     remotedomains.add(member.domain)
 
     @defer.inlineCallbacks
-    def update_membership(self, requester, target, room_id, action, txn_id=None):
+    def update_membership(
+            self,
+            requester,
+            target,
+            room_id,
+            action,
+            txn_id=None,
+            remote_room_hosts=None,
+            third_party_signed=None,
+            ratelimit=True,
+    ):
         effective_membership_state = action
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
         elif action == "forget":
             effective_membership_state = "leave"
 
+        if third_party_signed is not None:
+            replication = self.hs.get_replication_layer()
+            yield replication.exchange_third_party_invite(
+                third_party_signed["sender"],
+                target.to_string(),
+                room_id,
+                third_party_signed,
+            )
+
         msg_handler = self.hs.get_handlers().message_handler
 
-        content = {"membership": unicode(effective_membership_state)}
+        content = {"membership": effective_membership_state}
         if requester.is_guest:
             content["kind"] = "guest"
 
@@ -417,6 +439,9 @@ class RoomMemberHandler(BaseHandler):
                 "room_id": room_id,
                 "sender": requester.user.to_string(),
                 "state_key": target.to_string(),
+
+                # For backwards compatibility:
+                "membership": effective_membership_state,
             },
             token_id=requester.access_token_id,
             txn_id=txn_id,
@@ -437,202 +462,200 @@ class RoomMemberHandler(BaseHandler):
                 errcode=Codes.BAD_STATE
             )
 
-        yield msg_handler.send_event(
+        member_handler = self.hs.get_handlers().room_member_handler
+        yield member_handler.send_membership_event(
+            requester,
             event,
             context,
-            ratelimit=True,
-            is_guest=requester.is_guest
+            ratelimit=ratelimit,
+            remote_room_hosts=remote_room_hosts,
         )
 
         if action == "forget":
             yield self.forget(requester.user, room_id)
 
     @defer.inlineCallbacks
-    def send_membership_event(self, event, context, is_guest=False):
-        """ Change the membership status of a user in a room.
+    def send_membership_event(
+            self,
+            requester,
+            event,
+            context,
+            remote_room_hosts=None,
+            ratelimit=True,
+    ):
+        """
+        Change the membership status of a user in a room.
 
         Args:
-            event (SynapseEvent): The membership event
+            requester (Requester): The local user who requested the membership
+                event. If None, certain checks, like whether this homeserver can
+                act as the sender, will be skipped.
+            event (SynapseEvent): The membership event.
+            context: The context of the event.
+            remote_room_hosts ([str]): Homeservers which are likely to
+                already be in the room, and with which the invite/join dance
+                can be done if this homeserver is joining the room for the
+                first time.
+            ratelimit (bool): Whether to rate limit this request.
         Raises:
             SynapseError if there was a problem changing the membership.
         """
-        target_user_id = event.state_key
-
-        prev_state = context.current_state.get(
-            (EventTypes.Member, target_user_id),
-            None
-        )
+        remote_room_hosts = remote_room_hosts or []
 
+        target_user = UserID.from_string(event.state_key)
         room_id = event.room_id
 
-        # If we're trying to join a room then we have to do this differently
-        # if this HS is not currently in the room, i.e. we have to do the
-        # invite/join dance.
+        if requester is not None:
+            sender = UserID.from_string(event.sender)
+            assert sender == requester.user, (
+                "Sender (%s) must be same as requester (%s)" %
+                (sender, requester.user)
+            )
+            assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
+        else:
+            requester = Requester(target_user, None, False)
+
+        message_handler = self.hs.get_handlers().message_handler
+        prev_event = message_handler.deduplicate_state_event(event, context)
+        if prev_event is not None:
+            return
+
+        action = "send"
+
         if event.membership == Membership.JOIN:
-            if is_guest:
-                guest_access = context.current_state.get(
-                    (EventTypes.GuestAccess, ""),
-                    None
-                )
-                is_guest_access_allowed = (
-                    guest_access
-                    and guest_access.content
-                    and "guest_access" in guest_access.content
-                    and guest_access.content["guest_access"] == "can_join"
-                )
-                if not is_guest_access_allowed:
-                    raise AuthError(403, "Guest access not allowed")
+            if requester.is_guest and not self._can_guest_join(context.current_state):
+                # This should be an auth check, but guests are a local concept,
+                # so don't really fit into the general auth process.
+                raise AuthError(403, "Guest access not allowed")
+            do_remote_join_dance, remote_room_hosts = self._should_do_dance(
+                context,
+                self.get_inviter(event.state_key, context.current_state),
+                remote_room_hosts,
+            )
+            if do_remote_join_dance:
+                action = "remote_join"
+        elif event.membership == Membership.LEAVE:
+            is_host_in_room = self.is_host_in_room(context.current_state)
+
+            if not is_host_in_room:
+                # perhaps we've been invited
+                inviter = self.get_inviter(target_user.to_string(), context.current_state)
+                if not inviter:
+                    raise SynapseError(404, "Not a known room")
+
+                if self.hs.is_mine(inviter):
+                    # the inviter was on our server, but has now left. Carry on
+                    # with the normal rejection codepath.
+                    #
+                    # This is a bit of a hack, because the room might still be
+                    # active on other servers.
+                    pass
+                else:
+                    # send the rejection to the inviter's HS.
+                    remote_room_hosts = remote_room_hosts + [inviter.domain]
+                    action = "remote_reject"
+
+        federation_handler = self.hs.get_handlers().federation_handler
+
+        if action == "remote_join":
+            if len(remote_room_hosts) == 0:
+                raise SynapseError(404, "No known servers")
 
-            yield self._do_join(event, context)
+            # We don't do an auth check if we are doing an invite
+            # join dance for now, since we're kinda implicitly checking
+            # that we are allowed to join when we decide whether or not we
+            # need to do the invite/join dance.
+            yield federation_handler.do_invite_join(
+                remote_room_hosts,
+                event.room_id,
+                event.user_id,
+                event.content,
+            )
+        elif action == "remote_reject":
+            yield federation_handler.do_remotely_reject_invite(
+                remote_room_hosts,
+                room_id,
+                event.user_id
+            )
         else:
-            if event.membership == Membership.LEAVE:
-                is_host_in_room = yield self.is_host_in_room(room_id, context)
-                if not is_host_in_room:
-                    # Rejecting an invite, rather than leaving a joined room
-                    handler = self.hs.get_handlers().federation_handler
-                    inviter = yield self.get_inviter(event)
-                    if not inviter:
-                        # return the same error as join_room_alias does
-                        raise SynapseError(404, "No known servers")
-                    yield handler.do_remotely_reject_invite(
-                        [inviter.domain],
-                        room_id,
-                        event.user_id
-                    )
-                    defer.returnValue({"room_id": room_id})
-                    return
-
-            # FIXME: This isn't idempotency.
-            if prev_state and prev_state.membership == event.membership:
-                # double same action, treat this event as a NOOP.
-                defer.returnValue({})
-                return
-
-            yield self._do_local_membership_update(
+            yield self.handle_new_client_event(
+                requester,
                 event,
-                context=context,
+                context,
+                extra_users=[target_user],
+                ratelimit=ratelimit,
             )
 
-            if prev_state and prev_state.membership == Membership.JOIN:
-                user = UserID.from_string(event.user_id)
-                user_left_room(self.distributor, user, event.room_id)
-
-        defer.returnValue({"room_id": room_id})
-
-    @defer.inlineCallbacks
-    def join_room_alias(self, joinee, room_alias, content={}):
-        directory_handler = self.hs.get_handlers().directory_handler
-        mapping = yield directory_handler.get_association(room_alias)
-
-        if not mapping:
-            raise SynapseError(404, "No such room alias")
+        prev_member_event = context.current_state.get(
+            (EventTypes.Member, target_user.to_string()),
+            None
+        )
 
-        room_id = mapping["room_id"]
-        hosts = mapping["servers"]
-        if not hosts:
-            raise SynapseError(404, "No known servers")
+        if event.membership == Membership.JOIN:
+            if not prev_member_event or prev_member_event.membership != Membership.JOIN:
+                # Only fire user_joined_room if the user has actually joined the
+                # room. Don't bother if the user is just changing their profile
+                # info.
+                yield user_joined_room(self.distributor, target_user, room_id)
+        elif event.membership == Membership.LEAVE:
+            if prev_member_event and prev_member_event.membership == Membership.JOIN:
+                user_left_room(self.distributor, target_user, room_id)
+
+    def _can_guest_join(self, current_state):
+        """
+        Returns whether a guest can join a room based on its current state.
+        """
+        guest_access = current_state.get((EventTypes.GuestAccess, ""), None)
+        return (
+            guest_access
+            and guest_access.content
+            and "guest_access" in guest_access.content
+            and guest_access.content["guest_access"] == "can_join"
+        )
 
-        # If event doesn't include a display name, add one.
-        yield collect_presencelike_data(self.distributor, joinee, content)
+    def _should_do_dance(self, context, inviter, room_hosts=None):
+        # TODO: Shouldn't this be remote_room_host?
+        room_hosts = room_hosts or []
 
-        content.update({"membership": Membership.JOIN})
-        builder = self.event_builder_factory.new({
-            "type": EventTypes.Member,
-            "state_key": joinee.to_string(),
-            "room_id": room_id,
-            "sender": joinee.to_string(),
-            "membership": Membership.JOIN,
-            "content": content,
-        })
-        event, context = yield self._create_new_client_event(builder)
+        is_host_in_room = self.is_host_in_room(context.current_state)
+        if is_host_in_room:
+            return False, room_hosts
 
-        yield self._do_join(event, context, room_hosts=hosts)
+        if inviter and not self.hs.is_mine(inviter):
+            room_hosts.append(inviter.domain)
 
-        defer.returnValue({"room_id": room_id})
+        return True, room_hosts
 
     @defer.inlineCallbacks
-    def _do_join(self, event, context, room_hosts=None):
-        room_id = event.room_id
-
-        # XXX: We don't do an auth check if we are doing an invite
-        # join dance for now, since we're kinda implicitly checking
-        # that we are allowed to join when we decide whether or not we
-        # need to do the invite/join dance.
-
-        is_host_in_room = yield self.is_host_in_room(room_id, context)
-        if is_host_in_room:
-            should_do_dance = False
-        elif room_hosts:  # TODO: Shouldn't this be remote_room_host?
-            should_do_dance = True
-        else:
-            inviter = yield self.get_inviter(event)
-            if not inviter:
-                # return the same error as join_room_alias does
-                raise SynapseError(404, "No known servers")
-            should_do_dance = not self.hs.is_mine(inviter)
-            room_hosts = [inviter.domain]
+    def lookup_room_alias(self, room_alias):
+        """
+        Get the room ID associated with a room alias.
 
-        if should_do_dance:
-            handler = self.hs.get_handlers().federation_handler
-            yield handler.do_invite_join(
-                room_hosts,
-                room_id,
-                event.user_id,
-                event.content,
-            )
-        else:
-            logger.debug("Doing normal join")
+        Args:
+            room_alias (RoomAlias): The alias to look up.
+        Returns:
+            A tuple of:
+                The room ID as a RoomID object.
+                Hosts likely to be participating in the room ([str]).
+        Raises:
+            SynapseError if room alias could not be found.
+        """
+        directory_handler = self.hs.get_handlers().directory_handler
+        mapping = yield directory_handler.get_association(room_alias)
 
-            yield self._do_local_membership_update(
-                event,
-                context=context,
-            )
+        if not mapping:
+            raise SynapseError(404, "No such room alias")
 
-        prev_state = context.current_state.get((event.type, event.state_key))
-        if not prev_state or prev_state.membership != Membership.JOIN:
-            # Only fire user_joined_room if the user has acutally joined the
-            # room. Don't bother if the user is just changing their profile
-            # info.
-            user = UserID.from_string(event.user_id)
-            yield user_joined_room(self.distributor, user, room_id)
+        room_id = mapping["room_id"]
+        servers = mapping["servers"]
 
-    @defer.inlineCallbacks
-    def get_inviter(self, event):
-        # TODO(markjh): get prev_state from snapshot
-        prev_state = yield self.store.get_room_member(
-            event.user_id, event.room_id
-        )
+        defer.returnValue((RoomID.from_string(room_id), servers))
 
+    def get_inviter(self, user_id, current_state):
+        prev_state = current_state.get((EventTypes.Member, user_id))
         if prev_state and prev_state.membership == Membership.INVITE:
-            defer.returnValue(UserID.from_string(prev_state.user_id))
-            return
-        elif "third_party_invite" in event.content:
-            if "sender" in event.content["third_party_invite"]:
-                inviter = UserID.from_string(
-                    event.content["third_party_invite"]["sender"]
-                )
-                defer.returnValue(inviter)
-        defer.returnValue(None)
-
-    @defer.inlineCallbacks
-    def is_host_in_room(self, room_id, context):
-        is_host_in_room = yield self.auth.check_host_in_room(
-            room_id,
-            self.hs.hostname
-        )
-        if not is_host_in_room:
-            # is *anyone* in the room?
-            room_member_keys = [
-                v for (k, v) in context.current_state.keys() if (
-                    k == "m.room.member"
-                )
-            ]
-            if len(room_member_keys) == 0:
-                # has the room been created so we can join it?
-                create_event = context.current_state.get(("m.room.create", ""))
-                if create_event:
-                    is_host_in_room = True
-        defer.returnValue(is_host_in_room)
+            return UserID.from_string(prev_state.user_id)
+        return None
 
     @defer.inlineCallbacks
     def get_joined_rooms_for_user(self, user):
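
The rewritten send_membership_event funnels everything through a single
action dispatch: plain local sends, the federated invite/join dance, and
remote invite rejection. The decision logic, reduced to a sketch (plain
values standing in for handler state; in the real code a leave with no
known inviter raises a 404 instead):

    def choose_action(membership, is_host_in_room, inviter, is_mine):
        # Returns "send", "remote_join" or "remote_reject", mirroring
        # the branches above. inviter is a UserID or None; is_mine is a
        # predicate like hs.is_mine.
        if membership == "join" and not is_host_in_room:
            # No local user is in the room yet: join via a remote host
            # (the inviter's homeserver, if we know of one).
            return "remote_join"
        if membership == "leave" and not is_host_in_room:
            if inviter is not None and not is_mine(inviter):
                # Rejecting a remote invite: the rejection must be sent
                # to the inviter's homeserver.
                return "remote_reject"
        return "send"
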
@@ -650,18 +673,6 @@ class RoomMemberHandler(BaseHandler):
         defer.returnValue(room_ids)
 
     @defer.inlineCallbacks
-    def _do_local_membership_update(self, event, context):
-        yield run_on_reactor()
-
-        target_user = UserID.from_string(event.state_key)
-
-        yield self.handle_new_client_event(
-            event,
-            context,
-            extra_users=[target_user],
-        )
-
-    @defer.inlineCallbacks
     def do_3pid_invite(
             self,
             room_id,
@@ -669,7 +680,7 @@ class RoomMemberHandler(BaseHandler):
             medium,
             address,
             id_server,
-            token_id,
+            requester,
             txn_id
     ):
         invitee = yield self._lookup_3pid(
@@ -677,29 +688,22 @@ class RoomMemberHandler(BaseHandler):
         )
 
         if invitee:
-            # make sure it looks like a user ID; it'll throw if it's invalid.
-            UserID.from_string(invitee)
-            yield self.hs.get_handlers().message_handler.create_and_send_event(
-                {
-                    "type": EventTypes.Member,
-                    "content": {
-                        "membership": unicode("invite")
-                    },
-                    "room_id": room_id,
-                    "sender": inviter.to_string(),
-                    "state_key": invitee,
-                },
-                token_id=token_id,
+            handler = self.hs.get_handlers().room_member_handler
+            yield handler.update_membership(
+                requester,
+                UserID.from_string(invitee),
+                room_id,
+                "invite",
                 txn_id=txn_id,
             )
         else:
             yield self._make_and_store_3pid_invite(
+                requester,
                 id_server,
                 medium,
                 address,
                 room_id,
                 inviter,
-                token_id,
                 txn_id=txn_id
             )
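
do_3pid_invite now reuses update_membership whenever the identity server
can already map the third-party address to a Matrix ID, so such invites
take exactly the same path as ordinary ones. Condensed into a hypothetical
wrapper around the two branches above:

    from twisted.internet import defer
    from synapse.types import UserID

    @defer.inlineCallbacks
    def invite_by_3pid(handler, requester, room_id, medium, address, id_server):
        invitee = yield handler._lookup_3pid(id_server, medium, address)
        if invitee:
            # The address maps to a Matrix ID: an ordinary invite.
            yield handler.update_membership(
                requester, UserID.from_string(invitee), room_id, "invite"
            )
        else:
            # No mapping yet: store an m.room.third_party_invite, to be
            # claimed when the address is later bound to a user.
            yield handler._make_and_store_3pid_invite(
                requester, id_server, medium, address, room_id,
                requester.user, txn_id=None,
            )
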
 
@@ -757,12 +761,12 @@ class RoomMemberHandler(BaseHandler):
     @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
             self,
+            requester,
             id_server,
             medium,
             address,
             room_id,
             user,
-            token_id,
             txn_id
     ):
         room_state = yield self.hs.get_state_handler().get_current_state(room_id)
@@ -794,7 +798,7 @@ class RoomMemberHandler(BaseHandler):
         if room_avatar_event:
             room_avatar_url = room_avatar_event.content.get("url", "")
 
-        token, public_key, key_validity_url, display_name = (
+        token, public_keys, fallback_public_key, display_name = (
             yield self._ask_id_server_for_third_party_invite(
                 id_server=id_server,
                 medium=medium,
@@ -809,20 +813,24 @@ class RoomMemberHandler(BaseHandler):
                 inviter_avatar_url=inviter_avatar_url
             )
         )
+
         msg_handler = self.hs.get_handlers().message_handler
-        yield msg_handler.create_and_send_event(
+        yield msg_handler.create_and_send_nonmember_event(
+            requester,
             {
                 "type": EventTypes.ThirdPartyInvite,
                 "content": {
                     "display_name": display_name,
-                    "key_validity_url": key_validity_url,
-                    "public_key": public_key,
+                    "public_keys": public_keys,
+
+                    # For backwards compatibility:
+                    "key_validity_url": fallback_public_key["key_validity_url"],
+                    "public_key": fallback_public_key["public_key"],
                 },
                 "room_id": room_id,
                 "sender": user.to_string(),
                 "state_key": token,
             },
-            token_id=token_id,
             txn_id=txn_id,
         )
 
@@ -841,32 +849,89 @@ class RoomMemberHandler(BaseHandler):
             inviter_display_name,
             inviter_avatar_url
     ):
+        """
+        Asks an identity server for a third party invite.
+
+        :param id_server (str): hostname + optional port for the identity server.
+        :param medium (str): The literal string "email".
+        :param address (str): The third party address being invited.
+        :param room_id (str): The ID of the room to which the user is invited.
+        :param inviter_user_id (str): The user ID of the inviter.
+        :param room_alias (str): An alias for the room, for cosmetic
+            notifications.
+        :param room_avatar_url (str): The URL of the room's avatar, for cosmetic
+            notifications.
+        :param room_join_rules (str): The join rules of the room
+            (e.g. "public").
+        :param room_name (str): The m.room.name of the room.
+        :param inviter_display_name (str): The current display name of the
+            inviter.
+        :param inviter_avatar_url (str): The URL of the inviter's avatar.
+
+        :return: A deferred tuple containing:
+            token (str): The token which must be signed to prove authenticity.
+            public_keys ([{"public_key": str, "key_validity_url": str}]):
+                public_key is a base64-encoded ed25519 public key.
+            fallback_public_key: One element from public_keys.
+            display_name (str): A user-friendly name to represent the invited
+                user.
+        """
+
         is_url = "%s%s/_matrix/identity/api/v1/store-invite" % (
             id_server_scheme, id_server,
         )
+
+        invite_config = {
+            "medium": medium,
+            "address": address,
+            "room_id": room_id,
+            "room_alias": room_alias,
+            "room_avatar_url": room_avatar_url,
+            "room_join_rules": room_join_rules,
+            "room_name": room_name,
+            "sender": inviter_user_id,
+            "sender_display_name": inviter_display_name,
+            "sender_avatar_url": inviter_avatar_url,
+        }
+
+        if self.hs.config.invite_3pid_guest:
+            registration_handler = self.hs.get_handlers().registration_handler
+            guest_access_token = yield registration_handler.guest_access_token_for(
+                medium=medium,
+                address=address,
+                inviter_user_id=inviter_user_id,
+            )
+
+            guest_user_info = yield self.hs.get_auth().get_user_by_access_token(
+                guest_access_token
+            )
+
+            invite_config.update({
+                "guest_access_token": guest_access_token,
+                "guest_user_id": guest_user_info["user"].to_string(),
+            })
+
         data = yield self.hs.get_simple_http_client().post_urlencoded_get_json(
             is_url,
-            {
-                "medium": medium,
-                "address": address,
-                "room_id": room_id,
-                "room_alias": room_alias,
-                "room_avatar_url": room_avatar_url,
-                "room_join_rules": room_join_rules,
-                "room_name": room_name,
-                "sender": inviter_user_id,
-                "sender_display_name": inviter_display_name,
-                "sender_avatar_url": inviter_avatar_url,
-            }
+            invite_config
         )
         # TODO: Check for success
         token = data["token"]
-        public_key = data["public_key"]
+        public_keys = data.get("public_keys", [])
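+        # Assumes the identity server returned at least one of the legacy
+        # "public_key" field or a non-empty "public_keys" list; with
+        # neither, public_keys[0] below would raise an IndexError.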
+        if "public_key" in data:
+            fallback_public_key = {
+                "public_key": data["public_key"],
+                "key_validity_url": "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                    id_server_scheme, id_server,
+                ),
+            }
+        else:
+            fallback_public_key = public_keys[0]
+
+        if not public_keys:
+            public_keys.append(fallback_public_key)
         display_name = data["display_name"]
-        key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
-            id_server_scheme, id_server,
-        )
-        defer.returnValue((token, public_key, key_validity_url, display_name))
+        defer.returnValue((token, public_keys, fallback_public_key, display_name))
 
     def forget(self, user, room_id):
         return self.store.forget(user.to_string(), room_id)
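
For reference, with the fallback handling above a single-key response
yields m.room.third_party_invite content carrying both the new list form
and the legacy fields, roughly like this (values illustrative):

    content = {
        "display_name": "b...e@example.com",
        "public_keys": [{
            "public_key": "<base64-encoded ed25519 key>",
            "key_validity_url":
                "https://id.example.com/_matrix/identity/api/v1/pubkey/isvalid",
        }],
        # Legacy duplicates, for servers predating "public_keys":
        "public_key": "<base64-encoded ed25519 key>",
        "key_validity_url":
            "https://id.example.com/_matrix/identity/api/v1/pubkey/isvalid",
    }
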
@@ -876,39 +941,71 @@ class RoomListHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def get_public_room_list(self):
-        chunk = yield self.store.get_rooms(is_public=True)
-
-        room_members = yield defer.gatherResults(
-            [
-                self.store.get_users_in_room(room["room_id"])
-                for room in chunk
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        avatar_urls = yield defer.gatherResults(
-            [
-                self.get_room_avatar_url(room["room_id"])
-                for room in chunk
-            ],
-            consumeErrors=True,
-        ).addErrback(unwrapFirstError)
-
-        for i, room in enumerate(chunk):
-            room["num_joined_members"] = len(room_members[i])
-            if avatar_urls[i]:
-                room["avatar_url"] = avatar_urls[i]
+        room_ids = yield self.store.get_public_room_ids()
+
+        @defer.inlineCallbacks
+        def handle_room(room_id):
+            aliases = yield self.store.get_aliases_for_room(room_id)
+            if not aliases:
+                defer.returnValue(None)
+
+            state = yield self.state_handler.get_current_state(room_id)
+
+            result = {"aliases": aliases, "room_id": room_id}
+
+            name_event = state.get((EventTypes.Name, ""), None)
+            if name_event:
+                name = name_event.content.get("name", None)
+                if name:
+                    result["name"] = name
+
+            topic_event = state.get((EventTypes.Topic, ""), None)
+            if topic_event:
+                topic = topic_event.content.get("topic", None)
+                if topic:
+                    result["topic"] = topic
+
+            canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
+            if canonical_event:
+                canonical_alias = canonical_event.content.get("alias", None)
+                if canonical_alias:
+                    result["canonical_alias"] = canonical_alias
+
+            visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+            visibility = None
+            if visibility_event:
+                visibility = visibility_event.content.get("history_visibility", None)
+            result["world_readable"] = visibility == "world_readable"
+
+            guest_event = state.get((EventTypes.GuestAccess, ""), None)
+            guest = None
+            if guest_event:
+                guest = guest_event.content.get("guest_access", None)
+            result["guest_can_join"] = guest == "can_join"
+
+            avatar_event = state.get(("m.room.avatar", ""), None)
+            if avatar_event:
+                avatar_url = avatar_event.content.get("url", None)
+                if avatar_url:
+                    result["avatar_url"] = avatar_url
+
+            result["num_joined_members"] = sum(
+                1 for (event_type, _), ev in state.items()
+                if event_type == EventTypes.Member and ev.membership == Membership.JOIN
+            )
 
-        # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": chunk})
+            defer.returnValue(result)
 
-    @defer.inlineCallbacks
-    def get_room_avatar_url(self, room_id):
-        event = yield self.hs.get_state_handler().get_current_state(
-            room_id, "m.room.avatar"
-        )
-        if event and "url" in event.content:
-            defer.returnValue(event.content["url"])
+        result = []
+        for chunk in (room_ids[i:i + 10] for i in xrange(0, len(room_ids), 10)):
+            chunk_result = yield defer.gatherResults([
+                handle_room(room_id)
+                for room_id in chunk
+            ], consumeErrors=True).addErrback(unwrapFirstError)
+            result.extend(v for v in chunk_result if v)
+
+        # FIXME (erikj): START is no longer a valid value
+        defer.returnValue({"start": "START", "end": "END", "chunk": result})
 
 
 class RoomContextHandler(BaseHandler):
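
get_public_room_list now inspects every public room's current state, so
handle_room is deliberately fanned out only ten rooms at a time to bound
the number of concurrent state fetches. The same pattern in isolation
(run_in_chunks is a generic sketch, not a Synapse helper):

    from twisted.internet import defer
    from synapse.util import unwrapFirstError

    @defer.inlineCallbacks
    def run_in_chunks(func, items, chunk_size=10):
        # Apply an async func to each item, with at most chunk_size calls
        # in flight at once; None results are dropped, as above.
        results = []
        for i in xrange(0, len(items), chunk_size):
            chunk = yield defer.gatherResults(
                [func(item) for item in items[i:i + chunk_size]],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
            results.extend(r for r in chunk if r)
        defer.returnValue(results)
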
@@ -927,7 +1024,7 @@ class RoomContextHandler(BaseHandler):
         Returns:
             dict, or None if the event isn't found
         """
-        before_limit = math.floor(limit/2.)
+        before_limit = math.floor(limit / 2.)
         after_limit = limit - before_limit
 
         now_token = yield self.hs.get_event_sources().get_current_token()
@@ -997,6 +1094,11 @@ class RoomEventSource(object):
 
         to_key = yield self.get_current_key()
 
+        from_token = RoomStreamToken.parse(from_key)
+        if from_token.topological:
+            logger.warn("Stream has topological part!!!! %r", from_key)
+            from_key = "s%s" % (from_token.stream,)
+
         app_service = yield self.store.get_app_service_by_user_id(
             user.to_string()
         )
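
The guard above strips any topological component from the incoming key:
topological positions come from pagination tokens and have no meaning on
the live stream. In isolation (assuming the RoomStreamToken import added
alongside this change):

    from synapse.types import RoomStreamToken

    def normalise_room_key(from_key):
        # A pagination token such as "t12-345" is reduced to its stream
        # part ("s345"); a plain stream token like "s345" passes through.
        token = RoomStreamToken.parse(from_key)
        if token.topological:
            return "s%s" % (token.stream,)
        return from_key
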
@@ -1008,15 +1110,31 @@ class RoomEventSource(object):
                 limit=limit,
             )
         else:
-            events, end_key = yield self.store.get_room_events_stream(
-                user_id=user.to_string(),
+            room_events = yield self.store.get_membership_changes_for_user(
+                user.to_string(), from_key, to_key
+            )
+
+            room_to_events = yield self.store.get_room_events_stream_for_rooms(
+                room_ids=room_ids,
                 from_key=from_key,
                 to_key=to_key,
-                limit=limit,
-                room_ids=room_ids,
-                is_guest=is_guest,
+                limit=limit or 10,
+                order='ASC',
             )
 
+            events = list(room_events)
+            events.extend(e for evs, _ in room_to_events.values() for e in evs)
+
+            events.sort(key=lambda e: e.internal_metadata.order)
+
+            if limit:
+                events[:] = events[:limit]
+
+            if events:
+                end_key = events[-1].internal_metadata.after
+            else:
+                end_key = to_key
+
         defer.returnValue((events, end_key))
 
     def get_current_key(self, direction='f'):
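
The non-appservice path above now merges two sources: the user's own
membership changes and the per-room live streams, reunified by the stream
ordering the store stashes on each event. In outline (a sketch over the
structures returned above):

    def merge_room_streams(membership_events, room_to_events, limit):
        # room_to_events maps room_id -> (events, start_key), as returned
        # by get_room_events_stream_for_rooms; each event carries an
        # internal_metadata.order assigned by the store.
        events = list(membership_events)
        events.extend(e for evs, _ in room_to_events.values() for e in evs)
        events.sort(key=lambda e: e.internal_metadata.order)
        return events[:limit] if limit else events
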
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 53e1eb0508..1f6fde8e8a 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,18 +18,22 @@ from ._base import BaseHandler
 from synapse.streams.config import PaginationConfig
 from synapse.api.constants import Membership, EventTypes
 from synapse.util import unwrapFirstError
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.metrics import Measure
+from synapse.push.clientformat import format_push_rules_for_user
 
 from twisted.internet import defer
 
 import collections
 import logging
+import itertools
 
 logger = logging.getLogger(__name__)
 
 
 SyncConfig = collections.namedtuple("SyncConfig", [
     "user",
-    "filter",
+    "filter_collection",
     "is_guest",
 ])
 
@@ -72,7 +76,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
         )
 
 
-class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
+class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
     "room_id",            # str
     "timeline",           # TimelineBatch
     "state",              # dict[(str, str), FrozenEvent]
@@ -118,7 +122,11 @@ class SyncResult(collections.namedtuple("SyncResult", [
         events.
         """
         return bool(
-            self.presence or self.joined or self.invited or self.archived
+            self.presence or
+            self.joined or
+            self.invited or
+            self.archived or
+            self.account_data
         )
 
 
@@ -139,11 +147,21 @@ class SyncHandler(BaseHandler):
             A Deferred SyncResult.
         """
 
+        context = LoggingContext.current_context()
+        if context:
+            if since_token is None:
+                context.tag = "initial_sync"
+            elif full_state:
+                context.tag = "full_state_sync"
+            else:
+                context.tag = "incremental_sync"
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result = yield self.current_sync_for_user(sync_config, since_token,
-                                                      full_state=full_state)
+            result = yield self.current_sync_for_user(
+                sync_config, since_token, full_state=full_state,
+            )
             defer.returnValue(result)
         else:
             def current_sync_callback(before_token, after_token):
@@ -151,7 +169,7 @@ class SyncHandler(BaseHandler):
 
             result = yield self.notifier.wait_for_events(
                 sync_config.user.to_string(), timeout, current_sync_callback,
-                from_token=since_token
+                from_token=since_token,
             )
             defer.returnValue(result)
 
@@ -166,18 +184,6 @@ class SyncHandler(BaseHandler):
         else:
             return self.incremental_sync_with_gap(sync_config, since_token)
 
-    def last_read_event_id_for_room_and_user(self, room_id, user_id, ephemeral_by_room):
-        if room_id not in ephemeral_by_room:
-            return None
-        for e in ephemeral_by_room[room_id]:
-            if e['type'] != 'm.receipt':
-                continue
-            for receipt_event_id, val in e['content'].items():
-                if 'm.read' in val:
-                    if user_id in val['m.read']:
-                        return receipt_event_id
-        return None
-
     @defer.inlineCallbacks
     def full_state_sync(self, sync_config, timeline_since_token):
         """Get a sync for a client which is starting without any state.
@@ -204,9 +210,9 @@ class SyncHandler(BaseHandler):
             key=None
         )
 
-        membership_list = (Membership.INVITE, Membership.JOIN)
-        if sync_config.filter.include_leave:
-            membership_list += (Membership.LEAVE, Membership.BAN)
+        membership_list = (
+            Membership.INVITE, Membership.JOIN, Membership.LEAVE, Membership.BAN
+        )
 
         room_list = yield self.store.get_rooms_for_user_where_membership_is(
             user_id=sync_config.user.to_string(),
@@ -219,6 +225,10 @@ class SyncHandler(BaseHandler):
             )
         )
 
+        account_data['m.push_rules'] = yield self.push_rules_for_user(
+            sync_config.user
+        )
+
         tags_by_room = yield self.store.get_tags_for_user(
             sync_config.user.to_string()
         )
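
With this, push rules reach clients as an ordinary account-data event in
the /sync response, shaped roughly like (abridged, illustrative):

    account_data_event = {
        "type": "m.push_rules",
        "content": {
            "global": {
                "override": [],
                "content": [],
                "room": [],
                "sender": [],
                "underride": [],
            },
        },
    }
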
@@ -227,48 +237,69 @@ class SyncHandler(BaseHandler):
         invited = []
         archived = []
         deferreds = []
-        for event in room_list:
-            if event.membership == Membership.JOIN:
-                room_sync_deferred = self.full_state_sync_for_joined_room(
-                    room_id=event.room_id,
-                    sync_config=sync_config,
-                    now_token=now_token,
-                    timeline_since_token=timeline_since_token,
-                    ephemeral_by_room=ephemeral_by_room,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                room_sync_deferred.addCallback(joined.append)
-                deferreds.append(room_sync_deferred)
-            elif event.membership == Membership.INVITE:
-                invite = yield self.store.get_event(event.event_id)
-                invited.append(InvitedSyncResult(
-                    room_id=event.room_id,
-                    invite=invite,
-                ))
-            elif event.membership in (Membership.LEAVE, Membership.BAN):
-                leave_token = now_token.copy_and_replace(
-                    "room_key", "s%d" % (event.stream_ordering,)
-                )
-                room_sync_deferred = self.full_state_sync_for_archived_room(
-                    sync_config=sync_config,
-                    room_id=event.room_id,
-                    leave_event_id=event.event_id,
-                    leave_token=leave_token,
-                    timeline_since_token=timeline_since_token,
-                    tags_by_room=tags_by_room,
-                    account_data_by_room=account_data_by_room,
-                )
-                room_sync_deferred.addCallback(archived.append)
-                deferreds.append(room_sync_deferred)
 
-        yield defer.gatherResults(
-            deferreds, consumeErrors=True
-        ).addErrback(unwrapFirstError)
+        room_list_chunks = [room_list[i:i + 10] for i in xrange(0, len(room_list), 10)]
+        for room_list_chunk in room_list_chunks:
+            for event in room_list_chunk:
+                if event.membership == Membership.JOIN:
+                    room_sync_deferred = preserve_fn(
+                        self.full_state_sync_for_joined_room
+                    )(
+                        room_id=event.room_id,
+                        sync_config=sync_config,
+                        now_token=now_token,
+                        timeline_since_token=timeline_since_token,
+                        ephemeral_by_room=ephemeral_by_room,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
+                    room_sync_deferred.addCallback(joined.append)
+                    deferreds.append(room_sync_deferred)
+                elif event.membership == Membership.INVITE:
+                    invite = yield self.store.get_event(event.event_id)
+                    invited.append(InvitedSyncResult(
+                        room_id=event.room_id,
+                        invite=invite,
+                    ))
+                elif event.membership in (Membership.LEAVE, Membership.BAN):
+                    # Always send down rooms we were banned or kicked from.
+                    if not sync_config.filter_collection.include_leave:
+                        if event.membership == Membership.LEAVE:
+                            if sync_config.user.to_string() == event.sender:
+                                continue
+
+                    leave_token = now_token.copy_and_replace(
+                        "room_key", "s%d" % (event.stream_ordering,)
+                    )
+                    room_sync_deferred = preserve_fn(
+                        self.full_state_sync_for_archived_room
+                    )(
+                        sync_config=sync_config,
+                        room_id=event.room_id,
+                        leave_event_id=event.event_id,
+                        leave_token=leave_token,
+                        timeline_since_token=timeline_since_token,
+                        tags_by_room=tags_by_room,
+                        account_data_by_room=account_data_by_room,
+                    )
+                    room_sync_deferred.addCallback(archived.append)
+                    deferreds.append(room_sync_deferred)
+
+            yield defer.gatherResults(
+                deferreds, consumeErrors=True
+            ).addErrback(unwrapFirstError)
+
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            self.account_data_for_user(account_data)
+        )
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
 
         defer.returnValue(SyncResult(
             presence=presence,
-            account_data=self.account_data_for_user(account_data),
+            account_data=account_data_for_user,
             joined=joined,
             invited=invited,
             archived=archived,
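
The per-room work above is fanned out with preserve_fn, which starts each
call immediately while keeping the request's logging context attached to
the resulting deferred; gatherResults then collects a batch at a time. The
shape of the pattern (sync_one_room stands in for the per-room methods):

    from twisted.internet import defer
    from synapse.util import unwrapFirstError
    from synapse.util.logcontext import preserve_fn

    @defer.inlineCallbacks
    def sync_rooms(sync_one_room, room_ids, results):
        deferreds = []
        for room_id in room_ids:
            # preserve_fn: run now, without losing the calling context.
            d = preserve_fn(sync_one_room)(room_id)
            d.addCallback(results.append)
            deferreds.append(d)
        yield defer.gatherResults(
            deferreds, consumeErrors=True
        ).addErrback(unwrapFirstError)
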
@@ -289,29 +320,26 @@ class SyncHandler(BaseHandler):
             room_id, sync_config, now_token, since_token=timeline_since_token
         )
 
-        notifs = yield self.unread_notifs_for_room_id(
-            room_id, sync_config, ephemeral_by_room
+        room_sync = yield self.incremental_sync_with_gap_for_room(
+            room_id, sync_config,
+            now_token=now_token,
+            since_token=timeline_since_token,
+            ephemeral_by_room=ephemeral_by_room,
+            tags_by_room=tags_by_room,
+            account_data_by_room=account_data_by_room,
+            batch=batch,
+            full_state=True,
         )
 
-        unread_notifications = {}
-        if notifs is not None:
-            unread_notifications["notification_count"] = len(notifs)
-            unread_notifications["highlight_count"] = len([
-                1 for notif in notifs if _action_has_highlight(notif["actions"])
-            ])
-
-        current_state = yield self.get_state_at(room_id, now_token)
+        defer.returnValue(room_sync)
 
-        defer.returnValue(JoinedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=current_state,
-            ephemeral=ephemeral_by_room.get(room_id, []),
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
-            unread_notifications=unread_notifications,
-        ))
+    @defer.inlineCallbacks
+    def push_rules_for_user(self, user):
+        user_id = user.to_string()
+        rawrules = yield self.store.get_push_rules_for_user(user_id)
+        enabled_map = yield self.store.get_push_rules_enabled_for_user(user_id)
+        rules = format_push_rules_for_user(user, rawrules, enabled_map)
+        defer.returnValue(rules)
 
     def account_data_for_user(self, account_data):
         account_data_events = []
@@ -356,78 +384,68 @@ class SyncHandler(BaseHandler):
             typing events for that room.
         """
 
-        typing_key = since_token.typing_key if since_token else "0"
+        with Measure(self.clock, "ephemeral_by_room"):
+            typing_key = since_token.typing_key if since_token else "0"
 
-        rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-        room_ids = [room.room_id for room in rooms]
+            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
+            room_ids = [room.room_id for room in rooms]
 
-        typing_source = self.event_sources.sources["typing"]
-        typing, typing_key = yield typing_source.get_new_events(
-            user=sync_config.user,
-            from_key=typing_key,
-            limit=sync_config.filter.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("typing_key", typing_key)
-
-        ephemeral_by_room = {}
-
-        for event in typing:
-            # we want to exclude the room_id from the event, but modifying the
-            # result returned by the event source is poor form (it might cache
-            # the object)
-            room_id = event["room_id"]
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
-
-        receipt_key = since_token.receipt_key if since_token else "0"
-
-        receipt_source = self.event_sources.sources["receipt"]
-        receipts, receipt_key = yield receipt_source.get_new_events(
-            user=sync_config.user,
-            from_key=receipt_key,
-            limit=sync_config.filter.ephemeral_limit(),
-            room_ids=room_ids,
-            is_guest=sync_config.is_guest,
-        )
-        now_token = now_token.copy_and_replace("receipt_key", receipt_key)
+            typing_source = self.event_sources.sources["typing"]
+            typing, typing_key = yield typing_source.get_new_events(
+                user=sync_config.user,
+                from_key=typing_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("typing_key", typing_key)
+
+            ephemeral_by_room = {}
+
+            for event in typing:
+                # we want to exclude the room_id from the event, but modifying the
+                # result returned by the event source is poor form (it might cache
+                # the object)
+                room_id = event["room_id"]
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+
+            receipt_key = since_token.receipt_key if since_token else "0"
+
+            receipt_source = self.event_sources.sources["receipt"]
+            receipts, receipt_key = yield receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=receipt_key,
+                limit=sync_config.filter_collection.ephemeral_limit(),
+                room_ids=room_ids,
+                is_guest=sync_config.is_guest,
+            )
+            now_token = now_token.copy_and_replace("receipt_key", receipt_key)
 
-        for event in receipts:
-            room_id = event["room_id"]
-            # exclude room id, as above
-            event_copy = {k: v for (k, v) in event.iteritems()
-                          if k != "room_id"}
-            ephemeral_by_room.setdefault(room_id, []).append(event_copy)
+            for event in receipts:
+                room_id = event["room_id"]
+                # exclude room id, as above
+                event_copy = {k: v for (k, v) in event.iteritems()
+                              if k != "room_id"}
+                ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         defer.returnValue((now_token, ephemeral_by_room))
 
-    @defer.inlineCallbacks
     def full_state_sync_for_archived_room(self, room_id, sync_config,
                                           leave_event_id, leave_token,
                                           timeline_since_token, tags_by_room,
                                           account_data_by_room):
         """Sync a room for a client which is starting without any state
         Returns:
-            A Deferred JoinedSyncResult.
+            A Deferred ArchivedSyncResult.
         """
 
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, leave_token, since_token=timeline_since_token
+        return self.incremental_sync_for_archived_room(
+            sync_config, room_id, leave_event_id, timeline_since_token, tags_by_room,
+            account_data_by_room, full_state=True, leave_token=leave_token,
         )
 
-        leave_state = yield self.store.get_state_for_event(leave_event_id)
-
-        defer.returnValue(ArchivedSyncResult(
-            room_id=room_id,
-            timeline=batch,
-            state=leave_state,
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
-        ))
-
     @defer.inlineCallbacks
     def incremental_sync_with_gap(self, sync_config, since_token):
         """ Get the incremental delta needed to bring the client up to
@@ -444,19 +462,12 @@ class SyncHandler(BaseHandler):
         presence, presence_key = yield presence_source.get_new_events(
             user=sync_config.user,
             from_key=since_token.presence_key,
-            limit=sync_config.filter.presence_limit(),
+            limit=sync_config.filter_collection.presence_limit(),
             room_ids=room_ids,
             is_guest=sync_config.is_guest,
         )
         now_token = now_token.copy_and_replace("presence_key", presence_key)
 
-        # We now fetch all ephemeral events for this room in order to get
-        # this users current read receipt. This could almost certainly be
-        # optimised.
-        _, all_ephemeral_by_room = yield self.ephemeral_by_room(
-            sync_config, now_token
-        )
-
         now_token, ephemeral_by_room = yield self.ephemeral_by_room(
             sync_config, now_token, since_token
         )
@@ -473,139 +484,169 @@ class SyncHandler(BaseHandler):
                 sync_config.user
             )
 
-        timeline_limit = sync_config.filter.timeline_limit()
+        user_id = sync_config.user.to_string()
 
-        room_events, _ = yield self.store.get_room_events_stream(
-            sync_config.user.to_string(),
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
-            limit=timeline_limit + 1,
-        )
+        timeline_limit = sync_config.filter_collection.timeline_limit()
 
         tags_by_room = yield self.store.get_updated_tags(
-            sync_config.user.to_string(),
+            user_id,
             since_token.account_data_key,
         )
 
         account_data, account_data_by_room = (
             yield self.store.get_updated_account_data_for_user(
-                sync_config.user.to_string(),
+                user_id,
                 since_token.account_data_key,
             )
         )
 
-        joined = []
+        push_rules_changed = yield self.store.have_push_rules_changed_for_user(
+            user_id, int(since_token.push_rules_key)
+        )
+
+        if push_rules_changed:
+            account_data["m.push_rules"] = yield self.push_rules_for_user(
+                sync_config.user
+            )
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        mem_change_events_by_room_id = {}
+        for event in rooms_changed:
+            mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
+
+        newly_joined_rooms = []
         archived = []
-        if len(room_events) <= timeline_limit:
-            # There is no gap in any of the rooms. Therefore we can just
-            # partition the new events by room and return them.
-            logger.debug("Got %i events for incremental sync - not limited",
-                         len(room_events))
-
-            invite_events = []
-            leave_events = []
-            events_by_room_id = {}
-            for event in room_events:
-                events_by_room_id.setdefault(event.room_id, []).append(event)
-                if event.room_id not in joined_room_ids:
-                    if (event.type == EventTypes.Member
-                            and event.state_key == sync_config.user.to_string()):
-                        if event.membership == Membership.INVITE:
-                            invite_events.append(event)
-                        elif event.membership in (Membership.LEAVE, Membership.BAN):
-                            leave_events.append(event)
-
-            for room_id in joined_room_ids:
-                recents = events_by_room_id.get(room_id, [])
-                logger.debug("Events for room %s: %r", room_id, recents)
-                state = {
-                    (event.type, event.state_key): event
-                    for event in recents if event.is_state()}
-                limited = False
+        invited = []
+        for room_id, events in mem_change_events_by_room_id.items():
+            non_joins = [e for e in events if e.membership != Membership.JOIN]
+            has_join = len(non_joins) != len(events)
+
+            # We want to figure out if we joined the room at some point since
+            # the last sync (even if we have since left). This is to make sure
+            # we do send down the room, and with full state, where necessary
+            if room_id in joined_room_ids or has_join:
+                old_state = yield self.get_state_at(room_id, since_token)
+                old_mem_ev = old_state.get((EventTypes.Member, user_id), None)
+                if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
+                    newly_joined_rooms.append(room_id)
+
+                if room_id in joined_room_ids:
+                    continue
+
+            if not non_joins:
+                continue
 
-                if recents:
-                    prev_batch = now_token.copy_and_replace(
-                        "room_key", recents[0].internal_metadata.before
-                    )
-                else:
-                    prev_batch = now_token
-
-                just_joined = yield self.check_joined_room(sync_config, state)
-                if just_joined:
-                    logger.debug("User has just joined %s: needs full state",
-                                 room_id)
-                    state = yield self.get_state_at(room_id, now_token)
-                    # the timeline is inherently limited if we've just joined
-                    limited = True
-
-                room_sync = JoinedSyncResult(
-                    room_id=room_id,
-                    timeline=TimelineBatch(
-                        events=recents,
-                        prev_batch=prev_batch,
-                        limited=limited,
-                    ),
-                    state=state,
-                    ephemeral=ephemeral_by_room.get(room_id, []),
-                    account_data=self.account_data_for_room(
-                        room_id, tags_by_room, account_data_by_room
-                    ),
-                    unread_notifications={},
+            # Only bother if we're still currently invited
+            should_invite = non_joins[-1].membership == Membership.INVITE
+            if should_invite:
+                room_sync = InvitedSyncResult(room_id, invite=non_joins[-1])
+                if room_sync:
+                    invited.append(room_sync)
+
+            # Always include leave/ban events. Just take the last one.
+            # TODO: How do we handle ban -> leave in same batch?
+            leave_events = [
+                e for e in non_joins
+                if e.membership in (Membership.LEAVE, Membership.BAN)
+            ]
+
+            if leave_events:
+                leave_event = leave_events[-1]
+                room_sync = yield self.incremental_sync_for_archived_room(
+                    sync_config, room_id, leave_event.event_id, since_token,
+                    tags_by_room, account_data_by_room,
+                    full_state=room_id in newly_joined_rooms
                 )
-                logger.debug("Result for room %s: %r", room_id, room_sync)
-
                 if room_sync:
-                    notifs = yield self.unread_notifs_for_room_id(
-                        room_id, sync_config, all_ephemeral_by_room
-                    )
+                    archived.append(room_sync)
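
The membership partition above is easier to follow in isolation. A minimal sketch, in which plain dicts with a "membership" key stand in for FrozenEvents and the old-state check against the previous sync token is elided (all names hypothetical):

    def partition_membership_changes(events_by_room, joined_room_ids):
        """Split membership-change events into joins, invites and leaves."""
        newly_joined, invites, leaves = [], {}, {}
        for room_id, events in events_by_room.items():
            non_joins = [e for e in events if e["membership"] != "join"]
            has_join = len(non_joins) != len(events)
            if room_id in joined_room_ids or has_join:
                newly_joined.append(room_id)  # may need full state
                if room_id in joined_room_ids:
                    continue  # handled by the joined-rooms path below
            if not non_joins:
                continue
            if non_joins[-1]["membership"] == "invite":
                invites[room_id] = non_joins[-1]
            final = [e for e in non_joins
                     if e["membership"] in ("leave", "ban")]
            if final:
                leaves[room_id] = final[-1]  # take the last leave/ban
        return newly_joined, invites, leaves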
 
-                    if notifs is not None:
-                        notif_dict = room_sync.unread_notifications
-                        notif_dict["notification_count"] = len(notifs)
-                        notif_dict["highlight_count"] = len([
-                            1 for notif in notifs
-                            if _action_has_highlight(notif["actions"])
-                        ])
+        # Get all events for rooms we're currently joined to.
+        room_to_events = yield self.store.get_room_events_stream_for_rooms(
+            room_ids=joined_room_ids,
+            from_key=since_token.room_key,
+            to_key=now_token.room_key,
+            limit=timeline_limit + 1,
+        )
 
-                    joined.append(room_sync)
+        joined = []
+        # We loop through all room ids, even if there are no new events, in case
+        # there are non-room events that we need to notify about.
+        for room_id in joined_room_ids:
+            room_entry = room_to_events.get(room_id, None)
 
-        else:
-            logger.debug("Got %i events for incremental sync - hit limit",
-                         len(room_events))
+            if room_entry:
+                events, start_key = room_entry
 
-            invite_events = yield self.store.get_invites_for_user(
-                sync_config.user.to_string()
-            )
+                prev_batch_token = now_token.copy_and_replace("room_key", start_key)
 
-            leave_events = yield self.store.get_leave_and_ban_events_for_user(
-                sync_config.user.to_string()
-            )
+                newly_joined_room = room_id in newly_joined_rooms
+                full_state = newly_joined_room
 
-            for room_id in joined_room_ids:
-                room_sync = yield self.incremental_sync_with_gap_for_room(
-                    room_id, sync_config, since_token, now_token,
-                    ephemeral_by_room, tags_by_room, account_data_by_room,
-                    all_ephemeral_by_room=all_ephemeral_by_room,
+                batch = yield self.load_filtered_recents(
+                    room_id, sync_config, prev_batch_token,
+                    since_token=since_token,
+                    recents=events,
+                    newly_joined_room=newly_joined_room,
                 )
-                if room_sync:
-                    joined.append(room_sync)
-
-        for leave_event in leave_events:
-            room_sync = yield self.incremental_sync_for_archived_room(
-                sync_config, leave_event, since_token, tags_by_room,
-                account_data_by_room
+            else:
+                batch = TimelineBatch(
+                    events=[],
+                    prev_batch=since_token,
+                    limited=False,
+                )
+                full_state = False
+
+            room_sync = yield self.incremental_sync_with_gap_for_room(
+                room_id=room_id,
+                sync_config=sync_config,
+                since_token=since_token,
+                now_token=now_token,
+                ephemeral_by_room=ephemeral_by_room,
+                tags_by_room=tags_by_room,
+                account_data_by_room=account_data_by_room,
+                batch=batch,
+                full_state=full_state,
             )
             if room_sync:
-                archived.append(room_sync)
+                joined.append(room_sync)
+
+        # For each newly joined room, we want to send down presence of
+        # existing users.
+        presence_handler = self.hs.get_handlers().presence_handler
+        extra_presence_users = set()
+        for room_id in newly_joined_rooms:
+            users = yield self.store.get_users_in_room(room_id)
+            extra_presence_users.update(users)
+
+        # For each new member, send down presence.
+        for joined_sync in joined:
+            it = itertools.chain(joined_sync.timeline.events, joined_sync.state.values())
+            for event in it:
+                if event.type == EventTypes.Member:
+                    if event.membership == Membership.JOIN:
+                        extra_presence_users.add(event.state_key)
+
+        states = yield presence_handler.get_states(
+            [u for u in extra_presence_users if u != user_id],
+            as_event=True,
+        )
+        presence.extend(states)
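
Assembling the extra presence set, as a standalone sketch; `users_in_room` stands in for the store call, attribute-style event stubs replace real events, and the member event types are inlined (all names hypothetical):

    import itertools

    def presence_targets(newly_joined_rooms, users_in_room, joined_syncs, me):
        """Collect users whose presence should accompany this sync."""
        targets = set()
        for room_id in newly_joined_rooms:
            targets.update(users_in_room(room_id))
        for sync in joined_syncs:
            events = itertools.chain(sync.timeline.events, sync.state.values())
            for ev in events:
                if ev.type == "m.room.member" and ev.membership == "join":
                    targets.add(ev.state_key)
        targets.discard(me)  # don't send the user their own presence
        return targets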
 
-        invited = [
-            InvitedSyncResult(room_id=event.room_id, invite=event)
-            for event in invite_events
-        ]
+        account_data_for_user = sync_config.filter_collection.filter_account_data(
+            self.account_data_for_user(account_data)
+        )
+
+        presence = sync_config.filter_collection.filter_presence(
+            presence
+        )
 
         defer.returnValue(SyncResult(
             presence=presence,
-            account_data=self.account_data_for_user(account_data),
+            account_data=account_data_for_user,
             joined=joined,
             invited=invited,
             archived=archived,
@@ -614,51 +655,71 @@ class SyncHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def load_filtered_recents(self, room_id, sync_config, now_token,
-                              since_token=None):
+                              since_token=None, recents=None, newly_joined_room=False):
         """
         :returns a Deferred TimelineBatch
         """
-        limited = True
-        recents = []
-        filtering_factor = 2
-        timeline_limit = sync_config.filter.timeline_limit()
-        load_limit = max(timeline_limit * filtering_factor, 100)
-        max_repeat = 3  # Only try a few times per room, otherwise
-        room_key = now_token.room_key
-        end_key = room_key
-
-        while limited and len(recents) < timeline_limit and max_repeat:
-            events, keys = yield self.store.get_recent_events_for_room(
-                room_id,
-                limit=load_limit + 1,
-                from_token=since_token.room_key if since_token else None,
-                end_token=end_key,
-            )
-            (room_key, _) = keys
-            end_key = "s" + room_key.split('-')[-1]
-            loaded_recents = sync_config.filter.filter_room_timeline(events)
-            loaded_recents = yield self._filter_events_for_client(
-                sync_config.user.to_string(),
-                loaded_recents,
-                is_peeking=sync_config.is_guest,
-            )
-            loaded_recents.extend(recents)
-            recents = loaded_recents
-            if len(events) <= load_limit:
+        with Measure(self.clock, "load_filtered_recents"):
+            filtering_factor = 2
+            timeline_limit = sync_config.filter_collection.timeline_limit()
+            load_limit = max(timeline_limit * filtering_factor, 10)
+            max_repeat = 5  # Only try a few times per room, otherwise give up
+            room_key = now_token.room_key
+            end_key = room_key
+
+            if recents is None or newly_joined_room or timeline_limit < len(recents):
+                limited = True
+            else:
                 limited = False
-            max_repeat -= 1
 
-        if len(recents) > timeline_limit:
-            limited = True
-            recents = recents[-timeline_limit:]
-            room_key = recents[0].internal_metadata.before
+            if recents is not None:
+                recents = sync_config.filter_collection.filter_room_timeline(recents)
+                recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    recents,
+                )
+            else:
+                recents = []
+
+            since_key = None
+            if since_token and not newly_joined_room:
+                since_key = since_token.room_key
+
+            while limited and len(recents) < timeline_limit and max_repeat:
+                events, end_key = yield self.store.get_room_events_stream_for_room(
+                    room_id,
+                    limit=load_limit + 1,
+                    from_key=since_key,
+                    to_key=end_key,
+                )
+                loaded_recents = sync_config.filter_collection.filter_room_timeline(
+                    events
+                )
+                loaded_recents = yield self._filter_events_for_client(
+                    sync_config.user.to_string(),
+                    loaded_recents,
+                )
+                loaded_recents.extend(recents)
+                recents = loaded_recents
 
-        prev_batch_token = now_token.copy_and_replace(
-            "room_key", room_key
-        )
+                if len(events) <= load_limit:
+                    limited = False
+                    break
+                max_repeat -= 1
+
+            if len(recents) > timeline_limit:
+                limited = True
+                recents = recents[-timeline_limit:]
+                room_key = recents[0].internal_metadata.before
+
+            prev_batch_token = now_token.copy_and_replace(
+                "room_key", room_key
+            )
 
         defer.returnValue(TimelineBatch(
-            events=recents, prev_batch=prev_batch_token, limited=limited
+            events=recents,
+            prev_batch=prev_batch_token,
+            limited=limited or newly_joined_room
         ))
 
     @defer.inlineCallbacks
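
The loading loop here follows an over-fetch-and-filter pattern: request a multiple of the timeline limit, drop what the filters and visibility rules reject, and only paginate further back while the timeline is still short. A minimal sketch, assuming a hypothetical `fetch(limit, to_key)` that returns events oldest-first plus the key to continue from; the since-token lower bound is elided:

    def backfill_filtered(fetch, is_visible, timeline_limit, to_key,
                          filtering_factor=2, max_repeat=5):
        load_limit = max(timeline_limit * filtering_factor, 10)
        recents = []
        limited = True
        while limited and len(recents) < timeline_limit and max_repeat:
            events, to_key = fetch(load_limit + 1, to_key)
            # Prepend: each new batch is older than what we already have.
            recents = [e for e in events if is_visible(e)] + recents
            if len(events) <= load_limit:
                limited = False  # the store ran out of events in the window
                break
            max_repeat -= 1
        if len(recents) > timeline_limit:
            limited = True
            recents = recents[-timeline_limit:]
        return recents, limited
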
@@ -666,112 +727,92 @@ class SyncHandler(BaseHandler):
                                            since_token, now_token,
                                            ephemeral_by_room, tags_by_room,
                                            account_data_by_room,
-                                           all_ephemeral_by_room):
-        """ Get the incremental delta needed to bring the client up to date for
-        the room. Gives the client the most recent events and the changes to
-        state.
-        Returns:
-            A Deferred JoinedSyncResult
-        """
-        logger.debug("Doing incremental sync for room %s between %s and %s",
-                     room_id, since_token, now_token)
-
-        # TODO(mjark): Check for redactions we might have missed.
-
-        batch = yield self.load_filtered_recents(
-            room_id, sync_config, now_token, since_token,
+                                           batch, full_state=False):
+        state = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, now_token,
+            full_state=full_state
         )
 
-        logger.debug("Recents %r", batch)
-
-        current_state = yield self.get_state_at(room_id, now_token)
-
-        state_at_previous_sync = yield self.get_state_at(
-            room_id, stream_position=since_token
+        account_data = self.account_data_for_room(
+            room_id, tags_by_room, account_data_by_room
         )
 
-        state = yield self.compute_state_delta(
-            since_token=since_token,
-            previous_state=state_at_previous_sync,
-            current_state=current_state,
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
         )
 
-        just_joined = yield self.check_joined_room(sync_config, state)
-        if just_joined:
-            state = yield self.get_state_at(room_id, now_token)
-
-        notifs = yield self.unread_notifs_for_room_id(
-            room_id, sync_config, all_ephemeral_by_room
+        ephemeral = sync_config.filter_collection.filter_room_ephemeral(
+            ephemeral_by_room.get(room_id, [])
         )
 
         unread_notifications = {}
-        if notifs is not None:
-            unread_notifications["notification_count"] = len(notifs)
-            unread_notifications["highlight_count"] = len([
-                1 for notif in notifs if _action_has_highlight(notif["actions"])
-            ])
-
         room_sync = JoinedSyncResult(
             room_id=room_id,
             timeline=batch,
             state=state,
-            ephemeral=ephemeral_by_room.get(room_id, []),
-            account_data=self.account_data_for_room(
-                room_id, tags_by_room, account_data_by_room
-            ),
+            ephemeral=ephemeral,
+            account_data=account_data,
             unread_notifications=unread_notifications,
         )
 
+        if room_sync:
+            notifs = yield self.unread_notifs_for_room_id(
+                room_id, sync_config
+            )
+
+            if notifs is not None:
+                unread_notifications["notification_count"] = notifs["notify_count"]
+                unread_notifications["highlight_count"] = notifs["highlight_count"]
+
         logger.debug("Room sync: %r", room_sync)
 
         defer.returnValue(room_sync)
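
The trailing `if room_sync:` works because the sync result types define emptiness-aware truthiness; a hedged stub of that pattern (the field list is illustrative, not the real class):

    import collections

    class RoomSyncStub(collections.namedtuple(
            "RoomSyncStub", ["timeline", "state", "ephemeral"])):
        __slots__ = ()

        def __nonzero__(self):          # Python 2, as in this codebase
            # Falsy when the room carries no updates, so it gets skipped.
            return bool(self.timeline or self.state or self.ephemeral)

        __bool__ = __nonzero__          # same behaviour on Python 3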
 
     @defer.inlineCallbacks
-    def incremental_sync_for_archived_room(self, sync_config, leave_event,
+    def incremental_sync_for_archived_room(self, sync_config, room_id, leave_event_id,
                                            since_token, tags_by_room,
-                                           account_data_by_room):
+                                           account_data_by_room, full_state,
+                                           leave_token=None):
         """ Get the incremental delta needed to bring the client up to date for
         the archived room.
         Returns:
             A Deferred ArchivedSyncResult
         """
 
-        stream_token = yield self.store.get_stream_token_for_event(
-            leave_event.event_id
-        )
+        if not leave_token:
+            stream_token = yield self.store.get_stream_token_for_event(
+                leave_event_id
+            )
 
-        leave_token = since_token.copy_and_replace("room_key", stream_token)
+            leave_token = since_token.copy_and_replace("room_key", stream_token)
 
-        if since_token.is_after(leave_token):
+        if since_token and since_token.is_after(leave_token):
             defer.returnValue(None)
 
         batch = yield self.load_filtered_recents(
-            leave_event.room_id, sync_config, leave_token, since_token,
+            room_id, sync_config, leave_token, since_token,
         )
 
         logger.debug("Recents %r", batch)
 
-        state_events_at_leave = yield self.store.get_state_for_event(
-            leave_event.event_id
+        state_events_delta = yield self.compute_state_delta(
+            room_id, batch, sync_config, since_token, leave_token,
+            full_state=full_state
         )
 
-        state_at_previous_sync = yield self.get_state_at(
-            leave_event.room_id, stream_position=since_token
+        account_data = self.account_data_for_room(
+            room_id, tags_by_room, account_data_by_room
         )
 
-        state_events_delta = yield self.compute_state_delta(
-            since_token=since_token,
-            previous_state=state_at_previous_sync,
-            current_state=state_events_at_leave,
+        account_data = sync_config.filter_collection.filter_room_account_data(
+            account_data
         )
 
         room_sync = ArchivedSyncResult(
-            room_id=leave_event.room_id,
+            room_id=room_id,
             timeline=batch,
             state=state_events_delta,
-            account_data=self.account_data_for_room(
-                leave_event.room_id, tags_by_room, account_data_by_room
-            ),
+            account_data=account_data,
         )
 
         logger.debug("Room sync: %r", room_sync)
@@ -812,15 +853,19 @@ class SyncHandler(BaseHandler):
             state = {}
         defer.returnValue(state)
 
-    def compute_state_delta(self, since_token, previous_state, current_state):
-        """ Works out the differnce in state between the current state and the
-        state the client got when it last performed a sync.
-
-        :param str since_token: the point we are comparing against
-        :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
-            state to compare to
-        :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
-            new state
+    @defer.inlineCallbacks
+    def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
+                            full_state):
+        """ Works out the differnce in state between the start of the timeline
+        and the previous sync.
+
+        :param str room_id
+        :param TimelineBatch batch: The timeline batch for the room that will
+            be sent to the user.
+        :param sync_config
+        :param str since_token: Token of the end of the previous batch. May be None.
+        :param str now_token: Token of the end of the current batch.
+        :param bool full_state: Whether to force returning the full state.
 
         :returns A new event dictionary
         """
@@ -829,12 +874,65 @@ class SyncHandler(BaseHandler):
         # updates even if they occurred logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
-        state_delta = {}
-        for key, event in current_state.iteritems():
-            if (key not in previous_state or
-                    previous_state[key].event_id != event.event_id):
-                state_delta[key] = event
-        return state_delta
+        with Measure(self.clock, "compute_state_delta"):
+            if full_state:
+                if batch:
+                    current_state = yield self.store.get_state_for_event(
+                        batch.events[-1].event_id
+                    )
+
+                    state = yield self.store.get_state_for_event(
+                        batch.events[0].event_id
+                    )
+                else:
+                    current_state = yield self.get_state_at(
+                        room_id, stream_position=now_token
+                    )
+
+                    state = current_state
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state,
+                    previous={},
+                    current=current_state,
+                )
+            elif batch.limited:
+                state_at_previous_sync = yield self.get_state_at(
+                    room_id, stream_position=since_token
+                )
+
+                current_state = yield self.store.get_state_for_event(
+                    batch.events[-1].event_id
+                )
+
+                state_at_timeline_start = yield self.store.get_state_for_event(
+                    batch.events[0].event_id
+                )
+
+                timeline_state = {
+                    (event.type, event.state_key): event
+                    for event in batch.events if event.is_state()
+                }
+
+                state = _calculate_state(
+                    timeline_contains=timeline_state,
+                    timeline_start=state_at_timeline_start,
+                    previous=state_at_previous_sync,
+                    current=current_state,
+                )
+            else:
+                state = {}
+
+            defer.returnValue({
+                (e.type, e.state_key): e
+                for e in sync_config.filter_collection.filter_room_state(state.values())
+            })
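
Which snapshots feed _calculate_state in each branch above, summarised:

    # Branch summary (descriptive only):
    #   full_state:    timeline_start = state at the first timeline event
    #                  previous       = {}   (so everything gets sent)
    #   batch.limited: timeline_start = state at the first timeline event
    #                  previous       = state at the previous sync token
    #   otherwise:     state = {}  (a contiguous timeline already carries
    #                  any state changes as ordinary timeline events)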
 
     def check_joined_room(self, sync_config, state_delta):
         """
@@ -855,21 +953,24 @@ class SyncHandler(BaseHandler):
         return False
 
     @defer.inlineCallbacks
-    def unread_notifs_for_room_id(self, room_id, sync_config, ephemeral_by_room):
-        last_unread_event_id = self.last_read_event_id_for_room_and_user(
-            room_id, sync_config.user.to_string(), ephemeral_by_room
-        )
-
-        notifs = []
-        if last_unread_event_id:
-            notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
-                room_id, sync_config.user.to_string(), last_unread_event_id
+    def unread_notifs_for_room_id(self, room_id, sync_config):
+        with Measure(self.clock, "unread_notifs_for_room_id"):
+            last_unread_event_id = yield self.store.get_last_receipt_event_id_for_user(
+                user_id=sync_config.user.to_string(),
+                room_id=room_id,
+                receipt_type="m.read"
             )
-            defer.returnValue(notifs)
 
-        # There is no new information in this period, so your notification
-        # count is whatever it was last time.
-        defer.returnValue(None)
+            notifs = []
+            if last_unread_event_id:
+                notifs = yield self.store.get_unread_event_push_actions_by_room_for_user(
+                    room_id, sync_config.user.to_string(), last_unread_event_id
+                )
+                defer.returnValue(notifs)
+
+            # There is no new information in this period, so your notification
+            # count is whatever it was last time.
+            defer.returnValue(None)
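
Counting is anchored on the last m.read receipt: only push actions newer than it contribute, and with no receipt in the window the client keeps its previous counts. A toy version of the summary dict the caller in incremental_sync_with_gap_for_room expects; the action shape follows _action_has_highlight below, and all names are hypothetical:

    def summarise_push_actions(push_actions):
        # push_actions: dicts with an "actions" list, newer than the last
        # m.read receipt (hypothetical shape).
        def has_highlight(actions):
            return any(isinstance(a, dict) and a.get("set_tweak") == "highlight"
                       for a in actions)
        return {
            "notify_count": len(push_actions),
            "highlight_count": sum(
                1 for p in push_actions if has_highlight(p["actions"])),
        }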
 
 
 def _action_has_highlight(actions):
@@ -881,3 +982,40 @@ def _action_has_highlight(actions):
             pass
 
     return False
+
+
+def _calculate_state(timeline_contains, timeline_start, previous, current):
+    """Works out what state to include in a sync response.
+
+    Args:
+        timeline_contains (dict): state in the timeline
+        timeline_start (dict): state at the start of the timeline
+        previous (dict): state at the end of the previous sync (or empty dict
+            if this is an initial sync)
+        current (dict): state at the end of the timeline
+
+    Returns:
+        dict
+    """
+    event_id_to_state = {
+        e.event_id: e
+        for e in itertools.chain(
+            timeline_contains.values(),
+            previous.values(),
+            timeline_start.values(),
+            current.values(),
+        )
+    }
+
+    c_ids = set(e.event_id for e in current.values())
+    tc_ids = set(e.event_id for e in timeline_contains.values())
+    p_ids = set(e.event_id for e in previous.values())
+    ts_ids = set(e.event_id for e in timeline_start.values())
+
+    state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+
+    evs = (event_id_to_state[e] for e in state_ids)
+    return {
+        (e.type, e.state_key): e
+        for e in evs
+    }
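
A toy worked example of the set algebra above, with bare strings standing in for event ids: state the client already has (previous) or that appears inside the timeline itself (timeline_contains) is withheld, and the rest of what is visible at the timeline start or the current end is sent.

    c_ids = {"A", "B", "C"}   # current state at the end of the timeline
    ts_ids = {"A", "B"}       # state at the start of the timeline
    p_ids = {"A"}             # state at the previous sync
    tc_ids = {"C"}            # state events contained in the timeline

    assert ((c_ids | ts_ids) - p_ids) - tc_ids == {"B"}
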
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 43bf600913..8ce27f49ec 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -19,11 +19,13 @@ from ._base import BaseHandler
 
 from synapse.api.errors import SynapseError, AuthError
 from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.metrics import Measure
 from synapse.types import UserID
 
 import logging
 
 from collections import namedtuple
+import ujson as json
 
 logger = logging.getLogger(__name__)
 
@@ -218,10 +220,24 @@ class TypingNotificationHandler(BaseHandler):
                 "typing_key", self._latest_room_serial, rooms=[room_id]
             )
 
+    def get_all_typing_updates(self, last_id, current_id):
+        # TODO: Work out a way to do this without scanning the entire state.
+        rows = []
+        for room_id, serial in self._room_serials.items():
+            if last_id < serial and serial <= current_id:
+                typing = self._room_typing[room_id]
+                typing_bytes = json.dumps([
+                    u.to_string() for u in typing
+                ], ensure_ascii=False)
+                rows.append((serial, room_id, typing_bytes))
+        rows.sort()
+        return rows
+
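
A hedged sketch of how a replication consumer might fold these rows back into local state (names hypothetical; each row is (serial, room_id, typing_json) as built above, and stdlib json stands in for ujson):

    import json

    def apply_typing_rows(rows, room_typing):
        """Apply (serial, room_id, typing_json) rows; returns latest serial."""
        latest = 0
        for serial, room_id, typing_json in rows:
            room_typing[room_id] = set(json.loads(typing_json))
            latest = max(latest, serial)
        return latest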
 
 class TypingNotificationEventSource(object):
     def __init__(self, hs):
         self.hs = hs
+        self.clock = hs.get_clock()
         self._handler = None
         self._room_member_handler = None
 
@@ -247,19 +263,20 @@ class TypingNotificationEventSource(object):
         }
 
     def get_new_events(self, from_key, room_ids, **kwargs):
-        from_key = int(from_key)
-        handler = self.handler()
+        with Measure(self.clock, "typing.get_new_events"):
+            from_key = int(from_key)
+            handler = self.handler()
 
-        events = []
-        for room_id in room_ids:
-            if room_id not in handler._room_serials:
-                continue
-            if handler._room_serials[room_id] <= from_key:
-                continue
+            events = []
+            for room_id in room_ids:
+                if room_id not in handler._room_serials:
+                    continue
+                if handler._room_serials[room_id] <= from_key:
+                    continue
 
-            events.append(self._make_event_for(room_id))
+                events.append(self._make_event_for(room_id))
 
-        return events, handler._latest_room_serial
+            return events, handler._latest_room_serial
 
     def get_current_key(self):
         return self.handler()._latest_room_serial
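
The event source's delta is purely serial-based; toy numbers make the cut-off visible (hypothetical values):

    room_serials = {"!quiet:hs": 3, "!busy:hs": 7}
    from_key = 5  # the client's previous typing token

    updated = sorted(r for r, s in room_serials.items() if s > from_key)
    assert updated == ["!busy:hs"]
    # A typing event is emitted for each updated room, and the token
    # handed back to the client is the latest serial, 7.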