| diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..43cc56fa6f 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +17,7 @@
 import collections
 import logging
 import re
+from typing import Optional, Tuple
 
 from canonicaljson import json
 
@@ -24,6 +26,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.search import SearchStore
+from synapse.types import ThirdPartyInstanceID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
 logger = logging.getLogger(__name__)
@@ -63,103 +66,196 @@ class RoomWorkerStore(SQLBaseStore):
             desc="get_public_room_ids",
         )
 
-    @cached(num_args=2, max_entries=100)
-    def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
-        """Get pulbic rooms for a particular list, or across all lists.
+    def count_public_rooms(self, network_tuple, ignore_non_federatable):
+        """Counts the number of public rooms as tracked in the room_stats_current
+        and room_stats_state table.
 
         Args:
-            stream_id (int)
-            network_tuple (ThirdPartyInstanceID): The list to use (None, None)
-                means the main list, None means all lsits.
+            network_tuple (ThirdPartyInstanceID|None)
+            ignore_non_federatable (bool): If true filters out non-federatable rooms
         """
-        return self.runInteraction(
-            "get_public_room_ids_at_stream_id",
-            self.get_public_room_ids_at_stream_id_txn,
-            stream_id,
-            network_tuple=network_tuple,
-        )
-
-    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, network_tuple):
-        return {
-            rm
-            for rm, vis in self.get_published_at_stream_id_txn(
-                txn, stream_id, network_tuple=network_tuple
-            ).items()
-            if vis
-        }
 
-    def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
-        if network_tuple:
-            # We want to get from a particular list. No aggregation required.
+        def _count_public_rooms_txn(txn):
+            query_args = []
+
+            if network_tuple:
+                if network_tuple.appservice_id:
+                    published_sql = """
+                        SELECT room_id from appservice_room_list
+                        WHERE appservice_id = ? AND network_id = ?
+                    """
+                    query_args.append(network_tuple.appservice_id)
+                    query_args.append(network_tuple.network_id)
+                else:
+                    published_sql = """
+                        SELECT room_id FROM rooms WHERE is_public
+                    """
+            else:
+                published_sql = """
+                    SELECT room_id FROM rooms WHERE is_public
+                    UNION SELECT room_id from appservice_room_list
+            """
 
             sql = """
-                SELECT room_id, visibility FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT room_id, max(stream_id) AS stream_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ? %s
-                    GROUP BY room_id
-                ) grouped USING (room_id, stream_id)
+                SELECT
+                    COALESCE(COUNT(*), 0)
+                FROM (
+                    %(published_sql)s
+                ) published
+                INNER JOIN room_stats_state USING (room_id)
+                INNER JOIN room_stats_current USING (room_id)
+                WHERE
+                    (
+                        join_rules = 'public' OR history_visibility = 'world_readable'
+                    )
+                    AND joined_members > 0
+            """ % {
+                "published_sql": published_sql
+            }
+
+            txn.execute(sql, query_args)
+            return txn.fetchone()[0]
+
+        return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+    @defer.inlineCallbacks
+    def get_largest_public_rooms(
+        self,
+        network_tuple: Optional[ThirdPartyInstanceID],
+        search_filter: Optional[dict],
+        limit: Optional[int],
+        bounds: Optional[Tuple[int, str]],
+        forwards: bool,
+        ignore_non_federatable: bool = False,
+    ):
+        """Gets the largest public rooms (where largest is in terms of joined
+        members, as tracked in the statistics table).
+
+        Args:
+            network_tuple
+            search_filter
+            limit: Maxmimum number of rows to return, unlimited otherwise.
+            bounds: An uppoer or lower bound to apply to result set if given,
+                consists of a joined member count and room_id (these are
+                excluded from result set).
+            forwards: true iff going forwards, going backwards otherwise
+            ignore_non_federatable: If true filters out non-federatable rooms.
+
+        Returns:
+            Rooms in order: biggest number of joined users first.
+            We then arbitrarily use the room_id as a tie breaker.
+
+        """
+
+        where_clauses = []
+        query_args = []
+
+        if network_tuple:
+            if network_tuple.appservice_id:
+                published_sql = """
+                    SELECT room_id from appservice_room_list
+                    WHERE appservice_id = ? AND network_id = ?
+                """
+                query_args.append(network_tuple.appservice_id)
+                query_args.append(network_tuple.network_id)
+            else:
+                published_sql = """
+                    SELECT room_id FROM rooms WHERE is_public
+                """
+        else:
+            published_sql = """
+                SELECT room_id FROM rooms WHERE is_public
+                UNION SELECT room_id from appservice_room_list
             """
 
-            if network_tuple.appservice_id is not None:
-                txn.execute(
-                    sql % ("AND appservice_id = ? AND network_id = ?",),
-                    (stream_id, network_tuple.appservice_id, network_tuple.network_id),
+        # Work out the bounds if we're given them, these bounds look slightly
+        # odd, but are designed to help query planner use indices by pulling
+        # out a common bound.
+        if bounds:
+            last_joined_members, last_room_id = bounds
+            if forwards:
+                where_clauses.append(
+                    """
+                        joined_members <= ? AND (
+                            joined_members < ? OR room_id < ?
+                        )
+                    """
                 )
             else:
-                txn.execute(sql % ("AND appservice_id IS NULL",), (stream_id,))
-            return dict(txn)
-        else:
-            # We want to get from all lists, so we need to aggregate the results
+                where_clauses.append(
+                    """
+                        joined_members >= ? AND (
+                            joined_members > ? OR room_id > ?
+                        )
+                    """
+                )
 
-            logger.info("Executing full list")
+            query_args += [last_joined_members, last_joined_members, last_room_id]
 
-            sql = """
-                SELECT room_id, visibility
-                FROM public_room_list_stream
-                INNER JOIN (
-                    SELECT
-                        room_id, max(stream_id) AS stream_id, appservice_id,
-                        network_id
-                    FROM public_room_list_stream
-                    WHERE stream_id <= ?
-                    GROUP BY room_id, appservice_id, network_id
-                ) grouped USING (room_id, stream_id)
-            """
+        if ignore_non_federatable:
+            where_clauses.append("is_federatable")
 
-            txn.execute(sql, (stream_id,))
+        if search_filter and search_filter.get("generic_search_term", None):
+            search_term = "%" + search_filter["generic_search_term"] + "%"
 
-            results = {}
-            # A room is visible if its visible on any list.
-            for room_id, visibility in txn:
-                results[room_id] = bool(visibility) or results.get(room_id, False)
+            where_clauses.append(
+                """
+                    (
+                        name LIKE ?
+                        OR topic LIKE ?
+                        OR canonical_alias LIKE ?
+                    )
+                """
+            )
+            query_args += [search_term, search_term, search_term]
+
+        where_clause = ""
+        if where_clauses:
+            where_clause = " AND " + " AND ".join(where_clauses)
+
+        sql = """
+            SELECT
+                room_id, name, topic, canonical_alias, joined_members,
+                avatar, history_visibility, joined_members, guest_access
+            FROM (
+                %(published_sql)s
+            ) published
+            INNER JOIN room_stats_state USING (room_id)
+            INNER JOIN room_stats_current USING (room_id)
+            WHERE
+                (
+                    join_rules = 'public' OR history_visibility = 'world_readable'
+                )
+                AND joined_members > 0
+                %(where_clause)s
+            ORDER BY joined_members %(dir)s, room_id %(dir)s
+        """ % {
+            "published_sql": published_sql,
+            "where_clause": where_clause,
+            "dir": "DESC" if forwards else "ASC",
+        }
 
-            return results
+        if limit is not None:
+            query_args.append(limit)
 
-    def get_public_room_changes(self, prev_stream_id, new_stream_id, network_tuple):
-        def get_public_room_changes_txn(txn):
-            then_rooms = self.get_public_room_ids_at_stream_id_txn(
-                txn, prev_stream_id, network_tuple
-            )
+            sql += """
+                LIMIT ?
+            """
 
-            now_rooms_dict = self.get_published_at_stream_id_txn(
-                txn, new_stream_id, network_tuple
-            )
+        def _get_largest_public_rooms_txn(txn):
+            txn.execute(sql, query_args)
 
-            now_rooms_visible = set(rm for rm, vis in now_rooms_dict.items() if vis)
-            now_rooms_not_visible = set(
-                rm for rm, vis in now_rooms_dict.items() if not vis
-            )
+            results = self.cursor_to_dict(txn)
 
-            newly_visible = now_rooms_visible - then_rooms
-            newly_unpublished = now_rooms_not_visible & then_rooms
+            if not forwards:
+                results.reverse()
 
-            return newly_visible, newly_unpublished
+            return results
 
-        return self.runInteraction(
-            "get_public_room_changes", get_public_room_changes_txn
+        ret_val = yield self.runInteraction(
+            "get_largest_public_rooms", _get_largest_public_rooms_txn
         )
+        defer.returnValue(ret_val)
 
     @cached(max_entries=10000)
     def is_room_blocked(self, room_id):
 |