path: root/synapse/storage/databases
author     Eric Eastwood <erice@element.io>    2021-11-02 17:27:04 -0500
committer  Eric Eastwood <erice@element.io>    2021-11-02 17:27:04 -0500
commit     321f9ea68b4b87da91dc96c22f7c466a91d46c68 (patch)
tree       a7e546ea02f3c81db6f1117bbfa3a8ff2ab2f0c2 /synapse/storage/databases
parent     Fix lints (diff)
Move back to the old get_backfill_events and simplify backfill.
We now rely on the marker events to backfill the base insertion event,
which makes it an insertion event extremity. This functionality was
already in place (see `handle_marker_event`), so it was an easy transition.
This way, a remote federated homeserver will have the insertion extremity
to ask about in backfill and will go down the historical branch without a
problem because of the depth ordering, and the rest of the DAG navigation
happens as normal. Yay simplification!
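
As a rough illustration of that flow, here is a minimal, self-contained sketch. It is
NOT Synapse's actual `handle_marker_event`: the content key, helper name, and storage
structure are made-up stand-ins. It only shows the idea that processing a marker event
records the insertion event it points at as an insertion event extremity, which is what
later backfill requests hand out to remote homeservers.

```python
# Hypothetical in-memory bookkeeping: room_id -> set of insertion event IDs.
insertion_event_extremities: dict = {}


def handle_marker_event_sketch(room_id: str, marker_content: dict) -> None:
    # Hypothetical content key; MSC2716 defines the real field name.
    insertion_event_id = marker_content.get("insertion_event_reference")
    if insertion_event_id is None:
        return
    # In Synapse this is also where the insertion event itself gets
    # backfilled; here we only model the extremity bookkeeping.
    insertion_event_extremities.setdefault(room_id, set()).add(insertion_event_id)


# Usage: a marker pointing at the base insertion event of a historical batch.
handle_marker_event_sketch(
    "!room:example.org", {"insertion_event_reference": "$insertion_abc"}
)
assert insertion_event_extremities["!room:example.org"] == {"$insertion_abc"}
```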

The key breakthrough was discussing all the ways we can find connected insertion events.
https://docs.google.com/document/d/1KCEmpnGr4J-I8EeaVQ8QJZKBDu53ViI7V62y5BzfXr0/edit#bookmark=id.1hbt9acs963h

The three options we came up with were:

 - Find by insertion event prev_events (this is what we were doing before)
 - Find connected insertion events by depth
 - Find connected insertion events by the marker event
    - This made the most sense since we already backfill the insertion event
      when a marker event is processed (see `handle_marker_event`).
    - Gets rid of the extra insertion event lookup in backfill because we
      know it's already backfilled from the marker processing.
    - And gets rid of the extra federated lookup we added to this PR
      to ask whether the homeserver requesting backfill already has the
      insertion event (deciding whether we fork to the history branch
      before we go down the "live" DAG).
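
To make the depth ordering that the simplification relies on concrete, here is a small
runnable sketch mirroring the `PriorityQueue` usage in the diff below. Events go onto
the queue as `(-depth, -stream_ordering, event_id, type)`, so the deepest (most recent)
events pop first and the walk proceeds backwards in time. The sample events and IDs are
made up for illustration.

```python
from queue import PriorityQueue

# Sample (depth, stream_ordering, event_id, type) tuples; IDs are made up.
sample_events = [
    (10, 100, "$live_message", "m.room.message"),
    (12, 102, "$newer_message", "m.room.message"),
    (11, 101, "$insertion_extremity", "org.matrix.msc2716.insertion"),
]

queue: PriorityQueue = PriorityQueue()
for depth, stream_ordering, event_id, event_type in sample_events:
    # Negate so that the largest depth/stream_ordering sorts first.
    queue.put((-depth, -stream_ordering, event_id, event_type))

# Pops in order: $newer_message, $insertion_extremity, $live_message,
# i.e. newest first, walking backwards in time.
while not queue.empty():
    _, _, event_id, _ = queue.get_nowait()
    print(event_id)
```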
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 118
1 file changed, 69 insertions, 49 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 8107cfa53a..5d9fae48e9 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _get_connected_prev_event_backfill_results_txn,
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
         """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+        including) the events in seed_event_id_list. Return a list of max size `limit`
 
         Args:
             room_id
-            event_list
+            seed_event_id_list
             limit
         """
         event_ids = await self.db_pool.runInteraction(
             "get_backfill_events",
             self._get_backfill_events,
             room_id,
-            event_list,
+            seed_event_id_list,
             limit,
         )
         events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
         )
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        logger.info(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
 
-        event_results = set()
+        event_id_results = set()
 
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
+        for seed_event_id in seed_event_id_list:
             event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
                 retcols=(
                     "type",
                     "depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 allow_none=True,
             )
 
+            logger.info(
+                "get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                room_id,
+                seed_event_id,
+                event_lookup_result["depth"],
+                event_lookup_result["stream_ordering"],
+                event_lookup_result["type"],
+            )
+
             if event_lookup_result["depth"]:
                 queue.put(
                     (
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
-                        event_id,
+                        seed_event_id,
                         event_lookup_result["type"],
                     )
                 )
 
-        while not queue.empty() and len(event_results) < limit:
+        while not queue.empty() and len(event_id_results) < limit:
             try:
                 _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
+            # Try and find any potential historical batches of message history.
             if self.hs.config.experimental.msc2716_enabled:
-                # Try and find any potential historical batches of message history.
-                #
-                # First we look for an insertion event connected to the current
-                # event (by prev_event). If we find any, we'll add them to the queue
-                # and navigate up the DAG like normal in the next iteration of the
-                # loop.
-                txn.execute(
-                    connected_insertion_event_query,
-                    (event_id, limit - len(event_results)),
-                )
-                connected_insertion_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: connected_insertion_event_query %s",
-                    connected_insertion_event_id_results,
-                )
-                for row in connected_insertion_event_id_results:
-                    connected_insertion_event_depth = row[0]
-                    connected_insertion_event_stream_ordering = row[1]
-                    connected_insertion_event_id = row[2]
-                    connected_insertion_event_type = row[3]
-                    if connected_insertion_event_id not in event_results:
-                        queue.put(
-                            (
-                                -connected_insertion_event_depth,
-                                -connected_insertion_event_stream_ordering,
-                                connected_insertion_event_id,
-                                connected_insertion_event_type,
-                            )
-                        )
+                # # First we look for an insertion event connected to the current
+                # # event (by prev_event). If we find any, we'll add them to the queue
+                # # and navigate up the DAG like normal in the next iteration of the
+                # # loop.
+                # txn.execute(
+                #     connected_insertion_event_query,
+                #     (event_id, limit - len(event_id_results)),
+                # )
+                # connected_insertion_event_id_results = txn.fetchall()
+                # logger.debug(
+                #     "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
+                #     room_id,
+                #     connected_insertion_event_id_results,
+                # )
+                # for row in connected_insertion_event_id_results:
+                #     connected_insertion_event_depth = row[0]
+                #     connected_insertion_event_stream_ordering = row[1]
+                #     connected_insertion_event_id = row[2]
+                #     connected_insertion_event_type = row[3]
+                #     if connected_insertion_event_id not in event_id_results:
+                #         queue.put(
+                #             (
+                #                 -connected_insertion_event_depth,
+                #                 -connected_insertion_event_stream_ordering,
+                #                 connected_insertion_event_id,
+                #                 connected_insertion_event_type,
+                #             )
+                #         )
 
                 # Second, we need to go and try to find any batch events connected
                 # to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     # Find any batch connections for the given insertion event
                     txn.execute(
                         batch_connection_query,
-                        (event_id, limit - len(event_results)),
+                        (event_id, limit - len(event_id_results)),
                     )
                     batch_start_event_id_results = txn.fetchall()
                     logger.debug(
-                        "_get_backfill_events: batch_start_event_id_results %s",
+                        "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+                        room_id,
                         batch_start_event_id_results,
                     )
                     for row in batch_start_event_id_results:
-                        if row[2] not in event_results:
+                        if row[2] not in event_id_results:
                             queue.put((-row[0], -row[1], row[2], row[3]))
 
+            # Now we just look up the DAG by prev_events as normal
             txn.execute(
                 connected_prev_event_query,
-                (event_id, False, limit - len(event_results)),
+                (event_id, False, limit - len(event_id_results)),
             )
             prev_event_id_results = txn.fetchall()
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): prev_event_ids %s",
+                room_id,
+                prev_event_id_results,
             )
 
             for row in prev_event_id_results:
-                if row[2] not in event_results:
+                if row[2] not in event_id_results:
                     queue.put((-row[0], -row[1], row[2], row[3]))
 
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(