summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-15 13:27:09 +0100
committerGitHub <noreply@github.com>2016-09-15 13:27:09 +0100
commite457034e99d46d32e3d71eddb12cda07d8a0370e (patch)
tree2b3cc4cbf5a17a6de91c6f07e47b038b22186921 /synapse/handlers
parentMerge pull request #1117 from matrix-org/erikj/fix_state (diff)
parentBy default limit /publicRooms to 100 entries (diff)
downloadsynapse-e457034e99d46d32e3d71eddb12cda07d8a0370e.tar.xz
Merge pull request #1121 from matrix-org/erikj/public_room_paginate
Add pagination support to publicRooms
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/room_list.py280
1 files changed, 196 insertions, 84 deletions
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d72e8c99f9..28bc35f8a3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -18,13 +18,16 @@ from twisted.internet import defer
 from ._base import BaseHandler
 
 from synapse.api.constants import (
-    EventTypes, JoinRules, Membership,
+    EventTypes, JoinRules,
 )
-from synapse.api.errors import SynapseError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
+from collections import namedtuple
+from unpaddedbase64 import encode_base64, decode_base64
+
 import logging
+import msgpack
 
 logger = logging.getLogger(__name__)
 
@@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.response_cache = ResponseCache(hs)
-        self.remote_list_request_cache = ResponseCache(hs)
-        self.remote_list_cache = {}
-        self.fetch_looping_call = hs.get_clock().looping_call(
-            self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
-        )
-        self.fetch_all_remote_lists()
 
-    def get_local_public_room_list(self):
-        result = self.response_cache.get(())
+    def get_local_public_room_list(self, limit=None, since_token=None):
+        result = self.response_cache.get((limit, since_token))
         if not result:
-            result = self.response_cache.set((), self._get_public_room_list())
+            result = self.response_cache.set(
+                (limit, since_token),
+                self._get_public_room_list(limit, since_token)
+            )
         return result
 
     @defer.inlineCallbacks
-    def _get_public_room_list(self):
-        room_ids = yield self.store.get_public_room_ids()
+    def _get_public_room_list(self, limit=None, since_token=None):
+        if since_token and since_token != "END":
+            since_token = RoomListNextBatch.from_token(since_token)
+        else:
+            since_token = None
+
+        rooms_to_order_value = {}
+        rooms_to_num_joined = {}
+        rooms_to_latest_event_ids = {}
+
+        newly_visible = []
+        newly_unpublished = []
+        if since_token:
+            stream_token = since_token.stream_ordering
+            current_public_id = yield self.store.get_current_public_room_stream_id()
+            public_room_stream_id = since_token.public_room_stream_id
+            newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
+                public_room_stream_id, current_public_id
+            )
+        else:
+            stream_token = yield self.store.get_room_max_stream_ordering()
+            public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+
+        room_ids = yield self.store.get_public_room_ids_at_stream_id(
+            public_room_stream_id
+        )
+
+        # We want to return rooms in a particular order: the number of joined
+        # users. We then arbitrarily use the room_id as a tie breaker.
+
+        @defer.inlineCallbacks
+        def get_order_for_room(room_id):
+            latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
+            if not latest_event_ids:
+                latest_event_ids = yield self.store.get_forward_extremeties_for_room(
+                    room_id, stream_token
+                )
+                rooms_to_latest_event_ids[room_id] = latest_event_ids
+
+            if not latest_event_ids:
+                return
+
+            joined_users = yield self.state_handler.get_current_user_in_room(
+                room_id, latest_event_ids,
+            )
+            num_joined_users = len(joined_users)
+            rooms_to_num_joined[room_id] = num_joined_users
+
+            if num_joined_users == 0:
+                return
+
+            # We want larger rooms to be first, hence negating num_joined_users
+            rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+
+        yield concurrently_execute(get_order_for_room, room_ids, 10)
+
+        sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
+        sorted_rooms = [room_id for room_id, _ in sorted_entries]
+
+        if since_token:
+            if since_token.direction_is_forward:
+                sorted_rooms = sorted_rooms[since_token.current_limit:]
+            else:
+                sorted_rooms = sorted_rooms[:since_token.current_limit]
+                sorted_rooms.reverse()
 
-        results = []
+        new_limit = None
+        if limit:
+            if sorted_rooms[limit:]:
+                new_limit = limit
+                if since_token:
+                    if since_token.direction_is_forward:
+                        new_limit += since_token.current_limit
+                    else:
+                        new_limit = since_token.current_limit - new_limit
+                        new_limit = max(0, new_limit)
+            sorted_rooms = sorted_rooms[:limit]
+
+        chunk = []
 
         @defer.inlineCallbacks
         def handle_room(room_id):
-            current_state = yield self.state_handler.get_current_state(room_id)
+            num_joined_users = rooms_to_num_joined[room_id]
+            if num_joined_users == 0:
+                return
+
+            if room_id in newly_unpublished:
+                return
+
+            result = {
+                "room_id": room_id,
+                "num_joined_members": num_joined_users,
+            }
+
+            current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+
+            event_map = yield self.store.get_events([
+                event_id for key, event_id in current_state_ids.items()
+                if key[0] in (
+                    EventTypes.JoinRules,
+                    EventTypes.Name,
+                    EventTypes.Topic,
+                    EventTypes.CanonicalAlias,
+                    EventTypes.RoomHistoryVisibility,
+                    EventTypes.GuestAccess,
+                    "m.room.avatar",
+                )
+            ])
+
+            current_state = {
+                (ev.type, ev.state_key): ev
+                for ev in event_map.values()
+            }
 
             # Double check that this is actually a public room.
             join_rules_event = current_state.get((EventTypes.JoinRules, ""))
@@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler):
                 if join_rule and join_rule != JoinRules.PUBLIC:
                     defer.returnValue(None)
 
-            result = {"room_id": room_id}
-
-            num_joined_users = len([
-                1 for _, event in current_state.items()
-                if event.type == EventTypes.Member
-                and event.membership == Membership.JOIN
-            ])
-            if num_joined_users == 0:
-                return
-
-            result["num_joined_members"] = num_joined_users
-
             aliases = yield self.store.get_aliases_for_room(room_id)
             if aliases:
                 result["aliases"] = aliases
@@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler):
                 if avatar_url:
                     result["avatar_url"] = avatar_url
 
-            results.append(result)
+            chunk.append(result)
 
-        yield concurrently_execute(handle_room, room_ids, 10)
+        yield concurrently_execute(handle_room, sorted_rooms, 10)
 
-        # FIXME (erikj): START is no longer a valid value
-        defer.returnValue({"start": "START", "end": "END", "chunk": results})
+        chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
 
-    @defer.inlineCallbacks
-    def fetch_all_remote_lists(self):
-        deferred = self.hs.get_replication_layer().get_public_rooms(
-            self.hs.config.secondary_directory_servers
-        )
-        self.remote_list_request_cache.set((), deferred)
-        self.remote_list_cache = yield deferred
+        results = {
+            "chunk": chunk,
+        }
+
+        if since_token:
+            results["new_rooms"] = bool(newly_visible)
+
+        if not since_token or since_token.direction_is_forward:
+            if new_limit:
+                results["next_batch"] = RoomListNextBatch(
+                    stream_ordering=stream_token,
+                    public_room_stream_id=public_room_stream_id,
+                    current_limit=new_limit,
+                    direction_is_forward=True,
+                ).to_token()
+
+            if since_token:
+                results["prev_batch"] = since_token.copy_and_replace(
+                    direction_is_forward=False,
+                ).to_token()
+        else:
+            if new_limit:
+                results["prev_batch"] = RoomListNextBatch(
+                    stream_ordering=stream_token,
+                    public_room_stream_id=public_room_stream_id,
+                    current_limit=new_limit,
+                    direction_is_forward=False,
+                ).to_token()
+
+            if since_token:
+                results["next_batch"] = since_token.copy_and_replace(
+                    direction_is_forward=True,
+                ).to_token()
+
+        defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def get_remote_public_room_list(self, server_name):
+    def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
         res = yield self.hs.get_replication_layer().get_public_rooms(
-            [server_name]
+            server_name, limit=limit, since_token=since_token,
         )
 
-        if server_name not in res:
-            raise SynapseError(404, "Server not found")
-        defer.returnValue(res[server_name])
-
-    @defer.inlineCallbacks
-    def get_aggregated_public_room_list(self):
-        """
-        Get the public room list from this server and the servers
-        specified in the secondary_directory_servers config option.
-        XXX: Pagination...
-        """
-        # We return the results from out cache which is updated by a looping call,
-        # unless we're missing a cache entry, in which case wait for the result
-        # of the fetch if there's one in progress. If not, omit that server.
-        wait = False
-        for s in self.hs.config.secondary_directory_servers:
-            if s not in self.remote_list_cache:
-                logger.warn("No cached room list from %s: waiting for fetch", s)
-                wait = True
-                break
-
-        if wait and self.remote_list_request_cache.get(()):
-            yield self.remote_list_request_cache.get(())
-
-        public_rooms = yield self.get_local_public_room_list()
-
-        # keep track of which room IDs we've seen so we can de-dup
-        room_ids = set()
-
-        # tag all the ones in our list with our server name.
-        # Also add the them to the de-deping set
-        for room in public_rooms['chunk']:
-            room["server_name"] = self.hs.hostname
-            room_ids.add(room["room_id"])
-
-        # Now add the results from federation
-        for server_name, server_result in self.remote_list_cache.items():
-            for room in server_result["chunk"]:
-                if room["room_id"] not in room_ids:
-                    room["server_name"] = server_name
-                    public_rooms["chunk"].append(room)
-                    room_ids.add(room["room_id"])
-
-        defer.returnValue(public_rooms)
+        defer.returnValue(res)
+
+
+class RoomListNextBatch(namedtuple("RoomListNextBatch", (
+    "stream_ordering",  # stream_ordering of the first public room list
+    "public_room_stream_id",  # public room stream id for first public room list
+    "current_limit",  # The number of previous rooms returned
+    "direction_is_forward",  # Bool if this is a next_batch, false if prev_batch
+))):
+
+    KEY_DICT = {
+        "stream_ordering": "s",
+        "public_room_stream_id": "p",
+        "current_limit": "n",
+        "direction_is_forward": "d",
+    }
+
+    REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}
+
+    @classmethod
+    def from_token(cls, token):
+        return RoomListNextBatch(**{
+            cls.REVERSE_KEY_DICT[key]: val
+            for key, val in msgpack.loads(decode_base64(token)).items()
+        })
+
+    def to_token(self):
+        return encode_base64(msgpack.dumps({
+            self.KEY_DICT[key]: val
+            for key, val in self._asdict().items()
+        }))
+
+    def copy_and_replace(self, **kwds):
+        return self._replace(
+            **kwds
+        )