summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py82
1 files changed, 79 insertions, 3 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ffc27ff41..6e9f3d1dc0 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -554,10 +554,18 @@ class EventsStore(
             e_id for event in new_events for e_id in event.prev_event_ids()
         )
 
-        # Finally, remove any events which are prev_events of any existing events.
+        # Remove any events which are prev_events of any existing events.
         existing_prevs = yield self._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
+        # Finally handle the case where the new events have soft-failed prev
+        # events. If they do we need to remove them and their prev events,
+        # otherwise we end up with dangling extremities.
+        existing_prevs = yield self._get_prevs_before_rejected(
+            e_id for event in new_events for e_id in event.prev_event_ids()
+        )
+        result.difference_update(existing_prevs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -573,7 +581,7 @@ class EventsStore(
         """
         results = []
 
-        def _get_events(txn, batch):
+        def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
             SELECT prev_event_id, internal_metadata
             FROM event_edges
@@ -596,11 +604,79 @@ class EventsStore(
             )
 
         for chunk in batch_iter(event_ids, 100):
-            yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+            yield self.runInteraction(
+                "_get_events_which_are_prevs",
+                _get_events_which_are_prevs_txn,
+                chunk,
+            )
 
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def _get_prevs_before_rejected(self, event_ids):
+        """Get soft-failed ancestors to remove from the extremities.
+
+        Given a set of events, find all those that have been soft-failed or
+        rejected. Returns those soft failed/rejected events and their prev
+        events (whether soft-failed/rejected or not), and recurses up the
+        prev-event graph until it finds no more soft-failed/rejected events.
+
+        This is used to find extremities that are ancestors of new events, but
+        are separated by soft failed events.
+
+        Args:
+            event_ids (Iterable[str]): Events to find prev events for. Note
+                that these must have already been persisted.
+
+        Returns:
+            Deferred[set[str]]
+        """
+
+        # The set of event_ids to return. This includes all soft-failed events
+        # and their prev events.
+        existing_prevs = set()
+
+        def _get_prevs_before_rejected_txn(txn, batch):
+            to_recursively_check = batch
+
+            while to_recursively_check:
+                sql = """
+                SELECT
+                    event_id, prev_event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    LEFT JOIN event_json USING (event_id)
+                WHERE
+                    event_id IN (%s)
+                    AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_recursively_check),
+                )
+
+                txn.execute(sql, to_recursively_check)
+                to_recursively_check = []
+
+                for event_id, prev_event_id, metadata, rejected in txn:
+                    if prev_event_id in existing_prevs:
+                        continue
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        to_recursively_check.append(prev_event_id)
+                        existing_prevs.add(prev_event_id)
+
+        for chunk in batch_iter(event_ids, 100):
+            yield self.runInteraction(
+                "_get_prevs_before_rejected",
+                _get_prevs_before_rejected_txn,
+                chunk,
+            )
+
+        defer.returnValue(existing_prevs)
+
+    @defer.inlineCallbacks
     def _get_new_state_after_events(
         self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
     ):