diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/rest/admin/__init__.py | 6 | ||||
-rw-r--r-- | synapse/rest/admin/federation.py | 135 | ||||
-rw-r--r-- | synapse/storage/databases/main/transactions.py | 70 |
3 files changed, 211 insertions, 0 deletions
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index c51a029bf3..c499afd4be 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -40,6 +40,10 @@ from synapse.rest.admin.event_reports import ( EventReportDetailRestServlet, EventReportsRestServlet, ) +from synapse.rest.admin.federation import ( + DestinationsRestServlet, + ListDestinationsRestServlet, +) from synapse.rest.admin.groups import DeleteGroupAdminRestServlet from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo from synapse.rest.admin.registration_tokens import ( @@ -261,6 +265,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: ListRegistrationTokensRestServlet(hs).register(http_server) NewRegistrationTokenRestServlet(hs).register(http_server) RegistrationTokenRestServlet(hs).register(http_server) + DestinationsRestServlet(hs).register(http_server) + ListDestinationsRestServlet(hs).register(http_server) # Some servlets only get registered for the main process. if hs.config.worker.worker_app is None: diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py new file mode 100644 index 0000000000..744687be35 --- /dev/null +++ b/synapse/rest/admin/federation.py @@ -0,0 +1,135 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from http import HTTPStatus +from typing import TYPE_CHECKING, Tuple + +from synapse.api.errors import Codes, NotFoundError, SynapseError +from synapse.http.servlet import RestServlet, parse_integer, parse_string +from synapse.http.site import SynapseRequest +from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin +from synapse.storage.databases.main.transactions import DestinationSortOrder +from synapse.types import JsonDict + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class ListDestinationsRestServlet(RestServlet): + """Get request to list all destinations. + This needs user to have administrator access in Synapse. + + GET /_synapse/admin/v1/federation/destinations?from=0&limit=10 + + returns: + 200 OK with list of destinations if success otherwise an error. + + The parameters `from` and `limit` are required only for pagination. + By default, a `limit` of 100 is used. + The parameter `destination` can be used to filter by destination. + The parameter `order_by` can be used to order the result. + """ + + PATTERNS = admin_patterns("/federation/destinations$") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self._auth, request) + + start = parse_integer(request, "from", default=0) + limit = parse_integer(request, "limit", default=100) + + if start < 0: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Query parameter from must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + if limit < 0: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Query parameter limit must be a string representing a positive integer.", + errcode=Codes.INVALID_PARAM, + ) + + destination = parse_string(request, "destination") + + order_by = parse_string( + request, + "order_by", + default=DestinationSortOrder.DESTINATION.value, + allowed_values=[dest.value for dest in DestinationSortOrder], + ) + + direction = parse_string(request, "dir", default="f", allowed_values=("f", "b")) + + destinations, total = await self._store.get_destinations_paginate( + start, limit, destination, order_by, direction + ) + response = {"destinations": destinations, "total": total} + if (start + limit) < total: + response["next_token"] = str(start + len(destinations)) + + return HTTPStatus.OK, response + + +class DestinationsRestServlet(RestServlet): + """Get details of a destination. + This needs user to have administrator access in Synapse. + + GET /_synapse/admin/v1/federation/destinations/<destination> + + returns: + 200 OK with details of a destination if success otherwise an error. + """ + + PATTERNS = admin_patterns("/federation/destinations/(?P<destination>[^/]+)$") + + def __init__(self, hs: "HomeServer"): + self._auth = hs.get_auth() + self._store = hs.get_datastore() + + async def on_GET( + self, request: SynapseRequest, destination: str + ) -> Tuple[int, JsonDict]: + await assert_requester_is_admin(self._auth, request) + + destination_retry_timings = await self._store.get_destination_retry_timings( + destination + ) + + if not destination_retry_timings: + raise NotFoundError("Unknown destination") + + last_successful_stream_ordering = ( + await self._store.get_destination_last_successful_stream_ordering( + destination + ) + ) + + response = { + "destination": destination, + "failure_ts": destination_retry_timings.failure_ts, + "retry_last_ts": destination_retry_timings.retry_last_ts, + "retry_interval": destination_retry_timings.retry_interval, + "last_successful_stream_ordering": last_successful_stream_ordering, + } + + return HTTPStatus.OK, response diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index d7dc1f73ac..1622822552 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,6 +14,7 @@ import logging from collections import namedtuple +from enum import Enum from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple import attr @@ -44,6 +45,16 @@ _UpdateTransactionRow = namedtuple( ) +class DestinationSortOrder(Enum): + """Enum to define the sorting method used when returning destinations.""" + + DESTINATION = "destination" + RETRY_LAST_TS = "retry_last_ts" + RETTRY_INTERVAL = "retry_interval" + FAILURE_TS = "failure_ts" + LAST_SUCCESSFUL_STREAM_ORDERING = "last_successful_stream_ordering" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class DestinationRetryTimings: """The current destination retry timing info for a remote server.""" @@ -480,3 +491,62 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): destinations = [row[0] for row in txn] return destinations + + async def get_destinations_paginate( + self, + start: int, + limit: int, + destination: Optional[str] = None, + order_by: str = DestinationSortOrder.DESTINATION.value, + direction: str = "f", + ) -> Tuple[List[JsonDict], int]: + """Function to retrieve a paginated list of destinations. + This will return a json list of destinations and the + total number of destinations matching the filter criteria. + + Args: + start: start number to begin the query from + limit: number of rows to retrieve + destination: search string in destination + order_by: the sort order of the returned list + direction: sort ascending or descending + Returns: + A tuple of a list of mappings from destination to information + and a count of total destinations. + """ + + def get_destinations_paginate_txn( + txn: LoggingTransaction, + ) -> Tuple[List[JsonDict], int]: + order_by_column = DestinationSortOrder(order_by).value + + if direction == "b": + order = "DESC" + else: + order = "ASC" + + args = [] + where_statement = "" + if destination: + args.extend(["%" + destination.lower() + "%"]) + where_statement = "WHERE LOWER(destination) LIKE ?" + + sql_base = f"FROM destinations {where_statement} " + sql = f"SELECT COUNT(*) as total_destinations {sql_base}" + txn.execute(sql, args) + count = txn.fetchone()[0] + + sql = f""" + SELECT destination, retry_last_ts, retry_interval, failure_ts, + last_successful_stream_ordering + {sql_base} + ORDER BY {order_by_column} {order}, destination ASC + LIMIT ? OFFSET ? + """ + txn.execute(sql, args + [limit, start]) + destinations = self.db_pool.cursor_to_dict(txn) + return destinations, count + + return await self.db_pool.runInteraction( + "get_destinations_paginate_txn", get_destinations_paginate_txn + ) |