diff options
Diffstat (limited to 'synapse/handlers/room_list.py')
-rw-r--r-- | synapse/handlers/room_list.py | 254 |
1 files changed, 152 insertions, 102 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 617d1c9ef8..aae696a7e8 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -46,13 +46,18 @@ class RoomListHandler(BaseHandler): super(RoomListHandler, self).__init__(hs) self.enable_room_list_search = hs.config.enable_room_list_search self.response_cache = ResponseCache(hs, "room_list") - self.remote_response_cache = ResponseCache(hs, "remote_room_list", - timeout_ms=30 * 1000) + self.remote_response_cache = ResponseCache( + hs, "remote_room_list", timeout_ms=30 * 1000 + ) - def get_local_public_room_list(self, limit=None, since_token=None, - search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID, - from_federation=False): + def get_local_public_room_list( + self, + limit=None, + since_token=None, + search_filter=None, + network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False, + ): """Generate a local public room list. There are multiple different lists: the main one plus one per third @@ -68,14 +73,14 @@ class RoomListHandler(BaseHandler): Setting to None returns all public rooms across all lists. """ if not self.enable_room_list_search: - return defer.succeed({ - "chunk": [], - "total_room_count_estimate": 0, - }) + return defer.succeed({"chunk": [], "total_room_count_estimate": 0}) logger.info( "Getting public room list: limit=%r, since=%r, search=%r, network=%r", - limit, since_token, bool(search_filter), network_tuple, + limit, + since_token, + bool(search_filter), + network_tuple, ) if search_filter: @@ -88,24 +93,33 @@ class RoomListHandler(BaseHandler): # solution at some point timeout = self.clock.time() + 60 return self._get_public_room_list( - limit, since_token, search_filter, - network_tuple=network_tuple, timeout=timeout, + limit, + since_token, + search_filter, + network_tuple=network_tuple, + timeout=timeout, ) key = (limit, since_token, network_tuple) return self.response_cache.wrap( key, self._get_public_room_list, - limit, since_token, - network_tuple=network_tuple, from_federation=from_federation, + limit, + since_token, + network_tuple=network_tuple, + from_federation=from_federation, ) @defer.inlineCallbacks - def _get_public_room_list(self, limit=None, since_token=None, - search_filter=None, - network_tuple=EMPTY_THIRD_PARTY_ID, - from_federation=False, - timeout=None,): + def _get_public_room_list( + self, + limit=None, + since_token=None, + search_filter=None, + network_tuple=EMPTY_THIRD_PARTY_ID, + from_federation=False, + timeout=None, + ): """Generate a public room list. Args: limit (int|None): Maximum amount of rooms to return. @@ -135,15 +149,14 @@ class RoomListHandler(BaseHandler): current_public_id = yield self.store.get_current_public_room_stream_id() public_room_stream_id = since_token.public_room_stream_id newly_visible, newly_unpublished = yield self.store.get_public_room_changes( - public_room_stream_id, current_public_id, - network_tuple=network_tuple, + public_room_stream_id, current_public_id, network_tuple=network_tuple ) else: stream_token = yield self.store.get_room_max_stream_ordering() public_room_stream_id = yield self.store.get_current_public_room_stream_id() room_ids = yield self.store.get_public_room_ids_at_stream_id( - public_room_stream_id, network_tuple=network_tuple, + public_room_stream_id, network_tuple=network_tuple ) # We want to return rooms in a particular order: the number of joined @@ -168,7 +181,7 @@ class RoomListHandler(BaseHandler): return joined_users = yield self.state_handler.get_current_users_in_room( - room_id, latest_event_ids, + room_id, latest_event_ids ) num_joined_users = len(joined_users) @@ -180,8 +193,9 @@ class RoomListHandler(BaseHandler): # We want larger rooms to be first, hence negating num_joined_users rooms_to_order_value[room_id] = (-num_joined_users, room_id) - logger.info("Getting ordering for %i rooms since %s", - len(room_ids), stream_token) + logger.info( + "Getting ordering for %i rooms since %s", len(room_ids), stream_token + ) yield concurrently_execute(get_order_for_room, room_ids, 10) sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) @@ -193,7 +207,8 @@ class RoomListHandler(BaseHandler): # Filter out rooms that we don't want to return rooms_to_scan = [ - r for r in sorted_rooms + r + for r in sorted_rooms if r not in newly_unpublished and rooms_to_num_joined[r] > 0 ] @@ -204,13 +219,12 @@ class RoomListHandler(BaseHandler): # `since_token.current_limit` is the index of the last room we # sent down, so we exclude it and everything before/after it. if since_token.direction_is_forward: - rooms_to_scan = rooms_to_scan[since_token.current_limit + 1:] + rooms_to_scan = rooms_to_scan[since_token.current_limit + 1 :] else: - rooms_to_scan = rooms_to_scan[:since_token.current_limit] + rooms_to_scan = rooms_to_scan[: since_token.current_limit] rooms_to_scan.reverse() - logger.info("After sorting and filtering, %i rooms remain", - len(rooms_to_scan)) + logger.info("After sorting and filtering, %i rooms remain", len(rooms_to_scan)) # _append_room_entry_to_chunk will append to chunk but will stop if # len(chunk) > limit @@ -237,15 +251,19 @@ class RoomListHandler(BaseHandler): if timeout and self.clock.time() > timeout: raise Exception("Timed out searching room directory") - batch = rooms_to_scan[i:i + step] + batch = rooms_to_scan[i : i + step] logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( - r, rooms_to_num_joined[r], - chunk, limit, search_filter, + r, + rooms_to_num_joined[r], + chunk, + limit, + search_filter, from_federation=from_federation, ), - batch, 5, + batch, + 5, ) logger.info("Now %i rooms in result", len(chunk)) if len(chunk) >= limit + 1: @@ -273,10 +291,7 @@ class RoomListHandler(BaseHandler): new_limit = sorted_rooms.index(last_room_id) - results = { - "chunk": chunk, - "total_room_count_estimate": total_room_count, - } + results = {"chunk": chunk, "total_room_count_estimate": total_room_count} if since_token: results["new_rooms"] = bool(newly_visible) @@ -313,8 +328,15 @@ class RoomListHandler(BaseHandler): defer.returnValue(results) @defer.inlineCallbacks - def _append_room_entry_to_chunk(self, room_id, num_joined_users, chunk, limit, - search_filter, from_federation=False): + def _append_room_entry_to_chunk( + self, + room_id, + num_joined_users, + chunk, + limit, + search_filter, + from_federation=False, + ): """Generate the entry for a room in the public room list and append it to the `chunk` if it matches the search filter @@ -345,8 +367,14 @@ class RoomListHandler(BaseHandler): chunk.append(result) @cachedInlineCallbacks(num_args=1, cache_context=True) - def generate_room_entry(self, room_id, num_joined_users, cache_context, - with_alias=True, allow_private=False): + def generate_room_entry( + self, + room_id, + num_joined_users, + cache_context, + with_alias=True, + allow_private=False, + ): """Returns the entry for a room Args: @@ -360,33 +388,31 @@ class RoomListHandler(BaseHandler): Deferred[dict|None]: Returns a room entry as a dictionary, or None if this room was determined not to be shown publicly. """ - result = { - "room_id": room_id, - "num_joined_members": num_joined_users, - } + result = {"room_id": room_id, "num_joined_members": num_joined_users} current_state_ids = yield self.store.get_current_state_ids( - room_id, on_invalidate=cache_context.invalidate, + room_id, on_invalidate=cache_context.invalidate ) - event_map = yield self.store.get_events([ - event_id for key, event_id in iteritems(current_state_ids) - if key[0] in ( - EventTypes.Create, - EventTypes.JoinRules, - EventTypes.Name, - EventTypes.Topic, - EventTypes.CanonicalAlias, - EventTypes.RoomHistoryVisibility, - EventTypes.GuestAccess, - "m.room.avatar", - ) - ]) + event_map = yield self.store.get_events( + [ + event_id + for key, event_id in iteritems(current_state_ids) + if key[0] + in ( + EventTypes.Create, + EventTypes.JoinRules, + EventTypes.Name, + EventTypes.Topic, + EventTypes.CanonicalAlias, + EventTypes.RoomHistoryVisibility, + EventTypes.GuestAccess, + "m.room.avatar", + ) + ] + ) - current_state = { - (ev.type, ev.state_key): ev - for ev in event_map.values() - } + current_state = {(ev.type, ev.state_key): ev for ev in event_map.values()} # Double check that this is actually a public room. @@ -446,14 +472,17 @@ class RoomListHandler(BaseHandler): defer.returnValue(result) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name, limit=None, since_token=None, - search_filter=None, include_all_networks=False, - third_party_instance_id=None,): + def get_remote_public_room_list( + self, + server_name, + limit=None, + since_token=None, + search_filter=None, + include_all_networks=False, + third_party_instance_id=None, + ): if not self.enable_room_list_search: - defer.returnValue({ - "chunk": [], - "total_room_count_estimate": 0, - }) + defer.returnValue({"chunk": [], "total_room_count_estimate": 0}) if search_filter: # We currently don't support searching across federation, so we have @@ -462,52 +491,75 @@ class RoomListHandler(BaseHandler): since_token = None res = yield self._get_remote_list_cached( - server_name, limit=limit, since_token=since_token, + server_name, + limit=limit, + since_token=since_token, include_all_networks=include_all_networks, third_party_instance_id=third_party_instance_id, ) if search_filter: - res = {"chunk": [ - entry - for entry in list(res.get("chunk", [])) - if _matches_room_entry(entry, search_filter) - ]} + res = { + "chunk": [ + entry + for entry in list(res.get("chunk", [])) + if _matches_room_entry(entry, search_filter) + ] + } defer.returnValue(res) - def _get_remote_list_cached(self, server_name, limit=None, since_token=None, - search_filter=None, include_all_networks=False, - third_party_instance_id=None,): + def _get_remote_list_cached( + self, + server_name, + limit=None, + since_token=None, + search_filter=None, + include_all_networks=False, + third_party_instance_id=None, + ): repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( - server_name, limit=limit, since_token=since_token, - search_filter=search_filter, include_all_networks=include_all_networks, + server_name, + limit=limit, + since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, third_party_instance_id=third_party_instance_id, ) key = ( - server_name, limit, since_token, include_all_networks, + server_name, + limit, + since_token, + include_all_networks, third_party_instance_id, ) return self.remote_response_cache.wrap( key, repl_layer.get_public_rooms, - server_name, limit=limit, since_token=since_token, + server_name, + limit=limit, + since_token=since_token, search_filter=search_filter, include_all_networks=include_all_networks, third_party_instance_id=third_party_instance_id, ) -class RoomListNextBatch(namedtuple("RoomListNextBatch", ( - "stream_ordering", # stream_ordering of the first public room list - "public_room_stream_id", # public room stream id for first public room list - "current_limit", # The number of previous rooms returned - "direction_is_forward", # Bool if this is a next_batch, false if prev_batch -))): +class RoomListNextBatch( + namedtuple( + "RoomListNextBatch", + ( + "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list + "current_limit", # The number of previous rooms returned + "direction_is_forward", # Bool if this is a next_batch, false if prev_batch + ), + ) +): KEY_DICT = { "stream_ordering": "s", @@ -527,21 +579,19 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", ( decoded = msgpack.loads(decode_base64(token), raw=False) else: decoded = msgpack.loads(decode_base64(token)) - return RoomListNextBatch(**{ - cls.REVERSE_KEY_DICT[key]: val - for key, val in decoded.items() - }) + return RoomListNextBatch( + **{cls.REVERSE_KEY_DICT[key]: val for key, val in decoded.items()} + ) def to_token(self): - return encode_base64(msgpack.dumps({ - self.KEY_DICT[key]: val - for key, val in self._asdict().items() - })) + return encode_base64( + msgpack.dumps( + {self.KEY_DICT[key]: val for key, val in self._asdict().items()} + ) + ) def copy_and_replace(self, **kwds): - return self._replace( - **kwds - ) + return self._replace(**kwds) def _matches_room_entry(room_entry, search_filter): |