diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 160 |
1 files changed, 140 insertions, 20 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 1958afe1d7..ee49ef235d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,11 +33,13 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore +from synapse.storage._base import make_in_list_sql_clause from synapse.storage.background_updates import BackgroundUpdateStore from synapse.storage.event_federation import EventFederationStore from synapse.storage.events_worker import EventsWorkerStore @@ -262,6 +264,14 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + if self.hs.config.redaction_retention_period is not None: + hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -632,14 +642,16 @@ class EventsStore( LEFT JOIN rejections USING (event_id) LEFT JOIN event_json USING (event_id) WHERE - prev_event_id IN (%s) - AND NOT events.outlier + NOT events.outlier AND rejections.event_id IS NULL - """ % ( - ",".join("?" for _ in batch), + AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "prev_event_id", batch ) - txn.execute(sql, batch) + txn.execute(sql + clause, args) results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed")) for chunk in batch_iter(event_ids, 100): @@ -686,13 +698,15 @@ class EventsStore( LEFT JOIN rejections USING (event_id) LEFT JOIN event_json USING (event_id) WHERE - event_id IN (%s) - AND NOT events.outlier - """ % ( - ",".join("?" for _ in to_recursively_check), + NOT events.outlier + AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "event_id", to_recursively_check ) - txn.execute(sql, to_recursively_check) + txn.execute(sql + clause, args) to_recursively_check = [] for event_id, prev_event_id, metadata, rejected in txn: @@ -1380,6 +1394,18 @@ class EventsStore( ], ) + for event, _ in events_and_contexts: + if not event.internal_metadata.is_redacted(): + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + self._simple_update_txn( + txn, + table="redactions", + keyvalues={"redacts": event.event_id}, + updatevalues={"have_censored": False}, + ) + def _store_rejected_events_txn(self, txn, events_and_contexts): """Add rows to the 'rejections' table for received events which were rejected @@ -1522,10 +1548,14 @@ class EventsStore( " FROM events as e" " LEFT JOIN rejections as rej USING (event_id)" " LEFT JOIN redactions as r ON e.event_id = r.redacts" - " WHERE e.event_id IN (%s)" - ) % (",".join(["?"] * len(ev_map)),) + " WHERE " + ) + + clause, args = make_in_list_sql_clause( + self.database_engine, "e.event_id", list(ev_map) + ) - txn.execute(sql, list(ev_map)) + txn.execute(sql + clause, args) rows = self.cursor_to_dict(txn) for row in rows: event = ev_map[row["event_id"]] @@ -1543,12 +1573,101 @@ class EventsStore( def _store_redaction(self, txn, event): # invalidate the cache for the redacted event txn.call_after(self._invalidate_get_event_cache, event.redacts) - txn.execute( - "INSERT INTO redactions (event_id, redacts) VALUES (?,?)", - (event.event_id, event.redacts), + + self._simple_insert_txn( + txn, + table="redactions", + values={ + "event_id": event.event_id, + "redacts": event.redacts, + "received_ts": self._clock.time_msec(), + }, ) @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than the configured period that haven't + been censored yet. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.hs.config.redaction_retention_period is None: + return + + before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period + + # We fetch all redactions that: + # 1. point to an event we have, + # 2. has a received_ts from before the cut off, and + # 3. we haven't yet censored. + # + # This is limited to 100 events to ensure that we don't try and do too + # much at once. We'll get called again so this should eventually catch + # up. + sql = """ + SELECT redactions.event_id, redacts FROM redactions + LEFT JOIN events AS original_event ON ( + redacts = original_event.event_id + ) + WHERE NOT have_censored + AND redactions.received_ts <= ? + ORDER BY redactions.received_ts ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, before_ts, 100 + ) + + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. + if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_json(prune_event_dict(original_event.get_dict())) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + + @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. @@ -2139,11 +2258,12 @@ class EventsStore( sql = """ SELECT DISTINCT state_group FROM event_to_state_groups LEFT JOIN events_to_purge AS ep USING (event_id) - WHERE state_group IN (%s) AND ep.event_id IS NULL - """ % ( - ",".join("?" for _ in current_search), + WHERE ep.event_id IS NULL AND + """ + clause, args = make_in_list_sql_clause( + txn.database_engine, "state_group", current_search ) - txn.execute(sql, list(current_search)) + txn.execute(sql + clause, list(args)) referenced = set(sg for sg, in txn) referenced_groups |= referenced |