diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ddf7ab6479..bb6ff0595a 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,7 +23,7 @@ from functools import wraps
from six import iteritems, text_type
from six.moves import range
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import json
from prometheus_client import Counter, Histogram
from twisted.internet import defer
@@ -1389,6 +1389,18 @@ class EventsStore(
],
)
+ for event, _ in events_and_contexts:
+ if not event.internal_metadata.is_redacted():
+ # If we're persisting an unredacted event we go and ensure
+ # that we mark any redactions that reference this event as
+ # requiring censoring.
+ self._simple_update_txn(
+ txn,
+ table="redactions",
+ keyvalues={"redacts": event.event_id},
+ updatevalues={"have_censored": False},
+ )
+
def _store_rejected_events_txn(self, txn, events_and_contexts):
"""Add rows to the 'rejections' table for received events which were
rejected
@@ -1552,9 +1564,15 @@ class EventsStore(
def _store_redaction(self, txn, event):
# invalidate the cache for the redacted event
txn.call_after(self._invalidate_get_event_cache, event.redacts)
- txn.execute(
- "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
- (event.event_id, event.redacts),
+
+ self._simple_insert_txn(
+ txn,
+ table="redactions",
+ values={
+ "event_id": event.event_id,
+ "redacts": event.redacts,
+ "received_ts": self._clock.time_msec(),
+ },
)
@defer.inlineCallbacks
@@ -1571,36 +1589,29 @@ class EventsStore(
if self.hs.config.redaction_retention_period is None:
return
- max_pos = yield self.find_first_stream_ordering_after_ts(
- self._clock.time_msec() - self.hs.config.redaction_retention_period
- )
+ before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period
# We fetch all redactions that:
# 1. point to an event we have,
- # 2. has a stream ordering from before the cut off, and
+ # 2. has a received_ts from before the cut off, and
# 3. we haven't yet censored.
#
# This is limited to 100 events to ensure that we don't try and do too
# much at once. We'll get called again so this should eventually catch
# up.
- #
- # We use the range [-max_pos, max_pos] to handle backfilled events,
- # which are given negative stream ordering.
sql = """
- SELECT redact_event.event_id, redacts FROM redactions
- INNER JOIN events AS redact_event USING (event_id)
- INNER JOIN events AS original_event ON (
- redact_event.room_id = original_event.room_id
- AND redacts = original_event.event_id
+ SELECT redactions.event_id, redacts FROM redactions
+ LEFT JOIN events AS original_event ON (
+ redacts = original_event.event_id
)
WHERE NOT have_censored
- AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
- ORDER BY redact_event.stream_ordering ASC
+ AND redactions.received_ts <= ?
+ ORDER BY redactions.received_ts ASC
LIMIT ?
"""
rows = yield self._execute(
- "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+ "_censor_redactions_fetch", None, sql, before_ts, 100
)
updates = []
@@ -1621,9 +1632,7 @@ class EventsStore(
and original_event.internal_metadata.is_redacted()
):
# Redaction was allowed
- pruned_json = encode_canonical_json(
- prune_event_dict(original_event.get_dict())
- )
+ pruned_json = encode_json(prune_event_dict(original_event.get_dict()))
else:
# Redaction wasn't allowed
pruned_json = None
|