diff --git a/changelog.d/5947.misc b/changelog.d/5947.misc
new file mode 100644
index 0000000000..d2fc89c11a
--- /dev/null
+++ b/changelog.d/5947.misc
@@ -0,0 +1 @@
+Perform room directory searches more efficiently, using room statistics.
\ No newline at end of file
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index f9930b6460..e35141c548 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -761,6 +761,10 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
@@ -796,6 +800,10 @@ class PublicRoomList(BaseFederationServlet):
if search_filter is None:
logger.warning("Nonefilter")
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await self.handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..c911de022c 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,7 +17,6 @@ import logging
from collections import namedtuple
from six import PY3, iteritems
-from six.moves import range
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
@@ -25,9 +24,8 @@ from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules
-from synapse.api.errors import Codes, HttpResponseException
+from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -37,7 +35,6 @@ logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
-
# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
@@ -72,6 +69,8 @@ class RoomListHandler(BaseHandler):
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
+ from_federation (bool): true iff the request comes from the federation
+ API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -133,200 +132,113 @@ class RoomListHandler(BaseHandler):
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
- timing out.
+ timing out. TODO: this is no longer honoured by the new implementation.
"""
+ pagination_token = None
if since_token and since_token != "END":
- since_token = RoomListNextBatch.from_token(since_token)
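+ # Tokens handed out by this handler are of the form "+<room_id>" or
+ # "-<room_id>", e.g. "+!abcdef:example.com": the sign gives the
+ # direction of pagination and the room ID is the boundary room
+ # (see get_largest_public_rooms).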
+ if since_token[0] in ("+", "-"):
+ forwards = since_token[0] == "+"
+ pagination_token = since_token[1:]
+ else:
+ raise SynapseError(400, "Invalid since token.")
else:
- since_token = None
+ forwards = True
- rooms_to_order_value = {}
- rooms_to_num_joined = {}
-
- newly_visible = []
- newly_unpublished = []
- if since_token:
- stream_token = since_token.stream_ordering
- current_public_id = yield self.store.get_current_public_room_stream_id()
- public_room_stream_id = since_token.public_room_stream_id
- newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
- public_room_stream_id, current_public_id, network_tuple=network_tuple
- )
- else:
- stream_token = yield self.store.get_room_max_stream_ordering()
- public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+ # we request one more than wanted to see if there are more pages to come
+ probing_limit = limit + 1 if limit is not None else None
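+ # e.g. for limit=100 we ask the store for 101 rows; getting 101 back
+ # tells us that at least one further page exists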
- room_ids = yield self.store.get_public_room_ids_at_stream_id(
- public_room_stream_id, network_tuple=network_tuple
+ results = yield self.store.get_largest_public_rooms(
+ network_tuple,
+ search_filter,
+ probing_limit,
+ pagination_token,
+ forwards,
+ fetch_creation_event_ids=from_federation,
)
- # We want to return rooms in a particular order: the number of joined
- # users. We then arbitrarily use the room_id as a tie breaker.
-
- @defer.inlineCallbacks
- def get_order_for_room(room_id):
- # Most of the rooms won't have changed between the since token and
- # now (especially if the since token is "now"). So, we can ask what
- # the current users are in a room (that will hit a cache) and then
- # check if the room has changed since the since token. (We have to
- # do it in that order to avoid races).
- # If things have changed then fall back to getting the current state
- # at the since token.
- joined_users = yield self.store.get_users_in_room(room_id)
- if self.store.has_room_changed_since(room_id, stream_token):
- latest_event_ids = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_token
- )
+ def build_room_entry(room):
+ entry = {
+ "room_id": room["room_id"],
+ "name": room["name"],
+ "topic": room["topic"],
+ "canonical_alias": room["canonical_alias"],
+ "num_joined_members": room["joined_members"],
+ "avatar_url": room["avatar"],
+ "world_readable": room["history_visibility"] == "world_readable",
+ }
+
+ # Filter out Nones: we'd rather omit an unset field altogether
+ # than return it with a null value
+ return {k: v for k, v in entry.items() if v is not None}
+
+ if from_federation:
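+ # keep hold of the create event IDs before we rebuild the entries
+ # below (which discards them), so we can check m.federate afterwards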
+ room_creation_event_ids = [r["creation_event_id"] for r in results]
+
+ results = [build_room_entry(r) for r in results]
+
+ response = {}
+ num_results = len(results)
+ if num_results > 0:
+ final_room_id = results[-1]["room_id"]
+ initial_room_id = results[0]["room_id"]
+ if limit is not None:
+ more_to_come = num_results == probing_limit
+ results = results[0:limit]
+ else:
+ more_to_come = False
+
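+ # a next_batch is needed if we are paginating backwards (everything
+ # we skipped still lies ahead of us) or if the probe above showed
+ # that more results remain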
+ if not forwards or (forwards and more_to_come):
+ response["next_batch"] = "+%s" % (final_room_id,)
+
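+ # likewise, a prev_batch only makes sense once a since_token has
+ # been supplied: going forwards there is always something behind us,
+ # going backwards only if more results remain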
+ if since_token and (forwards or (not forwards and more_to_come)):
+ if num_results > 0:
+ response["prev_batch"] = "-%s" % (initial_room_id,)
+ else:
+ response["prev_batch"] = "-%s" % (pagination_token,)
+
+ if from_federation:
+ # only show rooms with m.federate=True or absent (default is True)
- if not latest_event_ids:
- return
+ # we already have the IDs of the rooms' create events, so fetch
+ # the events themselves
+ creation_events_by_id = yield self.store.get_events(room_creation_event_ids)
- joined_users = yield self.state_handler.get_current_users_in_room(
- room_id, latest_event_ids
+ # now filter out rooms with m.federate: False in their create event
+ results = [
+ room
+ for (room, room_creation_event_id) in zip(
+ results, room_creation_event_ids
)
+ if creation_events_by_id[room_creation_event_id].content.get(
+ "m.federate", True
+ )
+ ]
- num_joined_users = len(joined_users)
- rooms_to_num_joined[room_id] = num_joined_users
+ for room in results:
+ # populate search result entries with additional fields, namely
+ # 'aliases' and 'guest_can_join'
+ room_id = room["room_id"]
- if num_joined_users == 0:
- return
+ aliases = yield self.store.get_aliases_for_room(room_id)
+ if aliases:
+ room["aliases"] = aliases
- # We want larger rooms to be first, hence negating num_joined_users
- rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+ state_ids = yield self.store.get_current_state_ids(room_id)
+ guests_can_join = False
+ guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
+ if guest_access_state_id is not None:
+ guest_access = yield self.store.get_event(guest_access_state_id)
+ if guest_access is not None:
+ if guest_access.content.get("guest_access") == "can_join":
+ guests_can_join = True
+ room["guest_can_join"] = guests_can_join
- logger.info(
- "Getting ordering for %i rooms since %s", len(room_ids), stream_token
- )
- yield concurrently_execute(get_order_for_room, room_ids, 10)
-
- sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
- sorted_rooms = [room_id for room_id, _ in sorted_entries]
-
- # `sorted_rooms` should now be a list of all public room ids that is
- # stable across pagination. Therefore, we can use indices into this
- # list as our pagination tokens.
-
- # Filter out rooms that we don't want to return
- rooms_to_scan = [
- r
- for r in sorted_rooms
- if r not in newly_unpublished and rooms_to_num_joined[r] > 0
- ]
-
- total_room_count = len(rooms_to_scan)
-
- if since_token:
- # Filter out rooms we've already returned previously
- # `since_token.current_limit` is the index of the last room we
- # sent down, so we exclude it and everything before/after it.
- if since_token.direction_is_forward:
- rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
- else:
- rooms_to_scan = rooms_to_scan[: since_token.current_limit]
- rooms_to_scan.reverse()
-
- logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
- # _append_room_entry_to_chunk will append to chunk but will stop if
- # len(chunk) > limit
- #
- # Normally we will generate enough results on the first iteration here,
- # but if there is a search filter, _append_room_entry_to_chunk may
- # filter some results out, in which case we loop again.
- #
- # We don't want to scan over the entire range either as that
- # would potentially waste a lot of work.
- #
- # XXX if there is no limit, we may end up DoSing the server with
- # calls to get_current_state_ids for every single room on the
- # server. Surely we should cap this somehow?
- #
- if limit:
- step = limit + 1
- else:
- # step cannot be zero
- step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
- chunk = []
- for i in range(0, len(rooms_to_scan), step):
- if timeout and self.clock.time() > timeout:
- raise Exception("Timed out searching room directory")
-
- batch = rooms_to_scan[i : i + step]
- logger.info("Processing %i rooms for result", len(batch))
- yield concurrently_execute(
- lambda r: self._append_room_entry_to_chunk(
- r,
- rooms_to_num_joined[r],
- chunk,
- limit,
- search_filter,
- from_federation=from_federation,
- ),
- batch,
- 5,
- )
- logger.info("Now %i rooms in result", len(chunk))
- if len(chunk) >= limit + 1:
- break
-
- chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
- # Work out the new limit of the batch for pagination, or None if we
- # know there are no more results that would be returned.
- # i.e., [since_token.current_limit..new_limit] is the batch of rooms
- # we've returned (or the reverse if we paginated backwards)
- # We tried to pull out limit + 1 rooms above, so if we have <= limit
- # then we know there are no more results to return
- new_limit = None
- if chunk and (not limit or len(chunk) > limit):
-
- if not since_token or since_token.direction_is_forward:
- if limit:
- chunk = chunk[:limit]
- last_room_id = chunk[-1]["room_id"]
- else:
- if limit:
- chunk = chunk[-limit:]
- last_room_id = chunk[0]["room_id"]
-
- new_limit = sorted_rooms.index(last_room_id)
-
- results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
- if since_token:
- results["new_rooms"] = bool(newly_visible)
-
- if not since_token or since_token.direction_is_forward:
- if new_limit is not None:
- results["next_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=True,
- ).to_token()
-
- if since_token:
- results["prev_batch"] = since_token.copy_and_replace(
- direction_is_forward=False,
- current_limit=since_token.current_limit + 1,
- ).to_token()
- else:
- if new_limit is not None:
- results["prev_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=False,
- ).to_token()
-
- if since_token:
- results["next_batch"] = since_token.copy_and_replace(
- direction_is_forward=True,
- current_limit=since_token.current_limit - 1,
- ).to_token()
-
- return results
+ response["chunk"] = results
+
+ # TODO for federation, we currently don't remove m.federate=False rooms
+ # from the total room count estimate.
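+ # It also ignores any search filter, so it is only ever an estimate.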
+ response["total_room_count_estimate"] = yield self.store.count_public_rooms()
+
+ return response
@defer.inlineCallbacks
def _append_room_entry_to_chunk(
@@ -587,7 +499,6 @@ class RoomListNextBatch(
),
)
):
-
KEY_DICT = {
"stream_ordering": "s",
"public_room_stream_id": "p",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index f244e8f469..b5e864fdaa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -350,6 +350,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
limit = parse_integer(request, "limit", 0)
since_token = parse_string(request, "since", None)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
@@ -387,6 +391,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..732352a731 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -161,6 +162,196 @@ class RoomWorkerStore(SQLBaseStore):
"get_public_room_changes", get_public_room_changes_txn
)
+ def count_public_rooms(self):
+ """
+ Counts the number of public rooms, as tracked in the room_stats_current
+ and room_stats_state tables.
+ A public room is one that has is_public set AND is publicly-joinable
+ and/or world-readable.
+
+ Returns:
+ Deferred[int]: the number of rooms in this homeserver's public room
+ directory
+ """
+
+ def _count_public_rooms_txn(txn):
+ sql = """
+ SELECT COUNT(*)
+ FROM room_stats_current
+ JOIN room_stats_state USING (room_id)
+ JOIN rooms USING (room_id)
+ WHERE
+ is_public
+ AND (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ )
+ """
+ txn.execute(sql)
+ return txn.fetchone()[0]
+
+ return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+ @defer.inlineCallbacks
+ def get_largest_public_rooms(
+ self,
+ network_tuple,
+ search_filter,
+ limit,
+ pagination_token,
+ forwards,
+ fetch_creation_event_ids=False,
+ ):
+ """Gets the largest public rooms (where largest is in terms of joined
+ members, as tracked in the statistics table).
+
+ Args:
+ network_tuple (ThirdPartyInstanceID|None):
+ search_filter (dict|None):
+ limit (int|None): Maximum number of rows to return, or None for
+ no limit.
+ pagination_token (str|None): if present, a room ID; it will be the
+ first room included in the results when paginating forwards, or
+ the last when paginating backwards.
+ forwards (bool): true iff paginating forwards; false if paginating
+ backwards.
+ fetch_creation_event_ids (bool): if true, room creation_event_ids will
+ be included in the results.
+
+ Returns:
+ Rooms ordered by number of joined members, largest first, with
+ room_id as an arbitrary tie-breaker.
+
+ """
+
+ # TODO we probably want to use full text search on Postgres?
+
+ sql = """
+ SELECT
+ room_id, name, topic, canonical_alias, joined_members,
+ avatar, history_visibility
+ """
+
+ if fetch_creation_event_ids:
+ sql += """
+ , cse_create.event_id AS creation_event_id
+ """
+
+ sql += """
+ FROM
+ room_stats_current
+ JOIN room_stats_state USING (room_id)
+ JOIN rooms USING (room_id)
+ """
+ query_args = []
+
+ if network_tuple:
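+ # a network_tuple of (None, None) means the main room list: such
+ # rooms have no entry in appservice_room_list, hence the LEFT JOIN
+ # plus the IS NULL checks in the WHERE clause below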
+ sql += """
+ LEFT JOIN appservice_room_list arl USING (room_id)
+ """
+
+ if fetch_creation_event_ids:
+ sql += """
+ LEFT JOIN current_state_events cse_create USING (room_id)
+ """
+
+ sql += """
+ WHERE
+ is_public
+ AND (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ )
+ """
+
+ if fetch_creation_event_ids:
+ sql += """
+ AND cse_create.type = 'm.room.create'
+ AND cse_create.state_key = ''
+ """
+
+ if pagination_token:
+ pt_joined = yield self._simple_select_one_onecol(
+ table="room_stats_current",
+ keyvalues={"room_id": pagination_token},
+ retcol="joined_members",
+ desc="get_largest_public_rooms",
+ )
+
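+ # keyset pagination: under the (joined_members, room_id) ordering,
+ # resume from the token room itself (inclusive) in the requested
+ # direction by comparing against its member count and room ID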
+ if forwards:
+ sql += """
+ AND (
+ (joined_members < ?)
+ OR (joined_members = ? AND room_id >= ?)
+ )
+ """
+ else:
+ sql += """
+ AND (
+ (joined_members > ?)
+ OR (joined_members = ? AND room_id <= ?)
+ )
+ """
+ query_args += [pt_joined, pt_joined, pagination_token]
+
+ if search_filter and search_filter.get("generic_search_term", None):
+ search_term = "%" + search_filter["generic_search_term"] + "%"
+ sql += """
+ AND (
+ name LIKE ?
+ OR topic LIKE ?
+ OR canonical_alias LIKE ?
+ )
+ """
+ query_args += [search_term, search_term, search_term]
+
+ if network_tuple:
+ sql += "AND ("
+ if network_tuple.appservice_id:
+ sql += "appservice_id = ? AND "
+ query_args.append(network_tuple.appservice_id)
+ else:
+ sql += "appservice_id IS NULL AND "
+
+ if network_tuple.network_id:
+ sql += "network_id = ?)"
+ query_args.append(network_tuple.network_id)
+ else:
+ sql += "network_id IS NULL)"
+
+ if forwards:
+ sql += """
+ ORDER BY
+ joined_members DESC, room_id ASC
+ """
+ else:
+ sql += """
+ ORDER BY
+ joined_members ASC, room_id DESC
+ """
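+ # (when paginating backwards we select in reverse order here and
+ # flip the rows back round in the transaction function below)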
+
+ if limit is not None:
+ # limit is interpolated directly into the SQL below, so make sure
+ # it really is an int (guards against SQL injection)
+ assert isinstance(limit, int)
+
+ sql += """
+ LIMIT %d
+ """ % (
+ limit,
+ )
+
+ def _get_largest_public_rooms_txn(txn):
+ txn.execute(sql, query_args)
+
+ results = self.cursor_to_dict(txn)
+
+ if not forwards:
+ results.reverse()
+
+ return results
+
+ ret_val = yield self.runInteraction(
+ "get_largest_public_rooms", _get_largest_public_rooms_txn
+ )
+ defer.returnValue(ret_val)
+
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
return self._simple_select_one_onecol(
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 20ce3664a0..8c1eaaa10b 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -567,7 +567,7 @@ class StatsStore(StateDeltasStore):
Examples:
(low, high) → (kind)
- (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+ (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
(-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
(-7, 7) → -7 <= … <= 7 (both)
|