diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index f9930b6460..e35141c548 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -761,6 +761,10 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
@@ -796,6 +800,10 @@ class PublicRoomList(BaseFederationServlet):
if search_filter is None:
logger.warning("Nonefilter")
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
data = await self.handler.get_local_public_room_list(
limit=limit,
since_token=since_token,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index a7e55f00e5..3d0e62ebaf 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,17 +17,16 @@ import logging
from collections import namedtuple
from six import PY3, iteritems
-from six.moves import range
import msgpack
from unpaddedbase64 import decode_base64, encode_base64
from twisted.internet import defer
+from twisted.internet.defer import maybeDeferred
from synapse.api.constants import EventTypes, JoinRules
from synapse.api.errors import Codes, HttpResponseException
from synapse.types import ThirdPartyInstanceID
-from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.descriptors import cachedInlineCallbacks
from synapse.util.caches.response_cache import ResponseCache
@@ -37,7 +36,6 @@ logger = logging.getLogger(__name__)
REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
-
# This is used to indicate we should only return rooms published to the main list.
EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
@@ -72,6 +70,8 @@ class RoomListHandler(BaseHandler):
This can be (None, None) to indicate the main list, or a particular
appservice and network id to use an appservice specific one.
Setting to None returns all public rooms across all lists.
+ from_federation (bool): true iff the request comes from the federation
+ API
"""
if not self.enable_room_list_search:
return defer.succeed({"chunk": [], "total_room_count_estimate": 0})
@@ -133,200 +133,117 @@ class RoomListHandler(BaseHandler):
from_federation (bool): Whether this request originated from a
federating server or a client. Used for room filtering.
timeout (int|None): Amount of seconds to wait for a response before
- timing out.
+ timing out. TODO: the new implementation does not currently enforce this.
"""
- if since_token and since_token != "END":
- since_token = RoomListNextBatch.from_token(since_token)
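+ # Pagination tokens are the room ID at the boundary of the previous batch,
+ # prefixed with "+" (paginate forwards) or "-" (paginate backwards).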
+ pagination_token = None
+ if since_token and since_token != "END": # TODO: do we need to keep supporting "END" (and add "START")?
+ if since_token[0] in ("+", "-"):
+ forwards = since_token[0] == "+"
+ pagination_token = since_token[1:]
+ else:
+ # TODO: this should probably be a SynapseError(400, ...) so clients get a sensible response
+ raise ValueError("Invalid since_token %r: expected a '+' or '-' prefix" % (since_token,))
else:
- since_token = None
+ forwards = True
- rooms_to_order_value = {}
- rooms_to_num_joined = {}
-
- newly_visible = []
- newly_unpublished = []
- if since_token:
- stream_token = since_token.stream_ordering
- current_public_id = yield self.store.get_current_public_room_stream_id()
- public_room_stream_id = since_token.public_room_stream_id
- newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
- public_room_stream_id, current_public_id, network_tuple=network_tuple
- )
- else:
- stream_token = yield self.store.get_room_max_stream_ordering()
- public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+ # we request one more than wanted to see if there are more pages to come
+ probing_limit = limit + 1 if limit is not None else None
- room_ids = yield self.store.get_public_room_ids_at_stream_id(
- public_room_stream_id, network_tuple=network_tuple
+ results = yield self.store.get_largest_public_rooms(
+ network_tuple, search_filter, probing_limit, pagination_token, forwards
)
- # We want to return rooms in a particular order: the number of joined
- # users. We then arbitrarily use the room_id as a tie breaker.
-
- @defer.inlineCallbacks
- def get_order_for_room(room_id):
- # Most of the rooms won't have changed between the since token and
- # now (especially if the since token is "now"). So, we can ask what
- # the current users are in a room (that will hit a cache) and then
- # check if the room has changed since the since token. (We have to
- # do it in that order to avoid races).
- # If things have changed then fall back to getting the current state
- # at the since token.
- joined_users = yield self.store.get_users_in_room(room_id)
- if self.store.has_room_changed_since(room_id, stream_token):
- latest_event_ids = yield self.store.get_forward_extremeties_for_room(
- room_id, stream_token
- )
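+ # map the database columns onto the public rooms API field names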
+ results = [
+ {
+ "room_id": r["room_id"],
+ "name": r["name"],
+ "topic": r["topic"],
+ "canonical_alias": r["canonical_alias"],
+ "num_joined_members": r["joined_members"],
+ "avatar_url": r["avatar"],
+ "world_readable": r["history_visibility"] == "world_readable",
+ }
+ for r in results
+ ]
- if not latest_event_ids:
- return
+ response = {}
+ num_results = len(results)
+ # default for the zero-results case; recalculated below when we have rows
+ more_to_come = False
+ if num_results > 0:
+ final_room_id = results[-1]["room_id"]
+ initial_room_id = results[0]["room_id"]
+ if limit is not None:
+ more_to_come = num_results == probing_limit
+ results = results[0:limit]
+ else:
+ more_to_come = False
- joined_users = yield self.state_handler.get_current_users_in_room(
- room_id, latest_event_ids
- )
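+ # next_batch is always offered when paginating backwards (it points forward
+ # past the end of this page), and when paginating forwards only if more
+ # results remain; prev_batch mirrors this.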
+ if num_results > 0 and (not forwards or more_to_come):
+ response["next_batch"] = "+%s" % (final_room_id,)
+
+ if since_token and (forwards or (not forwards and more_to_come)):
+ if num_results > 0:
+ response["prev_batch"] = "-%s" % (initial_room_id,)
+ else:
+ response["prev_batch"] = "-%s" % (pagination_token,)
+
+ if from_federation:
+ # only show rooms with m.federate=True or absent (default is True)
+
+ # get rooms' state
+ room_state_ids = yield defer.gatherResults(
+ [
+ maybeDeferred(self.store.get_current_state_ids, room["room_id"])
+ for room in results
+ ],
+ consumeErrors=True,
+ )
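+ # defer.gatherResults preserves the order of its inputs, so the state maps
+ # line up with `results` in the zip below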
- num_joined_users = len(joined_users)
- rooms_to_num_joined[room_id] = num_joined_users
+ # get rooms' creation state events' IDs
+ room_creation_event_ids = {
+ room["room_id"]: event_ids.get((EventTypes.Create, ""))
+ for (room, event_ids) in zip(results, room_state_ids)
+ }
- if num_joined_users == 0:
- return
+ # get rooms' creation state events
+ creation_events_by_id = yield self.store.get_events(
+ room_creation_event_ids.values()
+ )
- # We want larger rooms to be first, hence negating num_joined_users
- rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+ # associate them with the room IDs
+ room_creation_events = {
+ room_id: creation_events_by_id[event_id]
+ for (room_id, event_id) in room_creation_event_ids.items()
+ }
- logger.info(
- "Getting ordering for %i rooms since %s", len(room_ids), stream_token
- )
- yield concurrently_execute(get_order_for_room, room_ids, 10)
+ # now filter out rooms with m.federate: False in their create event
+ results = [
+ room
+ for room in results
+ if room_creation_events[room["room_id"]].content.get("m.federate", True)
+ ]
- sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
- sorted_rooms = [room_id for room_id, _ in sorted_entries]
+ for room in results:
+ # populate search result entries with additional fields, namely
+ # 'aliases' and 'guest_can_join'
+ room_id = room["room_id"]
+ room["aliases"] = yield self.store.get_aliases_for_room(room_id)
- # `sorted_rooms` should now be a list of all public room ids that is
- # stable across pagination. Therefore, we can use indices into this
- # list as our pagination tokens.
+ state_ids = yield self.store.get_current_state_ids(room_id)
+ guests_can_join = False
+ guest_access_state_id = state_ids.get((EventTypes.GuestAccess, ""))
+ if guest_access_state_id is not None:
+ guest_access = yield self.store.get_event(guest_access_state_id)
+ if guest_access is not None:
+ if guest_access.content.get("guest_access") == "can_join":
+ guests_can_join = True
+ room["guest_can_join"] = guests_can_join
- # Filter out rooms that we don't want to return
- rooms_to_scan = [
- r
- for r in sorted_rooms
- if r not in newly_unpublished and rooms_to_num_joined[r] > 0
- ]
+ response["chunk"] = results
- total_room_count = len(rooms_to_scan)
+ # TODO for federation, we currently don't remove m.federate=False rooms
+ # from the total room count estimate.
+ response["total_room_count_estimate"] = yield self.store.count_public_rooms()
- if since_token:
- # Filter out rooms we've already returned previously
- # `since_token.current_limit` is the index of the last room we
- # sent down, so we exclude it and everything before/after it.
- if since_token.direction_is_forward:
- rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :]
- else:
- rooms_to_scan = rooms_to_scan[: since_token.current_limit]
- rooms_to_scan.reverse()
-
- logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan))
-
- # _append_room_entry_to_chunk will append to chunk but will stop if
- # len(chunk) > limit
- #
- # Normally we will generate enough results on the first iteration here,
- # but if there is a search filter, _append_room_entry_to_chunk may
- # filter some results out, in which case we loop again.
- #
- # We don't want to scan over the entire range either as that
- # would potentially waste a lot of work.
- #
- # XXX if there is no limit, we may end up DoSing the server with
- # calls to get_current_state_ids for every single room on the
- # server. Surely we should cap this somehow?
- #
- if limit:
- step = limit + 1
- else:
- # step cannot be zero
- step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
-
- chunk = []
- for i in range(0, len(rooms_to_scan), step):
- if timeout and self.clock.time() > timeout:
- raise Exception("Timed out searching room directory")
-
- batch = rooms_to_scan[i : i + step]
- logger.info("Processing %i rooms for result", len(batch))
- yield concurrently_execute(
- lambda r: self._append_room_entry_to_chunk(
- r,
- rooms_to_num_joined[r],
- chunk,
- limit,
- search_filter,
- from_federation=from_federation,
- ),
- batch,
- 5,
- )
- logger.info("Now %i rooms in result", len(chunk))
- if len(chunk) >= limit + 1:
- break
-
- chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
-
- # Work out the new limit of the batch for pagination, or None if we
- # know there are no more results that would be returned.
- # i.e., [since_token.current_limit..new_limit] is the batch of rooms
- # we've returned (or the reverse if we paginated backwards)
- # We tried to pull out limit + 1 rooms above, so if we have <= limit
- # then we know there are no more results to return
- new_limit = None
- if chunk and (not limit or len(chunk) > limit):
-
- if not since_token or since_token.direction_is_forward:
- if limit:
- chunk = chunk[:limit]
- last_room_id = chunk[-1]["room_id"]
- else:
- if limit:
- chunk = chunk[-limit:]
- last_room_id = chunk[0]["room_id"]
-
- new_limit = sorted_rooms.index(last_room_id)
-
- results = {"chunk": chunk, "total_room_count_estimate": total_room_count}
-
- if since_token:
- results["new_rooms"] = bool(newly_visible)
-
- if not since_token or since_token.direction_is_forward:
- if new_limit is not None:
- results["next_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=True,
- ).to_token()
-
- if since_token:
- results["prev_batch"] = since_token.copy_and_replace(
- direction_is_forward=False,
- current_limit=since_token.current_limit + 1,
- ).to_token()
- else:
- if new_limit is not None:
- results["prev_batch"] = RoomListNextBatch(
- stream_ordering=stream_token,
- public_room_stream_id=public_room_stream_id,
- current_limit=new_limit,
- direction_is_forward=False,
- ).to_token()
-
- if since_token:
- results["next_batch"] = since_token.copy_and_replace(
- direction_is_forward=True,
- current_limit=since_token.current_limit - 1,
- ).to_token()
-
- return results
+ return response
@defer.inlineCallbacks
def _append_room_entry_to_chunk(
@@ -587,7 +504,6 @@ class RoomListNextBatch(
),
)
):
-
KEY_DICT = {
"stream_ordering": "s",
"public_room_stream_id": "p",
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index f244e8f469..b5e864fdaa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -350,6 +350,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
limit = parse_integer(request, "limit", 0)
since_token = parse_string(request, "since", None)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
@@ -387,6 +391,10 @@ class PublicRoomListRestServlet(TransactionRestServlet):
else:
network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+ if limit == 0:
+ # zero is a special value which corresponds to no limit.
+ limit = None
+
handler = self.hs.get_room_list_handler()
if server:
data = yield handler.get_remote_public_room_list(
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 08e13f3a3b..3f92fba432 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -161,6 +162,167 @@ class RoomWorkerStore(SQLBaseStore):
"get_public_room_changes", get_public_room_changes_txn
)
+ def count_public_rooms(self):
+ """
+ Counts the number of public rooms as tracked in the room_stats and room_state
+ table.
+ A public room is one who has is_public set
+ AND is publicly-joinable and/or world-readable.
+ Returns:
+ number of public rooms on this homeserver's room directory
+
+ """
+
+ def _count_public_rooms_txn(txn):
+ sql = """
+ SELECT COUNT(*)
+ FROM room_stats
+ JOIN room_state USING (room_id)
+ JOIN rooms USING (room_id)
+ WHERE
+ is_public
+ AND (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ )
+ """
+ txn.execute(sql)
+ return txn.fetchone()[0]
+
+ return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+
+ @defer.inlineCallbacks
+ def get_largest_public_rooms(
+ self, network_tuple, search_filter, limit, pagination_token, forwards
+ ):
+ """TODO doc this
+
+ Args:
+ network_tuple (ThirdPartyInstanceID|None):
+ search_filter (dict|None):
+ limit (int|None): Maxmimum number of rows to return, unlimited otherwise.
+ pagination_token (str|None): if present, a room ID which is to be
+ the (first/last) included in the results.
+ forwards (bool): true iff going forwards, going backwards otherwise
+
+ Returns:
+ Rooms in order: biggest number of joined users first.
+ We then arbitrarily use the room_id as a tie breaker.
+
+ """
+
+ # TODO probably want to use ts_… on Postgres?
+
+ sql = """
+ SELECT
+ room_id, name, topic, canonical_alias, joined_members,
+ avatar, history_visibility
+ FROM
+ room_stats
+ JOIN room_state USING (room_id)
+ JOIN rooms USING (room_id)
+ """
+ query_args = []
+
+ if network_tuple:
+ sql += """
+ LEFT JOIN appservice_room_list arl USING (room_id)
+ """
+
+ sql += """
+ WHERE
+ is_public
+ AND (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ )
+ """
+
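+ # Keyset (seek-style) pagination: the token only carries a room ID, so we
+ # look up that room's joined_members count and filter relative to its
+ # (joined_members, room_id) position rather than using OFFSET. This keeps
+ # the results stable as the directory changes between requests.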
+ if pagination_token:
+ pt_joined = yield self._simple_select_one_onecol(
+ table="room_stats",
+ keyvalues={"room_id": pagination_token},
+ retcol="joined_members",
+ desc="get_largest_public_rooms",
+ )
+
+ if forwards:
+ sql += """
+ AND (
+ (joined_members < ?)
+ OR (joined_members = ? AND room_id >= ?)
+ )
+ """
+ else:
+ sql += """
+ AND (
+ (joined_members > ?)
+ OR (joined_members = ? AND room_id <= ?)
+ )
+ """
+ query_args += [pt_joined, pt_joined, pagination_token]
+
+ if search_filter and search_filter.get("generic_search_term", None):
+ search_term = "%" + search_filter["generic_search_term"] + "%"
+ sql += """
+ AND (
+ name LIKE ?
+ OR topic LIKE ?
+ OR canonical_alias LIKE ?
+ )
+ """
+ query_args += [search_term, search_term, search_term]
+
+ if network_tuple:
+ sql += "AND ("
+ if network_tuple.appservice_id:
+ sql += "appservice_id = ? AND "
+ query_args.append(network_tuple.appservice_id)
+ else:
+ sql += "appservice_id IS NULL AND "
+
+ if network_tuple.network_id:
+ sql += "network_id = ?)"
+ query_args.append(network_tuple.network_id)
+ else:
+ sql += "network_id IS NULL)"
+
+ if forwards:
+ sql += """
+ ORDER BY
+ joined_members DESC, room_id ASC
+ """
+ else:
+ sql += """
+ ORDER BY
+ joined_members ASC, room_id DESC
+ """
+
+ if limit is not None:
+ # be cautious about SQL injection
+ assert isinstance(limit, int)
+
+ sql += """
+ LIMIT %d
+ """ % (
+ limit,
+ )
+
+ def _get_largest_public_rooms_txn(txn):
+ txn.execute(sql, query_args)
+
+ results = self.cursor_to_dict(txn)
+
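+ # the backwards query returns rows smallest-first; flip them so callers
+ # always see largest-first ordering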
+ if not forwards:
+ results.reverse()
+
+ return results
+
+ ret_val = yield self.runInteraction(
+ "get_largest_public_rooms", _get_largest_public_rooms_txn
+ )
+ defer.returnValue(ret_val)
+
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
return self._simple_select_one_onecol(
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..f659f96551 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -68,6 +68,9 @@ class StatsStore(StateDeltasStore):
yield self._end_background_update("populate_stats_createtables")
return 1
+ # TODO: dev only - wipe any existing stats so they get regenerated from scratch; remove before this lands
+ yield self.delete_all_stats()
+
# Get all the rooms that we want to process.
def _make_staging_area(txn):
# Create the temporary tables
|