summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/errors.py74
-rw-r--r--synapse/api/filtering.py37
-rw-r--r--synapse/app/synchrotron.py3
-rw-r--r--synapse/config/logger.py12
-rw-r--r--synapse/federation/transaction_queue.py5
-rw-r--r--synapse/handlers/auth.py32
-rw-r--r--synapse/handlers/device.py50
-rw-r--r--synapse/handlers/identity.py37
-rw-r--r--synapse/handlers/initial_sync.py11
-rw-r--r--synapse/handlers/presence.py95
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/receipts.py5
-rw-r--r--synapse/handlers/room_list.py66
-rw-r--r--synapse/handlers/sync.py82
-rw-r--r--synapse/http/matrixfederationclient.py15
-rw-r--r--synapse/http/servlet.py10
-rw-r--r--synapse/notifier.py66
-rw-r--r--synapse/push/mailer.py2
-rw-r--r--synapse/push/push_tools.py8
-rw-r--r--synapse/python_dependencies.py2
-rw-r--r--synapse/replication/slave/storage/_slaved_id_tracker.py5
-rw-r--r--synapse/replication/slave/storage/events.py4
-rw-r--r--synapse/rest/client/v1/login.py97
-rw-r--r--synapse/rest/client/v1/presence.py3
-rw-r--r--synapse/rest/client/v1/room.py3
-rw-r--r--synapse/rest/client/v2_alpha/account.py114
-rw-r--r--synapse/rest/client/v2_alpha/devices.py47
-rw-r--r--synapse/rest/client/v2_alpha/register.py137
-rw-r--r--synapse/rest/client/v2_alpha/sync.py20
-rw-r--r--synapse/rest/media/v1/download_resource.py12
-rw-r--r--synapse/rest/media/v1/media_repository.py44
-rw-r--r--synapse/storage/_base.py41
-rw-r--r--synapse/storage/deviceinbox.py6
-rw-r--r--synapse/storage/devices.py19
-rw-r--r--synapse/storage/events.py49
-rw-r--r--synapse/storage/roommember.py13
-rw-r--r--synapse/storage/state.py14
-rw-r--r--synapse/storage/stream.py3
-rw-r--r--synapse/storage/util/id_generators.py14
-rw-r--r--synapse/util/caches/stream_change_cache.py2
-rw-r--r--synapse/util/msisdn.py40
42 files changed, 1069 insertions, 240 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index ca23c9c460..489efb7f86 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -44,6 +45,7 @@ class JoinRules(object):
 class LoginType(object):
     PASSWORD = u"m.login.password"
     EMAIL_IDENTITY = u"m.login.email.identity"
+    MSISDN = u"m.login.msisdn"
     RECAPTCHA = u"m.login.recaptcha"
     DUMMY = u"m.login.dummy"
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 921c457738..6fbd5d6876 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -15,6 +15,7 @@
 
 """Contains exceptions and error codes."""
 
+import json
 import logging
 
 logger = logging.getLogger(__name__)
@@ -50,27 +51,35 @@ class Codes(object):
 
 
 class CodeMessageException(RuntimeError):
-    """An exception with integer code and message string attributes."""
+    """An exception with integer code and message string attributes.
 
+    Attributes:
+        code (int): HTTP error code
+        msg (str): string describing the error
+    """
     def __init__(self, code, msg):
         super(CodeMessageException, self).__init__("%d: %s" % (code, msg))
         self.code = code
         self.msg = msg
-        self.response_code_message = None
 
     def error_dict(self):
         return cs_error(self.msg)
 
 
 class SynapseError(CodeMessageException):
-    """A base error which can be caught for all synapse events."""
+    """A base exception type for matrix errors which have an errcode and error
+    message (as well as an HTTP status code).
+
+    Attributes:
+        errcode (str): Matrix error code e.g 'M_FORBIDDEN'
+    """
     def __init__(self, code, msg, errcode=Codes.UNKNOWN):
         """Constructs a synapse error.
 
         Args:
             code (int): The integer error code (an HTTP response code)
             msg (str): The human-readable error message.
-            err (str): The error code e.g 'M_FORBIDDEN'
+            errcode (str): The matrix error code e.g 'M_FORBIDDEN'
         """
         super(SynapseError, self).__init__(code, msg)
         self.errcode = errcode
@@ -81,6 +90,39 @@ class SynapseError(CodeMessageException):
             self.errcode,
         )
 
+    @classmethod
+    def from_http_response_exception(cls, err):
+        """Make a SynapseError based on an HTTPResponseException
+
+        This is useful when a proxied request has failed, and we need to
+        decide how to map the failure onto a matrix error to send back to the
+        client.
+
+        An attempt is made to parse the body of the http response as a matrix
+        error. If that succeeds, the errcode and error message from the body
+        are used as the errcode and error message in the new synapse error.
+
+        Otherwise, the errcode is set to M_UNKNOWN, and the error message is
+        set to the reason code from the HTTP response.
+
+        Args:
+            err (HttpResponseException):
+
+        Returns:
+            SynapseError:
+        """
+        # try to parse the body as json, to get better errcode/msg, but
+        # default to M_UNKNOWN with the HTTP status as the error text
+        try:
+            j = json.loads(err.response)
+        except ValueError:
+            j = {}
+        errcode = j.get('errcode', Codes.UNKNOWN)
+        errmsg = j.get('error', err.msg)
+
+        res = SynapseError(err.code, errmsg, errcode)
+        return res
+
 
 class RegistrationError(SynapseError):
     """An error raised when a registration event fails."""
@@ -106,13 +148,11 @@ class UnrecognizedRequestError(SynapseError):
 
 class NotFoundError(SynapseError):
     """An error indicating we can't find the thing you asked for"""
-    def __init__(self, *args, **kwargs):
-        if "errcode" not in kwargs:
-            kwargs["errcode"] = Codes.NOT_FOUND
+    def __init__(self, msg="Not found", errcode=Codes.NOT_FOUND):
         super(NotFoundError, self).__init__(
             404,
-            "Not found",
-            **kwargs
+            msg,
+            errcode=errcode
         )
 
 
@@ -173,7 +213,6 @@ class LimitExceededError(SynapseError):
                  errcode=Codes.LIMIT_EXCEEDED):
         super(LimitExceededError, self).__init__(code, msg, errcode)
         self.retry_after_ms = retry_after_ms
-        self.response_code_message = "Too Many Requests"
 
     def error_dict(self):
         return cs_error(
@@ -243,6 +282,19 @@ class FederationError(RuntimeError):
 
 
 class HttpResponseException(CodeMessageException):
+    """
+    Represents an HTTP-level failure of an outbound request
+
+    Attributes:
+        response (str): body of response
+    """
     def __init__(self, code, msg, response):
-        self.response = response
+        """
+
+        Args:
+            code (int): HTTP status code
+            msg (str): reason phrase from HTTP response status line
+            response (str): body of response
+        """
         super(HttpResponseException, self).__init__(code, msg)
+        self.response = response
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index fb291d7fb9..47f0cf0fa9 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from synapse.api.errors import SynapseError
+from synapse.storage.presence import UserPresenceState
 from synapse.types import UserID, RoomID
 
 from twisted.internet import defer
@@ -253,19 +254,35 @@ class Filter(object):
         Returns:
             bool: True if the event matches
         """
-        sender = event.get("sender", None)
-        if not sender:
-            # Presence events have their 'sender' in content.user_id
-            content = event.get("content")
-            # account_data has been allowed to have non-dict content, so check type first
-            if isinstance(content, dict):
-                sender = content.get("user_id")
+        # We usually get the full "events" as dictionaries coming through,
+        # except for presence which actually gets passed around as its own
+        # namedtuple type.
+        if isinstance(event, UserPresenceState):
+            sender = event.user_id
+            room_id = None
+            ev_type = "m.presence"
+            is_url = False
+        else:
+            sender = event.get("sender", None)
+            if not sender:
+                # Presence events had their 'sender' in content.user_id, but are
+                # now handled above. We don't know if anything else uses this
+                # form. TODO: Check this and probably remove it.
+                content = event.get("content")
+                # account_data has been allowed to have non-dict content, so
+                # check type first
+                if isinstance(content, dict):
+                    sender = content.get("user_id")
+
+            room_id = event.get("room_id", None)
+            ev_type = event.get("type", None)
+            is_url = "url" in event.get("content", {})
 
         return self.check_fields(
-            event.get("room_id", None),
+            room_id,
             sender,
-            event.get("type", None),
-            "url" in event.get("content", {})
+            ev_type,
+            is_url,
         )
 
     def check_fields(self, room_id, sender, event_type, contains_url):
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 29f075aa5f..449fac771b 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -399,8 +399,7 @@ class SynchrotronServer(HomeServer):
                 position = row[position_index]
                 user_id = row[user_index]
 
-                rooms = yield store.get_rooms_for_user(user_id)
-                room_ids = [r.room_id for r in rooms]
+                room_ids = yield store.get_rooms_for_user(user_id)
 
                 notifier.on_new_event(
                     "device_list_key", position, rooms=room_ids,
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index e5d945e5b8..2dbeafa9dd 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -45,7 +45,6 @@ handlers:
     maxBytes: 104857600
     backupCount: 10
     filters: [context]
-    level: INFO
   console:
     class: logging.StreamHandler
     formatter: precise
@@ -56,6 +55,8 @@ loggers:
         level: INFO
 
     synapse.storage.SQL:
+        # beware: increasing this to DEBUG will make synapse log sensitive
+        # information such as access tokens.
         level: INFO
 
 root:
@@ -78,10 +79,10 @@ class LoggingConfig(Config):
             os.path.join(config_dir_path, server_name + ".log.config")
         )
         return """
-        # Logging verbosity level.
+        # Logging verbosity level. Ignored if log_config is specified.
         verbose: 0
 
-        # File to write logging to
+        # File to write logging to. Ignored if log_config is specified.
         log_file: "%(log_file)s"
 
         # A yaml python logging config file
@@ -102,11 +103,12 @@ class LoggingConfig(Config):
         logging_group = parser.add_argument_group("logging")
         logging_group.add_argument(
             '-v', '--verbose', dest="verbose", action='count',
-            help="The verbosity level."
+            help="The verbosity level. Specify multiple times to increase "
+            "verbosity. (Ignored if --log-config is specified.)"
         )
         logging_group.add_argument(
             '-f', '--log-file', dest="log_file",
-            help="File to log to."
+            help="File to log to. (Ignored if --log-config is specified.)"
         )
         logging_group.add_argument(
             '--log-config', dest="log_config", default=None,
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 90235ff098..c802dd67a3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -99,7 +99,12 @@ class TransactionQueue(object):
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
 
+        # destination -> stream_id of last successfully sent to-device message.
+        # NB: may be a long or an int.
         self.last_device_stream_id_by_dest = {}
+
+        # destination -> stream_id of last successfully sent device list
+        # update.
         self.last_device_list_stream_id_by_dest = {}
 
         # HACK to get unique tx id
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index fffba34383..e7a1bb7246 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -47,6 +48,7 @@ class AuthHandler(BaseHandler):
             LoginType.PASSWORD: self._check_password_auth,
             LoginType.RECAPTCHA: self._check_recaptcha,
             LoginType.EMAIL_IDENTITY: self._check_email_identity,
+            LoginType.MSISDN: self._check_msisdn,
             LoginType.DUMMY: self._check_dummy_auth,
         }
         self.bcrypt_rounds = hs.config.bcrypt_rounds
@@ -307,31 +309,47 @@ class AuthHandler(BaseHandler):
                 defer.returnValue(True)
         raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
-    @defer.inlineCallbacks
     def _check_email_identity(self, authdict, _):
+        return self._check_threepid('email', authdict)
+
+    def _check_msisdn(self, authdict, _):
+        return self._check_threepid('msisdn', authdict)
+
+    @defer.inlineCallbacks
+    def _check_dummy_auth(self, authdict, _):
+        yield run_on_reactor()
+        defer.returnValue(True)
+
+    @defer.inlineCallbacks
+    def _check_threepid(self, medium, authdict):
         yield run_on_reactor()
 
         if 'threepid_creds' not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
         threepid_creds = authdict['threepid_creds']
+
         identity_handler = self.hs.get_handlers().identity_handler
 
-        logger.info("Getting validated threepid. threepidcreds: %r" % (threepid_creds,))
+        logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
         threepid = yield identity_handler.threepid_from_creds(threepid_creds)
 
         if not threepid:
             raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
 
+        if threepid['medium'] != medium:
+            raise LoginError(
+                401,
+                "Expecting threepid of type '%s', got '%s'" % (
+                    medium, threepid['medium'],
+                ),
+                errcode=Codes.UNAUTHORIZED
+            )
+
         threepid['threepid_creds'] = authdict['threepid_creds']
 
         defer.returnValue(threepid)
 
-    @defer.inlineCallbacks
-    def _check_dummy_auth(self, authdict, _):
-        yield run_on_reactor()
-        defer.returnValue(True)
-
     def _get_params_recaptcha(self):
         return {"public_key": self.hs.config.recaptcha_public_key}
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index e859b3165f..c22f65ce5d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -170,6 +170,40 @@ class DeviceHandler(BaseHandler):
         yield self.notify_device_update(user_id, [device_id])
 
     @defer.inlineCallbacks
+    def delete_devices(self, user_id, device_ids):
+        """ Delete several devices
+
+        Args:
+            user_id (str):
+            device_ids (str): The list of device IDs to delete
+
+        Returns:
+            defer.Deferred:
+        """
+
+        try:
+            yield self.store.delete_devices(user_id, device_ids)
+        except errors.StoreError, e:
+            if e.code == 404:
+                # no match
+                pass
+            else:
+                raise
+
+        # Delete access tokens and e2e keys for each device. Not optimised as it is not
+        # considered as part of a critical path.
+        for device_id in device_ids:
+            yield self.store.user_delete_access_tokens(
+                user_id, device_id=device_id,
+                delete_refresh_tokens=True,
+            )
+            yield self.store.delete_e2e_keys_by_device(
+                user_id=user_id, device_id=device_id
+            )
+
+        yield self.notify_device_update(user_id, device_ids)
+
+    @defer.inlineCallbacks
     def update_device(self, user_id, device_id, content):
         """ Update the given device
 
@@ -214,8 +248,7 @@ class DeviceHandler(BaseHandler):
             user_id, device_ids, list(hosts)
         )
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = [r.room_id for r in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         yield self.notifier.on_new_event(
             "device_list_key", position, rooms=room_ids,
@@ -236,8 +269,7 @@ class DeviceHandler(BaseHandler):
             user_id (str)
             from_token (StreamToken)
         """
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        room_ids = set(r.room_id for r in rooms)
+        room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # First we check if any devices have changed
         changed = yield self.store.get_user_whose_devices_changed(
@@ -262,7 +294,7 @@ class DeviceHandler(BaseHandler):
                 # ordering: treat it the same as a new room
                 event_ids = []
 
-            current_state_ids = yield self.state.get_current_state_ids(room_id)
+            current_state_ids = yield self.store.get_current_state_ids(room_id)
 
             # special-case for an empty prev state: include all members
             # in the changed list
@@ -313,8 +345,8 @@ class DeviceHandler(BaseHandler):
     @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
         user_id = user.to_string()
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We no longer share rooms with this user, so we'll no longer
             # receive device updates. Mark this in DB.
             yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
@@ -370,8 +402,8 @@ class DeviceListEduUpdater(object):
             logger.warning("Got device list update edu for %r from %r", user_id, origin)
             return
 
-        rooms = yield self.store.get_rooms_for_user(user_id)
-        if not rooms:
+        room_ids = yield self.store.get_rooms_for_user(user_id)
+        if not room_ids:
             # We don't share any rooms with this user. Ignore update, as we
             # probably won't get any further updates.
             return
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 559e5d5a71..6a53c5eb47 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -150,7 +151,7 @@ class IdentityHandler(BaseHandler):
         params.update(kwargs)
 
         try:
-            data = yield self.http_client.post_urlencoded_get_json(
+            data = yield self.http_client.post_json_get_json(
                 "https://%s%s" % (
                     id_server,
                     "/_matrix/identity/api/v1/validate/email/requestToken"
@@ -161,3 +162,37 @@ class IdentityHandler(BaseHandler):
         except CodeMessageException as e:
             logger.info("Proxied requestToken failed: %r", e)
             raise e
+
+    @defer.inlineCallbacks
+    def requestMsisdnToken(
+            self, id_server, country, phone_number,
+            client_secret, send_attempt, **kwargs
+    ):
+        yield run_on_reactor()
+
+        if not self._should_trust_id_server(id_server):
+            raise SynapseError(
+                400, "Untrusted ID server '%s'" % id_server,
+                Codes.SERVER_NOT_TRUSTED
+            )
+
+        params = {
+            'country': country,
+            'phone_number': phone_number,
+            'client_secret': client_secret,
+            'send_attempt': send_attempt,
+        }
+        params.update(kwargs)
+
+        try:
+            data = yield self.http_client.post_json_get_json(
+                "https://%s%s" % (
+                    id_server,
+                    "/_matrix/identity/api/v1/validate/msisdn/requestToken"
+                ),
+                params
+            )
+            defer.returnValue(data)
+        except CodeMessageException as e:
+            logger.info("Proxied requestToken failed: %r", e)
+            raise e
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e0ade4c164..10f5f35a69 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -19,6 +19,7 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes
 from synapse.events.utils import serialize_event
 from synapse.events.validator import EventValidator
+from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
     UserID, StreamToken,
@@ -225,9 +226,17 @@ class InitialSyncHandler(BaseHandler):
                 "content": content,
             })
 
+        now = self.clock.time_msec()
+
         ret = {
             "rooms": rooms_ret,
-            "presence": presence,
+            "presence": [
+                {
+                    "type": "m.presence",
+                    "content": format_user_presence_state(event, now),
+                }
+                for event in presence
+            ],
             "account_data": account_data_events,
             "receipts": receipt,
             "end": now_token.to_string(),
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index da610e430f..059260a8aa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -29,6 +29,7 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from synapse.storage.presence import UserPresenceState
 
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.logcontext import preserve_fn
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
@@ -556,9 +557,9 @@ class PresenceHandler(object):
         room_ids_to_states = {}
         users_to_states = {}
         for state in states:
-            events = yield self.store.get_rooms_for_user(state.user_id)
-            for e in events:
-                room_ids_to_states.setdefault(e.room_id, []).append(state)
+            room_ids = yield self.store.get_rooms_for_user(state.user_id)
+            for room_id in room_ids:
+                room_ids_to_states.setdefault(room_id, []).append(state)
 
             plist = yield self.store.get_presence_list_observers_accepted(state.user_id)
             for u in plist:
@@ -719,9 +720,7 @@ class PresenceHandler(object):
                 for state in updates
             ])
         else:
-            defer.returnValue([
-                format_user_presence_state(state, now) for state in updates
-            ])
+            defer.returnValue(updates)
 
     @defer.inlineCallbacks
     def set_state(self, target_user, state, ignore_status_msg=False):
@@ -795,6 +794,9 @@ class PresenceHandler(object):
             as_event=False,
         )
 
+        now = self.clock.time_msec()
+        results[:] = [format_user_presence_state(r, now) for r in results]
+
         is_accepted = {
             row["observed_user_id"]: row["accepted"] for row in presence_list
         }
@@ -847,6 +849,7 @@ class PresenceHandler(object):
             )
 
             state_dict = yield self.get_state(observed_user, as_event=False)
+            state_dict = format_user_presence_state(state_dict, self.clock.time_msec())
 
             self.federation.send_edu(
                 destination=observer_user.domain,
@@ -910,11 +913,12 @@ class PresenceHandler(object):
     def is_visible(self, observed_user, observer_user):
         """Returns whether a user can see another user's presence.
         """
-        observer_rooms = yield self.store.get_rooms_for_user(observer_user.to_string())
-        observed_rooms = yield self.store.get_rooms_for_user(observed_user.to_string())
-
-        observer_room_ids = set(r.room_id for r in observer_rooms)
-        observed_room_ids = set(r.room_id for r in observed_rooms)
+        observer_room_ids = yield self.store.get_rooms_for_user(
+            observer_user.to_string()
+        )
+        observed_room_ids = yield self.store.get_rooms_for_user(
+            observed_user.to_string()
+        )
 
         if observer_room_ids & observed_room_ids:
             defer.returnValue(True)
@@ -979,14 +983,18 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now):
+def format_user_presence_state(state, now, include_user_id=True):
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
+
+    The "user_id" is optional so that this function can be used to format presence
+    updates for client /sync responses and for federation /send requests.
     """
     content = {
         "presence": state.state,
-        "user_id": state.user_id,
     }
+    if include_user_id:
+        content["user_id"] = state.user_id
     if state.last_active_ts:
         content["last_active_ago"] = now - state.last_active_ts
     if state.status_msg and state.state != PresenceState.OFFLINE:
@@ -1025,7 +1033,6 @@ class PresenceEventSource(object):
         # sending down the rare duplicate is not a concern.
 
         with Measure(self.clock, "presence.get_new_events"):
-            user_id = user.to_string()
             if from_key is not None:
                 from_key = int(from_key)
 
@@ -1034,18 +1041,7 @@ class PresenceEventSource(object):
 
             max_token = self.store.get_current_presence_token()
 
-            plist = yield self.store.get_presence_list_accepted(user.localpart)
-            users_interested_in = set(row["observed_user_id"] for row in plist)
-            users_interested_in.add(user_id)  # So that we receive our own presence
-
-            users_who_share_room = yield self.store.get_users_who_share_room_with_user(
-                user_id
-            )
-            users_interested_in.update(users_who_share_room)
-
-            if explicit_room_id:
-                user_ids = yield self.store.get_users_in_room(explicit_room_id)
-                users_interested_in.update(user_ids)
+            users_interested_in = yield self._get_interested_in(user, explicit_room_id)
 
             user_ids_changed = set()
             changed = None
@@ -1073,16 +1069,13 @@ class PresenceEventSource(object):
 
             updates = yield presence.current_state_for_users(user_ids_changed)
 
-        now = self.clock.time_msec()
-
-        defer.returnValue(([
-            {
-                "type": "m.presence",
-                "content": format_user_presence_state(s, now),
-            }
-            for s in updates.values()
-            if include_offline or s.state != PresenceState.OFFLINE
-        ], max_token))
+        if include_offline:
+            defer.returnValue((updates.values(), max_token))
+        else:
+            defer.returnValue(([
+                s for s in updates.itervalues()
+                if s.state != PresenceState.OFFLINE
+            ], max_token))
 
     def get_current_key(self):
         return self.store.get_current_presence_token()
@@ -1090,6 +1083,31 @@ class PresenceEventSource(object):
     def get_pagination_rows(self, user, pagination_config, key):
         return self.get_new_events(user, from_key=None, include_offline=False)
 
+    @cachedInlineCallbacks(num_args=2, cache_context=True)
+    def _get_interested_in(self, user, explicit_room_id, cache_context):
+        """Returns the set of users that the given user should see presence
+        updates for
+        """
+        user_id = user.to_string()
+        plist = yield self.store.get_presence_list_accepted(
+            user.localpart, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in = set(row["observed_user_id"] for row in plist)
+        users_interested_in.add(user_id)  # So that we receive our own presence
+
+        users_who_share_room = yield self.store.get_users_who_share_room_with_user(
+            user_id, on_invalidate=cache_context.invalidate,
+        )
+        users_interested_in.update(users_who_share_room)
+
+        if explicit_room_id:
+            user_ids = yield self.store.get_users_in_room(
+                explicit_room_id, on_invalidate=cache_context.invalidate,
+            )
+            users_interested_in.update(user_ids)
+
+        defer.returnValue(users_interested_in)
+
 
 def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     """Checks the presence of users that have timed out and updates as
@@ -1157,7 +1175,10 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
         # If there are have been no sync for a while (and none ongoing),
         # set presence to offline
         if user_id not in syncing_user_ids:
-            if now - state.last_user_sync_ts > SYNC_ONLINE_TIMEOUT:
+            # If the user has done something recently but hasn't synced,
+            # don't set them as offline.
+            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
+            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
                 state = state.copy_and_replace(
                     state=PresenceState.OFFLINE,
                     status_msg=None,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 87f74dfb8e..abd1fb28cb 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -156,11 +156,11 @@ class ProfileHandler(BaseHandler):
 
         self.ratelimit(requester)
 
-        joins = yield self.store.get_rooms_for_user(
+        room_ids = yield self.store.get_rooms_for_user(
             user.to_string(),
         )
 
-        for j in joins:
+        for room_id in room_ids:
             handler = self.hs.get_handlers().room_member_handler
             try:
                 # Assume the user isn't a guest because we don't let guests set
@@ -171,12 +171,12 @@ class ProfileHandler(BaseHandler):
                 yield handler.update_membership(
                     requester,
                     user,
-                    j.room_id,
+                    room_id,
                     "join",  # We treat a profile update like a join.
                     ratelimit=False,  # Try to hide that these events aren't atomic.
                 )
             except Exception as e:
                 logger.warn(
                     "Failed to update join event for room %s - %s",
-                    j.room_id, str(e.message)
+                    room_id, str(e.message)
                 )
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 50aa513935..e1cd3a48e9 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -210,10 +210,9 @@ class ReceiptEventSource(object):
         else:
             from_key = None
 
-        rooms = yield self.store.get_rooms_for_user(user.to_string())
-        rooms = [room.room_id for room in rooms]
+        room_ids = yield self.store.get_rooms_for_user(user.to_string())
         events = yield self.store.get_linearized_receipts_for_rooms(
-            rooms,
+            room_ids,
             from_key=from_key,
             to_key=to_key,
         )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 19eebbd43f..516cd9a6ac 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -21,6 +21,7 @@ from synapse.api.constants import (
     EventTypes, JoinRules,
 )
 from synapse.util.async import concurrently_execute
+from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.types import ThirdPartyInstanceID
 
@@ -62,6 +63,10 @@ class RoomListHandler(BaseHandler):
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
         """
+        logger.info(
+            "Getting public room list: limit=%r, since=%r, search=%r, network=%r",
+            limit, since_token, bool(search_filter), network_tuple,
+        )
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
@@ -91,7 +96,6 @@ class RoomListHandler(BaseHandler):
 
         rooms_to_order_value = {}
         rooms_to_num_joined = {}
-        rooms_to_latest_event_ids = {}
 
         newly_visible = []
         newly_unpublished = []
@@ -116,19 +120,26 @@ class RoomListHandler(BaseHandler):
 
         @defer.inlineCallbacks
         def get_order_for_room(room_id):
-            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
-            if not latest_event_ids:
+            # Most of the rooms won't have changed between the since token and
+            # now (especially if the since token is "now"). So, we can ask what
+            # the current users are in a room (that will hit a cache) and then
+            # check if the room has changed since the since token. (We have to
+            # do it in that order to avoid races).
+            # If things have changed then fall back to getting the current state
+            # at the since token.
+            joined_users = yield self.store.get_users_in_room(room_id)
+            if self.store.has_room_changed_since(room_id, stream_token):
                 latest_event_ids = yield self.store.get_forward_extremeties_for_room(
                     room_id, stream_token
                 )
-                rooms_to_latest_event_ids[room_id] = latest_event_ids
 
-            if not latest_event_ids:
-                return
+                if not latest_event_ids:
+                    return
+
+                joined_users = yield self.state_handler.get_current_user_in_room(
+                    room_id, latest_event_ids,
+                )
 
-            joined_users = yield self.state_handler.get_current_user_in_room(
-                room_id, latest_event_ids,
-            )
             num_joined_users = len(joined_users)
             rooms_to_num_joined[room_id] = num_joined_users
 
@@ -165,19 +176,19 @@ class RoomListHandler(BaseHandler):
                 rooms_to_scan = rooms_to_scan[:since_token.current_limit]
                 rooms_to_scan.reverse()
 
-        # Actually generate the entries. _generate_room_entry will append to
+        # Actually generate the entries. _append_room_entry_to_chunk will append to
         # chunk but will stop if len(chunk) > limit
         chunk = []
         if limit and not search_filter:
             step = limit + 1
             for i in xrange(0, len(rooms_to_scan), step):
                 # We iterate here because the vast majority of cases we'll stop
-                # at first iteration, but occaisonally _generate_room_entry
+                # at first iteration, but occaisonally _append_room_entry_to_chunk
                 # won't append to the chunk and so we need to loop again.
                 # We don't want to scan over the entire range either as that
                 # would potentially waste a lot of work.
                 yield concurrently_execute(
-                    lambda r: self._generate_room_entry(
+                    lambda r: self._append_room_entry_to_chunk(
                         r, rooms_to_num_joined[r],
                         chunk, limit, search_filter
                     ),
@@ -187,7 +198,7 @@ class RoomListHandler(BaseHandler):
                     break
         else:
             yield concurrently_execute(
-                lambda r: self._generate_room_entry(
+                lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
@@ -256,21 +267,35 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def _generate_room_entry(self, room_id, num_joined_users, chunk, limit,
-                             search_filter):
+    def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit,
+                                    search_filter):
+        """Generate the entry for a room in the public room list and append it
+        to the `chunk` if it matches the search filter
+        """
         if limit and len(chunk) > limit + 1:
             # We've already got enough, so lets just drop it.
             return
 
+        result = yield self._generate_room_entry(room_id, num_joined_users)
+
+        if result and _matches_room_entry(result, search_filter):
+            chunk.append(result)
+
+    @cachedInlineCallbacks(num_args=1, cache_context=True)
+    def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+        """Returns the entry for a room
+        """
         result = {
             "room_id": room_id,
             "num_joined_members": num_joined_users,
         }
 
-        current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+        current_state_ids = yield self.store.get_current_state_ids(
+            room_id, on_invalidate=cache_context.invalidate,
+        )
 
         event_map = yield self.store.get_events([
-            event_id for key, event_id in current_state_ids.items()
+            event_id for key, event_id in current_state_ids.iteritems()
             if key[0] in (
                 EventTypes.JoinRules,
                 EventTypes.Name,
@@ -294,7 +319,9 @@ class RoomListHandler(BaseHandler):
             if join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
-        aliases = yield self.store.get_aliases_for_room(room_id)
+        aliases = yield self.store.get_aliases_for_room(
+            room_id, on_invalidate=cache_context.invalidate
+        )
         if aliases:
             result["aliases"] = aliases
 
@@ -334,8 +361,7 @@ class RoomListHandler(BaseHandler):
             if avatar_url:
                 result["avatar_url"] = avatar_url
 
-        if _matches_room_entry(result, search_filter):
-            chunk.append(result)
+        defer.returnValue(result)
 
     @defer.inlineCallbacks
     def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5572cb883f..c0205da1a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.util.metrics import Measure, measure_func
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.visibility import filter_events_for_client
+from synapse.types import RoomStreamToken
 
 from twisted.internet import defer
 
@@ -225,8 +226,7 @@ class SyncHandler(object):
         with Measure(self.clock, "ephemeral_by_room"):
             typing_key = since_token.typing_key if since_token else "0"
 
-            rooms = yield self.store.get_rooms_for_user(sync_config.user.to_string())
-            room_ids = [room.room_id for room in rooms]
+            room_ids = yield self.store.get_rooms_for_user(sync_config.user.to_string())
 
             typing_source = self.event_sources.sources["typing"]
             typing, typing_key = yield typing_source.get_new_events(
@@ -568,16 +568,15 @@ class SyncHandler(object):
         since_token = sync_result_builder.since_token
 
         if since_token and since_token.device_list_key:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            room_ids = set(r.room_id for r in rooms)
+            room_ids = yield self.store.get_rooms_for_user(user_id)
 
             user_ids_changed = set()
             changed = yield self.store.get_user_whose_devices_changed(
                 since_token.device_list_key
             )
             for other_user_id in changed:
-                other_rooms = yield self.store.get_rooms_for_user(other_user_id)
-                if room_ids.intersection(e.room_id for e in other_rooms):
+                other_room_ids = yield self.store.get_rooms_for_user(other_user_id)
+                if room_ids.intersection(other_room_ids):
                     user_ids_changed.add(other_user_id)
 
             defer.returnValue(user_ids_changed)
@@ -721,14 +720,14 @@ class SyncHandler(object):
             extra_users_ids.update(users)
         extra_users_ids.discard(user.to_string())
 
-        states = yield self.presence_handler.get_states(
-            extra_users_ids,
-            as_event=True,
-        )
-        presence.extend(states)
+        if extra_users_ids:
+            states = yield self.presence_handler.get_states(
+                extra_users_ids,
+            )
+            presence.extend(states)
 
-        # Deduplicate the presence entries so that there's at most one per user
-        presence = {p["content"]["user_id"]: p for p in presence}.values()
+            # Deduplicate the presence entries so that there's at most one per user
+            presence = {p.user_id: p for p in presence}.values()
 
         presence = sync_config.filter_collection.filter_presence(
             presence
@@ -765,6 +764,21 @@ class SyncHandler(object):
             )
             sync_result_builder.now_token = now_token
 
+        # We check up front if anything has changed, if it hasn't then there is
+        # no point in going futher.
+        since_token = sync_result_builder.since_token
+        if not sync_result_builder.full_state:
+            if since_token and not ephemeral_by_room and not account_data_by_room:
+                have_changed = yield self._have_rooms_changed(sync_result_builder)
+                if not have_changed:
+                    tags_by_room = yield self.store.get_updated_tags(
+                        user_id,
+                        since_token.account_data_key,
+                    )
+                    if not tags_by_room:
+                        logger.debug("no-oping sync")
+                        defer.returnValue(([], []))
+
         ignored_account_data = yield self.store.get_global_account_data_by_type_for_user(
             "m.ignored_user_list", user_id=user_id,
         )
@@ -774,13 +788,12 @@ class SyncHandler(object):
         else:
             ignored_users = frozenset()
 
-        if sync_result_builder.since_token:
+        if since_token:
             res = yield self._get_rooms_changed(sync_result_builder, ignored_users)
             room_entries, invited, newly_joined_rooms = res
 
             tags_by_room = yield self.store.get_updated_tags(
-                user_id,
-                sync_result_builder.since_token.account_data_key,
+                user_id, since_token.account_data_key,
             )
         else:
             res = yield self._get_all_rooms(sync_result_builder, ignored_users)
@@ -805,7 +818,7 @@ class SyncHandler(object):
 
         # Now we want to get any newly joined users
         newly_joined_users = set()
-        if sync_result_builder.since_token:
+        if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
                     joined_sync.timeline.events, joined_sync.state.values()
@@ -818,6 +831,38 @@ class SyncHandler(object):
         defer.returnValue((newly_joined_rooms, newly_joined_users))
 
     @defer.inlineCallbacks
+    def _have_rooms_changed(self, sync_result_builder):
+        """Returns whether there may be any new events that should be sent down
+        the sync. Returns True if there are.
+        """
+        user_id = sync_result_builder.sync_config.user.to_string()
+        since_token = sync_result_builder.since_token
+        now_token = sync_result_builder.now_token
+
+        assert since_token
+
+        # Get a list of membership change events that have happened.
+        rooms_changed = yield self.store.get_membership_changes_for_user(
+            user_id, since_token.room_key, now_token.room_key
+        )
+
+        if rooms_changed:
+            defer.returnValue(True)
+
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            rooms = yield self.store.get_app_service_rooms(app_service)
+            joined_room_ids = set(r.room_id for r in rooms)
+        else:
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
+
+        stream_id = RoomStreamToken.parse_stream_token(since_token.room_key).stream
+        for room_id in joined_room_ids:
+            if self.store.has_room_changed_since(room_id, stream_id):
+                defer.returnValue(True)
+        defer.returnValue(False)
+
+    @defer.inlineCallbacks
     def _get_rooms_changed(self, sync_result_builder, ignored_users):
         """Gets the the changes that have happened since the last sync.
 
@@ -841,8 +886,7 @@ class SyncHandler(object):
             rooms = yield self.store.get_app_service_rooms(app_service)
             joined_room_ids = set(r.room_id for r in rooms)
         else:
-            rooms = yield self.store.get_rooms_for_user(user_id)
-            joined_room_ids = set(r.room_id for r in rooms)
+            joined_room_ids = yield self.store.get_rooms_for_user(user_id)
 
         # Get a list of membership change events that have happened.
         rooms_changed = yield self.store.get_membership_changes_for_user(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 78b92cef36..82586e3dea 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -108,6 +108,12 @@ class MatrixFederationHttpClient(object):
                         query_bytes=b"", retry_on_dns_fail=True,
                         timeout=None, long_retries=False):
         """ Creates and sends a request to the given url
+
+        Returns:
+            Deferred: resolves with the http response object on success.
+
+            Fails with ``HTTPRequestException``: if we get an HTTP response
+            code >= 300.
         """
         headers_dict[b"User-Agent"] = [self.version_string]
         headers_dict[b"Host"] = [destination]
@@ -408,8 +414,11 @@ class MatrixFederationHttpClient(object):
             output_stream (file): File to write the response body to.
             args (dict): Optional dictionary used to create the query string.
         Returns:
-            A (int,dict) tuple of the file length and a dict of the response
-            headers.
+            Deferred: resolves with an (int,dict) tuple of the file length and
+            a dict of the response headers.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response code
+            >= 300
         """
 
         encoded_args = {}
@@ -419,7 +428,7 @@ class MatrixFederationHttpClient(object):
             encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
         query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
+        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
 
         def body_callback(method, url_bytes, headers_dict):
             self.sign_request(destination, method, url_bytes, headers_dict)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 8c22d6f00f..9a4c36ad5d 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -192,6 +192,16 @@ def parse_json_object_from_request(request):
     return content
 
 
+def assert_params_in_request(body, required):
+    absent = []
+    for k in required:
+        if k not in body:
+            absent.append(k)
+
+    if len(absent) > 0:
+        raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+
 class RestServlet(object):
 
     """ A Synapse REST Servlet.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 8051a7a842..7eeba6d28e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
+from synapse.handlers.presence import format_user_presence_state
 
 from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
@@ -37,6 +38,10 @@ metrics = synapse.metrics.get_metrics_for(__name__)
 
 notified_events_counter = metrics.register_counter("notified_events")
 
+users_woken_by_stream_counter = metrics.register_counter(
+    "users_woken_by_stream", labels=["stream"]
+)
+
 
 # TODO(paul): Should be shared somewhere
 def count(func, l):
@@ -73,6 +78,13 @@ class _NotifierUserStream(object):
         self.user_id = user_id
         self.rooms = set(rooms)
         self.current_token = current_token
+
+        # The last token for which we should wake up any streams that have a
+        # token that comes before it. This gets updated everytime we get poked.
+        # We start it at the current token since if we get any streams
+        # that have a token from before we have no idea whether they should be
+        # woken up or not, so lets just wake them up.
+        self.last_notified_token = current_token
         self.last_notified_ms = time_now_ms
 
         with PreserveLoggingContext():
@@ -89,9 +101,12 @@ class _NotifierUserStream(object):
         self.current_token = self.current_token.copy_and_advance(
             stream_key, stream_id
         )
+        self.last_notified_token = self.current_token
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
 
+        users_woken_by_stream_counter.inc(stream_key)
+
         with PreserveLoggingContext():
             self.notify_deferred = ObservableDeferred(defer.Deferred())
             noify_deferred.callback(self.current_token)
@@ -113,8 +128,14 @@ class _NotifierUserStream(object):
     def new_listener(self, token):
         """Returns a deferred that is resolved when there is a new token
         greater than the given token.
+
+        Args:
+            token: The token from which we are streaming from, i.e. we shouldn't
+                notify for things that happened before this.
         """
-        if self.current_token.is_after(token):
+        # Immediately wake up stream if something has already since happened
+        # since their last token.
+        if self.last_notified_token.is_after(token):
             return _NotificationListener(defer.succeed(self.current_token))
         else:
             return _NotificationListener(self.notify_deferred.observe())
@@ -283,8 +304,7 @@ class Notifier(object):
         if user_stream is None:
             current_token = yield self.event_sources.get_current_token()
             if room_ids is None:
-                rooms = yield self.store.get_rooms_for_user(user_id)
-                room_ids = [room.room_id for room in rooms]
+                room_ids = yield self.store.get_rooms_for_user(user_id)
             user_stream = _NotifierUserStream(
                 user_id=user_id,
                 rooms=room_ids,
@@ -294,40 +314,44 @@ class Notifier(object):
             self._register_with_keys(user_stream)
 
         result = None
+        prev_token = from_token
         if timeout:
             end_time = self.clock.time_msec() + timeout
 
-            prev_token = from_token
             while not result:
                 try:
-                    current_token = user_stream.current_token
-
-                    result = yield callback(prev_token, current_token)
-                    if result:
-                        break
-
                     now = self.clock.time_msec()
                     if end_time <= now:
                         break
 
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
-                    # We need to supply the token we supplied to callback so
-                    # that we don't miss any current_token updates.
-                    prev_token = current_token
                     listener = user_stream.new_listener(prev_token)
                     with PreserveLoggingContext():
                         yield self.clock.time_bound_deferred(
                             listener.deferred,
                             time_out=(end_time - now) / 1000.
                         )
+
+                    current_token = user_stream.current_token
+
+                    result = yield callback(prev_token, current_token)
+                    if result:
+                        break
+
+                    # Update the prev_token to the current_token since nothing
+                    # has happened between the old prev_token and the current_token
+                    prev_token = current_token
                 except DeferredTimedOutError:
                     break
                 except defer.CancelledError:
                     break
-        else:
+
+        if result is None:
+            # This happened if there was no timeout or if the timeout had
+            # already expired.
             current_token = user_stream.current_token
-            result = yield callback(from_token, current_token)
+            result = yield callback(prev_token, current_token)
 
         defer.returnValue(result)
 
@@ -388,6 +412,15 @@ class Notifier(object):
                         new_events,
                         is_peeking=is_peeking,
                     )
+                elif name == "presence":
+                    now = self.clock.time_msec()
+                    new_events[:] = [
+                        {
+                            "type": "m.presence",
+                            "content": format_user_presence_state(event, now),
+                        }
+                        for event in new_events
+                    ]
 
                 events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
@@ -420,8 +453,7 @@ class Notifier(object):
 
     @defer.inlineCallbacks
     def _get_room_ids(self, user, explicit_room_id):
-        joined_rooms = yield self.store.get_rooms_for_user(user.to_string())
-        joined_room_ids = map(lambda r: r.room_id, joined_rooms)
+        joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
         if explicit_room_id:
             if explicit_room_id in joined_room_ids:
                 defer.returnValue(([explicit_room_id], True))
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 62d794f22b..3a50c72e0b 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -139,7 +139,7 @@ class Mailer(object):
 
         @defer.inlineCallbacks
         def _fetch_room_state(room_id):
-            room_state = yield self.state_handler.get_current_state_ids(room_id)
+            room_state = yield self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state
 
         # Run at most 3 of these at once: sync does 10 at a time but email
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index a27476bbad..287df94b4f 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -33,13 +33,13 @@ def get_badge_count(store, user_id):
 
     badge = len(invites)
 
-    for r in joins:
-        if r.room_id in my_receipts_by_room:
-            last_unread_event_id = my_receipts_by_room[r.room_id]
+    for room_id in joins:
+        if room_id in my_receipts_by_room:
+            last_unread_event_id = my_receipts_by_room[room_id]
 
             notifs = yield (
                 store.get_unread_event_push_actions_by_room_for_user(
-                    r.room_id, user_id, last_unread_event_id
+                    room_id, user_id, last_unread_event_id
                 )
             )
             # return one badge count per conversation, as count per
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 7817b0cd91..c4777b2a2b 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -37,6 +38,7 @@ REQUIREMENTS = {
     "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
     "msgpack-python>=0.3.0": ["msgpack"],
+    "phonenumbers>=8.2.0": ["phonenumbers"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/slave/storage/_slaved_id_tracker.py b/synapse/replication/slave/storage/_slaved_id_tracker.py
index 24b5c79d4a..9d1d173b2f 100644
--- a/synapse/replication/slave/storage/_slaved_id_tracker.py
+++ b/synapse/replication/slave/storage/_slaved_id_tracker.py
@@ -27,4 +27,9 @@ class SlavedIdTracker(object):
         self._current = (max if self.step > 0 else min)(self._current, new_id)
 
     def get_current_token(self):
+        """
+
+        Returns:
+            int
+        """
         return self._current
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 622b2d8540..518c9ea2e9 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -109,6 +109,10 @@ class SlavedEventStore(BaseSlavedStore):
     get_recent_event_ids_for_room = (
         StreamStore.__dict__["get_recent_event_ids_for_room"]
     )
+    get_current_state_ids = (
+        StateStore.__dict__["get_current_state_ids"]
+    )
+    has_room_changed_since = DataStore.has_room_changed_since.__func__
 
     get_unread_push_actions_for_user_in_range_for_http = (
         DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 72057f1b0c..a43410fb37 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, LoginError, Codes
 from synapse.types import UserID
 from synapse.http.server import finish_request
 from synapse.http.servlet import parse_json_object_from_request
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -33,10 +34,55 @@ from saml2.client import Saml2Client
 
 import xml.etree.ElementTree as ET
 
+from twisted.web.client import PartialDownloadError
+
 
 logger = logging.getLogger(__name__)
 
 
+def login_submission_legacy_convert(submission):
+    """
+    If the input login submission is an old style object
+    (ie. with top-level user / medium / address) convert it
+    to a typed object.
+    """
+    if "user" in submission:
+        submission["identifier"] = {
+            "type": "m.id.user",
+            "user": submission["user"],
+        }
+        del submission["user"]
+
+    if "medium" in submission and "address" in submission:
+        submission["identifier"] = {
+            "type": "m.id.thirdparty",
+            "medium": submission["medium"],
+            "address": submission["address"],
+        }
+        del submission["medium"]
+        del submission["address"]
+
+
+def login_id_thirdparty_from_phone(identifier):
+    """
+    Convert a phone login identifier type to a generic threepid identifier
+    Args:
+        identifier(dict): Login identifier dict of type 'm.id.phone'
+
+    Returns: Login identifier dict of type 'm.id.threepid'
+    """
+    if "country" not in identifier or "number" not in identifier:
+        raise SynapseError(400, "Invalid phone-type identifier")
+
+    msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+
+    return {
+        "type": "m.id.thirdparty",
+        "medium": "msisdn",
+        "address": msisdn,
+    }
+
+
 class LoginRestServlet(ClientV1RestServlet):
     PATTERNS = client_path_patterns("/login$")
     PASS_TYPE = "m.login.password"
@@ -117,20 +163,52 @@ class LoginRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def do_password_login(self, login_submission):
-        if 'medium' in login_submission and 'address' in login_submission:
-            address = login_submission['address']
-            if login_submission['medium'] == 'email':
+        if "password" not in login_submission:
+            raise SynapseError(400, "Missing parameter: password")
+
+        login_submission_legacy_convert(login_submission)
+
+        if "identifier" not in login_submission:
+            raise SynapseError(400, "Missing param: identifier")
+
+        identifier = login_submission["identifier"]
+        if "type" not in identifier:
+            raise SynapseError(400, "Login identifier has no type")
+
+        # convert phone type identifiers to generic threepids
+        if identifier["type"] == "m.id.phone":
+            identifier = login_id_thirdparty_from_phone(identifier)
+
+        # convert threepid identifiers to user IDs
+        if identifier["type"] == "m.id.thirdparty":
+            if 'medium' not in identifier or 'address' not in identifier:
+                raise SynapseError(400, "Invalid thirdparty identifier")
+
+            address = identifier['address']
+            if identifier['medium'] == 'email':
                 # For emails, transform the address to lowercase.
                 # We store all email addreses as lowercase in the DB.
                 # (See add_threepid in synapse/handlers/auth.py)
                 address = address.lower()
             user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
-                login_submission['medium'], address
+                identifier['medium'], address
             )
             if not user_id:
                 raise LoginError(403, "", errcode=Codes.FORBIDDEN)
-        else:
-            user_id = login_submission['user']
+
+            identifier = {
+                "type": "m.id.user",
+                "user": user_id,
+            }
+
+        # by this point, the identifier should be an m.id.user: if it's anything
+        # else, we haven't understood it.
+        if identifier["type"] != "m.id.user":
+            raise SynapseError(400, "Unknown login identifier type")
+        if "user" not in identifier:
+            raise SynapseError(400, "User identifier is missing 'user' key")
+
+        user_id = identifier["user"]
 
         if not user_id.startswith('@'):
             user_id = UserID.create(
@@ -341,7 +419,12 @@ class CasTicketServlet(ClientV1RestServlet):
             "ticket": request.args["ticket"],
             "service": self.cas_service_url
         }
-        body = yield http_client.get_raw(uri, args)
+        try:
+            body = yield http_client.get_raw(uri, args)
+        except PartialDownloadError as pde:
+            # Twisted raises this error if the connection is closed,
+            # even if that's being used old-http style to signal end-of-data
+            body = pde.response
         result = yield self.handle_cas_response(request, body, client_redirect_url)
         defer.returnValue(result)
 
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index eafdce865e..47b2dc45e7 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
 
 from synapse.api.errors import SynapseError, AuthError
 from synapse.types import UserID
+from synapse.handlers.presence import format_user_presence_state
 from synapse.http.servlet import parse_json_object_from_request
 from .base import ClientV1RestServlet, client_path_patterns
 
@@ -33,6 +34,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(PresenceStatusRestServlet, self).__init__(hs)
         self.presence_handler = hs.get_presence_handler()
+        self.clock = hs.get_clock()
 
     @defer.inlineCallbacks
     def on_GET(self, request, user_id):
@@ -48,6 +50,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
                 raise AuthError(403, "You are not allowed to see their presence.")
 
         state = yield self.presence_handler.get_state(target_user=user)
+        state = format_user_presence_state(state, self.clock.time_msec())
 
         defer.returnValue((200, state))
 
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 90242a6bac..0bdd6b5b36 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -748,8 +748,7 @@ class JoinedRoomsRestServlet(ClientV1RestServlet):
     def on_GET(self, request):
         requester = yield self.auth.get_user_by_req(request, allow_guest=True)
 
-        rooms = yield self.store.get_rooms_for_user(requester.user.to_string())
-        room_ids = set(r.room_id for r in rooms)  # Ensure they're unique.
+        room_ids = yield self.store.get_rooms_for_user(requester.user.to_string())
         defer.returnValue((200, {"joined_rooms": list(room_ids)}))
 
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 398e7f5eb0..aac76edf1c 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,8 +18,11 @@ from twisted.internet import defer
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import LoginError, SynapseError, Codes
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request, assert_params_in_request
+)
 from synapse.util.async import run_on_reactor
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from ._base import client_v2_patterns
 
@@ -28,11 +32,11 @@ import logging
 logger = logging.getLogger(__name__)
 
 
-class PasswordRequestTokenRestServlet(RestServlet):
+class EmailPasswordRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/password/email/requestToken$")
 
     def __init__(self, hs):
-        super(PasswordRequestTokenRestServlet, self).__init__()
+        super(EmailPasswordRequestTokenRestServlet, self).__init__()
         self.hs = hs
         self.identity_handler = hs.get_handlers().identity_handler
 
@@ -40,14 +44,9 @@ class PasswordRequestTokenRestServlet(RestServlet):
     def on_POST(self, request):
         body = parse_json_object_from_request(request)
 
-        required = ['id_server', 'client_secret', 'email', 'send_attempt']
-        absent = []
-        for k in required:
-            if k not in body:
-                absent.append(k)
-
-        if absent:
-            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+        assert_params_in_request(body, [
+            'id_server', 'client_secret', 'email', 'send_attempt'
+        ])
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
@@ -60,6 +59,37 @@ class PasswordRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnPasswordRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/account/password/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        super(MsisdnPasswordRequestTokenRestServlet, self).__init__()
+        self.hs = hs
+        self.datastore = self.hs.get_datastore()
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_request(body, [
+            'id_server', 'client_secret',
+            'country', 'phone_number', 'send_attempt',
+        ])
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.datastore.get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is None:
+            raise SynapseError(400, "MSISDN not found", Codes.THREEPID_NOT_FOUND)
+
+        ret = yield self.identity_handler.requestMsisdnToken(**body)
+        defer.returnValue((200, ret))
+
+
 class PasswordRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/password$")
 
@@ -68,6 +98,7 @@ class PasswordRestServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -77,7 +108,8 @@ class PasswordRestServlet(RestServlet):
 
         authed, result, params, _ = yield self.auth_handler.check_auth([
             [LoginType.PASSWORD],
-            [LoginType.EMAIL_IDENTITY]
+            [LoginType.EMAIL_IDENTITY],
+            [LoginType.MSISDN],
         ], body, self.hs.get_ip_from_request(request))
 
         if not authed:
@@ -102,7 +134,7 @@ class PasswordRestServlet(RestServlet):
                 # (See add_threepid in synapse/handlers/auth.py)
                 threepid['address'] = threepid['address'].lower()
             # if using email, we must know about the email they're authing with!
-            threepid_user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
+            threepid_user_id = yield self.datastore.get_user_id_by_threepid(
                 threepid['medium'], threepid['address']
             )
             if not threepid_user_id:
@@ -169,13 +201,14 @@ class DeactivateAccountRestServlet(RestServlet):
         defer.returnValue((200, {}))
 
 
-class ThreepidRequestTokenRestServlet(RestServlet):
+class EmailThreepidRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/3pid/email/requestToken$")
 
     def __init__(self, hs):
         self.hs = hs
-        super(ThreepidRequestTokenRestServlet, self).__init__()
+        super(EmailThreepidRequestTokenRestServlet, self).__init__()
         self.identity_handler = hs.get_handlers().identity_handler
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_POST(self, request):
@@ -190,7 +223,7 @@ class ThreepidRequestTokenRestServlet(RestServlet):
         if absent:
             raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
 
-        existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+        existingUid = yield self.datastore.get_user_id_by_threepid(
             'email', body['email']
         )
 
@@ -201,6 +234,44 @@ class ThreepidRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnThreepidRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/account/3pid/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        self.hs = hs
+        super(MsisdnThreepidRequestTokenRestServlet, self).__init__()
+        self.identity_handler = hs.get_handlers().identity_handler
+        self.datastore = self.hs.get_datastore()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        required = [
+            'id_server', 'client_secret',
+            'country', 'phone_number', 'send_attempt',
+        ]
+        absent = []
+        for k in required:
+            if k not in body:
+                absent.append(k)
+
+        if absent:
+            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.datastore.get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is not None:
+            raise SynapseError(400, "MSISDN is already in use", Codes.THREEPID_IN_USE)
+
+        ret = yield self.identity_handler.requestEmailToken(**body)
+        defer.returnValue((200, ret))
+
+
 class ThreepidRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/account/3pid$")
 
@@ -210,6 +281,7 @@ class ThreepidRestServlet(RestServlet):
         self.identity_handler = hs.get_handlers().identity_handler
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
+        self.datastore = self.hs.get_datastore()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -217,7 +289,7 @@ class ThreepidRestServlet(RestServlet):
 
         requester = yield self.auth.get_user_by_req(request)
 
-        threepids = yield self.hs.get_datastore().user_get_threepids(
+        threepids = yield self.datastore.user_get_threepids(
             requester.user.to_string()
         )
 
@@ -258,7 +330,7 @@ class ThreepidRestServlet(RestServlet):
 
         if 'bind' in body and body['bind']:
             logger.debug(
-                "Binding emails %s to %s",
+                "Binding threepid %s to %s",
                 threepid, user_id
             )
             yield self.identity_handler.bind_threepid(
@@ -302,9 +374,11 @@ class ThreepidDeleteRestServlet(RestServlet):
 
 
 def register_servlets(hs, http_server):
-    PasswordRequestTokenRestServlet(hs).register(http_server)
+    EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+    MsisdnPasswordRequestTokenRestServlet(hs).register(http_server)
     PasswordRestServlet(hs).register(http_server)
     DeactivateAccountRestServlet(hs).register(http_server)
-    ThreepidRequestTokenRestServlet(hs).register(http_server)
+    EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+    MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
     ThreepidDeleteRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index a1feaf3d54..b57ba95d24 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -46,6 +46,52 @@ class DevicesRestServlet(servlet.RestServlet):
         defer.returnValue((200, {"devices": devices}))
 
 
+class DeleteDevicesRestServlet(servlet.RestServlet):
+    """
+    API for bulk deletion of devices. Accepts a JSON object with a devices
+    key which lists the device_ids to delete. Requires user interactive auth.
+    """
+    PATTERNS = client_v2_patterns("/delete_devices", releases=[], v2_alpha=False)
+
+    def __init__(self, hs):
+        super(DeleteDevicesRestServlet, self).__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.device_handler = hs.get_device_handler()
+        self.auth_handler = hs.get_auth_handler()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        try:
+            body = servlet.parse_json_object_from_request(request)
+        except errors.SynapseError as e:
+            if e.errcode == errors.Codes.NOT_JSON:
+                # deal with older clients which didn't pass a J*DELETESON dict
+                # the same as those that pass an empty dict
+                body = {}
+            else:
+                raise e
+
+        if 'devices' not in body:
+            raise errors.SynapseError(
+                400, "No devices supplied", errcode=errors.Codes.MISSING_PARAM
+            )
+
+        authed, result, params, _ = yield self.auth_handler.check_auth([
+            [constants.LoginType.PASSWORD],
+        ], body, self.hs.get_ip_from_request(request))
+
+        if not authed:
+            defer.returnValue((401, result))
+
+        requester = yield self.auth.get_user_by_req(request)
+        yield self.device_handler.delete_devices(
+            requester.user.to_string(),
+            body['devices'],
+        )
+        defer.returnValue((200, {}))
+
+
 class DeviceRestServlet(servlet.RestServlet):
     PATTERNS = client_v2_patterns("/devices/(?P<device_id>[^/]*)$",
                                   releases=[], v2_alpha=False)
@@ -111,5 +157,6 @@ class DeviceRestServlet(servlet.RestServlet):
 
 
 def register_servlets(hs, http_server):
+    DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
     DeviceRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index ccca5a12d5..dcd13b876f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2017 Vector Creations Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +20,10 @@ import synapse
 from synapse.api.auth import get_access_token_from_request, has_access_token
 from synapse.api.constants import LoginType
 from synapse.api.errors import SynapseError, Codes, UnrecognizedRequestError
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import (
+    RestServlet, parse_json_object_from_request, assert_params_in_request
+)
+from synapse.util.msisdn import phone_number_to_msisdn
 
 from ._base import client_v2_patterns
 
@@ -43,7 +47,7 @@ else:
 logger = logging.getLogger(__name__)
 
 
-class RegisterRequestTokenRestServlet(RestServlet):
+class EmailRegisterRequestTokenRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/register/email/requestToken$")
 
     def __init__(self, hs):
@@ -51,7 +55,7 @@ class RegisterRequestTokenRestServlet(RestServlet):
         Args:
             hs (synapse.server.HomeServer): server
         """
-        super(RegisterRequestTokenRestServlet, self).__init__()
+        super(EmailRegisterRequestTokenRestServlet, self).__init__()
         self.hs = hs
         self.identity_handler = hs.get_handlers().identity_handler
 
@@ -59,14 +63,9 @@ class RegisterRequestTokenRestServlet(RestServlet):
     def on_POST(self, request):
         body = parse_json_object_from_request(request)
 
-        required = ['id_server', 'client_secret', 'email', 'send_attempt']
-        absent = []
-        for k in required:
-            if k not in body:
-                absent.append(k)
-
-        if len(absent) > 0:
-            raise SynapseError(400, "Missing params: %r" % absent, Codes.MISSING_PARAM)
+        assert_params_in_request(body, [
+            'id_server', 'client_secret', 'email', 'send_attempt'
+        ])
 
         existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
             'email', body['email']
@@ -79,6 +78,43 @@ class RegisterRequestTokenRestServlet(RestServlet):
         defer.returnValue((200, ret))
 
 
+class MsisdnRegisterRequestTokenRestServlet(RestServlet):
+    PATTERNS = client_v2_patterns("/register/msisdn/requestToken$")
+
+    def __init__(self, hs):
+        """
+        Args:
+            hs (synapse.server.HomeServer): server
+        """
+        super(MsisdnRegisterRequestTokenRestServlet, self).__init__()
+        self.hs = hs
+        self.identity_handler = hs.get_handlers().identity_handler
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        body = parse_json_object_from_request(request)
+
+        assert_params_in_request(body, [
+            'id_server', 'client_secret',
+            'country', 'phone_number',
+            'send_attempt',
+        ])
+
+        msisdn = phone_number_to_msisdn(body['country'], body['phone_number'])
+
+        existingUid = yield self.hs.get_datastore().get_user_id_by_threepid(
+            'msisdn', msisdn
+        )
+
+        if existingUid is not None:
+            raise SynapseError(
+                400, "Phone number is already in use", Codes.THREEPID_IN_USE
+            )
+
+        ret = yield self.identity_handler.requestMsisdnToken(**body)
+        defer.returnValue((200, ret))
+
+
 class RegisterRestServlet(RestServlet):
     PATTERNS = client_v2_patterns("/register$")
 
@@ -200,16 +236,37 @@ class RegisterRestServlet(RestServlet):
                 assigned_user_id=registered_user_id,
             )
 
+        # Only give msisdn flows if the x_show_msisdn flag is given:
+        # this is a hack to work around the fact that clients were shipped
+        # that use fallback registration if they see any flows that they don't
+        # recognise, which means we break registration for these clients if we
+        # advertise msisdn flows. Once usage of Riot iOS <=0.3.9 and Riot
+        # Android <=0.6.9 have fallen below an acceptable threshold, this
+        # parameter should go away and we should always advertise msisdn flows.
+        show_msisdn = False
+        if 'x_show_msisdn' in body and body['x_show_msisdn']:
+            show_msisdn = True
+
         if self.hs.config.enable_registration_captcha:
             flows = [
                 [LoginType.RECAPTCHA],
-                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA]
+                [LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
             ]
+            if show_msisdn:
+                flows.extend([
+                    [LoginType.MSISDN, LoginType.RECAPTCHA],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY, LoginType.RECAPTCHA],
+                ])
         else:
             flows = [
                 [LoginType.DUMMY],
-                [LoginType.EMAIL_IDENTITY]
+                [LoginType.EMAIL_IDENTITY],
             ]
+            if show_msisdn:
+                flows.extend([
+                    [LoginType.MSISDN],
+                    [LoginType.MSISDN, LoginType.EMAIL_IDENTITY],
+                ])
 
         authed, auth_result, params, session_id = yield self.auth_handler.check_auth(
             flows, body, self.hs.get_ip_from_request(request)
@@ -224,8 +281,9 @@ class RegisterRestServlet(RestServlet):
                 "Already registered user ID %r for this session",
                 registered_user_id
             )
-            # don't re-register the email address
+            # don't re-register the threepids
             add_email = False
+            add_msisdn = False
         else:
             # NB: This may be from the auth handler and NOT from the POST
             if 'password' not in params:
@@ -250,6 +308,7 @@ class RegisterRestServlet(RestServlet):
             )
 
             add_email = True
+            add_msisdn = True
 
         return_dict = yield self._create_registration_details(
             registered_user_id, params
@@ -262,6 +321,13 @@ class RegisterRestServlet(RestServlet):
                 params.get("bind_email")
             )
 
+        if add_msisdn and auth_result and LoginType.MSISDN in auth_result:
+            threepid = auth_result[LoginType.MSISDN]
+            yield self._register_msisdn_threepid(
+                registered_user_id, threepid, return_dict["access_token"],
+                params.get("bind_msisdn")
+            )
+
         defer.returnValue((200, return_dict))
 
     def on_OPTIONS(self, _):
@@ -323,8 +389,9 @@ class RegisterRestServlet(RestServlet):
         """
         reqd = ('medium', 'address', 'validated_at')
         if any(x not in threepid for x in reqd):
+            # This will only happen if the ID server returns a malformed response
             logger.info("Can't add incomplete 3pid")
-            defer.returnValue()
+            return
 
         yield self.auth_handler.add_threepid(
             user_id,
@@ -372,6 +439,43 @@ class RegisterRestServlet(RestServlet):
             logger.info("bind_email not specified: not binding email")
 
     @defer.inlineCallbacks
+    def _register_msisdn_threepid(self, user_id, threepid, token, bind_msisdn):
+        """Add a phone number as a 3pid identifier
+
+        Also optionally binds msisdn to the given user_id on the identity server
+
+        Args:
+            user_id (str): id of user
+            threepid (object): m.login.msisdn auth response
+            token (str): access_token for the user
+            bind_email (bool): true if the client requested the email to be
+                bound at the identity server
+        Returns:
+            defer.Deferred:
+        """
+        reqd = ('medium', 'address', 'validated_at')
+        if any(x not in threepid for x in reqd):
+            # This will only happen if the ID server returns a malformed response
+            logger.info("Can't add incomplete 3pid")
+            defer.returnValue()
+
+        yield self.auth_handler.add_threepid(
+            user_id,
+            threepid['medium'],
+            threepid['address'],
+            threepid['validated_at'],
+        )
+
+        if bind_msisdn:
+            logger.info("bind_msisdn specified: binding")
+            logger.debug("Binding msisdn %s to %s", threepid, user_id)
+            yield self.identity_handler.bind_threepid(
+                threepid['threepid_creds'], user_id
+            )
+        else:
+            logger.info("bind_msisdn not specified: not binding msisdn")
+
+    @defer.inlineCallbacks
     def _create_registration_details(self, user_id, params):
         """Complete registration of newly-registered user
 
@@ -449,5 +553,6 @@ class RegisterRestServlet(RestServlet):
 
 
 def register_servlets(hs, http_server):
-    RegisterRequestTokenRestServlet(hs).register(http_server)
+    EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+    MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
     RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index b3d8001638..a7a9e0a794 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
 from synapse.http.servlet import (
     RestServlet, parse_string, parse_integer, parse_boolean
 )
+from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.sync import SyncConfig
 from synapse.types import StreamToken
 from synapse.events.utils import (
@@ -28,7 +29,6 @@ from synapse.api.errors import SynapseError
 from synapse.api.constants import PresenceState
 from ._base import client_v2_patterns
 
-import copy
 import itertools
 import logging
 
@@ -194,12 +194,18 @@ class SyncRestServlet(RestServlet):
         defer.returnValue((200, response_content))
 
     def encode_presence(self, events, time_now):
-        formatted = []
-        for event in events:
-            event = copy.deepcopy(event)
-            event['sender'] = event['content'].pop('user_id')
-            formatted.append(event)
-        return {"events": formatted}
+        return {
+            "events": [
+                {
+                    "type": "m.presence",
+                    "sender": event.user_id,
+                    "content": format_user_presence_state(
+                        event, time_now, include_user_id=False
+                    ),
+                }
+                for event in events
+            ]
+        }
 
     def encode_joined(self, rooms, time_now, token_id, event_fields):
         """
diff --git a/synapse/rest/media/v1/download_resource.py b/synapse/rest/media/v1/download_resource.py
index dfb87ffd15..6788375e85 100644
--- a/synapse/rest/media/v1/download_resource.py
+++ b/synapse/rest/media/v1/download_resource.py
@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import synapse.http.servlet
 
 from ._base import parse_media_id, respond_with_file, respond_404
 from twisted.web.resource import Resource
@@ -81,6 +82,17 @@ class DownloadResource(Resource):
 
     @defer.inlineCallbacks
     def _respond_remote_file(self, request, server_name, media_id, name):
+        # don't forward requests for remote media if allow_remote is false
+        allow_remote = synapse.http.servlet.parse_boolean(
+            request, "allow_remote", default=True)
+        if not allow_remote:
+            logger.info(
+                "Rejecting request for remote media %s/%s due to allow_remote",
+                server_name, media_id,
+            )
+            respond_404(request)
+            return
+
         media_info = yield self.media_repo.get_remote_media(server_name, media_id)
 
         media_type = media_info["media_type"]
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 481ffee200..c43b185e08 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -13,22 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from twisted.internet import defer, threads
+import twisted.internet.error
+import twisted.web.http
+from twisted.web.resource import Resource
+
 from .upload_resource import UploadResource
 from .download_resource import DownloadResource
 from .thumbnail_resource import ThumbnailResource
 from .identicon_resource import IdenticonResource
 from .preview_url_resource import PreviewUrlResource
 from .filepath import MediaFilePaths
-
-from twisted.web.resource import Resource
-
 from .thumbnailer import Thumbnailer
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.util.stringutils import random_string
-from synapse.api.errors import SynapseError
-
-from twisted.internet import defer, threads
+from synapse.api.errors import SynapseError, HttpResponseException, \
+    NotFoundError
 
 from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
@@ -157,11 +158,34 @@ class MediaRepository(object):
                 try:
                     length, headers = yield self.client.get_file(
                         server_name, request_path, output_stream=f,
-                        max_size=self.max_upload_size,
+                        max_size=self.max_upload_size, args={
+                            # tell the remote server to 404 if it doesn't
+                            # recognise the server_name, to make sure we don't
+                            # end up with a routing loop.
+                            "allow_remote": "false",
+                        }
                     )
-                except Exception as e:
-                    logger.warn("Failed to fetch remoted media %r", e)
-                    raise SynapseError(502, "Failed to fetch remoted media")
+                except twisted.internet.error.DNSLookupError as e:
+                    logger.warn("HTTP error fetching remote media %s/%s: %r",
+                                server_name, media_id, e)
+                    raise NotFoundError()
+
+                except HttpResponseException as e:
+                    logger.warn("HTTP error fetching remote media %s/%s: %s",
+                                server_name, media_id, e.response)
+                    if e.code == twisted.web.http.NOT_FOUND:
+                        raise SynapseError.from_http_response_exception(e)
+                    raise SynapseError(502, "Failed to fetch remote media")
+
+                except SynapseError:
+                    logger.exception("Failed to fetch remote media %s/%s",
+                                     server_name, media_id)
+                    raise
+
+                except Exception:
+                    logger.exception("Failed to fetch remote media %s/%s",
+                                     server_name, media_id)
+                    raise SynapseError(502, "Failed to fetch remote media")
 
             media_type = headers["Content-Type"][0]
             time_now_ms = self.clock.time_msec()
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a7a8ec9b7b..13b106bba1 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -840,6 +840,47 @@ class SQLBaseStore(object):
 
         return txn.execute(sql, keyvalues.values())
 
+    def _simple_delete_many(self, table, column, iterable, keyvalues, desc):
+        return self.runInteraction(
+            desc, self._simple_delete_many_txn, table, column, iterable, keyvalues
+        )
+
+    @staticmethod
+    def _simple_delete_many_txn(txn, table, column, iterable, keyvalues):
+        """Executes a DELETE query on the named table.
+
+        Filters rows by if value of `column` is in `iterable`.
+
+        Args:
+            txn : Transaction object
+            table : string giving the table name
+            column : column name to test for inclusion against `iterable`
+            iterable : list
+            keyvalues : dict of column names and values to select the rows with
+        """
+        if not iterable:
+            return
+
+        sql = "DELETE FROM %s" % table
+
+        clauses = []
+        values = []
+        clauses.append(
+            "%s IN (%s)" % (column, ",".join("?" for _ in iterable))
+        )
+        values.extend(iterable)
+
+        for key, value in keyvalues.items():
+            clauses.append("%s = ?" % (key,))
+            values.append(value)
+
+        if clauses:
+            sql = "%s WHERE %s" % (
+                sql,
+                " AND ".join(clauses),
+            )
+        return txn.execute(sql, values)
+
     def _get_cache_dict(self, db_conn, table, entity_column, stream_column,
                         max_value, limit=100000):
         # Fetch a mapping of room_id -> max stream position for "recent" rooms.
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index 5c7db5e5f6..7925cb5f1b 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -357,12 +357,12 @@ class DeviceInboxStore(BackgroundUpdateStore):
         """
         Args:
             destination(str): The name of the remote server.
-            last_stream_id(int): The last position of the device message stream
+            last_stream_id(int|long): The last position of the device message stream
                 that the server sent up to.
-            current_stream_id(int): The current position of the device
+            current_stream_id(int|long): The current position of the device
                 message stream.
         Returns:
-            Deferred ([dict], int): List of messages for the device and where
+            Deferred ([dict], int|long): List of messages for the device and where
                 in the stream the messages got to.
         """
 
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index bd56ba2515..e545b62e39 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -108,6 +108,23 @@ class DeviceStore(SQLBaseStore):
             desc="delete_device",
         )
 
+    def delete_devices(self, user_id, device_ids):
+        """Deletes several devices.
+
+        Args:
+            user_id (str): The ID of the user which owns the devices
+            device_ids (list): The IDs of the devices to delete
+        Returns:
+            defer.Deferred
+        """
+        return self._simple_delete_many(
+            table="devices",
+            column="device_id",
+            iterable=device_ids,
+            keyvalues={"user_id": user_id},
+            desc="delete_devices",
+        )
+
     def update_device(self, user_id, device_id, new_display_name=None):
         """Update a device.
 
@@ -291,7 +308,7 @@ class DeviceStore(SQLBaseStore):
         """Get stream of updates to send to remote servers
 
         Returns:
-            (now_stream_id, [ { updates }, .. ])
+            (int, list[dict]): current stream id and list of updates
         """
         now_stream_id = self._device_list_id_gen.get_current_token()
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index db01eb6d14..72319c35ae 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -433,23 +433,43 @@ class EventsStore(SQLBaseStore):
         if not new_latest_event_ids:
             current_state = {}
         elif was_updated:
+            # We work out the current state by passing the state sets to the
+            # state resolution algorithm. It may ask for some events, including
+            # the events we have yet to persist, so we need a slightly more
+            # complicated event lookup function than simply looking the events
+            # up in the db.
+            events_map = {ev.event_id: ev for ev, _ in events_context}
+
+            @defer.inlineCallbacks
+            def get_events(ev_ids):
+                # We get the events by first looking at the list of events we
+                # are trying to persist, and then fetching the rest from the DB.
+                db = []
+                to_return = {}
+                for ev_id in ev_ids:
+                    ev = events_map.get(ev_id, None)
+                    if ev:
+                        to_return[ev_id] = ev
+                    else:
+                        db.append(ev_id)
+
+                if db:
+                    evs = yield self.get_events(
+                        ev_ids, get_prev_content=False, check_redacted=False,
+                    )
+                    to_return.update(evs)
+                defer.returnValue(to_return)
+
             current_state = yield resolve_events(
                 state_sets,
-                state_map_factory=lambda ev_ids: self.get_events(
-                    ev_ids, get_prev_content=False, check_redacted=False,
-                ),
+                state_map_factory=get_events,
             )
         else:
             return
 
-        existing_state_rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
-        )
+        existing_state = yield self.get_current_state_ids(room_id)
 
-        existing_events = set(row["event_id"] for row in existing_state_rows)
+        existing_events = set(existing_state.itervalues())
         new_events = set(ev_id for ev_id in current_state.itervalues())
         changed_events = existing_events ^ new_events
 
@@ -457,9 +477,8 @@ class EventsStore(SQLBaseStore):
             return
 
         to_delete = {
-            (row["type"], row["state_key"]): row["event_id"]
-            for row in existing_state_rows
-            if row["event_id"] in changed_events
+            key: ev_id for key, ev_id in existing_state.iteritems()
+            if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
@@ -585,6 +604,10 @@ class EventsStore(SQLBaseStore):
                     txn, self.get_users_in_room, (room_id,)
                 )
 
+                self._invalidate_cache_and_stream(
+                    txn, self.get_current_state_ids, (room_id,)
+                )
+
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 545d3d3a99..e38d8927bf 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -274,24 +274,27 @@ class RoomMemberStore(SQLBaseStore):
 
         return rows
 
-    @cached(max_entries=500000, iterable=True)
+    @cachedInlineCallbacks(max_entries=500000, iterable=True)
     def get_rooms_for_user(self, user_id):
-        return self.get_rooms_for_user_where_membership_is(
+        """Returns a set of room_ids the user is currently joined to
+        """
+        rooms = yield self.get_rooms_for_user_where_membership_is(
             user_id, membership_list=[Membership.JOIN],
         )
+        defer.returnValue(frozenset(r.room_id for r in rooms))
 
     @cachedInlineCallbacks(max_entries=500000, cache_context=True, iterable=True)
     def get_users_who_share_room_with_user(self, user_id, cache_context):
         """Returns the set of users who share a room with `user_id`
         """
-        rooms = yield self.get_rooms_for_user(
+        room_ids = yield self.get_rooms_for_user(
             user_id, on_invalidate=cache_context.invalidate,
         )
 
         user_who_share_room = set()
-        for room in rooms:
+        for room_id in room_ids:
             user_ids = yield self.get_users_in_room(
-                room.room_id, on_invalidate=cache_context.invalidate,
+                room_id, on_invalidate=cache_context.invalidate,
             )
             user_who_share_room.update(user_ids)
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 84482d8285..27f1ec89ec 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
 from synapse.util.caches import intern_string
 from synapse.storage.engines import PostgresEngine
 
@@ -69,6 +69,18 @@ class StateStore(SQLBaseStore):
             where_clause="type='m.room.member'",
         )
 
+    @cachedInlineCallbacks(max_entries=100000, iterable=True)
+    def get_current_state_ids(self, room_id):
+        rows = yield self._simple_select_list(
+            table="current_state_events",
+            keyvalues={"room_id": room_id},
+            retcols=["event_id", "type", "state_key"],
+            desc="_calculate_state_delta",
+        )
+        defer.returnValue({
+            (r["type"], r["state_key"]): r["event_id"] for r in rows
+        })
+
     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):
         if not event_ids:
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 200d124632..dddd5fc0e7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -829,3 +829,6 @@ class StreamStore(SQLBaseStore):
             updatevalues={"stream_id": stream_id},
             desc="update_federation_out_pos",
         )
+
+    def has_room_changed_since(self, room_id, stream_id):
+        return self._events_stream_cache.has_entity_changed(room_id, stream_id)
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 46cf93ff87..95031dc9ec 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -30,6 +30,17 @@ class IdGenerator(object):
 
 
 def _load_current_id(db_conn, table, column, step=1):
+    """
+
+    Args:
+        db_conn (object):
+        table (str):
+        column (str):
+        step (int):
+
+    Returns:
+        int
+    """
     cur = db_conn.cursor()
     if step == 1:
         cur.execute("SELECT MAX(%s) FROM %s" % (column, table,))
@@ -131,6 +142,9 @@ class StreamIdGenerator(object):
     def get_current_token(self):
         """Returns the maximum stream id such that all stream ids less than or
         equal to it have been successfully persisted.
+
+        Returns:
+            int
         """
         with self._lock:
             if self._unfinished_ids:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index b72bb0ff02..70fe00ce0b 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -50,7 +50,7 @@ class StreamChangeCache(object):
     def has_entity_changed(self, entity, stream_pos):
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) is int
+        assert type(stream_pos) is int or type(stream_pos) is long
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/msisdn.py b/synapse/util/msisdn.py
new file mode 100644
index 0000000000..607161e7f0
--- /dev/null
+++ b/synapse/util/msisdn.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import phonenumbers
+from synapse.api.errors import SynapseError
+
+
+def phone_number_to_msisdn(country, number):
+    """
+    Takes an ISO-3166-1 2 letter country code and phone number and
+    returns an msisdn representing the canonical version of that
+    phone number.
+    Args:
+        country (str): ISO-3166-1 2 letter country code
+        number (str): Phone number in a national or international format
+
+    Returns:
+        (str) The canonical form of the phone number, as an msisdn
+    Raises:
+            SynapseError if the number could not be parsed.
+    """
+    try:
+        phoneNumber = phonenumbers.parse(number, country)
+    except phonenumbers.NumberParseException:
+        raise SynapseError(400, "Unable to parse phone number")
+    return phonenumbers.format_number(
+        phoneNumber, phonenumbers.PhoneNumberFormat.E164
+    )[1:]