summary refs log tree commit diff
path: root/synapse/handlers/room_list.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/room_list.py')
-rw-r--r--synapse/handlers/room_list.py142
1 files changed, 75 insertions, 67 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 516cd9a6ac..828229f5c3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -13,23 +13,24 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
+from collections import namedtuple
 
-from ._base import BaseHandler
+from six import iteritems
+from six.moves import range
+
+import msgpack
+from unpaddedbase64 import decode_base64, encode_base64
+
+from twisted.internet import defer
 
-from synapse.api.constants import (
-    EventTypes, JoinRules,
-)
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.types import ThirdPartyInstanceID
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.types import ThirdPartyInstanceID
-
-from collections import namedtuple
-from unpaddedbase64 import encode_base64, decode_base64
 
-import logging
-import msgpack
+from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
 
@@ -37,18 +38,19 @@ REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000
 
 
 # This is used to indicate we should only return rooms published to the main list.
-EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
+EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
 
 
 class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
-        self.response_cache = ResponseCache(hs)
-        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+        self.response_cache = ResponseCache(hs, "room_list")
+        self.remote_response_cache = ResponseCache(hs, "remote_room_list",
+                                                   timeout_ms=30 * 1000)
 
     def get_local_public_room_list(self, limit=None, since_token=None,
                                    search_filter=None,
-                                   network_tuple=EMTPY_THIRD_PARTY_ID,):
+                                   network_tuple=EMPTY_THIRD_PARTY_ID,):
         """Generate a local public room list.
 
         There are multiple different lists: the main one plus one per third
@@ -70,25 +72,22 @@ class RoomListHandler(BaseHandler):
         if search_filter:
             # We explicitly don't bother caching searches or requests for
             # appservice specific lists.
+            logger.info("Bypassing cache as search request.")
             return self._get_public_room_list(
                 limit, since_token, search_filter, network_tuple=network_tuple,
             )
 
         key = (limit, since_token, network_tuple)
-        result = self.response_cache.get(key)
-        if not result:
-            result = self.response_cache.set(
-                key,
-                self._get_public_room_list(
-                    limit, since_token, network_tuple=network_tuple
-                )
-            )
-        return result
+        return self.response_cache.wrap(
+            key,
+            self._get_public_room_list,
+            limit, since_token, network_tuple=network_tuple,
+        )
 
     @defer.inlineCallbacks
     def _get_public_room_list(self, limit=None, since_token=None,
                               search_filter=None,
-                              network_tuple=EMTPY_THIRD_PARTY_ID,):
+                              network_tuple=EMPTY_THIRD_PARTY_ID,):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -149,6 +148,8 @@ class RoomListHandler(BaseHandler):
             # We want larger rooms to be first, hence negating num_joined_users
             rooms_to_order_value[room_id] = (-num_joined_users, room_id)
 
+        logger.info("Getting ordering for %i rooms since %s",
+                    len(room_ids), stream_token)
         yield concurrently_execute(get_order_for_room, room_ids, 10)
 
         sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
@@ -176,34 +177,43 @@ class RoomListHandler(BaseHandler):
                 rooms_to_scan = rooms_to_scan[:since_token.current_limit]
                 rooms_to_scan.reverse()
 
-        # Actually generate the entries. _append_room_entry_to_chunk will append to
-        # chunk but will stop if len(chunk) > limit
-        chunk = []
-        if limit and not search_filter:
+        logger.info("After sorting and filtering, %i rooms remain",
+                    len(rooms_to_scan))
+
+        # _append_room_entry_to_chunk will append to chunk but will stop if
+        # len(chunk) > limit
+        #
+        # Normally we will generate enough results on the first iteration here,
+        #  but if there is a search filter, _append_room_entry_to_chunk may
+        # filter some results out, in which case we loop again.
+        #
+        # We don't want to scan over the entire range either as that
+        # would potentially waste a lot of work.
+        #
+        # XXX if there is no limit, we may end up DoSing the server with
+        # calls to get_current_state_ids for every single room on the
+        # server. Surely we should cap this somehow?
+        #
+        if limit:
             step = limit + 1
-            for i in xrange(0, len(rooms_to_scan), step):
-                # We iterate here because the vast majority of cases we'll stop
-                # at first iteration, but occaisonally _append_room_entry_to_chunk
-                # won't append to the chunk and so we need to loop again.
-                # We don't want to scan over the entire range either as that
-                # would potentially waste a lot of work.
-                yield concurrently_execute(
-                    lambda r: self._append_room_entry_to_chunk(
-                        r, rooms_to_num_joined[r],
-                        chunk, limit, search_filter
-                    ),
-                    rooms_to_scan[i:i + step], 10
-                )
-                if len(chunk) >= limit + 1:
-                    break
         else:
+            # step cannot be zero
+            step = len(rooms_to_scan) if len(rooms_to_scan) != 0 else 1
+
+        chunk = []
+        for i in range(0, len(rooms_to_scan), step):
+            batch = rooms_to_scan[i:i + step]
+            logger.info("Processing %i rooms for result", len(batch))
             yield concurrently_execute(
                 lambda r: self._append_room_entry_to_chunk(
                     r, rooms_to_num_joined[r],
                     chunk, limit, search_filter
                 ),
-                rooms_to_scan, 5
+                batch, 5,
             )
+            logger.info("Now %i rooms in result", len(chunk))
+            if len(chunk) >= limit + 1:
+                break
 
         chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
 
@@ -276,13 +286,14 @@ class RoomListHandler(BaseHandler):
             # We've already got enough, so lets just drop it.
             return
 
-        result = yield self._generate_room_entry(room_id, num_joined_users)
+        result = yield self.generate_room_entry(room_id, num_joined_users)
 
         if result and _matches_room_entry(result, search_filter):
             chunk.append(result)
 
     @cachedInlineCallbacks(num_args=1, cache_context=True)
-    def _generate_room_entry(self, room_id, num_joined_users, cache_context):
+    def generate_room_entry(self, room_id, num_joined_users, cache_context,
+                            with_alias=True, allow_private=False):
         """Returns the entry for a room
         """
         result = {
@@ -295,7 +306,7 @@ class RoomListHandler(BaseHandler):
         )
 
         event_map = yield self.store.get_events([
-            event_id for key, event_id in current_state_ids.iteritems()
+            event_id for key, event_id in iteritems(current_state_ids)
             if key[0] in (
                 EventTypes.JoinRules,
                 EventTypes.Name,
@@ -316,14 +327,15 @@ class RoomListHandler(BaseHandler):
         join_rules_event = current_state.get((EventTypes.JoinRules, ""))
         if join_rules_event:
             join_rule = join_rules_event.content.get("join_rule", None)
-            if join_rule and join_rule != JoinRules.PUBLIC:
+            if not allow_private and join_rule and join_rule != JoinRules.PUBLIC:
                 defer.returnValue(None)
 
-        aliases = yield self.store.get_aliases_for_room(
-            room_id, on_invalidate=cache_context.invalidate
-        )
-        if aliases:
-            result["aliases"] = aliases
+        if with_alias:
+            aliases = yield self.store.get_aliases_for_room(
+                room_id, on_invalidate=cache_context.invalidate
+            )
+            if aliases:
+                result["aliases"] = aliases
 
         name_event = yield current_state.get((EventTypes.Name, ""))
         if name_event:
@@ -391,7 +403,7 @@ class RoomListHandler(BaseHandler):
     def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
                                 search_filter=None, include_all_networks=False,
                                 third_party_instance_id=None,):
-        repl_layer = self.hs.get_replication_layer()
+        repl_layer = self.hs.get_federation_client()
         if search_filter:
             # We can't cache when asking for search
             return repl_layer.get_public_rooms(
@@ -404,18 +416,14 @@ class RoomListHandler(BaseHandler):
             server_name, limit, since_token, include_all_networks,
             third_party_instance_id,
         )
-        result = self.remote_response_cache.get(key)
-        if not result:
-            result = self.remote_response_cache.set(
-                key,
-                repl_layer.get_public_rooms(
-                    server_name, limit=limit, since_token=since_token,
-                    search_filter=search_filter,
-                    include_all_networks=include_all_networks,
-                    third_party_instance_id=third_party_instance_id,
-                )
-            )
-        return result
+        return self.remote_response_cache.wrap(
+            key,
+            repl_layer.get_public_rooms,
+            server_name, limit=limit, since_token=since_token,
+            search_filter=search_filter,
+            include_all_networks=include_all_networks,
+            third_party_instance_id=third_party_instance_id,
+        )
 
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (