diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index cdaee17451..e55cdc0470 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -39,7 +39,7 @@ from typing import TYPE_CHECKING, Optional, Tuple
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.handlers.pagination import PURGE_HISTORY_ACTION_NAME
-from synapse.http.server import HttpServer, JsonResource
+from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
@@ -51,6 +51,7 @@ from synapse.rest.admin.background_updates import (
from synapse.rest.admin.devices import (
DeleteDevicesRestServlet,
DeviceRestServlet,
+ DevicesGetRestServlet,
DevicesRestServlet,
)
from synapse.rest.admin.event_reports import (
@@ -86,6 +87,7 @@ from synapse.rest.admin.rooms import (
RoomStateRestServlet,
RoomTimestampToEventRestServlet,
)
+from synapse.rest.admin.scheduled_tasks import ScheduledTasksRestServlet
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.rest.admin.statistics import (
LargestRoomsStatistics,
@@ -98,6 +100,8 @@ from synapse.rest.admin.users import (
DeactivateAccountRestServlet,
PushersRestServlet,
RateLimitRestServlet,
+ RedactUser,
+ RedactUserStatus,
ResetPasswordRestServlet,
SearchUsersRestServlet,
ShadowBanRestServlet,
@@ -105,6 +109,8 @@ from synapse.rest.admin.users import (
UserAdminServlet,
UserByExternalId,
UserByThreePid,
+ UserInvitesCount,
+ UserJoinedRoomCount,
UserMembershipRestServlet,
UserRegisterServlet,
UserReplaceMasterCrossSigningKeyRestServlet,
@@ -201,8 +207,7 @@ class PurgeHistoryRestServlet(RestServlet):
(stream, topo, _event_id) = r
token = "t%d-%d" % (topo, stream)
logger.info(
- "[purge] purging up to token %s (received_ts %i => "
- "stream_ordering %i)",
+ "[purge] purging up to token %s (received_ts %i => stream_ordering %i)",
token,
ts,
stream_ordering,
@@ -259,27 +264,24 @@ class PurgeHistoryStatusRestServlet(RestServlet):
########################################################################################
-class AdminRestResource(JsonResource):
- """The REST resource which gets mounted at /_synapse/admin"""
-
- def __init__(self, hs: "HomeServer"):
- JsonResource.__init__(self, hs, canonical_json=False)
- register_servlets(hs, self)
-
-
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
"""
Register all the admin servlets.
"""
- # Admin servlets aren't registered on workers.
+ RoomRestServlet(hs).register(http_server)
+
+ # Admin servlets below may not work on workers.
if hs.config.worker.worker_app is not None:
+ # Some admin servlets can be mounted on workers when MSC3861 is enabled.
+ if hs.config.experimental.msc3861.enabled:
+ register_servlets_for_msc3861_delegation(hs, http_server)
+
return
register_servlets_for_client_rest_resource(hs, http_server)
BlockRoomRestServlet(hs).register(http_server)
ListRoomRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
- RoomRestServlet(hs).register(http_server)
RoomRestV2Servlet(hs).register(http_server)
RoomMembersRestServlet(hs).register(http_server)
DeleteRoomStatusByDeleteIdRestServlet(hs).register(http_server)
@@ -319,6 +321,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
UserByExternalId(hs).register(http_server)
UserByThreePid(hs).register(http_server)
+ RedactUser(hs).register(http_server)
+ RedactUserStatus(hs).register(http_server)
+ UserInvitesCount(hs).register(http_server)
+ UserJoinedRoomCount(hs).register(http_server)
DeviceRestServlet(hs).register(http_server)
DevicesRestServlet(hs).register(http_server)
@@ -328,8 +334,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
ExperimentalFeaturesRestServlet(hs).register(http_server)
- if hs.config.experimental.msc3823_account_suspension:
- SuspendAccountRestServlet(hs).register(http_server)
+ SuspendAccountRestServlet(hs).register(http_server)
+ ScheduledTasksRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(
@@ -357,4 +363,16 @@ def register_servlets_for_client_rest_resource(
ListMediaInRoom(hs).register(http_server)
# don't add more things here: new servlets should only be exposed on
- # /_synapse/admin so should not go here. Instead register them in AdminRestResource.
+ # /_synapse/admin so should not go here. Instead register them in register_servlets.
+
+
+def register_servlets_for_msc3861_delegation(
+ hs: "HomeServer", http_server: HttpServer
+) -> None:
+ """Register servlets needed by MAS when MSC3861 is enabled"""
+ assert hs.config.experimental.msc3861.enabled
+
+ UserRestServletV2(hs).register(http_server)
+ UsernameAvailableRestServlet(hs).register(http_server)
+ UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
+ DevicesGetRestServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py
index 449b066923..125ed8c491 100644
--- a/synapse/rest/admin/devices.py
+++ b/synapse/rest/admin/devices.py
@@ -113,18 +113,19 @@ class DeviceRestServlet(RestServlet):
return HTTPStatus.OK, {}
-class DevicesRestServlet(RestServlet):
+class DevicesGetRestServlet(RestServlet):
"""
Retrieve the given user's devices
+
+ This can be mounted on workers as it is read-only, as opposed
+ to `DevicesRestServlet`.
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
def __init__(self, hs: "HomeServer"):
self.auth = hs.get_auth()
- handler = hs.get_device_handler()
- assert isinstance(handler, DeviceHandler)
- self.device_handler = handler
+ self.device_worker_handler = hs.get_device_handler()
self.store = hs.get_datastores().main
self.is_mine = hs.is_mine
@@ -141,9 +142,24 @@ class DevicesRestServlet(RestServlet):
if u is None:
raise NotFoundError("Unknown user")
- devices = await self.device_handler.get_devices_by_user(target_user.to_string())
+ devices = await self.device_worker_handler.get_devices_by_user(
+ target_user.to_string()
+ )
return HTTPStatus.OK, {"devices": devices, "total": len(devices)}
+
+class DevicesRestServlet(DevicesGetRestServlet):
+ """
+ Retrieve the given user's devices
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/devices$", "v2")
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__(hs)
+ assert isinstance(self.device_worker_handler, DeviceHandler)
+ self.device_handler = self.device_worker_handler
+
async def on_POST(
self, request: SynapseRequest, user_id: str
) -> Tuple[int, JsonDict]:
diff --git a/synapse/rest/admin/event_reports.py b/synapse/rest/admin/event_reports.py
index 9fb68bfa46..ff1abc0697 100644
--- a/synapse/rest/admin/event_reports.py
+++ b/synapse/rest/admin/event_reports.py
@@ -50,8 +50,10 @@ class EventReportsRestServlet(RestServlet):
The parameters `from` and `limit` are required only for pagination.
By default, a `limit` of 100 is used.
The parameter `dir` can be used to define the order of results.
- The parameter `user_id` can be used to filter by user id.
- The parameter `room_id` can be used to filter by room id.
+ The `user_id` query parameter filters by the user ID of the reporter of the event.
+ The `room_id` query parameter filters by room id.
+ The `event_sender_user_id` query parameter can be used to filter by the user id
+ of the sender of the reported event.
Returns:
A list of reported events and an integer representing the total number of
reported events that exist given this query
@@ -71,6 +73,7 @@ class EventReportsRestServlet(RestServlet):
direction = parse_enum(request, "dir", Direction, Direction.BACKWARDS)
user_id = parse_string(request, "user_id")
room_id = parse_string(request, "room_id")
+ event_sender_user_id = parse_string(request, "event_sender_user_id")
if start < 0:
raise SynapseError(
@@ -87,7 +90,7 @@ class EventReportsRestServlet(RestServlet):
)
event_reports, total = await self._store.get_event_reports_paginate(
- start, limit, direction, user_id, room_id
+ start, limit, direction, user_id, room_id, event_sender_user_id
)
ret = {"event_reports": event_reports, "total": total}
if (start + limit) < total:
diff --git a/synapse/rest/admin/experimental_features.py b/synapse/rest/admin/experimental_features.py
index d7913896d9..afb71f4a0f 100644
--- a/synapse/rest/admin/experimental_features.py
+++ b/synapse/rest/admin/experimental_features.py
@@ -43,12 +43,15 @@ class ExperimentalFeature(str, Enum):
MSC3881 = "msc3881"
MSC3575 = "msc3575"
+ MSC4222 = "msc4222"
def is_globally_enabled(self, config: "HomeServerConfig") -> bool:
if self is ExperimentalFeature.MSC3881:
return config.experimental.msc3881_enabled
if self is ExperimentalFeature.MSC3575:
return config.experimental.msc3575_enabled
+ if self is ExperimentalFeature.MSC4222:
+ return config.experimental.msc4222_enabled
assert_never(self)
diff --git a/synapse/rest/admin/registration_tokens.py b/synapse/rest/admin/registration_tokens.py
index 0867f7a51c..bec2331590 100644
--- a/synapse/rest/admin/registration_tokens.py
+++ b/synapse/rest/admin/registration_tokens.py
@@ -181,8 +181,7 @@ class NewRegistrationTokenRestServlet(RestServlet):
uses_allowed = body.get("uses_allowed", None)
if not (
- uses_allowed is None
- or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
+ uses_allowed is None or (type(uses_allowed) is int and uses_allowed >= 0) # noqa: E721
):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 01f9de9ffa..adac1f0362 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -23,6 +23,7 @@ from http import HTTPStatus
from typing import TYPE_CHECKING, List, Optional, Tuple, cast
import attr
+from immutabledict import immutabledict
from synapse.api.constants import Direction, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
@@ -149,6 +150,7 @@ class RoomRestV2Servlet(RestServlet):
def _convert_delete_task_to_response(task: ScheduledTask) -> JsonDict:
return {
"delete_id": task.id,
+ "room_id": task.resource_id,
"status": task.status,
"shutdown_room": task.result,
}
@@ -249,6 +251,10 @@ class ListRoomRestServlet(RestServlet):
direction = parse_enum(request, "dir", Direction, default=Direction.FORWARDS)
reverse_order = True if direction == Direction.BACKWARDS else False
+ emma_include_tombstone = parse_boolean(
+ request, "emma_include_tombstone", default=False
+ )
+
# Return list of rooms according to parameters
rooms, total_rooms = await self.store.get_rooms_paginate(
start,
@@ -258,6 +264,7 @@ class ListRoomRestServlet(RestServlet):
search_term,
public_rooms,
empty_rooms,
+ emma_include_tombstone = emma_include_tombstone
)
response = {
@@ -463,7 +470,18 @@ class RoomStateRestServlet(RestServlet):
if not room:
raise NotFoundError("Room not found")
- event_ids = await self._storage_controllers.state.get_current_state_ids(room_id)
+ state_filter = None
+ type = parse_string(request, "type")
+
+ if type:
+ state_filter = StateFilter(
+ types=immutabledict({type: None}),
+ include_others=False,
+ )
+
+ event_ids = await self._storage_controllers.state.get_current_state_ids(
+ room_id, state_filter
+ )
events = await self.store.get_events(event_ids.values())
now = self.clock.time_msec()
room_state = await self._event_serializer.serialize_events(events.values(), now)
diff --git a/synapse/rest/admin/scheduled_tasks.py b/synapse/rest/admin/scheduled_tasks.py
new file mode 100644
index 0000000000..2ae13021b9
--- /dev/null
+++ b/synapse/rest/admin/scheduled_tasks.py
@@ -0,0 +1,70 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2025 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+#
+#
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.http.servlet import RestServlet, parse_integer, parse_string
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin import admin_patterns, assert_requester_is_admin
+from synapse.types import JsonDict, TaskStatus
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+class ScheduledTasksRestServlet(RestServlet):
+ """Get a list of scheduled tasks and their statuses
+ optionally filtered by action name, resource id, status, and max timestamp
+ """
+
+ PATTERNS = admin_patterns("/scheduled_tasks$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastores().main
+
+ async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ # extract query params
+ action_name = parse_string(request, "action_name")
+ resource_id = parse_string(request, "resource_id")
+ status = parse_string(request, "job_status")
+ max_timestamp = parse_integer(request, "max_timestamp")
+
+ actions = [action_name] if action_name else None
+ statuses = [TaskStatus(status)] if status else None
+
+ tasks = await self._store.get_scheduled_tasks(
+ actions=actions,
+ resource_id=resource_id,
+ statuses=statuses,
+ max_timestamp=max_timestamp,
+ )
+
+ json_tasks = []
+ for task in tasks:
+ result_task = {
+ "id": task.id,
+ "action": task.action,
+ "status": task.status,
+ "timestamp_ms": task.timestamp,
+ "resource_id": task.resource_id,
+ "result": task.result,
+ "error": task.error,
+ }
+ json_tasks.append(result_task)
+
+ return 200, {"scheduled_tasks": json_tasks}
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index ad515bd5a3..7b8f1d1b2a 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -27,7 +27,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
import attr
-from synapse._pydantic_compat import HAS_PYDANTIC_V2
+from synapse._pydantic_compat import StrictBool, StrictInt, StrictStr
from synapse.api.constants import Direction, UserTypes
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
@@ -50,17 +50,12 @@ from synapse.rest.admin._base import (
from synapse.rest.client._base import client_patterns
from synapse.storage.databases.main.registration import ExternalIDReuseException
from synapse.storage.databases.main.stats import UserSortOrder
-from synapse.types import JsonDict, JsonMapping, UserID
+from synapse.types import JsonDict, JsonMapping, TaskStatus, UserID
from synapse.types.rest import RequestBodyModel
if TYPE_CHECKING:
from synapse.server import HomeServer
-if TYPE_CHECKING or HAS_PYDANTIC_V2:
- from pydantic.v1 import StrictBool
-else:
- from pydantic import StrictBool
-
logger = logging.getLogger(__name__)
@@ -988,7 +983,7 @@ class UserAdminServlet(RestServlet):
class UserMembershipRestServlet(RestServlet):
"""
- Get room list of an user.
+ Get list of joined room ID's for a user.
"""
PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/joined_rooms$")
@@ -1004,8 +999,9 @@ class UserMembershipRestServlet(RestServlet):
await assert_requester_is_admin(self.auth, request)
room_ids = await self.store.get_rooms_for_user(user_id)
- ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
- return HTTPStatus.OK, ret
+ rooms_response = {"joined_rooms": list(room_ids), "total": len(room_ids)}
+
+ return HTTPStatus.OK, rooms_response
class PushersRestServlet(RestServlet):
@@ -1410,3 +1406,146 @@ class UserByThreePid(RestServlet):
raise NotFoundError("User not found")
return HTTPStatus.OK, {"user_id": user_id}
+
+
+class RedactUser(RestServlet):
+ """
+ Redact all the events of a given user in the given rooms or if empty dict is provided
+ then all events in all rooms user is member of. Kicks off a background process and
+ returns an id that can be used to check on the progress of the redaction progress
+ """
+
+ PATTERNS = admin_patterns("/user/(?P<user_id>[^/]*)/redact")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self._store = hs.get_datastores().main
+ self.admin_handler = hs.get_admin_handler()
+
+ class PostBody(RequestBodyModel):
+ rooms: List[StrictStr]
+ reason: Optional[StrictStr]
+ limit: Optional[StrictInt]
+
+ async def on_POST(
+ self, request: SynapseRequest, user_id: str
+ ) -> Tuple[int, JsonDict]:
+ requester = await self._auth.get_user_by_req(request)
+ await assert_user_is_admin(self._auth, requester)
+
+ # parse provided user id to check that it is valid
+ UserID.from_string(user_id)
+
+ body = parse_and_validate_json_object_from_request(request, self.PostBody)
+
+ limit = body.limit
+ if limit and limit <= 0:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "If limit is provided it must be a non-negative integer greater than 0.",
+ )
+
+ rooms = body.rooms
+ if not rooms:
+ current_rooms = list(await self._store.get_rooms_for_user(user_id))
+ banned_rooms = list(
+ await self._store.get_rooms_user_currently_banned_from(user_id)
+ )
+ rooms = current_rooms + banned_rooms
+
+ redact_id = await self.admin_handler.start_redact_events(
+ user_id, rooms, requester.serialize(), body.reason, limit
+ )
+
+ return HTTPStatus.OK, {"redact_id": redact_id}
+
+
+class RedactUserStatus(RestServlet):
+ """
+ Check on the progress of the redaction request represented by the provided ID, returning
+ the status of the process and a dict of events that were unable to be redacted, if any
+ """
+
+ PATTERNS = admin_patterns("/user/redact_status/(?P<redact_id>[^/]*)$")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self.admin_handler = hs.get_admin_handler()
+
+ async def on_GET(
+ self, request: SynapseRequest, redact_id: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+
+ task = await self.admin_handler.get_redact_task(redact_id)
+
+ if task:
+ if task.status == TaskStatus.ACTIVE:
+ return HTTPStatus.OK, {"status": TaskStatus.ACTIVE}
+ elif task.status == TaskStatus.COMPLETE:
+ assert task.result is not None
+ failed_redactions = task.result.get("failed_redactions")
+ return HTTPStatus.OK, {
+ "status": TaskStatus.COMPLETE,
+ "failed_redactions": failed_redactions if failed_redactions else {},
+ }
+ elif task.status == TaskStatus.SCHEDULED:
+ return HTTPStatus.OK, {"status": TaskStatus.SCHEDULED}
+ else:
+ return HTTPStatus.OK, {
+ "status": TaskStatus.FAILED,
+ "error": (
+ task.error
+ if task.error
+ else "Unknown error, please check the logs for more information."
+ ),
+ }
+ else:
+ raise NotFoundError("redact id '%s' not found" % redact_id)
+
+
+class UserInvitesCount(RestServlet):
+ """
+ Return the count of invites that the user has sent after the given timestamp
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/sent_invite_count")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+
+ async def on_GET(
+ self, request: SynapseRequest, user_id: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+ from_ts = parse_integer(request, "from_ts", required=True)
+
+ sent_invite_count = await self.store.get_sent_invite_count_by_user(
+ user_id, from_ts
+ )
+
+ return HTTPStatus.OK, {"invite_count": sent_invite_count}
+
+
+class UserJoinedRoomCount(RestServlet):
+ """
+ Return the count of rooms that the user has joined at or after the given timestamp, even
+ if they have subsequently left/been banned from those rooms.
+ """
+
+ PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/cumulative_joined_room_count")
+
+ def __init__(self, hs: "HomeServer"):
+ self._auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+
+ async def on_GET(
+ self, request: SynapseRequest, user_id: str
+ ) -> Tuple[int, JsonDict]:
+ await assert_requester_is_admin(self._auth, request)
+ from_ts = parse_integer(request, "from_ts", required=True)
+
+ joined_rooms = await self.store.get_rooms_for_user_by_date(user_id, from_ts)
+
+ return HTTPStatus.OK, {"cumulative_joined_room_count": len(joined_rooms)}
|