diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index af652a7659..23a42b9074 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -39,7 +39,7 @@ from synapse.events import builder, room_version_to_event_format
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.utils import log_function
-from synapse.util import unwrapFirstError
+from synapse.util import batch_iter, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -327,7 +327,74 @@ class FederationClient(FederationBase):
):
raise Exception("invalid response from /state_ids")
- return state_event_ids, auth_event_ids
+ desired_events = set(state_event_ids + auth_event_ids)
+ event_map = yield self.get_events_from_store_or_dest(
+ destination, room_id, desired_events
+ )
+
+ failed_to_fetch = desired_events - event_map.keys()
+ if failed_to_fetch:
+ logger.warning(
+ "Failed to fetch missing state/auth events for %s: %s",
+ room_id,
+ failed_to_fetch,
+ )
+
+ pdus = [event_map[e_id] for e_id in state_event_ids if e_id in event_map]
+ auth_chain = [event_map[e_id] for e_id in auth_event_ids if e_id in event_map]
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ return pdus, auth_chain
+
+ @defer.inlineCallbacks
+ def get_events_from_store_or_dest(self, destination, room_id, event_ids):
+ """Fetch events from a remote destination, checking if we already have them.
+
+ Args:
+ destination (str)
+ room_id (str)
+ event_ids (Iterable[str])
+
+ Returns:
+ Deferred[dict[str, EventBase]]: A deferred resolving to a map
+ from event_id to event
+ """
+ fetched_events = yield self.store.get_events(event_ids, allow_rejected=True)
+
+ missing_events = set(event_ids) - fetched_events.keys()
+
+ if not missing_events:
+ return fetched_events
+
+ logger.debug(
+ "Fetching unknown state/auth events %s for room %s",
+ missing_events,
+ event_ids,
+ )
+
+ room_version = yield self.store.get_room_version(room_id)
+
+ # XXX 20 requests at once? really?
+ for batch in batch_iter(missing_events, 20):
+ deferreds = [
+ run_in_background(
+ self.get_pdu,
+ destinations=[destination],
+ event_id=e_id,
+ room_version=room_version,
+ )
+ for e_id in batch
+ ]
+
+ res = yield make_deferred_yieldable(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
+ for success, result in res:
+ if success and result:
+ fetched_events[result.event_id] = result
+
+ return fetched_events
@defer.inlineCallbacks
@log_function
|