summary refs log tree commit diff
path: root/synapse/handlers/admin.py
blob: d1194545aeb65272777ac95e4cc75500f89c1e23 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import abc
import logging
from typing import (
    TYPE_CHECKING,
    Any,
    Dict,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
)

import attr

from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.errors import SynapseError
from synapse.events import EventBase
from synapse.types import (
    JsonMapping,
    Requester,
    RoomStreamToken,
    ScheduledTask,
    StateMap,
    TaskStatus,
    UserID,
    UserInfo,
    create_requester,
)
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
    from synapse.server import HomeServer

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Name of the task-scheduler action used for bulk redaction: `AdminHandler`
# registers `_redact_all_events` under this name and `start_redact_events`
# schedules / looks up tasks with it.
REDACT_ALL_EVENTS_ACTION_NAME = "redact_all_events"


class AdminHandler:
    def __init__(self, hs: "HomeServer"):
        """Collect the handler's dependencies from the homeserver.

        Besides storing references to the datastore, device handler, storage
        controllers, event creation handler and task scheduler, this registers
        `_redact_all_events` with the task scheduler under
        `REDACT_ALL_EVENTS_ACTION_NAME`.

        Args:
            hs: the homeserver object the dependencies are fetched from.
        """
        # Storage and device access.
        self._store = hs.get_datastores().main
        self._device_handler = hs.get_device_handler()

        # Keep both the full set of storage controllers and a shortcut to the
        # state controller.
        storage_controllers = hs.get_storage_controllers()
        self._storage_controllers = storage_controllers
        self._state_storage_controller = storage_controllers.state

        # Whether the experimental MSC3866 config flag is switched on.
        self._msc3866_enabled = hs.config.experimental.msc3866.enabled

        self.event_creation_handler = hs.get_event_creation_handler()

        # The bulk-redaction task is run through the task scheduler, so make
        # the action known to it.
        self._task_scheduler = task_scheduler = hs.get_task_scheduler()
        task_scheduler.register_action(
            self._redact_all_events, REDACT_ALL_EVENTS_ACTION_NAME
        )

        self.hs = hs

    async def get_redact_task(self, redact_id: str) -> Optional[ScheduledTask]:
        """Get the current status of an active redaction process

        Args:
            redact_id: redact_id returned by start_redact_events.
        """
        return await self._task_scheduler.get_task(redact_id)

    async def get_whois(self, user: UserID) -> JsonMapping:
        connections = []

        sessions = await self._store.get_user_ip_and_agents(user)
        for session in sessions:
            connections.append(
                {
                    "ip": session["ip"],
                    "last_seen": session["last_seen"],
                    "user_agent": session["user_agent"],
                }
            )

        ret = {
            "user_id": user.to_string(),
            "devices": {"": {"sessions": [{"connections": connections}]}},
        }

        return ret

    async def get_user(self, user: UserID) -> Optional[JsonMapping]:
        """Function to get user details"""
        user_info: Optional[UserInfo] = await self._store.get_user_by_id(
            user.to_string()
        )
        if user_info is None:
            return None

        user_info_dict = {
            "name": user.to_string(),
            "admin": user_info.is_admin,
            "deactivated": user_info.is_deactivated,
            "locked": user_info.locked,
            "shadow_banned": user_info.is_shadow_banned,
            "creation_ts": user_info.creation_ts,
            "appservice_id": user_info.appservice_id,
            "consent_server_notice_sent": user_info.consent_server_notice_sent,
            "consent_version": user_info.consent_version,
            "consent_ts": user_info.consent_ts,
            "user_type": user_info.user_type,
            "is_guest": user_info.is_guest,
            "suspended": user_info.suspended,
        }

        if self._msc3866_enabled:
            # Only include the approved flag if support for MSC3866 is enabled.
            user_info_dict["approved"] = user_info.approved

        # Add additional user metadata
        profile = await self._store.get_profileinfo(user)
        threepids = await self._store.user_get_threepids(user.to_string())
        external_ids = [
            ({"auth_provider": auth_provider, "external_id": external_id})
            for auth_provider, external_id in await self._store.get_external_ids_by_user(
                user.to_string()
            )
        ]
        user_info_dict["displayname"] = profile.display_name
        user_info_dict["avatar_url"] = profile.avatar_url
        user_info_dict["threepids"] = [attr.asdict(t) for t in threepids]
        user_info_dict["external_ids"] = external_ids
        user_info_dict["erased"] = await self._store.is_user_erased(user.to_string())

        last_seen_ts = await self._store.get_last_seen_for_user_id(user.to_string())
        user_info_dict["last_seen_ts"] = last_seen_ts

        return user_info_dict

    async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
        """Write all data we have on the user to the given writer.

        The data is written in this order: per-room data (the events the user
        could see plus the state at any gaps, or the invite/knock for rooms
        the user was never in), the user's profile, devices, connections,
        account data and media metadata.

        Args:
            user_id: The user ID to fetch data of.
            writer: The writer to write to.

        Returns:
            Resolves when all data for a user has been written.
            The returned value is that returned by `writer.finished()`.
        """
        # Get all rooms the user is in or has been in
        rooms = await self._store.get_rooms_for_local_user_where_membership_is(
            user_id,
            membership_list=Membership.LIST,
        )

        # We only try and fetch events for rooms the user has been in. If
        # they've been e.g. invited to a room without joining then we handle
        # those separately.
        rooms_user_has_been_in = await self._store.get_rooms_user_has_been_in(user_id)

        for index, room in enumerate(rooms):
            room_id = room.room_id

            logger.info(
                "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
            )

            forgotten = await self._store.did_forget(user_id, room_id)
            if forgotten:
                # Room IDs are strings, so they must be formatted with %s
                # (using %d here makes the logging call fail to format).
                logger.info("[%s] User forgot room %s, ignoring", user_id, room_id)
                continue

            if room_id not in rooms_user_has_been_in:
                # If we haven't been in the rooms then the filtering code below
                # won't return anything, so we need to handle these cases
                # explicitly.

                if room.membership == Membership.INVITE:
                    event_id = room.event_id
                    invite = await self._store.get_event(event_id, allow_none=True)
                    if invite:
                        invited_state = invite.unsigned["invite_room_state"]
                        writer.write_invite(room_id, invite, invited_state)

                if room.membership == Membership.KNOCK:
                    event_id = room.event_id
                    knock = await self._store.get_event(event_id, allow_none=True)
                    if knock:
                        knock_state = knock.unsigned["knock_room_state"]
                        writer.write_knock(room_id, knock, knock_state)

                continue

            # We only want to bother fetching events up to the last time they
            # were joined. We estimate that point by looking at the
            # stream_ordering of the last membership if it wasn't a join.
            if room.membership == Membership.JOIN:
                stream_ordering = self._store.get_room_max_stream_ordering()
            else:
                stream_ordering = room.event_pos.stream

            from_key = RoomStreamToken(topological=0, stream=0)
            to_key = RoomStreamToken(stream=stream_ordering)

            # Events that we've processed in this room
            written_events: Set[str] = set()

            # We need to track gaps in the events stream so that we can then
            # write out the state at those events. We do this by keeping track
            # of events whose prev events we haven't seen.

            # Map from event ID to prev events that haven't been processed.
            event_to_unseen_prevs: Dict[str, Set[str]] = {}

            # The reverse mapping to above, i.e. map from unseen event to events
            # that have the unseen event in their prev_events, i.e. the unseen
            # events "children".
            unseen_to_child_events: Dict[str, Set[str]] = {}

            # We fetch events in the room the user could see by fetching *all*
            # events that we have and then filtering, this isn't the most
            # efficient method perhaps but it does guarantee we get everything.
            while True:
                (
                    events,
                    _,
                    _,
                ) = await self._store.paginate_room_events_by_topological_ordering(
                    room_id=room_id,
                    from_key=from_key,
                    to_key=to_key,
                    limit=100,
                    direction=Direction.FORWARDS,
                )
                if not events:
                    break

                # Advance the pagination token past the last event of this
                # batch (before filtering, so that filtered-out events don't
                # stall the pagination).
                last_event = events[-1]
                assert last_event.internal_metadata.stream_ordering
                from_key = RoomStreamToken(
                    stream=last_event.internal_metadata.stream_ordering,
                    topological=last_event.depth,
                )

                events = await filter_events_for_client(
                    self._storage_controllers,
                    user_id,
                    events,
                )

                writer.write_events(room_id, events)

                # Update the extremity tracking dicts
                for event in events:
                    # Check if we have any prev events that haven't been
                    # processed yet, and add those to the appropriate dicts.
                    unseen_events = set(event.prev_event_ids()) - written_events
                    if unseen_events:
                        event_to_unseen_prevs[event.event_id] = unseen_events
                        for unseen in unseen_events:
                            unseen_to_child_events.setdefault(unseen, set()).add(
                                event.event_id
                            )

                    # Now check if this event is an unseen prev event, if so
                    # then we remove this event from the appropriate dicts.
                    for child_id in unseen_to_child_events.pop(event.event_id, []):
                        event_to_unseen_prevs[child_id].discard(event.event_id)

                    written_events.add(event.event_id)

                logger.info(
                    "Written %d events in room %s", len(written_events), room_id
                )

            # Extremities are the events who have at least one unseen prev
            # event; write out the state at each of them.
            for event_id, unseen_prevs in event_to_unseen_prevs.items():
                if not unseen_prevs:
                    continue
                state = await self._state_storage_controller.get_state_for_event(
                    event_id
                )
                writer.write_state(room_id, event_id, state)

        # Get the user profile
        profile = await self.get_user(UserID.from_string(user_id))
        if profile is not None:
            writer.write_profile(profile)
            logger.info("[%s] Written profile", user_id)

        # Get all devices the user has
        devices = await self._device_handler.get_devices_by_user(user_id)
        writer.write_devices(devices)
        logger.info("[%s] Written %s devices", user_id, len(devices))

        # Get all connections the user has. `get_whois` nests the connection
        # list inside a single device/session entry, so unwrap it before
        # writing and counting (the length of the whois dict itself is not the
        # number of connections).
        whois = await self.get_whois(UserID.from_string(user_id))
        connections = whois["devices"][""]["sessions"][0]["connections"]
        writer.write_connections(connections)
        logger.info("[%s] Written %s connections", user_id, len(connections))

        # Get all account data the user has global and in rooms
        global_data = await self._store.get_global_account_data_for_user(user_id)
        by_room_data = await self._store.get_room_account_data_for_user(user_id)
        writer.write_account_data("global", global_data)
        for room_id in by_room_data:
            writer.write_account_data(room_id, by_room_data[room_id])
        logger.info(
            "[%s] Written account data for %s rooms", user_id, len(by_room_data)
        )

        # Get all media ids the user has, one page of `limit` entries at a time
        limit = 100
        start = 0
        while True:
            media_ids, total = await self._store.get_local_media_by_user_paginate(
                start, limit, user_id
            )
            for media in media_ids:
                writer.write_media_id(media.media_id, attr.asdict(media))

            logger.info(
                "[%s] Written %d media_ids of %s",
                user_id,
                (start + len(media_ids)),
                total,
            )
            # Stop once the page we just fetched reaches the reported total.
            if (start + limit) >= total:
                break
            start += limit

        return writer.finished()

    async def start_redact_events(
        self,
        user_id: str,
        rooms: list,
        requester: JsonMapping,
        reason: Optional[str],
        limit: Optional[int],
    ) -> str:
        """
        Schedule a task that redacts the given user's events in the given rooms.

        Args:
            user_id: the user ID of the user whose events should be redacted
            rooms: the rooms in which to redact the user's events
            requester: the user requesting the events
            reason: reason for requesting the redaction, ie spam, etc
            limit: limit on the number of events in each room to redact; a
                falsy value (None or 0) is replaced by 1000

        Returns:
            a unique ID which can be used to query the status of the task

        Raises:
            SynapseError: (400) if a redaction task for this user is already
                active.
        """
        # Refuse to run two redactions for the same user at the same time.
        already_running = await self._task_scheduler.get_tasks(
            actions=[REDACT_ALL_EVENTS_ACTION_NAME],
            resource_id=user_id,
            statuses=[TaskStatus.ACTIVE],
        )
        if already_running:
            raise SynapseError(
                400, "Redact already in progress for user %s" % (user_id,)
            )

        task_params = {
            "rooms": rooms,
            "requester": requester,
            "user_id": user_id,
            "reason": reason,
            "limit": limit or 1000,
        }
        redact_id = await self._task_scheduler.schedule_task(
            REDACT_ALL_EVENTS_ACTION_NAME,
            resource_id=user_id,
            params=task_params,
        )

        logger.info(
            "starting redact events with redact_id %s",
            redact_id,
        )

        return redact_id

    async def _redact_all_events(
        self, task: ScheduledTask
    ) -> Tuple[TaskStatus, Optional[Mapping[str, Any]], Optional[str]]:
        """
        Task body that redacts a user's events in a set of rooms.

        The task parameters "rooms", "requester", "user_id" and "limit" must
        be present; "reason" is optional. For each room, up to `limit` of the
        user's `m.room.member` / `m.room.message` events are fetched; member
        events with content whose membership is not "join" are skipped, as
        are events that already have a redaction relation. A failed redaction
        is recorded under `result["failed_redactions"]` (event ID -> error
        string) and the task is updated with that result.

        Returns:
            A tuple of `TaskStatus.COMPLETE`, the result mapping and None.
        """
        params = task.params
        assert params is not None

        rooms = params.get("rooms")
        assert rooms is not None

        serialized_requester = params.get("requester")
        assert serialized_requester is not None
        admin = Requester.deserialize(self._store, serialized_requester)

        user_id = params.get("user_id")
        assert user_id is not None

        # puppet the user if they're ours, otherwise use admin to redact
        admin_id = admin.user.to_string()
        acting_user = user_id if self.hs.is_mine_id(user_id) else admin_id
        requester = create_requester(acting_user, authenticated_entity=admin_id)

        reason = params.get("reason")
        limit = params.get("limit")
        assert limit is not None

        # Carry on from any result already stored on the task, otherwise
        # start with an empty failure map.
        result: Mapping[str, Any] = task.result or {"failed_redactions": {}}

        for room in rooms:
            room_version = await self._store.get_room_version(room)
            event_ids = await self._store.get_events_sent_by_user_in_room(
                user_id,
                room,
                limit,
                ["m.room.member", "m.room.message"],
            )
            if not event_ids:
                # nothing to redact in this room
                continue

            for event in await self._store.get_events_as_list(event_ids):
                # we care about join events but not other membership events
                if event.type == "m.room.member":
                    member_content = event.content
                    if (
                        member_content
                        and member_content.get("membership") != Membership.JOIN
                    ):
                        continue

                relations = await self._store.get_relations_for_event(
                    room, event.event_id, event, event_type=EventTypes.Redaction
                )
                already_redacted = relations[0]
                if already_redacted:
                    # this event was successfully redacted before, nothing to do
                    continue

                # Where the `redacts` field lives depends on the room
                # version's redaction rules: inside the content for the
                # updated rules, at the top level of the event otherwise.
                redaction_content = {"reason": reason} if reason else {}
                event_dict = {
                    "type": EventTypes.Redaction,
                    "content": redaction_content,
                    "room_id": room,
                    "sender": user_id,
                }
                if room_version.updated_redaction_rules:
                    redaction_content["redacts"] = event.event_id
                else:
                    event_dict["redacts"] = event.event_id

                try:
                    # set the prev event to the offending message to allow for redactions
                    # to be processed in the case where the user has been kicked/banned before
                    # redactions are requested
                    (
                        _redaction,
                        _,
                    ) = await self.event_creation_handler.create_and_send_nonmember_event(
                        requester,
                        event_dict,
                        prev_event_ids=[event.event_id],
                        ratelimit=False,
                    )
                except Exception as ex:
                    logger.info(
                        f"Redaction of event {event.event_id} failed due to: {ex}"
                    )
                    # Remember the failure and persist it on the task straight
                    # away.
                    result["failed_redactions"][event.event_id] = str(ex)
                    await self._task_scheduler.update_task(task.id, result=result)

        return TaskStatus.COMPLETE, result, None


class ExfiltrationWriter(metaclass=abc.ABCMeta):
    """Interface used to specify how to write exported data.

    Every method is abstract: concrete writers must implement all of them.
    The base implementations raise `NotImplementedError` if called directly.
    """

    @abc.abstractmethod
    def write_events(self, room_id: str, events: List[EventBase]) -> None:
        """Write a batch of events for a room.

        Args:
            room_id: The room the events belong to.
            events: The batch of events to write.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_state(
        self, room_id: str, event_id: str, state: StateMap[EventBase]
    ) -> None:
        """Write the state at the given event in the room.

        This only gets called for backward extremities rather than for each
        event.

        Args:
            room_id: The room the event belongs to.
            event_id: The event the state is taken at.
            state: The state at that event.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_invite(
        self, room_id: str, event: EventBase, state: StateMap[EventBase]
    ) -> None:
        """Write an invite for the room, with associated invite state.

        Args:
            room_id: The room ID the invite is for.
            event: The invite event.
            state: A subset of the state at the invite, with a subset of the
                event keys (type, state_key, content and sender).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_knock(
        self, room_id: str, event: EventBase, state: StateMap[EventBase]
    ) -> None:
        """Write a knock for the room, with associated knock state.

        Args:
            room_id: The room ID the knock is for.
            event: The knock event.
            state: A subset of the state at the knock, with a subset of the
                event keys (type, state_key, content and sender).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_profile(self, profile: JsonMapping) -> None:
        """Write the profile of a user.

        Args:
            profile: The user profile.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_devices(self, devices: Sequence[JsonMapping]) -> None:
        """Write the devices of a user.

        Args:
            devices: The list of devices.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_connections(self, connections: Sequence[JsonMapping]) -> None:
        """Write the connections of a user.

        Args:
            connections: The list of connections / sessions.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_account_data(
        self, file_name: str, account_data: Mapping[str, JsonMapping]
    ) -> None:
        """Write the account data of a user.

        Args:
            file_name: file name to write data
            account_data: mapping of global or room account_data
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_media_id(self, media_id: str, media_metadata: JsonMapping) -> None:
        """Write the media's metadata of a user.

        Exports only the metadata, as this can be fetched from the database via
        read only. In order to access the files, a connection to the correct
        media repository would be required.

        Args:
            media_id: ID of the media.
            media_metadata: Metadata of one media file.
        """
        # Raise like every other abstract method here, rather than silently
        # returning None when the base implementation is invoked.
        raise NotImplementedError()

    @abc.abstractmethod
    def finished(self) -> Any:
        """Called when all data has successfully been exported and written.

        This function's return value is passed to the caller of
        `export_user_data`.
        """
        raise NotImplementedError()