Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--  synapse/storage/events.py  262
1 file changed, 81 insertions, 181 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ffc27ff41..f9162be9b9 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2018-2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -219,41 +220,11 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-    EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
-    EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
 
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
-        )
-        self.register_background_update_handler(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
-            self._background_reindex_fields_sender,
-        )
-
-        self.register_background_index_update(
-            "event_contains_url_index",
-            index_name="event_contains_url_index",
-            table="events",
-            columns=["room_id", "topological_ordering", "stream_ordering"],
-            where_clause="contains_url = true AND outlier = false",
-        )
-
-        # an event_id index on event_search is useful for the purge_history
-        # api. Plus it means we get to enforce some integrity with a UNIQUE
-        # clause
-        self.register_background_index_update(
-            "event_search_event_id_idx",
-            index_name="event_search_event_id_idx",
-            table="event_search",
-            columns=["event_id"],
-            unique=True,
-            psql_only=True,
-        )
 
         self._event_persist_queue = _EventPeristenceQueue()
-
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
     @defer.inlineCallbacks
@@ -554,10 +525,18 @@ class EventsStore(
             e_id for event in new_events for e_id in event.prev_event_ids()
         )
 
-        # Finally, remove any events which are prev_events of any existing events.
+        # Remove any events which are prev_events of any existing events.
         existing_prevs = yield self._get_events_which_are_prevs(result)
         result.difference_update(existing_prevs)
 
+        # Finally, handle the case where the new events have soft-failed prev
+        # events. If they do, we need to remove them and their prev events;
+        # otherwise we end up with dangling extremities.
+        existing_prevs = yield self._get_prevs_before_rejected(
+            e_id for event in new_events for e_id in event.prev_event_ids()
+        )
+        result.difference_update(existing_prevs)
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -573,7 +552,7 @@ class EventsStore(
         """
         results = []
 
-        def _get_events(txn, batch):
+        def _get_events_which_are_prevs_txn(txn, batch):
             sql = """
             SELECT prev_event_id, internal_metadata
             FROM event_edges
@@ -596,11 +575,79 @@ class EventsStore(
             )
 
         for chunk in batch_iter(event_ids, 100):
-            yield self.runInteraction("_get_events_which_are_prevs", _get_events, chunk)
+            yield self.runInteraction(
+                "_get_events_which_are_prevs",
+                _get_events_which_are_prevs_txn,
+                chunk,
+            )
 
         defer.returnValue(results)
 
     @defer.inlineCallbacks
+    def _get_prevs_before_rejected(self, event_ids):
+        """Get soft-failed ancestors to remove from the extremities.
+
+        Given a set of events, find all those that have been soft-failed or
+        rejected. Returns those soft-failed/rejected events and their prev
+        events (whether soft-failed/rejected or not), and recurses up the
+        prev-event graph until it finds no more soft-failed/rejected events.
+
+        This is used to find extremities that are ancestors of new events, but
+        are separated from them by soft-failed events.
+
+        Args:
+            event_ids (Iterable[str]): Events to find prev events for. Note
+                that these must have already been persisted.
+
+        Returns:
+            Deferred[set[str]]
+        """
+
+        # The set of event_ids to return: the prev events of every
+        # soft-failed/rejected event that the walk passes through.
+        existing_prevs = set()
+
+        def _get_prevs_before_rejected_txn(txn, batch):
+            to_recursively_check = batch
+
+            while to_recursively_check:
+                sql = """
+                SELECT
+                    event_id, prev_event_id, internal_metadata,
+                    rejections.event_id IS NOT NULL
+                FROM event_edges
+                    INNER JOIN events USING (event_id)
+                    LEFT JOIN rejections USING (event_id)
+                    LEFT JOIN event_json USING (event_id)
+                WHERE
+                    event_id IN (%s)
+                    AND NOT events.outlier
+                """ % (
+                    ",".join("?" for _ in to_recursively_check),
+                )
+
+                txn.execute(sql, to_recursively_check)
+                to_recursively_check = []
+
+                for event_id, prev_event_id, metadata, rejected in txn:
+                    if prev_event_id in existing_prevs:
+                        continue
+
+                    soft_failed = json.loads(metadata).get("soft_failed")
+                    if soft_failed or rejected:
+                        to_recursively_check.append(prev_event_id)
+                        existing_prevs.add(prev_event_id)
+
+        for chunk in batch_iter(event_ids, 100):
+            yield self.runInteraction(
+                "_get_prevs_before_rejected",
+                _get_prevs_before_rejected_txn,
+                chunk,
+            )
+
+        defer.returnValue(existing_prevs)
+
+    @defer.inlineCallbacks
     def _get_new_state_after_events(
         self, room_id, events_context, old_latest_event_ids, new_latest_event_ids
     ):
@@ -1503,153 +1550,6 @@ class EventsStore(
         ret = yield self.runInteraction("count_daily_active_rooms", _count)
         defer.returnValue(ret)
 
-    @defer.inlineCallbacks
-    def _background_reindex_fields_sender(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id, json FROM events"
-                " INNER JOIN event_json USING (event_id)"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-
-            update_rows = []
-            for row in rows:
-                try:
-                    event_id = row[1]
-                    event_json = json.loads(row[2])
-                    sender = event_json["sender"]
-                    content = event_json["content"]
-
-                    contains_url = "url" in content
-                    if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
-                except (KeyError, AttributeError):
-                    # If the event is missing a necessary field then
-                    # skip over it.
-                    continue
-
-                update_rows.append((sender, contains_url, event_id))
-
-            sql = "UPDATE events SET sender = ?, contains_url = ? WHERE event_id = ?"
-
-            for index in range(0, len(update_rows), INSERT_CLUMP_SIZE):
-                clump = update_rows[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
-            )
-
-            return len(rows)
-
-        result = yield self.runInteraction(
-            self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, reindex_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
-
-        defer.returnValue(result)
-
-    @defer.inlineCallbacks
-    def _background_reindex_origin_server_ts(self, progress, batch_size):
-        target_min_stream_id = progress["target_min_stream_id_inclusive"]
-        max_stream_id = progress["max_stream_id_exclusive"]
-        rows_inserted = progress.get("rows_inserted", 0)
-
-        INSERT_CLUMP_SIZE = 1000
-
-        def reindex_search_txn(txn):
-            sql = (
-                "SELECT stream_ordering, event_id FROM events"
-                " WHERE ? <= stream_ordering AND stream_ordering < ?"
-                " ORDER BY stream_ordering DESC"
-                " LIMIT ?"
-            )
-
-            txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
-
-            rows = txn.fetchall()
-            if not rows:
-                return 0
-
-            min_stream_id = rows[-1][0]
-            event_ids = [row[1] for row in rows]
-
-            rows_to_update = []
-
-            chunks = [event_ids[i : i + 100] for i in range(0, len(event_ids), 100)]
-            for chunk in chunks:
-                ev_rows = self._simple_select_many_txn(
-                    txn,
-                    table="event_json",
-                    column="event_id",
-                    iterable=chunk,
-                    retcols=["event_id", "json"],
-                    keyvalues={},
-                )
-
-                for row in ev_rows:
-                    event_id = row["event_id"]
-                    event_json = json.loads(row["json"])
-                    try:
-                        origin_server_ts = event_json["origin_server_ts"]
-                    except (KeyError, AttributeError):
-                        # If the event is missing a necessary field then
-                        # skip over it.
-                        continue
-
-                    rows_to_update.append((origin_server_ts, event_id))
-
-            sql = "UPDATE events SET origin_server_ts = ? WHERE event_id = ?"
-
-            for index in range(0, len(rows_to_update), INSERT_CLUMP_SIZE):
-                clump = rows_to_update[index : index + INSERT_CLUMP_SIZE]
-                txn.executemany(sql, clump)
-
-            progress = {
-                "target_min_stream_id_inclusive": target_min_stream_id,
-                "max_stream_id_exclusive": min_stream_id,
-                "rows_inserted": rows_inserted + len(rows_to_update),
-            }
-
-            self._background_update_progress_txn(
-                txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
-            )
-
-            return len(rows_to_update)
-
-        result = yield self.runInteraction(
-            self.EVENT_ORIGIN_SERVER_TS_NAME, reindex_search_txn
-        )
-
-        if not result:
-            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
-
-        defer.returnValue(result)
-
     def get_current_backfill_token(self):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()
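
For illustration, the pruning walk performed by the new _get_prevs_before_rejected transaction can be sketched in memory with plain dicts and sets. This is only a sketch under assumed inputs: prev_events, soft_failed and rejected below are hypothetical stand-ins for the event_edges rows, the internal_metadata "soft_failed" flag and the rejections table that the real SQL reads; they are not Synapse APIs.

# Minimal in-memory sketch of the recursion in _get_prevs_before_rejected.
# The inputs are hypothetical stand-ins for database state, not real
# storage-layer methods.
def prevs_before_rejected(event_ids, prev_events, soft_failed, rejected):
    """Return prev events that sit behind soft-failed/rejected events.

    Args:
        event_ids (iterable[str]): prev events of the events being persisted.
        prev_events (dict[str, list[str]]): event_id -> its prev event ids.
        soft_failed (set[str]): events whose metadata marks them soft-failed.
        rejected (set[str]): events with a row in the rejections table.

    Returns:
        set[str]: extremity candidates to drop.
    """
    existing_prevs = set()
    to_check = list(event_ids)

    while to_check:
        next_batch = []
        for event_id in to_check:
            # Accepted events stop the walk, just like the SQL loop above.
            if event_id not in soft_failed and event_id not in rejected:
                continue
            for prev_id in prev_events.get(event_id, []):
                if prev_id not in existing_prevs:
                    existing_prevs.add(prev_id)
                    next_batch.append(prev_id)
        to_check = next_batch

    return existing_prevs

# Example: a new event D points at soft-failed C; C points at soft-failed B,
# which points at accepted A. Both B and A are returned and can then be
# removed from the forward extremities.
prev_events = {"D": ["C"], "C": ["B"], "B": ["A"]}
assert prevs_before_rejected(["C"], prev_events, {"B", "C"}, set()) == {"A", "B"}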