author     Erik Johnston <erikj@jki.re>    2016-09-15 13:27:09 +0100
committer  GitHub <noreply@github.com>     2016-09-15 13:27:09 +0100
commit     e457034e99d46d32e3d71eddb12cda07d8a0370e (patch)
tree       2b3cc4cbf5a17a6de91c6f07e47b038b22186921
parent     Merge pull request #1117 from matrix-org/erikj/fix_state (diff)
parent     By default limit /publicRooms to 100 entries (diff)
download   synapse-e457034e99d46d32e3d71eddb12cda07d8a0370e.tar.xz
Merge pull request #1121 from matrix-org/erikj/public_room_paginate
Add pagination support to publicRooms
-rw-r--r--  synapse/config/server.py | 9
-rw-r--r--  synapse/federation/federation_client.py | 22
-rw-r--r--  synapse/federation/transport/client.py | 9
-rw-r--r--  synapse/federation/transport/server.py | 10
-rw-r--r--  synapse/handlers/room_list.py | 280
-rw-r--r--  synapse/http/servlet.py | 18
-rw-r--r--  synapse/python_dependencies.py | 1
-rw-r--r--  synapse/replication/resource.py | 20
-rw-r--r--  synapse/replication/slave/storage/events.py | 8
-rw-r--r--  synapse/replication/slave/storage/room.py | 31
-rw-r--r--  synapse/rest/client/v1/room.py | 18
-rw-r--r--  synapse/state.py | 5
-rw-r--r--  synapse/storage/__init__.py | 3
-rw-r--r--  synapse/storage/event_federation.py | 81
-rw-r--r--  synapse/storage/room.py | 146
-rw-r--r--  synapse/storage/schema/delta/35/public_room_list_change_stream.sql | 33
-rw-r--r--  synapse/storage/schema/delta/35/stream_order_to_extrem.sql | 37
-rw-r--r--  synapse/storage/stream.py | 3
18 files changed, 595 insertions, 139 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 51eaf423ce..ed5417d0c3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,7 +29,6 @@ class ServerConfig(Config):
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.public_baseurl = config.get("public_baseurl")
-        self.secondary_directory_servers = config.get("secondary_directory_servers", [])
 
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != '/':
@@ -142,14 +141,6 @@ class ServerConfig(Config):
         # The GC threshold parameters to pass to `gc.set_threshold`, if defined
         # gc_thresholds: [700, 10, 10]
 
-        # A list of other Home Servers to fetch the public room directory from
-        # and include in the public room directory of this home server
-        # This is a temporary stopgap solution to populate new server with a
-        # list of rooms until there exists a good solution of a decentralized
-        # room directory.
-        # secondary_directory_servers:
-        #     - matrix.org
-
         # List of ports that Synapse should listen on, their purpose and their
         # configuration.
         listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 91bed4746f..f0a684fc13 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
     CodeMessageException, HttpResponseException, SynapseError,
 )
 from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -719,24 +718,11 @@ class FederationClient(FederationBase):
 
         raise RuntimeError("Failed to send to any server.")
 
-    @defer.inlineCallbacks
-    def get_public_rooms(self, destinations):
-        results_by_server = {}
-
-        @defer.inlineCallbacks
-        def _get_result(s):
-            if s == self.server_name:
-                defer.returnValue()
-
-            try:
-                result = yield self.transport_layer.get_public_rooms(s)
-                results_by_server[s] = result
-            except:
-                logger.exception("Error getting room list from server %r", s)
-
-        yield concurrently_execute(_get_result, destinations, 3)
+    def get_public_rooms(self, destination, limit=None, since_token=None):
+        if destination == self.server_name:
+            return
 
-        defer.returnValue(results_by_server)
+        return self.transport_layer.get_public_rooms(destination, limit, since_token)
 
     @defer.inlineCallbacks
     def query_auth(self, destination, room_id, event_id, local_auth):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2b138526ba..f508b70f11 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -248,12 +248,19 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_public_rooms(self, remote_server):
+    def get_public_rooms(self, remote_server, limit, since_token):
         path = PREFIX + "/publicRooms"
 
+        args = {}
+        if limit:
+            args["limit"] = [str(limit)]
+        if since_token:
+            args["since"] = [since_token]
+
         response = yield self.client.get_json(
             destination=remote_server,
             path=path,
+            args=args,
         )
 
         defer.returnValue(response)
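
The transport client passes "limit" and "since" as list-valued entries in an args dict, the same shape as Twisted's request.args. A minimal standalone sketch of how such a dict could serialize into the /publicRooms query string; the actual encoding happens inside self.client.get_json, so urlencode here is only an illustration and the token value is made up:

    # Sketch only: model how a Twisted-style args dict (values are lists of
    # strings) turns into query parameters for GET .../publicRooms.
    try:  # Python 3
        from urllib.parse import urlencode
    except ImportError:  # Python 2, which Synapse targeted at the time
        from urllib import urlencode

    limit, since_token = 20, "c29tZXRva2Vu"  # hypothetical values

    args = {}
    if limit:
        args["limit"] = [str(limit)]
    if since_token:
        args["since"] = [since_token]

    query = urlencode(args, doseq=True)
    print("/_matrix/federation/v1/publicRooms?" + query)
    # -> /_matrix/federation/v1/publicRooms?limit=20&since=c29tZXRva2Vu
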
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 37c0d4fbc4..fec337be64 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
 from synapse.api.urls import FEDERATION_PREFIX as PREFIX
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.server import JsonResource
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import (
+    parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
+)
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
 
@@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet):
 
     @defer.inlineCallbacks
     def on_GET(self, origin, content, query):
-        data = yield self.room_list_handler.get_local_public_room_list()
+        limit = parse_integer_from_args(query, "limit", 0)
+        since_token = parse_string_from_args(query, "since", None)
+        data = yield self.room_list_handler.get_local_public_room_list(
+            limit, since_token
+        )
         defer.returnValue((200, data))
 
 
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d72e8c99f9..28bc35f8a3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -18,13 +18,16 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.constants import (
-    EventTypes, JoinRules, Membership,
+    EventTypes, JoinRules,
 )
-from synapse.api.errors import SynapseError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
+from collections import namedtuple
+from unpaddedbase64 import encode_base64, decode_base64
+
 import logging
+import msgpack
 
 logger = logging.getLogger(__name__)
 
@@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.response_cache = ResponseCache(hs)
-        self.remote_list_request_cache = ResponseCache(hs)
-        self.remote_list_cache = {}
-        self.fetch_looping_call = hs.get_clock().looping_call(
-            self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
-        )
-        self.fetch_all_remote_lists()
 
-    def get_local_public_room_list(self):
-        result = self.response_cache.get(())
+    def get_local_public_room_list(self, limit=None, since_token=None):
+        result = self.response_cache.get((limit, since_token))
         if not result:
-            result = self.response_cache.set((), self._get_public_room_list())
+            result = self.response_cache.set(
+                (limit, since_token),
+                self._get_public_room_list(limit, since_token)
+            )
         return result
 
     @defer.inlineCallbacks
-    def _get_public_room_list(self):
-        room_ids = yield self.store.get_public_room_ids()
+    def _get_public_room_list(self, limit=None, since_token=None):
+        if since_token and since_token != "END":
+            since_token = RoomListNextBatch.from_token(since_token)
+        else:
+            since_token = None
+
+        rooms_to_order_value = {}
+        rooms_to_num_joined = {}
+        rooms_to_latest_event_ids = {}
+
+        newly_visible = []
+        newly_unpublished = []
+        if since_token:
+            stream_token = since_token.stream_ordering
+            current_public_id = yield self.store.get_current_public_room_stream_id()
+            public_room_stream_id = since_token.public_room_stream_id
+            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
+                public_room_stream_id, current_public_id
+            )
+        else:
+            stream_token = yield self.store.get_room_max_stream_ordering()
+            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+
+        room_ids = yield self.store.get_public_room_ids_at_stream_id(
+            public_room_stream_id
+        )
+
+        # We want to return rooms in a particular order: the number of joined
+        # users. We then arbitrarily use the room_id as a tie breaker.
+
+        @defer.inlineCallbacks
+        def get_order_for_room(room_id):
+            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
+            if not latest_event_ids:
+                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_token
+                )
+                rooms_to_latest_event_ids[room_id] = latest_event_ids
+
+            if not latest_event_ids:
+                return
+
+            joined_users = yield self.state_handler.get_current_user_in_room(
+                room_id, latest_event_ids,
+            )
+            num_joined_users = len(joined_users)
+            rooms_to_num_joined[room_id] = num_joined_users
+
+            if num_joined_users == 0:
+                return
+
+            # We want larger rooms to be first, hence negating num_joined_users
+            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+
+        yield concurrently_execute(get_order_for_room, room_ids, 10)
+
+        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
+        sorted_rooms = [room_id for room_id, _ in sorted_entries]
+
+        if since_token:
+            if since_token.direction_is_forward:
+                sorted_rooms = sorted_rooms[since_token.current_limit:]
+            else:
+                sorted_rooms = sorted_rooms[:since_token.current_limit]
+                sorted_rooms.reverse()
 
-        results = []
+        new_limit = None
+        if limit:
+            if sorted_rooms[limit:]:
+                new_limit = limit
+                if since_token:
+                    if since_token.direction_is_forward:
+                        new_limit += since_token.current_limit
+                    else:
+                        new_limit = since_token.current_limit - new_limit
+                        new_limit = max(0, new_limit)
+            sorted_rooms = sorted_rooms[:limit]
+
+        chunk = []
 
         @defer.inlineCallbacks
         def handle_room(room_id):
-            current_state = yield self.state_handler.get_current_state(room_id)
+            num_joined_users = rooms_to_num_joined[room_id]
+            if num_joined_users == 0:
+                return
+
+            if room_id in newly_unpublished:
+                return
+
+            result = {
+                "room_id": room_id,
+                "num_joined_members": num_joined_users,
+            }
+
+            current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+
+            event_map = yield self.store.get_events([
+                event_id for key, event_id in current_state_ids.items()
+                if key[0] in (
+                    EventTypes.JoinRules,
+                    EventTypes.Name,
+                    EventTypes.Topic,
+                    EventTypes.CanonicalAlias,
+                    EventTypes.RoomHistoryVisibility,
+                    EventTypes.GuestAccess,
+                    "m.room.avatar",
+                )
+            ])
+
+            current_state = {
+                (ev.type, ev.state_key): ev
+                for ev in event_map.values()
+            }
 
             # Double check that this is actually a public room.
             join_rules_event = current_state.get((EventTypes.JoinRules, ""))
@@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler):
                 if join_rule and join_rule != JoinRules.PUBLIC:
                     defer.returnValue(None)
 
-            result = {"room_id": room_id}
-
-            num_joined_users = len([
-                1 for _, event in current_state.items()
-                if event.type == EventTypes.Member
-                and event.membership == Membership.JOIN
-            ])
-            if num_joined_users == 0:
-                return
-
-            result["num_joined_members"] = num_joined_users
-
             aliases = yield self.store.get_aliases_for_room(room_id)
             if aliases:
                 result["aliases"] = aliases
@@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler):
                 if avatar_url:
                     result["avatar_url"] = avatar_url
 
-            results.append(result)
+            chunk.append(result)
 
-        yield concurrently_execute(handle_room, room_ids, 10)
+        yield concurrently_execute(handle_room, sorted_rooms, 10)
 
-        # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": results})
+        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
 
-    @defer.inlineCallbacks
-    def fetch_all_remote_lists(self):
-        deferred = self.hs.get_replication_layer().get_public_rooms(
-            self.hs.config.secondary_directory_servers
-        )
-        self.remote_list_request_cache.set((), deferred)
-        self.remote_list_cache = yield deferred
+        results = {
+            "chunk": chunk,
+        }
+
+        if since_token:
+            results["new_rooms"] = bool(newly_visible)
+
+        if not since_token or since_token.direction_is_forward:
+            if new_limit:
+                results["next_batch"] = RoomListNextBatch(
+                    stream_ordering=stream_token,
+                    public_room_stream_id=public_room_stream_id,
+                    current_limit=new_limit,
+                    direction_is_forward=True,
+                ).to_token()
+
+            if since_token:
+                results["prev_batch"] = since_token.copy_and_replace(
+                    direction_is_forward=False,
+                ).to_token()
+        else:
+            if new_limit:
+                results["prev_batch"] = RoomListNextBatch(
+                    stream_ordering=stream_token,
+                    public_room_stream_id=public_room_stream_id,
+                    current_limit=new_limit,
+                    direction_is_forward=False,
+                ).to_token()
+
+            if since_token:
+                results["next_batch"] = since_token.copy_and_replace(
+                    direction_is_forward=True,
+                ).to_token()
+
+        defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def get_remote_public_room_list(self, server_name):
+    def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
         res = yield self.hs.get_replication_layer().get_public_rooms(
-            [server_name]
+            server_name, limit=limit, since_token=since_token,
         )
 
-        if server_name not in res:
-            raise SynapseError(404, "Server not found")
-        defer.returnValue(res[server_name])
-
-    @defer.inlineCallbacks
-    def get_aggregated_public_room_list(self):
-        """
-        Get the public room list from this server and the servers
-        specified in the secondary_directory_servers config option.
-        XXX: Pagination...
-        """
-        # We return the results from out cache which is updated by a looping call,
-        # unless we're missing a cache entry, in which case wait for the result
-        # of the fetch if there's one in progress. If not, omit that server.
-        wait = False
-        for s in self.hs.config.secondary_directory_servers:
-            if s not in self.remote_list_cache:
-                logger.warn("No cached room list from %s: waiting for fetch", s)
-                wait = True
-                break
-
-        if wait and self.remote_list_request_cache.get(()):
-            yield self.remote_list_request_cache.get(())
-
-        public_rooms = yield self.get_local_public_room_list()
-
-        # keep track of which room IDs we've seen so we can de-dup
-        room_ids = set()
-
-        # tag all the ones in our list with our server name.
-        # Also add the them to the de-deping set
-        for room in public_rooms['chunk']:
-            room["server_name"] = self.hs.hostname
-            room_ids.add(room["room_id"])
-
-        # Now add the results from federation
-        for server_name, server_result in self.remote_list_cache.items():
-            for room in server_result["chunk"]:
-                if room["room_id"] not in room_ids:
-                    room["server_name"] = server_name
-                    public_rooms["chunk"].append(room)
-                    room_ids.add(room["room_id"])
-
-        defer.returnValue(public_rooms)
+        defer.returnValue(res)
+
+
+class RoomListNextBatch(namedtuple("RoomListNextBatch", (
+    "stream_ordering",  # stream_ordering of the first public room list
+    "public_room_stream_id",  # public room stream id for first public room list
+    "current_limit",  # The number of previous rooms returned
+    "direction_is_forward",  # Bool if this is a next_batch, false if prev_batch
+))):
+
+    KEY_DICT = {
+        "stream_ordering": "s",
+        "public_room_stream_id": "p",
+        "current_limit": "n",
+        "direction_is_forward": "d",
+    }
+
+    REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}
+
+    @classmethod
+    def from_token(cls, token):
+        return RoomListNextBatch(**{
+            cls.REVERSE_KEY_DICT[key]: val
+            for key, val in msgpack.loads(decode_base64(token)).items()
+        })
+
+    def to_token(self):
+        return encode_base64(msgpack.dumps({
+            self.KEY_DICT[key]: val
+            for key, val in self._asdict().items()
+        }))
+
+    def copy_and_replace(self, **kwds):
+        return self._replace(
+            **kwds
+        )
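
The pagination token carries all of its state in-band: the namedtuple fields are packed with msgpack under the single-letter keys in KEY_DICT and then unpadded-base64 encoded, so the server never has to persist anything per client. A standalone sketch of the same round trip (field values invented; requires the msgpack-python and unpaddedbase64 packages):

    # Sketch of the RoomListNextBatch token encoding: msgpack the fields
    # under short keys, then unpadded-base64 the resulting bytes.
    from collections import namedtuple

    import msgpack
    from unpaddedbase64 import encode_base64, decode_base64

    Batch = namedtuple("Batch", (
        "stream_ordering", "public_room_stream_id",
        "current_limit", "direction_is_forward",
    ))

    KEY_DICT = {
        "stream_ordering": "s",
        "public_room_stream_id": "p",
        "current_limit": "n",
        "direction_is_forward": "d",
    }
    REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}

    def to_token(batch):
        return encode_base64(msgpack.dumps({
            KEY_DICT[k]: v for k, v in batch._asdict().items()
        }))

    def from_token(token):
        # raw=False keeps the keys as str across msgpack versions
        unpacked = msgpack.loads(decode_base64(token), raw=False)
        return Batch(**{REVERSE_KEY_DICT[k]: v for k, v in unpacked.items()})

    token = to_token(Batch(1234, 56, 100, True))
    assert from_token(token) == Batch(1234, 56, 100, True)

Because the token is self-contained, prev_batch is simply the same token with direction_is_forward flipped via copy_and_replace.
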
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index e41afeab8e..9346386238 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -41,9 +41,13 @@ def parse_integer(request, name, default=None, required=False):
         SynapseError: if the parameter is absent and required, or if the
             parameter is present and not an integer.
     """
-    if name in request.args:
+    return parse_integer_from_args(request.args, name, default, required)
+
+
+def parse_integer_from_args(args, name, default=None, required=False):
+    if name in args:
         try:
-            return int(request.args[name][0])
+            return int(args[name][0])
         except:
             message = "Query parameter %r must be an integer" % (name,)
             raise SynapseError(400, message)
@@ -116,9 +120,15 @@ def parse_string(request, name, default=None, required=False,
             parameter is present, must be one of a list of allowed values and
             is not one of those allowed values.
     """
+    return parse_string_from_args(
+        request.args, name, default, required, allowed_values, param_type,
+    )
 
-    if name in request.args:
-        value = request.args[name][0]
+
+def parse_string_from_args(args, name, default=None, required=False,
+                           allowed_values=None, param_type="string"):
+    if name in args:
+        value = args[name][0]
         if allowed_values is not None and value not in allowed_values:
             message = "Query parameter %r must be one of [%s]" % (
                 name, ", ".join(repr(v) for v in allowed_values)
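
Splitting the helpers out of the Request object lets the federation servlet run the same validation over its plain query dict. A usage sketch against the helpers as added here (it assumes a Synapse checkout at this revision is importable; later versions changed these signatures):

    # The *_from_args helpers take a plain mapping of name -> list of values,
    # which is the shape of both twisted's request.args and the federation
    # servlet's query dict.
    from synapse.http.servlet import (
        parse_integer_from_args, parse_string_from_args,
    )

    query = {"limit": ["50"], "since": ["sometoken"]}

    print(parse_integer_from_args(query, "limit", 0))      # 50
    print(parse_string_from_args(query, "since", None))    # "sometoken"
    print(parse_integer_from_args(query, "missing", 100))  # 100 (default)
    # parse_integer_from_args(query, "since", 0) would raise a 400
    # SynapseError, since "sometoken" is not an integer.
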
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 86e3d89154..b9e41770ee 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -36,6 +36,7 @@ REQUIREMENTS = {
     "blist": ["blist"],
     "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
     "pymacaroons-pynacl": ["pymacaroons"],
+    "msgpack-python>=0.3.0": ["msgpack"],
 }
 CONDITIONAL_REQUIREMENTS = {
     "web_client": {
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 299e9419a4..9aab3ce23c 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -42,6 +42,7 @@ STREAM_NAMES = (
     ("pushers",),
     ("caches",),
     ("to_device",),
+    ("public_rooms",),
 )
 
 
@@ -131,6 +132,7 @@ class ReplicationResource(Resource):
         push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
         pushers_token = self.store.get_pushers_stream_token()
         caches_token = self.store.get_cache_stream_token()
+        public_rooms_token = self.store.get_current_public_room_stream_id()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -144,6 +146,7 @@ class ReplicationResource(Resource):
             0,  # State stream is no longer a thing
             caches_token,
             int(stream_token.to_device_key),
+            int(public_rooms_token),
         ))
 
     @request_handler()
@@ -193,6 +196,7 @@ class ReplicationResource(Resource):
         yield self.pushers(writer, current_token, limit, request_streams)
         yield self.caches(writer, current_token, limit, request_streams)
         yield self.to_device(writer, current_token, limit, request_streams)
+        yield self.public_rooms(writer, current_token, limit, request_streams)
         self.streams(writer, current_token, request_streams)
 
         logger.debug("Replicated %d rows", writer.total)
@@ -400,6 +404,20 @@ class ReplicationResource(Resource):
                 "position", "user_id", "device_id", "message_json"
             ))
 
+    @defer.inlineCallbacks
+    def public_rooms(self, writer, current_token, limit, request_streams):
+        current_position = current_token.public_rooms
+
+        public_rooms = request_streams.get("public_rooms")
+
+        if public_rooms is not None:
+            public_rooms_rows = yield self.store.get_all_new_public_rooms(
+                public_rooms, current_position, limit
+            )
+            writer.write_header_and_rows("public_rooms", public_rooms_rows, (
+                "position", "room_id", "visibility"
+            ))
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
@@ -428,7 +446,7 @@ class _Writer(object):
 
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
-    "push_rules", "pushers", "state", "caches", "to_device",
+    "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
 ))):
     __slots__ = []
 
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 15c52774a2..f8965c73a0 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -61,6 +61,8 @@ class SlavedEventStore(BaseSlavedStore):
             "MembershipStreamChangeCache", events_max,
         )
 
+        self.stream_ordering_month_ago = 0
+
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
     get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
@@ -168,6 +170,12 @@ class SlavedEventStore(BaseSlavedStore):
     get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
     _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
 
+    get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
+
+    get_forward_extremeties_for_room = (
+        DataStore.get_forward_extremeties_for_room.__func__
+    )
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index d5bb0f98ea..81743941dc 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -15,7 +15,38 @@
 
 from ._base import BaseSlavedStore
 from synapse.storage import DataStore
+from ._slaved_id_tracker import SlavedIdTracker
 
 
 class RoomStore(BaseSlavedStore):
+    def __init__(self, db_conn, hs):
+        super(RoomStore, self).__init__(db_conn, hs)
+        self._public_room_id_gen = SlavedIdTracker(
+            db_conn, "public_room_list_stream", "stream_id"
+        )
+
     get_public_room_ids = DataStore.get_public_room_ids.__func__
+    get_current_public_room_stream_id = (
+        DataStore.get_current_public_room_stream_id.__func__
+    )
+    get_public_room_ids_at_stream_id = (
+        DataStore.get_public_room_ids_at_stream_id.__func__
+    )
+    get_public_room_ids_at_stream_id_txn = (
+        DataStore.get_public_room_ids_at_stream_id_txn.__func__
+    )
+    get_published_at_stream_id_txn = (
+        DataStore.get_published_at_stream_id_txn.__func__
+    )
+
+    def stream_positions(self):
+        result = super(RoomStore, self).stream_positions()
+        result["public_rooms"] = self._public_room_id_gen.get_current_token()
+        return result
+
+    def process_replication(self, result):
+        stream = result.get("public_rooms")
+        if stream:
+            self._public_room_id_gen.advance(int(stream["position"]))
+
+        return super(RoomStore, self).process_replication(result)
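
On the slave side the only bookkeeping is a position in the new public_rooms stream: stream_positions advertises how far the worker has caught up, and process_replication advances that position to whatever the master reported. A toy sketch of that handshake; FakeIdTracker stands in for SlavedIdTracker (which reads its initial value from the database), and the payload shape beyond "position" is assumed:

    # Toy model of the slave-side token handling for the public_rooms stream.
    class FakeIdTracker(object):
        def __init__(self, current):
            self._current = current

        def get_current_token(self):
            return self._current

        def advance(self, token):
            self._current = max(self._current, token)

    tracker = FakeIdTracker(current=7)

    # What the worker asks for on its next replication poll:
    request_streams = {"public_rooms": tracker.get_current_token()}

    # What it does with the master's answer:
    result = {"public_rooms": {"position": 9}}  # shape assumed for the sketch
    stream = result.get("public_rooms")
    if stream:
        tracker.advance(int(stream["position"]))

    assert tracker.get_current_token() == 9
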
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 22d6a7d31e..924c785358 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -23,7 +23,9 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.filtering import Filter
 from synapse.types import UserID, RoomID, RoomAlias
 from synapse.events.utils import serialize_event, format_event_for_client_v2
-from synapse.http.servlet import parse_json_object_from_request, parse_string
+from synapse.http.servlet import (
+    parse_json_object_from_request, parse_string, parse_integer
+)
 
 import logging
 import urllib
@@ -317,11 +319,21 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
             else:
                 pass
 
+        limit = parse_integer(request, "limit", 100)
+        since_token = parse_string(request, "since", None)
+
         handler = self.hs.get_room_list_handler()
         if server:
-            data = yield handler.get_remote_public_room_list(server)
+            data = yield handler.get_remote_public_room_list(
+                server,
+                limit=limit,
+                since_token=since_token,
+            )
         else:
-            data = yield handler.get_aggregated_public_room_list()
+            data = yield handler.get_local_public_room_list(
+                limit=limit,
+                since_token=since_token,
+            )
 
         defer.returnValue((200, data))
 
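
From a client's perspective the new parameters compose into a simple paging loop: request a page with limit, then keep feeding the returned next_batch back as since until the server stops returning one. A hedged sketch using the requests library; the homeserver URL and exact client-API path are assumptions and authentication is omitted, but the "chunk" and "next_batch" fields match the response built by RoomListHandler above:

    # Page through the public room directory using limit/since.
    import requests

    BASE = "https://homeserver.example"          # hypothetical homeserver
    PATH = "/_matrix/client/api/v1/publicRooms"  # assumed v1 client API path

    def iter_public_rooms(limit=100):
        since = None
        while True:
            params = {"limit": limit}
            if since:
                params["since"] = since
            data = requests.get(BASE + PATH, params=params).json()
            for room in data.get("chunk", []):
                yield room
            since = data.get("next_batch")
            if not since:
                break

    for room in iter_public_rooms(limit=50):
        print(room["room_id"], room["num_joined_members"])
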
diff --git a/synapse/state.py b/synapse/state.py
index d89aca26b1..b4eca0e5d5 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -157,8 +157,9 @@ class StateHandler(object):
         defer.returnValue(state)
 
     @defer.inlineCallbacks
-    def get_current_user_in_room(self, room_id):
-        latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+    def get_current_user_in_room(self, room_id, latest_event_ids=None):
+        if not latest_event_ids:
+            latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
         entry = yield self.resolve_state_groups(room_id, latest_event_ids)
         joined_users = yield self.store.get_joined_users_from_state(
             room_id, entry.state_id, entry.state
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a61e83d5de..0099a3f5bb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -113,6 +113,9 @@ class DataStore(RoomMemberStore, RoomStore,
         self._device_inbox_id_gen = StreamIdGenerator(
             db_conn, "device_max_stream_id", "stream_id"
         )
+        self._public_room_id_gen = StreamIdGenerator(
+            db_conn, "public_room_list_stream", "stream_id"
+        )
 
         self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
         self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0827946207..ec6dbe5492 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -16,6 +16,7 @@
 from twisted.internet import defer
 
 from ._base import SQLBaseStore
+from synapse.api.errors import StoreError
 from synapse.util.caches.descriptors import cached
 from unpaddedbase64 import encode_base64
 
@@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore):
     and backfilling from another server respectively.
     """
 
+    def __init__(self, hs):
+        super(EventFederationStore, self).__init__(hs)
+
+        hs.get_clock().looping_call(
+            self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+        )
+
     def get_auth_chain(self, event_ids):
         return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
 
@@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore):
             ]
         )
 
+        # We now insert into stream_ordering_to_exterm a mapping from room_id,
+        # new stream_ordering to new forward extremeties in the room.
+        # This allows us to later efficiently look up the forward extremeties
+        # for a room before a given stream_ordering
+        max_stream_ord = max(
+            ev.internal_metadata.stream_ordering for ev in events
+        )
+        new_extrem = {}
+        for room_id in events_by_room:
+            event_ids = self._simple_select_onecol_txn(
+                txn,
+                table="event_forward_extremities",
+                keyvalues={"room_id": room_id},
+                retcol="event_id",
+            )
+            new_extrem[room_id] = event_ids
+
+        self._simple_insert_many_txn(
+            txn,
+            table="stream_ordering_to_exterm",
+            values=[
+                {
+                    "room_id": room_id,
+                    "event_id": event_id,
+                    "stream_ordering": max_stream_ord,
+                }
+                for room_id, extrem_evs in new_extrem.items()
+                for event_id in extrem_evs
+            ]
+        )
+
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
@@ -305,6 +344,48 @@ class EventFederationStore(SQLBaseStore):
                 self.get_latest_event_ids_in_room.invalidate, (room_id,)
             )
 
+    def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+        """For a given room_id and stream_ordering, return the forward
+        extremeties of the room at that point in "time".
+
+        Throws a StoreError if we have since purged the index for
+        stream_orderings from that point.
+        """
+
+        if stream_ordering <= self.stream_ordering_month_ago:
+            raise StoreError(400, "stream_ordering too old")
+
+        sql = ("""
+                SELECT event_id FROM stream_ordering_to_exterm
+                INNER JOIN (
+                    SELECT room_id, MAX(stream_ordering) AS stream_ordering
+                    FROM stream_ordering_to_exterm
+                    WHERE stream_ordering < ? GROUP BY room_id
+                ) AS rms USING (room_id, stream_ordering)
+                WHERE room_id = ?
+        """)
+
+        def get_forward_extremeties_for_room_txn(txn):
+            txn.execute(sql, (stream_ordering, room_id))
+            rows = txn.fetchall()
+            return [event_id for event_id, in rows]
+
+        return self.runInteraction(
+            "get_forward_extremeties_for_room",
+            get_forward_extremeties_for_room_txn
+        )
+
+    def _delete_old_forward_extrem_cache(self):
+        def _delete_old_forward_extrem_cache_txn(txn):
+            txn.execute(
+                "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
+                (self.stream_ordering_month_ago,)
+            )
+        return self.runInteraction(
+            "_delete_old_forward_extrem_cache",
+            _delete_old_forward_extrem_cache_txn
+        )
+
     def get_backfill_events(self, room_id, event_list, limit):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
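
The new stream_ordering_to_exterm table snapshots each room's forward-extremity set keyed by the maximum stream ordering of the batch of events being persisted, and get_forward_extremeties_for_room then picks, per room, the newest snapshot strictly below the requested stream ordering. A pure-Python model of what that SQL computes, over an in-memory list of rows (all values invented):

    # Model of the lookup in get_forward_extremeties_for_room: among rows
    # (stream_ordering, room_id, event_id) for the room with stream_ordering
    # below the requested point, keep the ones at the largest stream_ordering.
    rows = [
        (10, "!a:hs", "$ev1"),
        (10, "!a:hs", "$ev2"),
        (15, "!a:hs", "$ev3"),  # later snapshot: a single extremity
        (12, "!b:hs", "$ev4"),
    ]

    def forward_extremities_at(rows, room_id, stream_ordering):
        candidates = [
            r for r in rows if r[1] == room_id and r[0] < stream_ordering
        ]
        if not candidates:
            return []
        best = max(so for so, _, _ in candidates)
        return [event_id for so, _, event_id in candidates if so == best]

    print(forward_extremities_at(rows, "!a:hs", 14))  # ['$ev1', '$ev2']
    print(forward_extremities_at(rows, "!a:hs", 20))  # ['$ev3']
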
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8251f58670..2ef13d7403 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore):
             StoreError if the room could not be stored.
         """
         try:
-            yield self._simple_insert(
-                "rooms",
-                {
-                    "room_id": room_id,
-                    "creator": room_creator_user_id,
-                    "is_public": is_public,
-                },
-                desc="store_room",
-            )
+            def store_room_txn(txn, next_id):
+                self._simple_insert_txn(
+                    txn,
+                    "rooms",
+                    {
+                        "room_id": room_id,
+                        "creator": room_creator_user_id,
+                        "is_public": is_public,
+                    },
+                )
+                if is_public:
+                    self._simple_insert_txn(
+                        txn,
+                        table="public_room_list_stream",
+                        values={
+                            "stream_id": next_id,
+                            "room_id": room_id,
+                            "visibility": is_public,
+                        }
+                    )
+            with self._public_room_id_gen.get_next() as next_id:
+                yield self.runInteraction(
+                    "store_room_txn",
+                    store_room_txn, next_id,
+                )
         except Exception as e:
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
@@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore):
             allow_none=True,
         )
 
+    @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
-        return self._simple_update_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            updatevalues={"is_public": is_public},
-            desc="set_room_is_public",
-        )
+        def set_room_is_public_txn(txn, next_id):
+            self._simple_update_one_txn(
+                txn,
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                updatevalues={"is_public": is_public},
+            )
+
+            entries = self._simple_select_list_txn(
+                txn,
+                table="public_room_list_stream",
+                keyvalues={"room_id": room_id},
+                retcols=("stream_id", "visibility"),
+            )
+
+            entries.sort(key=lambda r: r["stream_id"])
+
+            add_to_stream = True
+            if entries:
+                add_to_stream = bool(entries[-1]["visibility"]) != is_public
+
+            if add_to_stream:
+                self._simple_insert_txn(
+                    txn,
+                    table="public_room_list_stream",
+                    values={
+                        "stream_id": next_id,
+                        "room_id": room_id,
+                        "visibility": is_public,
+                    }
+                )
+
+        with self._public_room_id_gen.get_next() as next_id:
+            yield self.runInteraction(
+                "set_room_is_public",
+                set_room_is_public_txn, next_id,
+            )
 
     def get_public_room_ids(self):
         return self._simple_select_onecol(
@@ -207,3 +255,71 @@ class RoomStore(SQLBaseStore):
             },
             desc="add_event_report"
         )
+
+    def get_current_public_room_stream_id(self):
+        return self._public_room_id_gen.get_current_token()
+
+    def get_public_room_ids_at_stream_id(self, stream_id):
+        return self.runInteraction(
+            "get_public_room_ids_at_stream_id",
+            self.get_public_room_ids_at_stream_id_txn, stream_id
+        )
+
+    def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
+        return {
+            rm
+            for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
+            if vis
+        }
+
+    def get_published_at_stream_id_txn(self, txn, stream_id):
+        sql = ("""
+            SELECT room_id, visibility FROM public_room_list_stream
+            INNER JOIN (
+                SELECT room_id, max(stream_id) AS stream_id
+                FROM public_room_list_stream
+                WHERE stream_id <= ?
+                GROUP BY room_id
+            ) grouped USING (room_id, stream_id)
+        """)
+
+        txn.execute(sql, (stream_id,))
+        return dict(txn.fetchall())
+
+    def get_public_room_changes(self, prev_stream_id, new_stream_id):
+        def get_public_room_changes_txn(txn):
+            then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
+
+            now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
+
+            now_rooms_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if vis
+            )
+            now_rooms_not_visible = set(
+                rm for rm, vis in now_rooms_dict.items() if not vis
+            )
+
+            newly_visible = now_rooms_visible - then_rooms
+            newly_unpublished = now_rooms_not_visible & then_rooms
+
+            return newly_visible, newly_unpublished
+
+        return self.runInteraction(
+            "get_public_room_changes", get_public_room_changes_txn
+        )
+
+    def get_all_new_public_rooms(self, prev_id, current_id, limit):
+        def get_all_new_public_rooms(txn):
+            sql = ("""
+                SELECT stream_id, room_id, visibility FROM public_room_list_stream
+                WHERE stream_id > ? AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """)
+
+            txn.execute(sql, (prev_id, current_id, limit,))
+            return txn.fetchall()
+
+        return self.runInteraction(
+            "get_all_new_public_rooms", get_all_new_public_rooms
+        )
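
get_public_room_changes compares two snapshots of public_room_list_stream: rooms whose latest row at the new position is visible but which were not public at the old position are newly_visible, and rooms that were public then but whose latest row is now not visible are newly_unpublished. A small pure-Python model of the same set arithmetic (rows invented):

    # Model of get_public_room_changes over (stream_id, room_id, visibility)
    # rows from public_room_list_stream.
    rows = [
        (1, "!a:hs", True),
        (2, "!b:hs", True),
        (3, "!b:hs", False),  # !b was unpublished at stream 3
        (4, "!c:hs", True),   # !c became public at stream 4
    ]

    def published_at(rows, stream_id):
        """Latest visibility per room at or before stream_id."""
        latest = {}
        for sid, room_id, vis in sorted(rows):
            if sid <= stream_id:
                latest[room_id] = vis
        return latest

    def public_room_changes(rows, prev_id, new_id):
        then_rooms = {r for r, v in published_at(rows, prev_id).items() if v}
        now = published_at(rows, new_id)
        newly_visible = {r for r, v in now.items() if v} - then_rooms
        newly_unpublished = {r for r, v in now.items() if not v} & then_rooms
        return newly_visible, newly_unpublished

    print(public_room_changes(rows, 2, 4))  # ({'!c:hs'}, {'!b:hs'})
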
diff --git a/synapse/storage/schema/delta/35/public_room_list_change_stream.sql b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql
new file mode 100644
index 0000000000..dd2bf2e28a
--- /dev/null
+++ b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql
@@ -0,0 +1,33 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE public_room_list_stream (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    visibility BOOLEAN NOT NULL
+);
+
+INSERT INTO public_room_list_stream (stream_id, room_id, visibility)
+    SELECT 1, room_id, is_public FROM rooms
+    WHERE is_public = CAST(1 AS BOOLEAN);
+
+CREATE INDEX public_room_list_stream_idx on public_room_list_stream(
+    stream_id
+);
+
+CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream(
+    room_id, stream_id
+);
diff --git a/synapse/storage/schema/delta/35/stream_order_to_extrem.sql b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql
new file mode 100644
index 0000000000..2b945d8a57
--- /dev/null
+++ b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql
@@ -0,0 +1,37 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE stream_ordering_to_exterm (
+    stream_ordering BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    event_id TEXT NOT NULL
+);
+
+INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id)
+    SELECT stream_ordering, room_id, event_id FROM event_forward_extremities
+    INNER JOIN (
+        SELECT room_id, max(stream_ordering) as stream_ordering FROM events
+        INNER JOIN event_forward_extremities USING (room_id, event_id)
+        GROUP BY room_id
+    ) AS rms USING (room_id);
+
+CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm(
+    stream_ordering
+);
+
+CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm(
+    room_id, stream_ordering
+);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0577a0525b..07ea969d4d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore):
             )
             defer.returnValue("t%d-%d" % (topo, token))
 
+    def get_room_max_stream_ordering(self):
+        return self._stream_id_gen.get_current_token()
+
     def get_stream_token_for_event(self, event_id):
         """The stream token for an event
         Args: