diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 103 |
1 files changed, 102 insertions, 1 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d74f667d79..6de125d2f9 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import json +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector @@ -262,6 +263,14 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + if self.hs.config.redaction_retention_period is not None: + hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -1552,6 +1561,98 @@ class EventsStore( ) @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than the configured period that haven't + been censored yet. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.hs.config.redaction_retention_period is None: + return + + max_pos = yield self.find_first_stream_ordering_after_ts( + self._clock.time_msec() - self.hs.config.redaction_retention_period + ) + + # We fetch all redactions that: + # 1. point to an event we have, + # 2. has a stream ordering from before the cut off, and + # 3. we haven't yet censored. + # + # This is limited to 100 events to ensure that we don't try and do too + # much at once. We'll get called again so this should eventually catch + # up. + # + # We use the range [-max_pos, max_pos] to handle backfilled events, + # which are given negative stream ordering. + sql = """ + SELECT redact_event.event_id, redacts FROM redactions + INNER JOIN events AS redact_event USING (event_id) + INNER JOIN events AS original_event ON ( + redact_event.room_id = original_event.room_id + AND redacts = original_event.event_id + ) + WHERE NOT have_censored + AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? + ORDER BY redact_event.stream_ordering ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + ) + + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. + if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_canonical_json( + prune_event_dict(original_event.get_dict()) + ) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + + @defer.inlineCallbacks def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. |