summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-07-17 15:50:29 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-07-17 15:50:29 +0100
commit69f6a46cb597d0834b470779821d191d250183f9 (patch)
treedac9560d3f9f357b6cba990d784797bb96adbbf6 /synapse
parentChangelog for #5691 (diff)
downloadsynapse-69f6a46cb597d0834b470779821d191d250183f9.tar.xz
Use room_stats and room_state for room directory search
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/transport/server.py4
-rw-r--r--synapse/handlers/room_list.py280
-rw-r--r--synapse/rest/client/v1/room.py8
-rw-r--r--synapse/storage/room.py162
-rw-r--r--synapse/storage/stats.py3
5 files changed, 275 insertions, 182 deletions
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c45d458d94..a6bf430c81 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -764,6 +764,10 @@ class PublicRoomList(BaseFederationServlet):
         else:
             network_tuple = ThirdPartyInstanceID(None, None)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         data = yield self.handler.get_local_public_room_list(
             limit, since_token, network_tuple=network_tuple, from_federation=True
         )
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index aae696a7e8..9af7edc643 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,16 +17,15 @@ import logging
 from collections import namedtuple
 
 from six import PY3, iteritems
-from six.moves import range
 
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
 
 from twisted.internet import defer
+from twisted.internet.defer import maybeDeferred
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -36,7 +35,6 @@ logger = logging.getLogger(__name__)
 
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
-
 # This is used to indicate we should only return rooms published to the main list.
 EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
@@ -71,6 +69,8 @@ class RoomListHandler(BaseHandler):
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
+            from_federation (bool): true iff the request comes from the federation
+                API
         """
         if not self.enable_room_list_search:
             return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -132,200 +132,117 @@ class RoomListHandler(BaseHandler):
             from_federation (bool): Whether this request originated from a
                 federating server or a client. Used for room filtering.
             timeout (int|None): Amount of seconds to wait for a response before
-                timing out.
+                timing out. TODO
         """
-        if since_token and since_token != "END":
-            since_token = RoomListNextBatch.from_token(since_token)
+        pagination_token = None
+        if since_token and since_token != "END":  # todo ought we support END and START?
+            if since_token[0] in ("+", "-"):
+                forwards = since_token[0] == "+"
+                pagination_token = since_token[1:]
+            else:
+                raise SyntaxError("shrug ")  # TODO
         else:
-            since_token = None
+            forwards = True
 
-        rooms_to_order_value = {}
-        rooms_to_num_joined = {}
-
-        newly_visible = []
-        newly_unpublished = []
-        if since_token:
-            stream_token = since_token.stream_ordering
-            current_public_id = yield self.store.get_current_public_room_stream_id()
-            public_room_stream_id = since_token.public_room_stream_id
-            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id, network_tuple=network_tuple
-            )
-        else:
-            stream_token = yield self.store.get_room_max_stream_ordering()
-            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+        # we request one more than wanted to see if there are more pages to come
+        probing_limit = limit + 1 if limit is not None else None
 
-        room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id, network_tuple=network_tuple
+        results = yield self.store.get_largest_public_rooms(
+            network_tuple, search_filter, probing_limit, pagination_token, forwards
         )
 
-        # We want to return rooms in a particular order: the number of joined
-        # users. We then arbitrarily use the room_id as a tie breaker.
-
-        @defer.inlineCallbacks
-        def get_order_for_room(room_id):
-            # Most of the rooms won't have changed between the since token and
-            # now (especially if the since token is "now"). So, we can ask what
-            # the current users are in a room (that will hit a cache) and then
-            # check if the room has changed since the since token. (We have to
-            # do it in that order to avoid races).
-            # If things have changed then fall back to getting the current state
-            # at the since token.
-            joined_users = yield self.store.get_users_in_room(room_id)
-            if self.store.has_room_changed_since(room_id, stream_token):
-                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_token
-                )
-
-                if not latest_event_ids:
-                    return
+        results = [
+            {
+                "room_id": r["room_id"],
+                "name": r["name"],
+                "topic": r["topic"],
+                "canonical_alias": r["canonical_alias"],
+                "num_joined_members": r["joined_members"],
+                "avatar_url": r["avatar"],
+                "world_readable": r["history_visibility"] == "world_readable",
+            }
+            for r in results
+        ]
 
-                joined_users = yield self.state_handler.get_current_users_in_room(
-                    room_id, latest_event_ids
-                )
+        response = {}
+        num_results = len(results)
+        if num_results > 0:
+            final_room_id = results[-1]["room_id"]
+            initial_room_id = results[0]["room_id"]
+            if limit is not None:
+                more_to_come = num_results == probing_limit
+                results = results[0:limit]
+            else:
+                more_to_come = False
+
+            if not forwards or (forwards and more_to_come):
+                response["next_batch"] = "+%s" % (final_room_id,)
+
+            if since_token and (forwards or (not forwards and more_to_come)):
+                if num_results > 0:
+                    response["prev_batch"] = "-%s" % (initial_room_id,)
+                else:
+                    response["prev_batch"] = "-%s" % (pagination_token,)
+
+        if from_federation:
+            # only show rooms with m.federate=True or absent (default is True)
+
+            # get rooms' state
+            room_state_ids = yield defer.gatherResults(
+                [
+                    maybeDeferred(self.store.get_current_state_ids, room["room_id"])
+                    for room in results
+                ],
+                consumeErrors=True,
+            )
 
-            num_joined_users = len(joined_users)
-            rooms_to_num_joined[room_id] = num_joined_users
+            # get rooms' creation state events' IDs
+            room_creation_event_ids = {
+                room["room_id"]: event_ids.get((EventTypes.Create, ""))
+                for (room, event_ids) in zip(results, room_state_ids)
+            }
 
-            if num_joined_users == 0:
-                return
+            # get rooms' creation state events
+            creation_events_by_id = yield self.store.get_events(
+                room_creation_event_ids.values()
+            )
 
-            # We want larger rooms to be first, hence negating num_joined_users
-            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+            # associate them with the room IDs
+            room_creation_events = {
+                room_id: creation_events_by_id[event_id]
+                for (room_id, event_id) in room_creation_event_ids.items()
+            }
 
-        logger.info(
-            "Getting ordering for %i rooms since %s", len(room_ids), stream_token
-        )
-        yield concurrently_execute(get_order_for_room, room_ids, 10)
+            # now filter out rooms with m.federate: False in their create event
+            results = [
+                room
+                for room in results
+                if room_creation_events[room["room_id"]].content.get("m.federate", True)
+            ]
 
-        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
-        sorted_rooms = [room_id for room_id, _ in sorted_entries]
+        for room in results:
+            # populate search result entries with additional fields, namely
+            # 'aliases' and 'guest_can_join'
+            room_id = room["room_id"]
+            room["aliases"] = yield self.store.get_aliases_for_room(room_id)
 
-        # `sorted_rooms` should now be a list of all public room ids that is
-        # stable across pagination. Therefore, we can use indices into this
-        # list as our pagination tokens.
+            state_ids = yield self.store.get_current_state_ids(room_id)
+            guests_can_join = False
+            guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
+            if guest_access_state_id is not None:
+                guest_access = yield self.store.get_event(guest_access_state_id)
+                if guest_access is not None:
+                    if guest_access.content.get("guest_access") == "can_join":
+                        guests_can_join = True
+            room["guest_can_join"] = guests_can_join
 
-        # Filter out rooms that we don't want to return
-        rooms_to_scan = [
-            r
-            for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
-        ]
+        response["chunk"] = results
 
-        total_room_count = len(rooms_to_scan)
+        # TODO for federation, we currently don't remove m.federate=False rooms
+        #   from the total room count estimate.
+        response["total_room_count_estimate"] = yield self.store.count_public_rooms()
 
-        if since_token:
-            # Filter out rooms we've already returned previously
-            # `since_token.current_limit` is the index of the last room we
-            # sent down, so we exclude it and everything before/after it.
-            if since_token.direction_is_forward:
-                rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
-            else:
-                rooms_to_scan = rooms_to_scan[: since_token.current_limit]
-                rooms_to_scan.reverse()
-
-        logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
-        # _append_room_entry_to_chunk will append to chunk but will stop if
-        # len(chunk) > limit
-        #
-        # Normally we will generate enough results on the first iteration here,
-        #  but if there is a search filter, _append_room_entry_to_chunk may
-        # filter some results out, in which case we loop again.
-        #
-        # We don't want to scan over the entire range either as that
-        # would potentially waste a lot of work.
-        #
-        # XXX if there is no limit, we may end up DoSing the server with
-        # calls to get_current_state_ids for every single room on the
-        # server. Surely we should cap this somehow?
-        #
-        if limit:
-            step = limit + 1
-        else:
-            # step cannot be zero
-            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
-        chunk = []
-        for i in range(0, len(rooms_to_scan), step):
-            if timeout and self.clock.time() > timeout:
-                raise Exception("Timed out searching room directory")
-
-            batch = rooms_to_scan[i : i + step]
-            logger.info("Processing %i rooms for result", len(batch))
-            yield concurrently_execute(
-                lambda r: self._append_room_entry_to_chunk(
-                    r,
-                    rooms_to_num_joined[r],
-                    chunk,
-                    limit,
-                    search_filter,
-                    from_federation=from_federation,
-                ),
-                batch,
-                5,
-            )
-            logger.info("Now %i rooms in result", len(chunk))
-            if len(chunk) >= limit + 1:
-                break
-
-        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
-        # Work out the new limit of the batch for pagination, or None if we
-        # know there are no more results that would be returned.
-        # i.e., [since_token.current_limit..new_limit] is the batch of rooms
-        # we've returned (or the reverse if we paginated backwards)
-        # We tried to pull out limit + 1 rooms above, so if we have <= limit
-        # then we know there are no more results to return
-        new_limit = None
-        if chunk and (not limit or len(chunk) > limit):
-
-            if not since_token or since_token.direction_is_forward:
-                if limit:
-                    chunk = chunk[:limit]
-                last_room_id = chunk[-1]["room_id"]
-            else:
-                if limit:
-                    chunk = chunk[-limit:]
-                last_room_id = chunk[0]["room_id"]
-
-            new_limit = sorted_rooms.index(last_room_id)
-
-        results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
-        if since_token:
-            results["new_rooms"] = bool(newly_visible)
-
-        if not since_token or since_token.direction_is_forward:
-            if new_limit is not None:
-                results["next_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=True,
-                ).to_token()
-
-            if since_token:
-                results["prev_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=False,
-                    current_limit=since_token.current_limit + 1,
-                ).to_token()
-        else:
-            if new_limit is not None:
-                results["prev_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=False,
-                ).to_token()
-
-            if since_token:
-                results["next_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=True,
-                    current_limit=since_token.current_limit - 1,
-                ).to_token()
-
-        defer.returnValue(results)
+        defer.returnValue(response)
 
     @defer.inlineCallbacks
     def _append_room_entry_to_chunk(
@@ -560,7 +477,6 @@ class RoomListNextBatch(
         ),
     )
 ):
-
     KEY_DICT = {
         "stream_ordering": "s",
         "public_room_stream_id": "p",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 7709c2d705..65faf539bb 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -332,6 +332,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
         limit = parse_integer(request, "limit", 0)
         since_token = parse_string(request, "since", None)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         handler = self.hs.get_room_list_handler()
         if server:
             data = yield handler.get_remote_public_room_list(
@@ -369,6 +373,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
         else:
             network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
 
+        if limit == 0:
+            # zero is a special value which corresponds to no limit.
+            limit = None
+
         handler = self.hs.get_room_list_handler()
         if server:
             data = yield handler.get_remote_public_room_list(
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index fe9d79d792..37dd71b825 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
             "get_public_room_changes", get_public_room_changes_txn
         )
 
+    def count_public_rooms(self):
+        """
+        Counts the number of public rooms as tracked in the room_stats and room_state
+        table.
+        A public room is one who has is_public set
+        AND is publicly-joinable and/or world-readable.
+        Returns:
+            number of public rooms on this homeserver's room directory
+
+        """
+
+        def _count_public_rooms_txn(txn):
+            sql = """
+                SELECT COUNT(*)
+                FROM room_stats
+                    JOIN room_state USING (room_id)
+                    JOIN rooms USING (room_id)
+                WHERE
+                    is_public
+                    AND (
+                        join_rules = 'public'
+                        OR history_visibility = 'world_readable'
+                    )
+            """
+            txn.execute(sql)
+            return txn.fetchone()[0]
+
+        return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+    @defer.inlineCallbacks
+    def get_largest_public_rooms(
+        self, network_tuple, search_filter, limit, pagination_token, forwards
+    ):
+        """TODO doc this
+
+        Args:
+            network_tuple (ThirdPartyInstanceID|None):
+            search_filter (dict|None):
+            limit (int|None): Maxmimum number of rows to return, unlimited otherwise.
+            pagination_token (str|None): if present, a room ID which is to be
+                the (first/last) included in the results.
+            forwards (bool): true iff going forwards, going backwards otherwise
+
+        Returns:
+            Rooms in order: biggest number of joined users first.
+            We then arbitrarily use the room_id as a tie breaker.
+
+        """
+
+        # TODO probably want to use ts_… on Postgres?
+
+        sql = """
+            SELECT
+                room_id, name, topic, canonical_alias, joined_members,
+                avatar, history_visibility, joined_members
+            FROM
+                room_stats
+                JOIN room_state USING (room_id)
+                JOIN rooms USING (room_id)
+        """
+        query_args = []
+
+        if network_tuple:
+            sql += """
+                LEFT JOIN appservice_room_list arl USING (room_id)
+            """
+
+        sql += """
+            WHERE
+                is_public
+                AND (
+                    join_rules = 'public'
+                    OR history_visibility = 'world_readable'
+                )
+        """
+
+        if pagination_token:
+            pt_joined = yield self._simple_select_one_onecol(
+                table="room_stats",
+                keyvalues={"room_id": pagination_token},
+                retcol="joined_members",
+                desc="get_largest_public_rooms",
+            )
+
+            if forwards:
+                sql += """
+                    AND (
+                        (joined_members < ?)
+                        OR (joined_members = ? AND room_id >= ?)
+                    )
+                """
+            else:
+                sql += """
+                    AND (
+                        (joined_members > ?)
+                        OR (joined_members = ? AND room_id <= ?)
+                    )
+                """
+            query_args += [pt_joined, pt_joined, pagination_token]
+
+        if search_filter and search_filter.get("generic_search_term", None):
+            search_term = "%" + search_filter["generic_search_term"] + "%"
+            sql += """
+                AND (
+                    name LIKE ?
+                    OR topic LIKE ?
+                    OR canonical_alias LIKE ?
+                )
+            """
+            query_args += [search_term, search_term, search_term]
+
+        if network_tuple:
+            sql += "AND ("
+            if network_tuple.appservice_id:
+                sql += "appservice_id = ? AND "
+                query_args.append(network_tuple.appservice_id)
+            else:
+                sql += "appservice_id IS NULL AND "
+
+            if network_tuple.network_id:
+                sql += "network_id = ?)"
+                query_args.append(network_tuple.network_id)
+            else:
+                sql += "network_id IS NULL)"
+
+        if forwards:
+            sql += """
+                ORDER BY
+                    joined_members DESC, room_id ASC
+            """
+        else:
+            sql += """
+                ORDER BY
+                    joined_members ASC, room_id DESC
+            """
+
+        if limit is not None:
+            # be cautious about SQL injection
+            assert isinstance(limit, int)
+
+            sql += """
+                LIMIT %d
+            """ % (
+                limit,
+            )
+
+        def _get_largest_public_rooms_txn(txn):
+            txn.execute(sql, query_args)
+
+            results = self.cursor_to_dict(txn)
+
+            if not forwards:
+                results.reverse()
+
+            return results
+
+        ret_val = yield self.runInteraction(
+            "get_largest_public_rooms", _get_largest_public_rooms_txn
+        )
+        defer.returnValue(ret_val)
+
     @cached(max_entries=10000)
     def is_room_blocked(self, room_id):
         return self._simple_select_one_onecol(
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 1cec84ee2e..13d552e049 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
             yield self._end_background_update("populate_stats_createtables")
             defer.returnValue(1)
 
+        # TODO dev only
+        yield self.delete_all_stats()
+
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
             # Create the temporary tables