diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6e9f3d1dc0..a9be143bd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -221,6 +221,7 @@ class EventsStore(
):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+ EVENT_DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
@@ -252,6 +253,11 @@ class EventsStore(
psql_only=True,
)
+ self.register_background_update_handler(
+ self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES,
+ self._cleanup_extremities_bg_update,
+ )
+
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -2341,6 +2347,186 @@ class EventsStore(
get_all_updated_current_state_deltas_txn,
)
+ @defer.inlineCallbacks
+ def _cleanup_extremities_bg_update(self, progress, batch_size):
+ """Background update to clean out extremities that should have been
+ deleted previously.
+
+ Mainly used to deal with the aftermath of #5269.
+ """
+
+ # This works by first copying all existing forward extremities into the
+ # `_extremities_to_check` table at start up, and then checking each
+ # event in that table whether we have any descendants that are not
+ # soft-failed/rejected. If that is the case then we delete that event
+ # from the forward extremities table.
+ #
+ # For efficiency, we do this in batches by recursively pulling out all
+ # descendants of a batch until we find the non soft-failed/rejected
+ # events, i.e. the set of descendants whose chain of prev events back
+ # to the batch of extremities are all soft-failed or rejected.
+ # Typically, we won't find any such events as extremities will rarely
+ # have any descendants, but if they do then we should delete those
+ # extremities.
+
+ def _cleanup_extremities_bg_update_txn(txn):
+ # The set of extremity event IDs that we're checking this round
+ original_set = set()
+
+ # A dict[str, set[str]] of event ID to their prev events.
+ graph = {}
+
+ # The set of descendants of the original set that are not rejected
+ # nor soft-failed. Ancestors of these events should be removed
+ # from the forward extremities table.
+ non_rejected_leaves = set()
+
+ # Set of event IDs that have been soft failed, and for which we
+ # should check if they have descendants which haven't been soft
+ # failed.
+ soft_failed_events_to_lookup = set()
+
+ # First, we get `batch_size` events from the table, pulling out
+ # their prev events, if any, and their prev events rejection status.
+ txn.execute(
+ """SELECT prev_event_id, event_id, internal_metadata,
+ rejections.event_id IS NOT NULL, events.outlier
+ FROM (
+ SELECT event_id AS prev_event_id
+ FROM _extremities_to_check
+ LIMIT ?
+ ) AS f
+ LEFT JOIN event_edges USING (prev_event_id)
+ LEFT JOIN events USING (event_id)
+ LEFT JOIN event_json USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ """, (batch_size,)
+ )
+
+ for prev_event_id, event_id, metadata, rejected, outlier in txn:
+ original_set.add(prev_event_id)
+
+ if not event_id or outlier:
+ # Common case where the forward extremity doesn't have any
+ # descendants.
+ continue
+
+ graph.setdefault(event_id, set()).add(prev_event_id)
+
+ soft_failed = False
+ if metadata:
+ soft_failed = json.loads(metadata).get("soft_failed")
+
+ if soft_failed or rejected:
+ soft_failed_events_to_lookup.add(event_id)
+ else:
+ non_rejected_leaves.add(event_id)
+
+ # Now we recursively check all the soft-failed descendants we
+ # found above in the same way, until we have nothing left to
+ # check.
+ while soft_failed_events_to_lookup:
+ # We only want to do 100 at a time, so we split given list
+ # into two.
+ batch = list(soft_failed_events_to_lookup)
+ to_check, to_defer = batch[:100], batch[100:]
+ soft_failed_events_to_lookup = set(to_defer)
+
+ sql = """SELECT prev_event_id, event_id, internal_metadata,
+ rejections.event_id IS NOT NULL
+ FROM event_edges
+ INNER JOIN events USING (event_id)
+ INNER JOIN event_json USING (event_id)
+ LEFT JOIN rejections USING (event_id)
+ WHERE
+ prev_event_id IN (%s)
+ AND NOT events.outlier
+ """ % (
+ ",".join("?" for _ in to_check),
+ )
+ txn.execute(sql, to_check)
+
+ for prev_event_id, event_id, metadata, rejected in txn:
+ if event_id in graph:
+ # Already handled this event previously, but we still
+ # want to record the edge.
+ graph.setdefault(event_id, set()).add(prev_event_id)
+ logger.info("Already handled")
+ continue
+
+ graph.setdefault(event_id, set()).add(prev_event_id)
+
+ soft_failed = json.loads(metadata).get("soft_failed")
+ if soft_failed or rejected:
+ soft_failed_events_to_lookup.add(event_id)
+ else:
+ non_rejected_leaves.add(event_id)
+
+ # We have a set of non-soft-failed descendants, so we recurse up
+ # the graph to find all ancestors and add them to the set of event
+ # IDs that we can delete from forward extremities table.
+ to_delete = set()
+ while non_rejected_leaves:
+ event_id = non_rejected_leaves.pop()
+ prev_event_ids = graph.get(event_id, set())
+ non_rejected_leaves.update(prev_event_ids)
+ to_delete.update(prev_event_ids)
+
+ to_delete.intersection_update(original_set)
+
+ logger.info("Deleting up to %d forward extremities", len(to_delete))
+
+ self._simple_delete_many_txn(
+ txn=txn,
+ table="event_forward_extremities",
+ column="event_id",
+ iterable=to_delete,
+ keyvalues={},
+ )
+
+ if to_delete:
+ # We now need to invalidate the caches of these rooms
+ rows = self._simple_select_many_txn(
+ txn,
+ table="events",
+ column="event_id",
+ iterable=to_delete,
+ keyvalues={},
+ retcols=("room_id",)
+ )
+ for row in rows:
+ txn.call_after(
+ self.get_latest_event_ids_in_room.invalidate,
+ (row["room_id"],)
+ )
+
+ self._simple_delete_many_txn(
+ txn=txn,
+ table="_extremities_to_check",
+ column="event_id",
+ iterable=original_set,
+ keyvalues={},
+ )
+
+ return len(original_set)
+
+ num_handled = yield self.runInteraction(
+ "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+ )
+
+ if not num_handled:
+ yield self._end_background_update(self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES)
+
+ def _drop_table_txn(txn):
+ txn.execute("DROP TABLE _extremities_to_check")
+
+ yield self.runInteraction(
+ "_cleanup_extremities_bg_update_drop_table",
+ _drop_table_txn,
+ )
+
+ defer.returnValue(num_handled)
+
AllNewEventsResult = namedtuple(
"AllNewEventsResult",
diff --git a/synapse/storage/schema/delta/54/delete_forward_extremities.sql b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
new file mode 100644
index 0000000000..7056bd1d00
--- /dev/null
+++ b/synapse/storage/schema/delta/54/delete_forward_extremities.sql
@@ -0,0 +1,19 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('delete_soft_failed_extremities', '{}');
+
+CREATE TABLE _extremities_to_check AS SELECT event_id FROM event_forward_extremities;
|