summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-10-10 13:10:57 +0100
committerErik Johnston <erik@matrix.org>2019-10-10 13:10:57 +0100
commit91f43dca3939cd6ccbc90d19a284d00f6e14fd7c (patch)
treecb15ddf14a01a0e67b9eaf91b482d65bf2057f4e /synapse/handlers
parentNewsfile (diff)
parentAdd domain validation when creating room with list of invitees (#6121) (diff)
downloadsynapse-91f43dca3939cd6ccbc90d19a284d00f6e14fd7c.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/disable_sql_bytes
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/deactivate_account.py37
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/identity.py353
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/register.py10
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/room_list.py341
-rw-r--r--synapse/handlers/room_member.py432
-rw-r--r--synapse/handlers/stats.py13
-rw-r--r--synapse/handlers/user_directory.py17
10 files changed, 585 insertions, 647 deletions
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index d83912c9a4..63267a0a4c 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -120,6 +120,10 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        # Reject all pending invites for the user, so that the user doesn't show up in the
+        # "invited" section of rooms' members list.
+        yield self._reject_pending_invites_for_user(user_id)
+
         # Remove all information on the user from the account_validity table.
         if self._account_validity_enabled:
             yield self.store.delete_account_validity_for_user(user_id)
@@ -129,6 +133,39 @@ class DeactivateAccountHandler(BaseHandler):
 
         return identity_server_supports_unbinding
 
+    @defer.inlineCallbacks
+    def _reject_pending_invites_for_user(self, user_id):
+        """Reject pending invites addressed to a given user ID.
+
+        Args:
+            user_id (str): The user ID to reject pending invites for.
+        """
+        user = UserID.from_string(user_id)
+        pending_invites = yield self.store.get_invited_rooms_for_user(user_id)
+
+        for room in pending_invites:
+            try:
+                yield self._room_member_handler.update_membership(
+                    create_requester(user),
+                    user,
+                    room.room_id,
+                    "leave",
+                    ratelimit=False,
+                    require_consent=False,
+                )
+                logger.info(
+                    "Rejected invite for deactivated user %r in room %r",
+                    user_id,
+                    room.room_id,
+                )
+            except Exception:
+                logger.exception(
+                    "Failed to reject invite for user %r in room %r:"
+                    " ignoring and continuing",
+                    user_id,
+                    room.room_id,
+                )
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f72b81d419..50fc0fde2a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2570,7 +2570,7 @@ class FederationHandler(BaseHandler):
         )
 
         try:
-            self.auth.check_from_context(room_version, event, context)
+            yield self.auth.check_from_context(room_version, event, context)
         except AuthError as e:
             logger.warn("Denying third party invite %r because %s", event, e)
             raise e
@@ -2599,7 +2599,12 @@ class FederationHandler(BaseHandler):
                 original_invite_id, allow_none=True
             )
         if original_invite:
-            display_name = original_invite.content["display_name"]
+            # If the m.room.third_party_invite event's content is empty, it means the
+            # invite has been revoked. In this case, we don't have to raise an error here
+            # because the auth check will fail on the invite (because it's not able to
+            # fetch public keys from the m.room.third_party_invite event's content, which
+            # is empty).
+            display_name = original_invite.content.get("display_name")
             event_dict["content"]["third_party_invite"]["display_name"] = display_name
         else:
             logger.info(
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6d42a1aed8..ba99ddf76d 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -21,11 +21,15 @@ import logging
 import urllib
 
 from canonicaljson import json
+from signedjson.key import decode_verify_key_bytes
+from signedjson.sign import verify_signed_json
+from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 from twisted.internet.error import TimeoutError
 
 from synapse.api.errors import (
+    AuthError,
     CodeMessageException,
     Codes,
     HttpResponseException,
@@ -33,12 +37,15 @@ from synapse.api.errors import (
 )
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http.client import SimpleHttpClient
+from synapse.util.hash import sha256_and_url_safe_base64
 from synapse.util.stringutils import random_string
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
+id_server_scheme = "https://"
+
 
 class IdentityHandler(BaseHandler):
     def __init__(self, hs):
@@ -557,6 +564,352 @@ class IdentityHandler(BaseHandler):
             logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
             raise SynapseError(400, "Error contacting the identity server")
 
+    @defer.inlineCallbacks
+    def lookup_3pid(self, id_server, medium, address, id_access_token=None):
+        """Looks up a 3pid in the passed identity server.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
+
+        Returns:
+            str|None: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        if id_access_token is not None:
+            try:
+                results = yield self._lookup_3pid_v2(
+                    id_server, id_access_token, medium, address
+                )
+                return results
+
+            except Exception as e:
+                # Catch HttpResponseExcept for a non-200 response code
+                # Check if this identity server does not know about v2 lookups
+                if isinstance(e, HttpResponseException) and e.code == 404:
+                    # This is an old identity server that does not yet support v2 lookups
+                    logger.warning(
+                        "Attempted v2 lookup on v1 identity server %s. Falling "
+                        "back to v1",
+                        id_server,
+                    )
+                else:
+                    logger.warning("Error when looking up hashing details: %s", e)
+                    return None
+
+        return (yield self._lookup_3pid_v1(id_server, medium, address))
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v1(self, id_server, medium, address):
+        """Looks up a 3pid in the passed identity server using v1 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            str: the matrix ID of the 3pid, or None if it is not recognized.
+        """
+        try:
+            data = yield self.blacklisting_http_client.get_json(
+                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
+                {"medium": medium, "address": address},
+            )
+
+            if "mxid" in data:
+                if "signatures" not in data:
+                    raise AuthError(401, "No signatures on 3pid binding")
+                yield self._verify_any_signature(data, id_server)
+                return data["mxid"]
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+        except IOError as e:
+            logger.warning("Error from v1 identity server lookup: %s" % (e,))
+
+        return None
+
+    @defer.inlineCallbacks
+    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+        """Looks up a 3pid in the passed identity server using v2 lookup.
+
+        Args:
+            id_server (str): The server name (including port, if required)
+                of the identity server to use.
+            id_access_token (str): The access token to authenticate to the identity server with
+            medium (str): The type of the third party identifier (e.g. "email").
+            address (str): The third party identifier (e.g. "foo@example.com").
+
+        Returns:
+            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+        """
+        # Check what hashing details are supported by this identity server
+        try:
+            hash_details = yield self.blacklisting_http_client.get_json(
+                "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
+                {"access_token": id_access_token},
+            )
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+
+        if not isinstance(hash_details, dict):
+            logger.warning(
+                "Got non-dict object when checking hash details of %s%s: %s",
+                id_server_scheme,
+                id_server,
+                hash_details,
+            )
+            raise SynapseError(
+                400,
+                "Non-dict object from %s%s during v2 hash_details request: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Extract information from hash_details
+        supported_lookup_algorithms = hash_details.get("algorithms")
+        lookup_pepper = hash_details.get("lookup_pepper")
+        if (
+            not supported_lookup_algorithms
+            or not isinstance(supported_lookup_algorithms, list)
+            or not lookup_pepper
+            or not isinstance(lookup_pepper, str)
+        ):
+            raise SynapseError(
+                400,
+                "Invalid hash details received from identity server %s%s: %s"
+                % (id_server_scheme, id_server, hash_details),
+            )
+
+        # Check if any of the supported lookup algorithms are present
+        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
+            # Perform a hashed lookup
+            lookup_algorithm = LookupAlgorithm.SHA256
+
+            # Hash address, medium and the pepper with sha256
+            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
+            lookup_value = sha256_and_url_safe_base64(to_hash)
+
+        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
+            # Perform a non-hashed lookup
+            lookup_algorithm = LookupAlgorithm.NONE
+
+            # Combine together plaintext address and medium
+            lookup_value = "%s %s" % (address, medium)
+
+        else:
+            logger.warning(
+                "None of the provided lookup algorithms of %s are supported: %s",
+                id_server,
+                supported_lookup_algorithms,
+            )
+            raise SynapseError(
+                400,
+                "Provided identity server does not support any v2 lookup "
+                "algorithms that this homeserver supports.",
+            )
+
+        # Authenticate with identity server given the access token from the client
+        headers = {"Authorization": create_id_access_token_header(id_access_token)}
+
+        try:
+            lookup_results = yield self.blacklisting_http_client.post_json_get_json(
+                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
+                {
+                    "addresses": [lookup_value],
+                    "algorithm": lookup_algorithm,
+                    "pepper": lookup_pepper,
+                },
+                headers=headers,
+            )
+        except TimeoutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+        except Exception as e:
+            logger.warning("Error when performing a v2 3pid lookup: %s", e)
+            raise SynapseError(
+                500, "Unknown error occurred during identity server lookup"
+            )
+
+        # Check for a mapping from what we looked up to an MXID
+        if "mappings" not in lookup_results or not isinstance(
+            lookup_results["mappings"], dict
+        ):
+            logger.warning("No results from 3pid lookup")
+            return None
+
+        # Return the MXID if it's available, or None otherwise
+        mxid = lookup_results["mappings"].get(lookup_value)
+        return mxid
+
+    @defer.inlineCallbacks
+    def _verify_any_signature(self, data, server_hostname):
+        if server_hostname not in data["signatures"]:
+            raise AuthError(401, "No signature from server %s" % (server_hostname,))
+        for key_name, signature in data["signatures"][server_hostname].items():
+            try:
+                key_data = yield self.blacklisting_http_client.get_json(
+                    "%s%s/_matrix/identity/api/v1/pubkey/%s"
+                    % (id_server_scheme, server_hostname, key_name)
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
+            if "public_key" not in key_data:
+                raise AuthError(
+                    401, "No public key named %s from %s" % (key_name, server_hostname)
+                )
+            verify_signed_json(
+                data,
+                server_hostname,
+                decode_verify_key_bytes(
+                    key_name, decode_base64(key_data["public_key"])
+                ),
+            )
+            return
+
+    @defer.inlineCallbacks
+    def ask_id_server_for_third_party_invite(
+        self,
+        requester,
+        id_server,
+        medium,
+        address,
+        room_id,
+        inviter_user_id,
+        room_alias,
+        room_avatar_url,
+        room_join_rules,
+        room_name,
+        inviter_display_name,
+        inviter_avatar_url,
+        id_access_token=None,
+    ):
+        """
+        Asks an identity server for a third party invite.
+
+        Args:
+            requester (Requester)
+            id_server (str): hostname + optional port for the identity server.
+            medium (str): The literal string "email".
+            address (str): The third party address being invited.
+            room_id (str): The ID of the room to which the user is invited.
+            inviter_user_id (str): The user ID of the inviter.
+            room_alias (str): An alias for the room, for cosmetic notifications.
+            room_avatar_url (str): The URL of the room's avatar, for cosmetic
+                notifications.
+            room_join_rules (str): The join rules of the email (e.g. "public").
+            room_name (str): The m.room.name of the room.
+            inviter_display_name (str): The current display name of the
+                inviter.
+            inviter_avatar_url (str): The URL of the inviter's avatar.
+            id_access_token (str|None): The access token to authenticate to the identity
+                server with
+
+        Returns:
+            A deferred tuple containing:
+                token (str): The token which must be signed to prove authenticity.
+                public_keys ([{"public_key": str, "key_validity_url": str}]):
+                    public_key is a base64-encoded ed25519 public key.
+                fallback_public_key: One element from public_keys.
+                display_name (str): A user-friendly name to represent the invited
+                    user.
+        """
+        invite_config = {
+            "medium": medium,
+            "address": address,
+            "room_id": room_id,
+            "room_alias": room_alias,
+            "room_avatar_url": room_avatar_url,
+            "room_join_rules": room_join_rules,
+            "room_name": room_name,
+            "sender": inviter_user_id,
+            "sender_display_name": inviter_display_name,
+            "sender_avatar_url": inviter_avatar_url,
+        }
+
+        # Add the identity service access token to the JSON body and use the v2
+        # Identity Service endpoints if id_access_token is present
+        data = None
+        base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
+
+        if id_access_token:
+            key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
+            )
+
+            # Attempt a v2 lookup
+            url = base_url + "/v2/store-invite"
+            try:
+                data = yield self.blacklisting_http_client.post_json_get_json(
+                    url,
+                    invite_config,
+                    {"Authorization": create_id_access_token_header(id_access_token)},
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
+            except HttpResponseException as e:
+                if e.code != 404:
+                    logger.info("Failed to POST %s with JSON: %s", url, e)
+                    raise e
+
+        if data is None:
+            key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
+                id_server_scheme,
+                id_server,
+            )
+            url = base_url + "/api/v1/store-invite"
+
+            try:
+                data = yield self.blacklisting_http_client.post_json_get_json(
+                    url, invite_config
+                )
+            except TimeoutError:
+                raise SynapseError(500, "Timed out contacting identity server")
+            except HttpResponseException as e:
+                logger.warning(
+                    "Error trying to call /store-invite on %s%s: %s",
+                    id_server_scheme,
+                    id_server,
+                    e,
+                )
+
+            if data is None:
+                # Some identity servers may only support application/x-www-form-urlencoded
+                # types. This is especially true with old instances of Sydent, see
+                # https://github.com/matrix-org/sydent/pull/170
+                try:
+                    data = yield self.blacklisting_http_client.post_urlencoded_get_json(
+                        url, invite_config
+                    )
+                except HttpResponseException as e:
+                    logger.warning(
+                        "Error calling /store-invite on %s%s with fallback "
+                        "encoding: %s",
+                        id_server_scheme,
+                        id_server,
+                        e,
+                    )
+                    raise e
+
+        # TODO: Check for success
+        token = data["token"]
+        public_keys = data.get("public_keys", [])
+        if "public_key" in data:
+            fallback_public_key = {
+                "public_key": data["public_key"],
+                "key_validity_url": key_validity_url,
+            }
+        else:
+            fallback_public_key = public_keys[0]
+
+        if not public_keys:
+            public_keys.append(fallback_public_key)
+        display_name = data["display_name"]
+        return token, public_keys, fallback_public_key, display_name
+
 
 def create_id_access_token_header(id_access_token):
     """Create an Authorization header for passing to SimpleHttpClient as the header value
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 053cf66b28..2a5f1a007d 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -803,17 +803,25 @@ class PresenceHandler(object):
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "presence_delta"):
-                deltas = yield self.store.get_current_state_deltas(self._event_pos)
-                if not deltas:
+                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+                if self._event_pos == room_max_stream_ordering:
                     return
 
+                logger.debug(
+                    "Processing presence stats %s->%s",
+                    self._event_pos,
+                    room_max_stream_ordering,
+                )
+                max_pos, deltas = yield self.store.get_current_state_deltas(
+                    self._event_pos, room_max_stream_ordering
+                )
                 yield self._handle_state_delta(deltas)
 
-                self._event_pos = deltas[-1]["stream_id"]
+                self._event_pos = max_pos
 
                 # Expose current event processing position to prometheus
                 synapse.metrics.event_processing_positions.labels("presence").set(
-                    self._event_pos
+                    max_pos
                 )
 
     @defer.inlineCallbacks
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 06bd03b77c..53410f120b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -217,10 +217,9 @@ class RegistrationHandler(BaseHandler):
 
         else:
             # autogen a sequential user ID
-            attempts = 0
             user = None
             while not user:
-                localpart = yield self._generate_user_id(attempts > 0)
+                localpart = yield self._generate_user_id()
                 user = UserID(localpart, self.hs.hostname)
                 user_id = user.to_string()
                 yield self.check_user_id_not_appservice_exclusive(user_id)
@@ -238,7 +237,6 @@ class RegistrationHandler(BaseHandler):
                     # if user id is taken, just generate another
                     user = None
                     user_id = None
-                    attempts += 1
 
         if not self.hs.config.user_consent_at_registration:
             yield self._auto_join_rooms(user_id)
@@ -379,10 +377,10 @@ class RegistrationHandler(BaseHandler):
                 )
 
     @defer.inlineCallbacks
-    def _generate_user_id(self, reseed=False):
-        if reseed or self._next_generated_user_id is None:
+    def _generate_user_id(self):
+        if self._next_generated_user_id is None:
             with (yield self._generate_user_id_linearizer.queue(())):
-                if reseed or self._next_generated_user_id is None:
+                if self._next_generated_user_id is None:
                     self._next_generated_user_id = (
                         yield self.store.find_next_generated_user_id_localpart()
                     )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 970be3c846..2816bd8f87 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -28,6 +28,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.storage.state import StateFilter
 from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
 from synapse.util import stringutils
@@ -554,7 +555,8 @@ class RoomCreationHandler(BaseHandler):
         invite_list = config.get("invite", [])
         for i in invite_list:
             try:
-                UserID.from_string(i)
+                uid = UserID.from_string(i)
+                parse_and_validate_server_name(uid.domain)
             except Exception:
                 raise SynapseError(400, "Invalid user_id: %s" % (i,))
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..c615206df1 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -16,8 +16,7 @@
 import logging
 from collections import namedtuple
 
-from six import PY3, iteritems
-from six.moves import range
+from six import iteritems
 
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
@@ -27,7 +26,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.api.errors import Codes, HttpResponseException
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -37,7 +35,6 @@ logger = logging.getLogger(__name__)
 
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
-
 # This is used to indicate we should only return rooms published to the main list.
 EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
@@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler):
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
+            from_federation (bool): true iff the request comes from the federation
+                API
         """
         if not self.enable_room_list_search:
             return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -89,16 +88,8 @@ class RoomListHandler(BaseHandler):
             # appservice specific lists.
             logger.info("Bypassing cache as search request.")
 
-            # XXX: Quick hack to stop room directory queries taking too long.
-            # Timeout request after 60s. Probably want a more fundamental
-            # solution at some point
-            timeout = self.clock.time() + 60
             return self._get_public_room_list(
-                limit,
-                since_token,
-                search_filter,
-                network_tuple=network_tuple,
-                timeout=timeout,
+                limit, since_token, search_filter, network_tuple=network_tuple
             )
 
         key = (limit, since_token, network_tuple)
@@ -119,7 +110,6 @@ class RoomListHandler(BaseHandler):
         search_filter=None,
         network_tuple=EMPTY_THIRD_PARTY_ID,
         from_federation=False,
-        timeout=None,
     ):
         """Generate a public room list.
         Args:
@@ -132,240 +122,116 @@ class RoomListHandler(BaseHandler):
                 Setting to None returns all public rooms across all lists.
             from_federation (bool): Whether this request originated from a
                 federating server or a client. Used for room filtering.
-            timeout (int|None): Amount of seconds to wait for a response before
-                timing out.
         """
-        if since_token and since_token != "END":
-            since_token = RoomListNextBatch.from_token(since_token)
-        else:
-            since_token = None
 
-        rooms_to_order_value = {}
-        rooms_to_num_joined = {}
+        # Pagination tokens work by storing the room ID sent in the last batch,
+        # plus the direction (forwards or backwards). Next batch tokens always
+        # go forwards, prev batch tokens always go backwards.
 
-        newly_visible = []
-        newly_unpublished = []
         if since_token:
-            stream_token = since_token.stream_ordering
-            current_public_id = yield self.store.get_current_public_room_stream_id()
-            public_room_stream_id = since_token.public_room_stream_id
-            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id, network_tuple=network_tuple
-            )
-        else:
-            stream_token = yield self.store.get_room_max_stream_ordering()
-            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
-
-        room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id, network_tuple=network_tuple
-        )
-
-        # We want to return rooms in a particular order: the number of joined
-        # users. We then arbitrarily use the room_id as a tie breaker.
-
-        @defer.inlineCallbacks
-        def get_order_for_room(room_id):
-            # Most of the rooms won't have changed between the since token and
-            # now (especially if the since token is "now"). So, we can ask what
-            # the current users are in a room (that will hit a cache) and then
-            # check if the room has changed since the since token. (We have to
-            # do it in that order to avoid races).
-            # If things have changed then fall back to getting the current state
-            # at the since token.
-            joined_users = yield self.store.get_users_in_room(room_id)
-            if self.store.has_room_changed_since(room_id, stream_token):
-                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_token
-                )
-
-                if not latest_event_ids:
-                    return
+            batch_token = RoomListNextBatch.from_token(since_token)
 
-                joined_users = yield self.state_handler.get_current_users_in_room(
-                    room_id, latest_event_ids
-                )
-
-            num_joined_users = len(joined_users)
-            rooms_to_num_joined[room_id] = num_joined_users
+            bounds = (batch_token.last_joined_members, batch_token.last_room_id)
+            forwards = batch_token.direction_is_forward
+        else:
+            batch_token = None
+            bounds = None
 
-            if num_joined_users == 0:
-                return
+            forwards = True
 
-            # We want larger rooms to be first, hence negating num_joined_users
-            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+        # we request one more than wanted to see if there are more pages to come
+        probing_limit = limit + 1 if limit is not None else None
 
-        logger.info(
-            "Getting ordering for %i rooms since %s", len(room_ids), stream_token
+        results = yield self.store.get_largest_public_rooms(
+            network_tuple,
+            search_filter,
+            probing_limit,
+            bounds=bounds,
+            forwards=forwards,
+            ignore_non_federatable=from_federation,
         )
-        yield concurrently_execute(get_order_for_room, room_ids, 10)
 
-        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
-        sorted_rooms = [room_id for room_id, _ in sorted_entries]
+        def build_room_entry(room):
+            entry = {
+                "room_id": room["room_id"],
+                "name": room["name"],
+                "topic": room["topic"],
+                "canonical_alias": room["canonical_alias"],
+                "num_joined_members": room["joined_members"],
+                "avatar_url": room["avatar"],
+                "world_readable": room["history_visibility"] == "world_readable",
+                "guest_can_join": room["guest_access"] == "can_join",
+            }
 
-        # `sorted_rooms` should now be a list of all public room ids that is
-        # stable across pagination. Therefore, we can use indices into this
-        # list as our pagination tokens.
+            # Filter out Nones – rather omit the field altogether
+            return {k: v for k, v in entry.items() if v is not None}
 
-        # Filter out rooms that we don't want to return
-        rooms_to_scan = [
-            r
-            for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
-        ]
+        results = [build_room_entry(r) for r in results]
 
-        total_room_count = len(rooms_to_scan)
+        response = {}
+        num_results = len(results)
+        if limit is not None:
+            more_to_come = num_results == probing_limit
 
-        if since_token:
-            # Filter out rooms we've already returned previously
-            # `since_token.current_limit` is the index of the last room we
-            # sent down, so we exclude it and everything before/after it.
-            if since_token.direction_is_forward:
-                rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
+            # Depending on direction we trim either the front or back.
+            if forwards:
+                results = results[:limit]
             else:
-                rooms_to_scan = rooms_to_scan[: since_token.current_limit]
-                rooms_to_scan.reverse()
-
-        logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
-        # _append_room_entry_to_chunk will append to chunk but will stop if
-        # len(chunk) > limit
-        #
-        # Normally we will generate enough results on the first iteration here,
-        #  but if there is a search filter, _append_room_entry_to_chunk may
-        # filter some results out, in which case we loop again.
-        #
-        # We don't want to scan over the entire range either as that
-        # would potentially waste a lot of work.
-        #
-        # XXX if there is no limit, we may end up DoSing the server with
-        # calls to get_current_state_ids for every single room on the
-        # server. Surely we should cap this somehow?
-        #
-        if limit:
-            step = limit + 1
+                results = results[-limit:]
         else:
-            # step cannot be zero
-            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
-        chunk = []
-        for i in range(0, len(rooms_to_scan), step):
-            if timeout and self.clock.time() > timeout:
-                raise Exception("Timed out searching room directory")
-
-            batch = rooms_to_scan[i : i + step]
-            logger.info("Processing %i rooms for result", len(batch))
-            yield concurrently_execute(
-                lambda r: self._append_room_entry_to_chunk(
-                    r,
-                    rooms_to_num_joined[r],
-                    chunk,
-                    limit,
-                    search_filter,
-                    from_federation=from_federation,
-                ),
-                batch,
-                5,
-            )
-            logger.info("Now %i rooms in result", len(chunk))
-            if len(chunk) >= limit + 1:
-                break
-
-        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
-        # Work out the new limit of the batch for pagination, or None if we
-        # know there are no more results that would be returned.
-        # i.e., [since_token.current_limit..new_limit] is the batch of rooms
-        # we've returned (or the reverse if we paginated backwards)
-        # We tried to pull out limit + 1 rooms above, so if we have <= limit
-        # then we know there are no more results to return
-        new_limit = None
-        if chunk and (not limit or len(chunk) > limit):
-
-            if not since_token or since_token.direction_is_forward:
-                if limit:
-                    chunk = chunk[:limit]
-                last_room_id = chunk[-1]["room_id"]
+            more_to_come = False
+
+        if num_results > 0:
+            final_entry = results[-1]
+            initial_entry = results[0]
+
+            if forwards:
+                if batch_token:
+                    # If there was a token given then we assume that there
+                    # must be previous results.
+                    response["prev_batch"] = RoomListNextBatch(
+                        last_joined_members=initial_entry["num_joined_members"],
+                        last_room_id=initial_entry["room_id"],
+                        direction_is_forward=False,
+                    ).to_token()
+
+                if more_to_come:
+                    response["next_batch"] = RoomListNextBatch(
+                        last_joined_members=final_entry["num_joined_members"],
+                        last_room_id=final_entry["room_id"],
+                        direction_is_forward=True,
+                    ).to_token()
             else:
-                if limit:
-                    chunk = chunk[-limit:]
-                last_room_id = chunk[0]["room_id"]
-
-            new_limit = sorted_rooms.index(last_room_id)
-
-        results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
-        if since_token:
-            results["new_rooms"] = bool(newly_visible)
-
-        if not since_token or since_token.direction_is_forward:
-            if new_limit is not None:
-                results["next_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=True,
-                ).to_token()
-
-            if since_token:
-                results["prev_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=False,
-                    current_limit=since_token.current_limit + 1,
-                ).to_token()
-        else:
-            if new_limit is not None:
-                results["prev_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=False,
-                ).to_token()
-
-            if since_token:
-                results["next_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=True,
-                    current_limit=since_token.current_limit - 1,
-                ).to_token()
-
-        return results
-
-    @defer.inlineCallbacks
-    def _append_room_entry_to_chunk(
-        self,
-        room_id,
-        num_joined_users,
-        chunk,
-        limit,
-        search_filter,
-        from_federation=False,
-    ):
-        """Generate the entry for a room in the public room list and append it
-        to the `chunk` if it matches the search filter
-
-        Args:
-            room_id (str): The ID of the room.
-            num_joined_users (int): The number of joined users in the room.
-            chunk (list)
-            limit (int|None): Maximum amount of rooms to display. Function will
-                return if length of chunk is greater than limit + 1.
-            search_filter (dict|None)
-            from_federation (bool): Whether this request originated from a
-                federating server or a client. Used for room filtering.
-        """
-        if limit and len(chunk) > limit + 1:
-            # We've already got enough, so lets just drop it.
-            return
+                if batch_token:
+                    response["next_batch"] = RoomListNextBatch(
+                        last_joined_members=final_entry["num_joined_members"],
+                        last_room_id=final_entry["room_id"],
+                        direction_is_forward=True,
+                    ).to_token()
+
+                if more_to_come:
+                    response["prev_batch"] = RoomListNextBatch(
+                        last_joined_members=initial_entry["num_joined_members"],
+                        last_room_id=initial_entry["room_id"],
+                        direction_is_forward=False,
+                    ).to_token()
+
+        for room in results:
+            # populate search result entries with additional fields, namely
+            # 'aliases'
+            room_id = room["room_id"]
+
+            aliases = yield self.store.get_aliases_for_room(room_id)
+            if aliases:
+                room["aliases"] = aliases
 
-        result = yield self.generate_room_entry(room_id, num_joined_users)
-        if not result:
-            return
+        response["chunk"] = results
 
-        if from_federation and not result.get("m.federate", True):
-            # This is a room that other servers cannot join. Do not show them
-            # this room.
-            return
+        response["total_room_count_estimate"] = yield self.store.count_public_rooms(
+            network_tuple, ignore_non_federatable=from_federation
+        )
 
-        if _matches_room_entry(result, search_filter):
-            chunk.append(result)
+        return response
 
     @cachedInlineCallbacks(num_args=1, cache_context=True)
     def generate_room_entry(
@@ -580,18 +446,15 @@ class RoomListNextBatch(
     namedtuple(
         "RoomListNextBatch",
         (
-            "stream_ordering",  # stream_ordering of the first public room list
-            "public_room_stream_id",  # public room stream id for first public room list
-            "current_limit",  # The number of previous rooms returned
+            "last_joined_members",  # The count to get rooms after/before
+            "last_room_id",  # The room_id to get rooms after/before
             "direction_is_forward",  # Bool if this is a next_batch, false if prev_batch
         ),
     )
 ):
-
     KEY_DICT = {
-        "stream_ordering": "s",
-        "public_room_stream_id": "p",
-        "current_limit": "n",
+        "last_joined_members": "m",
+        "last_room_id": "r",
         "direction_is_forward": "d",
     }
 
@@ -599,13 +462,7 @@ class RoomListNextBatch(
 
     @classmethod
     def from_token(cls, token):
-        if PY3:
-            # The argument raw=False is only available on new versions of
-            # msgpack, and only really needed on Python 3. Gate it behind
-            # a PY3 check to avoid causing issues on Debian-packaged versions.
-            decoded = msgpack.loads(decode_base64(token), raw=False)
-        else:
-            decoded = msgpack.loads(decode_base64(token))
+        decoded = msgpack.loads(decode_base64(token), raw=False)
         return RoomListNextBatch(
             **{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()}
         )
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 94cd0cf3ef..380e2fad5e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -20,29 +20,19 @@ import logging
 
 from six.moves import http_client
 
-from signedjson.key import decode_verify_key_bytes
-from signedjson.sign import verify_signed_json
-from unpaddedbase64 import decode_base64
-
 from twisted.internet import defer
-from twisted.internet.error import TimeoutError
 
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
-from synapse.api.errors import AuthError, Codes, HttpResponseException, SynapseError
-from synapse.handlers.identity import LookupAlgorithm, create_id_access_token_header
-from synapse.http.client import SimpleHttpClient
+from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.types import RoomID, UserID
 from synapse.util.async_helpers import Linearizer
 from synapse.util.distributor import user_joined_room, user_left_room
-from synapse.util.hash import sha256_and_url_safe_base64
 
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
-id_server_scheme = "https://"
-
 
 class RoomMemberHandler(object):
     # TODO(paul): This handler currently contains a messy conflation of
@@ -63,14 +53,10 @@ class RoomMemberHandler(object):
         self.auth = hs.get_auth()
         self.state_handler = hs.get_state_handler()
         self.config = hs.config
-        # We create a blacklisting instance of SimpleHttpClient for contacting identity
-        # servers specified by clients
-        self.simple_http_client = SimpleHttpClient(
-            hs, ip_blacklist=hs.config.federation_ip_range_blacklist
-        )
 
         self.federation_handler = hs.get_handlers().federation_handler
         self.directory_handler = hs.get_handlers().directory_handler
+        self.identity_handler = hs.get_handlers().identity_handler
         self.registration_handler = hs.get_registration_handler()
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
@@ -217,23 +203,11 @@ class RoomMemberHandler(object):
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
                 newly_joined = prev_member_event.membership != Membership.JOIN
             if newly_joined:
-                yield self._user_joined_room(target, room_id)
-
-            # Copy over direct message status and room tags if this is a join
-            # on an upgraded room
-
-            # Check if this is an upgraded room
-            predecessor = yield self.store.get_room_predecessor(room_id)
-
-            if predecessor:
-                # It is an upgraded room. Copy over old tags
-                self.copy_room_tags_and_direct_to_room(
-                    predecessor["room_id"], room_id, user_id
-                )
-                # Move over old push rules
-                self.store.move_push_rules_from_room_to_room_for_user(
-                    predecessor["room_id"], room_id, user_id
+                # Copy over user state if we're joining an upgraded room
+                yield self.copy_user_state_if_room_upgrade(
+                    room_id, requester.user.to_string()
                 )
+                yield self._user_joined_room(target, room_id)
         elif event.membership == Membership.LEAVE:
             if prev_member_event_id:
                 prev_member_event = yield self.store.get_event(prev_member_event_id)
@@ -477,10 +451,16 @@ class RoomMemberHandler(object):
                 if requester.is_guest:
                     content["kind"] = "guest"
 
-                ret = yield self._remote_join(
+                remote_join_response = yield self._remote_join(
                     requester, remote_room_hosts, room_id, target, content
                 )
-                return ret
+
+                # Copy over user state if this is a join on an remote upgraded room
+                yield self.copy_user_state_if_room_upgrade(
+                    room_id, requester.user.to_string()
+                )
+
+                return remote_join_response
 
         elif effective_membership_state == Membership.LEAVE:
             if not is_host_in_room:
@@ -518,6 +498,38 @@ class RoomMemberHandler(object):
         return res
 
     @defer.inlineCallbacks
+    def copy_user_state_if_room_upgrade(self, new_room_id, user_id):
+        """Copy user-specific information when they join a new room if that new room is the
+        result of a room upgrade
+
+        Args:
+            new_room_id (str): The ID of the room the user is joining
+            user_id (str): The ID of the user
+
+        Returns:
+            Deferred
+        """
+        # Check if the new room is an upgraded room
+        predecessor = yield self.store.get_room_predecessor(new_room_id)
+        if not predecessor:
+            return
+
+        logger.debug(
+            "Found predecessor for %s: %s. Copying over room tags and push " "rules",
+            new_room_id,
+            predecessor,
+        )
+
+        # It is an upgraded room. Copy over old tags
+        yield self.copy_room_tags_and_direct_to_room(
+            predecessor["room_id"], new_room_id, user_id
+        )
+        # Copy over push rules
+        yield self.store.copy_push_rules_from_room_to_room_for_user(
+            predecessor["room_id"], new_room_id, user_id
+        )
+
+    @defer.inlineCallbacks
     def send_membership_event(self, requester, event, context, ratelimit=True):
         """
         Change the membership status of a user in a room.
@@ -682,7 +694,9 @@ class RoomMemberHandler(object):
                 403, "Looking up third-party identifiers is denied from this server"
             )
 
-        invitee = yield self._lookup_3pid(id_server, medium, address, id_access_token)
+        invitee = yield self.identity_handler.lookup_3pid(
+            id_server, medium, address, id_access_token
+        )
 
         if invitee:
             yield self.update_membership(
@@ -701,211 +715,6 @@ class RoomMemberHandler(object):
             )
 
     @defer.inlineCallbacks
-    def _lookup_3pid(self, id_server, medium, address, id_access_token=None):
-        """Looks up a 3pid in the passed identity server.
-
-        Args:
-            id_server (str): The server name (including port, if required)
-                of the identity server to use.
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
-            id_access_token (str|None): The access token to authenticate to the identity
-                server with
-
-        Returns:
-            str|None: the matrix ID of the 3pid, or None if it is not recognized.
-        """
-        if id_access_token is not None:
-            try:
-                results = yield self._lookup_3pid_v2(
-                    id_server, id_access_token, medium, address
-                )
-                return results
-
-            except Exception as e:
-                # Catch HttpResponseExcept for a non-200 response code
-                # Check if this identity server does not know about v2 lookups
-                if isinstance(e, HttpResponseException) and e.code == 404:
-                    # This is an old identity server that does not yet support v2 lookups
-                    logger.warning(
-                        "Attempted v2 lookup on v1 identity server %s. Falling "
-                        "back to v1",
-                        id_server,
-                    )
-                else:
-                    logger.warning("Error when looking up hashing details: %s", e)
-                    return None
-
-        return (yield self._lookup_3pid_v1(id_server, medium, address))
-
-    @defer.inlineCallbacks
-    def _lookup_3pid_v1(self, id_server, medium, address):
-        """Looks up a 3pid in the passed identity server using v1 lookup.
-
-        Args:
-            id_server (str): The server name (including port, if required)
-                of the identity server to use.
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
-
-        Returns:
-            str: the matrix ID of the 3pid, or None if it is not recognized.
-        """
-        try:
-            data = yield self.simple_http_client.get_json(
-                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
-                {"medium": medium, "address": address},
-            )
-
-            if "mxid" in data:
-                if "signatures" not in data:
-                    raise AuthError(401, "No signatures on 3pid binding")
-                yield self._verify_any_signature(data, id_server)
-                return data["mxid"]
-        except TimeoutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-        except IOError as e:
-            logger.warning("Error from v1 identity server lookup: %s" % (e,))
-
-        return None
-
-    @defer.inlineCallbacks
-    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
-        """Looks up a 3pid in the passed identity server using v2 lookup.
-
-        Args:
-            id_server (str): The server name (including port, if required)
-                of the identity server to use.
-            id_access_token (str): The access token to authenticate to the identity server with
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
-
-        Returns:
-            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
-        """
-        # Check what hashing details are supported by this identity server
-        try:
-            hash_details = yield self.simple_http_client.get_json(
-                "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
-                {"access_token": id_access_token},
-            )
-        except TimeoutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-
-        if not isinstance(hash_details, dict):
-            logger.warning(
-                "Got non-dict object when checking hash details of %s%s: %s",
-                id_server_scheme,
-                id_server,
-                hash_details,
-            )
-            raise SynapseError(
-                400,
-                "Non-dict object from %s%s during v2 hash_details request: %s"
-                % (id_server_scheme, id_server, hash_details),
-            )
-
-        # Extract information from hash_details
-        supported_lookup_algorithms = hash_details.get("algorithms")
-        lookup_pepper = hash_details.get("lookup_pepper")
-        if (
-            not supported_lookup_algorithms
-            or not isinstance(supported_lookup_algorithms, list)
-            or not lookup_pepper
-            or not isinstance(lookup_pepper, str)
-        ):
-            raise SynapseError(
-                400,
-                "Invalid hash details received from identity server %s%s: %s"
-                % (id_server_scheme, id_server, hash_details),
-            )
-
-        # Check if any of the supported lookup algorithms are present
-        if LookupAlgorithm.SHA256 in supported_lookup_algorithms:
-            # Perform a hashed lookup
-            lookup_algorithm = LookupAlgorithm.SHA256
-
-            # Hash address, medium and the pepper with sha256
-            to_hash = "%s %s %s" % (address, medium, lookup_pepper)
-            lookup_value = sha256_and_url_safe_base64(to_hash)
-
-        elif LookupAlgorithm.NONE in supported_lookup_algorithms:
-            # Perform a non-hashed lookup
-            lookup_algorithm = LookupAlgorithm.NONE
-
-            # Combine together plaintext address and medium
-            lookup_value = "%s %s" % (address, medium)
-
-        else:
-            logger.warning(
-                "None of the provided lookup algorithms of %s are supported: %s",
-                id_server,
-                supported_lookup_algorithms,
-            )
-            raise SynapseError(
-                400,
-                "Provided identity server does not support any v2 lookup "
-                "algorithms that this homeserver supports.",
-            )
-
-        # Authenticate with identity server given the access token from the client
-        headers = {"Authorization": create_id_access_token_header(id_access_token)}
-
-        try:
-            lookup_results = yield self.simple_http_client.post_json_get_json(
-                "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
-                {
-                    "addresses": [lookup_value],
-                    "algorithm": lookup_algorithm,
-                    "pepper": lookup_pepper,
-                },
-                headers=headers,
-            )
-        except TimeoutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-        except Exception as e:
-            logger.warning("Error when performing a v2 3pid lookup: %s", e)
-            raise SynapseError(
-                500, "Unknown error occurred during identity server lookup"
-            )
-
-        # Check for a mapping from what we looked up to an MXID
-        if "mappings" not in lookup_results or not isinstance(
-            lookup_results["mappings"], dict
-        ):
-            logger.warning("No results from 3pid lookup")
-            return None
-
-        # Return the MXID if it's available, or None otherwise
-        mxid = lookup_results["mappings"].get(lookup_value)
-        return mxid
-
-    @defer.inlineCallbacks
-    def _verify_any_signature(self, data, server_hostname):
-        if server_hostname not in data["signatures"]:
-            raise AuthError(401, "No signature from server %s" % (server_hostname,))
-        for key_name, signature in data["signatures"][server_hostname].items():
-            try:
-                key_data = yield self.simple_http_client.get_json(
-                    "%s%s/_matrix/identity/api/v1/pubkey/%s"
-                    % (id_server_scheme, server_hostname, key_name)
-                )
-            except TimeoutError:
-                raise SynapseError(500, "Timed out contacting identity server")
-            if "public_key" not in key_data:
-                raise AuthError(
-                    401, "No public key named %s from %s" % (key_name, server_hostname)
-                )
-            verify_signed_json(
-                data,
-                server_hostname,
-                decode_verify_key_bytes(
-                    key_name, decode_base64(key_data["public_key"])
-                ),
-            )
-            return
-
-    @defer.inlineCallbacks
     def _make_and_store_3pid_invite(
         self,
         requester,
@@ -951,7 +760,7 @@ class RoomMemberHandler(object):
             room_avatar_url = room_avatar_event.content.get("url", "")
 
         token, public_keys, fallback_public_key, display_name = (
-            yield self._ask_id_server_for_third_party_invite(
+            yield self.identity_handler.ask_id_server_for_third_party_invite(
                 requester=requester,
                 id_server=id_server,
                 medium=medium,
@@ -988,147 +797,6 @@ class RoomMemberHandler(object):
         )
 
     @defer.inlineCallbacks
-    def _ask_id_server_for_third_party_invite(
-        self,
-        requester,
-        id_server,
-        medium,
-        address,
-        room_id,
-        inviter_user_id,
-        room_alias,
-        room_avatar_url,
-        room_join_rules,
-        room_name,
-        inviter_display_name,
-        inviter_avatar_url,
-        id_access_token=None,
-    ):
-        """
-        Asks an identity server for a third party invite.
-
-        Args:
-            requester (Requester)
-            id_server (str): hostname + optional port for the identity server.
-            medium (str): The literal string "email".
-            address (str): The third party address being invited.
-            room_id (str): The ID of the room to which the user is invited.
-            inviter_user_id (str): The user ID of the inviter.
-            room_alias (str): An alias for the room, for cosmetic notifications.
-            room_avatar_url (str): The URL of the room's avatar, for cosmetic
-                notifications.
-            room_join_rules (str): The join rules of the email (e.g. "public").
-            room_name (str): The m.room.name of the room.
-            inviter_display_name (str): The current display name of the
-                inviter.
-            inviter_avatar_url (str): The URL of the inviter's avatar.
-            id_access_token (str|None): The access token to authenticate to the identity
-                server with
-
-        Returns:
-            A deferred tuple containing:
-                token (str): The token which must be signed to prove authenticity.
-                public_keys ([{"public_key": str, "key_validity_url": str}]):
-                    public_key is a base64-encoded ed25519 public key.
-                fallback_public_key: One element from public_keys.
-                display_name (str): A user-friendly name to represent the invited
-                    user.
-        """
-        invite_config = {
-            "medium": medium,
-            "address": address,
-            "room_id": room_id,
-            "room_alias": room_alias,
-            "room_avatar_url": room_avatar_url,
-            "room_join_rules": room_join_rules,
-            "room_name": room_name,
-            "sender": inviter_user_id,
-            "sender_display_name": inviter_display_name,
-            "sender_avatar_url": inviter_avatar_url,
-        }
-
-        # Add the identity service access token to the JSON body and use the v2
-        # Identity Service endpoints if id_access_token is present
-        data = None
-        base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
-
-        if id_access_token:
-            key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
-                id_server_scheme,
-                id_server,
-            )
-
-            # Attempt a v2 lookup
-            url = base_url + "/v2/store-invite"
-            try:
-                data = yield self.simple_http_client.post_json_get_json(
-                    url,
-                    invite_config,
-                    {"Authorization": create_id_access_token_header(id_access_token)},
-                )
-            except TimeoutError:
-                raise SynapseError(500, "Timed out contacting identity server")
-            except HttpResponseException as e:
-                if e.code != 404:
-                    logger.info("Failed to POST %s with JSON: %s", url, e)
-                    raise e
-
-        if data is None:
-            key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
-                id_server_scheme,
-                id_server,
-            )
-            url = base_url + "/api/v1/store-invite"
-
-            try:
-                data = yield self.simple_http_client.post_json_get_json(
-                    url, invite_config
-                )
-            except TimeoutError:
-                raise SynapseError(500, "Timed out contacting identity server")
-            except HttpResponseException as e:
-                logger.warning(
-                    "Error trying to call /store-invite on %s%s: %s",
-                    id_server_scheme,
-                    id_server,
-                    e,
-                )
-
-            if data is None:
-                # Some identity servers may only support application/x-www-form-urlencoded
-                # types. This is especially true with old instances of Sydent, see
-                # https://github.com/matrix-org/sydent/pull/170
-                try:
-                    data = yield self.simple_http_client.post_urlencoded_get_json(
-                        url, invite_config
-                    )
-                except HttpResponseException as e:
-                    logger.warning(
-                        "Error calling /store-invite on %s%s with fallback "
-                        "encoding: %s",
-                        id_server_scheme,
-                        id_server,
-                        e,
-                    )
-                    raise e
-
-        # TODO: Check for success
-        token = data["token"]
-        public_keys = data.get("public_keys", [])
-        if "public_key" in data:
-            fallback_public_key = {
-                "public_key": data["public_key"],
-                "key_validity_url": key_validity_url,
-            }
-        else:
-            fallback_public_key = public_keys[0]
-
-        if not public_keys:
-            public_keys.append(fallback_public_key)
-        display_name = data["display_name"]
-        return token, public_keys, fallback_public_key, display_name
-
-    @defer.inlineCallbacks
     def _is_host_in_room(self, current_state_ids):
         # Have we just created the room, and is this about to be the very
         # first member event?
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index cbac7c347a..466daf9202 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -87,21 +87,23 @@ class StatsHandler(StateDeltasHandler):
             # Be sure to read the max stream_ordering *before* checking if there are any outstanding
             # deltas, since there is otherwise a chance that we could miss updates which arrive
             # after we check the deltas.
-            room_max_stream_ordering = yield self.store.get_room_max_stream_ordering()
+            room_max_stream_ordering = self.store.get_room_max_stream_ordering()
             if self.pos == room_max_stream_ordering:
                 break
 
-            deltas = yield self.store.get_current_state_deltas(self.pos)
+            logger.debug(
+                "Processing room stats %s->%s", self.pos, room_max_stream_ordering
+            )
+            max_pos, deltas = yield self.store.get_current_state_deltas(
+                self.pos, room_max_stream_ordering
+            )
 
             if deltas:
                 logger.debug("Handling %d state deltas", len(deltas))
                 room_deltas, user_deltas = yield self._handle_deltas(deltas)
-
-                max_pos = deltas[-1]["stream_id"]
             else:
                 room_deltas = {}
                 user_deltas = {}
-                max_pos = room_max_stream_ordering
 
             # Then count deltas for total_events and total_event_bytes.
             room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
@@ -293,6 +295,7 @@ class StatsHandler(StateDeltasHandler):
                 room_state["guest_access"] = event_content.get("guest_access")
 
         for room_id, state in room_to_state_updates.items():
+            logger.info("Updating room_stats_state for %s: %s", room_id, state)
             yield self.store.update_room_state(room_id, state)
 
         return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index e53669e40d..624f05ab5b 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -138,21 +138,28 @@ class UserDirectoryHandler(StateDeltasHandler):
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "user_dir_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
+                room_max_stream_ordering = self.store.get_room_max_stream_ordering()
+                if self.pos == room_max_stream_ordering:
                     return
 
+                logger.debug(
+                    "Processing user stats %s->%s", self.pos, room_max_stream_ordering
+                )
+                max_pos, deltas = yield self.store.get_current_state_deltas(
+                    self.pos, room_max_stream_ordering
+                )
+
                 logger.info("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
-                self.pos = deltas[-1]["stream_id"]
+                self.pos = max_pos
 
                 # Expose current event processing position to prometheus
                 synapse.metrics.event_processing_positions.labels("user_dir").set(
-                    self.pos
+                    max_pos
                 )
 
-                yield self.store.update_user_directory_stream_pos(self.pos)
+                yield self.store.update_user_directory_stream_pos(max_pos)
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):