summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-05-28 18:52:41 +0100
committerErik Johnston <erik@matrix.org>2019-05-30 16:15:37 +0100
commit6ebc08c09d4ced251750cb087aa4689f90cdd4b6 (patch)
tree1a4645aec6534fea50247108fea03cf1f628330a /synapse/storage/events.py
parentCorrectly filter out extremities with soft failed prevs (#5274) (diff)
downloadsynapse-6ebc08c09d4ced251750cb087aa4689f90cdd4b6.tar.xz
Add DB bg update to cleanup extremities.
Due to #5269 we may have extremities in our DB that we shouldn't have,
so lets add a cleanup task such to remove those.
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py186
1 files changed, 186 insertions, 0 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6e9f3d1dc0..a9be143bd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -221,6 +221,7 @@ class EventsStore(
 ):
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
+    EVENT_DELETE_SOFT_FAILED_EXTREMITIES = "delete_soft_failed_extremities"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
@@ -252,6 +253,11 @@ class EventsStore(
             psql_only=True,
         )
 
+        self.register_background_update_handler(
+            self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES,
+            self._cleanup_extremities_bg_update,
+        )
+
         self._event_persist_queue = _EventPeristenceQueue()
 
         self._state_resolution_handler = hs.get_state_resolution_handler()
@@ -2341,6 +2347,186 @@ class EventsStore(
             get_all_updated_current_state_deltas_txn,
         )
 
+    @defer.inlineCallbacks
+    def _cleanup_extremities_bg_update(self, progress, batch_size):
+        """Background update to clean out extremities that should have been
+        deleted previously.
+
+        Mainly used to deal with the aftermath of #5269.
+        """
+
+        # This works by first copying all existing forward extremities into the
+        # `_extremities_to_check` table at start up, and then checking each
+        # event in that table whether we have any descendants that are not
+        # soft-failed/rejected. If that is the case then we delete that event
+        # from the forward extremities table.
+        #
+        # For efficiency, we do this in batches by recursively pulling out all
+        # descendants of a batch until we find the non soft-failed/rejected
+        # events, i.e. the set of descendants whose chain of prev events back
+        # to the batch of extremities are all soft-failed or rejected.
+        # Typically, we won't find any such events as extremities will rarely
+        # have any descendants, but if they do then we should delete those
+        # extremities.
+
+        def _cleanup_extremities_bg_update_txn(txn):
+            # The set of extremity event IDs that we're checking this round
+            original_set = set()
+
+            # A dict[str, set[str]] of event ID to their prev events.
+            graph = {}
+
+            # The set of descendants of the original set that are not rejected
+            # nor soft-failed. Ancestors of these events should be removed
+            # from the forward extremities table.
+            non_rejected_leaves = set()
+
+            # Set of event IDs that have been soft failed, and for which we
+            # should check if they have descendants which haven't been soft
+            # failed.
+            soft_failed_events_to_lookup = set()
+
+            # First, we get `batch_size` events from the table, pulling out
+            # their prev events, if any, and their prev events rejection status.
+            txn.execute(
+                """SELECT prev_event_id, event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL, events.outlier
+                FROM (
+                    SELECT event_id AS prev_event_id
+                    FROM _extremities_to_check
+                    LIMIT ?
+                ) AS f
+                LEFT JOIN event_edges USING (prev_event_id)
+                LEFT JOIN events USING (event_id)
+                LEFT JOIN event_json USING (event_id)
+                LEFT JOIN rejections USING (event_id)
+                """, (batch_size,)
+            )
+
+            for prev_event_id, event_id, metadata, rejected, outlier in txn:
+                original_set.add(prev_event_id)
+
+                if not event_id or outlier:
+                    # Common case where the forward extremity doesn't have any
+                    # descendants.
+                    continue
+
+                graph.setdefault(event_id, set()).add(prev_event_id)
+
+                soft_failed = False
+                if metadata:
+                    soft_failed = json.loads(metadata).get("soft_failed")
+
+                if soft_failed or rejected:
+                    soft_failed_events_to_lookup.add(event_id)
+                else:
+                    non_rejected_leaves.add(event_id)
+
+            # Now we recursively check all the soft-failed descendants we
+            # found above in the same way, until we have nothing left to
+            # check.
+            while soft_failed_events_to_lookup:
+                # We only want to do 100 at a time, so we split given list
+                # into two.
+                batch = list(soft_failed_events_to_lookup)
+                to_check, to_defer = batch[:100], batch[100:]
+                soft_failed_events_to_lookup = set(to_defer)
+
+                sql = """SELECT prev_event_id, event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                    FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    INNER JOIN event_json USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE
+                        prev_event_id IN (%s)
+                        AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_check),
+                )
+                txn.execute(sql, to_check)
+
+                for prev_event_id, event_id, metadata, rejected in txn:
+                    if event_id in graph:
+                        # Already handled this event previously, but we still
+                        # want to record the edge.
+                        graph.setdefault(event_id, set()).add(prev_event_id)
+                        logger.info("Already handled")
+                        continue
+
+                    graph.setdefault(event_id, set()).add(prev_event_id)
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        soft_failed_events_to_lookup.add(event_id)
+                    else:
+                        non_rejected_leaves.add(event_id)
+
+            # We have a set of non-soft-failed descendants, so we recurse up
+            # the graph to find all ancestors and add them to the set of event
+            # IDs that we can delete from forward extremities table.
+            to_delete = set()
+            while non_rejected_leaves:
+                event_id = non_rejected_leaves.pop()
+                prev_event_ids = graph.get(event_id, set())
+                non_rejected_leaves.update(prev_event_ids)
+                to_delete.update(prev_event_ids)
+
+            to_delete.intersection_update(original_set)
+
+            logger.info("Deleting up to %d forward extremities", len(to_delete))
+
+            self._simple_delete_many_txn(
+                txn=txn,
+                table="event_forward_extremities",
+                column="event_id",
+                iterable=to_delete,
+                keyvalues={},
+            )
+
+            if to_delete:
+                # We now need to invalidate the caches of these rooms
+                rows = self._simple_select_many_txn(
+                    txn,
+                    table="events",
+                    column="event_id",
+                    iterable=to_delete,
+                    keyvalues={},
+                    retcols=("room_id",)
+                )
+                for row in rows:
+                    txn.call_after(
+                        self.get_latest_event_ids_in_room.invalidate,
+                        (row["room_id"],)
+                    )
+
+            self._simple_delete_many_txn(
+                txn=txn,
+                table="_extremities_to_check",
+                column="event_id",
+                iterable=original_set,
+                keyvalues={},
+            )
+
+            return len(original_set)
+
+        num_handled = yield self.runInteraction(
+            "_cleanup_extremities_bg_update", _cleanup_extremities_bg_update_txn,
+        )
+
+        if not num_handled:
+            yield self._end_background_update(self.EVENT_DELETE_SOFT_FAILED_EXTREMITIES)
+
+            def _drop_table_txn(txn):
+                txn.execute("DROP TABLE _extremities_to_check")
+
+            yield self.runInteraction(
+                "_cleanup_extremities_bg_update_drop_table",
+                _drop_table_txn,
+            )
+
+        defer.returnValue(num_handled)
+
 
 AllNewEventsResult = namedtuple(
     "AllNewEventsResult",