diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index b44e862493..5467d129bd 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -21,13 +21,34 @@
import abc
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Sequence,
+ Set,
+ Tuple,
+)
import attr
-from synapse.api.constants import Direction, Membership
+from synapse.api.constants import Direction, EventTypes, Membership
+from synapse.api.errors import SynapseError
from synapse.events import EventBase
-from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
+from synapse.types import (
+ JsonMapping,
+ Requester,
+ RoomStreamToken,
+ ScheduledTask,
+ StateMap,
+ TaskStatus,
+ UserID,
+ UserInfo,
+ create_requester,
+)
from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
@@ -35,6 +56,8 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events"
+
class AdminHandler:
def __init__(self, hs: "HomeServer"):
@@ -43,6 +66,22 @@ class AdminHandler:
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
self._msc3866_enabled = hs.config.experimental.msc3866.enabled
+ self.event_creation_handler = hs.get_event_creation_handler()
+ self._task_scheduler = hs.get_task_scheduler()
+
+ self._task_scheduler.register_action(
+ self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
+ )
+
+ self.hs = hs
+
+ async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
+ """Get the current status of an active redaction process
+
+ Args:
+ redact_id: redact_id returned by start_redact_events.
+ """
+ return await self._task_scheduler.get_task(redact_id)
async def get_whois(self, user: UserID) -> JsonMapping:
connections = []
@@ -85,6 +124,7 @@ class AdminHandler:
"consent_ts": user_info.consent_ts,
"user_type": user_info.user_type,
"is_guest": user_info.is_guest,
+ "suspended": user_info.suspended,
}
if self._msc3866_enabled:
@@ -93,7 +133,6 @@ class AdminHandler:
# Add additional user metadata
profile = await self._store.get_profileinfo(user)
- threepids = await self._store.user_get_threepids(user.to_string())
external_ids = [
({"auth_provider": auth_provider, "external_id": external_id})
for auth_provider, external_id in await self._store.get_external_ids_by_user(
@@ -102,7 +141,6 @@ class AdminHandler:
]
user_info_dict["displayname"] = profile.display_name
user_info_dict["avatar_url"] = profile.avatar_url
- user_info_dict["threepids"] = [attr.asdict(t) for t in threepids]
user_info_dict["external_ids"] = external_ids
user_info_dict["erased"] = await self._store.is_user_erased(user.to_string())
@@ -197,14 +235,16 @@ class AdminHandler:
# events that we have and then filtering, this isn't the most
# efficient method perhaps but it does guarantee we get everything.
while True:
- events, _ = (
- await self._store.paginate_room_events_by_topological_ordering(
- room_id=room_id,
- from_key=from_key,
- to_key=to_key,
- limit=100,
- direction=Direction.FORWARDS,
- )
+ (
+ events,
+ _,
+ _,
+ ) = await self._store.paginate_room_events_by_topological_ordering(
+ room_id=room_id,
+ from_key=from_key,
+ to_key=to_key,
+ limit=100,
+ direction=Direction.FORWARDS,
)
if not events:
break
@@ -311,6 +351,155 @@ class AdminHandler:
return writer.finished()
+ async def start_redact_events(
+ self,
+ user_id: str,
+ rooms: list,
+ requester: JsonMapping,
+ reason: Optional[str],
+ limit: Optional[int],
+ ) -> str:
+ """
+ Start a task redacting the events of the given user in the given rooms
+
+ Args:
+ user_id: the user ID of the user whose events should be redacted
+ rooms: the rooms in which to redact the user's events
+ requester: the user requesting the events
+ reason: reason for requesting the redaction, ie spam, etc
+ limit: limit on the number of events in each room to redact
+
+ Returns:
+ a unique ID which can be used to query the status of the task
+ """
+ active_tasks = await self._task_scheduler.get_tasks(
+ actions=[REDACT_ALL_EVENTS_ACTION_NAME],
+ resource_id=user_id,
+ statuses=[TaskStatus.ACTIVE],
+ )
+
+ if len(active_tasks) > 0:
+ raise SynapseError(
+ 400, "Redact already in progress for user %s" % (user_id,)
+ )
+
+ if not limit:
+ limit = 1000
+
+ redact_id = await self._task_scheduler.schedule_task(
+ REDACT_ALL_EVENTS_ACTION_NAME,
+ resource_id=user_id,
+ params={
+ "rooms": rooms,
+ "requester": requester,
+ "user_id": user_id,
+ "reason": reason,
+ "limit": limit,
+ },
+ )
+
+ logger.info(
+ "starting redact events with redact_id %s",
+ redact_id,
+ )
+
+ return redact_id
+
+ async def _redact_all_events(
+ self, task: ScheduledTask
+ ) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
+ """
+ Task to redact all of a users events in the given rooms, tracking which, if any, events
+ whose redaction failed
+ """
+
+ assert task.params is not None
+ rooms = task.params.get("rooms")
+ assert rooms is not None
+
+ r = task.params.get("requester")
+ assert r is not None
+ admin = Requester.deserialize(self._store, r)
+
+ user_id = task.params.get("user_id")
+ assert user_id is not None
+
+ # puppet the user if they're ours, otherwise use admin to redact
+ requester = create_requester(
+ user_id if self.hs.is_mine_id(user_id) else admin.user.to_string(),
+ authenticated_entity=admin.user.to_string(),
+ )
+
+ reason = task.params.get("reason")
+ limit = task.params.get("limit")
+ assert limit is not None
+
+ result: Mapping[str, Any] = (
+ task.result if task.result else {"failed_redactions": {}}
+ )
+ for room in rooms:
+ room_version = await self._store.get_room_version(room)
+ event_ids = await self._store.get_events_sent_by_user_in_room(
+ user_id,
+ room,
+ limit,
+ ["m.room.member", "m.room.message", "m.room.encrypted"],
+ )
+ if not event_ids:
+ # nothing to redact in this room
+ continue
+
+ events = await self._store.get_events_as_list(event_ids)
+ for event in events:
+ # we care about join events but not other membership events
+ if event.type == "m.room.member":
+ content = event.content
+ if content:
+ if content.get("membership") == Membership.JOIN:
+ pass
+ else:
+ continue
+ relations = await self._store.get_relations_for_event(
+ room, event.event_id, event, event_type=EventTypes.Redaction
+ )
+
+ # if we've already successfully redacted this event then skip processing it
+ if relations[0]:
+ continue
+
+ event_dict = {
+ "type": EventTypes.Redaction,
+ "content": {"reason": reason} if reason else {},
+ "room_id": room,
+ "sender": requester.user.to_string(),
+ }
+ if room_version.updated_redaction_rules:
+ event_dict["content"]["redacts"] = event.event_id
+ else:
+ event_dict["redacts"] = event.event_id
+
+ try:
+ # set the prev event to the offending message to allow for redactions
+ # to be processed in the case where the user has been kicked/banned before
+ # redactions are requested
+ (
+ redaction,
+ _,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ event_dict,
+ prev_event_ids=[event.event_id],
+ ratelimit=False,
+ )
+ except Exception as ex:
+ logger.info(
+ f"Redaction of event {event.event_id} failed due to: {ex}"
+ )
+ result["failed_redactions"][event.event_id] = str(ex)
+ await self._task_scheduler.update_task(task.id, result=result)
+
+ return TaskStatus.COMPLETE, result, None
+
class ExfiltrationWriter(metaclass=abc.ABCMeta):
"""Interface used to specify how to write exported data."""
|