summary refs log tree commit diff
path: root/synapse/handlers/room_list.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/room_list.py')
-rw-r--r--synapse/handlers/room_list.py278
1 files changed, 97 insertions, 181 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..3d0e62ebaf 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,17 +17,16 @@ import logging
 from collections import namedtuple
 
 from six import PY3, iteritems
-from six.moves import range
 
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
 
 from twisted.internet import defer
+from twisted.internet.defer import maybeDeferred
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.api.errors import Codes, HttpResponseException
 from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
 
@@ -37,7 +36,6 @@ logger = logging.getLogger(__name__)
 
 REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
-
 # This is used to indicate we should only return rooms published to the main list.
 EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
@@ -72,6 +70,8 @@ class RoomListHandler(BaseHandler):
                 This can be (None, None) to indicate the main list, or a particular
                 appservice and network id to use an appservice specific one.
                 Setting to None returns all public rooms across all lists.
+            from_federation (bool): true iff the request comes from the federation
+                API
         """
         if not self.enable_room_list_search:
             return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -133,200 +133,117 @@ class RoomListHandler(BaseHandler):
             from_federation (bool): Whether this request originated from a
                 federating server or a client. Used for room filtering.
             timeout (int|None): Amount of seconds to wait for a response before
-                timing out.
+                timing out. TODO
         """
-        if since_token and since_token != "END":
-            since_token = RoomListNextBatch.from_token(since_token)
+        pagination_token = None
+        if since_token and since_token != "END":  # todo ought we support END and START?
+            if since_token[0] in ("+", "-"):
+                forwards = since_token[0] == "+"
+                pagination_token = since_token[1:]
+            else:
+                raise SyntaxError("shrug ")  # TODO
         else:
-            since_token = None
+            forwards = True
 
-        rooms_to_order_value = {}
-        rooms_to_num_joined = {}
-
-        newly_visible = []
-        newly_unpublished = []
-        if since_token:
-            stream_token = since_token.stream_ordering
-            current_public_id = yield self.store.get_current_public_room_stream_id()
-            public_room_stream_id = since_token.public_room_stream_id
-            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
-                public_room_stream_id, current_public_id, network_tuple=network_tuple
-            )
-        else:
-            stream_token = yield self.store.get_room_max_stream_ordering()
-            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+        # we request one more than wanted to see if there are more pages to come
+        probing_limit = limit + 1 if limit is not None else None
 
-        room_ids = yield self.store.get_public_room_ids_at_stream_id(
-            public_room_stream_id, network_tuple=network_tuple
+        results = yield self.store.get_largest_public_rooms(
+            network_tuple, search_filter, probing_limit, pagination_token, forwards
         )
 
-        # We want to return rooms in a particular order: the number of joined
-        # users. We then arbitrarily use the room_id as a tie breaker.
-
-        @defer.inlineCallbacks
-        def get_order_for_room(room_id):
-            # Most of the rooms won't have changed between the since token and
-            # now (especially if the since token is "now"). So, we can ask what
-            # the current users are in a room (that will hit a cache) and then
-            # check if the room has changed since the since token. (We have to
-            # do it in that order to avoid races).
-            # If things have changed then fall back to getting the current state
-            # at the since token.
-            joined_users = yield self.store.get_users_in_room(room_id)
-            if self.store.has_room_changed_since(room_id, stream_token):
-                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
-                    room_id, stream_token
-                )
+        results = [
+            {
+                "room_id": r["room_id"],
+                "name": r["name"],
+                "topic": r["topic"],
+                "canonical_alias": r["canonical_alias"],
+                "num_joined_members": r["joined_members"],
+                "avatar_url": r["avatar"],
+                "world_readable": r["history_visibility"] == "world_readable",
+            }
+            for r in results
+        ]
 
-                if not latest_event_ids:
-                    return
+        response = {}
+        num_results = len(results)
+        if num_results > 0:
+            final_room_id = results[-1]["room_id"]
+            initial_room_id = results[0]["room_id"]
+            if limit is not None:
+                more_to_come = num_results == probing_limit
+                results = results[0:limit]
+            else:
+                more_to_come = False
 
-                joined_users = yield self.state_handler.get_current_users_in_room(
-                    room_id, latest_event_ids
-                )
+            if not forwards or (forwards and more_to_come):
+                response["next_batch"] = "+%s" % (final_room_id,)
+
+            if since_token and (forwards or (not forwards and more_to_come)):
+                if num_results > 0:
+                    response["prev_batch"] = "-%s" % (initial_room_id,)
+                else:
+                    response["prev_batch"] = "-%s" % (pagination_token,)
+
+        if from_federation:
+            # only show rooms with m.federate=True or absent (default is True)
+
+            # get rooms' state
+            room_state_ids = yield defer.gatherResults(
+                [
+                    maybeDeferred(self.store.get_current_state_ids, room["room_id"])
+                    for room in results
+                ],
+                consumeErrors=True,
+            )
 
-            num_joined_users = len(joined_users)
-            rooms_to_num_joined[room_id] = num_joined_users
+            # get rooms' creation state events' IDs
+            room_creation_event_ids = {
+                room["room_id"]: event_ids.get((EventTypes.Create, ""))
+                for (room, event_ids) in zip(results, room_state_ids)
+            }
 
-            if num_joined_users == 0:
-                return
+            # get rooms' creation state events
+            creation_events_by_id = yield self.store.get_events(
+                room_creation_event_ids.values()
+            )
 
-            # We want larger rooms to be first, hence negating num_joined_users
-            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+            # associate them with the room IDs
+            room_creation_events = {
+                room_id: creation_events_by_id[event_id]
+                for (room_id, event_id) in room_creation_event_ids.items()
+            }
 
-        logger.info(
-            "Getting ordering for %i rooms since %s", len(room_ids), stream_token
-        )
-        yield concurrently_execute(get_order_for_room, room_ids, 10)
+            # now filter out rooms with m.federate: False in their create event
+            results = [
+                room
+                for room in results
+                if room_creation_events[room["room_id"]].content.get("m.federate", True)
+            ]
 
-        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
-        sorted_rooms = [room_id for room_id, _ in sorted_entries]
+        for room in results:
+            # populate search result entries with additional fields, namely
+            # 'aliases' and 'guest_can_join'
+            room_id = room["room_id"]
+            room["aliases"] = yield self.store.get_aliases_for_room(room_id)
 
-        # `sorted_rooms` should now be a list of all public room ids that is
-        # stable across pagination. Therefore, we can use indices into this
-        # list as our pagination tokens.
+            state_ids = yield self.store.get_current_state_ids(room_id)
+            guests_can_join = False
+            guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
+            if guest_access_state_id is not None:
+                guest_access = yield self.store.get_event(guest_access_state_id)
+                if guest_access is not None:
+                    if guest_access.content.get("guest_access") == "can_join":
+                        guests_can_join = True
+            room["guest_can_join"] = guests_can_join
 
-        # Filter out rooms that we don't want to return
-        rooms_to_scan = [
-            r
-            for r in sorted_rooms
-            if r not in newly_unpublished and rooms_to_num_joined[r] > 0
-        ]
+        response["chunk"] = results
 
-        total_room_count = len(rooms_to_scan)
+        # TODO for federation, we currently don't remove m.federate=False rooms
+        #   from the total room count estimate.
+        response["total_room_count_estimate"] = yield self.store.count_public_rooms()
 
-        if since_token:
-            # Filter out rooms we've already returned previously
-            # `since_token.current_limit` is the index of the last room we
-            # sent down, so we exclude it and everything before/after it.
-            if since_token.direction_is_forward:
-                rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
-            else:
-                rooms_to_scan = rooms_to_scan[: since_token.current_limit]
-                rooms_to_scan.reverse()
-
-        logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
-        # _append_room_entry_to_chunk will append to chunk but will stop if
-        # len(chunk) > limit
-        #
-        # Normally we will generate enough results on the first iteration here,
-        #  but if there is a search filter, _append_room_entry_to_chunk may
-        # filter some results out, in which case we loop again.
-        #
-        # We don't want to scan over the entire range either as that
-        # would potentially waste a lot of work.
-        #
-        # XXX if there is no limit, we may end up DoSing the server with
-        # calls to get_current_state_ids for every single room on the
-        # server. Surely we should cap this somehow?
-        #
-        if limit:
-            step = limit + 1
-        else:
-            # step cannot be zero
-            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
-        chunk = []
-        for i in range(0, len(rooms_to_scan), step):
-            if timeout and self.clock.time() > timeout:
-                raise Exception("Timed out searching room directory")
-
-            batch = rooms_to_scan[i : i + step]
-            logger.info("Processing %i rooms for result", len(batch))
-            yield concurrently_execute(
-                lambda r: self._append_room_entry_to_chunk(
-                    r,
-                    rooms_to_num_joined[r],
-                    chunk,
-                    limit,
-                    search_filter,
-                    from_federation=from_federation,
-                ),
-                batch,
-                5,
-            )
-            logger.info("Now %i rooms in result", len(chunk))
-            if len(chunk) >= limit + 1:
-                break
-
-        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
-        # Work out the new limit of the batch for pagination, or None if we
-        # know there are no more results that would be returned.
-        # i.e., [since_token.current_limit..new_limit] is the batch of rooms
-        # we've returned (or the reverse if we paginated backwards)
-        # We tried to pull out limit + 1 rooms above, so if we have <= limit
-        # then we know there are no more results to return
-        new_limit = None
-        if chunk and (not limit or len(chunk) > limit):
-
-            if not since_token or since_token.direction_is_forward:
-                if limit:
-                    chunk = chunk[:limit]
-                last_room_id = chunk[-1]["room_id"]
-            else:
-                if limit:
-                    chunk = chunk[-limit:]
-                last_room_id = chunk[0]["room_id"]
-
-            new_limit = sorted_rooms.index(last_room_id)
-
-        results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
-        if since_token:
-            results["new_rooms"] = bool(newly_visible)
-
-        if not since_token or since_token.direction_is_forward:
-            if new_limit is not None:
-                results["next_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=True,
-                ).to_token()
-
-            if since_token:
-                results["prev_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=False,
-                    current_limit=since_token.current_limit + 1,
-                ).to_token()
-        else:
-            if new_limit is not None:
-                results["prev_batch"] = RoomListNextBatch(
-                    stream_ordering=stream_token,
-                    public_room_stream_id=public_room_stream_id,
-                    current_limit=new_limit,
-                    direction_is_forward=False,
-                ).to_token()
-
-            if since_token:
-                results["next_batch"] = since_token.copy_and_replace(
-                    direction_is_forward=True,
-                    current_limit=since_token.current_limit - 1,
-                ).to_token()
-
-        return results
+        return response
 
     @defer.inlineCallbacks
     def _append_room_entry_to_chunk(
@@ -587,7 +504,6 @@ class RoomListNextBatch(
         ),
     )
 ):
-
     KEY_DICT = {
         "stream_ordering": "s",
         "public_room_stream_id": "p",