summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- synapse/federation/federation_client.py |  84
-rw-r--r-- synapse/handlers/federation.py          | 184
2 files changed, 196 insertions(+), 72 deletions(-)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 2ab4dec88f..eafffff2bc 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -264,6 +264,62 @@ class FederationClient(FederationBase): return pdus + async def get_pdu_from_destination_raw( + self, + destination: str, + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: + """Requests the PDU with given origin and ID from the remote home + server. + + Does not have any caching or rate limiting! + + Args: + destination: Which homeserver to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if + it's from an arbitrary point in the context as opposed to part + of the current block of PDUs. Defaults to `False` + timeout: How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. + + Returns: + The requested PDU, or None if we were unable to find it. + + Raises: + SynapseError, NotRetryingDestination, FederationDeniedError + """ + + signed_pdu = None + + transaction_data = await self.transport_layer.get_event( + destination, event_id, timeout=timeout + ) + + logger.info( + "retrieved event id %s from %s: %r", + event_id, + destination, + transaction_data, + ) + + pdu_list: List[EventBase] = [ + event_from_pdu_json(p, room_version, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] + + # Check signatures are correct. 
+ signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + + return signed_pdu + async def get_pdu( self, destinations: Iterable[str], @@ -308,30 +364,14 @@ class FederationClient(FederationBase): continue try: - transaction_data = await self.transport_layer.get_event( - destination, event_id, timeout=timeout - ) - - logger.debug( - "retrieved event id %s from %s: %r", - event_id, - destination, - transaction_data, + signed_pdu = await self.get_pdu_from_destination_raw( + destination=destination, + event_id=event_id, + room_version=room_version, + outlier=outlier, + timeout=timeout, ) - pdu_list: List[EventBase] = [ - event_from_pdu_json(p, room_version, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] - - # Check signatures are correct. - signed_pdu = await self._check_sigs_and_hash(room_version, pdu) - - break - pdu_attempts[destination] = now except SynapseError as e: diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e28c74daf0..c2163c7e53 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -62,6 +62,7 @@ from synapse.types import JsonDict, StateMap, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server +from synapse.storage.databases.main.event_federation import BackfillQueueNavigationItem if TYPE_CHECKING: from synapse.server import HomeServer @@ -1043,8 +1044,16 @@ class FederationHandler: return [] async def get_backfill_events( - self, room_id: str, event_id_list: list, limit: int + self, origin: str, room_id: str, event_id_list: list, limit: int ) -> List[EventBase]: + logger.info( + "get_backfill_events(room_id=%s): seeding backfill with event_id_list=%s limit=%s origin=%s", + room_id, + event_id_list, + limit, + origin, + ) + event_id_results = set() # In a PriorityQueue, the lowest valued entries are retrieved first. @@ -1054,8 +1063,20 @@ class FederationHandler: # negative depth so that we process the newest-in-time messages first # going backwards in time. stream_ordering follows the same pattern. queue = PriorityQueue() - seed_events = await self.store.get_events_as_list(event_id_list) + logger.info( + "get_backfill_events(room_id=%s): seed_events=%s", + room_id, + [ + BackfillQueueNavigationItem( + depth=seed_event.depth, + stream_ordering=seed_event.internal_metadata.stream_ordering, + event_id=seed_event.event_id, + type=seed_event.type, + ) + for seed_event in seed_events + ], + ) for seed_event in seed_events: # Make sure the seed event actually pertains to this room. We also # need to make sure the depth is available since our whole DAG @@ -1079,8 +1100,7 @@ class FederationHandler: if event_id in event_id_results: continue - event_id_results.add(event_id) - + found_undiscovered_connected_historical_messages = False if self.hs.config.experimental.msc2716_enabled: # Try and find any potential historical batches of message history. 
# @@ -1093,8 +1113,10 @@ class FederationHandler: event_id, limit - len(event_id_results) ) ) - logger.debug( - "_get_backfill_events: connected_insertion_event_backfill_results=%s", + logger.info( + "get_backfill_events(room_id=%s): connected_insertion_event_backfill_results(%s)=%s", + room_id, + event_id, connected_insertion_event_backfill_results, ) for ( @@ -1104,15 +1126,63 @@ class FederationHandler: connected_insertion_event_backfill_item.event_id not in event_id_results ): - queue.put( - ( - -connected_insertion_event_backfill_item.depth, - -connected_insertion_event_backfill_item.stream_ordering, + # Check whether the insertion event is already on the + # federating homeserver we're trying to send backfill + # events to + room_version = await self.store.get_room_version(room_id) + event_exists_on_remote_server = None + try: + # Because of the nature of backfill giving events to + # the federated homeserver in one chunk and then we + # can possibly query about that same event in the + # next chunk, we need to avoid getting a cached + # response. We want to know *now* whether they have + # backfilled the insertion event. + event_exists_on_remote_server = await self.federation_client.get_pdu_from_destination_raw( + origin, connected_insertion_event_backfill_item.event_id, - connected_insertion_event_backfill_item.type, + room_version=room_version, + outlier=True, + timeout=10000, ) + except Exception as e: + logger.info( + "get_backfill_events(room_id=%s): Failed to fetch insertion event_id=%s from origin=%s but we're just going to assume it's not backfilled there yet. error=%s", + room_id, + connected_insertion_event_backfill_item.event_id, + origin, + e, + ) + + logger.info( + "get_backfill_events(room_id=%s): checked if insertion event_id=%s exists on federated homeserver(origin=%s) already? 
event_exists_on_remote_server=%s", + room_id, + connected_insertion_event_backfill_item.event_id, + origin, + event_exists_on_remote_server, ) + # If the event is already on the federated homeserver, + # we don't need to try to branch off onto this + # historical chain of messages. Below, we will instead + # just go up the `prev_events` as normal. + # + # This is important so that the first time we backfill + # the federated homeserver, we jump off and go down the + # historical branch. But after the historical branch is + # exhausted and the event comes up again in backfill, we + # will choose the "live" DAG. + if not event_exists_on_remote_server: + found_undiscovered_connected_historical_messages = True + queue.put( + ( + -connected_insertion_event_backfill_item.depth, + -connected_insertion_event_backfill_item.stream_ordering, + connected_insertion_event_backfill_item.event_id, + connected_insertion_event_backfill_item.type, + ) + ) + # Second, we need to go and try to find any batch events connected # to a given insertion event (by batch_id). If we find any, we'll # add them to the queue and navigate up the DAG like normal in the @@ -1123,8 +1193,10 @@ class FederationHandler: event_id, limit - len(event_id_results) ) ) - logger.debug( - "_get_backfill_events: connected_batch_event_backfill_results %s", + logger.info( + "get_backfill_events(room_id=%s): connected_batch_event_backfill_results(%s)=%s", + room_id, + event_id, connected_batch_event_backfill_results, ) for ( @@ -1143,28 +1215,39 @@ class FederationHandler: ) ) - # Now we just look up the DAG by prev_events as normal - connected_prev_event_backfill_results = ( - await self.store.get_connected_prev_event_backfill_results( - event_id, limit - len(event_id_results) + # If we found a historical branch of history off of the message lets + # navigate down that in the next iteration of the loop instead of + # the normal prev_event chain. 
+ if not found_undiscovered_connected_historical_messages: + event_id_results.add(event_id) + + # Now we just look up the DAG by prev_events as normal + connected_prev_event_backfill_results = ( + await self.store.get_connected_prev_event_backfill_results( + event_id, limit - len(event_id_results) + ) ) - ) - logger.debug( - "_get_backfill_events: prev_event_ids %s", - connected_prev_event_backfill_results, - ) - for ( - connected_prev_event_backfill_item - ) in connected_prev_event_backfill_results: - if connected_prev_event_backfill_item.event_id not in event_id_results: - queue.put( - ( - -connected_prev_event_backfill_item.depth, - -connected_prev_event_backfill_item.stream_ordering, - connected_prev_event_backfill_item.event_id, - connected_prev_event_backfill_item.type, + logger.info( + "get_backfill_events(room_id=%s): connected_prev_event_backfill_results(%s)=%s", + room_id, + event_id, + connected_prev_event_backfill_results, + ) + for ( + connected_prev_event_backfill_item + ) in connected_prev_event_backfill_results: + if ( + connected_prev_event_backfill_item.event_id + not in event_id_results + ): + queue.put( + ( + -connected_prev_event_backfill_item.depth, + -connected_prev_event_backfill_item.stream_ordering, + connected_prev_event_backfill_item.event_id, + connected_prev_event_backfill_item.type, + ) ) - ) events = await self.store.get_events_as_list(event_id_results) return sorted( @@ -1182,24 +1265,25 @@ class FederationHandler: # Synapse asks for 100 events per backfill request. Do not allow more. 
limit = min(limit, 100) - events = await self.store.get_backfill_events(room_id, pdu_list, limit) - logger.info( - "old implementation backfill events=%s", - [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" - % ( - event.event_id, - event.depth, - event.content.get("body", event.type), - event.prev_event_ids(), - ) - for event in events - ], - ) - - events = await self.get_backfill_events(room_id, pdu_list, limit) + # events = await self.store.get_backfill_events(room_id, pdu_list, limit) + # logger.info( + # "old implementation backfill events=%s", + # [ + # "event_id=%s,depth=%d,body=%s,prevs=%s\n" + # % ( + # event.event_id, + # event.depth, + # event.content.get("body", event.type), + # event.prev_event_ids(), + # ) + # for event in events + # ], + # ) + + events = await self.get_backfill_events(origin, room_id, pdu_list, limit) logger.info( - "new implementation backfill events=%s", + "new implementation backfill events(%d)=%s", + len(events), [ "event_id=%s,depth=%d,body=%s,prevs=%s\n" % (