diff options
author | Eric Eastwood <erice@element.io> | 2022-07-20 15:58:51 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-20 15:58:51 -0500 |
commit | 0f971ca68e808dd16f53f5594a6b33b7bddcc9a9 (patch) | |
tree | 07e68c0a7bc2d05cc6f6ac2e8905d069bff40872 /synapse/federation/federation_client.py | |
parent | Validate federation destinations and log an error if server name is invalid. ... (diff) | |
download | synapse-0f971ca68e808dd16f53f5594a6b33b7bddcc9a9.tar.xz |
Update `get_pdu` to return the original, pristine `EventBase` (#13320)
Update `get_pdu` to return the untouched, pristine `EventBase` as it was originally seen over federation (no metadata added). Previously, we returned the same `event` reference that we stored in the cache which downstream code modified in place and added metadata like setting it as an `outlier` and essentially poisoned our cache. Now we always return a copy of the `event` so the original can stay pristine in our cache and re-used for the next cache call. Split out from https://github.com/matrix-org/synapse/pull/13205 As discussed at: - https://github.com/matrix-org/synapse/pull/13205#discussion_r918365746 - https://github.com/matrix-org/synapse/pull/13205#discussion_r918366125 Related to https://github.com/matrix-org/synapse/issues/12584. This PR doesn't fix that issue because it hits [`get_event` which exists from the local database before it tries to `get_pdu`](https://github.com/matrix-org/synapse/blob/7864f33e286dec22368dc0b11c06eebb1462a51e/synapse/federation/federation_client.py#L581-L594).
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r-- | synapse/federation/federation_client.py | 123 |
1 files changed, 81 insertions, 42 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 7c450ecad0..842f5327c2 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -53,7 +53,7 @@ from synapse.api.room_versions import ( RoomVersion, RoomVersions, ) -from synapse.events import EventBase, builder +from synapse.events import EventBase, builder, make_event_from_dict from synapse.federation.federation_base import ( FederationBase, InvalidEventSignatureError, @@ -299,7 +299,8 @@ class FederationClient(FederationBase): moving to the next destination. None indicates no timeout. Returns: - The requested PDU, or None if we were unable to find it. + A copy of the requested PDU that is safe to modify, or None if we + were unable to find it. Raises: SynapseError, NotRetryingDestination, FederationDeniedError @@ -309,7 +310,7 @@ class FederationClient(FederationBase): ) logger.debug( - "retrieved event id %s from %s: %r", + "get_pdu_from_destination_raw: retrieved event id %s from %s: %r", event_id, destination, transaction_data, @@ -358,54 +359,92 @@ class FederationClient(FederationBase): The requested PDU, or None if we were unable to find it. """ - # TODO: Rate limit the number of times we try and get the same event. + logger.debug( + "get_pdu: event_id=%s from destinations=%s", event_id, destinations + ) - ev = self._get_pdu_cache.get(event_id) - if ev: - return ev + # TODO: Rate limit the number of times we try and get the same event. - pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + # We might need the same event multiple times in quick succession (before + # it gets persisted to the database), so we cache the results of the lookup. + # Note that this is separate to the regular get_event cache which caches + # events once they have been persisted. + event = self._get_pdu_cache.get(event_id) + + # If we don't see the event in the cache, go try to fetch it from the + # provided remote federated destinations + if not event: + pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + + for destination in destinations: + now = self._clock.time_msec() + last_attempt = pdu_attempts.get(destination, 0) + if last_attempt + PDU_RETRY_TIME_MS > now: + logger.debug( + "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", + destination, + last_attempt, + PDU_RETRY_TIME_MS, + now, + ) + continue + + try: + event = await self.get_pdu_from_destination_raw( + destination=destination, + event_id=event_id, + room_version=room_version, + timeout=timeout, + ) - signed_pdu = None - for destination in destinations: - now = self._clock.time_msec() - last_attempt = pdu_attempts.get(destination, 0) - if last_attempt + PDU_RETRY_TIME_MS > now: - continue + pdu_attempts[destination] = now - try: - signed_pdu = await self.get_pdu_from_destination_raw( - destination=destination, - event_id=event_id, - room_version=room_version, - timeout=timeout, - ) + if event: + # Prime the cache + self._get_pdu_cache[event.event_id] = event - pdu_attempts[destination] = now + # FIXME: We should add a `break` here to avoid calling every + # destination after we already found a PDU (will follow-up + # in a separate PR) - except SynapseError as e: - logger.info( - "Failed to get PDU %s from %s because %s", event_id, destination, e - ) - continue - except NotRetryingDestination as e: - logger.info(str(e)) - continue - except FederationDeniedError as e: - logger.info(str(e)) - continue - except Exception as e: - pdu_attempts[destination] = now + except SynapseError as e: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, + destination, + e, + ) + continue + except NotRetryingDestination as e: + logger.info(str(e)) + continue + except FederationDeniedError as e: + logger.info(str(e)) + continue + except Exception as e: + pdu_attempts[destination] = now + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, + destination, + e, + ) + continue - logger.info( - "Failed to get PDU %s from %s because %s", event_id, destination, e - ) - continue + if not event: + return None - if signed_pdu: - self._get_pdu_cache[event_id] = signed_pdu + # `event` now refers to an object stored in `get_pdu_cache`. Our + # callers may need to modify the returned object (eg to set + # `event.internal_metadata.outlier = true`), so we return a copy + # rather than the original object. + event_copy = make_event_from_dict( + event.get_pdu_json(), + event.room_version, + ) - return signed_pdu + return event_copy async def get_room_state_ids( self, destination: str, room_id: str, event_id: str |