diff --git a/changelog.d/11242.misc b/changelog.d/11242.misc
new file mode 100644
index 0000000000..3a98259edf
--- /dev/null
+++ b/changelog.d/11242.misc
@@ -0,0 +1 @@
+Split out federated PDU retrieval function into a non-cached version.
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 670186f548..3b85b135e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -277,6 +277,58 @@ class FederationClient(FederationBase):
return pdus
+ async def get_pdu_from_destination_raw(
+ self,
+ destination: str,
+ event_id: str,
+ room_version: RoomVersion,
+ outlier: bool = False,
+ timeout: Optional[int] = None,
+ ) -> Optional[EventBase]:
+ """Requests the PDU with given origin and ID from the remote home
+ server. Does not have any caching or rate limiting!
+
+ Args:
+ destination: Which homeserver to query
+ event_id: event to fetch
+ room_version: version of the room
+ outlier: Indicates whether the PDU is an `outlier`, i.e. if
+ it's from an arbitrary point in the context as opposed to part
+ of the current block of PDUs. Defaults to `False`
+ timeout: How long to try (in ms) each destination for before
+ moving to the next destination. None indicates no timeout.
+
+ Returns:
+ The requested PDU, or None if we were unable to find it.
+
+ Raises:
+ SynapseError, NotRetryingDestination, FederationDeniedError
+ """
+ transaction_data = await self.transport_layer.get_event(
+ destination, event_id, timeout=timeout
+ )
+
+ logger.debug(
+ "retrieved event id %s from %s: %r",
+ event_id,
+ destination,
+ transaction_data,
+ )
+
+ pdu_list: List[EventBase] = [
+ event_from_pdu_json(p, room_version, outlier=outlier)
+ for p in transaction_data["pdus"]
+ ]
+
+ if pdu_list and pdu_list[0]:
+ pdu = pdu_list[0]
+
+ # Check signatures are correct.
+ signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+ return signed_pdu
+
+ return None
+
async def get_pdu(
self,
destinations: Iterable[str],
@@ -321,30 +373,14 @@ class FederationClient(FederationBase):
continue
try:
- transaction_data = await self.transport_layer.get_event(
- destination, event_id, timeout=timeout
- )
-
- logger.debug(
- "retrieved event id %s from %s: %r",
- event_id,
- destination,
- transaction_data,
+ signed_pdu = await self.get_pdu_from_destination_raw(
+ destination=destination,
+ event_id=event_id,
+ room_version=room_version,
+ outlier=outlier,
+ timeout=timeout,
)
- pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version, outlier=outlier)
- for p in transaction_data["pdus"]
- ]
-
- if pdu_list and pdu_list[0]:
- pdu = pdu_list[0]
-
- # Check signatures are correct.
- signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
-
- break
-
pdu_attempts[destination] = now
except SynapseError as e:
|