diff options
Diffstat (limited to 'synapse/handlers/room_list.py')
-rw-r--r-- | synapse/handlers/room_list.py | 142 |
1 files changed, 75 insertions, 67 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 516cd9a6ac..828229f5c3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -13,23 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging +from collections import namedtuple -from ._base import BaseHandler +from six import iteritems +from six.moves import range + +import msgpack +from unpaddedbase64 import decode_base64, encode_base64 + +from twisted.internet import defer -from synapse.api.constants import ( - EventTypes, JoinRules, -) +from synapse.api.constants import EventTypes, JoinRules +from synapse.types import ThirdPartyInstanceID from synapse.util.async import concurrently_execute from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.util.caches.response_cache import ResponseCache -from synapse.types import ThirdPartyInstanceID - -from collections import namedtuple -from unpaddedbase64 import encode_base64, decode_base64 -import logging -import msgpack +from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -37,18 +38,19 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 # This is used to indicate we should only return rooms published to the main list. -EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) +EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) - self.response_cache = ResponseCache(hs) - self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) + self.response_cache = ResponseCache(hs, "room_list") + self.remote_response_cache = ResponseCache(hs, "remote_room_list", + timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMTPY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID,): """Generate a local public room list. There are multiple different lists: the main one plus one per third @@ -70,25 +72,22 @@ class RoomListHandler(BaseHandler): if search_filter: # We explicitly don't bother caching searches or requests for # appservice specific lists. + logger.info("Bypassing cache as search request.") return self._get_public_room_list( limit, since_token, search_filter, network_tuple=network_tuple, ) key = (limit, since_token, network_tuple) - result = self.response_cache.get(key) - if not result: - result = self.response_cache.set( - key, - self._get_public_room_list( - limit, since_token, network_tuple=network_tuple - ) - ) - return result + return self.response_cache.wrap( + key, + self._get_public_room_list, + limit, since_token, network_tuple=network_tuple, + ) @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, search_filter=None, - network_tuple=EMTPY_THIRD_PARTY_ID,): + network_tuple=EMPTY_THIRD_PARTY_ID,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -149,6 +148,8 @@ class RoomListHandler(BaseHandler): # We want larger rooms to be first, hence negating num_joined_users rooms_to_order_value[room_id] = (-num_joined_users, room_id) + logger.info("Getting ordering for %i rooms since %s", + len(room_ids), stream_token) yield concurrently_execute(get_order_for_room, room_ids, 10) sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) @@ -176,34 +177,43 @@ class RoomListHandler(BaseHandler): rooms_to_scan = rooms_to_scan[:since_token.current_limit] rooms_to_scan.reverse() - # Actually generate the entries. _append_room_entry_to_chunk will append to - # chunk but will stop if len(chunk) > limit - chunk = [] - if limit and not search_filter: + logger.info("After sorting and filtering, %i rooms remain", + len(rooms_to_scan)) + + # _append_room_entry_to_chunk will append to chunk but will stop if + # len(chunk) > limit + # + # Normally we will generate enough results on the first iteration here, + # but if there is a search filter, _append_room_entry_to_chunk may + # filter some results out, in which case we loop again. + # + # We don't want to scan over the entire range either as that + # would potentially waste a lot of work. + # + # XXX if there is no limit, we may end up DoSing the server with + # calls to get_current_state_ids for every single room on the + # server. Surely we should cap this somehow? + # + if limit: step = limit + 1 - for i in xrange(0, len(rooms_to_scan), step): - # We iterate here because the vast majority of cases we'll stop - # at first iteration, but occaisonally _append_room_entry_to_chunk - # won't append to the chunk and so we need to loop again. - # We don't want to scan over the entire range either as that - # would potentially waste a lot of work. - yield concurrently_execute( - lambda r: self._append_room_entry_to_chunk( - r, rooms_to_num_joined[r], - chunk, limit, search_filter - ), - rooms_to_scan[i:i + step], 10 - ) - if len(chunk) >= limit + 1: - break else: + # step cannot be zero + step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1 + + chunk = [] + for i in range(0, len(rooms_to_scan), step): + batch = rooms_to_scan[i:i + step] + logger.info("Processing %i rooms for result", len(batch)) yield concurrently_execute( lambda r: self._append_room_entry_to_chunk( r, rooms_to_num_joined[r], chunk, limit, search_filter ), - rooms_to_scan, 5 + batch, 5, ) + logger.info("Now %i rooms in result", len(chunk)) + if len(chunk) >= limit + 1: + break chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) @@ -276,13 +286,14 @@ class RoomListHandler(BaseHandler): # We've already got enough, so lets just drop it. return - result = yield self._generate_room_entry(room_id, num_joined_users) + result = yield self.generate_room_entry(room_id, num_joined_users) if result and _matches_room_entry(result, search_filter): chunk.append(result) @cachedInlineCallbacks(num_args=1, cache_context=True) - def _generate_room_entry(self, room_id, num_joined_users, cache_context): + def generate_room_entry(self, room_id, num_joined_users, cache_context, + with_alias=True, allow_private=False): """Returns the entry for a room """ result = { @@ -295,7 +306,7 @@ class RoomListHandler(BaseHandler): ) event_map = yield self.store.get_events([ - event_id for key, event_id in current_state_ids.iteritems() + event_id for key, event_id in iteritems(current_state_ids) if key[0] in ( EventTypes.JoinRules, EventTypes.Name, @@ -316,14 +327,15 @@ class RoomListHandler(BaseHandler): join_rules_event = current_state.get((EventTypes.JoinRules, "")) if join_rules_event: join_rule = join_rules_event.content.get("join_rule", None) - if join_rule and join_rule != JoinRules.PUBLIC: + if not allow_private and join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - aliases = yield self.store.get_aliases_for_room( - room_id, on_invalidate=cache_context.invalidate - ) - if aliases: - result["aliases"] = aliases + if with_alias: + aliases = yield self.store.get_aliases_for_room( + room_id, on_invalidate=cache_context.invalidate + ) + if aliases: + result["aliases"] = aliases name_event = yield current_state.get((EventTypes.Name, "")) if name_event: @@ -391,7 +403,7 @@ class RoomListHandler(BaseHandler): def _get_remote_list_cached(self, server_name, limit=None, since_token=None, search_filter=None, include_all_networks=False, third_party_instance_id=None,): - repl_layer = self.hs.get_replication_layer() + repl_layer = self.hs.get_federation_client() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( @@ -404,18 +416,14 @@ class RoomListHandler(BaseHandler): server_name, limit, since_token, include_all_networks, third_party_instance_id, ) - result = self.remote_response_cache.get(key) - if not result: - result = self.remote_response_cache.set( - key, - repl_layer.get_public_rooms( - server_name, limit=limit, since_token=since_token, - search_filter=search_filter, - include_all_networks=include_all_networks, - third_party_instance_id=third_party_instance_id, - ) - ) - return result + return self.remote_response_cache.wrap( + key, + repl_layer.get_public_rooms, + server_name, limit=limit, since_token=since_token, + search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, + ) class RoomListNextBatch(namedtuple("RoomListNextBatch", ( |