summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py5
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/filtering.py37
-rw-r--r--synapse/app/appservice.py8
-rw-r--r--synapse/app/client_reader.py8
-rw-r--r--synapse/app/federation_reader.py8
-rw-r--r--synapse/app/federation_sender.py8
-rwxr-xr-xsynapse/app/homeserver.py9
-rw-r--r--synapse/app/media_repository.py8
-rw-r--r--synapse/app/pusher.py9
-rw-r--r--synapse/app/synchrotron.py24
-rw-r--r--synapse/crypto/keyring.py32
-rw-r--r--synapse/events/snapshot.py26
-rw-r--r--synapse/federation/transaction_queue.py5
-rw-r--r--synapse/handlers/auth.py32
-rw-r--r--synapse/handlers/device.py14
-rw-r--r--synapse/handlers/e2e_keys.py2
-rw-r--r--synapse/handlers/federation.py79
-rw-r--r--synapse/handlers/identity.py37
-rw-r--r--synapse/handlers/initial_sync.py11
-rw-r--r--synapse/handlers/presence.py95
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/receipts.py5
-rw-r--r--synapse/handlers/sync.py82
-rw-r--r--synapse/http/servlet.py10
-rw-r--r--synapse/notifier.py22
-rw-r--r--synapse/push/push_tools.py8
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/replication/resource.py4
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py5
-rw-r--r--synapse/replication/slave/storage/events.py44
-rw-r--r--synapse/rest/client/v1/login.py97
-rw-r--r--synapse/rest/client/v1/presence.py3
-rw-r--r--synapse/rest/client/v1/room.py3
-rw-r--r--synapse/rest/client/v2_alpha/account.py114
-rw-r--r--synapse/rest/client/v2_alpha/register.py137
-rw-r--r--synapse/rest/client/v2_alpha/sync.py20
-rw-r--r--synapse/state.py11
-rw-r--r--synapse/storage/background_updates.py15
-rw-r--r--synapse/storage/deviceinbox.py6
-rw-r--r--synapse/storage/devices.py2
-rw-r--r--synapse/storage/event_federation.py24
-rw-r--r--synapse/storage/events.py312
-rw-r--r--synapse/storage/keys.py5
-rw-r--r--synapse/storage/roommember.py13
-rw-r--r--synapse/storage/state.py10
-rw-r--r--synapse/storage/util/id_generators.py14
-rw-r--r--synapse/util/caches/descriptors.py135
-rw-r--r--synapse/util/caches/stream_change_cache.py2
-rw-r--r--synapse/util/logcontext.py54
-rw-r--r--synapse/util/msisdn.py40
-rw-r--r--synapse/util/retryutils.py5
53 files changed, 1236 insertions, 437 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index ff251ce597..7628e7c505 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
 """ This is a reference implementation of a Matrix home server.
 """
 
-__version__ = "0.19.2"
+__version__ = "0.19.3"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 03a215ab1b..9dbc7993df 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -23,7 +23,7 @@ from synapse import event_auth
 from synapse.api.constants import EventTypes, Membership, JoinRules
 from synapse.api.errors import AuthError, Codes
 from synapse.types import UserID
-from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util import logcontext
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -209,8 +209,7 @@ class Auth(object):
                 default=[""]
             )[0]
             if user and access_token and ip_addr:
-                preserve_context_over_fn(
-                    self.store.insert_client_ip,
+                logcontext.preserve_fn(self.store.insert_client_ip)(
                     user=user,
                     access_token=access_token,
                     ip=ip_addr,
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index ca23c9c460..489efb7f86 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -44,6 +45,7 @@ class JoinRules(object):
 class LoginType(object):
     PASSWORD = u"m.login.password"
     EMAIL_IDENTITY = u"m.login.email.identity"
+    MSISDN = u"m.login.msisdn"
     RECAPTCHA = u"m.login.recaptcha"
     DUMMY = u"m.login.dummy"
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index fb291d7fb9..47f0cf0fa9 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from synapse.api.errors import SynapseError
+from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, RoomID
 
 from twisted.internet import defer
@@ -253,19 +254,35 @@ class Filter(object):
         Returns:
             bool: True if the event matches
         """
-        sender = event.get("sender", None)
-        if not sender:
-            # Presence events have their 'sender' in content.user_id
-            content = event.get("content")
-            # account_data has been allowed to have non-dict content, so check type first
-            if isinstance(content, dict):
-                sender = content.get("user_id")
+        # We usually get the full "events" as dictionaries coming through,
+        # except for presence which actually gets passed around as its own
+        # namedtuple type.
+        if isinstance(event, UserPresenceState):
+            sender = event.user_id
+            room_id = None
+            ev_type = "m.presence"
+            is_url = False
+        else:
+            sender = event.get("sender", None)
+            if not sender:
+                # Presence events had their 'sender' in content.user_id, but are
+                # now handled above. We don't know if anything else uses this
+                # form. TODO: Check this and probably remove it.
+                content = event.get("content")
+                # account_data has been allowed to have non-dict content, so
+                # check type first
+                if isinstance(content, dict):
+                    sender = content.get("user_id")
+
+            room_id = event.get("room_id", None)
+            ev_type = event.get("type", None)
+            is_url = "url" in event.get("content", {})
 
         return self.check_fields(
-            event.get("room_id", None),
+            room_id,
             sender,
-            event.get("type", None),
-            "url" in event.get("content", {})
+            ev_type,
+            is_url,
         )
 
     def check_fields(self, room_id, sender, event_type, contains_url):
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 83ee3e3ce3..a6f1e7594e 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -29,7 +29,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -187,7 +187,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 7ed0de4117..a821a6ce62 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -35,7 +35,7 @@ from synapse.storage.client_ips import ClientIpStore
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index ca742de6b2..e52b0f240d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -31,7 +31,7 @@ from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -184,7 +184,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 0cf5b196e6..76c4cc54d1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -35,7 +35,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.presence import UserPresenceState
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -193,7 +193,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 0b9d78c13c..2cdd2d39ff 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -52,7 +52,7 @@ from synapse.api.urls import (
 )
 from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.metrics import register_memory_metrics, get_metrics_for
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
@@ -456,7 +456,12 @@ def run(hs):
     def in_thread():
         # Uncomment to enable tracing of log context changes.
         # sys.settrace(logcontext_tracer)
-        with LoggingContext("run"):
+
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             change_resource_limit(hs.config.soft_file_limit)
             if hs.config.gc_thresholds:
                 gc.set_threshold(*hs.config.gc_thresholds)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index c5579d9e38..3c7984a237 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -32,7 +32,7 @@ from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -190,7 +190,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index b025db54d4..ab682e52ec 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -31,7 +31,8 @@ from synapse.storage.engines import create_engine
 from synapse.storage import DataStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
@@ -275,7 +276,11 @@ def start(config_options):
     ps.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 29f075aa5f..34e34e5580 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -20,7 +20,6 @@ from synapse.api.constants import EventTypes, PresenceState
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.events import FrozenEvent
 from synapse.handlers.presence import PresenceHandler
 from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
@@ -48,7 +47,8 @@ from synapse.storage.presence import PresenceStore, UserPresenceState
 from synapse.storage.roommember import RoomMemberStore
 from synapse.util.async import sleep
 from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, preserve_fn, \
+    PreserveLoggingContext
 from synapse.util.manhole import manhole
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.stringutils import random_string
@@ -399,8 +399,7 @@ class SynchrotronServer(HomeServer):
                 position = row[position_index]
                 user_id = row[user_index]
 
-                rooms = yield store.get_rooms_for_user(user_id)
-                room_ids = [r.room_id for r in rooms]
+                room_ids = yield store.get_rooms_for_user(user_id)
 
                 notifier.on_new_event(
                     "device_list_key", position, rooms=room_ids,
@@ -411,11 +410,16 @@ class SynchrotronServer(HomeServer):
             stream = result.get("events")
             if stream:
                 max_position = stream["position"]
+
+                event_map = yield store.get_events([row[1] for row in stream["rows"]])
+
                 for row in stream["rows"]:
                     position = row[0]
-                    internal = json.loads(row[1])
-                    event_json = json.loads(row[2])
-                    event = FrozenEvent(event_json, internal_metadata_dict=internal)
+                    event_id = row[1]
+                    event = event_map.get(event_id, None)
+                    if not event:
+                        continue
+
                     extra_users = ()
                     if event.type == EventTypes.Member:
                         extra_users = (event.state_key,)
@@ -497,7 +501,11 @@ def start(config_options):
     ss.start_listening(config.worker_listeners)
 
     def run():
-        with LoggingContext("run"):
+        # make sure that we run the reactor with the sentinel log context,
+        # otherwise other PreserveLoggingContext instances will get confused
+        # and complain when they see the logcontext arbitrarily swapping
+        # between the sentinel and `run` logcontexts.
+        with PreserveLoggingContext():
             logger.info("Running")
             change_resource_limit(config.soft_file_limit)
             if config.gc_thresholds:
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index d7211ee9b3..0a21392a62 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -96,10 +96,11 @@ class Keyring(object):
         verify_requests = []
 
         for server_name, json_object in server_and_json:
-            logger.debug("Verifying for %s", server_name)
 
             key_ids = signature_ids(json_object, server_name)
             if not key_ids:
+                logger.warn("Request from %s: no supported signature keys",
+                            server_name)
                 deferred = defer.fail(SynapseError(
                     400,
                     "Not signed with a supported algorithm",
@@ -108,6 +109,9 @@ class Keyring(object):
             else:
                 deferred = defer.Deferred()
 
+            logger.debug("Verifying for %s with key_ids %s",
+                         server_name, key_ids)
+
             verify_request = VerifyKeyRequest(
                 server_name, key_ids, json_object, deferred
             )
@@ -142,6 +146,9 @@ class Keyring(object):
 
             json_object = verify_request.json_object
 
+            logger.debug("Got key %s %s:%s for server %s, verifying" % (
+                key_id, verify_key.alg, verify_key.version, server_name,
+            ))
             try:
                 verify_signed_json(json_object, server_name, verify_key)
             except:
@@ -231,8 +238,14 @@ class Keyring(object):
             d.addBoth(rm, server_name)
 
     def get_server_verify_keys(self, verify_requests):
-        """Takes a dict of KeyGroups and tries to find at least one key for
-        each group.
+        """Tries to find at least one key for each verify request
+
+        For each verify_request, verify_request.deferred is called back with
+        params (server_name, key_id, VerifyKey) if a key is found, or errbacked
+        with a SynapseError if none of the keys are found.
+
+        Args:
+            verify_requests (list[VerifyKeyRequest]): list of verify requests
         """
 
         # These are functions that produce keys given a list of key ids
@@ -245,8 +258,11 @@ class Keyring(object):
         @defer.inlineCallbacks
         def do_iterations():
             with Measure(self.clock, "get_server_verify_keys"):
+                # dict[str, dict[str, VerifyKey]]: results so far.
+                # map server_name -> key_id -> VerifyKey
                 merged_results = {}
 
+                # dict[str, set(str)]: keys to fetch for each server
                 missing_keys = {}
                 for verify_request in verify_requests:
                     missing_keys.setdefault(verify_request.server_name, set()).update(
@@ -308,6 +324,16 @@ class Keyring(object):
 
     @defer.inlineCallbacks
     def get_keys_from_store(self, server_name_and_key_ids):
+        """
+
+        Args:
+            server_name_and_key_ids (list[(str, iterable[str])]):
+                list of (server_name, iterable[key_id]) tuples to fetch keys for
+
+        Returns:
+            Deferred: resolves to dict[str, dict[str, VerifyKey]]: map from
+                server_name -> key_id -> VerifyKey
+        """
         res = yield preserve_context_over_deferred(defer.gatherResults(
             [
                 preserve_fn(self.store.get_server_verify_keys)(
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 11605b34a3..6be18880b9 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -15,6 +15,32 @@
 
 
 class EventContext(object):
+    """
+    Attributes:
+        current_state_ids (dict[(str, str), str]):
+            The current state map including the current event.
+            (type, state_key) -> event_id
+
+        prev_state_ids (dict[(str, str), str]):
+            The current state map excluding the current event.
+            (type, state_key) -> event_id
+
+        state_group (int): state group id
+        rejected (bool|str): A rejection reason if the event was rejected, else
+            False
+
+        push_actions (list[(str, list[object])]): list of (user_id, actions)
+            tuples
+
+        prev_group (int): Previously persisted state group. ``None`` for an
+            outlier.
+        delta_ids (dict[(str, str), str]): Delta from ``prev_group``.
+            (type, state_key) -> event_id. ``None`` for an outlier.
+
+        prev_state_events (?): XXX: is this ever set to anything other than
+            the empty list?
+    """
+
     __slots__ = [
         "current_state_ids",
         "prev_state_ids",
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 90235ff098..c802dd67a3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -99,7 +99,12 @@ class TransactionQueue(object):
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
 
+        # destination -> stream_id of last successfully sent to-device message.
+        # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
+
+        # destination -> stream_id of last successfully sent device list
+        # update.
         self.last_device_list_stream_id_by_dest = {}
 
         # HACK to get unique tx id
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index fffba34383..e7a1bb7246 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -47,6 +48,7 @@ class AuthHandler(BaseHandler):
             LoginType.PASSWORD: self._check_password_auth,
             LoginType.RECAPTCHA: self._check_recaptcha,
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
+            LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -307,31 +309,47 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    @defer.inlineCallbacks
     def _check_email_identity(self, authdict, _):
+        return self._check_threepid('email', authdict)
+
+    def _check_msisdn(self, authdict, _):
+        return self._check_threepid('msisdn', authdict)
+
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _check_threepid(self, medium, authdict):
         yield run_on_reactor()
 
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
         threepid_creds = authdict['threepid_creds']
+
         identity_handler = self.hs.get_handlers().identity_handler
 
-        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
         threepid = yield identity_handler.threepid_from_creds(threepid_creds)
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
+        if threepid['medium'] != medium:
+            raise LoginError(
+                401,
+                "Expecting threepid of type '%s', got '%s'" % (
+                    medium, threepid['medium'],
+                ),
+                errcode=Codes.UNAUTHORIZED
+            )
+
         threepid['threepid_creds'] = authdict['threepid_creds']
 
         defer.returnValue(threepid)
 
-    @defer.inlineCallbacks
-    def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
-
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1b007d4945..c22f65ce5d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -248,8 +248,7 @@ class DeviceHandler(BaseHandler):
             user_id, device_ids, list(hosts)
         )
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = [r.room_id for r in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         yield self.notifier.on_new_event(
             "device_list_key", position, rooms=room_ids,
@@ -270,8 +269,7 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = set(r.room_id for r in rooms)
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
         changed = yield self.store.get_user_whose_devices_changed(
@@ -347,8 +345,8 @@ class DeviceHandler(BaseHandler):
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
         user_id = user.to_string()
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We no longer share rooms with this user, so we'll no longer
             # receive device updates. Mark this in DB.
             yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
@@ -404,8 +402,8 @@ class DeviceListEduUpdater(object):
             logger.warning("Got device list update edu for %r from %r", user_id, origin)
             return
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
             return
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index e40495d1ab..c02d41a74c 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -316,7 +316,7 @@ class E2eKeysHandler(object):
         # old access_token without an associated device_id. Either way, we
         # need to double-check the device is registered to avoid ending up with
         # keys without a corresponding device.
-        self.device_handler.check_device_registered(user_id, device_id)
+        yield self.device_handler.check_device_registered(user_id, device_id)
 
         result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0c2b4d6ed..888dd01240 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 """Contains handlers for federation events."""
+import synapse.util.logcontext
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -114,6 +115,14 @@ class FederationHandler(BaseHandler):
             logger.debug("Already seen pdu %s", pdu.event_id)
             return
 
+        # If we are currently in the process of joining this room, then we
+        # queue up events for later processing.
+        if pdu.room_id in self.room_queues:
+            logger.info("Ignoring PDU %s for room %s from %s for now; join "
+                        "in progress", pdu.event_id, pdu.room_id, origin)
+            self.room_queues[pdu.room_id].append((pdu, origin))
+            return
+
         state = None
 
         auth_chain = []
@@ -274,26 +283,13 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def _process_received_pdu(self, origin, pdu, state=None, auth_chain=None):
+    def _process_received_pdu(self, origin, pdu, state, auth_chain):
         """ Called when we have a new pdu. We need to do auth checks and put it
         through the StateHandler.
-
-        auth_chain and state are None if we already have the necessary state
-        and prev_events in the db
         """
         event = pdu
 
-        logger.debug("Got event: %s", event.event_id)
-
-        # If we are currently in the process of joining this room, then we
-        # queue up events for later processing.
-        if event.room_id in self.room_queues:
-            self.room_queues[event.room_id].append((pdu, origin))
-            return
-
-        logger.debug("Processing event: %s", event.event_id)
-
-        logger.debug("Event: %s", event)
+        logger.debug("Processing event: %s", event)
 
         # FIXME (erikj): Awful hack to make the case where we are not currently
         # in the room work
@@ -862,8 +858,6 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        yield self.store.clean_room_for_join(room_id)
-
         origin, event = yield self._make_and_verify_event(
             target_hosts,
             room_id,
@@ -872,7 +866,15 @@ class FederationHandler(BaseHandler):
             content,
         )
 
+        # This shouldn't happen, because the RoomMemberHandler has a
+        # linearizer lock which only allows one operation per user per room
+        # at a time - so this is just paranoia.
+        assert (room_id not in self.room_queues)
+
         self.room_queues[room_id] = []
+
+        yield self.store.clean_room_for_join(room_id)
+
         handled_events = set()
 
         try:
@@ -925,18 +927,37 @@ class FederationHandler(BaseHandler):
             room_queue = self.room_queues[room_id]
             del self.room_queues[room_id]
 
-            for p, origin in room_queue:
-                if p.event_id in handled_events:
-                    continue
+            # we don't need to wait for the queued events to be processed -
+            # it's just a best-effort thing at this point. We do want to do
+            # them roughly in order, though, otherwise we'll end up making
+            # lots of requests for missing prev_events which we do actually
+            # have. Hence we fire off the deferred, but don't wait for it.
 
-                try:
-                    self._process_received_pdu(origin, p)
-                except:
-                    logger.exception("Couldn't handle pdu")
+            synapse.util.logcontext.preserve_fn(self._handle_queued_pdus)(
+                room_queue
+            )
 
         defer.returnValue(True)
 
     @defer.inlineCallbacks
+    def _handle_queued_pdus(self, room_queue):
+        """Process PDUs which got queued up while we were busy send_joining.
+
+        Args:
+            room_queue (list[FrozenEvent, str]): list of PDUs to be processed
+                and the servers that sent them
+        """
+        for p, origin in room_queue:
+            try:
+                logger.info("Processing queued PDU %s which was received "
+                            "while we were joining %s", p.event_id, p.room_id)
+                yield self.on_receive_pdu(origin, p)
+            except Exception as e:
+                logger.warn(
+                    "Error handling queued PDU %s from %s: %s",
+                    p.event_id, origin, e)
+
+    @defer.inlineCallbacks
     @log_function
     def on_make_join_request(self, room_id, user_id):
         """ We've received a /make_join/ request, so we create a partial
@@ -1517,7 +1538,17 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def _prep_event(self, origin, event, state=None, auth_events=None):
+        """
 
+        Args:
+            origin:
+            event:
+            state:
+            auth_events:
+
+        Returns:
+            Deferred, which resolves to synapse.events.snapshot.EventContext
+        """
         context = yield self.state_handler.compute_event_context(
             event, old_state=state,
         )
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 559e5d5a71..6a53c5eb47 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler):
         params.update(kwargs)
 
         try:
-            data = yield self.http_client.post_urlencoded_get_json(
+            data = yield self.http_client.post_json_get_json(
                 "https://%s%s" % (
                     id_server,
                     "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler):
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
+
+    @defer.inlineCallbacks
+    def requestMsisdnToken(
+            self, id_server, country, phone_number,
+            client_secret, send_attempt, **kwargs
+    ):
+        yield run_on_reactor()
+
+        if not self._should_trust_id_server(id_server):
+            raise SynapseError(
+                400, "Untrusted ID server '%s'" % id_server,
+                Codes.SERVER_NOT_TRUSTED
+            )
+
+        params = {
+            'country': country,
+            'phone_number': phone_number,
+            'client_secret': client_secret,
+            'send_attempt': send_attempt,
+        }
+        params.update(kwargs)
+
+        try:
+            data = yield self.http_client.post_json_get_json(
+                "https://%s%s" % (
+                    id_server,
+                    "/_matrix/identity/api/v1/validate/msisdn/requestToken"
+                ),
+                params
+            )
+            defer.returnValue(data)
+        except CodeMessageException as e:
+            logger.info("Proxied requestToken failed: %r", e)
+            raise e
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e0ade4c164..10f5f35a69 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
     UserID, StreamToken,
@@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler):
                 "content": content,
             })
 
+        now = self.clock.time_msec()
+
         ret = {
             "rooms": rooms_ret,
-            "presence": presence,
+            "presence": [
+                {
+                    "type": "m.presence",
+                    "content": format_user_presence_state(event, now),
+                }
+                for event in presence
+            ],
             "account_data": account_data_events,
             "receipts": receipt,
             "end": now_token.to_string(),
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..059260a8aa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -556,9 +557,9 @@ class PresenceHandler(object):
         room_ids_to_states = {}
         users_to_states = {}
         for state in states:
-            events = yield self.store.get_rooms_for_user(state.user_id)
-            for e in events:
-                room_ids_to_states.setdefault(e.room_id, []).append(state)
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
 
             plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
             for u in plist:
@@ -719,9 +720,7 @@ class PresenceHandler(object):
                 for state in updates
             ])
         else:
-            defer.returnValue([
-                format_user_presence_state(state, now) for state in updates
-            ])
+            defer.returnValue(updates)
 
     @defer.inlineCallbacks
     def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +794,9 @@ class PresenceHandler(object):
             as_event=False,
         )
 
+        now = self.clock.time_msec()
+        results[:] = [format_user_presence_state(r, now) for r in results]
+
         is_accepted = {
             row["observed_user_id"]: row["accepted"] for row in presence_list
         }
@@ -847,6 +849,7 @@ class PresenceHandler(object):
             )
 
             state_dict = yield self.get_state(observed_user, as_event=False)
+            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
             self.federation.send_edu(
                 destination=observer_user.domain,
@@ -910,11 +913,12 @@ class PresenceHandler(object):
     def is_visible(self, observed_user, observer_user):
         """Returns whether a user can see another user's presence.
         """
-        observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
-        observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
-        observer_room_ids = set(r.room_id for r in observer_rooms)
-        observed_room_ids = set(r.room_id for r in observed_rooms)
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
         if observer_room_ids & observed_room_ids:
             defer.returnValue(True)
@@ -979,14 +983,18 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
+
+    The "user_id" is optional so that this function can be used to format presence
+    updates for client /sync responses and for federation /send requests.
     """
     content = {
         "presence": state.state,
-        "user_id": state.user_id,
     }
+    if include_user_id:
+        content["user_id"] = state.user_id
     if state.last_active_ts:
         content["last_active_ago"] = now - state.last_active_ts
     if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1033,6 @@ class PresenceEventSource(object):
         # sending down the rare duplicate is not a concern.
 
         with Measure(self.clock, "presence.get_new_events"):
-            user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1034,18 +1041,7 @@ class PresenceEventSource(object):
 
             max_token = self.store.get_current_presence_token()
 
-            plist = yield self.store.get_presence_list_accepted(user.localpart)
-            users_interested_in = set(row["observed_user_id"] for row in plist)
-            users_interested_in.add(user_id)  # So that we receive our own presence
-
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-            users_interested_in.update(users_who_share_room)
-
-            if explicit_room_id:
-                user_ids = yield self.store.get_users_in_room(explicit_room_id)
-                users_interested_in.update(user_ids)
+            users_interested_in = yield self._get_interested_in(user, explicit_room_id)
 
             user_ids_changed = set()
             changed = None
@@ -1073,16 +1069,13 @@ class PresenceEventSource(object):
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
-        now = self.clock.time_msec()
-
-        defer.returnValue(([
-            {
-                "type": "m.presence",
-                "content": format_user_presence_state(s, now),
-            }
-            for s in updates.values()
-            if include_offline or s.state != PresenceState.OFFLINE
-        ], max_token))
+        if include_offline:
+            defer.returnValue((updates.values(), max_token))
+        else:
+            defer.returnValue(([
+                s for s in updates.itervalues()
+                if s.state != PresenceState.OFFLINE
+            ], max_token))
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1090,6 +1083,31 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
+    @cachedInlineCallbacks(num_args=2, cache_context=True)
+    def _get_interested_in(self, user, explicit_room_id, cache_context):
+        """Returns the set of users that the given user should see presence
+        updates for
+        """
+        user_id = user.to_string()
+        plist = yield self.store.get_presence_list_accepted(
+            user.localpart, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(
+                explicit_room_id, on_invalidate=cache_context.invalidate,
+            )
+            users_interested_in.update(user_ids)
+
+        defer.returnValue(users_interested_in)
+
 
 def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
@@ -1157,7 +1175,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
         if user_id not in syncing_user_ids:
-            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+            # If the user has done something recently but hasn't synced,
+            # don't set them as offline.
+            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
                     status_msg=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 87f74dfb8e..abd1fb28cb 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -156,11 +156,11 @@ class ProfileHandler(BaseHandler):
 
         self.ratelimit(requester)
 
-        joins = yield self.store.get_rooms_for_user(
+        room_ids = yield self.store.get_rooms_for_user(
             user.to_string(),
         )
 
-        for j in joins:
+        for room_id in room_ids:
             handler = self.hs.get_handlers().room_member_handler
             try:
                 # Assume the user isn't a guest because we don't let guests set
@@ -171,12 +171,12 @@ class ProfileHandler(BaseHandler):
                 yield handler.update_membership(
                     requester,
                     user,
-                    j.room_id,
+                    room_id,
                     "join",  # We treat a profile update like a join.
                     ratelimit=False,  # Try to hide that these events aren't atomic.
                 )
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    j.room_id, str(e.message)
+                    room_id, str(e.message)
                 )
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 50aa513935..e1cd3a48e9 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -210,10 +210,9 @@ class ReceiptEventSource(object):
         else:
             from_key = None
 
-        rooms = yield self.store.get_rooms_for_user(user.to_string())
-        rooms = [room.room_id for room in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user.to_string())
         events = yield self.store.get_linearized_receipts_for_rooms(
-            rooms,
+            room_ids,
             from_key=from_key,
             to_key=to_key,
         )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5572cb883f..c0205da1a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
+from synapse.types import RoomStreamToken
 
 from twisted.internet import defer
 
@@ -225,8 +226,7 @@ class SyncHandler(object):
         with Measure(self.clock, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else "0"
 
-            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-            room_ids = [room.room_id for room in rooms]
+            room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
 
             typing_source = self.event_sources.sources["typing"]
             typing, typing_key = yield typing_source.get_new_events(
@@ -568,16 +568,15 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(r.room_id for r in rooms)
+            room_ids = yield self.store.get_rooms_for_user(user_id)
 
             user_ids_changed = set()
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
             for other_user_id in changed:
-                other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(e.room_id for e in other_rooms):
+                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
+                if room_ids.intersection(other_room_ids):
                     user_ids_changed.add(other_user_id)
 
             defer.returnValue(user_ids_changed)
@@ -721,14 +720,14 @@ class SyncHandler(object):
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
-        states = yield self.presence_handler.get_states(
-            extra_users_ids,
-            as_event=True,
-        )
-        presence.extend(states)
+        if extra_users_ids:
+            states = yield self.presence_handler.get_states(
+                extra_users_ids,
+            )
+            presence.extend(states)
 
-        # Deduplicate the presence entries so that there's at most one per user
-        presence = {p["content"]["user_id"]: p for p in presence}.values()
+            # Deduplicate the presence entries so that there's at most one per user
+            presence = {p.user_id: p for p in presence}.values()
 
         presence = sync_config.filter_collection.filter_presence(
             presence
@@ -765,6 +764,21 @@ class SyncHandler(object):
             )
             sync_result_builder.now_token = now_token
 
+        # We check up front if anything has changed, if it hasn't then there is
+        # no point in going futher.
+        since_token = sync_result_builder.since_token
+        if not sync_result_builder.full_state:
+            if since_token and not ephemeral_by_room and not account_data_by_room:
+                have_changed = yield self._have_rooms_changed(sync_result_builder)
+                if not have_changed:
+                    tags_by_room = yield self.store.get_updated_tags(
+                        user_id,
+                        since_token.account_data_key,
+                    )
+                    if not tags_by_room:
+                        logger.debug("no-oping sync")
+                        defer.returnValue(([], []))
+
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
         )
@@ -774,13 +788,12 @@ class SyncHandler(object):
         else:
             ignored_users = frozenset()
 
-        if sync_result_builder.since_token:
+        if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
-                user_id,
-                sync_result_builder.since_token.account_data_key,
+                user_id, since_token.account_data_key,
             )
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -805,7 +818,7 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
-        if sync_result_builder.since_token:
+        if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
                     joined_sync.timeline.events, joined_sync.state.values()
@@ -818,6 +831,38 @@ class SyncHandler(object):
         defer.returnValue((newly_joined_rooms, newly_joined_users))
 
     @defer.inlineCallbacks
+    def _have_rooms_changed(self, sync_result_builder):
+        """Returns whether there may be any new events that should be sent down
+        the sync. Returns True if there are.
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+
+        assert since_token
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        if rooms_changed:
+            defer.returnValue(True)
+
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
+
+        stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+        for room_id in joined_room_ids:
+            if self.store.has_room_changed_since(room_id, stream_id):
+                defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def _get_rooms_changed(self, sync_result_builder, ignored_users):
         """Gets the the changes that have happened since the last sync.
 
@@ -841,8 +886,7 @@ class SyncHandler(object):
             rooms = yield self.store.get_app_service_rooms(app_service)
             joined_room_ids = set(r.room_id for r in rooms)
         else:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            joined_room_ids = set(r.room_id for r in rooms)
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 8c22d6f00f..9a4c36ad5d 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -192,6 +192,16 @@ def parse_json_object_from_request(request):
     return content
 
 
+def assert_params_in_request(body, required):
+    absent = []
+    for k in required:
+        if k not in body:
+            absent.append(k)
+
+    if len(absent) > 0:
+        raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+
 class RestServlet(object):
 
     """ A Synapse REST Servlet.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6abb33bb3f..7eeba6d28e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
+from synapse.handlers.presence import format_user_presence_state
 
 from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
@@ -37,6 +38,10 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 
 notified_events_counter = metrics.register_counter("notified_events")
 
+users_woken_by_stream_counter = metrics.register_counter(
+    "users_woken_by_stream", labels=["stream"]
+)
+
 
 # TODO(paul): Should be shared somewhere
 def count(func, l):
@@ -100,6 +105,8 @@ class _NotifierUserStream(object):
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
+        users_woken_by_stream_counter.inc(stream_key)
+
         with PreserveLoggingContext():
             self.notify_deferred = ObservableDeferred(defer.Deferred())
             noify_deferred.callback(self.current_token)
@@ -297,8 +304,7 @@ class Notifier(object):
         if user_stream is None:
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
-                rooms = yield self.store.get_rooms_for_user(user_id)
-                room_ids = [room.room_id for room in rooms]
+                room_ids = yield self.store.get_rooms_for_user(user_id)
             user_stream = _NotifierUserStream(
                 user_id=user_id,
                 rooms=room_ids,
@@ -406,6 +412,15 @@ class Notifier(object):
                         new_events,
                         is_peeking=is_peeking,
                     )
+                elif name == "presence":
+                    now = self.clock.time_msec()
+                    new_events[:] = [
+                        {
+                            "type": "m.presence",
+                            "content": format_user_presence_state(event, now),
+                        }
+                        for event in new_events
+                    ]
 
                 events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
@@ -438,8 +453,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def _get_room_ids(self, user, explicit_room_id):
-        joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
-        joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+        joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
         if explicit_room_id:
             if explicit_room_id in joined_room_ids:
                 defer.returnValue(([explicit_room_id], True))
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index a27476bbad..287df94b4f 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -33,13 +33,13 @@ def get_badge_count(store, user_id):
 
     badge = len(invites)
 
-    for r in joins:
-        if r.room_id in my_receipts_by_room:
-            last_unread_event_id = my_receipts_by_room[r.room_id]
+    for room_id in joins:
+        if room_id in my_receipts_by_room:
+            last_unread_event_id = my_receipts_by_room[room_id]
 
             notifs = yield (
                 store.get_unread_event_push_actions_by_room_for_user(
-                    r.room_id, user_id, last_unread_event_id
+                    room_id, user_id, last_unread_event_id
                 )
             )
             # return one badge count per conversation, as count per
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7817b0cd91..c4777b2a2b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -37,6 +38,7 @@ REQUIREMENTS = {
     "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
+    "phonenumbers>=8.2.0": ["phonenumbers"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index d8eb14592b..03930fe958 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -283,12 +283,12 @@ class ReplicationResource(Resource):
 
             if request_events != upto_events_token:
                 writer.write_header_and_rows("events", res.new_forward_events, (
-                    "position", "internal", "json", "state_group"
+                    "position", "event_id", "room_id", "type", "state_key",
                 ), position=upto_events_token)
 
             if request_backfill != upto_backfill_token:
                 writer.write_header_and_rows("backfill", res.new_backfill_events, (
-                    "position", "internal", "json", "state_group",
+                    "position", "event_id", "room_id", "type", "state_key", "redacts",
                 ), position=upto_backfill_token)
 
             writer.write_header_and_rows(
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 24b5c79d4a..9d1d173b2f 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -27,4 +27,9 @@ class SlavedIdTracker(object):
         self._current = (max if self.step > 0 else min)(self._current, new_id)
 
     def get_current_token(self):
+        """
+
+        Returns:
+            int
+        """
         return self._current
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 518c9ea2e9..a1e1e54e5b 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -16,7 +16,6 @@ from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
 
 from synapse.api.constants import EventTypes
-from synapse.events import FrozenEvent
 from synapse.storage import DataStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.event_federation import EventFederationStore
@@ -25,7 +24,6 @@ from synapse.storage.state import StateStore
 from synapse.storage.stream import StreamStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
-import ujson as json
 import logging
 
 
@@ -242,46 +240,32 @@ class SlavedEventStore(BaseSlavedStore):
         return super(SlavedEventStore, self).process_replication(result)
 
     def _process_replication_row(self, row, backfilled):
-        internal = json.loads(row[1])
-        event_json = json.loads(row[2])
-        event = FrozenEvent(event_json, internal_metadata_dict=internal)
+        stream_ordering = row[0] if not backfilled else -row[0]
         self.invalidate_caches_for_event(
-            event, backfilled,
+            stream_ordering, row[1], row[2], row[3], row[4], row[5],
+            backfilled=backfilled,
         )
 
-    def invalidate_caches_for_event(self, event, backfilled):
-        self._invalidate_get_event_cache(event.event_id)
+    def invalidate_caches_for_event(self, stream_ordering, event_id, room_id,
+                                    etype, state_key, redacts, backfilled):
+        self._invalidate_get_event_cache(event_id)
 
-        self.get_latest_event_ids_in_room.invalidate((event.room_id,))
+        self.get_latest_event_ids_in_room.invalidate((room_id,))
 
         self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-            (event.room_id,)
+            (room_id,)
         )
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(
-                event.room_id, event.internal_metadata.stream_ordering
+                room_id, stream_ordering
             )
 
-        # self.get_unread_event_push_actions_by_room_for_user.invalidate_many(
-        #     (event.room_id,)
-        # )
+        if redacts:
+            self._invalidate_get_event_cache(redacts)
 
-        if event.type == EventTypes.Redaction:
-            self._invalidate_get_event_cache(event.redacts)
-
-        if event.type == EventTypes.Member:
+        if etype == EventTypes.Member:
             self._membership_stream_cache.entity_has_changed(
-                event.state_key, event.internal_metadata.stream_ordering
+                state_key, stream_ordering
             )
-            self.get_invited_rooms_for_user.invalidate((event.state_key,))
-
-        if not event.is_state():
-            return
-
-        if backfilled:
-            return
-
-        if (not event.internal_metadata.is_invite_from_remote()
-                and event.internal_metadata.is_outlier()):
-            return
+            self.get_invited_rooms_for_user.invalidate((state_key,))
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 72057f1b0c..a43410fb37 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, LoginError, Codes
 from synapse.types import UserID
 from synapse.http.server import finish_request
 from synapse.http.servlet import parse_json_object_from_request
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -33,10 +34,55 @@ from saml2.client import Saml2Client
 
 import xml.etree.ElementTree as ET
 
+from twisted.web.client import PartialDownloadError
+
 
 logger = logging.getLogger(__name__)
 
 
+def login_submission_legacy_convert(submission):
+    """
+    If the input login submission is an old style object
+    (ie. with top-level user / medium / address) convert it
+    to a typed object.
+    """
+    if "user" in submission:
+        submission["identifier"] = {
+            "type": "m.id.user",
+            "user": submission["user"],
+        }
+        del submission["user"]
+
+    if "medium" in submission and "address" in submission:
+        submission["identifier"] = {
+            "type": "m.id.thirdparty",
+            "medium": submission["medium"],
+            "address": submission["address"],
+        }
+        del submission["medium"]
+        del submission["address"]
+
+
+def login_id_thirdparty_from_phone(identifier):
+    """
+    Convert a phone login identifier type to a generic threepid identifier
+    Args:
+        identifier(dict): Login identifier dict of type 'm.id.phone'
+
+    Returns: Login identifier dict of type 'm.id.threepid'
+    """
+    if "country" not in identifier or "number" not in identifier:
+        raise SynapseError(400, "Invalid phone-type identifier")
+
+    msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+
+    return {
+        "type": "m.id.thirdparty",
+        "medium": "msisdn",
+        "address": msisdn,
+    }
+
+
 class LoginRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/login$")
     PASS_TYPE = "m.login.password"
@@ -117,20 +163,52 @@ class LoginRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def do_password_login(self, login_submission):
-        if 'medium' in login_submission and 'address' in login_submission:
-            address = login_submission['address']
-            if login_submission['medium'] == 'email':
+        if "password" not in login_submission:
+            raise SynapseError(400, "Missing parameter: password")
+
+        login_submission_legacy_convert(login_submission)
+
+        if "identifier" not in login_submission:
+            raise SynapseError(400, "Missing param: identifier")
+
+        identifier = login_submission["identifier"]
+        if "type" not in identifier:
+            raise SynapseError(400, "Login identifier has no type")
+
+        # convert phone type identifiers to generic threepids
+        if identifier["type"] == "m.id.phone":
+            identifier = login_id_thirdparty_from_phone(identifier)
+
+        # convert threepid identifiers to user IDs
+        if identifier["type"] == "m.id.thirdparty":
+            if 'medium' not in identifier or 'address' not in identifier:
+                raise SynapseError(400, "Invalid thirdparty identifier")
+
+            address = identifier['address']
+            if identifier['medium'] == 'email':
                 # For emails, transform the address to lowercase.
                 # We store all email addreses as lowercase in the DB.
                 # (See add_threepid in synapse/handlers/auth.py)
                 address = address.lower()
             user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
-                login_submission['medium'], address
+                identifier['medium'], address
             )
             if not user_id:
                 raise LoginError(403, "", errcode=Codes.FORBIDDEN)
-        else:
-            user_id = login_submission['user']
+
+            identifier = {
+                "type": "m.id.user",
+                "user": user_id,
+            }
+
+        # by this point, the identifier should be an m.id.user: if it's anything
+        # else, we haven't understood it.
+        if identifier["type"] != "m.id.user":
+            raise SynapseError(400, "Unknown login identifier type")
+        if "user" not in identifier:
+            raise SynapseError(400, "User identifier is missing 'user' key")
+
+        user_id = identifier["user"]
 
         if not user_id.startswith('@'):
             user_id = UserID.create(
@@ -341,7 +419,12 @@ class CasTicketServlet(ClientV1RestServlet):
             "ticket": request.args["ticket"],
             "service": self.cas_service_url
         }
-        body = yield http_client.get_raw(uri, args)
+        try:
+            body = yield http_client.get_raw(uri, args)
+        except PartialDownloadError as pde:
+            # Twisted raises this error if the connection is closed,
+            # even if that's being used old-http style to signal end-of-data
+            body = pde.response
         result = yield self.handle_cas_response(request, body, client_redirect_url)
         defer.returnValue(result)
 
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index eafdce865e..47b2dc45e7 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
 from synapse.types import UserID
+from synapse.handlers.presence import format_user_presence_state
 from synapse.http.servlet import parse_json_object_from_request
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -33,6 +34,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(PresenceStatusRestServlet, self).__init__(hs)
         self.presence_handler = hs.get_presence_handler()
+        self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
     def on_GET(self, request, user_id):
@@ -48,6 +50,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
                 raise AuthError(403, "You are not allowed to see their presence.")
 
         state = yield self.presence_handler.get_state(target_user=user)
+        state = format_user_presence_state(state, self.clock.time_msec())
 
         defer.returnValue((200, state))
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 90242a6bac..0bdd6b5b36 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -748,8 +748,7 @@ class JoinedRoomsRestServlet(ClientV1RestServlet):
     def on_GET(self, request):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
 
-        rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
-        room_ids = set(r.room_id for r in rooms)  # Ensure they're unique.
+        room_ids = yield self.store.get_rooms_for_user(requester.user.to_string())
         defer.returnValue((200, {"joined_rooms": list(room_ids)}))
 
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 398e7f5eb0..aac76edf1c 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,8 +18,11 @@ from twisted.internet import defer
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import LoginError, SynapseError, Codes
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request, assert_params_in_request
+)
 from synapse.util.async import run_on_reactor
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from ._base import client_v2_patterns
 
@@ -28,11 +32,11 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class PasswordRequestTokenRestServlet(RestServlet):
+class EmailPasswordRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/password/email/requestToken$")
 
     def __init__(self, hs):
-        super(PasswordRequestTokenRestServlet, self).__init__()
+        super(EmailPasswordRequestTokenRestServlet, self).__init__()
         self.hs = hs
         self.identity_handler = hs.get_handlers().identity_handler
 
@@ -40,14 +44,9 @@ class PasswordRequestTokenRestServlet(RestServlet):
     def on_POST(self, request):
         body = parse_json_object_from_request(request)
 
-        required = ['id_server', 'client_secret', 'email', 'send_attempt']
-        absent = []
-        for k in required:
-            if k not in body:
-                absent.append(k)
-
-        if absent:
-            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+        assert_params_in_request(body, [
+            'id_server', 'client_secret', 'email', 'send_attempt'
+        ])
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
@@ -60,6 +59,37 @@ class PasswordRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnPasswordRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/account/password/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        super(MsisdnPasswordRequestTokenRestServlet, self).__init__()
+        self.hs = hs
+        self.datastore = self.hs.get_datastore()
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_request(body, [
+            'id_server', 'client_secret',
+            'country', 'phone_number', 'send_attempt',
+        ])
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.datastore.get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is None:
+            raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND)
+
+        ret = yield self.identity_handler.requestMsisdnToken(**body)
+        defer.returnValue((200, ret))
+
+
 class PasswordRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/password$")
 
@@ -68,6 +98,7 @@ class PasswordRestServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -77,7 +108,8 @@ class PasswordRestServlet(RestServlet):
 
         authed, result, params, _ = yield self.auth_handler.check_auth([
             [LoginType.PASSWORD],
-            [LoginType.EMAIL_IDENTITY]
+            [LoginType.EMAIL_IDENTITY],
+            [LoginType.MSISDN],
         ], body, self.hs.get_ip_from_request(request))
 
         if not authed:
@@ -102,7 +134,7 @@ class PasswordRestServlet(RestServlet):
                 # (See add_threepid in synapse/handlers/auth.py)
                 threepid['address'] = threepid['address'].lower()
             # if using email, we must know about the email they're authing with!
-            threepid_user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+            threepid_user_id = yield self.datastore.get_user_id_by_threepid(
                 threepid['medium'], threepid['address']
             )
             if not threepid_user_id:
@@ -169,13 +201,14 @@ class DeactivateAccountRestServlet(RestServlet):
         defer.returnValue((200, {}))
 
 
-class ThreepidRequestTokenRestServlet(RestServlet):
+class EmailThreepidRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/3pid/email/requestToken$")
 
     def __init__(self, hs):
         self.hs = hs
-        super(ThreepidRequestTokenRestServlet, self).__init__()
+        super(EmailThreepidRequestTokenRestServlet, self).__init__()
         self.identity_handler = hs.get_handlers().identity_handler
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -190,7 +223,7 @@ class ThreepidRequestTokenRestServlet(RestServlet):
         if absent:
             raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
 
-        existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+        existingUid = yield self.datastore.get_user_id_by_threepid(
             'email', body['email']
         )
 
@@ -201,6 +234,44 @@ class ThreepidRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnThreepidRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/account/3pid/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        self.hs = hs
+        super(MsisdnThreepidRequestTokenRestServlet, self).__init__()
+        self.identity_handler = hs.get_handlers().identity_handler
+        self.datastore = self.hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        required = [
+            'id_server', 'client_secret',
+            'country', 'phone_number', 'send_attempt',
+        ]
+        absent = []
+        for k in required:
+            if k not in body:
+                absent.append(k)
+
+        if absent:
+            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.datastore.get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is not None:
+            raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE)
+
+        ret = yield self.identity_handler.requestEmailToken(**body)
+        defer.returnValue((200, ret))
+
+
 class ThreepidRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/3pid$")
 
@@ -210,6 +281,7 @@ class ThreepidRestServlet(RestServlet):
         self.identity_handler = hs.get_handlers().identity_handler
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -217,7 +289,7 @@ class ThreepidRestServlet(RestServlet):
 
         requester = yield self.auth.get_user_by_req(request)
 
-        threepids = yield self.hs.get_datastore().user_get_threepids(
+        threepids = yield self.datastore.user_get_threepids(
             requester.user.to_string()
         )
 
@@ -258,7 +330,7 @@ class ThreepidRestServlet(RestServlet):
 
         if 'bind' in body and body['bind']:
             logger.debug(
-                "Binding emails %s to %s",
+                "Binding threepid %s to %s",
                 threepid, user_id
             )
             yield self.identity_handler.bind_threepid(
@@ -302,9 +374,11 @@ class ThreepidDeleteRestServlet(RestServlet):
 
 
 def register_servlets(hs, http_server):
-    PasswordRequestTokenRestServlet(hs).register(http_server)
+    EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+    MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
     PasswordRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
-    ThreepidRequestTokenRestServlet(hs).register(http_server)
+    EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+    MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
     ThreepidDeleteRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index ccca5a12d5..dcd13b876f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +20,10 @@ import synapse
 from synapse.api.auth import get_access_token_from_request, has_access_token
 from synapse.api.constants import LoginType
 from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request, assert_params_in_request
+)
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from ._base import client_v2_patterns
 
@@ -43,7 +47,7 @@ else:
 logger = logging.getLogger(__name__)
 
 
-class RegisterRequestTokenRestServlet(RestServlet):
+class EmailRegisterRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/register/email/requestToken$")
 
     def __init__(self, hs):
@@ -51,7 +55,7 @@ class RegisterRequestTokenRestServlet(RestServlet):
         Args:
             hs (synapse.server.HomeServer): server
         """
-        super(RegisterRequestTokenRestServlet, self).__init__()
+        super(EmailRegisterRequestTokenRestServlet, self).__init__()
         self.hs = hs
         self.identity_handler = hs.get_handlers().identity_handler
 
@@ -59,14 +63,9 @@ class RegisterRequestTokenRestServlet(RestServlet):
     def on_POST(self, request):
         body = parse_json_object_from_request(request)
 
-        required = ['id_server', 'client_secret', 'email', 'send_attempt']
-        absent = []
-        for k in required:
-            if k not in body:
-                absent.append(k)
-
-        if len(absent) > 0:
-            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+        assert_params_in_request(body, [
+            'id_server', 'client_secret', 'email', 'send_attempt'
+        ])
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
@@ -79,6 +78,43 @@ class RegisterRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnRegisterRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/register/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(MsisdnRegisterRequestTokenRestServlet, self).__init__()
+        self.hs = hs
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_request(body, [
+            'id_server', 'client_secret',
+            'country', 'phone_number',
+            'send_attempt',
+        ])
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is not None:
+            raise SynapseError(
+                400, "Phone number is already in use", Codes.THREEPID_IN_USE
+            )
+
+        ret = yield self.identity_handler.requestMsisdnToken(**body)
+        defer.returnValue((200, ret))
+
+
 class RegisterRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/register$")
 
@@ -200,16 +236,37 @@ class RegisterRestServlet(RestServlet):
                 assigned_user_id=registered_user_id,
             )
 
+        # Only give msisdn flows if the x_show_msisdn flag is given:
+        # this is a hack to work around the fact that clients were shipped
+        # that use fallback registration if they see any flows that they don't
+        # recognise, which means we break registration for these clients if we
+        # advertise msisdn flows. Once usage of Riot iOS <=0.3.9 and Riot
+        # Android <=0.6.9 have fallen below an acceptable threshold, this
+        # parameter should go away and we should always advertise msisdn flows.
+        show_msisdn = False
+        if 'x_show_msisdn' in body and body['x_show_msisdn']:
+            show_msisdn = True
+
         if self.hs.config.enable_registration_captcha:
             flows = [
                 [LoginType.RECAPTCHA],
-                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]
+                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
             ]
+            if show_msisdn:
+                flows.extend([
+                    [LoginType.MSISDN, LoginType.RECAPTCHA],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
+                ])
         else:
             flows = [
                 [LoginType.DUMMY],
-                [LoginType.EMAIL_IDENTITY]
+                [LoginType.EMAIL_IDENTITY],
             ]
+            if show_msisdn:
+                flows.extend([
+                    [LoginType.MSISDN],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+                ])
 
         authed, auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
@@ -224,8 +281,9 @@ class RegisterRestServlet(RestServlet):
                 "Already registered user ID %r for this session",
                 registered_user_id
             )
-            # don't re-register the email address
+            # don't re-register the threepids
             add_email = False
+            add_msisdn = False
         else:
             # NB: This may be from the auth handler and NOT from the POST
             if 'password' not in params:
@@ -250,6 +308,7 @@ class RegisterRestServlet(RestServlet):
             )
 
             add_email = True
+            add_msisdn = True
 
         return_dict = yield self._create_registration_details(
             registered_user_id, params
@@ -262,6 +321,13 @@ class RegisterRestServlet(RestServlet):
                 params.get("bind_email")
             )
 
+        if add_msisdn and auth_result and LoginType.MSISDN in auth_result:
+            threepid = auth_result[LoginType.MSISDN]
+            yield self._register_msisdn_threepid(
+                registered_user_id, threepid, return_dict["access_token"],
+                params.get("bind_msisdn")
+            )
+
         defer.returnValue((200, return_dict))
 
     def on_OPTIONS(self, _):
@@ -323,8 +389,9 @@ class RegisterRestServlet(RestServlet):
         """
         reqd = ('medium', 'address', 'validated_at')
         if any(x not in threepid for x in reqd):
+            # This will only happen if the ID server returns a malformed response
             logger.info("Can't add incomplete 3pid")
-            defer.returnValue()
+            return
 
         yield self.auth_handler.add_threepid(
             user_id,
@@ -372,6 +439,43 @@ class RegisterRestServlet(RestServlet):
             logger.info("bind_email not specified: not binding email")
 
     @defer.inlineCallbacks
+    def _register_msisdn_threepid(self, user_id, threepid, token, bind_msisdn):
+        """Add a phone number as a 3pid identifier
+
+        Also optionally binds msisdn to the given user_id on the identity server
+
+        Args:
+            user_id (str): id of user
+            threepid (object): m.login.msisdn auth response
+            token (str): access_token for the user
+            bind_email (bool): true if the client requested the email to be
+                bound at the identity server
+        Returns:
+            defer.Deferred:
+        """
+        reqd = ('medium', 'address', 'validated_at')
+        if any(x not in threepid for x in reqd):
+            # This will only happen if the ID server returns a malformed response
+            logger.info("Can't add incomplete 3pid")
+            defer.returnValue()
+
+        yield self.auth_handler.add_threepid(
+            user_id,
+            threepid['medium'],
+            threepid['address'],
+            threepid['validated_at'],
+        )
+
+        if bind_msisdn:
+            logger.info("bind_msisdn specified: binding")
+            logger.debug("Binding msisdn %s to %s", threepid, user_id)
+            yield self.identity_handler.bind_threepid(
+                threepid['threepid_creds'], user_id
+            )
+        else:
+            logger.info("bind_msisdn not specified: not binding msisdn")
+
+    @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
         """Complete registration of newly-registered user
 
@@ -449,5 +553,6 @@ class RegisterRestServlet(RestServlet):
 
 
 def register_servlets(hs, http_server):
-    RegisterRequestTokenRestServlet(hs).register(http_server)
+    EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+    MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
     RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index b3d8001638..a7a9e0a794 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.http.servlet import (
     RestServlet, parse_string, parse_integer, parse_boolean
 )
+from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.sync import SyncConfig
 from synapse.types import StreamToken
 from synapse.events.utils import (
@@ -28,7 +29,6 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from ._base import client_v2_patterns
 
-import copy
 import itertools
 import logging
 
@@ -194,12 +194,18 @@ class SyncRestServlet(RestServlet):
         defer.returnValue((200, response_content))
 
     def encode_presence(self, events, time_now):
-        formatted = []
-        for event in events:
-            event = copy.deepcopy(event)
-            event['sender'] = event['content'].pop('user_id')
-            formatted.append(event)
-        return {"events": formatted}
+        return {
+            "events": [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in events
+            ]
+        }
 
     def encode_joined(self, rooms, time_now, token_id, event_fields):
         """
diff --git a/synapse/state.py b/synapse/state.py
index 383d32b163..9a523a1b89 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -177,17 +177,12 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     def compute_event_context(self, event, old_state=None):
-        """ Fills out the context with the `current state` of the graph. The
-        `current state` here is defined to be the state of the event graph
-        just before the event - i.e. it never includes `event`
-
-        If `event` has `auth_events` then this will also fill out the
-        `auth_events` field on `context` from the `current_state`.
+        """Build an EventContext structure for the event.
 
         Args:
-            event (EventBase)
+            event (synapse.events.EventBase):
         Returns:
-            an EventContext
+            synapse.events.snapshot.EventContext:
         """
         context = EventContext()
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 94b2bcc54a..813ad59e56 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import synapse.util.async
 
 from ._base import SQLBaseStore
 from . import engines
@@ -84,24 +85,14 @@ class BackgroundUpdateStore(SQLBaseStore):
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
-        self._background_update_timer = None
 
     @defer.inlineCallbacks
     def start_doing_background_updates(self):
-        assert self._background_update_timer is None, \
-            "background updates already running"
-
         logger.info("Starting background schema updates")
 
         while True:
-            sleep = defer.Deferred()
-            self._background_update_timer = self._clock.call_later(
-                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
-            )
-            try:
-                yield sleep
-            finally:
-                self._background_update_timer = None
+            yield synapse.util.async.sleep(
+                self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
 
             try:
                 result = yield self.do_next_background_update(
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 5c7db5e5f6..7925cb5f1b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
         """
         Args:
             destination(str): The name of the remote server.
-            last_stream_id(int): The last position of the device message stream
+            last_stream_id(int|long): The last position of the device message stream
                 that the server sent up to.
-            current_stream_id(int): The current position of the device
+            current_stream_id(int|long): The current position of the device
                 message stream.
         Returns:
-            Deferred ([dict], int): List of messages for the device and where
+            Deferred ([dict], int|long): List of messages for the device and where
                 in the stream the messages got to.
         """
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 563071b7a9..e545b62e39 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -308,7 +308,7 @@ class DeviceStore(SQLBaseStore):
         """Get stream of updates to send to remote servers
 
         Returns:
-            (now_stream_id, [ { updates }, .. ])
+            (int, list[dict]): current stream id and list of updates
         """
         now_stream_id = self._device_list_id_gen.get_current_token()
 
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 256e50dc20..0d97de2fe7 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -201,19 +201,19 @@ class EventFederationStore(SQLBaseStore):
     def _update_min_depth_for_room_txn(self, txn, room_id, depth):
         min_depth = self._get_min_depth_interaction(txn, room_id)
 
-        do_insert = depth < min_depth if min_depth else True
+        if min_depth and depth >= min_depth:
+            return
 
-        if do_insert:
-            self._simple_upsert_txn(
-                txn,
-                table="room_depth",
-                keyvalues={
-                    "room_id": room_id,
-                },
-                values={
-                    "min_depth": depth,
-                },
-            )
+        self._simple_upsert_txn(
+            txn,
+            table="room_depth",
+            keyvalues={
+                "room_id": room_id,
+            },
+            values={
+                "min_depth": depth,
+            },
+        )
 
     def _handle_mult_prev_events(self, txn, events):
         """
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 72319c35ae..3c8393bfe8 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
 from functools import wraps
 
-import synapse
 import synapse.metrics
 
-
 import logging
 import math
 import ujson as json
 
+# these are only included to make the type annotations work
+from synapse.events import EventBase    # noqa: F401
+from synapse.events.snapshot import EventContext   # noqa: F401
+
 logger = logging.getLogger(__name__)
 
 
@@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):
 
     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
+
+        Args:
+            room_id (str):
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
@@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
+        """
+
+        Args:
+            event (EventBase):
+            context (EventContext):
+            backfilled (bool):
+
+        Returns:
+            Deferred: resolves to (int, int): the stream ordering of ``event``,
+            and the stream ordering of the latest persisted event
+        """
         deferred = self._event_persist_queue.add_to_queue(
             event.room_id, [(event, context)],
             backfilled=backfilled,
@@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _persist_events(self, events_and_contexts, backfilled=False,
                         delete_existing=False):
+        """Persist events to db
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
+            delete_existing (bool):
+
+        Returns:
+            Deferred: resolves when the events have been persisted
+        """
         if not events_and_contexts:
             return
 
@@ -554,11 +582,91 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.
 
-        If delete_existing is True then existing events will be purged from the
-        database before insertion. This is useful when retrying due to IntegrityError.
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]):
+                events to persist
+            backfilled (bool): True if the events were backfilled
+            delete_existing (bool): True to purge existing table rows for the
+                events from the database. This is useful when retrying due to
+                IntegrityError.
+            current_state_for_room (dict[str, (list[str], list[str])]):
+                The current-state delta for each room. For each room, a tuple
+                (to_delete, to_insert), being a list of event ids to be removed
+                from the current state, and a list of event ids to be added to
+                the current state.
+            new_forward_extremeties (dict[str, list[str]]):
+                The new forward extremities for each room. For each room, a
+                list of the event ids which are the forward extremities.
+
         """
+        self._update_current_state_txn(txn, current_state_for_room)
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
-        for room_id, current_state_tuple in current_state_for_room.iteritems():
+        self._update_forward_extremities_txn(
+            txn,
+            new_forward_extremities=new_forward_extremeties,
+            max_stream_order=max_stream_order,
+        )
+
+        # Ensure that we don't have the same event twice.
+        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
+            events_and_contexts,
+        )
+
+        self._update_room_depths_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+        # _update_outliers_txn filters out any events which have already been
+        # persisted, and returns the filtered list.
+        events_and_contexts = self._update_outliers_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only events that we haven't
+        # seen before.
+
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+            self._delete_existing_rows_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+            )
+
+        self._store_event_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # _store_rejected_events_txn filters out any events which were
+        # rejected, and returns the filtered list.
+        events_and_contexts = self._store_rejected_events_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only ones that weren't
+        # rejected.
+
+        self._update_metadata_tables_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+    def _update_current_state_txn(self, txn, state_delta_by_room):
+        for room_id, current_state_tuple in state_delta_by_room.iteritems():
                 to_delete, to_insert = current_state_tuple
                 txn.executemany(
                     "DELETE FROM current_state_events WHERE event_id = ?",
@@ -608,7 +716,9 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_current_state_ids, (room_id,)
                 )
 
-        for room_id, new_extrem in new_forward_extremeties.items():
+    def _update_forward_extremities_txn(self, txn, new_forward_extremities,
+                                        max_stream_order):
+        for room_id, new_extrem in new_forward_extremities.items():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -626,7 +736,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for ev_id in new_extrem
             ],
         )
@@ -643,13 +753,22 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.items()
                 for event_id in new_extrem
             ]
         )
 
-        # Ensure that we don't have the same event twice.
-        # Pick the earliest non-outlier if there is one, else the earliest one.
+    @classmethod
+    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+        """Ensure that we don't have the same event twice.
+
+        Pick the earliest non-outlier if there is one, else the earliest one.
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+        Returns:
+            list[(EventBase, EventContext)]: filtered list
+        """
         new_events_and_contexts = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
@@ -662,9 +781,17 @@ class EventsStore(SQLBaseStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
+        return new_events_and_contexts.values()
 
-        events_and_contexts = new_events_and_contexts.values()
+    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+        """Update min_depth for each room
 
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
@@ -683,6 +810,21 @@ class EventsStore(SQLBaseStore):
         for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
+    def _update_outliers_txn(self, txn, events_and_contexts):
+        """Update any outliers with new event info.
+
+        This turns outliers into ex-outliers (unless the new event was
+        rejected).
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without events which
+            are already in the events table.
+        """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
                 ",".join(["?"] * len(events_and_contexts)),
@@ -697,19 +839,16 @@ class EventsStore(SQLBaseStore):
 
         to_remove = set()
         for event, context in events_and_contexts:
-            if context.rejected:
-                # If the event is rejected then we don't care if the event
-                # was an outlier or not.
-                if event.event_id in have_persisted:
-                    # If we have already seen the event then ignore it.
-                    to_remove.add(event)
-                continue
-
             if event.event_id not in have_persisted:
                 continue
 
             to_remove.add(event)
 
+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                continue
+
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
                 # We received a copy of an event that we had already stored as
@@ -764,37 +903,19 @@ class EventsStore(SQLBaseStore):
                 # event isn't an outlier any more.
                 self._update_backward_extremeties(txn, [event])
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    @classmethod
+    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only events that we haven't
-        # seen before.
-
-        def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
-
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
-
-            logger.info("Deleting existing")
+        logger.info("Deleting existing")
 
-            for table in (
+        for table in (
                 "events",
                 "event_auth",
                 "event_json",
@@ -817,11 +938,34 @@ class EventsStore(SQLBaseStore):
                 "redactions",
                 "room_memberships",
                 "topics"
-            ):
-                txn.executemany(
-                    "DELETE FROM %s WHERE event_id = ?" % (table,),
-                    [(ev.event_id,) for ev, _ in events_and_contexts]
-                )
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
+    def _store_event_txn(self, txn, events_and_contexts):
+        """Insert new events into the event and event_json tables
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+        """
+
+        if not events_and_contexts:
+            # nothing to do here
+            return
+
+        def event_dict(event):
+            return {
+                k: v
+                for k, v in event.get_dict().items()
+                if k not in [
+                    "redacted",
+                    "redacted_because",
+                ]
+            }
 
         self._simple_insert_many_txn(
             txn,
@@ -865,6 +1009,19 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+    def _store_rejected_events_txn(self, txn, events_and_contexts):
+        """Add rows to the 'rejections' table for received events which were
+        rejected
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)] new list, without the rejected
+                events.
+        """
         # Remove the rejected events from the list now that we've added them
         # to the events table and the events_json table.
         to_remove = set()
@@ -876,17 +1033,24 @@ class EventsStore(SQLBaseStore):
                 )
                 to_remove.add(event)
 
-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
+    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+        """Update all the miscellaneous tables for new events
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
+
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return
 
-        # From this point onwards the events are only ones that weren't rejected.
-
         for event, context in events_and_contexts:
             # Insert all the push actions into the event_push_actions table.
             if context.push_actions:
@@ -915,10 +1079,6 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
@@ -1005,13 +1165,6 @@ class EventsStore(SQLBaseStore):
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)
 
-        if backfilled:
-            # Backfilled events come before the current state so we don't need
-            # to update the current state table
-            return
-
-        return
-
     def _add_to_cache(self, txn, events_and_contexts):
         to_prefill = []
 
@@ -1620,14 +1773,13 @@ class EventsStore(SQLBaseStore):
 
         def get_all_new_events_txn(txn):
             sql = (
-                "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
-                " ORDER BY e.stream_ordering ASC"
+                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
             if have_forward_events:
@@ -1653,15 +1805,13 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = []
 
             sql = (
-                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
-                " eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
-                " ORDER BY e.stream_ordering DESC"
+                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                " ORDER BY stream_ordering DESC"
                 " LIMIT ?"
             )
             if have_backfill_events:
diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py
index 86b37b9ddd..3b5e0a4fb9 100644
--- a/synapse/storage/keys.py
+++ b/synapse/storage/keys.py
@@ -101,9 +101,10 @@ class KeyStore(SQLBaseStore):
         key_ids
         Args:
             server_name (str): The name of the server.
-            key_ids (list of str): List of key_ids to try and look up.
+            key_ids (iterable[str]): key_ids to try and look up.
         Returns:
-            (list of VerifyKey): The verification keys.
+            Deferred: resolves to dict[str, VerifyKey]: map from
+               key_id to verification key.
         """
         keys = {}
         for key_id in key_ids:
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 545d3d3a99..e38d8927bf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -274,24 +274,27 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=500000, iterable=True)
+    @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
-        return self.get_rooms_for_user_where_membership_is(
+        """Returns a set of room_ids the user is currently joined to
+        """
+        rooms = yield self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
+        defer.returnValue(frozenset(r.room_id for r in rooms))
 
     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
     def get_users_who_share_room_with_user(self, user_id, cache_context):
         """Returns the set of users who share a room with `user_id`
         """
-        rooms = yield self.get_rooms_for_user(
+        room_ids = yield self.get_rooms_for_user(
             user_id, on_invalidate=cache_context.invalidate,
         )
 
         user_who_share_room = set()
-        for room in rooms:
+        for room_id in room_ids:
             user_ids = yield self.get_users_in_room(
-                room.room_id, on_invalidate=cache_context.invalidate,
+                room_id, on_invalidate=cache_context.invalidate,
             )
             user_who_share_room.update(user_ids)
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 27f1ec89ec..1b42bea07a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -136,6 +136,16 @@ class StateStore(SQLBaseStore):
                 continue
 
             if context.current_state_ids is None:
+                # AFAIK, this can never happen
+                logger.error(
+                    "Non-outlier event %s had current_state_ids==None",
+                    event.event_id)
+                continue
+
+            # if the event was rejected, just give it the same state as its
+            # predecessor.
+            if context.rejected:
+                state_groups[event.event_id] = context.prev_group
                 continue
 
             state_groups[event.event_id] = context.state_group
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 46cf93ff87..95031dc9ec 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,6 +30,17 @@ class IdGenerator(object):
 
 
 def _load_current_id(db_conn, table, column, step=1):
+    """
+
+    Args:
+        db_conn (object):
+        table (str):
+        column (str):
+        step (int):
+
+    Returns:
+        int
+    """
     cur = db_conn.cursor()
     if step == 1:
         cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@@ -131,6 +142,9 @@ class StreamIdGenerator(object):
     def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
+
+        Returns:
+            int
         """
         with self._lock:
             if self._unfinished_ids:
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 998de70d29..19595df422 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -189,7 +189,55 @@ class Cache(object):
         self.cache.clear()
 
 
-class CacheDescriptor(object):
+class _CacheDescriptorBase(object):
+    def __init__(self, orig, num_args, inlineCallbacks, cache_context=False):
+        self.orig = orig
+
+        if inlineCallbacks:
+            self.function_to_call = defer.inlineCallbacks(orig)
+        else:
+            self.function_to_call = orig
+
+        arg_spec = inspect.getargspec(orig)
+        all_args = arg_spec.args
+
+        if "cache_context" in all_args:
+            if not cache_context:
+                raise ValueError(
+                    "Cannot have a 'cache_context' arg without setting"
+                    " cache_context=True"
+                )
+        elif cache_context:
+            raise ValueError(
+                "Cannot have cache_context=True without having an arg"
+                " named `cache_context`"
+            )
+
+        if num_args is None:
+            num_args = len(all_args) - 1
+            if cache_context:
+                num_args -= 1
+
+        if len(all_args) < num_args + 1:
+            raise Exception(
+                "Not enough explicit positional arguments to key off for %r: "
+                "got %i args, but wanted %i. (@cached cannot key off *args or "
+                "**kwargs)"
+                % (orig.__name__, len(all_args), num_args)
+            )
+
+        self.num_args = num_args
+        self.arg_names = all_args[1:num_args + 1]
+
+        if "cache_context" in self.arg_names:
+            raise Exception(
+                "cache_context arg cannot be included among the cache keys"
+            )
+
+        self.add_cache_context = cache_context
+
+
+class CacheDescriptor(_CacheDescriptorBase):
     """ A method decorator that applies a memoizing cache around the function.
 
     This caches deferreds, rather than the results themselves. Deferreds that
@@ -217,52 +265,24 @@ class CacheDescriptor(object):
             r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate)
             defer.returnValue(r1 + r2)
 
+    Args:
+        num_args (int): number of positional arguments (excluding ``self`` and
+            ``cache_context``) to use as cache keys. Defaults to all named
+            args of the function.
     """
-    def __init__(self, orig, max_entries=1000, num_args=1, tree=False,
+    def __init__(self, orig, max_entries=1000, num_args=None, tree=False,
                  inlineCallbacks=False, cache_context=False, iterable=False):
-        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
-        self.orig = orig
+        super(CacheDescriptor, self).__init__(
+            orig, num_args=num_args, inlineCallbacks=inlineCallbacks,
+            cache_context=cache_context)
 
-        if inlineCallbacks:
-            self.function_to_call = defer.inlineCallbacks(orig)
-        else:
-            self.function_to_call = orig
+        max_entries = int(max_entries * CACHE_SIZE_FACTOR)
 
         self.max_entries = max_entries
-        self.num_args = num_args
         self.tree = tree
-
         self.iterable = iterable
 
-        all_args = inspect.getargspec(orig)
-        self.arg_names = all_args.args[1:num_args + 1]
-
-        if "cache_context" in all_args.args:
-            if not cache_context:
-                raise ValueError(
-                    "Cannot have a 'cache_context' arg without setting"
-                    " cache_context=True"
-                )
-            try:
-                self.arg_names.remove("cache_context")
-            except ValueError:
-                pass
-        elif cache_context:
-            raise ValueError(
-                "Cannot have cache_context=True without having an arg"
-                " named `cache_context`"
-            )
-
-        self.add_cache_context = cache_context
-
-        if len(self.arg_names) < self.num_args:
-            raise Exception(
-                "Not enough explicit positional arguments to key off of for %r."
-                " (@cached cannot key off of *args or **kwargs)"
-                % (orig.__name__,)
-            )
-
     def __get__(self, obj, objtype=None):
         cache = Cache(
             name=self.orig.__name__,
@@ -338,48 +358,36 @@ class CacheDescriptor(object):
         return wrapped
 
 
-class CacheListDescriptor(object):
+class CacheListDescriptor(_CacheDescriptorBase):
     """Wraps an existing cache to support bulk fetching of keys.
 
     Given a list of keys it looks in the cache to find any hits, then passes
     the list of missing keys to the wrapped fucntion.
     """
 
-    def __init__(self, orig, cached_method_name, list_name, num_args=1,
+    def __init__(self, orig, cached_method_name, list_name, num_args=None,
                  inlineCallbacks=False):
         """
         Args:
             orig (function)
-            method_name (str); The name of the chached method.
+            cached_method_name (str): The name of the chached method.
             list_name (str): Name of the argument which is the bulk lookup list
-            num_args (int)
+            num_args (int): number of positional arguments (excluding ``self``,
+                but including list_name) to use as cache keys. Defaults to all
+                named args of the function.
             inlineCallbacks (bool): Whether orig is a generator that should
                 be wrapped by defer.inlineCallbacks
         """
-        self.orig = orig
+        super(CacheListDescriptor, self).__init__(
+            orig, num_args=num_args, inlineCallbacks=inlineCallbacks)
 
-        if inlineCallbacks:
-            self.function_to_call = defer.inlineCallbacks(orig)
-        else:
-            self.function_to_call = orig
-
-        self.num_args = num_args
         self.list_name = list_name
 
-        self.arg_names = inspect.getargspec(orig).args[1:num_args + 1]
         self.list_pos = self.arg_names.index(self.list_name)
-
         self.cached_method_name = cached_method_name
 
         self.sentinel = object()
 
-        if len(self.arg_names) < self.num_args:
-            raise Exception(
-                "Not enough explicit positional arguments to key off of for %r."
-                " (@cached cannot key off of *args or **kwars)"
-                % (orig.__name__,)
-            )
-
         if self.list_name not in self.arg_names:
             raise Exception(
                 "Couldn't see arguments %r for %r."
@@ -487,7 +495,7 @@ class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
         self.cache.invalidate(self.key)
 
 
-def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
+def cached(max_entries=1000, num_args=None, tree=False, cache_context=False,
            iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
@@ -499,8 +507,8 @@ def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
     )
 
 
-def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False,
-                          iterable=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=None, tree=False,
+                          cache_context=False, iterable=False):
     return lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
@@ -512,7 +520,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex
     )
 
 
-def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False):
+def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False):
     """Creates a descriptor that wraps a function in a `CacheListDescriptor`.
 
     Used to do batch lookups for an already created cache. A single argument
@@ -525,7 +533,8 @@ def cachedList(cached_method_name, list_name, num_args=1, inlineCallbacks=False)
         cache (Cache): The underlying cache to use.
         list_name (str): The name of the argument that is the list to use to
             do batch lookups in the cache.
-        num_args (int): Number of arguments to use as the key in the cache.
+        num_args (int): Number of arguments to use as the key in the cache
+            (including list_name). Defaults to all named parameters.
         inlineCallbacks (bool): Should the function be wrapped in an
             `defer.inlineCallbacks`?
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b72bb0ff02..70fe00ce0b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -50,7 +50,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int
+        assert type(stream_pos) is int or type(stream_pos) is long
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 6c83eb213d..ff67b1d794 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -12,6 +12,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+""" Thread-local-alike tracking of log contexts within synapse
+
+This module provides objects and utilities for tracking contexts through
+synapse code, so that log lines can include a request identifier, and so that
+CPU and database activity can be accounted for against the request that caused
+them.
+
+See doc/log_contexts.rst for details on how this works.
+"""
+
 from twisted.internet import defer
 
 import threading
@@ -309,21 +319,43 @@ def preserve_context_over_deferred(deferred, context=None):
 
 
 def preserve_fn(f):
-    """Ensures that function is called with correct context and that context is
-    restored after return. Useful for wrapping functions that return a deferred
-    which you don't yield on.
+    """Wraps a function, to ensure that the current context is restored after
+    return from the function, and that the sentinel context is set once the
+    deferred returned by the funtion completes.
+
+    Useful for wrapping functions that return a deferred which you don't yield
+    on.
     """
+    def reset_context(result):
+        LoggingContext.set_current_context(LoggingContext.sentinel)
+        return result
+
+    # XXX: why is this here rather than inside g? surely we want to preserve
+    # the context from the time the function was called, not when it was
+    # wrapped?
     current = LoggingContext.current_context()
 
     def g(*args, **kwargs):
-        with PreserveLoggingContext(current):
-            res = f(*args, **kwargs)
-            if isinstance(res, defer.Deferred):
-                return preserve_context_over_deferred(
-                    res, context=LoggingContext.sentinel
-                )
-            else:
-                return res
+        res = f(*args, **kwargs)
+        if isinstance(res, defer.Deferred) and not res.called:
+            # The function will have reset the context before returning, so
+            # we need to restore it now.
+            LoggingContext.set_current_context(current)
+
+            # The original context will be restored when the deferred
+            # completes, but there is nothing waiting for it, so it will
+            # get leaked into the reactor or some other function which
+            # wasn't expecting it. We therefore need to reset the context
+            # here.
+            #
+            # (If this feels asymmetric, consider it this way: we are
+            # effectively forking a new thread of execution. We are
+            # probably currently within a ``with LoggingContext()`` block,
+            # which is supposed to have a single entry and exit point. But
+            # by spawning off another deferred, we are effectively
+            # adding a new exit point.)
+            res.addBoth(reset_context)
+        return res
     return g
 
 
diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py
new file mode 100644
index 0000000000..607161e7f0
--- /dev/null
+++ b/synapse/util/msisdn.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import phonenumbers
+from synapse.api.errors import SynapseError
+
+
+def phone_number_to_msisdn(country, number):
+    """
+    Takes an ISO-3166-1 2 letter country code and phone number and
+    returns an msisdn representing the canonical version of that
+    phone number.
+    Args:
+        country (str): ISO-3166-1 2 letter country code
+        number (str): Phone number in a national or international format
+
+    Returns:
+        (str) The canonical form of the phone number, as an msisdn
+    Raises:
+            SynapseError if the number could not be parsed.
+    """
+    try:
+        phoneNumber = phonenumbers.parse(number, country)
+    except phonenumbers.NumberParseException:
+        raise SynapseError(400, "Unable to parse phone number")
+    return phonenumbers.format_number(
+        phoneNumber, phonenumbers.PhoneNumberFormat.E164
+    )[1:]
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 153ef001ad..b68e8c4e9f 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import synapse.util.logcontext
 from twisted.internet import defer
 
 from synapse.api.errors import CodeMessageException
@@ -173,4 +173,5 @@ class RetryDestinationLimiter(object):
                     "Failed to store set_destination_retry_timings",
                 )
 
-        store_retry_timings()
+        # we deliberately do this in the background.
+        synapse.util.logcontext.preserve_fn(store_retry_timings)()