summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/11658.feature1
-rw-r--r--docs/usage/administration/admin_api/federation.md60
-rw-r--r--synapse/rest/admin/__init__.py2
-rw-r--r--synapse/rest/admin/federation.py56
-rw-r--r--synapse/storage/databases/main/transactions.py48
-rw-r--r--tests/rest/admin/test_federation.py302
6 files changed, 444 insertions, 25 deletions
diff --git a/changelog.d/11658.feature b/changelog.d/11658.feature
new file mode 100644
index 0000000000..2ec9fb5eec
--- /dev/null
+++ b/changelog.d/11658.feature
@@ -0,0 +1 @@
+Add an admin API to get a list of rooms that federate with a given remote homeserver.
\ No newline at end of file
diff --git a/docs/usage/administration/admin_api/federation.md b/docs/usage/administration/admin_api/federation.md
index 5e609561a6..60cbc5265e 100644
--- a/docs/usage/administration/admin_api/federation.md
+++ b/docs/usage/administration/admin_api/federation.md
@@ -119,6 +119,66 @@ The following parameters should be set in the URL:
 The response fields are the same like in the `destinations` array in
 [List of destinations](#list-of-destinations) response.
 
+## Destination rooms
+
+This API gets the rooms that federate with a specific remote server.
+
+The API is:
+
+```
+GET /_synapse/admin/v1/federation/destinations/<destination>/rooms
+```
+
+A response body like the following is returned:
+
+```json
+{
+   "rooms":[
+      {
+         "room_id": "!OGEhHVWSdvArJzumhm:matrix.org",
+         "stream_ordering": 8326
+      },
+      {
+         "room_id": "!xYvNcQPhnkrdUmYczI:matrix.org",
+         "stream_ordering": 93534
+      }
+   ],
+   "total": 2
+}
+```
+
+To paginate, check for `next_token` and if present, call the endpoint again
+with `from` set to the value of `next_token`. This will return a new page.
+
+If the endpoint does not return a `next_token` then there are no more destinations
+to paginate through.
+
+**Parameters**
+
+The following parameters should be set in the URL:
+
+- `destination` - Name of the remote server.
+
+The following query parameters are available:
+
+- `from` - Offset in the returned list. Defaults to `0`.
+- `limit` - Maximum amount of destinations to return. Defaults to `100`.
+- `dir` - Direction of room order by `room_id`. Either `f` for forwards or `b` for
+  backwards. Defaults to `f`.
+
+**Response**
+
+The following fields are returned in the JSON response body:
+
+- `rooms` - An array of objects, each containing information about a room.
+  Room objects contain the following fields:
+  - `room_id` - string - The ID of the room.
+  - `stream_ordering` - integer -  The stream ordering of the most recent
+    successfully-sent [PDU](understanding_synapse_through_grafana_graphs.md#federation)
+    to this destination in this room.
+- `next_token`: string representing a positive integer - Indication for pagination. See above.
+- `total` - integer - Total number of destinations.
+
 ## Reset connection timeout
 
 Synapse makes federation requests to other homeservers. If a federation request fails,
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index b1e49d51b7..9be9e33c8e 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -41,6 +41,7 @@ from synapse.rest.admin.event_reports import (
     EventReportsRestServlet,
 )
 from synapse.rest.admin.federation import (
+    DestinationMembershipRestServlet,
     DestinationResetConnectionRestServlet,
     DestinationRestServlet,
     ListDestinationsRestServlet,
@@ -268,6 +269,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListRegistrationTokensRestServlet(hs).register(http_server)
     NewRegistrationTokenRestServlet(hs).register(http_server)
     RegistrationTokenRestServlet(hs).register(http_server)
+    DestinationMembershipRestServlet(hs).register(http_server)
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
     ListDestinationsRestServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
index 0f33f9e4da..d162e0081e 100644
--- a/synapse/rest/admin/federation.py
+++ b/synapse/rest/admin/federation.py
@@ -148,6 +148,62 @@ class DestinationRestServlet(RestServlet):
         return HTTPStatus.OK, response
 
 
+class DestinationMembershipRestServlet(RestServlet):
+    """Get list of rooms of a destination.
+    This needs user to have administrator access in Synapse.
+
+    GET /_synapse/admin/v1/federation/destinations/<destination>/rooms?from=0&limit=10
+
+    returns:
+        200 OK with a list of rooms if success otherwise an error.
+
+    The parameters `from` and `limit` are required only for pagination.
+    By default, a `limit` of 100 is used.
+    """
+
+    PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]*)/rooms$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastore()
+
+    async def on_GET(
+        self, request: SynapseRequest, destination: str
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        if not await self._store.is_destination_known(destination):
+            raise NotFoundError("Unknown destination")
+
+        start = parse_integer(request, "from", default=0)
+        limit = parse_integer(request, "limit", default=100)
+
+        if start < 0:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Query parameter from must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        if limit < 0:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Query parameter limit must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
+
+        rooms, total = await self._store.get_destination_rooms_paginate(
+            destination, start, limit, direction
+        )
+        response = {"rooms": rooms, "total": total}
+        if (start + limit) < total:
+            response["next_token"] = str(start + len(rooms))
+
+        return HTTPStatus.OK, response
+
+
 class DestinationResetConnectionRestServlet(RestServlet):
     """Reset destinations' connection timeouts and wake it up.
     This needs user to have administrator access in Synapse.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 4b78b4d098..ba79e19f7f 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -561,6 +561,54 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             "get_destinations_paginate_txn", get_destinations_paginate_txn
         )
 
+    async def get_destination_rooms_paginate(
+        self, destination: str, start: int, limit: int, direction: str = "f"
+    ) -> Tuple[List[JsonDict], int]:
+        """Function to retrieve a paginated list of destination's rooms.
+        This will return a json list of rooms and the
+        total number of rooms.
+
+        Args:
+            destination: the destination to query
+            start: start number to begin the query from
+            limit: number of rows to retrieve
+            direction: sort ascending or descending by room_id
+        Returns:
+            A tuple of a dict of rooms and a count of total rooms.
+        """
+
+        def get_destination_rooms_paginate_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[JsonDict], int]:
+
+            if direction == "b":
+                order = "DESC"
+            else:
+                order = "ASC"
+
+            sql = """
+                SELECT COUNT(*) as total_rooms
+                FROM destination_rooms
+                WHERE destination = ?
+                """
+            txn.execute(sql, [destination])
+            count = cast(Tuple[int], txn.fetchone())[0]
+
+            rooms = self.db_pool.simple_select_list_paginate_txn(
+                txn=txn,
+                table="destination_rooms",
+                orderby="room_id",
+                start=start,
+                limit=limit,
+                retcols=("room_id", "stream_ordering"),
+                order_direction=order,
+            )
+            return rooms, count
+
+        return await self.db_pool.runInteraction(
+            "get_destination_rooms_paginate_txn", get_destination_rooms_paginate_txn
+        )
+
     async def is_destination_known(self, destination: str) -> bool:
         """Check if a destination is known to the server."""
         result = await self.db_pool.simple_select_one_onecol(
diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py
index e2d3cff2a3..71068d16cd 100644
--- a/tests/rest/admin/test_federation.py
+++ b/tests/rest/admin/test_federation.py
@@ -20,7 +20,7 @@ from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.api.errors import Codes
-from synapse.rest.client import login
+from synapse.rest.client import login, room
 from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util import Clock
@@ -52,9 +52,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         ]
     )
     def test_requester_is_no_admin(self, method: str, url: str) -> None:
-        """
-        If the user is not a server admin, an error 403 is returned.
-        """
+        """If the user is not a server admin, an error 403 is returned."""
 
         self.register_user("user", "pass", admin=False)
         other_user_tok = self.login("user", "pass")
@@ -70,9 +68,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
 
     def test_invalid_parameter(self) -> None:
-        """
-        If parameters are invalid, an error is returned.
-        """
+        """If parameters are invalid, an error is returned."""
 
         # negative limit
         channel = self.make_request(
@@ -135,9 +131,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
 
     def test_limit(self) -> None:
-        """
-        Testing list of destinations with limit
-        """
+        """Testing list of destinations with limit"""
 
         number_destinations = 20
         self._create_destinations(number_destinations)
@@ -155,9 +149,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self._check_fields(channel.json_body["destinations"])
 
     def test_from(self) -> None:
-        """
-        Testing list of destinations with a defined starting point (from)
-        """
+        """Testing list of destinations with a defined starting point (from)"""
 
         number_destinations = 20
         self._create_destinations(number_destinations)
@@ -175,9 +167,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self._check_fields(channel.json_body["destinations"])
 
     def test_limit_and_from(self) -> None:
-        """
-        Testing list of destinations with a defined starting point and limit
-        """
+        """Testing list of destinations with a defined starting point and limit"""
 
         number_destinations = 20
         self._create_destinations(number_destinations)
@@ -195,9 +185,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self._check_fields(channel.json_body["destinations"])
 
     def test_next_token(self) -> None:
-        """
-        Testing that `next_token` appears at the right place
-        """
+        """Testing that `next_token` appears at the right place"""
 
         number_destinations = 20
         self._create_destinations(number_destinations)
@@ -256,9 +244,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self.assertNotIn("next_token", channel.json_body)
 
     def test_list_all_destinations(self) -> None:
-        """
-        List all destinations.
-        """
+        """List all destinations."""
         number_destinations = 5
         self._create_destinations(number_destinations)
 
@@ -277,9 +263,7 @@ class FederationTestCase(unittest.HomeserverTestCase):
         self._check_fields(channel.json_body["destinations"])
 
     def test_order_by(self) -> None:
-        """
-        Testing order list with parameter `order_by`
-        """
+        """Testing order list with parameter `order_by`"""
 
         def _order_test(
             expected_destination_list: List[str],
@@ -543,3 +527,271 @@ class FederationTestCase(unittest.HomeserverTestCase):
             self.assertIn("retry_interval", c)
             self.assertIn("failure_ts", c)
             self.assertIn("last_successful_stream_ordering", c)
+
+
+class DestinationMembershipTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastore()
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.dest = "sub0.example.com"
+        self.url = f"/_synapse/admin/v1/federation/destinations/{self.dest}/rooms"
+
+        # Record that we successfully contacted a destination in the DB.
+        self.get_success(
+            self.store.set_destination_retry_timings(self.dest, None, 0, 0)
+        )
+
+    def test_requester_is_no_admin(self) -> None:
+        """If the user is not a server admin, an error 403 is returned."""
+
+        self.register_user("user", "pass", admin=False)
+        other_user_tok = self.login("user", "pass")
+
+        channel = self.make_request(
+            "GET",
+            self.url,
+            access_token=other_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_invalid_parameter(self) -> None:
+        """If parameters are invalid, an error is returned."""
+
+        # negative limit
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=-5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+        # negative from
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=-5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+        # invalid search order
+        channel = self.make_request(
+            "GET",
+            self.url + "?dir=bar",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.INVALID_PARAM, channel.json_body["errcode"])
+
+        # invalid destination
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/federation/destinations/%s/rooms" % ("invalid",),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.NOT_FOUND, channel.code, msg=channel.json_body)
+        self.assertEqual(Codes.NOT_FOUND, channel.json_body["errcode"])
+
+    def test_limit(self) -> None:
+        """Testing list of destinations with limit"""
+
+        number_rooms = 5
+        self._create_destination_rooms(number_rooms)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=3",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), 3)
+        self.assertEqual(channel.json_body["next_token"], "3")
+        self._check_fields(channel.json_body["rooms"])
+
+    def test_from(self) -> None:
+        """Testing list of rooms with a defined starting point (from)"""
+
+        number_rooms = 10
+        self._create_destination_rooms(number_rooms)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), 5)
+        self.assertNotIn("next_token", channel.json_body)
+        self._check_fields(channel.json_body["rooms"])
+
+    def test_limit_and_from(self) -> None:
+        """Testing list of rooms with a defined starting point and limit"""
+
+        number_rooms = 10
+        self._create_destination_rooms(number_rooms)
+
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=3&limit=5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(channel.json_body["next_token"], "8")
+        self.assertEqual(len(channel.json_body["rooms"]), 5)
+        self._check_fields(channel.json_body["rooms"])
+
+    def test_order_direction(self) -> None:
+        """Testing order list with parameter `dir`"""
+        number_rooms = 4
+        self._create_destination_rooms(number_rooms)
+
+        # get list in forward direction
+        channel_asc = self.make_request(
+            "GET",
+            self.url + "?dir=f",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel_asc.code, msg=channel_asc.json_body)
+        self.assertEqual(channel_asc.json_body["total"], number_rooms)
+        self.assertEqual(number_rooms, len(channel_asc.json_body["rooms"]))
+        self._check_fields(channel_asc.json_body["rooms"])
+
+        # get list in backward direction
+        channel_desc = self.make_request(
+            "GET",
+            self.url + "?dir=b",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel_desc.code, msg=channel_desc.json_body)
+        self.assertEqual(channel_desc.json_body["total"], number_rooms)
+        self.assertEqual(number_rooms, len(channel_desc.json_body["rooms"]))
+        self._check_fields(channel_desc.json_body["rooms"])
+
+        # test that both lists have different directions
+        for i in range(0, number_rooms):
+            self.assertEqual(
+                channel_asc.json_body["rooms"][i]["room_id"],
+                channel_desc.json_body["rooms"][number_rooms - 1 - i]["room_id"],
+            )
+
+    def test_next_token(self) -> None:
+        """Testing that `next_token` appears at the right place"""
+
+        number_rooms = 5
+        self._create_destination_rooms(number_rooms)
+
+        #  `next_token` does not appear
+        # Number of results is the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=5",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
+        self.assertNotIn("next_token", channel.json_body)
+
+        #  `next_token` does not appear
+        # Number of max results is larger than the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=6",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), number_rooms)
+        self.assertNotIn("next_token", channel.json_body)
+
+        #  `next_token` does appear
+        # Number of max results is smaller than the number of entries
+        channel = self.make_request(
+            "GET",
+            self.url + "?limit=4",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), 4)
+        self.assertEqual(channel.json_body["next_token"], "4")
+
+        # Check
+        # Set `from` to value of `next_token` for request remaining entries
+        #  `next_token` does not appear
+        channel = self.make_request(
+            "GET",
+            self.url + "?from=4",
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(len(channel.json_body["rooms"]), 1)
+        self.assertNotIn("next_token", channel.json_body)
+
+    def test_destination_rooms(self) -> None:
+        """Testing that request the list of rooms is successfully."""
+        number_rooms = 3
+        self._create_destination_rooms(number_rooms)
+
+        channel = self.make_request(
+            "GET",
+            self.url,
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        self.assertEqual(channel.json_body["total"], number_rooms)
+        self.assertEqual(number_rooms, len(channel.json_body["rooms"]))
+        self._check_fields(channel.json_body["rooms"])
+
+    def _create_destination_rooms(self, number_rooms: int) -> None:
+        """Create a number rooms for destination
+
+        Args:
+            number_rooms: Number of rooms to be created
+        """
+        for _ in range(0, number_rooms):
+            room_id = self.helper.create_room_as(
+                self.admin_user, tok=self.admin_user_tok
+            )
+            self.get_success(
+                self.store.store_destination_rooms_entries((self.dest,), room_id, 1234)
+            )
+
+    def _check_fields(self, content: List[JsonDict]) -> None:
+        """Checks that the expected room attributes are present in content
+
+        Args:
+            content: List that is checked for content
+        """
+        for c in content:
+            self.assertIn("room_id", c)
+            self.assertIn("stream_ordering", c)