diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7c450ecad0..842f5327c2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -53,7 +53,7 @@ from synapse.api.room_versions import (
RoomVersion,
RoomVersions,
)
-from synapse.events import EventBase, builder
+from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
@@ -299,7 +299,8 @@ class FederationClient(FederationBase):
moving to the next destination. None indicates no timeout.
Returns:
- The requested PDU, or None if we were unable to find it.
+ A copy of the requested PDU that is safe to modify, or None if we
+ were unable to find it.
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
@@ -309,7 +310,7 @@ class FederationClient(FederationBase):
)
logger.debug(
- "retrieved event id %s from %s: %r",
+ "get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
@@ -358,54 +359,92 @@ class FederationClient(FederationBase):
The requested PDU, or None if we were unable to find it.
"""
- # TODO: Rate limit the number of times we try and get the same event.
+ logger.debug(
+ "get_pdu: event_id=%s from destinations=%s", event_id, destinations
+ )
- ev = self._get_pdu_cache.get(event_id)
- if ev:
- return ev
+ # TODO: Rate limit the number of times we try and get the same event.
- pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+ # We might need the same event multiple times in quick succession (before
+ # it gets persisted to the database), so we cache the results of the lookup.
+ # Note that this is separate to the regular get_event cache which caches
+ # events once they have been persisted.
+ event = self._get_pdu_cache.get(event_id)
+
+ # If we don't see the event in the cache, go try to fetch it from the
+ # provided remote federated destinations
+ if not event:
+ pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+
+ for destination in destinations:
+ now = self._clock.time_msec()
+ last_attempt = pdu_attempts.get(destination, 0)
+ if last_attempt + PDU_RETRY_TIME_MS > now:
+ logger.debug(
+ "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+ destination,
+ last_attempt,
+ PDU_RETRY_TIME_MS,
+ now,
+ )
+ continue
+
+ try:
+ event = await self.get_pdu_from_destination_raw(
+ destination=destination,
+ event_id=event_id,
+ room_version=room_version,
+ timeout=timeout,
+ )
- signed_pdu = None
- for destination in destinations:
- now = self._clock.time_msec()
- last_attempt = pdu_attempts.get(destination, 0)
- if last_attempt + PDU_RETRY_TIME_MS > now:
- continue
+ pdu_attempts[destination] = now
- try:
- signed_pdu = await self.get_pdu_from_destination_raw(
- destination=destination,
- event_id=event_id,
- room_version=room_version,
- timeout=timeout,
- )
+ if event:
+ # Prime the cache
+ self._get_pdu_cache[event.event_id] = event
- pdu_attempts[destination] = now
+ # FIXME: We should add a `break` here to avoid calling every
+ # destination after we already found a PDU (will follow-up
+ # in a separate PR)
- except SynapseError as e:
- logger.info(
- "Failed to get PDU %s from %s because %s", event_id, destination, e
- )
- continue
- except NotRetryingDestination as e:
- logger.info(str(e))
- continue
- except FederationDeniedError as e:
- logger.info(str(e))
- continue
- except Exception as e:
- pdu_attempts[destination] = now
+ except SynapseError as e:
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id,
+ destination,
+ e,
+ )
+ continue
+ except NotRetryingDestination as e:
+ logger.info(str(e))
+ continue
+ except FederationDeniedError as e:
+ logger.info(str(e))
+ continue
+ except Exception as e:
+ pdu_attempts[destination] = now
+
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id,
+ destination,
+ e,
+ )
+ continue
- logger.info(
- "Failed to get PDU %s from %s because %s", event_id, destination, e
- )
- continue
+ if not event:
+ return None
- if signed_pdu:
- self._get_pdu_cache[event_id] = signed_pdu
+ # `event` now refers to an object stored in `get_pdu_cache`. Our
+ # callers may need to modify the returned object (eg to set
+ # `event.internal_metadata.outlier = true`), so we return a copy
+ # rather than the original object.
+ event_copy = make_event_from_dict(
+ event.get_pdu_json(),
+ event.room_version,
+ )
- return signed_pdu
+ return event_copy
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
|