summary refs log tree commit diff
path: root/synapse/handlers/admin.py
blob: b44e862493d98eea98a3d0a61f4ddbc4a7212dec (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2014-2016 OpenMarket Ltd
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import abc
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Set

import attr

from synapse.api.constants import Direction, Membership
from synapse.events import EventBase
from synapse.types import JsonMapping, RoomStreamToken, StateMap, UserID, UserInfo
from synapse.visibility import filter_events_for_client

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class AdminHandler:
    """Handler for admin queries about a user: whois, user details and data export."""

    def __init__(self, hs: "HomeServer"):
        self._store = hs.get_datastores().main
        self._device_handler = hs.get_device_handler()
        self._storage_controllers = hs.get_storage_controllers()
        self._state_storage_controller = self._storage_controllers.state
        self._msc3866_enabled = hs.config.experimental.msc3866.enabled

    async def get_whois(self, user: UserID) -> JsonMapping:
        """Fetch the connections (IP, last seen time, user agent) stored for a user.

        Args:
            user: The user to look up.

        Returns:
            A mapping with the user's ID under "user_id" and, under "devices",
            a single entry keyed by the empty string that holds one session
            whose "connections" list has one entry per stored session.
        """
        sessions = await self._store.get_user_ip_and_agents(user)
        connections = [
            {
                "ip": session["ip"],
                "last_seen": session["last_seen"],
                "user_agent": session["user_agent"],
            }
            for session in sessions
        ]

        ret = {
            "user_id": user.to_string(),
            "devices": {"": {"sessions": [{"connections": connections}]}},
        }

        return ret

    async def get_user(self, user: UserID) -> Optional[JsonMapping]:
        """Function to get user details

        Args:
            user: The user to look up.

        Returns:
            None if the store has no record of the user, otherwise a mapping of
            the user's account details, profile, threepids, external IDs,
            erased flag and last seen timestamp.
        """
        user_id = user.to_string()

        user_info: Optional[UserInfo] = await self._store.get_user_by_id(user_id)
        if user_info is None:
            return None

        user_info_dict = {
            "name": user_id,
            "admin": user_info.is_admin,
            "deactivated": user_info.is_deactivated,
            "locked": user_info.locked,
            "shadow_banned": user_info.is_shadow_banned,
            "creation_ts": user_info.creation_ts,
            "appservice_id": user_info.appservice_id,
            "consent_server_notice_sent": user_info.consent_server_notice_sent,
            "consent_version": user_info.consent_version,
            "consent_ts": user_info.consent_ts,
            "user_type": user_info.user_type,
            "is_guest": user_info.is_guest,
        }

        if self._msc3866_enabled:
            # Only include the approved flag if support for MSC3866 is enabled.
            user_info_dict["approved"] = user_info.approved

        # Add additional user metadata
        profile = await self._store.get_profileinfo(user)
        threepids = await self._store.user_get_threepids(user_id)
        external_ids = [
            {"auth_provider": auth_provider, "external_id": external_id}
            for auth_provider, external_id in await self._store.get_external_ids_by_user(
                user_id
            )
        ]
        user_info_dict["displayname"] = profile.display_name
        user_info_dict["avatar_url"] = profile.avatar_url
        user_info_dict["threepids"] = [attr.asdict(t) for t in threepids]
        user_info_dict["external_ids"] = external_ids
        user_info_dict["erased"] = await self._store.is_user_erased(user_id)

        last_seen_ts = await self._store.get_last_seen_for_user_id(user_id)
        user_info_dict["last_seen_ts"] = last_seen_ts

        return user_info_dict

    async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
        """Write all data we have on the user to the given writer.

        For each room that the user has not forgotten this writes either the
        events visible to the user (plus the state at any gaps in those
        events), or, for rooms the user has never been in, the pending invite
        or knock. It then writes the user's profile, devices, connections,
        account data and media metadata.

        Args:
            user_id: The user ID to fetch data of.
            writer: The writer to write to.

        Returns:
            Resolves when all data for a user has been written.
            The returned value is that returned by `writer.finished()`.
        """
        # Get all rooms the user is in or has been in
        rooms = await self._store.get_rooms_for_local_user_where_membership_is(
            user_id,
            membership_list=Membership.LIST,
        )

        # We only try and fetch events for rooms the user has been in. If
        # they've been e.g. invited to a room without joining then we handle
        # those separately.
        rooms_user_has_been_in = await self._store.get_rooms_user_has_been_in(user_id)

        for index, room in enumerate(rooms):
            room_id = room.room_id

            logger.info(
                "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
            )

            forgotten = await self._store.did_forget(user_id, room_id)
            if forgotten:
                # Room IDs are strings, so they must be formatted with %s.
                logger.info("[%s] User forgot room %s, ignoring", user_id, room_id)
                continue

            if room_id not in rooms_user_has_been_in:
                # If we haven't been in the rooms then the filtering code below
                # won't return anything, so we need to handle these cases
                # explicitly.

                if room.membership == Membership.INVITE:
                    event_id = room.event_id
                    invite = await self._store.get_event(event_id, allow_none=True)
                    if invite:
                        invited_state = invite.unsigned["invite_room_state"]
                        writer.write_invite(room_id, invite, invited_state)

                if room.membership == Membership.KNOCK:
                    event_id = room.event_id
                    knock = await self._store.get_event(event_id, allow_none=True)
                    if knock:
                        knock_state = knock.unsigned["knock_room_state"]
                        writer.write_knock(room_id, knock, knock_state)

                continue

            # We only want to bother fetching events up to the last time they
            # were joined. We estimate that point by looking at the
            # stream_ordering of the last membership if it wasn't a join.
            if room.membership == Membership.JOIN:
                stream_ordering = self._store.get_room_max_stream_ordering()
            else:
                stream_ordering = room.event_pos.stream

            from_key = RoomStreamToken(topological=0, stream=0)
            to_key = RoomStreamToken(stream=stream_ordering)

            # Events that we've processed in this room
            written_events: Set[str] = set()

            # We need to track gaps in the events stream so that we can then
            # write out the state at those events. We do this by keeping track
            # of events whose prev events we haven't seen.

            # Map from event ID to prev events that haven't been processed.
            event_to_unseen_prevs: Dict[str, Set[str]] = {}

            # The reverse mapping to above, i.e. map from unseen event to events
            # that have the unseen event in their prev_events, i.e. the unseen
            # events "children".
            unseen_to_child_events: Dict[str, Set[str]] = {}

            # We fetch events in the room the user could see by fetching *all*
            # events that we have and then filtering, this isn't the most
            # efficient method perhaps but it does guarantee we get everything.
            while True:
                events, _ = (
                    await self._store.paginate_room_events_by_topological_ordering(
                        room_id=room_id,
                        from_key=from_key,
                        to_key=to_key,
                        limit=100,
                        direction=Direction.FORWARDS,
                    )
                )
                if not events:
                    break

                # Advance the pagination token past the last *unfiltered* event
                # so the next page starts after everything fetched so far.
                last_event = events[-1]
                assert last_event.internal_metadata.stream_ordering
                from_key = RoomStreamToken(
                    stream=last_event.internal_metadata.stream_ordering,
                    topological=last_event.depth,
                )

                events = await filter_events_for_client(
                    self._storage_controllers,
                    user_id,
                    events,
                )

                writer.write_events(room_id, events)

                # Update the extremity tracking dicts
                for event in events:
                    # Check if we have any prev events that haven't been
                    # processed yet, and add those to the appropriate dicts.
                    unseen_events = set(event.prev_event_ids()) - written_events
                    if unseen_events:
                        event_to_unseen_prevs[event.event_id] = unseen_events
                        for unseen in unseen_events:
                            unseen_to_child_events.setdefault(unseen, set()).add(
                                event.event_id
                            )

                    # Now check if this event is an unseen prev event, if so
                    # then we remove this event from the appropriate dicts.
                    for child_id in unseen_to_child_events.pop(event.event_id, []):
                        event_to_unseen_prevs[child_id].discard(event.event_id)

                    written_events.add(event.event_id)

                logger.info(
                    "Written %d events in room %s", len(written_events), room_id
                )

            # Extremities are the events who have at least one unseen prev
            # event; write out the state at each of them.
            for event_id, unseen_prevs in event_to_unseen_prevs.items():
                if not unseen_prevs:
                    continue
                state = await self._state_storage_controller.get_state_for_event(
                    event_id
                )
                writer.write_state(room_id, event_id, state)

        user = UserID.from_string(user_id)

        # Get the user profile
        profile = await self.get_user(user)
        if profile is not None:
            writer.write_profile(profile)
            logger.info("[%s] Written profile", user_id)

        # Get all devices the user has
        devices = await self._device_handler.get_devices_by_user(user_id)
        writer.write_devices(devices)
        logger.info("[%s] Written %s devices", user_id, len(devices))

        # Get all connections the user has. `get_whois` nests the list of
        # connections inside its response, so pull it out before writing and
        # counting it.
        whois = await self.get_whois(user)
        connections = whois["devices"][""]["sessions"][0]["connections"]
        writer.write_connections(connections)
        logger.info("[%s] Written %s connections", user_id, len(connections))

        # Get all account data the user has global and in rooms
        global_data = await self._store.get_global_account_data_for_user(user_id)
        by_room_data = await self._store.get_room_account_data_for_user(user_id)
        writer.write_account_data("global", global_data)
        for account_data_room_id, room_data in by_room_data.items():
            writer.write_account_data(account_data_room_id, room_data)
        logger.info(
            "[%s] Written account data for %s rooms", user_id, len(by_room_data)
        )

        # Get all media ids the user has, one page of `limit` entries at a time
        limit = 100
        start = 0
        while True:
            media_ids, total = await self._store.get_local_media_by_user_paginate(
                start, limit, user_id
            )
            for media in media_ids:
                writer.write_media_id(media.media_id, attr.asdict(media))

            logger.info(
                "[%s] Written %d media_ids of %s",
                user_id,
                (start + len(media_ids)),
                total,
            )
            if (start + limit) >= total:
                break
            start += limit

        return writer.finished()


class ExfiltrationWriter(metaclass=abc.ABCMeta):
    """Interface used to specify how to write exported data.

    `AdminHandler.export_user_data` calls the `write_*` methods as it gathers
    each piece of data and finally returns the result of `finished()`.
    """

    @abc.abstractmethod
    def write_events(self, room_id: str, events: List[EventBase]) -> None:
        """Write a batch of events for a room.

        Args:
            room_id: The room the events belong to.
            events: The batch of events to write.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_state(
        self, room_id: str, event_id: str, state: StateMap[EventBase]
    ) -> None:
        """Write the state at the given event in the room.

        This only gets called for backward extremities rather than for each
        event.

        Args:
            room_id: The room the event belongs to.
            event_id: The event the state is taken at.
            state: The state at that event.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_invite(
        self, room_id: str, event: EventBase, state: StateMap[EventBase]
    ) -> None:
        """Write an invite for the room, with associated invite state.

        Args:
            room_id: The room ID the invite is for.
            event: The invite event.
            state: A subset of the state at the invite, with a subset of the
                event keys (type, state_key, content and sender).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_knock(
        self, room_id: str, event: EventBase, state: StateMap[EventBase]
    ) -> None:
        """Write a knock for the room, with associated knock state.

        Args:
            room_id: The room ID the knock is for.
            event: The knock event.
            state: A subset of the state at the knock, with a subset of the
                event keys (type, state_key, content and sender).
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_profile(self, profile: JsonMapping) -> None:
        """Write the profile of a user.

        Args:
            profile: The user profile.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_devices(self, devices: Sequence[JsonMapping]) -> None:
        """Write the devices of a user.

        Args:
            devices: The list of devices.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_connections(self, connections: Sequence[JsonMapping]) -> None:
        """Write the connections of a user.

        Args:
            connections: The list of connections / sessions.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_account_data(
        self, file_name: str, account_data: Mapping[str, JsonMapping]
    ) -> None:
        """Write the account data of a user.

        Args:
            file_name: The name of the file to write the data to.
            account_data: Mapping of global or room account data.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def write_media_id(self, media_id: str, media_metadata: JsonMapping) -> None:
        """Write the media's metadata of a user.

        Exports only the metadata, as this can be fetched from the database via
        read only. In order to access the files, a connection to the correct
        media repository would be required.

        Args:
            media_id: ID of the media.
            media_metadata: Metadata of one media file.
        """
        # Raise like every other abstract method of this interface.
        raise NotImplementedError()

    @abc.abstractmethod
    def finished(self) -> Any:
        """Called when all data has successfully been exported and written.

        This function's return value is passed to the caller of
        `export_user_data`.
        """
        raise NotImplementedError()