summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-11 11:12:37 +0100
committerErik Johnston <erik@matrix.org>2017-04-11 11:12:37 +0100
commit4902db1fc978d1c9a3681720cffc4dbb9d72dbea (patch)
treeee78b86f62119a7605ab2fdd1bc8afd3953e7978 /synapse/handlers
parentMerge branch 'release-v0.19.3' of github.com:matrix-org/synapse (diff)
parentBump changelog (diff)
downloadsynapse-4902db1fc978d1c9a3681720cffc4dbb9d72dbea.tar.xz
Merge branch 'release-v0.20.0' of github.com:matrix-org/synapse v0.20.0
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py32
-rw-r--r--synapse/handlers/device.py50
-rw-r--r--synapse/handlers/directory.py1
-rw-r--r--synapse/handlers/e2e_keys.py34
-rw-r--r--synapse/handlers/federation.py291
-rw-r--r--synapse/handlers/identity.py37
-rw-r--r--synapse/handlers/initial_sync.py11
-rw-r--r--synapse/handlers/presence.py98
-rw-r--r--synapse/handlers/profile.py14
-rw-r--r--synapse/handlers/receipts.py5
-rw-r--r--synapse/handlers/room_list.py66
-rw-r--r--synapse/handlers/sync.py82
12 files changed, 566 insertions, 155 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index fffba34383..e7a1bb7246 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -47,6 +48,7 @@ class AuthHandler(BaseHandler):
             LoginType.PASSWORD: self._check_password_auth,
             LoginType.RECAPTCHA: self._check_recaptcha,
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
+            LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -307,31 +309,47 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    @defer.inlineCallbacks
     def _check_email_identity(self, authdict, _):
+        return self._check_threepid('email', authdict)
+
+    def _check_msisdn(self, authdict, _):
+        return self._check_threepid('msisdn', authdict)
+
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _check_threepid(self, medium, authdict):
         yield run_on_reactor()
 
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
         threepid_creds = authdict['threepid_creds']
+
         identity_handler = self.hs.get_handlers().identity_handler
 
-        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
         threepid = yield identity_handler.threepid_from_creds(threepid_creds)
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
+        if threepid['medium'] != medium:
+            raise LoginError(
+                401,
+                "Expecting threepid of type '%s', got '%s'" % (
+                    medium, threepid['medium'],
+                ),
+                errcode=Codes.UNAUTHORIZED
+            )
+
         threepid['threepid_creds'] = authdict['threepid_creds']
 
         defer.returnValue(threepid)
 
-    @defer.inlineCallbacks
-    def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
-
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index e859b3165f..c22f65ce5d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -170,6 +170,40 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_devices(self, user_id, device_ids):
+        """ Delete several devices
+
+        Args:
+            user_id (str):
+            device_ids (str): The list of device IDs to delete
+
+        Returns:
+            defer.Deferred:
+        """
+
+        try:
+            yield self.store.delete_devices(user_id, device_ids)
+        except errors.StoreError, e:
+            if e.code == 404:
+                # no match
+                pass
+            else:
+                raise
+
+        # Delete access tokens and e2e keys for each device. Not optimised as it is not
+        # considered as part of a critical path.
+        for device_id in device_ids:
+            yield self.store.user_delete_access_tokens(
+                user_id, device_id=device_id,
+                delete_refresh_tokens=True,
+            )
+            yield self.store.delete_e2e_keys_by_device(
+                user_id=user_id, device_id=device_id
+            )
+
+        yield self.notify_device_update(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
 
@@ -214,8 +248,7 @@ class DeviceHandler(BaseHandler):
             user_id, device_ids, list(hosts)
         )
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = [r.room_id for r in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         yield self.notifier.on_new_event(
             "device_list_key", position, rooms=room_ids,
@@ -236,8 +269,7 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = set(r.room_id for r in rooms)
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
         changed = yield self.store.get_user_whose_devices_changed(
@@ -262,7 +294,7 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
 
             # special-case for an empty prev state: include all members
             # in the changed list
@@ -313,8 +345,8 @@ class DeviceHandler(BaseHandler):
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
         user_id = user.to_string()
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We no longer share rooms with this user, so we'll no longer
             # receive device updates. Mark this in DB.
             yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
@@ -370,8 +402,8 @@ class DeviceListEduUpdater(object):
             logger.warning("Got device list update edu for %r from %r", user_id, origin)
             return
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
             return
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 1b5317edf5..943554ce98 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -175,6 +175,7 @@ class DirectoryHandler(BaseHandler):
                         "room_alias": room_alias.to_string(),
                     },
                     retry_on_dns_fail=False,
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 logging.warn("Error retrieving alias")
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..c2b38d72a9 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
 from synapse.api.errors import SynapseError, CodeMessageException
 from synapse.types import get_domain_from_id
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+from synapse.util.retryutils import NotRetryingDestination
 
 logger = logging.getLogger(__name__)
 
@@ -121,15 +121,11 @@ class E2eKeysHandler(object):
         def do_remote_query(destination):
             destination_query = remote_queries_not_in_cache[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.query_client_keys(
+                    destination,
+                    {"device_keys": destination_query},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.query_client_keys(
-                        destination,
-                        {"device_keys": destination_query},
-                        timeout=timeout
-                    )
 
                 for user_id, keys in remote_result["device_keys"].items():
                     if user_id in destination_query:
@@ -239,18 +235,14 @@ class E2eKeysHandler(object):
         def claim_client_keys(destination):
             device_keys = remote_queries[destination]
             try:
-                limiter = yield get_retry_limiter(
-                    destination, self.clock, self.store
+                remote_result = yield self.federation.claim_client_keys(
+                    destination,
+                    {"one_time_keys": device_keys},
+                    timeout=timeout
                 )
-                with limiter:
-                    remote_result = yield self.federation.claim_client_keys(
-                        destination,
-                        {"one_time_keys": device_keys},
-                        timeout=timeout
-                    )
-                    for user_id, keys in remote_result["one_time_keys"].items():
-                        if user_id in device_keys:
-                            json_result[user_id] = keys
+                for user_id, keys in remote_result["one_time_keys"].items():
+                    if user_id in device_keys:
+                        json_result[user_id] = keys
             except CodeMessageException as e:
                 failures[destination] = {
                     "status": e.code, "message": e.message
@@ -316,7 +308,7 @@ class E2eKeysHandler(object):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        self.device_handler.check_device_registered(user_id, device_id)
+        yield self.device_handler.check_device_registered(user_id, device_id)
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ed0fa51e7f..53f9296399 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+import synapse.util.logcontext
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -31,7 +32,7 @@ from synapse.util.logcontext import (
 )
 from synapse.util.metrics import measure_func
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor
+from synapse.util.async import run_on_reactor, Linearizer
 from synapse.util.frozenutils import unfreeze
 from synapse.crypto.event_signing import (
     compute_event_signature, add_hashes_and_signatures,
@@ -79,29 +80,216 @@ class FederationHandler(BaseHandler):
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
+        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
 
-    @log_function
     @defer.inlineCallbacks
-    def on_receive_pdu(self, origin, pdu, state=None, auth_chain=None):
-        """ Called by the ReplicationLayer when we have a new pdu. We need to
-        do auth checks and put it through the StateHandler.
+    @log_function
+    def on_receive_pdu(self, origin, pdu, get_missing=True):
+        """ Process a PDU received via a federation /send/ transaction, or
+        via backfill of missing prev_events
+
+        Args:
+            origin (str): server which initiated the /send/ transaction. Will
+                be used to fetch missing events or state.
+            pdu (FrozenEvent): received PDU
+            get_missing (bool): True if we should fetch missing prev_events
 
-        auth_chain and state are None if we already have the necessary state
-        and prev_events in the db
+        Returns (Deferred): completes with None
         """
-        event = pdu
 
-        logger.debug("Got event: %s", event.event_id)
+        # We reprocess pdus when we have seen them only as outliers
+        existing = yield self.get_persisted_pdu(
+            origin, pdu.event_id, do_auth=False
+        )
+
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+
+        already_seen = (
+            existing and (
+                not existing.internal_metadata.is_outlier()
+                or pdu.internal_metadata.is_outlier()
+            )
+        )
+        if already_seen:
+            logger.debug("Already seen pdu %s", pdu.event_id)
+            return
 
         # If we are currently in the process of joining this room, then we
         # queue up events for later processing.
-        if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append((pdu, origin))
+        if pdu.room_id in self.room_queues:
+            logger.info("Ignoring PDU %s for room %s from %s for now; join "
+                        "in progress", pdu.event_id, pdu.room_id, origin)
+            self.room_queues[pdu.room_id].append((pdu, origin))
             return
 
-        logger.debug("Processing event: %s", event.event_id)
+        state = None
+
+        auth_chain = []
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+
+        fetch_state = False
+
+        # Get missing pdus if necessary.
+        if not pdu.internal_metadata.is_outlier():
+            # We only backfill backwards to the min depth.
+            min_depth = yield self.get_min_depth_for_context(
+                pdu.room_id
+            )
+
+            logger.debug(
+                "_handle_new_pdu min_depth for %s: %d",
+                pdu.room_id, min_depth
+            )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+
+            if min_depth and pdu.depth < min_depth:
+                # This is so that we don't notify the user about this
+                # message, to work around the fact that some events will
+                # reference really really old events we really don't want to
+                # send to the clients.
+                pdu.internal_metadata.outlier = True
+            elif min_depth and pdu.depth > min_depth:
+                if get_missing and prevs - seen:
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    logger.info(
+                        "Acquiring lock for room %r to fetch %d missing events: %r...",
+                        pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
+                    )
+                    with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
+                        logger.info(
+                            "Acquired lock for room %r to fetch %d missing events",
+                            pdu.room_id, len(prevs - seen),
+                        )
+
+                        yield self._get_missing_events_for_pdu(
+                            origin, pdu, prevs, min_depth
+                        )
+
+            prevs = {e_id for e_id, _ in pdu.prev_events}
+            seen = set(have_seen.keys())
+            if prevs - seen:
+                logger.info(
+                    "Still missing %d events for room %r: %r...",
+                    len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+                )
+                fetch_state = True
+
+        if fetch_state:
+            # We need to get the state at this event, since we haven't
+            # processed all the prev events.
+            logger.debug(
+                "_handle_new_pdu getting state for %s",
+                pdu.room_id
+            )
+            try:
+                state, auth_chain = yield self.replication_layer.get_state_for_room(
+                    origin, pdu.room_id, pdu.event_id,
+                )
+            except:
+                logger.exception("Failed to get state for event: %s", pdu.event_id)
+
+        yield self._process_received_pdu(
+            origin,
+            pdu,
+            state=state,
+            auth_chain=auth_chain,
+        )
+
+    @defer.inlineCallbacks
+    def _get_missing_events_for_pdu(self, origin, pdu, prevs, min_depth):
+        """
+        Args:
+            origin (str): Origin of the pdu. Will be called to get the missing events
+            pdu: received pdu
+            prevs (str[]): List of event ids which we are missing
+            min_depth (int): Minimum depth of events to return.
+
+        Returns:
+            Deferred<dict(str, str?)>: updated have_seen dictionary
+        """
+        # We recalculate seen, since it may have changed.
+        have_seen = yield self.store.have_events(prevs)
+        seen = set(have_seen.keys())
 
-        logger.debug("Event: %s", event)
+        if not prevs - seen:
+            # nothing left to do
+            defer.returnValue(have_seen)
+
+        latest = yield self.store.get_latest_event_ids_in_room(
+            pdu.room_id
+        )
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest)
+        latest |= seen
+
+        logger.info(
+            "Missing %d events for room %r: %r...",
+            len(prevs - seen), pdu.room_id, list(prevs - seen)[:5]
+        )
+
+        # XXX: we set timeout to 10s to help workaround
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # it may also increase our DAG extremity count for the room,
+        # causing additional state resolution?  See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+
+        missing_events = yield self.replication_layer.get_missing_events(
+            origin,
+            pdu.room_id,
+            earliest_events_ids=list(latest),
+            latest_events=[pdu],
+            limit=10,
+            min_depth=min_depth,
+            timeout=10000,
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        missing_events.sort(key=lambda x: x.depth)
+
+        for e in missing_events:
+            yield self.on_receive_pdu(
+                origin,
+                e,
+                get_missing=False
+            )
+
+        have_seen = yield self.store.have_events(
+            [ev for ev, _ in pdu.prev_events]
+        )
+        defer.returnValue(have_seen)
+
+    @log_function
+    @defer.inlineCallbacks
+    def _process_received_pdu(self, origin, pdu, state, auth_chain):
+        """ Called when we have a new pdu. We need to do auth checks and put it
+        through the StateHandler.
+        """
+        event = pdu
+
+        logger.debug("Processing event: %s", event)
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -670,8 +858,6 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        yield self.store.clean_room_for_join(room_id)
-
         origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
@@ -680,7 +866,15 @@ class FederationHandler(BaseHandler):
             content,
         )
 
+        # This shouldn't happen, because the RoomMemberHandler has a
+        # linearizer lock which only allows one operation per user per room
+        # at a time - so this is just paranoia.
+        assert (room_id not in self.room_queues)
+
         self.room_queues[room_id] = []
+
+        yield self.store.clean_room_for_join(room_id)
+
         handled_events = set()
 
         try:
@@ -733,18 +927,37 @@ class FederationHandler(BaseHandler):
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p, origin in room_queue:
-                if p.event_id in handled_events:
-                    continue
+            # we don't need to wait for the queued events to be processed -
+            # it's just a best-effort thing at this point. We do want to do
+            # them roughly in order, though, otherwise we'll end up making
+            # lots of requests for missing prev_events which we do actually
+            # have. Hence we fire off the deferred, but don't wait for it.
 
-                try:
-                    self.on_receive_pdu(origin, p)
-                except:
-                    logger.exception("Couldn't handle pdu")
+            synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+                room_queue
+            )
 
         defer.returnValue(True)
 
     @defer.inlineCallbacks
+    def _handle_queued_pdus(self, room_queue):
+        """Process PDUs which got queued up while we were busy send_joining.
+
+        Args:
+            room_queue (list[FrozenEvent, str]): list of PDUs to be processed
+                and the servers that sent them
+        """
+        for p, origin in room_queue:
+            try:
+                logger.info("Processing queued PDU %s which was received "
+                            "while we were joining %s", p.event_id, p.room_id)
+                yield self.on_receive_pdu(origin, p)
+            except Exception as e:
+                logger.warn(
+                    "Error handling queued PDU %s from %s: %s",
+                    p.event_id, origin, e)
+
+    @defer.inlineCallbacks
     @log_function
     def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
@@ -791,9 +1004,19 @@ class FederationHandler(BaseHandler):
         )
 
         event.internal_metadata.outlier = False
-        # Send this event on behalf of the origin server since they may not
-        # have an up to data view of the state of the room at this event so
-        # will not know which servers to send the event to.
+        # Send this event on behalf of the origin server.
+        #
+        # The reasons we have the destination server rather than the origin
+        # server send it are slightly mysterious: the origin server should have
+        # all the neccessary state once it gets the response to the send_join,
+        # so it could send the event itself if it wanted to. It may be that
+        # doing it this way reduces failure modes, or avoids certain attacks
+        # where a new server selectively tells a subset of the federation that
+        # it has joined.
+        #
+        # The fact is that, as of the current writing, Synapse doesn't send out
+        # the join event over federation after joining, and changing it now
+        # would introduce the danger of backwards-compatibility problems.
         event.internal_metadata.send_on_behalf_of = origin
 
         context, event_stream_id, max_stream_id = yield self._handle_new_event(
@@ -878,15 +1101,15 @@ class FederationHandler(BaseHandler):
                 user_id,
                 "leave"
             )
-            signed_event = self._sign_event(event)
+            event = self._sign_event(event)
         except SynapseError:
             raise
         except CodeMessageException as e:
             logger.warn("Failed to reject invite: %s", e)
             raise SynapseError(500, "Failed to reject invite")
 
-        # Try the host we successfully got a response to /make_join/
-        # request first.
+        # Try the host that we succesfully called /make_leave/ on first for
+        # the /send_leave/ request.
         try:
             target_hosts.remove(origin)
             target_hosts.insert(0, origin)
@@ -896,7 +1119,7 @@ class FederationHandler(BaseHandler):
         try:
             yield self.replication_layer.send_leave(
                 target_hosts,
-                signed_event
+                event
             )
         except SynapseError:
             raise
@@ -1325,7 +1548,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
+        """
+
+        Args:
+            origin:
+            event:
+            state:
+            auth_events:
 
+        Returns:
+            Deferred, which resolves to synapse.events.snapshot.EventContext
+        """
         context = yield self.state_handler.compute_event_context(
             event, old_state=state,
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 559e5d5a71..6a53c5eb47 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler):
         params.update(kwargs)
 
         try:
-            data = yield self.http_client.post_urlencoded_get_json(
+            data = yield self.http_client.post_json_get_json(
                 "https://%s%s" % (
                     id_server,
                     "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler):
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
+
+    @defer.inlineCallbacks
+    def requestMsisdnToken(
+            self, id_server, country, phone_number,
+            client_secret, send_attempt, **kwargs
+    ):
+        yield run_on_reactor()
+
+        if not self._should_trust_id_server(id_server):
+            raise SynapseError(
+                400, "Untrusted ID server '%s'" % id_server,
+                Codes.SERVER_NOT_TRUSTED
+            )
+
+        params = {
+            'country': country,
+            'phone_number': phone_number,
+            'client_secret': client_secret,
+            'send_attempt': send_attempt,
+        }
+        params.update(kwargs)
+
+        try:
+            data = yield self.http_client.post_json_get_json(
+                "https://%s%s" % (
+                    id_server,
+                    "/_matrix/identity/api/v1/validate/msisdn/requestToken"
+                ),
+                params
+            )
+            defer.returnValue(data)
+        except CodeMessageException as e:
+            logger.info("Proxied requestToken failed: %r", e)
+            raise e
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e0ade4c164..10f5f35a69 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
     UserID, StreamToken,
@@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler):
                 "content": content,
             })
 
+        now = self.clock.time_msec()
+
         ret = {
             "rooms": rooms_ret,
-            "presence": presence,
+            "presence": [
+                {
+                    "type": "m.presence",
+                    "content": format_user_presence_state(event, now),
+                }
+                for event in presence
+            ],
             "account_data": account_data_events,
             "receipts": receipt,
             "end": now_token.to_string(),
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..1ede117c79 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -556,9 +557,9 @@ class PresenceHandler(object):
         room_ids_to_states = {}
         users_to_states = {}
         for state in states:
-            events = yield self.store.get_rooms_for_user(state.user_id)
-            for e in events:
-                room_ids_to_states.setdefault(e.room_id, []).append(state)
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
 
             plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
             for u in plist:
@@ -574,8 +575,7 @@ class PresenceHandler(object):
                 if not local_states:
                     continue
 
-                users = yield self.store.get_users_in_room(room_id)
-                hosts = set(get_domain_from_id(u) for u in users)
+                hosts = yield self.store.get_hosts_in_room(room_id)
 
                 for host in hosts:
                     hosts_to_states.setdefault(host, []).extend(local_states)
@@ -719,9 +719,7 @@ class PresenceHandler(object):
                 for state in updates
             ])
         else:
-            defer.returnValue([
-                format_user_presence_state(state, now) for state in updates
-            ])
+            defer.returnValue(updates)
 
     @defer.inlineCallbacks
     def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +793,9 @@ class PresenceHandler(object):
             as_event=False,
         )
 
+        now = self.clock.time_msec()
+        results[:] = [format_user_presence_state(r, now) for r in results]
+
         is_accepted = {
             row["observed_user_id"]: row["accepted"] for row in presence_list
         }
@@ -847,6 +848,7 @@ class PresenceHandler(object):
             )
 
             state_dict = yield self.get_state(observed_user, as_event=False)
+            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
             self.federation.send_edu(
                 destination=observer_user.domain,
@@ -910,11 +912,12 @@ class PresenceHandler(object):
     def is_visible(self, observed_user, observer_user):
         """Returns whether a user can see another user's presence.
         """
-        observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
-        observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
-        observer_room_ids = set(r.room_id for r in observer_rooms)
-        observed_room_ids = set(r.room_id for r in observed_rooms)
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
         if observer_room_ids & observed_room_ids:
             defer.returnValue(True)
@@ -979,14 +982,18 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
+
+    The "user_id" is optional so that this function can be used to format presence
+    updates for client /sync responses and for federation /send requests.
     """
     content = {
         "presence": state.state,
-        "user_id": state.user_id,
     }
+    if include_user_id:
+        content["user_id"] = state.user_id
     if state.last_active_ts:
         content["last_active_ago"] = now - state.last_active_ts
     if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1032,6 @@ class PresenceEventSource(object):
         # sending down the rare duplicate is not a concern.
 
         with Measure(self.clock, "presence.get_new_events"):
-            user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1034,18 +1040,7 @@ class PresenceEventSource(object):
 
             max_token = self.store.get_current_presence_token()
 
-            plist = yield self.store.get_presence_list_accepted(user.localpart)
-            users_interested_in = set(row["observed_user_id"] for row in plist)
-            users_interested_in.add(user_id)  # So that we receive our own presence
-
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-            users_interested_in.update(users_who_share_room)
-
-            if explicit_room_id:
-                user_ids = yield self.store.get_users_in_room(explicit_room_id)
-                users_interested_in.update(user_ids)
+            users_interested_in = yield self._get_interested_in(user, explicit_room_id)
 
             user_ids_changed = set()
             changed = None
@@ -1073,16 +1068,13 @@ class PresenceEventSource(object):
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
-        now = self.clock.time_msec()
-
-        defer.returnValue(([
-            {
-                "type": "m.presence",
-                "content": format_user_presence_state(s, now),
-            }
-            for s in updates.values()
-            if include_offline or s.state != PresenceState.OFFLINE
-        ], max_token))
+        if include_offline:
+            defer.returnValue((updates.values(), max_token))
+        else:
+            defer.returnValue(([
+                s for s in updates.itervalues()
+                if s.state != PresenceState.OFFLINE
+            ], max_token))
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1090,6 +1082,31 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
+    @cachedInlineCallbacks(num_args=2, cache_context=True)
+    def _get_interested_in(self, user, explicit_room_id, cache_context):
+        """Returns the set of users that the given user should see presence
+        updates for
+        """
+        user_id = user.to_string()
+        plist = yield self.store.get_presence_list_accepted(
+            user.localpart, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(
+                explicit_room_id, on_invalidate=cache_context.invalidate,
+            )
+            users_interested_in.update(user_ids)
+
+        defer.returnValue(users_interested_in)
+
 
 def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
@@ -1157,7 +1174,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
         if user_id not in syncing_user_ids:
-            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+            # If the user has done something recently but hasn't synced,
+            # don't set them as offline.
+            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
                     status_msg=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 87f74dfb8e..9bf638f818 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -52,7 +52,8 @@ class ProfileHandler(BaseHandler):
                     args={
                         "user_id": target_user.to_string(),
                         "field": "displayname",
-                    }
+                    },
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 if e.code != 404:
@@ -99,7 +100,8 @@ class ProfileHandler(BaseHandler):
                     args={
                         "user_id": target_user.to_string(),
                         "field": "avatar_url",
-                    }
+                    },
+                    ignore_backoff=True,
                 )
             except CodeMessageException as e:
                 if e.code != 404:
@@ -156,11 +158,11 @@ class ProfileHandler(BaseHandler):
 
         self.ratelimit(requester)
 
-        joins = yield self.store.get_rooms_for_user(
+        room_ids = yield self.store.get_rooms_for_user(
             user.to_string(),
         )
 
-        for j in joins:
+        for room_id in room_ids:
             handler = self.hs.get_handlers().room_member_handler
             try:
                 # Assume the user isn't a guest because we don't let guests set
@@ -171,12 +173,12 @@ class ProfileHandler(BaseHandler):
                 yield handler.update_membership(
                     requester,
                     user,
-                    j.room_id,
+                    room_id,
                     "join",  # We treat a profile update like a join.
                     ratelimit=False,  # Try to hide that these events aren't atomic.
                 )
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    j.room_id, str(e.message)
+                    room_id, str(e.message)
                 )
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 50aa513935..e1cd3a48e9 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -210,10 +210,9 @@ class ReceiptEventSource(object):
         else:
             from_key = None
 
-        rooms = yield self.store.get_rooms_for_user(user.to_string())
-        rooms = [room.room_id for room in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user.to_string())
         events = yield self.store.get_linearized_receipts_for_rooms(
-            rooms,
+            room_ids,
             from_key=from_key,
             to_key=to_key,
         )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
     EventTypes, JoinRules,
 )
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        logger.info(
+            "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+            limit, since_token, bool(search_filter), network_tuple,
+        )
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
 
         rooms_to_order_value = {}
         rooms_to_num_joined = {}
-        rooms_to_latest_event_ids = {}
 
         newly_visible = []
         newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_order_for_room(room_id):
-            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
-            if not latest_event_ids:
+            # Most of the rooms won't have changed between the since token and
+            # now (especially if the since token is "now"). So, we can ask what
+            # the current users are in a room (that will hit a cache) and then
+            # check if the room has changed since the since token. (We have to
+            # do it in that order to avoid races).
+            # If things have changed then fall back to getting the current state
+            # at the since token.
+            joined_users = yield self.store.get_users_in_room(room_id)
+            if self.store.has_room_changed_since(room_id, stream_token):
                 latest_event_ids = yield self.store.get_forward_extremeties_for_room(
                     room_id, stream_token
                 )
-                rooms_to_latest_event_ids[room_id] = latest_event_ids
 
-            if not latest_event_ids:
-                return
+                if not latest_event_ids:
+                    return
+
+                joined_users = yield self.state_handler.get_current_user_in_room(
+                    room_id, latest_event_ids,
+                )
 
-            joined_users = yield self.state_handler.get_current_user_in_room(
-                room_id, latest_event_ids,
-            )
             num_joined_users = len(joined_users)
             rooms_to_num_joined[room_id] = num_joined_users
 
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
                 rooms_to_scan = rooms_to_scan[:since_token.current_limit]
                 rooms_to_scan.reverse()
 
-        # Actually generate the entries. _generate_room_entry will append to
+        # Actually generate the entries. _append_room_entry_to_chunk will append to
         # chunk but will stop if len(chunk) > limit
         chunk = []
         if limit and not search_filter:
             step = limit + 1
             for i in xrange(0, len(rooms_to_scan), step):
                 # We iterate here because the vast majority of cases we'll stop
-                # at first iteration, but occaisonally _generate_room_entry
+                # at first iteration, but occaisonally _append_room_entry_to_chunk
                 # won't append to the chunk and so we need to loop again.
                 # We don't want to scan over the entire range either as that
                 # would potentially waste a lot of work.
                 yield concurrently_execute(
-                    lambda r: self._generate_room_entry(
+                    lambda r: self._append_room_entry_to_chunk(
                         r, rooms_to_num_joined[r],
                         chunk, limit, search_filter
                     ),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
                     break
         else:
             yield concurrently_execute(
-                lambda r: self._generate_room_entry(
+                lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
-                             search_filter):
+    def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+                                    search_filter):
+        """Generate the entry for a room in the public room list and append it
+        to the `chunk` if it matches the search filter
+        """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so lets just drop it.
             return
 
+        result = yield self._generate_room_entry(room_id, num_joined_users)
+
+        if result and _matches_room_entry(result, search_filter):
+            chunk.append(result)
+
+    @cachedInlineCallbacks(num_args=1, cache_context=True)
+    def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+        """Returns the entry for a room
+        """
         result = {
             "room_id": room_id,
             "num_joined_members": num_joined_users,
         }
 
-        current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+        current_state_ids = yield self.store.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate,
+        )
 
         event_map = yield self.store.get_events([
-            event_id for key, event_id in current_state_ids.items()
+            event_id for key, event_id in current_state_ids.iteritems()
             if key[0] in (
                 EventTypes.JoinRules,
                 EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
             if join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
-        aliases = yield self.store.get_aliases_for_room(room_id)
+        aliases = yield self.store.get_aliases_for_room(
+            room_id, on_invalidate=cache_context.invalidate
+        )
         if aliases:
             result["aliases"] = aliases
 
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
             if avatar_url:
                 result["avatar_url"] = avatar_url
 
-        if _matches_room_entry(result, search_filter):
-            chunk.append(result)
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5572cb883f..c0205da1a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
+from synapse.types import RoomStreamToken
 
 from twisted.internet import defer
 
@@ -225,8 +226,7 @@ class SyncHandler(object):
         with Measure(self.clock, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else "0"
 
-            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-            room_ids = [room.room_id for room in rooms]
+            room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
 
             typing_source = self.event_sources.sources["typing"]
             typing, typing_key = yield typing_source.get_new_events(
@@ -568,16 +568,15 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(r.room_id for r in rooms)
+            room_ids = yield self.store.get_rooms_for_user(user_id)
 
             user_ids_changed = set()
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
             for other_user_id in changed:
-                other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(e.room_id for e in other_rooms):
+                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
+                if room_ids.intersection(other_room_ids):
                     user_ids_changed.add(other_user_id)
 
             defer.returnValue(user_ids_changed)
@@ -721,14 +720,14 @@ class SyncHandler(object):
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
-        states = yield self.presence_handler.get_states(
-            extra_users_ids,
-            as_event=True,
-        )
-        presence.extend(states)
+        if extra_users_ids:
+            states = yield self.presence_handler.get_states(
+                extra_users_ids,
+            )
+            presence.extend(states)
 
-        # Deduplicate the presence entries so that there's at most one per user
-        presence = {p["content"]["user_id"]: p for p in presence}.values()
+            # Deduplicate the presence entries so that there's at most one per user
+            presence = {p.user_id: p for p in presence}.values()
 
         presence = sync_config.filter_collection.filter_presence(
             presence
@@ -765,6 +764,21 @@ class SyncHandler(object):
             )
             sync_result_builder.now_token = now_token
 
+        # We check up front if anything has changed, if it hasn't then there is
+        # no point in going futher.
+        since_token = sync_result_builder.since_token
+        if not sync_result_builder.full_state:
+            if since_token and not ephemeral_by_room and not account_data_by_room:
+                have_changed = yield self._have_rooms_changed(sync_result_builder)
+                if not have_changed:
+                    tags_by_room = yield self.store.get_updated_tags(
+                        user_id,
+                        since_token.account_data_key,
+                    )
+                    if not tags_by_room:
+                        logger.debug("no-oping sync")
+                        defer.returnValue(([], []))
+
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
         )
@@ -774,13 +788,12 @@ class SyncHandler(object):
         else:
             ignored_users = frozenset()
 
-        if sync_result_builder.since_token:
+        if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
-                user_id,
-                sync_result_builder.since_token.account_data_key,
+                user_id, since_token.account_data_key,
             )
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -805,7 +818,7 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
-        if sync_result_builder.since_token:
+        if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
                     joined_sync.timeline.events, joined_sync.state.values()
@@ -818,6 +831,38 @@ class SyncHandler(object):
         defer.returnValue((newly_joined_rooms, newly_joined_users))
 
     @defer.inlineCallbacks
+    def _have_rooms_changed(self, sync_result_builder):
+        """Returns whether there may be any new events that should be sent down
+        the sync. Returns True if there are.
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+
+        assert since_token
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        if rooms_changed:
+            defer.returnValue(True)
+
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
+
+        stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+        for room_id in joined_room_ids:
+            if self.store.has_room_changed_since(room_id, stream_id):
+                defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def _get_rooms_changed(self, sync_result_builder, ignored_users):
         """Gets the the changes that have happened since the last sync.
 
@@ -841,8 +886,7 @@ class SyncHandler(object):
             rooms = yield self.store.get_app_service_rooms(app_service)
             joined_room_ids = set(r.room_id for r in rooms)
         else:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            joined_room_ids = set(r.room_id for r in rooms)
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(