Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--  synapse/handlers/federation.py  184
1 file changed, 134 insertions(+), 50 deletions(-)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py

index e28c74daf0..c2163c7e53 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -62,6 +62,7 @@ from synapse.types import JsonDict, StateMap, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
+from synapse.storage.databases.main.event_federation import BackfillQueueNavigationItem
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1043,8 +1044,16 @@ class FederationHandler:
         return []
 
     async def get_backfill_events(
-        self, room_id: str, event_id_list: list, limit: int
+        self, origin: str, room_id: str, event_id_list: list, limit: int
     ) -> List[EventBase]:
+        logger.info(
+            "get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s",
+            room_id,
+            event_id_list,
+            limit,
+            origin,
+        )
+
         event_id_results = set()
 
         # In a PriorityQueue, the lowest valued entries are retrieved first.
@@ -1054,8 +1063,20 @@
         # negative depth so that we process the newest-in-time messages first
         # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
-
         seed_events = await self.store.get_events_as_list(event_id_list)
+        logger.info(
+            "get_backfill_events(room_id=%s): seed_events=%s",
+            room_id,
+            [
+                BackfillQueueNavigationItem(
+                    depth=seed_event.depth,
+                    stream_ordering=seed_event.internal_metadata.stream_ordering,
+                    event_id=seed_event.event_id,
+                    type=seed_event.type,
+                )
+                for seed_event in seed_events
+            ],
+        )
         for seed_event in seed_events:
             # Make sure the seed event actually pertains to this room. We also
             # need to make sure the depth is available since our whole DAG
@@ -1079,8 +1100,7 @@
             if event_id in event_id_results:
                 continue
 
-            event_id_results.add(event_id)
-
+            found_undiscovered_connected_historical_messages = False
             if self.hs.config.experimental.msc2716_enabled:
                 # Try and find any potential historical batches of message history.
                 #
@@ -1093,8 +1113,10 @@
                 # First we look for an insertion event connected to the
                 # current event being processed.
                 connected_insertion_event_backfill_results = (
                     await self.store.get_connected_insertion_event_backfill_results(
                         event_id, limit - len(event_id_results)
                     )
                 )
-                logger.debug(
-                    "_get_backfill_events: connected_insertion_event_backfill_results=%s",
+                logger.info(
+                    "get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s",
+                    room_id,
+                    event_id,
                     connected_insertion_event_backfill_results,
                 )
                 for (
@@ -1104,15 +1126,63 @@
                     connected_insertion_event_backfill_item
                 ) in connected_insertion_event_backfill_results:
                     if (
                         connected_insertion_event_backfill_item.event_id
                         not in event_id_results
                     ):
-                        queue.put(
-                            (
-                                -connected_insertion_event_backfill_item.depth,
-                                -connected_insertion_event_backfill_item.stream_ordering,
+                        # Check whether the insertion event is already on the
+                        # federating homeserver we're trying to send backfill
+                        # events to.
+                        room_version = await self.store.get_room_version(room_id)
+                        event_exists_on_remote_server = None
+                        try:
+                            # Because of the nature of backfill giving events to
+                            # the federated homeserver in one chunk and then we
+                            # can possibly query about that same event in the
+                            # next chunk, we need to avoid getting a cached
+                            # response. We want to know *now* whether they have
+                            # backfilled the insertion event.
+                            event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw(
+                                origin,
                                 connected_insertion_event_backfill_item.event_id,
-                                connected_insertion_event_backfill_item.type,
+                                room_version=room_version,
+                                outlier=True,
+                                timeout=10000,
                             )
+                        except Exception as e:
+                            logger.info(
+                                "get_backfill_events(room_id=%s): Failed to fetch insertion event_id=%s from origin=%s but we're just going to assume it's not backfilled there yet. error=%s",
+                                room_id,
+                                connected_insertion_event_backfill_item.event_id,
+                                origin,
+                                e,
+                            )
+
+                        logger.info(
+                            "get_backfill_events(room_id=%s): checked if insertion event_id=%s exists on federated homeserver(origin=%s) already? event_exists_on_remote_server=%s",
+                            room_id,
+                            connected_insertion_event_backfill_item.event_id,
+                            origin,
+                            event_exists_on_remote_server,
                         )
+                        # If the event is already on the federated homeserver,
+                        # we don't need to try to branch off onto this
+                        # historical chain of messages. Below, we will instead
+                        # just go up the `prev_events` as normal.
+                        #
+                        # This is important so that the first time we backfill
+                        # the federated homeserver, we jump off and go down the
+                        # historical branch. But after the historical branch is
+                        # exhausted and the event comes up again in backfill, we
+                        # will choose the "live" DAG.
+                        if not event_exists_on_remote_server:
+                            found_undiscovered_connected_historical_messages = True
+                            queue.put(
+                                (
+                                    -connected_insertion_event_backfill_item.depth,
+                                    -connected_insertion_event_backfill_item.stream_ordering,
+                                    connected_insertion_event_backfill_item.event_id,
+                                    connected_insertion_event_backfill_item.type,
+                                )
+                            )
 
                 # Second, we need to go and try to find any batch events connected
                 # to a given insertion event (by batch_id). If we find any, we'll
                 # add them to the queue and navigate up the DAG like normal in the
@@ -1123,8 +1193,10 @@
                         event_id, limit - len(event_id_results)
                     )
                 )
-                logger.debug(
-                    "_get_backfill_events: connected_batch_event_backfill_results %s",
+                logger.info(
+                    "get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s",
+                    room_id,
+                    event_id,
                     connected_batch_event_backfill_results,
                 )
                 for (
@@ -1143,28 +1215,39 @@
                             )
                         )
 
-            # Now we just look up the DAG by prev_events as normal
-            connected_prev_event_backfill_results = (
-                await self.store.get_connected_prev_event_backfill_results(
-                    event_id, limit - len(event_id_results)
+            # If we found a historical branch off of this message, let's
+            # navigate down that in the next iteration of the loop instead of
+            # the normal prev_event chain.
+            if not found_undiscovered_connected_historical_messages:
+                event_id_results.add(event_id)
+
+                # Now we just look up the DAG by prev_events as normal
+                connected_prev_event_backfill_results = (
+                    await self.store.get_connected_prev_event_backfill_results(
+                        event_id, limit - len(event_id_results)
+                    )
                 )
-            )
-            logger.debug(
-                "_get_backfill_events: prev_event_ids %s",
-                connected_prev_event_backfill_results,
-            )
-            for (
-                connected_prev_event_backfill_item
-            ) in connected_prev_event_backfill_results:
-                if connected_prev_event_backfill_item.event_id not in event_id_results:
-                    queue.put(
-                        (
-                            -connected_prev_event_backfill_item.depth,
-                            -connected_prev_event_backfill_item.stream_ordering,
-                            connected_prev_event_backfill_item.event_id,
-                            connected_prev_event_backfill_item.type,
+                logger.info(
+                    "get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s",
+                    room_id,
+                    event_id,
+                    connected_prev_event_backfill_results,
+                )
+                for (
+                    connected_prev_event_backfill_item
+                ) in connected_prev_event_backfill_results:
+                    if (
+                        connected_prev_event_backfill_item.event_id
+                        not in event_id_results
+                    ):
+                        queue.put(
+                            (
+                                -connected_prev_event_backfill_item.depth,
+                                -connected_prev_event_backfill_item.stream_ordering,
+                                connected_prev_event_backfill_item.event_id,
+                                connected_prev_event_backfill_item.type,
+                            )
                         )
-                    )
 
         events = await self.store.get_events_as_list(event_id_results)
         return sorted(
@@ -1182,24 +1265,25 @@
         # Synapse asks for 100 events per backfill request. Do not allow more.
        limit = min(limit, 100)
 
-        events = await self.store.get_backfill_events(room_id, pdu_list, limit)
-        logger.info(
-            "old implementation backfill events=%s",
-            [
-                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
-                % (
-                    event.event_id,
-                    event.depth,
-                    event.content.get("body", event.type),
-                    event.prev_event_ids(),
-                )
-                for event in events
-            ],
-        )
-
-        events = await self.get_backfill_events(room_id, pdu_list, limit)
+        # events = await self.store.get_backfill_events(room_id, pdu_list, limit)
+        # logger.info(
+        #     "old implementation backfill events=%s",
+        #     [
+        #         "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+        #         % (
+        #             event.event_id,
+        #             event.depth,
+        #             event.content.get("body", event.type),
+        #             event.prev_event_ids(),
+        #         )
+        #         for event in events
+        #     ],
+        # )
+
+        events = await self.get_backfill_events(origin, room_id, pdu_list, limit)
         logger.info(
-            "new implementation backfill events=%s",
+            "new implementation backfill events(%d)=%s",
+            len(events),
            [
                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
                % (
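Note on the PriorityQueue ordering the diff relies on: queue.PriorityQueue pops the lowest-valued entry first, so negating depth and stream_ordering makes the newest-in-time event come out first while walking the DAG backwards. Below is a minimal, self-contained sketch of just that trick; the event tuples are hypothetical and are not Synapse types.

    from queue import PriorityQueue

    # Hypothetical (event_id, depth, stream_ordering, type) tuples; not real Matrix data.
    events = [
        ("$live_message", 10, 100, "m.room.message"),
        ("$same_depth_older_stream", 10, 90, "m.room.message"),
        ("$historical_message", 5, 50, "m.room.message"),
    ]

    queue: PriorityQueue = PriorityQueue()
    for event_id, depth, stream_ordering, event_type in events:
        # Negate depth/stream_ordering so the newest-in-time event sorts
        # lowest and is therefore popped first going backwards in time.
        queue.put((-depth, -stream_ordering, event_id, event_type))

    while not queue.empty():
        neg_depth, neg_stream_ordering, event_id, event_type = queue.get()
        print(event_id, -neg_depth, -neg_stream_ordering)
    # -> $live_message 10 100
    # -> $same_depth_older_stream 10 90
    # -> $historical_message 5 50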
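Note on the branch decision: the diff only dives into an MSC2716 historical branch when the remote server does not yet have the insertion event, and it deliberately skips `event_id_results.add(event_id)` in that case so the same event resurfaces on a later backfill and the "live" DAG is walked once history is exhausted. The sketch below models only that decision; `fetch_event_uncached` is a hypothetical stand-in for `federation_client.get_pdu_from_destination_raw`, which the diff calls with `outlier=True` and a timeout to get a fresh, non-cached answer from `origin`.

    from typing import Awaitable, Callable, Optional


    async def found_undiscovered_historical_branch(
        fetch_event_uncached: Callable[[str, str], Awaitable[Optional[dict]]],
        origin: str,
        insertion_event_id: str,
    ) -> bool:
        try:
            event = await fetch_event_uncached(origin, insertion_event_id)
        except Exception:
            # Mirror the diff: on failure, assume the remote has not
            # backfilled the insertion event yet.
            event = None
        # None  -> the remote lacks the insertion event: branch into the
        #          historical chain now and postpone the prev_events walk.
        # value -> the remote already has it: follow the "live" DAG instead.
        return event is None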