diff options
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 118 |
1 files changed, 69 insertions, 49 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 8107cfa53a..5d9fae48e9 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas _get_connected_prev_event_backfill_results_txn, ) - async def get_backfill_events(self, room_id: str, event_list: list, limit: int): + async def get_backfill_events( + self, room_id: str, seed_event_id_list: list, limit: int + ): """Get a list of Events for a given topic that occurred before (and - including) the events in event_list. Return a list of max size `limit` + including) the events in seed_event_id_list. Return a list of max size `limit` Args: room_id - event_list + seed_event_id_list limit """ event_ids = await self.db_pool.runInteraction( "get_backfill_events", self._get_backfill_events, room_id, - event_list, + seed_event_id_list, limit, ) events = await self.get_events_as_list(event_ids) @@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering) ) - def _get_backfill_events(self, txn, room_id, event_list, limit): - logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit) + def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit): + logger.info( + "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s", + room_id, + seed_event_id_list, + limit, + ) - event_results = set() + event_id_results = set() # We want to make sure that we do a breadth-first, "depth" ordered # search. @@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # going backwards in time. stream_ordering follows the same pattern. queue = PriorityQueue() - for event_id in event_list: + for seed_event_id in seed_event_id_list: event_lookup_result = self.db_pool.simple_select_one_txn( txn, table="events", - keyvalues={"event_id": event_id, "room_id": room_id}, + keyvalues={"event_id": seed_event_id, "room_id": room_id}, retcols=( "type", "depth", @@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas allow_none=True, ) + logger.info( + "get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s", + room_id, + seed_event_id, + event_lookup_result["depth"], + event_lookup_result["stream_ordering"], + event_lookup_result["type"], + ) + if event_lookup_result["depth"]: queue.put( ( -event_lookup_result["depth"], -event_lookup_result["stream_ordering"], - event_id, + seed_event_id, event_lookup_result["type"], ) ) - while not queue.empty() and len(event_results) < limit: + while not queue.empty() and len(event_id_results) < limit: try: _, _, event_id, event_type = queue.get_nowait() except Empty: break - if event_id in event_results: + if event_id in event_id_results: continue - event_results.add(event_id) + event_id_results.add(event_id) + # Try and find any potential historical batches of message history. if self.hs.config.experimental.msc2716_enabled: - # Try and find any potential historical batches of message history. - # - # First we look for an insertion event connected to the current - # event (by prev_event). If we find any, we'll add them to the queue - # and navigate up the DAG like normal in the next iteration of the - # loop. - txn.execute( - connected_insertion_event_query, - (event_id, limit - len(event_results)), - ) - connected_insertion_event_id_results = txn.fetchall() - logger.debug( - "_get_backfill_events: connected_insertion_event_query %s", - connected_insertion_event_id_results, - ) - for row in connected_insertion_event_id_results: - connected_insertion_event_depth = row[0] - connected_insertion_event_stream_ordering = row[1] - connected_insertion_event_id = row[2] - connected_insertion_event_type = row[3] - if connected_insertion_event_id not in event_results: - queue.put( - ( - -connected_insertion_event_depth, - -connected_insertion_event_stream_ordering, - connected_insertion_event_id, - connected_insertion_event_type, - ) - ) + # # First we look for an insertion event connected to the current + # # event (by prev_event). If we find any, we'll add them to the queue + # # and navigate up the DAG like normal in the next iteration of the + # # loop. + # txn.execute( + # connected_insertion_event_query, + # (event_id, limit - len(event_id_results)), + # ) + # connected_insertion_event_id_results = txn.fetchall() + # logger.debug( + # "_get_backfill_events(room_id=%s): connected_insertion_event_query %s", + # room_id, + # connected_insertion_event_id_results, + # ) + # for row in connected_insertion_event_id_results: + # connected_insertion_event_depth = row[0] + # connected_insertion_event_stream_ordering = row[1] + # connected_insertion_event_id = row[2] + # connected_insertion_event_type = row[3] + # if connected_insertion_event_id not in event_id_results: + # queue.put( + # ( + # -connected_insertion_event_depth, + # -connected_insertion_event_stream_ordering, + # connected_insertion_event_id, + # connected_insertion_event_type, + # ) + # ) # Second, we need to go and try to find any batch events connected # to a given insertion event (by batch_id). If we find any, we'll @@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Find any batch connections for the given insertion event txn.execute( batch_connection_query, - (event_id, limit - len(event_results)), + (event_id, limit - len(event_id_results)), ) batch_start_event_id_results = txn.fetchall() logger.debug( - "_get_backfill_events: batch_start_event_id_results %s", + "_get_backfill_events(room_id=%s): batch_start_event_id_results %s", + room_id, batch_start_event_id_results, ) for row in batch_start_event_id_results: - if row[2] not in event_results: + if row[2] not in event_id_results: queue.put((-row[0], -row[1], row[2], row[3])) + # Now we just look up the DAG by prev_events as normal txn.execute( connected_prev_event_query, - (event_id, False, limit - len(event_results)), + (event_id, False, limit - len(event_id_results)), ) prev_event_id_results = txn.fetchall() logger.debug( - "_get_backfill_events: prev_event_ids %s", prev_event_id_results + "_get_backfill_events(room_id=%s): prev_event_ids %s", + room_id, + prev_event_id_results, ) for row in prev_event_id_results: - if row[2] not in event_results: + if row[2] not in event_id_results: queue.put((-row[0], -row[1], row[2], row[3])) - return event_results + return event_id_results async def get_missing_events(self, room_id, earliest_events, latest_events, limit): ids = await self.db_pool.runInteraction( |