From a4bf72c30c5953b721a64eae89db186fa8735bb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Aug 2019 17:38:51 +0100 Subject: Censor redactions in DB after a month --- synapse/storage/events.py | 88 +++++++++++++++++++++- .../storage/schema/delta/56/redaction_censor.sql | 17 +++++ 2 files changed, 104 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/delta/56/redaction_censor.sql (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..2970da6829 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -23,7 +23,7 @@ from functools import wraps from six import iteritems, text_type from six.moves import range -from canonicaljson import json +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.events.utils import prune_event_dict from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.utils import log_function from synapse.metrics import BucketCollector @@ -262,6 +263,13 @@ class EventsStore( hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + def _censor_redactions(): + return run_as_background_process( + "_censor_redactions", self._censor_redactions + ) + + hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) + @defer.inlineCallbacks def _read_forward_extremities(self): def fetch(txn): @@ -1548,6 +1556,84 @@ class EventsStore( (event.event_id, event.redacts), ) + @defer.inlineCallbacks + def _censor_redactions(self): + """Censors all redactions older than a month that haven't been censored. + + By censor we mean update the event_json table with the redacted event. + + Returns: + Deferred + """ + + if self.stream_ordering_month_ago is None: + return + + max_pos = self.stream_ordering_month_ago + + # We fetch all redactions that point to an event that we have that has + # a stream ordering from over a month ago, that we haven't yet censored + # in the DB. + sql = """ + SELECT er.event_id, redacts FROM redactions + INNER JOIN events AS er USING (event_id) + INNER JOIN events AS eb ON (er.room_id = eb.room_id AND redacts = eb.event_id) + WHERE NOT have_censored + AND ? <= er.stream_ordering AND er.stream_ordering <= ? + ORDER BY er.stream_ordering ASC + LIMIT ? + """ + + rows = yield self._execute( + "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100 + ) + + updates = [] + + for redaction_id, event_id in rows: + redaction_event = yield self.get_event(redaction_id, allow_none=True) + original_event = yield self.get_event( + event_id, allow_rejected=True, allow_none=True + ) + + # The SQL above ensures that we have both the redaction and + # original event, so if the `get_event` calls return None it + # means that the redaction wasn't allowed. Either way we know that + # the result won't change so we mark the fact that we've checked. + if ( + redaction_event + and original_event + and original_event.internal_metadata.is_redacted() + ): + # Redaction was allowed + pruned_json = encode_canonical_json( + prune_event_dict(original_event.get_dict()) + ) + else: + # Redaction wasn't allowed + pruned_json = None + + updates.append((redaction_id, event_id, pruned_json)) + + def _update_censor_txn(txn): + for redaction_id, event_id, pruned_json in updates: + if pruned_json: + self._simple_update_one_txn( + txn, + table="event_json", + keyvalues={"event_id": event_id}, + updatevalues={"json": pruned_json}, + ) + + self._simple_update_one_txn( + txn, + table="redactions", + keyvalues={"event_id": redaction_id}, + updatevalues={"have_censored": True}, + ) + + yield self.runInteraction("_update_censor_txn", _update_censor_txn) + @defer.inlineCallbacks def count_daily_messages(self): """ diff --git a/synapse/storage/schema/delta/56/redaction_censor.sql b/synapse/storage/schema/delta/56/redaction_censor.sql new file mode 100644 index 0000000000..fe51b02309 --- /dev/null +++ b/synapse/storage/schema/delta/56/redaction_censor.sql @@ -0,0 +1,17 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +ALTER TABLE redactions ADD COLUMN have_censored BOOL NOT NULL DEFAULT false; +CREATE INDEX redactions_have_censored ON redactions(event_id) WHERE not have_censored; -- cgit 1.4.1 From 3ff0422d2dbfa668df365da99a4b7caeea85528d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Sep 2019 17:16:03 +0100 Subject: Make redaction retention period configurable --- docs/sample_config.yaml | 5 +++++ synapse/config/server.py | 15 +++++++++++++++ synapse/storage/events.py | 6 ++++-- tests/storage/test_redaction.py | 4 +++- 4 files changed, 27 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 43969bbb70..e23b80d2b8 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -306,6 +306,11 @@ listeners: # #allow_per_room_profiles: false +# How long to keep redacted events in unredacted form in the database. +# By default redactions are kept indefinitely. +# +#redaction_retention_period: 30d + ## TLS ## diff --git a/synapse/config/server.py b/synapse/config/server.py index 2abdef0971..8efab924d4 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -162,6 +162,16 @@ class ServerConfig(Config): self.mau_trial_days = config.get("mau_trial_days", 0) + # How long to keep redacted events in the database in unredacted form + # before redacting them. + redaction_retention_period = config.get("redaction_retention_period") + if redaction_retention_period: + self.redaction_retention_period = self.parse_duration( + redaction_retention_period + ) + else: + self.redaction_retention_period = None + # Options to disable HS self.hs_disabled = config.get("hs_disabled", False) self.hs_disabled_message = config.get("hs_disabled_message", "") @@ -718,6 +728,11 @@ class ServerConfig(Config): # Defaults to 'true'. # #allow_per_room_profiles: false + + # How long to keep redacted events in unredacted form in the database. + # By default redactions are kept indefinitely. + # + #redaction_retention_period: 30d """ % locals() ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 2970da6829..d0d1781c90 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1566,10 +1566,12 @@ class EventsStore( Deferred """ - if self.stream_ordering_month_ago is None: + if not self.hs.config.redaction_retention_period: return - max_pos = self.stream_ordering_month_ago + max_pos = yield self.find_first_stream_ordering_after_ts( + self._clock.time_msec() - self.hs.config.redaction_retention_period + ) # We fetch all redactions that point to an event that we have that has # a stream ordering from over a month ago, that we haven't yet censored diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index 0c9f3c7071..f0e86d41a8 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -344,7 +344,9 @@ class RedactionTestCase(unittest.HomeserverTestCase): {"content": {"body": "t", "msgtype": "message"}}, json.loads(event_json) ) - # Advance by 30 days + # Advance by 30 days, then advance again to ensure that the looping call + # for updating the stream position gets called and then the looping call + # for the censoring gets called. self.reactor.advance(60 * 60 * 24 * 31) self.reactor.advance(60 * 60 * 2) -- cgit 1.4.1 From 80e14a8546efb9e2f9edec3b1de0a8b943351252 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 13:23:41 +0100 Subject: Handle setting retention period to 0 --- synapse/config/server.py | 2 +- synapse/storage/events.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/config/server.py b/synapse/config/server.py index 8efab924d4..aa71835dc3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -165,7 +165,7 @@ class ServerConfig(Config): # How long to keep redacted events in the database in unredacted form # before redacting them. redaction_retention_period = config.get("redaction_retention_period") - if redaction_retention_period: + if redaction_retention_period is not None: self.redaction_retention_period = self.parse_duration( redaction_retention_period ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a5d13ddc49..77ba7eb2af 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1566,7 +1566,7 @@ class EventsStore( Deferred """ - if not self.hs.config.redaction_retention_period: + if self.hs.config.redaction_retention_period is None: return max_pos = yield self.find_first_stream_ordering_after_ts( -- cgit 1.4.1 From fffe17b77d06927aaf64fa80be5b765c870a4ef5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 13:24:24 +0100 Subject: Don't start looping call unless enabled --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 77ba7eb2af..9ef7aefd97 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -268,7 +268,8 @@ class EventsStore( "_censor_redactions", self._censor_redactions ) - hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) + if self.hs.config.redaction_retention_period is not None: + hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) @defer.inlineCallbacks def _read_forward_extremities(self): -- cgit 1.4.1 From 916c69722833dd94c53d0fedeec8cc42d2085e73 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 13:31:00 +0100 Subject: Fixup comment --- synapse/storage/events.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 9ef7aefd97..4484ae7ce0 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -269,7 +269,7 @@ class EventsStore( ) if self.hs.config.redaction_retention_period is not None: - hs.get_clock().looping_call(_censor_redactions, 10 * 60 * 1000) + hs.get_clock().looping_call(_censor_redactions, 5 * 60 * 1000) @defer.inlineCallbacks def _read_forward_extremities(self): @@ -1574,9 +1574,17 @@ class EventsStore( self._clock.time_msec() - self.hs.config.redaction_retention_period ) - # We fetch all redactions that point to an event that we have that has - # a stream ordering from over a month ago, that we haven't yet censored - # in the DB. + # We fetch all redactions that: + # 1. point to an event we have that has, + # 2. has a stream ordering from before the cut off, and + # 3. we haven't yet censored. + # + # This is limited to 100 events to ensure that we don't try and do too + # much at once. We'll get called again so this should eventually catch + # up. + # + # We use the range [-max_pos, max_pos] to handle backfilled events, + # which are given negative stream ordering. sql = """ SELECT er.event_id, redacts FROM redactions INNER JOIN events AS er USING (event_id) -- cgit 1.4.1 From e7184a437062ae21846b8e071ded73526209e90c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 13:33:38 +0100 Subject: Use better names in SQL --- synapse/storage/events.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 4484ae7ce0..0da6e0b1a1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1586,12 +1586,15 @@ class EventsStore( # We use the range [-max_pos, max_pos] to handle backfilled events, # which are given negative stream ordering. sql = """ - SELECT er.event_id, redacts FROM redactions - INNER JOIN events AS er USING (event_id) - INNER JOIN events AS eb ON (er.room_id = eb.room_id AND redacts = eb.event_id) + SELECT redact_event.event_id, redacts FROM redactions + INNER JOIN events AS redact_event USING (event_id) + INNER JOIN events AS original_event ON ( + redact_event.room_id = original_event.room_id + AND redacts = original_event.event_id + ) WHERE NOT have_censored - AND ? <= er.stream_ordering AND er.stream_ordering <= ? - ORDER BY er.stream_ordering ASC + AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ? + ORDER BY redact_event.stream_ordering ASC LIMIT ? """ -- cgit 1.4.1 From 8b9ade8c7871c862cf2122a156f00e411cd7a276 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 13:40:05 +0100 Subject: Default to censoring redactions after seven days --- docs/sample_config.yaml | 8 +++++--- synapse/config/server.py | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) (limited to 'synapse') diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index e23b80d2b8..24adc3da2f 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -306,10 +306,12 @@ listeners: # #allow_per_room_profiles: false -# How long to keep redacted events in unredacted form in the database. -# By default redactions are kept indefinitely. +# How long to keep redacted events in unredacted form in the database. After +# this period redacted events get replaced with their redacted form in the DB. # -#redaction_retention_period: 30d +# Defaults to `7d`. Set to `null` to disable. +# +redaction_retention_period: 7d ## TLS ## diff --git a/synapse/config/server.py b/synapse/config/server.py index aa71835dc3..c8b9fe2d0f 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -164,7 +164,7 @@ class ServerConfig(Config): # How long to keep redacted events in the database in unredacted form # before redacting them. - redaction_retention_period = config.get("redaction_retention_period") + redaction_retention_period = config.get("redaction_retention_period", "7d") if redaction_retention_period is not None: self.redaction_retention_period = self.parse_duration( redaction_retention_period @@ -729,10 +729,12 @@ class ServerConfig(Config): # #allow_per_room_profiles: false - # How long to keep redacted events in unredacted form in the database. - # By default redactions are kept indefinitely. + # How long to keep redacted events in unredacted form in the database. After + # this period redacted events get replaced with their redacted form in the DB. # - #redaction_retention_period: 30d + # Defaults to `7d`. Set to `null` to disable. + # + redaction_retention_period: 7d """ % locals() ) -- cgit 1.4.1 From 580f3df9b2573c0278dd952d1478689e5cd23a7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 9 Sep 2019 15:08:24 +0100 Subject: Fix comments --- synapse/storage/events.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 0da6e0b1a1..ddf7ab6479 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1559,7 +1559,8 @@ class EventsStore( @defer.inlineCallbacks def _censor_redactions(self): - """Censors all redactions older than a month that haven't been censored. + """Censors all redactions older than the configured period that haven't + been censored yet. By censor we mean update the event_json table with the redacted event. @@ -1575,7 +1576,7 @@ class EventsStore( ) # We fetch all redactions that: - # 1. point to an event we have that has, + # 1. point to an event we have, # 2. has a stream ordering from before the cut off, and # 3. we haven't yet censored. # -- cgit 1.4.1