summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py47
1 files changed, 29 insertions, 18 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ddf7ab6479..2e485c8644 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1389,6 +1389,18 @@ class EventsStore(
             ],
         )
 
+        for event, _ in events_and_contexts:
+            if not event.internal_metadata.is_redacted():
+                # If we're persisting an unredacted event we go and ensure
+                # that we mark any redactions that reference this event as
+                # requiring censoring.
+                self._simple_update_txn(
+                    txn,
+                    table="redactions",
+                    keyvalues={"redacts": event.event_id},
+                    updatevalues={"have_censored": False},
+                )
+
     def _store_rejected_events_txn(self, txn, events_and_contexts):
         """Add rows to the 'rejections' table for received events which were
         rejected
@@ -1552,9 +1564,15 @@ class EventsStore(
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         txn.call_after(self._invalidate_get_event_cache, event.redacts)
-        txn.execute(
-            "INSERT INTO redactions (event_id, redacts) VALUES (?,?)",
-            (event.event_id, event.redacts),
+
+        self._simple_insert_txn(
+            txn,
+            table="redactions",
+            values={
+                "event_id": event.event_id,
+                "redacts": event.redacts,
+                "received_ts": self._clock.time_msec(),
+            },
         )
 
     @defer.inlineCallbacks
@@ -1571,36 +1589,29 @@ class EventsStore(
         if self.hs.config.redaction_retention_period is None:
             return
 
-        max_pos = yield self.find_first_stream_ordering_after_ts(
-            self._clock.time_msec() - self.hs.config.redaction_retention_period
-        )
+        before_ts = self._clock.time_msec() - self.hs.config.redaction_retention_period
 
         # We fetch all redactions that:
         #   1. point to an event we have,
-        #   2. has a stream ordering from before the cut off, and
+        #   2. has a received_ts from before the cut off, and
         #   3. we haven't yet censored.
         #
         # This is limited to 100 events to ensure that we don't try and do too
         # much at once. We'll get called again so this should eventually catch
         # up.
-        #
-        # We use the range [-max_pos, max_pos] to handle backfilled events,
-        # which are given negative stream ordering.
         sql = """
-            SELECT redact_event.event_id, redacts FROM redactions
-            INNER JOIN events AS redact_event USING (event_id)
-            INNER JOIN events AS original_event ON (
-                redact_event.room_id = original_event.room_id
-                AND redacts = original_event.event_id
+            SELECT redactions.event_id, redacts FROM redactions
+            LEFT JOIN events AS original_event ON (
+                redacts = original_event.event_id
             )
             WHERE NOT have_censored
-            AND ? <= redact_event.stream_ordering AND redact_event.stream_ordering <= ?
-            ORDER BY redact_event.stream_ordering ASC
+            AND redactions.received_ts <= ?
+            ORDER BY redactions.received_ts ASC
             LIMIT ?
         """
 
         rows = yield self._execute(
-            "_censor_redactions_fetch", None, sql, -max_pos, max_pos, 100
+            "_censor_redactions_fetch", None, sql, before_ts, 100
         )
 
         updates = []