diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index c51a029bf3..c499afd4be 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -40,6 +40,10 @@ from synapse.rest.admin.event_reports import (
EventReportDetailRestServlet,
EventReportsRestServlet,
)
+from synapse.rest.admin.federation import (
+ DestinationsRestServlet,
+ ListDestinationsRestServlet,
+)
from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
from synapse.rest.admin.registration_tokens import (
@@ -261,6 +265,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
ListRegistrationTokensRestServlet(hs).register(http_server)
NewRegistrationTokenRestServlet(hs).register(http_server)
RegistrationTokenRestServlet(hs).register(http_server)
+ DestinationsRestServlet(hs).register(http_server)
+ ListDestinationsRestServlet(hs).register(http_server)
# Some servlets only get registered for the main process.
if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py
new file mode 100644
index 0000000000..744687be35
--- /dev/null
+++ b/synapse/rest/admin/federation.py
@@ -0,0 +1,135 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.api.errors import Codes, NotFoundError, SynapseError
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+from synapse.storage.databases.main.transactions import DestinationSortOrder
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class ListDestinationsRestServlet(RestServlet):
+ """Get request to list all destinations.
+ This needs user to have administrator access in Synapse.
+
+ GET /_synapse/admin/v1/federation/destinations?from=0&limit=10
+
+ returns:
+ 200 OK with list of destinations if success otherwise an error.
+
+ The parameters `from` and `limit` are required only for pagination.
+ By default, a `limit` of 100 is used.
+ The parameter `destination` can be used to filter by destination.
+ The parameter `order_by` can be used to order the result.
+ """
+
+ PATTERNS = admin_patterns("/federation/destinations$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastore()
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ start = parse_integer(request, "from", default=0)
+ limit = parse_integer(request, "limit", default=100)
+
+ if start < 0:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Query parameter from must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ if limit < 0:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Query parameter limit must be a string representing a positive integer.",
+ errcode=Codes.INVALID_PARAM,
+ )
+
+ destination = parse_string(request, "destination")
+
+ order_by = parse_string(
+ request,
+ "order_by",
+ default=DestinationSortOrder.DESTINATION.value,
+ allowed_values=[dest.value for dest in DestinationSortOrder],
+ )
+
+ direction = parse_string(request, "dir", default="f", allowed_values=("f", "b"))
+
+ destinations, total = await self._store.get_destinations_paginate(
+ start, limit, destination, order_by, direction
+ )
+ response = {"destinations": destinations, "total": total}
+ if (start + limit) < total:
+ response["next_token"] = str(start + len(destinations))
+
+ return HTTPStatus.OK, response
+
+
+class DestinationsRestServlet(RestServlet):
+ """Get details of a destination.
+ This needs user to have administrator access in Synapse.
+
+ GET /_synapse/admin/v1/federation/destinations/<destination>
+
+ returns:
+ 200 OK with details of a destination if success otherwise an error.
+ """
+
+ PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]+)$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastore()
+
+ async def on_GET(
+ self, request: SynapseRequest, destination: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ destination_retry_timings = await self._store.get_destination_retry_timings(
+ destination
+ )
+
+ if not destination_retry_timings:
+ raise NotFoundError("Unknown destination")
+
+ last_successful_stream_ordering = (
+ await self._store.get_destination_last_successful_stream_ordering(
+ destination
+ )
+ )
+
+ response = {
+ "destination": destination,
+ "failure_ts": destination_retry_timings.failure_ts,
+ "retry_last_ts": destination_retry_timings.retry_last_ts,
+ "retry_interval": destination_retry_timings.retry_interval,
+ "last_successful_stream_ordering": last_successful_stream_ordering,
+ }
+
+ return HTTPStatus.OK, response
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index d7dc1f73ac..1622822552 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,6 +14,7 @@
import logging
from collections import namedtuple
+from enum import Enum
from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
import attr
@@ -44,6 +45,16 @@ _UpdateTransactionRow = namedtuple(
)
+class DestinationSortOrder(Enum):
+ """Enum to define the sorting method used when returning destinations."""
+
+ DESTINATION = "destination"
+ RETRY_LAST_TS = "retry_last_ts"
+ RETTRY_INTERVAL = "retry_interval"
+ FAILURE_TS = "failure_ts"
+ LAST_SUCCESSFUL_STREAM_ORDERING = "last_successful_stream_ordering"
+
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class DestinationRetryTimings:
"""The current destination retry timing info for a remote server."""
@@ -480,3 +491,62 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
destinations = [row[0] for row in txn]
return destinations
+
+ async def get_destinations_paginate(
+ self,
+ start: int,
+ limit: int,
+ destination: Optional[str] = None,
+ order_by: str = DestinationSortOrder.DESTINATION.value,
+ direction: str = "f",
+ ) -> Tuple[List[JsonDict], int]:
+ """Function to retrieve a paginated list of destinations.
+ This will return a json list of destinations and the
+ total number of destinations matching the filter criteria.
+
+ Args:
+ start: start number to begin the query from
+ limit: number of rows to retrieve
+ destination: search string in destination
+ order_by: the sort order of the returned list
+ direction: sort ascending or descending
+ Returns:
+ A tuple of a list of mappings from destination to information
+ and a count of total destinations.
+ """
+
+ def get_destinations_paginate_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[JsonDict], int]:
+ order_by_column = DestinationSortOrder(order_by).value
+
+ if direction == "b":
+ order = "DESC"
+ else:
+ order = "ASC"
+
+ args = []
+ where_statement = ""
+ if destination:
+ args.extend(["%" + destination.lower() + "%"])
+ where_statement = "WHERE LOWER(destination) LIKE ?"
+
+ sql_base = f"FROM destinations {where_statement} "
+ sql = f"SELECT COUNT(*) as total_destinations {sql_base}"
+ txn.execute(sql, args)
+ count = txn.fetchone()[0]
+
+ sql = f"""
+ SELECT destination, retry_last_ts, retry_interval, failure_ts,
+ last_successful_stream_ordering
+ {sql_base}
+ ORDER BY {order_by_column} {order}, destination ASC
+ LIMIT ? OFFSET ?
+ """
+ txn.execute(sql, args + [limit, start])
+ destinations = self.db_pool.cursor_to_dict(txn)
+ return destinations, count
+
+ return await self.db_pool.runInteraction(
+ "get_destinations_paginate_txn", get_destinations_paginate_txn
+ )
|