summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5947.misc1
-rw-r--r--synapse/federation/transport/server.py8
-rw-r--r--synapse/handlers/room_list.py281
-rw-r--r--synapse/rest/client/v1/room.py8
-rw-r--r--synapse/storage/room.py191
-rw-r--r--synapse/storage/stats.py2
6 files changed, 305 insertions, 186 deletions
diff --git a/changelog.d/5947.misc b/changelog.d/5947.misc
new file mode 100644
index 0000000000..d2fc89c11a
--- /dev/null
+++ b/changelog.d/5947.misc
@@ -0,0 +1 @@
+Perform room directory searches more efficiently, using room statistics.
\ No newline at end of file
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index f9930b6460..e35141c548 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -761,6 +761,10 @@ class PublicRoomList(BaseFederationServlet):
         else:
             network_tuple = ThirdPartyInstanceID(None, None)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         data = await maybeDeferred(
             self.handler.get_local_public_room_list,
             limit,
@@ -796,6 +800,10 @@ class PublicRoomList(BaseFederationServlet):
         if search_filter is None:
             logger.warning("Nonefilter")
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         data = await self.handler.get_local_public_room_list(
             limit=limit,
             since_token=since_token,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..c911de022c 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,7 +17,6 @@ import logging
 from collections import namedtuple
 
 from six import PY3, iteritems
-from six.moves import range
 
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
@@ -25,9 +24,8 @@ from unpaddedbase64 import decode_base64, encode_base64
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
-from synapse.api.errors import Codes, HttpResponseException
+from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -37,7 +35,6 @@ logger = logging.getLogger(__name__)
 
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
-
 # This is used to indicate we should only return rooms published to the main list.
 EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
@@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler):
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
+            from_federation (bool): true iff the request comes from the federation
+                API
         """
         if not self.enable_room_list_search:
             return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -133,200 +132,113 @@ class RoomListHandler(BaseHandler):
             from_federation (bool): Whether this request originated from a
                 federating server or a client. Used for room filtering.
             timeout (int|None): Amount of seconds to wait for a response before
-                timing out.
+                timing out. TODO
         """
+        pagination_token = None
         if since_token and since_token != "END":
-            since_token = RoomListNextBatch.from_token(since_token)
+            if since_token[0] in ("+", "-"):
+                forwards = since_token[0] == "+"
+                pagination_token = since_token[1:]
+            else:
+                raise SynapseError(400, "Invalid since token.")
         else:
-            since_token = None
+            forwards = True
 
-        rooms_to_order_value = {}
-        rooms_to_num_joined = {}
-
-        newly_visible = []
-        newly_unpublished = []
-        if since_token:
-            stream_token = since_token.stream_ordering
-            current_public_id = yield self.store.get_current_public_room_stream_id()
-            public_room_stream_id = since_token.public_room_stream_id
-            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id, network_tuple=network_tuple
-            )
-        else:
-            stream_token = yield self.store.get_room_max_stream_ordering()
-            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+        # we request one more than wanted to see if there are more pages to come
+        probing_limit = limit + 1 if limit is not None else None
 
-        room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id, network_tuple=network_tuple
+        results = yield self.store.get_largest_public_rooms(
+            network_tuple,
+            search_filter,
+            probing_limit,
+            pagination_token,
+            forwards,
+            fetch_creation_event_ids=from_federation,
         )
 
-        # We want to return rooms in a particular order: the number of joined
-        # users. We then arbitrarily use the room_id as a tie breaker.
-
-        @defer.inlineCallbacks
-        def get_order_for_room(room_id):
-            # Most of the rooms won't have changed between the since token and
-            # now (especially if the since token is "now"). So, we can ask what
-            # the current users are in a room (that will hit a cache) and then
-            # check if the room has changed since the since token. (We have to
-            # do it in that order to avoid races).
-            # If things have changed then fall back to getting the current state
-            # at the since token.
-            joined_users = yield self.store.get_users_in_room(room_id)
-            if self.store.has_room_changed_since(room_id, stream_token):
-                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_token
-                )
+        def build_room_entry(room):
+            entry = {
+                "room_id": room["room_id"],
+                "name": room["name"],
+                "topic": room["topic"],
+                "canonical_alias": room["canonical_alias"],
+                "num_joined_members": room["joined_members"],
+                "avatar_url": room["avatar"],
+                "world_readable": room["history_visibility"] == "world_readable",
+            }
+
+            # Filter out Nones – rather omit the field altogether
+            return {k: v for k, v in entry.items() if v is not None}
+
+        if from_federation:
+            room_creation_event_ids = [r["creation_event_id"] for r in results]
+
+        results = [build_room_entry(r) for r in results]
+
+        response = {}
+        num_results = len(results)
+        if num_results > 0:
+            final_room_id = results[-1]["room_id"]
+            initial_room_id = results[0]["room_id"]
+            if limit is not None:
+                more_to_come = num_results == probing_limit
+                results = results[0:limit]
+            else:
+                more_to_come = False
+
+            if not forwards or (forwards and more_to_come):
+                response["next_batch"] = "+%s" % (final_room_id,)
+
+            if since_token and (forwards or (not forwards and more_to_come)):
+                if num_results > 0:
+                    response["prev_batch"] = "-%s" % (initial_room_id,)
+                else:
+                    response["prev_batch"] = "-%s" % (pagination_token,)
+
+        if from_federation:
+            # only show rooms with m.federate=True or absent (default is True)
 
-                if not latest_event_ids:
-                    return
+            # we already have rooms' creation state events' IDs
+            # so get rooms' creation state events
+            creation_events_by_id = yield self.store.get_events(room_creation_event_ids)
 
-                joined_users = yield self.state_handler.get_current_users_in_room(
-                    room_id, latest_event_ids
+            # now filter out rooms with m.federate: False in their create event
+            results = [
+                room
+                for (room, room_creation_event_id) in zip(
+                    results, room_creation_event_ids
                 )
+                if creation_events_by_id[room_creation_event_id].content.get(
+                    "m.federate", True
+                )
+            ]
 
-            num_joined_users = len(joined_users)
-            rooms_to_num_joined[room_id] = num_joined_users
+        for room in results:
+            # populate search result entries with additional fields, namely
+            # 'aliases' and 'guest_can_join'
+            room_id = room["room_id"]
 
-            if num_joined_users == 0:
-                return
+            aliases = yield self.store.get_aliases_for_room(room_id)
+            if aliases:
+                room["aliases"] = aliases
 
-            # We want larger rooms to be first, hence negating num_joined_users
-            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+            state_ids = yield self.store.get_current_state_ids(room_id)
+            guests_can_join = False
+            guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
+            if guest_access_state_id is not None:
+                guest_access = yield self.store.get_event(guest_access_state_id)
+                if guest_access is not None:
+                    if guest_access.content.get("guest_access") == "can_join":
+                        guests_can_join = True
+            room["guest_can_join"] = guests_can_join
 
-        logger.info(
-            "Getting ordering for %i rooms since %s", len(room_ids), stream_token
-        )
-        yield concurrently_execute(get_order_for_room, room_ids, 10)
-
-        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
-        sorted_rooms = [room_id for room_id, _ in sorted_entries]
-
-        # `sorted_rooms` should now be a list of all public room ids that is
-        # stable across pagination. Therefore, we can use indices into this
-        # list as our pagination tokens.
-
-        # Filter out rooms that we don't want to return
-        rooms_to_scan = [
-            r
-            for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
-        ]
-
-        total_room_count = len(rooms_to_scan)
-
-        if since_token:
-            # Filter out rooms we've already returned previously
-            # `since_token.current_limit` is the index of the last room we
-            # sent down, so we exclude it and everything before/after it.
-            if since_token.direction_is_forward:
-                rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
-            else:
-                rooms_to_scan = rooms_to_scan[: since_token.current_limit]
-                rooms_to_scan.reverse()
-
-        logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
-        # _append_room_entry_to_chunk will append to chunk but will stop if
-        # len(chunk) > limit
-        #
-        # Normally we will generate enough results on the first iteration here,
-        #  but if there is a search filter, _append_room_entry_to_chunk may
-        # filter some results out, in which case we loop again.
-        #
-        # We don't want to scan over the entire range either as that
-        # would potentially waste a lot of work.
-        #
-        # XXX if there is no limit, we may end up DoSing the server with
-        # calls to get_current_state_ids for every single room on the
-        # server. Surely we should cap this somehow?
-        #
-        if limit:
-            step = limit + 1
-        else:
-            # step cannot be zero
-            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
-        chunk = []
-        for i in range(0, len(rooms_to_scan), step):
-            if timeout and self.clock.time() > timeout:
-                raise Exception("Timed out searching room directory")
-
-            batch = rooms_to_scan[i : i + step]
-            logger.info("Processing %i rooms for result", len(batch))
-            yield concurrently_execute(
-                lambda r: self._append_room_entry_to_chunk(
-                    r,
-                    rooms_to_num_joined[r],
-                    chunk,
-                    limit,
-                    search_filter,
-                    from_federation=from_federation,
-                ),
-                batch,
-                5,
-            )
-            logger.info("Now %i rooms in result", len(chunk))
-            if len(chunk) >= limit + 1:
-                break
-
-        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
-        # Work out the new limit of the batch for pagination, or None if we
-        # know there are no more results that would be returned.
-        # i.e., [since_token.current_limit..new_limit] is the batch of rooms
-        # we've returned (or the reverse if we paginated backwards)
-        # We tried to pull out limit + 1 rooms above, so if we have <= limit
-        # then we know there are no more results to return
-        new_limit = None
-        if chunk and (not limit or len(chunk) > limit):
-
-            if not since_token or since_token.direction_is_forward:
-                if limit:
-                    chunk = chunk[:limit]
-                last_room_id = chunk[-1]["room_id"]
-            else:
-                if limit:
-                    chunk = chunk[-limit:]
-                last_room_id = chunk[0]["room_id"]
-
-            new_limit = sorted_rooms.index(last_room_id)
-
-        results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
-        if since_token:
-            results["new_rooms"] = bool(newly_visible)
-
-        if not since_token or since_token.direction_is_forward:
-            if new_limit is not None:
-                results["next_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=True,
-                ).to_token()
-
-            if since_token:
-                results["prev_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=False,
-                    current_limit=since_token.current_limit + 1,
-                ).to_token()
-        else:
-            if new_limit is not None:
-                results["prev_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=False,
-                ).to_token()
-
-            if since_token:
-                results["next_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=True,
-                    current_limit=since_token.current_limit - 1,
-                ).to_token()
-
-        return results
+        response["chunk"] = results
+
+        # TODO for federation, we currently don't remove m.federate=False rooms
+        #   from the total room count estimate.
+        response["total_room_count_estimate"] = yield self.store.count_public_rooms()
+
+        return response
 
     @defer.inlineCallbacks
     def _append_room_entry_to_chunk(
@@ -587,7 +499,6 @@ class RoomListNextBatch(
         ),
     )
 ):
-
     KEY_DICT = {
         "stream_ordering": "s",
         "public_room_stream_id": "p",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index f244e8f469..b5e864fdaa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -350,6 +350,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
         limit = parse_integer(request, "limit", 0)
         since_token = parse_string(request, "since", None)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         handler = self.hs.get_room_list_handler()
         if server:
             data = yield handler.get_remote_public_room_list(
@@ -387,6 +391,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
         else:
             network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         handler = self.hs.get_room_list_handler()
         if server:
             data = yield handler.get_remote_public_room_list(
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..732352a731 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -161,6 +162,196 @@ class RoomWorkerStore(SQLBaseStore):
             "get_public_room_changes", get_public_room_changes_txn
         )
 
+    def count_public_rooms(self):
+        """
+        Counts the number of public rooms as tracked in the room_stats_current
+        and room_stats_state
+        table.
+        A public room is one who has is_public set
+        AND is publicly-joinable and/or world-readable.
+        Returns:
+            number of public rooms on this homeserver's room directory
+
+        """
+
+        def _count_public_rooms_txn(txn):
+            sql = """
+                SELECT COUNT(*)
+                FROM room_stats_current
+                    JOIN room_stats_state USING (room_id)
+                    JOIN rooms USING (room_id)
+                WHERE
+                    is_public
+                    AND (
+                        join_rules = 'public'
+                        OR history_visibility = 'world_readable'
+                    )
+            """
+            txn.execute(sql)
+            return txn.fetchone()[0]
+
+        return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+    @defer.inlineCallbacks
+    def get_largest_public_rooms(
+        self,
+        network_tuple,
+        search_filter,
+        limit,
+        pagination_token,
+        forwards,
+        fetch_creation_event_ids=False,
+    ):
+        """Gets the largest public rooms (where largest is in terms of joined
+        members, as tracked in the statistics table).
+
+        Args:
+            network_tuple (ThirdPartyInstanceID|None):
+            search_filter (dict|None):
+            limit (int|None): Maxmimum number of rows to return, unlimited otherwise.
+            pagination_token (str|None): if present, a room ID which is to be
+                the (first/last) included in the results.
+            forwards (bool): true iff going forwards, going backwards otherwise
+            fetch_creation_event_ids (bool): if true, room creation_event_ids will
+                be included in the results.
+
+        Returns:
+            Rooms in order: biggest number of joined users first.
+            We then arbitrarily use the room_id as a tie breaker.
+
+        """
+
+        # TODO we probably want to use full text search on Postgres?
+
+        sql = """
+            SELECT
+                room_id, name, topic, canonical_alias, joined_members,
+                avatar, history_visibility, joined_members
+        """
+
+        if fetch_creation_event_ids:
+            sql += """
+                , cse_create.event_id AS creation_event_id
+            """
+
+        sql += """
+            FROM
+                room_stats_current
+                JOIN room_stats_state USING (room_id)
+                JOIN rooms USING (room_id)
+        """
+        query_args = []
+
+        if network_tuple:
+            sql += """
+                LEFT JOIN appservice_room_list arl USING (room_id)
+            """
+
+        if fetch_creation_event_ids:
+            sql += """
+                LEFT JOIN current_state_events cse_create USING (room_id)
+            """
+
+        sql += """
+            WHERE
+                is_public
+                AND (
+                    join_rules = 'public'
+                    OR history_visibility = 'world_readable'
+                )
+        """
+
+        if fetch_creation_event_ids:
+            sql += """
+                AND cse_create.type = 'm.room.create'
+                AND cse_create.state_key = ''
+            """
+
+        if pagination_token:
+            pt_joined = yield self._simple_select_one_onecol(
+                table="room_stats_current",
+                keyvalues={"room_id": pagination_token},
+                retcol="joined_members",
+                desc="get_largest_public_rooms",
+            )
+
+            if forwards:
+                sql += """
+                    AND (
+                        (joined_members < ?)
+                        OR (joined_members = ? AND room_id >= ?)
+                    )
+                """
+            else:
+                sql += """
+                    AND (
+                        (joined_members > ?)
+                        OR (joined_members = ? AND room_id <= ?)
+                    )
+                """
+            query_args += [pt_joined, pt_joined, pagination_token]
+
+        if search_filter and search_filter.get("generic_search_term", None):
+            search_term = "%" + search_filter["generic_search_term"] + "%"
+            sql += """
+                AND (
+                    name LIKE ?
+                    OR topic LIKE ?
+                    OR canonical_alias LIKE ?
+                )
+            """
+            query_args += [search_term, search_term, search_term]
+
+        if network_tuple:
+            sql += "AND ("
+            if network_tuple.appservice_id:
+                sql += "appservice_id = ? AND "
+                query_args.append(network_tuple.appservice_id)
+            else:
+                sql += "appservice_id IS NULL AND "
+
+            if network_tuple.network_id:
+                sql += "network_id = ?)"
+                query_args.append(network_tuple.network_id)
+            else:
+                sql += "network_id IS NULL)"
+
+        if forwards:
+            sql += """
+                ORDER BY
+                    joined_members DESC, room_id ASC
+            """
+        else:
+            sql += """
+                ORDER BY
+                    joined_members ASC, room_id DESC
+            """
+
+        if limit is not None:
+            # be cautious about SQL injection
+            assert isinstance(limit, int)
+
+            sql += """
+                LIMIT %d
+            """ % (
+                limit,
+            )
+
+        def _get_largest_public_rooms_txn(txn):
+            txn.execute(sql, query_args)
+
+            results = self.cursor_to_dict(txn)
+
+            if not forwards:
+                results.reverse()
+
+            return results
+
+        ret_val = yield self.runInteraction(
+            "get_largest_public_rooms", _get_largest_public_rooms_txn
+        )
+        defer.returnValue(ret_val)
+
     @cached(max_entries=10000)
     def is_room_blocked(self, room_id):
         return self._simple_select_one_onecol(
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 20ce3664a0..8c1eaaa10b 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -567,7 +567,7 @@ class StatsStore(StateDeltasStore):
 
         Examples:
         (low, high) → (kind)
-        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+        (3, 7) → 3 <git … <= 7 (normal-filled; low already processed before)
         (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
         (-7, 7) → -7 <= … <= 7 (both)