| author | Eric Eastwood <erice@element.io> | 2021-11-02 17:27:04 -0500 |
|---|---|---|
| committer | Eric Eastwood <erice@element.io> | 2021-11-02 17:27:04 -0500 |
| commit | 321f9ea68b4b87da91dc96c22f7c466a91d46c68 (patch) | |
| tree | a7e546ea02f3c81db6f1117bbfa3a8ff2ab2f0c2 /synapse/storage/databases | |
| parent | Fix lints (diff) | |
| download | synapse-321f9ea68b4b87da91dc96c22f7c466a91d46c68.tar.xz | |
Move back to the old get_backfill_events and simplify backfill.
We now rely on the marker events to backfill the base insertion event, which records it as an insertion event extremity. This functionality was already in place (see `handle_marker_event`) and was an easy transition. This way, a remote federated homeserver will have the insertion extremity to ask about in backfill, goes down the historical branch no problem because of the depth ordering, and the rest of the DAG navigation happens as normal. Yay simplification!

The key breakthrough was discussing all the ways we can find connected insertion events:
https://docs.google.com/document/d/1KCEmpnGr4J-I8EeaVQ8QJZKBDu53ViI7V62y5BzfXr0/edit#bookmark=id.1hbt9acs963h

The three options we came up with were:

- Find insertion events by their prev_events (this is what we were doing before).
- Find connected insertion events by depth.
- Find connected insertion events by the marker event (sketched below).
  - This made the most sense since we already backfill the insertion event when a marker event is processed (see `handle_marker_event`).
  - It gets rid of the extra insertion event lookup in backfill, because we know the insertion event is already backfilled from the marker processing.
  - It also gets rid of the extra federated lookup added earlier in this PR to ask whether the homeserver requesting backfill already has the insertion event (deciding whether we fork to the history branch before we go down the "live" DAG).
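To make the marker-event option concrete, below is a minimal, self-contained sketch of the flow, not Synapse's actual implementation: `Store`, `Event`, `fetch_event`, and the `insertion_event_reference` content key are hypothetical stand-ins (the real `handle_marker_event` uses the MSC2716 field names and Synapse's storage layer).

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Set


@dataclass
class Event:
    event_id: str
    event_type: str
    content: Dict[str, str] = field(default_factory=dict)


@dataclass
class Store:
    # All events we have locally, plus the insertion events that backfill
    # can later branch from (the "insertion event extremities").
    events: Dict[str, Event] = field(default_factory=dict)
    insertion_event_extremities: Set[str] = field(default_factory=set)


def handle_marker_event(
    store: Store, marker: Event, fetch_event: Callable[[str], Event]
) -> None:
    """On processing a marker event, backfill the base insertion event it
    points at and record it as an insertion event extremity.

    Backfill then discovers the historical branch through this extremity and
    walks it in depth order, so no extra insertion-event lookup (or federated
    "do you already have this event?" round trip) is needed at backfill time.
    """
    # Hypothetical content key; the real MSC2716 marker uses its own field.
    insertion_event_id = marker.content["insertion_event_reference"]
    if insertion_event_id not in store.events:
        store.events[insertion_event_id] = fetch_event(insertion_event_id)
    store.insertion_event_extremities.add(insertion_event_id)
```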
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 118 |
1 file changed, 69 insertions, 49 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 8107cfa53a..5d9fae48e9 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1106,20 +1106,22 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _get_connected_prev_event_backfill_results_txn,
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
         """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+        including) the events in seed_event_id_list. Return a list of max size `limit`
 
         Args:
             room_id
-            event_list
+            seed_event_id_list
             limit
         """
         event_ids = await self.db_pool.runInteraction(
             "get_backfill_events",
             self._get_backfill_events,
             room_id,
-            event_list,
+            seed_event_id_list,
             limit,
         )
         events = await self.get_events_as_list(event_ids)
@@ -1127,10 +1129,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
         )
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        logger.info(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
 
-        event_results = set()
+        event_id_results = set()
 
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
@@ -1181,11 +1188,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
+        for seed_event_id in seed_event_id_list:
             event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
                 retcols=(
                     "type",
                     "depth",
@@ -1194,57 +1201,66 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                 allow_none=True,
             )
 
+            logger.info(
+                "get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                room_id,
+                seed_event_id,
+                event_lookup_result["depth"],
+                event_lookup_result["stream_ordering"],
+                event_lookup_result["type"],
+            )
+
             if event_lookup_result["depth"]:
                 queue.put(
                     (
                         -event_lookup_result["depth"],
                         -event_lookup_result["stream_ordering"],
-                        event_id,
+                        seed_event_id,
                         event_lookup_result["type"],
                     )
                 )
 
-        while not queue.empty() and len(event_results) < limit:
+        while not queue.empty() and len(event_id_results) < limit:
             try:
                 _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
+            # Try and find any potential historical batches of message history.
             if self.hs.config.experimental.msc2716_enabled:
-                # Try and find any potential historical batches of message history.
-                #
-                # First we look for an insertion event connected to the current
-                # event (by prev_event). If we find any, we'll add them to the queue
-                # and navigate up the DAG like normal in the next iteration of the
-                # loop.
-                txn.execute(
-                    connected_insertion_event_query,
-                    (event_id, limit - len(event_results)),
-                )
-                connected_insertion_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: connected_insertion_event_query %s",
-                    connected_insertion_event_id_results,
-                )
-                for row in connected_insertion_event_id_results:
-                    connected_insertion_event_depth = row[0]
-                    connected_insertion_event_stream_ordering = row[1]
-                    connected_insertion_event_id = row[2]
-                    connected_insertion_event_type = row[3]
-                    if connected_insertion_event_id not in event_results:
-                        queue.put(
-                            (
-                                -connected_insertion_event_depth,
-                                -connected_insertion_event_stream_ordering,
-                                connected_insertion_event_id,
-                                connected_insertion_event_type,
-                            )
-                        )
+                # # First we look for an insertion event connected to the current
+                # # event (by prev_event). If we find any, we'll add them to the queue
+                # # and navigate up the DAG like normal in the next iteration of the
+                # # loop.
+                # txn.execute(
+                #     connected_insertion_event_query,
+                #     (event_id, limit - len(event_id_results)),
+                # )
+                # connected_insertion_event_id_results = txn.fetchall()
+                # logger.debug(
+                #     "_get_backfill_events(room_id=%s): connected_insertion_event_query %s",
+                #     room_id,
+                #     connected_insertion_event_id_results,
+                # )
+                # for row in connected_insertion_event_id_results:
+                #     connected_insertion_event_depth = row[0]
+                #     connected_insertion_event_stream_ordering = row[1]
+                #     connected_insertion_event_id = row[2]
+                #     connected_insertion_event_type = row[3]
+                #     if connected_insertion_event_id not in event_id_results:
+                #         queue.put(
+                #             (
+                #                 -connected_insertion_event_depth,
+                #                 -connected_insertion_event_stream_ordering,
+                #                 connected_insertion_event_id,
+                #                 connected_insertion_event_type,
+                #             )
+                #         )
 
                 # Second, we need to go and try to find any batch events connected
                 # to a given insertion event (by batch_id). If we find any, we'll
@@ -1254,31 +1270,35 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
                 # Find any batch connections for the given insertion event
                 txn.execute(
                     batch_connection_query,
-                    (event_id, limit - len(event_results)),
+                    (event_id, limit - len(event_id_results)),
                 )
                 batch_start_event_id_results = txn.fetchall()
                 logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
+                    "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+                    room_id,
                     batch_start_event_id_results,
                 )
                 for row in batch_start_event_id_results:
-                    if row[2] not in event_results:
+                    if row[2] not in event_id_results:
                         queue.put((-row[0], -row[1], row[2], row[3]))
 
+            # Now we just look up the DAG by prev_events as normal
             txn.execute(
                 connected_prev_event_query,
-                (event_id, False, limit - len(event_results)),
+                (event_id, False, limit - len(event_id_results)),
             )
             prev_event_id_results = txn.fetchall()
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): prev_event_ids %s",
+                room_id,
+                prev_event_id_results,
             )
             for row in prev_event_id_results:
-                if row[2] not in event_results:
+                if row[2] not in event_id_results:
                     queue.put((-row[0], -row[1], row[2], row[3]))
 
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
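For readers following the diff, here is a runnable, simplified sketch of the traversal `_get_backfill_events` performs after this change. An in-memory dict stands in for the database queries, and `EventMap` / `get_backfill_event_ids` are illustrative names rather than Synapse APIs; the point is the `PriorityQueue` keyed on `(-depth, -stream_ordering)`, which makes the walk proceed backwards in time, deepest (most recent) events first, exactly the breadth-first, depth-ordered search the comments describe.

```python
from queue import Empty, PriorityQueue
from typing import Dict, List, Set, Tuple

# event_id -> (depth, stream_ordering, event_type, prev_event_ids)
EventMap = Dict[str, Tuple[int, int, str, List[str]]]


def get_backfill_event_ids(
    events: EventMap, seed_event_id_list: List[str], limit: int
) -> Set[str]:
    event_id_results: Set[str] = set()
    queue: "PriorityQueue[Tuple[int, int, str, str]]" = PriorityQueue()

    # Seed the queue with the requested events. Negating depth and
    # stream_ordering turns Python's min-heap into "latest events first".
    for seed_event_id in seed_event_id_list:
        depth, stream_ordering, event_type, _ = events[seed_event_id]
        queue.put((-depth, -stream_ordering, seed_event_id, event_type))

    while not queue.empty() and len(event_id_results) < limit:
        try:
            _, _, event_id, _event_type = queue.get_nowait()
        except Empty:
            break

        if event_id in event_id_results:
            continue
        event_id_results.add(event_id)

        # Walk up the DAG by prev_events, as the real code does with
        # `connected_prev_event_query`. (The real code additionally chases
        # MSC2716 batch connections before this step.)
        _, _, _, prev_event_ids = events[event_id]
        for prev_event_id in prev_event_ids:
            if prev_event_id in events and prev_event_id not in event_id_results:
                depth, stream_ordering, event_type, _ = events[prev_event_id]
                queue.put((-depth, -stream_ordering, prev_event_id, event_type))

    return event_id_results


# Example: a tiny linear DAG C -> B -> A (C is newest, at depth 3).
events: EventMap = {
    "A": (1, 1, "m.room.create", []),
    "B": (2, 2, "m.room.message", ["A"]),
    "C": (3, 3, "m.room.message", ["B"]),
}
assert get_backfill_event_ids(events, ["C"], limit=10) == {"A", "B", "C"}
```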