diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f0a684fc13..06d0320b1a 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -718,11 +718,14 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
- def get_public_rooms(self, destination, limit=None, since_token=None):
+ def get_public_rooms(self, destination, limit=None, since_token=None,
+ search_filter=None):
if destination == self.server_name:
return
- return self.transport_layer.get_public_rooms(destination, limit, since_token)
+ return self.transport_layer.get_public_rooms(
+ destination, limit, since_token, search_filter
+ )
@defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index f508b70f11..db45c7826c 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -248,7 +248,8 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def get_public_rooms(self, remote_server, limit, since_token):
+ def get_public_rooms(self, remote_server, limit, since_token,
+ search_filter=None):
path = PREFIX + "/publicRooms"
args = {}
@@ -257,6 +258,8 @@ class TransportLayerClient(object):
if since_token:
args["since"] = [since_token]
+ # TODO(erikj): Actually send the search_filter across federation.
+
response = yield self.client.get_json(
destination=remote_server,
path=path,
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 28bc35f8a3..f15987b265 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -38,8 +38,14 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
+ self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000)
+
+ def get_local_public_room_list(self, limit=None, since_token=None,
+ search_filter=None):
+ if search_filter:
+ # We explicitly don't bother caching searches.
+ return self._get_public_room_list(limit, since_token, search_filter)
- def get_local_public_room_list(self, limit=None, since_token=None):
result = self.response_cache.get((limit, since_token))
if not result:
result = self.response_cache.set(
@@ -49,7 +55,8 @@ class RoomListHandler(BaseHandler):
return result
@defer.inlineCallbacks
- def _get_public_room_list(self, limit=None, since_token=None):
+ def _get_public_room_list(self, limit=None, since_token=None,
+ search_filter=None):
if since_token and since_token != "END":
since_token = RoomListNextBatch.from_token(since_token)
else:
@@ -115,22 +122,18 @@ class RoomListHandler(BaseHandler):
sorted_rooms = sorted_rooms[:since_token.current_limit]
sorted_rooms.reverse()
- new_limit = None
- if limit:
- if sorted_rooms[limit:]:
- new_limit = limit
- if since_token:
- if since_token.direction_is_forward:
- new_limit += since_token.current_limit
- else:
- new_limit = since_token.current_limit - new_limit
- new_limit = max(0, new_limit)
- sorted_rooms = sorted_rooms[:limit]
+ rooms_to_scan = sorted_rooms
+ if limit and not search_filter:
+ rooms_to_scan = sorted_rooms[:limit + 1]
chunk = []
@defer.inlineCallbacks
def handle_room(room_id):
+ if limit and len(chunk) > limit + 1:
+ # We've already got enough, so lets just drop it.
+ return
+
num_joined_users = rooms_to_num_joined[room_id]
if num_joined_users == 0:
return
@@ -210,12 +213,37 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- chunk.append(result)
+ if _matches_room_entry(result, search_filter):
+ chunk.append(result)
- yield concurrently_execute(handle_room, sorted_rooms, 10)
+ yield concurrently_execute(handle_room, rooms_to_scan, 10)
chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
+ # Work out the new limit of the batch for pagination, or None if we
+ # know there are no more results that would be returned.
+ new_limit = None
+ if chunk and (not limit or len(chunk) > limit):
+ if limit:
+ chunk = chunk[:limit]
+
+ addition = 1
+ if since_token:
+ addition += since_token.current_limit
+
+ if not since_token or since_token.direction_is_forward:
+ last_room_id = chunk[-1]["room_id"]
+ else:
+ last_room_id = chunk[0]["room_id"]
+ addition *= -1
+
+ try:
+ new_limit = sorted_rooms.index(last_room_id) + addition
+ if new_limit >= len(sorted_rooms):
+ new_limit = None
+ except ValueError:
+ pass
+
results = {
"chunk": chunk,
}
@@ -253,13 +281,48 @@ class RoomListHandler(BaseHandler):
defer.returnValue(results)
@defer.inlineCallbacks
- def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
- res = yield self.hs.get_replication_layer().get_public_rooms(
+ def get_remote_public_room_list(self, server_name, limit=None, since_token=None,
+ search_filter=None):
+ if search_filter:
+ # We currently don't support searching across federation, so we have
+ # to do it manually without pagination
+ limit = None
+ since_token = None
+
+ res = yield self._get_remote_list_cached(
server_name, limit=limit, since_token=since_token,
)
+ if search_filter:
+ res = {"chunk": [
+ entry
+ for entry in list(res.get("chunk", []))
+ if _matches_room_entry(entry, search_filter)
+ ]}
+
defer.returnValue(res)
+ def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
+ search_filter=None):
+ repl_layer = self.hs.get_replication_layer()
+ if search_filter:
+ # We can't cache when asking for search
+ return repl_layer.get_public_rooms(
+ server_name, limit=limit, since_token=since_token,
+ search_filter=search_filter,
+ )
+
+ result = self.remote_response_cache.get((server_name, limit, since_token))
+ if not result:
+ result = self.remote_response_cache.set(
+ (server_name, limit, since_token),
+ repl_layer.get_public_rooms(
+ server_name, limit=limit, since_token=since_token,
+ search_filter=search_filter,
+ )
+ )
+ return result
+
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
"stream_ordering", # stream_ordering of the first public room list
@@ -294,3 +357,18 @@ class RoomListNextBatch(namedtuple("RoomListNextBatch", (
return self._replace(
**kwds
)
+
+
+def _matches_room_entry(room_entry, search_filter):
+ if search_filter and search_filter.get("generic_search_term", None):
+ generic_search_term = search_filter["generic_search_term"]
+ if generic_search_term in room_entry.get("name", ""):
+ return True
+ elif generic_search_term in room_entry.get("topic", ""):
+ return True
+ elif generic_search_term in room_entry.get("canonical_alias", ""):
+ return True
+ else:
+ return True
+
+ return False
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index cc32c66792..0c26e96e98 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -62,6 +62,7 @@ class SlavedEventStore(BaseSlavedStore):
)
self.stream_ordering_month_ago = 0
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index db0cd4380a..5584bfbfc0 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -337,6 +337,34 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
defer.returnValue((200, data))
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ yield self.auth.get_user_by_req(request)
+
+ server = parse_string(request, "server", default=None)
+ content = parse_json_object_from_request(request)
+
+ limit = int(content.get("limit", 100))
+ since_token = content.get("since", None)
+ search_filter = content.get("filter", None)
+
+ handler = self.hs.get_room_list_handler()
+ if server:
+ data = yield handler.get_remote_public_room_list(
+ server,
+ limit=limit,
+ since_token=since_token,
+ search_filter=search_filter,
+ )
+ else:
+ data = yield handler.get_local_public_room_list(
+ limit=limit,
+ since_token=since_token,
+ search_filter=search_filter,
+ )
+
+ defer.returnValue((200, data))
+
# TODO: Needs unit testing
class RoomMemberListRestServlet(ClientV1RestServlet):
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 0099a3f5bb..9996f195a0 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -222,6 +222,8 @@ class DataStore(RoomMemberStore, RoomStore,
self._find_stream_orderings_for_times, 60 * 60 * 1000
)
+ self._stream_order_on_start = self.get_room_max_stream_ordering()
+
super(DataStore, self).__init__(hs)
def take_presence_startup_info(self):
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 59b4cf1e53..3d62451de9 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -348,7 +348,14 @@ class EventFederationStore(SQLBaseStore):
# We want to make the cache more effective, so we clamp to the last
# change before the given ordering.
last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
- stream_ordering = min(last_change, stream_ordering)
+
+ # We don't always have a full stream_to_exterm_id table, e.g. after
+ # the upgrade that introduced it, so we make sure we never ask for a
+ # try and pin to a stream_ordering from before a restart
+ last_change = max(self._stream_order_on_start, last_change)
+
+ if last_change > self.stream_ordering_month_ago:
+ stream_ordering = min(last_change, stream_ordering)
return self._get_forward_extremeties_for_room(room_id, stream_ordering)
@@ -369,7 +376,7 @@ class EventFederationStore(SQLBaseStore):
INNER JOIN (
SELECT room_id, MAX(stream_ordering) AS stream_ordering
FROM stream_ordering_to_exterm
- WHERE stream_ordering < ? GROUP BY room_id
+ WHERE stream_ordering <= ? GROUP BY room_id
) AS rms USING (room_id, stream_ordering)
WHERE room_id = ?
""")
@@ -386,9 +393,21 @@ class EventFederationStore(SQLBaseStore):
def _delete_old_forward_extrem_cache(self):
def _delete_old_forward_extrem_cache_txn(txn):
+ # Delete entries older than a month, while making sure we don't delete
+ # the only entries for a room.
+ sql = ("""
+ DELETE FROM stream_ordering_to_exterm
+ WHERE
+ (
+ SELECT max(stream_ordering) AS stream_ordering
+ FROM stream_ordering_to_exterm
+ WHERE room_id = stream_ordering_to_exterm.room_id
+ ) > ?
+ AND stream_ordering < ?
+ """)
txn.execute(
- "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
- (self.stream_ordering_month_ago,)
+ sql,
+ (self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
return self.runInteraction(
"_delete_old_forward_extrem_cache",
|