diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/admin.py | 250 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 20 | ||||
-rw-r--r-- | synapse/storage/stream.py | 16 |
3 files changed, 280 insertions, 6 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 941ebfa107..f06914a378 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -14,9 +14,17 @@ # limitations under the License. import logging +import os +import tempfile + +from canonicaljson import json from twisted.internet import defer +from synapse.api.constants import Membership +from synapse.types import RoomStreamToken +from synapse.visibility import filter_events_for_client + from ._base import BaseHandler logger = logging.getLogger(__name__) @@ -89,3 +97,245 @@ class AdminHandler(BaseHandler): ret = yield self.store.search_users(term) defer.returnValue(ret) + + @defer.inlineCallbacks + def export_user_data(self, user_id, writer): + """Write all data we have on the user to the given writer. + + Args: + user_id (str) + writer (ExfiltrationWriter) + + Returns: + defer.Deferred: Resolves when all data for a user has been written. + The returned value is that returned by `writer.finished()`. + """ + # Get all rooms the user is in or has been in + rooms = yield self.store.get_rooms_for_user_where_membership_is( + user_id, + membership_list=( + Membership.JOIN, + Membership.LEAVE, + Membership.BAN, + Membership.INVITE, + ), + ) + + # We only try and fetch events for rooms the user has been in. If + # they've been e.g. invited to a room without joining then we handle + # those seperately. + rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id) + + for index, room in enumerate(rooms): + room_id = room.room_id + + logger.info( + "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms) + ) + + forgotten = yield self.store.did_forget(user_id, room_id) + if forgotten: + logger.info("[%s] User forgot room %d, ignoring", user_id, room_id) + continue + + if room_id not in rooms_user_has_been_in: + # If we haven't been in the rooms then the filtering code below + # won't return anything, so we need to handle these cases + # explicitly. + + if room.membership == Membership.INVITE: + event_id = room.event_id + invite = yield self.store.get_event(event_id, allow_none=True) + if invite: + invited_state = invite.unsigned["invite_room_state"] + writer.write_invite(room_id, invite, invited_state) + + continue + + # We only want to bother fetching events up to the last time they + # were joined. We estimate that point by looking at the + # stream_ordering of the last membership if it wasn't a join. + if room.membership == Membership.JOIN: + stream_ordering = yield self.store.get_room_max_stream_ordering() + else: + stream_ordering = room.stream_ordering + + from_key = str(RoomStreamToken(0, 0)) + to_key = str(RoomStreamToken(None, stream_ordering)) + + written_events = set() # Events that we've processed in this room + + # We need to track gaps in the events stream so that we can then + # write out the state at those events. We do this by keeping track + # of events whose prev events we haven't seen. + + # Map from event ID to prev events that haven't been processed, + # dict[str, set[str]]. + event_to_unseen_prevs = {} + + # The reverse mapping to above, i.e. map from unseen event to events + # that have the unseen event in their prev_events, i.e. the unseen + # events "children". dict[str, set[str]] + unseen_to_child_events = {} + + # We fetch events in the room the user could see by fetching *all* + # events that we have and then filtering, this isn't the most + # efficient method perhaps but it does guarantee we get everything. + while True: + events, _ = yield self.store.paginate_room_events( + room_id, from_key, to_key, limit=100, direction="f" + ) + if not events: + break + + from_key = events[-1].internal_metadata.after + + events = yield filter_events_for_client(self.store, user_id, events) + + writer.write_events(room_id, events) + + # Update the extremity tracking dicts + for event in events: + # Check if we have any prev events that haven't been + # processed yet, and add those to the appropriate dicts. + unseen_events = set(event.prev_event_ids()) - written_events + if unseen_events: + event_to_unseen_prevs[event.event_id] = unseen_events + for unseen in unseen_events: + unseen_to_child_events.setdefault(unseen, set()).add( + event.event_id + ) + + # Now check if this event is an unseen prev event, if so + # then we remove this event from the appropriate dicts. + for child_id in unseen_to_child_events.pop(event.event_id, []): + event_to_unseen_prevs[child_id].discard(event.event_id) + + written_events.add(event.event_id) + + logger.info( + "Written %d events in room %s", len(written_events), room_id + ) + + # Extremities are the events who have at least one unseen prev event. + extremities = ( + event_id + for event_id, unseen_prevs in event_to_unseen_prevs.items() + if unseen_prevs + ) + for event_id in extremities: + if not event_to_unseen_prevs[event_id]: + continue + state = yield self.store.get_state_for_event(event_id) + writer.write_state(room_id, event_id, state) + + defer.returnValue(writer.finished()) + + +class ExfiltrationWriter(object): + """Interface used to specify how to write exported data. + """ + + def write_events(self, room_id, events): + """Write a batch of events for a room. + + Args: + room_id (str) + events (list[FrozenEvent]) + """ + pass + + def write_state(self, room_id, event_id, state): + """Write the state at the given event in the room. + + This only gets called for backward extremities rather than for each + event. + + Args: + room_id (str) + event_id (str) + state (dict[tuple[str, str], FrozenEvent]) + """ + pass + + def write_invite(self, room_id, event, state): + """Write an invite for the room, with associated invite state. + + Args: + room_id (str) + event (FrozenEvent) + state (dict[tuple[str, str], dict]): A subset of the state at the + invite, with a subset of the event keys (type, state_key + content and sender) + """ + + def finished(self): + """Called when all data has succesfully been exported and written. + + This functions return value is passed to the caller of + `export_user_data`. + """ + pass + + +class FileExfiltrationWriter(ExfiltrationWriter): + """An ExfiltrationWriter that writes the user's data to a directory. + + Returns the directory location on completion. + + Args: + user_id (str): The user whose data is being exported. + directory (str|None): The directory to write the data to. If None then + will write to a temporary directory. + """ + + def __init__(self, user_id, directory=None): + self.user_id = user_id + + if directory: + self.base_directory = directory + else: + self.base_directory = tempfile.mkdtemp( + prefix="synapse-exported__%s__" % (user_id,) + ) + + os.makedirs(self.base_directory, exist_ok=True) + if list(os.listdir(self.base_directory)): + raise Exception("Directory must be empty") + + def write_events(self, room_id, events): + room_directory = os.path.join(self.base_directory, "rooms", room_id) + os.makedirs(room_directory, exist_ok=True) + events_file = os.path.join(room_directory, "events") + + with open(events_file, "a") as f: + for event in events: + print(json.dumps(event.get_pdu_json()), file=f) + + def write_state(self, room_id, event_id, state): + room_directory = os.path.join(self.base_directory, "rooms", room_id) + state_directory = os.path.join(room_directory, "state") + os.makedirs(state_directory, exist_ok=True) + + event_file = os.path.join(state_directory, event_id) + + with open(event_file, "a") as f: + for event in state.values(): + print(json.dumps(event.get_pdu_json()), file=f) + + def write_invite(self, room_id, event, state): + self.write_events(room_id, [event]) + + # We write the invite state somewhere else as they aren't full events + # and are only a subset of the state at the event. + room_directory = os.path.join(self.base_directory, "rooms", room_id) + os.makedirs(room_directory, exist_ok=True) + + invite_state = os.path.join(room_directory, "invite_state") + + with open(invite_state, "a") as f: + for event in state.values(): + print(json.dumps(event), file=f) + + def finished(self): + return self.base_directory diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 8004aeb909..32cfd010a5 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -575,6 +575,26 @@ class RoomMemberWorkerStore(EventsWorkerStore): count = yield self.runInteraction("did_forget_membership", f) defer.returnValue(count == 0) + @defer.inlineCallbacks + def get_rooms_user_has_been_in(self, user_id): + """Get all rooms that the user has ever been in. + + Args: + user_id (str) + + Returns: + Deferred[set[str]]: Set of room IDs. + """ + + room_ids = yield self._simple_select_onecol( + table="room_memberships", + keyvalues={"membership": Membership.JOIN, "user_id": user_id}, + retcol="room_id", + desc="get_rooms_user_has_been_in", + ) + + return set(room_ids) + class RoomMemberStore(RoomMemberWorkerStore): def __init__(self, db_conn, hs): diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 386a9dbe14..f8e3007d67 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -833,7 +833,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): Returns: Deferred[tuple[list[_EventDictReturn], str]]: Returns the results as a list of _EventDictReturn and a token that points to the end - of the result set. + of the result set. If no events are returned then the end of the + stream has been reached (i.e. there are no events between + `from_token` and `to_token`). """ assert int(limit) >= 0 @@ -905,15 +907,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): only those before direction(char): Either 'b' or 'f' to indicate whether we are paginating forwards or backwards from `from_key`. - limit (int): The maximum number of events to return. Zero or less - means no limit. + limit (int): The maximum number of events to return. event_filter (Filter|None): If provided filters the events to those that match the filter. Returns: - tuple[list[dict], str]: Returns the results as a list of dicts and - a token that points to the end of the result set. The dicts have - the keys "event_id", "topological_ordering" and "stream_orderign". + tuple[list[FrozenEvents], str]: Returns the results as a list of + dicts and a token that points to the end of the result set. The + dicts have the keys "event_id", "topological_ordering" and + "stream_ordering". If no events are returned then the end of the + stream has been reached (i.e. there are no events between + `from_key` and `to_key`). """ from_key = RoomStreamToken.parse(from_key) |