Diffstat (limited to 'synapse')
-rw-r--r--  synapse/handlers/admin.py      250
-rw-r--r--  synapse/storage/roommember.py   20
-rw-r--r--  synapse/storage/stream.py       16
3 files changed, 280 insertions, 6 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 941ebfa107..f06914a378 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -14,9 +14,17 @@
 # limitations under the License.
 
 import logging
+import os
+import tempfile
+
+from canonicaljson import json
 
 from twisted.internet import defer
 
+from synapse.api.constants import Membership
+from synapse.types import RoomStreamToken
+from synapse.visibility import filter_events_for_client
+
 from ._base import BaseHandler
 
 logger = logging.getLogger(__name__)
@@ -89,3 +97,245 @@ class AdminHandler(BaseHandler):
         ret = yield self.store.search_users(term)
 
         defer.returnValue(ret)
+
+    @defer.inlineCallbacks
+    def export_user_data(self, user_id, writer):
+        """Write all data we have on the user to the given writer.
+
+        Args:
+            user_id (str)
+            writer (ExfiltrationWriter)
+
+        Returns:
+            defer.Deferred: Resolves when all data for a user has been written.
+            The returned value is that returned by `writer.finished()`.
+        """
+        # Get all rooms the user is in or has been in
+        rooms = yield self.store.get_rooms_for_user_where_membership_is(
+            user_id,
+            membership_list=(
+                Membership.JOIN,
+                Membership.LEAVE,
+                Membership.BAN,
+                Membership.INVITE,
+            ),
+        )
+
+        # We only try to fetch events for rooms the user has been in. If
+        # they've been e.g. invited to a room without joining, then we handle
+        # those separately.
+        rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)
+
+        for index, room in enumerate(rooms):
+            room_id = room.room_id
+
+            logger.info(
+                "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
+            )
+
+            forgotten = yield self.store.did_forget(user_id, room_id)
+            if forgotten:
+                logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
+                continue
+
+            if room_id not in rooms_user_has_been_in:
+                # If we haven't been in the rooms then the filtering code below
+                # won't return anything, so we need to handle these cases
+                # explicitly.
+
+                if room.membership == Membership.INVITE:
+                    event_id = room.event_id
+                    invite = yield self.store.get_event(event_id, allow_none=True)
+                    if invite:
+                        invited_state = invite.unsigned["invite_room_state"]
+                        writer.write_invite(room_id, invite, invited_state)
+
+                continue
+
+            # We only want to bother fetching events up to the last time the
+            # user was joined to the room. We estimate that point by looking at
+            # the stream_ordering of their last membership if it wasn't a join.
+            if room.membership == Membership.JOIN:
+                stream_ordering = yield self.store.get_room_max_stream_ordering()
+            else:
+                stream_ordering = room.stream_ordering
+
+            from_key = str(RoomStreamToken(0, 0))
+            to_key = str(RoomStreamToken(None, stream_ordering))
+
+            written_events = set()  # Events that we've processed in this room
+
+            # We need to track gaps in the events stream so that we can then
+            # write out the state at those events. We do this by keeping track
+            # of events whose prev events we haven't seen.
+
+            # Map from event ID to prev events that haven't been processed,
+            # dict[str, set[str]].
+            event_to_unseen_prevs = {}
+
+            # The reverse mapping of the above, i.e. a map from an unseen event
+            # to the events that have it in their prev_events, i.e. the unseen
+            # event's "children". dict[str, set[str]]
+            unseen_to_child_events = {}
+
+            # We fetch the events in the room that the user could see by
+            # fetching *all* events that we have and then filtering. This isn't
+            # the most efficient approach, but it does guarantee we get everything.
+            while True:
+                events, _ = yield self.store.paginate_room_events(
+                    room_id, from_key, to_key, limit=100, direction="f"
+                )
+                if not events:
+                    break
+
+                from_key = events[-1].internal_metadata.after
+
+                events = yield filter_events_for_client(self.store, user_id, events)
+
+                writer.write_events(room_id, events)
+
+                # Update the extremity tracking dicts
+                for event in events:
+                    # Check if we have any prev events that haven't been
+                    # processed yet, and add those to the appropriate dicts.
+                    unseen_events = set(event.prev_event_ids()) - written_events
+                    if unseen_events:
+                        event_to_unseen_prevs[event.event_id] = unseen_events
+                        for unseen in unseen_events:
+                            unseen_to_child_events.setdefault(unseen, set()).add(
+                                event.event_id
+                            )
+
+                    # Now check if this event is an unseen prev event, if so
+                    # then we remove this event from the appropriate dicts.
+                    for child_id in unseen_to_child_events.pop(event.event_id, []):
+                        event_to_unseen_prevs[child_id].discard(event.event_id)
+
+                    written_events.add(event.event_id)
+
+                logger.info(
+                    "Written %d events in room %s", len(written_events), room_id
+                )
+
+            # Extremities are the events that have at least one unseen prev event.
+            extremities = (
+                event_id
+                for event_id, unseen_prevs in event_to_unseen_prevs.items()
+                if unseen_prevs
+            )
+            for event_id in extremities:
+                if not event_to_unseen_prevs[event_id]:
+                    continue
+                state = yield self.store.get_state_for_event(event_id)
+                writer.write_state(room_id, event_id, state)
+
+        defer.returnValue(writer.finished())
+
+
+class ExfiltrationWriter(object):
+    """Interface used to specify how to write exported data.
+    """
+
+    def write_events(self, room_id, events):
+        """Write a batch of events for a room.
+
+        Args:
+            room_id (str)
+            events (list[FrozenEvent])
+        """
+        pass
+
+    def write_state(self, room_id, event_id, state):
+        """Write the state at the given event in the room.
+
+        This only gets called for backward extremities rather than for each
+        event.
+
+        Args:
+            room_id (str)
+            event_id (str)
+            state (dict[tuple[str, str], FrozenEvent])
+        """
+        pass
+
+    def write_invite(self, room_id, event, state):
+        """Write an invite for the room, with associated invite state.
+
+        Args:
+            room_id (str)
+            event (FrozenEvent)
+            state (dict[tuple[str, str], dict]): A subset of the state at the
+                invite, with a subset of the event keys (type, state_key,
+                content and sender).
+        """
+
+    def finished(self):
+        """Called when all data has succesfully been exported and written.
+
+        This functions return value is passed to the caller of
+        `export_user_data`.
+        """
+        pass
+
+
+class FileExfiltrationWriter(ExfiltrationWriter):
+    """An ExfiltrationWriter that writes the user's data to a directory.
+
+    Returns the directory location on completion.
+
+    Args:
+        user_id (str): The user whose data is being exported.
+        directory (str|None): The directory to write the data to. If None,
+            the data will be written to a temporary directory.
+    """
+
+    def __init__(self, user_id, directory=None):
+        self.user_id = user_id
+
+        if directory:
+            self.base_directory = directory
+        else:
+            self.base_directory = tempfile.mkdtemp(
+                prefix="synapse-exported__%s__" % (user_id,)
+            )
+
+        os.makedirs(self.base_directory, exist_ok=True)
+        if list(os.listdir(self.base_directory)):
+            raise Exception("Directory must be empty")
+
+    def write_events(self, room_id, events):
+        room_directory = os.path.join(self.base_directory, "rooms", room_id)
+        os.makedirs(room_directory, exist_ok=True)
+        events_file = os.path.join(room_directory, "events")
+
+        with open(events_file, "a") as f:
+            for event in events:
+                print(json.dumps(event.get_pdu_json()), file=f)
+
+    def write_state(self, room_id, event_id, state):
+        room_directory = os.path.join(self.base_directory, "rooms", room_id)
+        state_directory = os.path.join(room_directory, "state")
+        os.makedirs(state_directory, exist_ok=True)
+
+        event_file = os.path.join(state_directory, event_id)
+
+        with open(event_file, "a") as f:
+            for event in state.values():
+                print(json.dumps(event.get_pdu_json()), file=f)
+
+    def write_invite(self, room_id, event, state):
+        self.write_events(room_id, [event])
+
+        # We write the invite state to a separate file, as the entries aren't
+        # full events and are only a subset of the state at the invite event.
+        room_directory = os.path.join(self.base_directory, "rooms", room_id)
+        os.makedirs(room_directory, exist_ok=True)
+
+        invite_state = os.path.join(room_directory, "invite_state")
+
+        with open(invite_state, "a") as f:
+            for event in state.values():
+                print(json.dumps(event), file=f)
+
+    def finished(self):
+        return self.base_directory
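
As a rough usage sketch (not part of this diff), the new exporter is driven by
constructing a writer and handing it to `export_user_data`; the `admin_handler`
argument and the wrapper coroutine below are assumptions for illustration only.

from twisted.internet import defer

@defer.inlineCallbacks
def export_to_directory(admin_handler, user_id, directory=None):
    # FileExfiltrationWriter creates the target (or a temp dir) and refuses to
    # write into a directory that already has contents.
    writer = FileExfiltrationWriter(user_id, directory=directory)

    # export_user_data walks every room the user has been in, filters the
    # events for what the user could see, and streams them to the writer.
    result = yield admin_handler.export_user_data(user_id, writer)

    # For the file-based writer, finished() returns the base directory.
    defer.returnValue(result)
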
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 8004aeb909..32cfd010a5 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -575,6 +575,26 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         count = yield self.runInteraction("did_forget_membership", f)
         defer.returnValue(count == 0)
 
+    @defer.inlineCallbacks
+    def get_rooms_user_has_been_in(self, user_id):
+        """Get all rooms that the user has ever been in.
+
+        Args:
+            user_id (str)
+
+        Returns:
+            Deferred[set[str]]: Set of room IDs.
+        """
+
+        room_ids = yield self._simple_select_onecol(
+            table="room_memberships",
+            keyvalues={"membership": Membership.JOIN, "user_id": user_id},
+            retcol="room_id",
+            desc="get_rooms_user_has_been_in",
+        )
+
+        return set(room_ids)
+
 
 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
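
For intuition, the new `get_rooms_user_has_been_in` is a single-column select
over `room_memberships` restricted to join memberships; the SQL below is only
an approximation of what `_simple_select_onecol` builds, not the literal query.

APPROXIMATE_SQL = """
    SELECT room_id
      FROM room_memberships
     WHERE membership = 'join' AND user_id = ?
"""
# The handler consumes the result as a set, so a user who joined, left and
# rejoined the same room contributes a single room ID.
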
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 386a9dbe14..f8e3007d67 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -833,7 +833,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         Returns:
             Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
             as a list of _EventDictReturn and a token that points to the end
-            of the result set.
+            of the result set. If no events are returned then the end of the
+            stream has been reached (i.e. there are no events between
+            `from_token` and `to_token`).
         """
 
         assert int(limit) >= 0
@@ -905,15 +907,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 only those before
             direction(char): Either 'b' or 'f' to indicate whether we are
                 paginating forwards or backwards from `from_key`.
-            limit (int): The maximum number of events to return. Zero or less
-                means no limit.
+            limit (int): The maximum number of events to return.
             event_filter (Filter|None): If provided filters the events to
                 those that match the filter.
 
         Returns:
-            tuple[list[dict], str]: Returns the results as a list of dicts and
-            a token that points to the end of the result set. The dicts have
-            the keys "event_id", "topological_ordering" and "stream_orderign".
+            tuple[list[FrozenEvent], str]: Returns the results as a list of
+            events and a token that points to the end of the result set. If
+            no events are returned then the end of the stream has been
+            reached (i.e. there are no events between `from_key` and
+            `to_key`).
         """
 
         from_key = RoomStreamToken.parse(from_key)
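
The exporter in admin.py relies on this pagination contract: it starts from a
topological token at the beginning of the room and pages forward until an empty
batch signals the end. A hedged sketch of the token handling follows (the exact
token strings are an internal detail of `RoomStreamToken`, shown for intuition).

from synapse.types import RoomStreamToken

stream_ordering = 500  # e.g. the stream position of the user's last membership

# Start of the room, as a topological token (serialises to roughly "t0-0").
from_key = str(RoomStreamToken(0, 0))

# Upper bound at the user's last stream position (roughly "s500").
to_key = str(RoomStreamToken(None, stream_ordering))

# paginate_room_events(room_id, from_key, to_key, limit=100, direction="f") is
# then called in a loop, advancing from_key to events[-1].internal_metadata.after
# after each batch; an empty batch means there are no more events between
# from_key and to_key.
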