diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 282 |
1 files changed, 261 insertions, 21 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index ac876287fc..2e485c8644 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import json +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector @@ -262,6 +263,14 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + if self.hs.config.redaction_retention_period is not None: + hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -810,7 +819,7 @@ class EventsStore( # If they old and new groups are the same then we don't need to do # anything. if old_state_groups == new_state_groups: - return (None, None) + return None, None if len(new_state_groups) == 1 and len(old_state_groups) == 1: # If we're going from one state group to another, lets check if @@ -827,7 +836,7 @@ class EventsStore( # the current state in memory then lets also return that, # but it doesn't matter if we don't. new_state = state_groups_map.get(new_state_group) - return (new_state, delta_ids) + return new_state, delta_ids # Now that we have calculated new_state_groups we need to get # their state IDs so we can resolve to a single state set. @@ -839,7 +848,7 @@ class EventsStore( if len(new_state_groups) == 1: # If there is only one state group, then we know what the current # state is. - return (state_groups_map[new_state_groups.pop()], None) + return state_groups_map[new_state_groups.pop()], None # Ok, we need to defer to the state handler to resolve our state sets. @@ -868,7 +877,7 @@ class EventsStore( state_res_store=StateResolutionStore(self), ) - return (res.state, None) + return res.state, None @defer.inlineCallbacks def _calculate_state_delta(self, room_id, current_state): @@ -891,7 +900,7 @@ class EventsStore( if ev_id != existing_state.get(key) } - return (to_delete, to_insert) + return to_delete, to_insert @log_function def _persist_events_txn( @@ -1302,15 +1311,11 @@ class EventsStore( "event_reference_hashes", "event_search", "event_to_state_groups", - "guest_access", - "history_visibility", "local_invites", - "room_names", "state_events", "rejections", "redactions", "room_memberships", - "topics", ): txn.executemany( "DELETE FROM %s WHERE event_id = ?" % (table,), @@ -1384,6 +1389,18 @@ class EventsStore( ], ) + for event, _ in events_and_contexts: + if not event.internal_metadata.is_redacted(): + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + self._simple_update_txn( + txn, + table="redactions", + keyvalues={"redacts": event.event_id}, + updatevalues={"have_censored": False}, + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -1454,10 +1471,10 @@ class EventsStore( for event, _ in events_and_contexts: if event.type == EventTypes.Name: - # Insert into the room_names and event_search tables. + # Insert into the event_search table. self._store_room_name_txn(txn, event) elif event.type == EventTypes.Topic: - # Insert into the topics table and event_search table. + # Insert into the event_search table. self._store_room_topic_txn(txn, event) elif event.type == EventTypes.Message: # Insert into the event_search table. @@ -1465,12 +1482,6 @@ class EventsStore( elif event.type == EventTypes.Redaction: # Insert into the redactions table. self._store_redaction(txn, event) - elif event.type == EventTypes.RoomHistoryVisibility: - # Insert into the event_search table. - self._store_history_visibility_txn(txn, event) - elif event.type == EventTypes.GuestAccess: - # Insert into the event_search table. - self._store_guest_access_txn(txn, event) self._handle_event_relations(txn, event) @@ -1553,11 +1564,102 @@ class EventsStore( def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts), + + self._simple_insert_txn( + txn, + table="redactions", + values={ + "event_id": event.event_id, + "redacts": event.redacts, + "received_ts": self._clock.time_msec(), + }, + ) + + @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than the configured period that haven't + been censored yet. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.hs.config.redaction_retention_period is None: + return + + before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period + + # We fetch all redactions that: + # 1. point to an event we have, + # 2. has a received_ts from before the cut off, and + # 3. we haven't yet censored. + # + # This is limited to 100 events to ensure that we don't try and do too + # much at once. We'll get called again so this should eventually catch + # up. + sql = """ + SELECT redactions.event_id, redacts FROM redactions + LEFT JOIN events AS original_event ON ( + redacts = original_event.event_id + ) + WHERE NOT have_censored + AND redactions.received_ts <= ? + ORDER BY redactions.received_ts ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, before_ts, 100 ) + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. + if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_canonical_json( + prune_event_dict(original_event.get_dict()) + ) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + @defer.inlineCallbacks def count_daily_messages(self): """ @@ -2191,6 +2293,144 @@ class EventsStore( return to_delete, to_dedelta + def purge_room(self, room_id): + """Deletes all record of a room + + Args: + room_id (str): + """ + + return self.runInteraction("purge_room", self._purge_room_txn, room_id) + + def _purge_room_txn(self, txn, room_id): + # first we have to delete the state groups states + logger.info("[purge] removing %s from state_groups_state", room_id) + + txn.execute( + """ + DELETE FROM state_groups_state WHERE state_group IN ( + SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) + WHERE events.room_id=? + ) + """, + (room_id,), + ) + + # ... and the state group edges + logger.info("[purge] removing %s from state_group_edges", room_id) + + txn.execute( + """ + DELETE FROM state_group_edges WHERE state_group IN ( + SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) + WHERE events.room_id=? + ) + """, + (room_id,), + ) + + # ... and the state groups + logger.info("[purge] removing %s from state_groups", room_id) + + txn.execute( + """ + DELETE FROM state_groups WHERE id IN ( + SELECT state_group FROM events JOIN event_to_state_groups USING(event_id) + WHERE events.room_id=? + ) + """, + (room_id,), + ) + + # and then tables which lack an index on room_id but have one on event_id + for table in ( + "event_auth", + "event_edges", + "event_push_actions_staging", + "event_reference_hashes", + "event_relations", + "event_to_state_groups", + "redactions", + "rejections", + "state_events", + ): + logger.info("[purge] removing %s from %s", room_id, table) + + txn.execute( + """ + DELETE FROM %s WHERE event_id IN ( + SELECT event_id FROM events WHERE room_id=? + ) + """ + % (table,), + (room_id,), + ) + + # and finally, the tables with an index on room_id (or no useful index) + for table in ( + "current_state_events", + "event_backward_extremities", + "event_forward_extremities", + "event_json", + "event_push_actions", + "event_search", + "events", + "group_rooms", + "public_room_list_stream", + "receipts_graph", + "receipts_linearized", + "room_aliases", + "room_depth", + "room_memberships", + "room_stats_state", + "room_stats_current", + "room_stats_historical", + "room_stats_earliest_token", + "rooms", + "stream_ordering_to_exterm", + "topics", + "users_in_public_rooms", + "users_who_share_private_rooms", + # no useful index, but let's clear them anyway + "appservice_room_list", + "e2e_room_keys", + "event_push_summary", + "pusher_throttle", + "group_summary_rooms", + "local_invites", + "room_account_data", + "room_tags", + ): + logger.info("[purge] removing %s from %s", room_id, table) + txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,)) + + # Other tables we do NOT need to clear out: + # + # - blocked_rooms + # This is important, to make sure that we don't accidentally rejoin a blocked + # room after it was purged + # + # - user_directory + # This has a room_id column, but it is unused + # + + # Other tables that we might want to consider clearing out include: + # + # - event_reports + # Given that these are intended for abuse management my initial + # inclination is to leave them in place. + # + # - current_state_delta_stream + # - ex_outlier_stream + # - room_tags_revisions + # The problem with these is that they are largeish and there is no room_id + # index on them. In any case we should be clearing out 'stream' tables + # periodically anyway (#5888) + + # TODO: we could probably usefully do a bunch of cache invalidation here + + logger.info("[purge] done") + @defer.inlineCallbacks def is_event_after(self, event_id1, event_id2): """Returns True if event_id1 is after event_id2 in the stream |