summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/event_federation.py118
1 files changed, 69 insertions, 49 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 8107cfa53a..5d9fae48e9 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _get_connected_prev_event_backfill_results_txn,
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
         """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+        including) the events in seed_event_id_list. Return a list of max size `limit`
 
         Args:
             room_id
-            event_list
+            seed_event_id_list
             limit
         """
         event_ids = await self.db_pool.runInteraction(
             "get_backfill_events",
             self._get_backfill_events,
             room_id,
-            event_list,
+            seed_event_id_list,
             limit,
         )
         events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
         )
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        logger.info(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
 
-        event_results = set()
+        event_id_results = set()
 
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
+        for seed_event_id in seed_event_id_list:
             event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
                 retcols=(
                     "type",
                     "depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 allow_none=True,
             )
 
+            logger.info(
+                "get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                room_id,
+                seed_event_id,
+                event_lookup_result["depth"],
+                event_lookup_result["stream_ordering"],
+                event_lookup_result["type"],
+            )
+
             if event_lookup_result["depth"]:
                 queue.put(
                     (
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
-                        event_id,
+                        seed_event_id,
                         event_lookup_result["type"],
                     )
                 )
 
-        while not queue.empty() and len(event_results) < limit:
+        while not queue.empty() and len(event_id_results) < limit:
             try:
                 _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
+            # Try and find any potential historical batches of message history.
             if self.hs.config.experimental.msc2716_enabled:
-                # Try and find any potential historical batches of message history.
-                #
-                # First we look for an insertion event connected to the current
-                # event (by prev_event). If we find any, we'll add them to the queue
-                # and navigate up the DAG like normal in the next iteration of the
-                # loop.
-                txn.execute(
-                    connected_insertion_event_query,
-                    (event_id, limit - len(event_results)),
-                )
-                connected_insertion_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: connected_insertion_event_query %s",
-                    connected_insertion_event_id_results,
-                )
-                for row in connected_insertion_event_id_results:
-                    connected_insertion_event_depth = row[0]
-                    connected_insertion_event_stream_ordering = row[1]
-                    connected_insertion_event_id = row[2]
-                    connected_insertion_event_type = row[3]
-                    if connected_insertion_event_id not in event_results:
-                        queue.put(
-                            (
-                                -connected_insertion_event_depth,
-                                -connected_insertion_event_stream_ordering,
-                                connected_insertion_event_id,
-                                connected_insertion_event_type,
-                            )
-                        )
+                # # First we look for an insertion event connected to the current
+                # # event (by prev_event). If we find any, we'll add them to the queue
+                # # and navigate up the DAG like normal in the next iteration of the
+                # # loop.
+                # txn.execute(
+                #     connected_insertion_event_query,
+                #     (event_id, limit - len(event_id_results)),
+                # )
+                # connected_insertion_event_id_results = txn.fetchall()
+                # logger.debug(
+                #     "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
+                #     room_id,
+                #     connected_insertion_event_id_results,
+                # )
+                # for row in connected_insertion_event_id_results:
+                #     connected_insertion_event_depth = row[0]
+                #     connected_insertion_event_stream_ordering = row[1]
+                #     connected_insertion_event_id = row[2]
+                #     connected_insertion_event_type = row[3]
+                #     if connected_insertion_event_id not in event_id_results:
+                #         queue.put(
+                #             (
+                #                 -connected_insertion_event_depth,
+                #                 -connected_insertion_event_stream_ordering,
+                #                 connected_insertion_event_id,
+                #                 connected_insertion_event_type,
+                #             )
+                #         )
 
                 # Second, we need to go and try to find any batch events connected
                 # to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     # Find any batch connections for the given insertion event
                     txn.execute(
                         batch_connection_query,
-                        (event_id, limit - len(event_results)),
+                        (event_id, limit - len(event_id_results)),
                     )
                     batch_start_event_id_results = txn.fetchall()
                     logger.debug(
-                        "_get_backfill_events: batch_start_event_id_results %s",
+                        "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+                        room_id,
                         batch_start_event_id_results,
                     )
                     for row in batch_start_event_id_results:
-                        if row[2] not in event_results:
+                        if row[2] not in event_id_results:
                             queue.put((-row[0], -row[1], row[2], row[3]))
 
+            # Now we just look up the DAG by prev_events as normal
             txn.execute(
                 connected_prev_event_query,
-                (event_id, False, limit - len(event_results)),
+                (event_id, False, limit - len(event_id_results)),
             )
             prev_event_id_results = txn.fetchall()
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): prev_event_ids %s",
+                room_id,
+                prev_event_id_results,
             )
 
             for row in prev_event_id_results:
-                if row[2] not in event_results:
+                if row[2] not in event_id_results:
                     queue.put((-row[0], -row[1], row[2], row[3]))
 
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(