summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/event_federation.py313
-rw-r--r--synapse/storage/databases/main/events.py13
2 files changed, 223 insertions, 103 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ca71f073fc..22f6474127 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,9 +16,10 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+import attr
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter(
 logger = logging.getLogger(__name__)
 
 
+# All the info we need while iterating the DAG while backfilling
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class BackfillQueueNavigationItem:
+    depth: int
+    stream_ordering: int
+    event_id: str
+    type: str
+
+
 class _NoChainCoverIndex(Exception):
     def __init__(self, room_id: str):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
@@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     ):
         super().__init__(database, db_conn, hs)
 
+        self.hs = hs
+
         if hs.config.worker.run_background_tasks:
             hs.get_clock().looping_call(
                 self._delete_old_forward_extrem_cache, 60 * 60 * 1000
@@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
-    async def get_insertion_event_backwards_extremities_in_room(
+    async def get_insertion_event_backward_extremities_in_room(
         self, room_id
     ) -> Dict[str, int]:
         """Get the insertion events we know about that we haven't backfilled yet.
@@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             Map from event_id to depth
         """
 
-        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+        def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
             sql = """
                 SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
                 /* We only want insertion events that are also marked as backwards extremities */
@@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             return dict(txn)
 
         return await self.db_pool.runInteraction(
-            "get_insertion_event_backwards_extremities_in_room",
-            get_insertion_event_backwards_extremities_in_room_txn,
+            "get_insertion_event_backward_extremities_in_room",
+            get_insertion_event_backward_extremities_in_room_txn,
             room_id,
         )
 
@@ -997,143 +1009,242 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
-        """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+    def _get_connected_batch_event_backfill_results_txn(
+        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any batch connections of a given insertion event.
+        A batch event points at a insertion event via:
+        batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
 
         Args:
-            room_id
-            event_list
-            limit
+            txn: The database transaction to use
+            insertion_event_id: The event ID to navigate from. We will find
+                batch events that point back at this insertion event.
+            limit: Max number of event ID's to query for and return
+
+        Returns:
+            List of batch events that the backfill queue can process
+        """
+        batch_connection_query = """
+            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+            /* Find the batch that connects to the given insertion event */
+            INNER JOIN batch_events AS c
+            ON i.next_batch_id = c.batch_id
+            /* Get the depth of the batch start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
         """
-        event_ids = await self.db_pool.runInteraction(
-            "get_backfill_events",
-            self._get_backfill_events,
-            room_id,
-            event_list,
-            limit,
-        )
-        events = await self.get_events_as_list(event_ids)
-        return sorted(events, key=lambda e: -e.depth)
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+        # Find any batch connections for the given insertion event
+        txn.execute(
+            batch_connection_query,
+            (insertion_event_id, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
 
-        event_results = set()
+    def _get_connected_prev_event_backfill_results_txn(
+        self, txn: LoggingTransaction, event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any events connected by prev_event the specified event_id.
 
-        # We want to make sure that we do a breadth-first, "depth" ordered
-        # search.
+        Args:
+            txn: The database transaction to use
+            event_id: The event ID to navigate from
+            limit: Max number of event ID's to query for and return
 
+        Returns:
+            List of prev events that the backfill queue can process
+        """
         # Look for the prev_event_id connected to the given event_id
-        query = """
-            SELECT depth, prev_event_id FROM event_edges
-            /* Get the depth of the prev_event_id from the events table */
+        connected_prev_event_query = """
+            SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
+            /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
-            /* Find an event which matches the given event_id */
+            /* Look for an edge which matches the given event_id */
             WHERE event_edges.event_id = ?
             AND event_edges.is_state = ?
+            /* Because we can have many events at the same depth,
+            * we want to also tie-break and sort on stream_ordering */
+            ORDER BY depth DESC, stream_ordering DESC
             LIMIT ?
         """
 
-        # Look for the "insertion" events connected to the given event_id
-        connected_insertion_event_query = """
-            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
-            /* Get the depth of the insertion event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which points via prev_events to the given event_id */
-            WHERE i.insertion_prev_event_id = ?
-            LIMIT ?
+        txn.execute(
+            connected_prev_event_query,
+            (event_id, False, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
+
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
+        """Get a list of Events for a given topic that occurred before (and
+        including) the events in seed_event_id_list. Return a list of max size `limit`
+
+        Args:
+            room_id
+            seed_event_id_list
+            limit
         """
+        event_ids = await self.db_pool.runInteraction(
+            "get_backfill_events",
+            self._get_backfill_events,
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+        events = await self.get_events_as_list(event_ids)
+        return sorted(
+            events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
+        )
 
-        # Find any batch connections of a given insertion event
-        batch_connection_query = """
-            SELECT e.depth, c.event_id FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        """
+        We want to make sure that we do a breadth-first, "depth" ordered search.
+        We also handle navigating historical branches of history connected by
+        insertion and batch events.
         """
+        logger.debug(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+
+        event_id_results = set()
 
         # In a PriorityQueue, the lowest valued entries are retrieved first.
-        # We're using depth as the priority in the queue.
-        # Depth is lowest at the oldest-in-time message and highest and
-        # newest-in-time message. We add events to the queue with a negative depth so that
-        # we process the newest-in-time messages first going backwards in time.
+        # We're using depth as the priority in the queue and tie-break based on
+        # stream_ordering. Depth is lowest at the oldest-in-time message and
+        # highest and newest-in-time message. We add events to the queue with a
+        # negative depth so that we process the newest-in-time messages first
+        # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
-            depth = self.db_pool.simple_select_one_onecol_txn(
+        for seed_event_id in seed_event_id_list:
+            event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
-                retcol="depth",
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
+                retcols=(
+                    "type",
+                    "depth",
+                    "stream_ordering",
+                ),
                 allow_none=True,
             )
 
-            if depth:
-                queue.put((-depth, event_id))
+            if event_lookup_result is not None:
+                logger.debug(
+                    "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                    room_id,
+                    seed_event_id,
+                    event_lookup_result["depth"],
+                    event_lookup_result["stream_ordering"],
+                    event_lookup_result["type"],
+                )
 
-        while not queue.empty() and len(event_results) < limit:
+                if event_lookup_result["depth"]:
+                    queue.put(
+                        (
+                            -event_lookup_result["depth"],
+                            -event_lookup_result["stream_ordering"],
+                            seed_event_id,
+                            event_lookup_result["type"],
+                        )
+                    )
+
+        while not queue.empty() and len(event_id_results) < limit:
             try:
-                _, event_id = queue.get_nowait()
+                _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
             # Try and find any potential historical batches of message history.
-            #
-            # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we need to go and try to
-            # find any batch events connected to the insertion event (by
-            # batch_id). If we find any, we'll add them to the queue and
-            # navigate up the DAG like normal in the next iteration of the loop.
-            txn.execute(
-                connected_insertion_event_query, (event_id, limit - len(event_results))
-            )
-            connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
-                "_get_backfill_events: connected_insertion_event_query %s",
-                connected_insertion_event_id_results,
-            )
-            for row in connected_insertion_event_id_results:
-                connected_insertion_event_depth = row[0]
-                connected_insertion_event = row[1]
-                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+            if self.hs.config.experimental.msc2716_enabled:
+                # We need to go and try to find any batch events connected
+                # to a given insertion event (by batch_id). If we find any, we'll
+                # add them to the queue and navigate up the DAG like normal in the
+                # next iteration of the loop.
+                if event_type == EventTypes.MSC2716_INSERTION:
+                    # Find any batch connections for the given insertion event
+                    connected_batch_event_backfill_results = (
+                        self._get_connected_batch_event_backfill_results_txn(
+                            txn, event_id, limit - len(event_id_results)
+                        )
+                    )
+                    logger.debug(
+                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
+                        room_id,
+                        connected_batch_event_backfill_results,
+                    )
+                    for (
+                        connected_batch_event_backfill_item
+                    ) in connected_batch_event_backfill_results:
+                        if (
+                            connected_batch_event_backfill_item.event_id
+                            not in event_id_results
+                        ):
+                            queue.put(
+                                (
+                                    -connected_batch_event_backfill_item.depth,
+                                    -connected_batch_event_backfill_item.stream_ordering,
+                                    connected_batch_event_backfill_item.event_id,
+                                    connected_batch_event_backfill_item.type,
+                                )
+                            )
 
-                # Find any batch connections for the given insertion event
-                txn.execute(
-                    batch_connection_query,
-                    (connected_insertion_event, limit - len(event_results)),
-                )
-                batch_start_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
-                    batch_start_event_id_results,
+            # Now we just look up the DAG by prev_events as normal
+            connected_prev_event_backfill_results = (
+                self._get_connected_prev_event_backfill_results_txn(
+                    txn, event_id, limit - len(event_id_results)
                 )
-                for row in batch_start_event_id_results:
-                    if row[1] not in event_results:
-                        queue.put((-row[0], row[1]))
-
-            txn.execute(query, (event_id, False, limit - len(event_results)))
-            prev_event_id_results = txn.fetchall()
+            )
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
+                room_id,
+                connected_prev_event_backfill_results,
             )
+            for (
+                connected_prev_event_backfill_item
+            ) in connected_prev_event_backfill_results:
+                if connected_prev_event_backfill_item.event_id not in event_id_results:
+                    queue.put(
+                        (
+                            -connected_prev_event_backfill_item.depth,
+                            -connected_prev_event_backfill_item.stream_ordering,
+                            connected_prev_event_backfill_item.event_id,
+                            connected_prev_event_backfill_item.type,
+                        )
+                    )
 
-            for row in prev_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
-
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b7554154ac..b804185c40 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2215,9 +2215,14 @@ class PersistEventsStore:
             "   SELECT 1 FROM event_backward_extremities"
             "   WHERE event_id = ? AND room_id = ?"
             " )"
+            # 1. Don't add an event as a extremity again if we already persisted it
+            # as a non-outlier.
+            # 2. Don't add an outlier as an extremity if it has no prev_events
             " AND NOT EXISTS ("
-            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            "   AND outlier = ?"
+            "   SELECT 1 FROM events"
+            "   LEFT JOIN event_edges edge"
+            "   ON edge.event_id = events.event_id"
+            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
             " )"
         )
 
@@ -2243,6 +2248,10 @@ class PersistEventsStore:
                 (ev.event_id, ev.room_id)
                 for ev in events
                 if not ev.internal_metadata.is_outlier()
+                # If we encountered an event with no prev_events, then we might
+                # as well remove it now because it won't ever have anything else
+                # to backfill from.
+                or len(ev.prev_event_ids()) == 0
             ],
         )