diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d72e8c99f9..28bc35f8a3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -18,13 +18,16 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import (
- EventTypes, JoinRules, Membership,
+ EventTypes, JoinRules,
)
-from synapse.api.errors import SynapseError
from synapse.util.async import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from collections import namedtuple
+from unpaddedbase64 import encode_base64, decode_base64
+
import logging
+import msgpack
logger = logging.getLogger(__name__)
@@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
- self.remote_list_request_cache = ResponseCache(hs)
- self.remote_list_cache = {}
- self.fetch_looping_call = hs.get_clock().looping_call(
- self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
- )
- self.fetch_all_remote_lists()
- def get_local_public_room_list(self):
- result = self.response_cache.get(())
+ def get_local_public_room_list(self, limit=None, since_token=None):
+ result = self.response_cache.get((limit, since_token))
if not result:
- result = self.response_cache.set((), self._get_public_room_list())
+ result = self.response_cache.set(
+ (limit, since_token),
+ self._get_public_room_list(limit, since_token)
+ )
return result
@defer.inlineCallbacks
- def _get_public_room_list(self):
- room_ids = yield self.store.get_public_room_ids()
+ def _get_public_room_list(self, limit=None, since_token=None):
+ if since_token and since_token != "END":
+ since_token = RoomListNextBatch.from_token(since_token)
+ else:
+ since_token = None
+
+ rooms_to_order_value = {}
+ rooms_to_num_joined = {}
+ rooms_to_latest_event_ids = {}
+
+ newly_visible = []
+ newly_unpublished = []
+ if since_token:
+ stream_token = since_token.stream_ordering
+ current_public_id = yield self.store.get_current_public_room_stream_id()
+ public_room_stream_id = since_token.public_room_stream_id
+ newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
+ public_room_stream_id, current_public_id
+ )
+ else:
+ stream_token = yield self.store.get_room_max_stream_ordering()
+ public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+
+ room_ids = yield self.store.get_public_room_ids_at_stream_id(
+ public_room_stream_id
+ )
+
+ # We want to return rooms in a particular order: the number of joined
+ # users. We then arbitrarily use the room_id as a tie breaker.
+
+ @defer.inlineCallbacks
+ def get_order_for_room(room_id):
+ latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
+ if not latest_event_ids:
+ latest_event_ids = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_token
+ )
+ rooms_to_latest_event_ids[room_id] = latest_event_ids
+
+ if not latest_event_ids:
+ return
+
+ joined_users = yield self.state_handler.get_current_user_in_room(
+ room_id, latest_event_ids,
+ )
+ num_joined_users = len(joined_users)
+ rooms_to_num_joined[room_id] = num_joined_users
+
+ if num_joined_users == 0:
+ return
+
+ # We want larger rooms to be first, hence negating num_joined_users
+ rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+
+ yield concurrently_execute(get_order_for_room, room_ids, 10)
+
+ sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
+ sorted_rooms = [room_id for room_id, _ in sorted_entries]
+
+ if since_token:
+ if since_token.direction_is_forward:
+ sorted_rooms = sorted_rooms[since_token.current_limit:]
+ else:
+ sorted_rooms = sorted_rooms[:since_token.current_limit]
+ sorted_rooms.reverse()
- results = []
+ new_limit = None
+ if limit:
+ if sorted_rooms[limit:]:
+ new_limit = limit
+ if since_token:
+ if since_token.direction_is_forward:
+ new_limit += since_token.current_limit
+ else:
+ new_limit = since_token.current_limit - new_limit
+ new_limit = max(0, new_limit)
+ sorted_rooms = sorted_rooms[:limit]
+
+ chunk = []
@defer.inlineCallbacks
def handle_room(room_id):
- current_state = yield self.state_handler.get_current_state(room_id)
+ num_joined_users = rooms_to_num_joined[room_id]
+ if num_joined_users == 0:
+ return
+
+ if room_id in newly_unpublished:
+ return
+
+ result = {
+ "room_id": room_id,
+ "num_joined_members": num_joined_users,
+ }
+
+ current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+
+ event_map = yield self.store.get_events([
+ event_id for key, event_id in current_state_ids.items()
+ if key[0] in (
+ EventTypes.JoinRules,
+ EventTypes.Name,
+ EventTypes.Topic,
+ EventTypes.CanonicalAlias,
+ EventTypes.RoomHistoryVisibility,
+ EventTypes.GuestAccess,
+ "m.room.avatar",
+ )
+ ])
+
+ current_state = {
+ (ev.type, ev.state_key): ev
+ for ev in event_map.values()
+ }
# Double check that this is actually a public room.
join_rules_event = current_state.get((EventTypes.JoinRules, ""))
@@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
- result = {"room_id": room_id}
-
- num_joined_users = len([
- 1 for _, event in current_state.items()
- if event.type == EventTypes.Member
- and event.membership == Membership.JOIN
- ])
- if num_joined_users == 0:
- return
-
- result["num_joined_members"] = num_joined_users
-
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
result["aliases"] = aliases
@@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- results.append(result)
+ chunk.append(result)
- yield concurrently_execute(handle_room, room_ids, 10)
+ yield concurrently_execute(handle_room, sorted_rooms, 10)
- # FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": results})
+ chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
- @defer.inlineCallbacks
- def fetch_all_remote_lists(self):
- deferred = self.hs.get_replication_layer().get_public_rooms(
- self.hs.config.secondary_directory_servers
- )
- self.remote_list_request_cache.set((), deferred)
- self.remote_list_cache = yield deferred
+ results = {
+ "chunk": chunk,
+ }
+
+ if since_token:
+ results["new_rooms"] = bool(newly_visible)
+
+ if not since_token or since_token.direction_is_forward:
+ if new_limit:
+ results["next_batch"] = RoomListNextBatch(
+ stream_ordering=stream_token,
+ public_room_stream_id=public_room_stream_id,
+ current_limit=new_limit,
+ direction_is_forward=True,
+ ).to_token()
+
+ if since_token:
+ results["prev_batch"] = since_token.copy_and_replace(
+ direction_is_forward=False,
+ ).to_token()
+ else:
+ if new_limit:
+ results["prev_batch"] = RoomListNextBatch(
+ stream_ordering=stream_token,
+ public_room_stream_id=public_room_stream_id,
+ current_limit=new_limit,
+ direction_is_forward=False,
+ ).to_token()
+
+ if since_token:
+ results["next_batch"] = since_token.copy_and_replace(
+ direction_is_forward=True,
+ ).to_token()
+
+ defer.returnValue(results)
@defer.inlineCallbacks
- def get_remote_public_room_list(self, server_name):
+ def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
res = yield self.hs.get_replication_layer().get_public_rooms(
- [server_name]
+ server_name, limit=limit, since_token=since_token,
)
- if server_name not in res:
- raise SynapseError(404, "Server not found")
- defer.returnValue(res[server_name])
-
- @defer.inlineCallbacks
- def get_aggregated_public_room_list(self):
- """
- Get the public room list from this server and the servers
- specified in the secondary_directory_servers config option.
- XXX: Pagination...
- """
- # We return the results from out cache which is updated by a looping call,
- # unless we're missing a cache entry, in which case wait for the result
- # of the fetch if there's one in progress. If not, omit that server.
- wait = False
- for s in self.hs.config.secondary_directory_servers:
- if s not in self.remote_list_cache:
- logger.warn("No cached room list from %s: waiting for fetch", s)
- wait = True
- break
-
- if wait and self.remote_list_request_cache.get(()):
- yield self.remote_list_request_cache.get(())
-
- public_rooms = yield self.get_local_public_room_list()
-
- # keep track of which room IDs we've seen so we can de-dup
- room_ids = set()
-
- # tag all the ones in our list with our server name.
- # Also add the them to the de-deping set
- for room in public_rooms['chunk']:
- room["server_name"] = self.hs.hostname
- room_ids.add(room["room_id"])
-
- # Now add the results from federation
- for server_name, server_result in self.remote_list_cache.items():
- for room in server_result["chunk"]:
- if room["room_id"] not in room_ids:
- room["server_name"] = server_name
- public_rooms["chunk"].append(room)
- room_ids.add(room["room_id"])
-
- defer.returnValue(public_rooms)
+ defer.returnValue(res)
+
+
+class RoomListNextBatch(namedtuple("RoomListNextBatch", (
+ "stream_ordering", # stream_ordering of the first public room list
+ "public_room_stream_id", # public room stream id for first public room list
+ "current_limit", # The number of previous rooms returned
+ "direction_is_forward", # Bool if this is a next_batch, false if prev_batch
+))):
+
+ KEY_DICT = {
+ "stream_ordering": "s",
+ "public_room_stream_id": "p",
+ "current_limit": "n",
+ "direction_is_forward": "d",
+ }
+
+ REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}
+
+ @classmethod
+ def from_token(cls, token):
+ return RoomListNextBatch(**{
+ cls.REVERSE_KEY_DICT[key]: val
+ for key, val in msgpack.loads(decode_base64(token)).items()
+ })
+
+ def to_token(self):
+ return encode_base64(msgpack.dumps({
+ self.KEY_DICT[key]: val
+ for key, val in self._asdict().items()
+ }))
+
+ def copy_and_replace(self, **kwds):
+ return self._replace(
+ **kwds
+ )
|