diff --git a/changelog.d/11658.feature b/changelog.d/11658.feature
new file mode 100644
index 0000000000..2ec9fb5eec
--- /dev/null
+++ b/changelog.d/11658.feature
@@ -0,0 +1 @@
+Add an admin API to get a list of rooms that federate with a given remote homeserver.
\ No newline at end of file
diff --git a/docs/usage/administration/admin_api/federation.md b/docs/usage/administration/admin_api/federation.md
index 5e609561a6..60cbc5265e 100644
--- a/docs/usage/administration/admin_api/federation.md
+++ b/docs/usage/administration/admin_api/federation.md
@@ -119,6 +119,66 @@ The following parameters should be set in the URL:
The response fields are the same like in the `destinations` array in
[List of destinations](#list-of-destinations) response.
+## Destination rooms
+
+This API gets the rooms that federate with a specific remote server.
+
+The API is:
+
+```
+GET /_synapse/admin/v1/federation/destinations/<destination>/rooms
+```
+
+A response body like the following is returned:
+
+```json
+{
+ "rooms":[
+ {
+ "room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
+ "stream_ordering": 8326
+ },
+ {
+ "room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
+ "stream_ordering": 93534
+ }
+ ],
+ "total": 2
+}
+```
+
+To paginate, check for `next_token` and if present, call the endpoint again
+with `from` set to the value of `next_token`. This will return a new page.
+
+If the endpoint does not return a `next_token` then there are no more destinations
+to paginate through.
+
+**Parameters**
+
+The following parameters should be set in the URL:
+
+- `destination` - Name of the remote server.
+
+The following query parameters are available:
+
+- `from` - Offset in the returned list. Defaults to `0`.
+- `limit` - Maximum amount of destinations to return. Defaults to `100`.
+- `dir` - Direction of room order by `room_id`. Either `f` for forwards or `b` for
+ backwards. Defaults to `f`.
+
+**Response**
+
+The following fields are returned in the JSON response body:
+
+- `rooms` - An array of objects, each containing information about a room.
+ Room objects contain the following fields:
+ - `room_id` - string - The ID of the room.
+ - `stream_ordering` - integer - The stream ordering of the most recent
+ successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
+ to this destination in this room.
+- `next_token`: string representing a positive integer - Indication for pagination. See above.
+- `total` - integer - Total number of destinations.
+
## Reset connection timeout
Synapse makes federation requests to other homeservers. If a federation request fails,
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index b1e49d51b7..9be9e33c8e 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -41,6 +41,7 @@ from synapse.rest.admin.event_reports import (
EventReportsRestServlet,
)
from synapse.rest.admin.federation import (
+ DestinationMembershipRestServlet,
DestinationResetConnectionRestServlet,
DestinationRestServlet,
ListDestinationsRestServlet,
@@ -268,6 +269,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListRegistrationTokensRestServlet(hs).register(http_server)
NewRegistrationTokenRestServlet(hs).register(http_server)
RegistrationTokenRestServlet(hs).register(http_server)
+ DestinationMembershipRestServlet(hs).register(http_server)
DestinationResetConnectionRestServlet(hs).register(http_server)
DestinationRestServlet(hs).register(http_server)
ListDestinationsRestServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
index 0f33f9e4da..d162e0081e 100644
--- a/synapse/rest/admin/federation.py
+++ b/synapse/rest/admin/federation.py
@@ -148,6 +148,62 @@ class DestinationRestServlet(RestServlet):
return HTTPStatus.OK, response
+class DestinationMembershipRestServlet(RestServlet):
+ """Get list of rooms of a destination.
+ This needs user to have administrator access in Synapse.
+
+ GET /_synapse/admin/v1/federation/destinations/<destination>/rooms?from=0&limit=10
+
+ returns:
+ 200 OK with a list of rooms if success otherwise an error.
+
+ The parameters `from` and `limit` are required only for pagination.
+ By default, a `limit` of 100 is used.
+ """
+
+ PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]*)/rooms$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastore()
+
+ async def on_GET(
+ self, request: SynapseRequest, destination: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ if not await self._store.is_destination_known(destination):
+ raise NotFoundError("Unknown destination")
+
+ start = parse_integer(request, "from", default=0)
+ limit = parse_integer(request, "limit", default=100)
+
+ if start < 0:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Query parameter from must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ if limit < 0:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Query parameter limit must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
+
+ rooms, total = await self._store.get_destination_rooms_paginate(
+ destination, start, limit, direction
+ )
+ response = {"rooms": rooms, "total": total}
+ if (start + limit) < total:
+ response["next_token"] = str(start + len(rooms))
+
+ return HTTPStatus.OK, response
+
+
class DestinationResetConnectionRestServlet(RestServlet):
"""Reset destinations' connection timeouts and wake it up.
This needs user to have administrator access in Synapse.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 4b78b4d098..ba79e19f7f 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
"get_destinations_paginate_txn", get_destinations_paginate_txn
)
+ async def get_destination_rooms_paginate(
+ self, destination: str, start: int, limit: int, direction: str = "f"
+ ) -> Tuple[List[JsonDict], int]:
+ """Function to retrieve a paginated list of destination's rooms.
+ This will return a json list of rooms and the
+ total number of rooms.
+
+ Args:
+ destination: the destination to query
+ start: start number to begin the query from
+ limit: number of rows to retrieve
+ direction: sort ascending or descending by room_id
+ Returns:
+ A tuple of a dict of rooms and a count of total rooms.
+ """
+
+ def get_destination_rooms_paginate_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[JsonDict], int]:
+
+ if direction == "b":
+ order = "DESC"
+ else:
+ order = "ASC"
+
+ sql = """
+ SELECT COUNT(*) as total_rooms
+ FROM destination_rooms
+ WHERE destination = ?
+ """
+ txn.execute(sql, [destination])
+ count = cast(Tuple[int], txn.fetchone())[0]
+
+ rooms = self.db_pool.simple_select_list_paginate_txn(
+ txn=txn,
+ table="destination_rooms",
+ orderby="room_id",
+ start=start,
+ limit=limit,
+ retcols=("room_id", "stream_ordering"),
+ order_direction=order,
+ )
+ return rooms, count
+
+ return await self.db_pool.runInteraction(
+ "get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn
+ )
+
async def is_destination_known(self, destination: str) -> bool:
"""Check if a destination is known to the server."""
result = await self.db_pool.simple_select_one_onecol(
diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py
index e2d3cff2a3..71068d16cd 100644
--- a/tests/rest/admin/test_federation.py
+++ b/tests/rest/admin/test_federation.py
@@ -20,7 +20,7 @@ from twisted.test.proto_helpers import MemoryReactor
import synapse.rest.admin
from synapse.api.errors import Codes
-from synapse.rest.client import login
+from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock
@@ -52,9 +52,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
]
)
def test_requester_is_no_admin(self, method: str, url: str) -> None:
- """
- If the user is not a server admin, an error 403 is returned.
- """
+ """If the user is not a server admin, an error 403 is returned."""
self.register_user("user", "pass", admin=False)
other_user_tok = self.login("user", "pass")
@@ -70,9 +68,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
def test_invalid_parameter(self) -> None:
- """
- If parameters are invalid, an error is returned.
- """
+ """If parameters are invalid, an error is returned."""
# negative limit
channel = self.make_request(
@@ -135,9 +131,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
def test_limit(self) -> None:
- """
- Testing list of destinations with limit
- """
+ """Testing list of destinations with limit"""
number_destinations = 20
self._create_destinations(number_destinations)
@@ -155,9 +149,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_from(self) -> None:
- """
- Testing list of destinations with a defined starting point (from)
- """
+ """Testing list of destinations with a defined starting point (from)"""
number_destinations = 20
self._create_destinations(number_destinations)
@@ -175,9 +167,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_limit_and_from(self) -> None:
- """
- Testing list of destinations with a defined starting point and limit
- """
+ """Testing list of destinations with a defined starting point and limit"""
number_destinations = 20
self._create_destinations(number_destinations)
@@ -195,9 +185,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_next_token(self) -> None:
- """
- Testing that `next_token` appears at the right place
- """
+ """Testing that `next_token` appears at the right place"""
number_destinations = 20
self._create_destinations(number_destinations)
@@ -256,9 +244,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertNotIn("next_token", channel.json_body)
def test_list_all_destinations(self) -> None:
- """
- List all destinations.
- """
+ """List all destinations."""
number_destinations = 5
self._create_destinations(number_destinations)
@@ -277,9 +263,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
self._check_fields(channel.json_body["destinations"])
def test_order_by(self) -> None:
- """
- Testing order list with parameter `order_by`
- """
+ """Testing order list with parameter `order_by`"""
def _order_test(
expected_destination_list: List[str],
@@ -543,3 +527,271 @@ class FederationTestCase(unittest.HomeserverTestCase):
self.assertIn("retry_interval", c)
self.assertIn("failure_ts", c)
self.assertIn("last_successful_stream_ordering", c)
+
+
+class DestinationMembershipTestCase(unittest.HomeserverTestCase):
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastore()
+ self.admin_user = self.register_user("admin", "pass", admin=True)
+ self.admin_user_tok = self.login("admin", "pass")
+
+ self.dest = "sub0.example.com"
+ self.url = f"/_synapse/admin/v1/federation/destinations/{self.dest}/rooms"
+
+ # Record that we successfully contacted a destination in the DB.
+ self.get_success(
+ self.store.set_destination_retry_timings(self.dest, None, 0, 0)
+ )
+
+ def test_requester_is_no_admin(self) -> None:
+ """If the user is not a server admin, an error 403 is returned."""
+
+ self.register_user("user", "pass", admin=False)
+ other_user_tok = self.login("user", "pass")
+
+ channel = self.make_request(
+ "GET",
+ self.url,
+ access_token=other_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ def test_invalid_parameter(self) -> None:
+ """If parameters are invalid, an error is returned."""
+
+ # negative limit
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=-5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+ # negative from
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=-5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+ # invalid search order
+ channel = self.make_request(
+ "GET",
+ self.url + "?dir=bar",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+ # invalid destination
+ channel = self.make_request(
+ "GET",
+ "/_synapse/admin/v1/federation/destinations/%s/rooms" % ("invalid",),
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
+ self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+ def test_limit(self) -> None:
+ """Testing list of destinations with limit"""
+
+ number_rooms = 5
+ self._create_destination_rooms(number_rooms)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=3",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), 3)
+ self.assertEqual(channel.json_body["next_token"], "3")
+ self._check_fields(channel.json_body["rooms"])
+
+ def test_from(self) -> None:
+ """Testing list of rooms with a defined starting point (from)"""
+
+ number_rooms = 10
+ self._create_destination_rooms(number_rooms)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), 5)
+ self.assertNotIn("next_token", channel.json_body)
+ self._check_fields(channel.json_body["rooms"])
+
+ def test_limit_and_from(self) -> None:
+ """Testing list of rooms with a defined starting point and limit"""
+
+ number_rooms = 10
+ self._create_destination_rooms(number_rooms)
+
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=3&limit=5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(channel.json_body["next_token"], "8")
+ self.assertEqual(len(channel.json_body["rooms"]), 5)
+ self._check_fields(channel.json_body["rooms"])
+
+ def test_order_direction(self) -> None:
+ """Testing order list with parameter `dir`"""
+ number_rooms = 4
+ self._create_destination_rooms(number_rooms)
+
+ # get list in forward direction
+ channel_asc = self.make_request(
+ "GET",
+ self.url + "?dir=f",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel_asc.code, msg=channel_asc.json_body)
+ self.assertEqual(channel_asc.json_body["total"], number_rooms)
+ self.assertEqual(number_rooms, len(channel_asc.json_body["rooms"]))
+ self._check_fields(channel_asc.json_body["rooms"])
+
+ # get list in backward direction
+ channel_desc = self.make_request(
+ "GET",
+ self.url + "?dir=b",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel_desc.code, msg=channel_desc.json_body)
+ self.assertEqual(channel_desc.json_body["total"], number_rooms)
+ self.assertEqual(number_rooms, len(channel_desc.json_body["rooms"]))
+ self._check_fields(channel_desc.json_body["rooms"])
+
+ # test that both lists have different directions
+ for i in range(0, number_rooms):
+ self.assertEqual(
+ channel_asc.json_body["rooms"][i]["room_id"],
+ channel_desc.json_body["rooms"][number_rooms - 1 - i]["room_id"],
+ )
+
+ def test_next_token(self) -> None:
+ """Testing that `next_token` appears at the right place"""
+
+ number_rooms = 5
+ self._create_destination_rooms(number_rooms)
+
+ # `next_token` does not appear
+ # Number of results is the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=5",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
+ self.assertNotIn("next_token", channel.json_body)
+
+ # `next_token` does not appear
+ # Number of max results is larger than the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=6",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
+ self.assertNotIn("next_token", channel.json_body)
+
+ # `next_token` does appear
+ # Number of max results is smaller than the number of entries
+ channel = self.make_request(
+ "GET",
+ self.url + "?limit=4",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), 4)
+ self.assertEqual(channel.json_body["next_token"], "4")
+
+ # Check
+ # Set `from` to value of `next_token` for request remaining entries
+ # `next_token` does not appear
+ channel = self.make_request(
+ "GET",
+ self.url + "?from=4",
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(len(channel.json_body["rooms"]), 1)
+ self.assertNotIn("next_token", channel.json_body)
+
+ def test_destination_rooms(self) -> None:
+ """Testing that request the list of rooms is successfully."""
+ number_rooms = 3
+ self._create_destination_rooms(number_rooms)
+
+ channel = self.make_request(
+ "GET",
+ self.url,
+ access_token=self.admin_user_tok,
+ )
+
+ self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+ self.assertEqual(channel.json_body["total"], number_rooms)
+ self.assertEqual(number_rooms, len(channel.json_body["rooms"]))
+ self._check_fields(channel.json_body["rooms"])
+
+ def _create_destination_rooms(self, number_rooms: int) -> None:
+ """Create a number rooms for destination
+
+ Args:
+ number_rooms: Number of rooms to be created
+ """
+ for _ in range(0, number_rooms):
+ room_id = self.helper.create_room_as(
+ self.admin_user, tok=self.admin_user_tok
+ )
+ self.get_success(
+ self.store.store_destination_rooms_entries((self.dest,), room_id, 1234)
+ )
+
+ def _check_fields(self, content: List[JsonDict]) -> None:
+ """Checks that the expected room attributes are present in content
+
+ Args:
+ content: List that is checked for content
+ """
+ for c in content:
+ self.assertIn("room_id", c)
+ self.assertIn("stream_ordering", c)
|