summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2016-09-16 13:17:29 +0100
committerGitHub <noreply@github.com>2016-09-16 13:17:29 +0100
commit9e1283c8240068575dcf41d12b97fcd771e0c0e0 (patch)
tree3e85a6708c9dc9210237e7d027a2802e011a5bbf
parentMerge pull request #1125 from matrix-org/erikj/public_room_cache (diff)
parentComment (diff)
downloadsynapse-9e1283c8240068575dcf41d12b97fcd771e0c0e0.tar.xz
Merge pull request #1126 from matrix-org/erikj/public_room_cache
Add very basic filter API to /publicRooms
-rw-r--r--synapse/federation/federation_client.py7
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/handlers/room_list.py112
-rw-r--r--synapse/replication/slave/storage/events.py1
-rw-r--r--synapse/rest/client/v1/room.py28
-rw-r--r--synapse/storage/__init__.py2
-rw-r--r--synapse/storage/event_federation.py27
7 files changed, 158 insertions, 24 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f0a684fc13..06d0320b1a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -718,11 +718,14 @@ class FederationClient(FederationBase):
 
         raise RuntimeError("Failed to send to any server.")
 
-    def get_public_rooms(self, destination, limit=None, since_token=None):
+    def get_public_rooms(self, destination, limit=None, since_token=None,
+                         search_filter=None):
         if destination == self.server_name:
             return
 
-        return self.transport_layer.get_public_rooms(destination, limit, since_token)
+        return self.transport_layer.get_public_rooms(
+            destination, limit, since_token, search_filter
+        )
 
     @defer.inlineCallbacks
     def query_auth(self, destination, room_id, event_id, local_auth):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index f508b70f11..db45c7826c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -248,7 +248,8 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
-    def get_public_rooms(self, remote_server, limit, since_token):
+    def get_public_rooms(self, remote_server, limit, since_token,
+                         search_filter=None):
         path = PREFIX + "/publicRooms"
 
         args = {}
@@ -257,6 +258,8 @@ class TransportLayerClient(object):
         if since_token:
             args["since"] = [since_token]
 
+        # TODO(erikj): Actually send the search_filter across federation.
+
         response = yield self.client.get_json(
             destination=remote_server,
             path=path,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 28bc35f8a3..f15987b265 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -38,8 +38,14 @@ class RoomListHandler(BaseHandler):
     def __init__(self, hs):
         super(RoomListHandler, self).__init__(hs)
         self.response_cache = ResponseCache(hs)
+        self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+
+    def get_local_public_room_list(self, limit=None, since_token=None,
+                                   search_filter=None):
+        if search_filter:
+            # We explicitly don't bother caching searches.
+            return self._get_public_room_list(limit, since_token, search_filter)
 
-    def get_local_public_room_list(self, limit=None, since_token=None):
         result = self.response_cache.get((limit, since_token))
         if not result:
             result = self.response_cache.set(
@@ -49,7 +55,8 @@ class RoomListHandler(BaseHandler):
         return result
 
     @defer.inlineCallbacks
-    def _get_public_room_list(self, limit=None, since_token=None):
+    def _get_public_room_list(self, limit=None, since_token=None,
+                              search_filter=None):
         if since_token and since_token != "END":
             since_token = RoomListNextBatch.from_token(since_token)
         else:
@@ -115,22 +122,18 @@ class RoomListHandler(BaseHandler):
                 sorted_rooms = sorted_rooms[:since_token.current_limit]
                 sorted_rooms.reverse()
 
-        new_limit = None
-        if limit:
-            if sorted_rooms[limit:]:
-                new_limit = limit
-                if since_token:
-                    if since_token.direction_is_forward:
-                        new_limit += since_token.current_limit
-                    else:
-                        new_limit = since_token.current_limit - new_limit
-                        new_limit = max(0, new_limit)
-            sorted_rooms = sorted_rooms[:limit]
+        rooms_to_scan = sorted_rooms
+        if limit and not search_filter:
+            rooms_to_scan = sorted_rooms[:limit + 1]
 
         chunk = []
 
         @defer.inlineCallbacks
         def handle_room(room_id):
+            if limit and len(chunk) > limit + 1:
+                # We've already got enough, so lets just drop it.
+                return
+
             num_joined_users = rooms_to_num_joined[room_id]
             if num_joined_users == 0:
                 return
@@ -210,12 +213,37 @@ class RoomListHandler(BaseHandler):
                 if avatar_url:
                     result["avatar_url"] = avatar_url
 
-            chunk.append(result)
+            if _matches_room_entry(result, search_filter):
+                chunk.append(result)
 
-        yield concurrently_execute(handle_room, sorted_rooms, 10)
+        yield concurrently_execute(handle_room, rooms_to_scan, 10)
 
         chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
 
+        # Work out the new limit of the batch for pagination, or None if we
+        # know there are no more results that would be returned.
+        new_limit = None
+        if chunk and (not limit or len(chunk) > limit):
+            if limit:
+                chunk = chunk[:limit]
+
+            addition = 1
+            if since_token:
+                addition += since_token.current_limit
+
+            if not since_token or since_token.direction_is_forward:
+                last_room_id = chunk[-1]["room_id"]
+            else:
+                last_room_id = chunk[0]["room_id"]
+                addition *= -1
+
+            try:
+                new_limit = sorted_rooms.index(last_room_id) + addition
+                if new_limit >= len(sorted_rooms):
+                    new_limit = None
+            except ValueError:
+                pass
+
         results = {
             "chunk": chunk,
         }
@@ -253,13 +281,48 @@ class RoomListHandler(BaseHandler):
         defer.returnValue(results)
 
     @defer.inlineCallbacks
-    def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
-        res = yield self.hs.get_replication_layer().get_public_rooms(
+    def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
+                                    search_filter=None):
+        if search_filter:
+            # We currently don't support searching across federation, so we have
+            # to do it manually without pagination
+            limit = None
+            since_token = None
+
+        res = yield self._get_remote_list_cached(
             server_name, limit=limit, since_token=since_token,
         )
 
+        if search_filter:
+            res = {"chunk": [
+                entry
+                for entry in list(res.get("chunk", []))
+                if _matches_room_entry(entry, search_filter)
+            ]}
+
         defer.returnValue(res)
 
+    def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
+                                search_filter=None):
+        repl_layer = self.hs.get_replication_layer()
+        if search_filter:
+            # We can't cache when asking for search
+            return repl_layer.get_public_rooms(
+                server_name, limit=limit, since_token=since_token,
+                search_filter=search_filter,
+            )
+
+        result = self.remote_response_cache.get((server_name, limit, since_token))
+        if not result:
+            result = self.remote_response_cache.set(
+                (server_name, limit, since_token),
+                repl_layer.get_public_rooms(
+                    server_name, limit=limit, since_token=since_token,
+                    search_filter=search_filter,
+                )
+            )
+        return result
+
 
 class RoomListNextBatch(namedtuple("RoomListNextBatch", (
     "stream_ordering",  # stream_ordering of the first public room list
@@ -294,3 +357,18 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
         return self._replace(
             **kwds
         )
+
+
+def _matches_room_entry(room_entry, search_filter):
+    if search_filter and search_filter.get("generic_search_term", None):
+        generic_search_term = search_filter["generic_search_term"]
+        if generic_search_term in room_entry.get("name", ""):
+            return True
+        elif generic_search_term in room_entry.get("topic", ""):
+            return True
+        elif generic_search_term in room_entry.get("canonical_alias", ""):
+            return True
+    else:
+        return True
+
+    return False
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cc32c66792..0c26e96e98 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -62,6 +62,7 @@ class SlavedEventStore(BaseSlavedStore):
         )
 
         self.stream_ordering_month_ago = 0
+        self._stream_order_on_start = self.get_room_max_stream_ordering()
 
     # Cached functions can't be accessed through a class instance so we need
     # to reach inside the __dict__ to extract them.
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index db0cd4380a..5584bfbfc0 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -337,6 +337,34 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
 
         defer.returnValue((200, data))
 
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        yield self.auth.get_user_by_req(request)
+
+        server = parse_string(request, "server", default=None)
+        content = parse_json_object_from_request(request)
+
+        limit = int(content.get("limit", 100))
+        since_token = content.get("since", None)
+        search_filter = content.get("filter", None)
+
+        handler = self.hs.get_room_list_handler()
+        if server:
+            data = yield handler.get_remote_public_room_list(
+                server,
+                limit=limit,
+                since_token=since_token,
+                search_filter=search_filter,
+            )
+        else:
+            data = yield handler.get_local_public_room_list(
+                limit=limit,
+                since_token=since_token,
+                search_filter=search_filter,
+            )
+
+        defer.returnValue((200, data))
+
 
 # TODO: Needs unit testing
 class RoomMemberListRestServlet(ClientV1RestServlet):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0099a3f5bb..9996f195a0 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -222,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore,
             self._find_stream_orderings_for_times, 60 * 60 * 1000
         )
 
+        self._stream_order_on_start = self.get_room_max_stream_ordering()
+
         super(DataStore, self).__init__(hs)
 
     def take_presence_startup_info(self):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 59b4cf1e53..3d62451de9 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore):
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
         last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
-        stream_ordering = min(last_change, stream_ordering)
+
+        # We don't always have a full stream_to_exterm_id table, e.g. after
+        # the upgrade that introduced it, so we make sure we never ask for a
+        # try and pin to a stream_ordering from before a restart
+        last_change = max(self._stream_order_on_start, last_change)
+
+        if last_change > self.stream_ordering_month_ago:
+            stream_ordering = min(last_change, stream_ordering)
 
         return self._get_forward_extremeties_for_room(room_id, stream_ordering)
 
@@ -369,7 +376,7 @@ class EventFederationStore(SQLBaseStore):
                 INNER JOIN (
                     SELECT room_id, MAX(stream_ordering) AS stream_ordering
                     FROM stream_ordering_to_exterm
-                    WHERE stream_ordering < ? GROUP BY room_id
+                    WHERE stream_ordering <= ? GROUP BY room_id
                 ) AS rms USING (room_id, stream_ordering)
                 WHERE room_id = ?
         """)
@@ -386,9 +393,21 @@ class EventFederationStore(SQLBaseStore):
 
     def _delete_old_forward_extrem_cache(self):
         def _delete_old_forward_extrem_cache_txn(txn):
+            # Delete entries older than a month, while making sure we don't delete
+            # the only entries for a room.
+            sql = ("""
+                DELETE FROM stream_ordering_to_exterm
+                WHERE
+                (
+                    SELECT max(stream_ordering) AS stream_ordering
+                    FROM stream_ordering_to_exterm
+                    WHERE room_id = stream_ordering_to_exterm.room_id
+                ) > ?
+                AND stream_ordering < ?
+            """)
             txn.execute(
-                "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
-                (self.stream_ordering_month_ago,)
+                sql,
+                (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
             )
         return self.runInteraction(
             "_delete_old_forward_extrem_cache",