-rw-r--r--  synapse/handlers/federation.py                       227
-rw-r--r--  synapse/storage/databases/main/event_federation.py   297
2 files changed, 103 insertions, 421 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py

index 21c615432a..2dc5e64a39 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1043,217 +1043,6 @@ class FederationHandler:
         else:
             return []
 
-    async def get_backfill_events(
-        self, origin: str, room_id: str, event_id_list: list, limit: int
-    ) -> List[EventBase]:
-        logger.info(
-            "get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s",
-            room_id,
-            event_id_list,
-            limit,
-            origin,
-        )
-
-        event_id_results = set()
-
-        # In a PriorityQueue, the lowest valued entries are retrieved first.
-        # We're using depth as the priority in the queue and tie-break based on
-        # stream_ordering. Depth is lowest at the oldest-in-time message and
-        # highest and newest-in-time message. We add events to the queue with a
-        # negative depth so that we process the newest-in-time messages first
-        # going backwards in time. stream_ordering follows the same pattern.
-        queue = PriorityQueue()
-        seed_events = await self.store.get_events_as_list(event_id_list)
-        logger.info(
-            "get_backfill_events(room_id=%s): seed_events=%s",
-            room_id,
-            [
-                BackfillQueueNavigationItem(
-                    depth=seed_event.depth,
-                    stream_ordering=seed_event.internal_metadata.stream_ordering,
-                    event_id=seed_event.event_id,
-                    type=seed_event.type,
-                )
-                for seed_event in seed_events
-            ],
-        )
-        for seed_event in seed_events:
-            # Make sure the seed event actually pertains to this room. We also
-            # need to make sure the depth is available since our whole DAG
-            # navigation here depends on depth.
-            if seed_event.room_id == room_id and seed_event.depth:
-                queue.put(
-                    (
-                        -seed_event.depth,
-                        -seed_event.internal_metadata.stream_ordering,
-                        seed_event.event_id,
-                        seed_event.type,
-                    )
-                )
-
-        while not queue.empty() and len(event_id_results) < limit:
-            try:
-                _, _, event_id, event_type = queue.get_nowait()
-            except Empty:
-                break
-
-            if event_id in event_id_results:
-                continue
-
-            found_undiscovered_connected_historical_messages = False
-            if self.hs.config.experimental.msc2716_enabled:
-                # Try and find any potential historical batches of message history.
-                #
-                # First we look for an insertion event connected to the current
-                # event (by prev_event). If we find any, we'll add them to the queue
-                # and navigate up the DAG like normal in the next iteration of the
-                # loop.
-                connected_insertion_event_backfill_results = (
-                    await self.store.get_connected_insertion_event_backfill_results(
-                        event_id, limit - len(event_id_results)
-                    )
-                )
-                logger.info(
-                    "get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s",
-                    room_id,
-                    event_id,
-                    connected_insertion_event_backfill_results,
-                )
-                for (
-                    connected_insertion_event_backfill_item
-                ) in connected_insertion_event_backfill_results:
-                    if (
-                        connected_insertion_event_backfill_item.event_id
-                        not in event_id_results
-                    ):
-                        # Check whether the insertion event is already on the
-                        # federating homeserver we're trying to send backfill
-                        # events to
-                        room_version = await self.store.get_room_version(room_id)
-                        event_exists_on_remote_server = None
-                        try:
-                            # Because of the nature of backfill giving events to
-                            # the federated homeserver in one chunk and then we
-                            # can possibly query about that same event in the
-                            # next chunk, we need to avoid getting a cached
-                            # response. We want to know *now* whether they have
-                            # backfilled the insertion event.
-                            event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw(
-                                origin,
-                                connected_insertion_event_backfill_item.event_id,
-                                room_version=room_version,
-                                outlier=True,
-                                timeout=10000,
-                            )
-                        except Exception as e:
-                            logger.info(
-                                "get_backfill_events(room_id=%s): Failed to fetch insertion event_id=%s from origin=%s but we're just going to assume it's not backfilled there yet. error=%s",
-                                room_id,
-                                connected_insertion_event_backfill_item.event_id,
-                                origin,
-                                e,
-                            )
-
-                        logger.info(
-                            "get_backfill_events(room_id=%s): checked if insertion event_id=%s exists on federated homeserver(origin=%s) already? event_exists_on_remote_server=%s",
-                            room_id,
-                            connected_insertion_event_backfill_item.event_id,
-                            origin,
-                            event_exists_on_remote_server,
-                        )
-
-                        # If the event is already on the federated homeserver,
-                        # we don't need to try to branch off onto this
-                        # historical chain of messages. Below, we will instead
-                        # just go up the `prev_events` as normal.
-                        #
-                        # This is important so that the first time we backfill
-                        # the federated homeserver, we jump off and go down the
-                        # historical branch. But after the historical branch is
-                        # exhausted and the event comes up again in backfill, we
-                        # will choose the "live" DAG.
-                        if not event_exists_on_remote_server:
-                            found_undiscovered_connected_historical_messages = True
-                            queue.put(
-                                (
-                                    -connected_insertion_event_backfill_item.depth,
-                                    -connected_insertion_event_backfill_item.stream_ordering,
-                                    connected_insertion_event_backfill_item.event_id,
-                                    connected_insertion_event_backfill_item.type,
-                                )
-                            )
-
-                # Second, we need to go and try to find any batch events connected
-                # to a given insertion event (by batch_id). If we find any, we'll
-                # add them to the queue and navigate up the DAG like normal in the
-                # next iteration of the loop.
-                if event_type == EventTypes.MSC2716_INSERTION:
-                    connected_batch_event_backfill_results = (
-                        await self.store.get_connected_batch_event_backfill_results(
-                            event_id, limit - len(event_id_results)
-                        )
-                    )
-                    logger.info(
-                        "get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s",
-                        room_id,
-                        event_id,
-                        connected_batch_event_backfill_results,
-                    )
-                    for (
-                        connected_batch_event_backfill_item
-                    ) in connected_batch_event_backfill_results:
-                        if (
-                            connected_batch_event_backfill_item.event_id
-                            not in event_id_results
-                        ):
-                            queue.put(
-                                (
-                                    -connected_batch_event_backfill_item.depth,
-                                    -connected_batch_event_backfill_item.stream_ordering,
-                                    connected_batch_event_backfill_item.event_id,
-                                    connected_batch_event_backfill_item.type,
-                                )
-                            )
-
-            # If we found a historical branch of history off of the message lets
-            # navigate down that in the next iteration of the loop instead of
-            # the normal prev_event chain.
-            if not found_undiscovered_connected_historical_messages:
-                event_id_results.add(event_id)
-
-                # Now we just look up the DAG by prev_events as normal
-                connected_prev_event_backfill_results = (
-                    await self.store.get_connected_prev_event_backfill_results(
-                        event_id, limit - len(event_id_results)
-                    )
-                )
-                logger.info(
-                    "get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s",
-                    room_id,
-                    event_id,
-                    connected_prev_event_backfill_results,
-                )
-                for (
-                    connected_prev_event_backfill_item
-                ) in connected_prev_event_backfill_results:
-                    if (
-                        connected_prev_event_backfill_item.event_id
-                        not in event_id_results
-                    ):
-                        queue.put(
-                            (
-                                -connected_prev_event_backfill_item.depth,
-                                -connected_prev_event_backfill_item.stream_ordering,
-                                connected_prev_event_backfill_item.event_id,
-                                connected_prev_event_backfill_item.type,
-                            )
-                        )
-
-        events = await self.store.get_events_as_list(event_id_results)
-        return sorted(
-            events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
-        )
-
     @log_function
     async def on_backfill_request(
         self, origin: str, room_id: str, pdu_list: List[str], limit: int
@@ -1280,22 +1069,6 @@ class FederationHandler:
             ],
         )
 
-        # events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
-        # logger.info(
-        #     "new implementation backfill events(%d)=%s",
-        #     len(events),
-        #     [
-        #         "event_id=%s,depth=%d,body=%s,prevs=%s\n"
-        #         % (
-        #             event.event_id,
-        #             event.depth,
-        #             event.content.get("body", event.type),
-        #             event.prev_event_ids(),
-        #         )
-        #         for event in events
-        #     ],
-        # )
-
         events = await filter_events_for_server(self.storage, origin, events)
 
         return events
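Note on the ordering used above: the removed handler-side walk and the store-side _get_backfill_events that replaces it (second file below) rely on the same trick. A PriorityQueue pops its smallest entry first, so depth and stream_ordering are negated to visit newest-in-time events first while walking backwards through the DAG. A minimal standalone sketch of that pattern, with made-up event tuples rather than real Synapse events:

from queue import Empty, PriorityQueue

# Made-up (depth, stream_ordering, event_id) triples; higher depth = newer in time.
events = [
    (10, 100, "$newest"),
    (9, 98, "$older"),
    (9, 99, "$same_depth_but_later_stream"),
]

queue = PriorityQueue()
for depth, stream_ordering, event_id in events:
    # Negate depth and stream_ordering so the newest-in-time event has the
    # lowest priority value and is therefore popped first.
    queue.put((-depth, -stream_ordering, event_id))

backfill_order = []
while not queue.empty():
    try:
        _, _, event_id = queue.get_nowait()
    except Empty:
        break
    backfill_order.append(event_id)

# Newest first, ties at the same depth broken by stream_ordering:
# ['$newest', '$same_depth_but_later_stream', '$older']
print(backfill_order)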
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 5d9fae48e9..299af0ded2 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -995,116 +995,70 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
-    async def get_connected_insertion_event_backfill_results(
-        self, event_id: str, limit: int
-    ) -> List[BackfillQueueNavigationItem]:
-        def _get_connected_insertion_event_backfill_results_txn(txn):
-            # Look for the "insertion" events connected to the given event_id
-            connected_insertion_event_query = """
-            SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i
-            /* Get the depth of the insertion event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which points via prev_events to the given event_id */
-            WHERE i.insertion_prev_event_id = ?
-            LIMIT ?
-            """
-
-            txn.execute(
-                connected_insertion_event_query,
-                (event_id, limit),
-            )
-            connected_insertion_event_id_results = txn.fetchall()
-            return [
-                BackfillQueueNavigationItem(
-                    depth=row[0],
-                    stream_ordering=row[1],
-                    event_id=row[2],
-                    type=row[3],
-                )
-                for row in connected_insertion_event_id_results
-            ]
+    def _get_connected_batch_event_backfill_results_txn(
+        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+    ):
+        # Find any batch connections of a given insertion event
+        batch_connection_query = """
+        SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+        /* Find the batch that connects to the given insertion event */
+        INNER JOIN batch_events AS c
+        ON i.next_batch_id = c.batch_id
+        /* Get the depth of the batch start event from the events table */
+        INNER JOIN events AS e USING (event_id)
+        /* Find an insertion event which matches the given event_id */
+        WHERE i.event_id = ?
+        LIMIT ?
+        """
 
-        return await self.db_pool.runInteraction(
-            "get_connected_insertion_event_backfill_results",
-            _get_connected_insertion_event_backfill_results_txn,
+        # Find any batch connections for the given insertion event
+        txn.execute(
+            batch_connection_query,
+            (insertion_event_id, limit),
         )
-
-    async def get_connected_batch_event_backfill_results(
-        self, insertion_event_id: str, limit: int
-    ) -> List[BackfillQueueNavigationItem]:
-        def _get_connected_batch_event_backfill_results_txn(txn):
-            # Find any batch connections of a given insertion event
-            batch_connection_query = """
-            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
- """ - - # Find any batch connections for the given insertion event - txn.execute( - batch_connection_query, - (insertion_event_id, limit), + batch_start_event_id_results = txn.fetchall() + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], ) - batch_start_event_id_results = txn.fetchall() - return [ - BackfillQueueNavigationItem( - depth=row[0], - stream_ordering=row[1], - event_id=row[2], - type=row[3], - ) - for row in batch_start_event_id_results - ] + for row in batch_start_event_id_results + ] - return await self.db_pool.runInteraction( - "get_connected_batch_event_backfill_results", - _get_connected_batch_event_backfill_results_txn, - ) - - async def get_connected_prev_event_backfill_results( - self, event_id: str, limit: int - ) -> List[BackfillQueueNavigationItem]: - def _get_connected_prev_event_backfill_results_txn(txn): - # Look for the prev_event_id connected to the given event_id - connected_prev_event_query = """ - SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges - /* Get the depth and stream_ordering of the prev_event_id from the events table */ - INNER JOIN events - ON prev_event_id = events.event_id - /* Look for an edge which matches the given event_id */ - WHERE event_edges.event_id = ? - AND event_edges.is_state = ? - /* Because we can have many events at the same depth, - * we want to also tie-break and sort on stream_ordering */ - ORDER BY depth DESC, stream_ordering DESC - LIMIT ? - """ - - txn.execute( - connected_prev_event_query, - (event_id, False, limit), - ) - prev_event_id_results = txn.fetchall() - return [ - BackfillQueueNavigationItem( - depth=row[0], - stream_ordering=row[1], - event_id=row[2], - type=row[3], - ) - for row in prev_event_id_results - ] + def _get_connected_prev_event_backfill_results_txn( + self, txn: LoggingTransaction, event_id: str, limit: int + ): + # Look for the prev_event_id connected to the given event_id + connected_prev_event_query = """ + SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges + /* Get the depth and stream_ordering of the prev_event_id from the events table */ + INNER JOIN events + ON prev_event_id = events.event_id + /* Look for an edge which matches the given event_id */ + WHERE event_edges.event_id = ? + AND event_edges.is_state = ? + /* Because we can have many events at the same depth, + * we want to also tie-break and sort on stream_ordering */ + ORDER BY depth DESC, stream_ordering DESC + LIMIT ? + """ - return await self.db_pool.runInteraction( - "get_connected_prev_event_backfill_results", - _get_connected_prev_event_backfill_results_txn, + txn.execute( + connected_prev_event_query, + (event_id, False, limit), ) + prev_event_id_results = txn.fetchall() + return [ + BackfillQueueNavigationItem( + depth=row[0], + stream_ordering=row[1], + event_id=row[2], + type=row[3], + ) + for row in prev_event_id_results + ] async def get_backfill_events( self, room_id: str, seed_event_id_list: list, limit: int @@ -1130,6 +1084,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit): + """ + We want to make sure that we do a breadth-first, "depth" ordered search. + We also handle navigating historical branches of history connected by + insertion and batch events. 
+ """ logger.info( "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s", room_id, @@ -1139,47 +1098,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_id_results = set() - # We want to make sure that we do a breadth-first, "depth" ordered - # search. - - # Look for the prev_event_id connected to the given event_id - connected_prev_event_query = """ - SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges - /* Get the depth and stream_ordering of the prev_event_id from the events table */ - INNER JOIN events - ON prev_event_id = events.event_id - /* Look for an edge which matches the given event_id */ - WHERE event_edges.event_id = ? - AND event_edges.is_state = ? - /* Because we can have many events at the same depth, - * we want to also tie-break and sort on stream_ordering */ - ORDER BY depth DESC, stream_ordering DESC - LIMIT ? - """ - - # Look for the "insertion" events connected to the given event_id - connected_insertion_event_query = """ - SELECT e.depth, e.stream_ordering, i.event_id, e.type FROM insertion_event_edges AS i - /* Get the depth of the insertion event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which points via prev_events to the given event_id */ - WHERE i.insertion_prev_event_id = ? - LIMIT ? - """ - - # Find any batch connections of a given insertion event - batch_connection_query = """ - SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i - /* Find the batch that connects to the given insertion event */ - INNER JOIN batch_events AS c - ON i.next_batch_id = c.batch_id - /* Get the depth of the batch start event from the events table */ - INNER JOIN events AS e USING (event_id) - /* Find an insertion event which matches the given event_id */ - WHERE i.event_id = ? - LIMIT ? - """ - # In a PriorityQueue, the lowest valued entries are retrieved first. # We're using depth as the priority in the queue and tie-break based on # stream_ordering. Depth is lowest at the oldest-in-time message and @@ -1233,70 +1151,61 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # Try and find any potential historical batches of message history. if self.hs.config.experimental.msc2716_enabled: - # # First we look for an insertion event connected to the current - # # event (by prev_event). If we find any, we'll add them to the queue - # # and navigate up the DAG like normal in the next iteration of the - # # loop. - # txn.execute( - # connected_insertion_event_query, - # (event_id, limit - len(event_id_results)), - # ) - # connected_insertion_event_id_results = txn.fetchall() - # logger.debug( - # "_get_backfill_events(room_id=%s): connected_insertion_event_query %s", - # room_id, - # connected_insertion_event_id_results, - # ) - # for row in connected_insertion_event_id_results: - # connected_insertion_event_depth = row[0] - # connected_insertion_event_stream_ordering = row[1] - # connected_insertion_event_id = row[2] - # connected_insertion_event_type = row[3] - # if connected_insertion_event_id not in event_id_results: - # queue.put( - # ( - # -connected_insertion_event_depth, - # -connected_insertion_event_stream_ordering, - # connected_insertion_event_id, - # connected_insertion_event_type, - # ) - # ) - - # Second, we need to go and try to find any batch events connected + # We need to go and try to find any batch events connected # to a given insertion event (by batch_id). 
                 # add them to the queue and navigate up the DAG like normal in the
                 # next iteration of the loop.
                 if event_type == EventTypes.MSC2716_INSERTION:
                     # Find any batch connections for the given insertion event
-                    txn.execute(
-                        batch_connection_query,
-                        (event_id, limit - len(event_id_results)),
+                    connected_batch_event_backfill_results = (
+                        self._get_connected_batch_event_backfill_results_txn(
+                            txn, event_id, limit - len(event_id_results)
+                        )
                     )
-                    batch_start_event_id_results = txn.fetchall()
                     logger.debug(
-                        "_get_backfill_events(room_id=%s): batch_start_event_id_results %s",
+                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
                         room_id,
-                        batch_start_event_id_results,
+                        connected_batch_event_backfill_results,
                     )
-                    for row in batch_start_event_id_results:
-                        if row[2] not in event_id_results:
-                            queue.put((-row[0], -row[1], row[2], row[3]))
+                    for (
+                        connected_batch_event_backfill_item
+                    ) in connected_batch_event_backfill_results:
+                        if (
+                            connected_batch_event_backfill_item.event_id
+                            not in event_id_results
+                        ):
+                            queue.put(
+                                (
+                                    -connected_batch_event_backfill_item.depth,
+                                    -connected_batch_event_backfill_item.stream_ordering,
+                                    connected_batch_event_backfill_item.event_id,
+                                    connected_batch_event_backfill_item.type,
+                                )
+                            )
 
             # Now we just look up the DAG by prev_events as normal
-            txn.execute(
-                connected_prev_event_query,
-                (event_id, False, limit - len(event_id_results)),
+            connected_prev_event_backfill_results = (
+                self._get_connected_prev_event_backfill_results_txn(
+                    txn, event_id, limit - len(event_id_results)
+                )
             )
-            prev_event_id_results = txn.fetchall()
             logger.debug(
-                "_get_backfill_events(room_id=%s): prev_event_ids %s",
+                "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
                 room_id,
-                prev_event_id_results,
+                connected_prev_event_backfill_results,
            )
-
-            for row in prev_event_id_results:
-                if row[2] not in event_id_results:
-                    queue.put((-row[0], -row[1], row[2], row[3]))
+            for (
+                connected_prev_event_backfill_item
+            ) in connected_prev_event_backfill_results:
+                if connected_prev_event_backfill_item.event_id not in event_id_results:
+                    queue.put(
+                        (
+                            -connected_prev_event_backfill_item.depth,
+                            -connected_prev_event_backfill_item.stream_ordering,
+                            connected_prev_event_backfill_item.event_id,
+                            connected_prev_event_backfill_item.type,
+                        )
+                    )
 
         return event_id_results
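For reference, the row-to-item mapping that each of the new _get_connected_*_backfill_results_txn helpers performs can be sketched in isolation. The NamedTuple below is a simplified stand-in for Synapse's BackfillQueueNavigationItem, and the example row is made up; both are illustrative assumptions rather than the real definitions:

from typing import List, NamedTuple, Tuple


class BackfillQueueNavigationItem(NamedTuple):
    """Simplified stand-in for the navigation item built from each SQL row."""

    depth: int
    stream_ordering: int
    event_id: str
    type: str


def rows_to_navigation_items(
    rows: List[Tuple[int, int, str, str]]
) -> List[BackfillQueueNavigationItem]:
    # Each helper SELECTs (depth, stream_ordering, event_id, type) and maps the
    # raw txn.fetchall() rows into navigation items exactly like this.
    return [
        BackfillQueueNavigationItem(
            depth=row[0],
            stream_ordering=row[1],
            event_id=row[2],
            type=row[3],
        )
        for row in rows
    ]


if __name__ == "__main__":
    # Example with a made-up row shaped like the helpers' SELECT output.
    print(rows_to_navigation_items([(9, 98, "$prev_event", "m.room.message")]))

Keeping the helpers as plain txn-taking methods is what lets _get_backfill_events call them inside its own transaction, instead of wrapping each query in a separate runInteraction as the removed async methods did.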