diff options
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/room_list.py | 280 |
1 files changed, 196 insertions, 84 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index d72e8c99f9..28bc35f8a3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -18,13 +18,16 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import ( - EventTypes, JoinRules, Membership, + EventTypes, JoinRules, ) -from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 + import logging +import msgpack logger = logging.getLogger(__name__) @@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - self.remote_list_request_cache = ResponseCache(hs) - self.remote_list_cache = {} - self.fetch_looping_call = hs.get_clock().looping_call( - self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL - ) - self.fetch_all_remote_lists() - def get_local_public_room_list(self): - result = self.response_cache.get(()) + def get_local_public_room_list(self, limit=None, since_token=None): + result = self.response_cache.get((limit, since_token)) if not result: - result = self.response_cache.set((), self._get_public_room_list()) + result = self.response_cache.set( + (limit, since_token), + self._get_public_room_list(limit, since_token) + ) return result @defer.inlineCallbacks - def _get_public_room_list(self): - room_ids = yield self.store.get_public_room_ids() + def _get_public_room_list(self, limit=None, since_token=None): + if since_token and since_token != "END": + since_token = RoomListNextBatch.from_token(since_token) + else: + since_token = None + + rooms_to_order_value = {} + rooms_to_num_joined = {} + rooms_to_latest_event_ids = {} + + newly_visible = [] + newly_unpublished = [] + if since_token: + stream_token = since_token.stream_ordering + current_public_id = yield self.store.get_current_public_room_stream_id() + public_room_stream_id = since_token.public_room_stream_id + newly_visible, newly_unpublished = yield self.store.get_public_room_changes( + public_room_stream_id, current_public_id + ) + else: + stream_token = yield self.store.get_room_max_stream_ordering() + public_room_stream_id = yield self.store.get_current_public_room_stream_id() + + room_ids = yield self.store.get_public_room_ids_at_stream_id( + public_room_stream_id + ) + + # We want to return rooms in a particular order: the number of joined + # users. We then arbitrarily use the room_id as a tie breaker. + + @defer.inlineCallbacks + def get_order_for_room(room_id): + latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) + if not latest_event_ids: + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) + rooms_to_latest_event_ids[room_id] = latest_event_ids + + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) + num_joined_users = len(joined_users) + rooms_to_num_joined[room_id] = num_joined_users + + if num_joined_users == 0: + return + + # We want larger rooms to be first, hence negating num_joined_users + rooms_to_order_value[room_id] = (-num_joined_users, room_id) + + yield concurrently_execute(get_order_for_room, room_ids, 10) + + sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) + sorted_rooms = [room_id for room_id, _ in sorted_entries] + + if since_token: + if since_token.direction_is_forward: + sorted_rooms = sorted_rooms[since_token.current_limit:] + else: + sorted_rooms = sorted_rooms[:since_token.current_limit] + sorted_rooms.reverse() - results = [] + new_limit = None + if limit: + if sorted_rooms[limit:]: + new_limit = limit + if since_token: + if since_token.direction_is_forward: + new_limit += since_token.current_limit + else: + new_limit = since_token.current_limit - new_limit + new_limit = max(0, new_limit) + sorted_rooms = sorted_rooms[:limit] + + chunk = [] @defer.inlineCallbacks def handle_room(room_id): - current_state = yield self.state_handler.get_current_state(room_id) + num_joined_users = rooms_to_num_joined[room_id] + if num_joined_users == 0: + return + + if room_id in newly_unpublished: + return + + result = { + "room_id": room_id, + "num_joined_members": num_joined_users, + } + + current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + + event_map = yield self.store.get_events([ + event_id for key, event_id in current_state_ids.items() + if key[0] in ( + EventTypes.JoinRules, + EventTypes.Name, + EventTypes.Topic, + EventTypes.CanonicalAlias, + EventTypes.RoomHistoryVisibility, + EventTypes.GuestAccess, + "m.room.avatar", + ) + ]) + + current_state = { + (ev.type, ev.state_key): ev + for ev in event_map.values() + } # Double check that this is actually a public room. join_rules_event = current_state.get((EventTypes.JoinRules, "")) @@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - result = {"room_id": room_id} - - num_joined_users = len([ - 1 for _, event in current_state.items() - if event.type == EventTypes.Member - and event.membership == Membership.JOIN - ]) - if num_joined_users == 0: - return - - result["num_joined_members"] = num_joined_users - aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases @@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - results.append(result) + chunk.append(result) - yield concurrently_execute(handle_room, room_ids, 10) + yield concurrently_execute(handle_room, sorted_rooms, 10) - # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": results}) + chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) - @defer.inlineCallbacks - def fetch_all_remote_lists(self): - deferred = self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) - self.remote_list_request_cache.set((), deferred) - self.remote_list_cache = yield deferred + results = { + "chunk": chunk, + } + + if since_token: + results["new_rooms"] = bool(newly_visible) + + if not since_token or since_token.direction_is_forward: + if new_limit: + results["next_batch"] = RoomListNextBatch( + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, + current_limit=new_limit, + direction_is_forward=True, + ).to_token() + + if since_token: + results["prev_batch"] = since_token.copy_and_replace( + direction_is_forward=False, + ).to_token() + else: + if new_limit: + results["prev_batch"] = RoomListNextBatch( + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, + current_limit=new_limit, + direction_is_forward=False, + ).to_token() + + if since_token: + results["next_batch"] = since_token.copy_and_replace( + direction_is_forward=True, + ).to_token() + + defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name): + def get_remote_public_room_list(self, server_name, limit=None, since_token=None): res = yield self.hs.get_replication_layer().get_public_rooms( - [server_name] + server_name, limit=limit, since_token=since_token, ) - if server_name not in res: - raise SynapseError(404, "Server not found") - defer.returnValue(res[server_name]) - - @defer.inlineCallbacks - def get_aggregated_public_room_list(self): - """ - Get the public room list from this server and the servers - specified in the secondary_directory_servers config option. - XXX: Pagination... - """ - # We return the results from out cache which is updated by a looping call, - # unless we're missing a cache entry, in which case wait for the result - # of the fetch if there's one in progress. If not, omit that server. - wait = False - for s in self.hs.config.secondary_directory_servers: - if s not in self.remote_list_cache: - logger.warn("No cached room list from %s: waiting for fetch", s) - wait = True - break - - if wait and self.remote_list_request_cache.get(()): - yield self.remote_list_request_cache.get(()) - - public_rooms = yield self.get_local_public_room_list() - - # keep track of which room IDs we've seen so we can de-dup - room_ids = set() - - # tag all the ones in our list with our server name. - # Also add the them to the de-deping set - for room in public_rooms['chunk']: - room["server_name"] = self.hs.hostname - room_ids.add(room["room_id"]) - - # Now add the results from federation - for server_name, server_result in self.remote_list_cache.items(): - for room in server_result["chunk"]: - if room["room_id"] not in room_ids: - room["server_name"] = server_name - public_rooms["chunk"].append(room) - room_ids.add(room["room_id"]) - - defer.returnValue(public_rooms) + defer.returnValue(res) + + +class RoomListNextBatch(namedtuple("RoomListNextBatch", ( + "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list + "current_limit", # The number of previous rooms returned + "direction_is_forward", # Bool if this is a next_batch, false if prev_batch +))): + + KEY_DICT = { + "stream_ordering": "s", + "public_room_stream_id": "p", + "current_limit": "n", + "direction_is_forward": "d", + } + + REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} + + @classmethod + def from_token(cls, token): + return RoomListNextBatch(**{ + cls.REVERSE_KEY_DICT[key]: val + for key, val in msgpack.loads(decode_base64(token)).items() + }) + + def to_token(self): + return encode_base64(msgpack.dumps({ + self.KEY_DICT[key]: val + for key, val in self._asdict().items() + })) + + def copy_and_replace(self, **kwds): + return self._replace( + **kwds + ) |