summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py235
1 files changed, 197 insertions, 38 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 6a8d76529b..c4c0bc7315 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import ( ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.types import QueryParams +from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache @@ -79,6 +80,18 @@ PDU_RETRY_TIME_MS = 1 * 60 * 1000 T = TypeVar("T") +@attr.s(frozen=True, slots=True, auto_attribs=True) +class PulledPduInfo: + """ + A result object that stores the PDU and info about it like which homeserver we + pulled it from (`pull_origin`) + """ + + pdu: EventBase + # Which homeserver we pulled the PDU from + pull_origin: str + + class InvalidResponseError(RuntimeError): """Helper for _try_destination_list: indicates that the server returned a response we couldn't parse @@ -113,7 +126,9 @@ class FederationClient(FederationBase): self.hostname = hs.hostname self.signing_key = hs.signing_key - self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache( + # Cache mapping `event_id` to a tuple of the event itself and the `pull_origin` + # (which server we pulled the event from) + self._get_pdu_cache: ExpiringCache[str, Tuple[EventBase, str]] = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, max_len=1000, @@ -233,6 +248,8 @@ class FederationClient(FederationBase): destination, content, timeout ) + @trace + @tag_args async def backfill( self, dest: str, room_id: str, limit: int, extremities: Collection[str] ) -> Optional[List[EventBase]]: @@ -275,7 +292,7 @@ class FederationClient(FederationBase): pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus] # Check signatures and hash of pdus, removing any from the list that fail checks - pdus[:] = await self._check_sigs_and_hash_and_fetch( + pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch( dest, pdus, room_version=room_version ) @@ -325,7 +342,17 @@ class FederationClient(FederationBase): # Check signatures are correct. try: - signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + + async def _record_failure_callback( + event: EventBase, cause: str + ) -> None: + await self.store.record_event_failed_pull_attempt( + event.room_id, event.event_id, cause + ) + + signed_pdu = await self._check_sigs_and_hash( + room_version, pdu, _record_failure_callback + ) except InvalidEventSignatureError as e: errmsg = f"event id {pdu.event_id}: {e}" logger.warning("%s", errmsg) @@ -335,13 +362,15 @@ class FederationClient(FederationBase): return None + @trace + @tag_args async def get_pdu( self, - destinations: Iterable[str], + destinations: Collection[str], event_id: str, room_version: RoomVersion, timeout: Optional[int] = None, - ) -> Optional[EventBase]: + ) -> Optional[PulledPduInfo]: """Requests the PDU with given origin and ID from the remote home servers. @@ -356,11 +385,11 @@ class FederationClient(FederationBase): moving to the next destination. None indicates no timeout. Returns: - The requested PDU, or None if we were unable to find it. + The requested PDU wrapped in `PulledPduInfo`, or None if we were unable to find it. """ logger.debug( - "get_pdu: event_id=%s from destinations=%s", event_id, destinations + "get_pdu(event_id=%s): from destinations=%s", event_id, destinations ) # TODO: Rate limit the number of times we try and get the same event. @@ -369,19 +398,25 @@ class FederationClient(FederationBase): # it gets persisted to the database), so we cache the results of the lookup. # Note that this is separate to the regular get_event cache which caches # events once they have been persisted. - event = self._get_pdu_cache.get(event_id) + get_pdu_cache_entry = self._get_pdu_cache.get(event_id) + event = None + pull_origin = None + if get_pdu_cache_entry: + event, pull_origin = get_pdu_cache_entry # If we don't see the event in the cache, go try to fetch it from the # provided remote federated destinations - if not event: + else: pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + # TODO: We can probably refactor this to use `_try_destination_list` for destination in destinations: now = self._clock.time_msec() last_attempt = pdu_attempts.get(destination, 0) if last_attempt + PDU_RETRY_TIME_MS > now: logger.debug( - "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", + "get_pdu(event_id=%s): skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)", + event_id, destination, last_attempt, PDU_RETRY_TIME_MS, @@ -396,43 +431,48 @@ class FederationClient(FederationBase): room_version=room_version, timeout=timeout, ) + pull_origin = destination pdu_attempts[destination] = now if event: # Prime the cache - self._get_pdu_cache[event.event_id] = event + self._get_pdu_cache[event.event_id] = (event, pull_origin) # Now that we have an event, we can break out of this # loop and stop asking other destinations. break + except NotRetryingDestination as e: + logger.info("get_pdu(event_id=%s): %s", event_id, e) + continue + except FederationDeniedError: + logger.info( + "get_pdu(event_id=%s): Not attempting to fetch PDU from %s because the homeserver is not on our federation whitelist", + event_id, + destination, + ) + continue except SynapseError as e: logger.info( - "Failed to get PDU %s from %s because %s", + "get_pdu(event_id=%s): Failed to get PDU from %s because %s", event_id, destination, e, ) continue - except NotRetryingDestination as e: - logger.info(str(e)) - continue - except FederationDeniedError as e: - logger.info(str(e)) - continue except Exception as e: pdu_attempts[destination] = now logger.info( - "Failed to get PDU %s from %s because %s", + "get_pdu(event_id=%s): Failed to get PDU from %s because %s", event_id, destination, e, ) continue - if not event: + if not event or not pull_origin: return None # `event` now refers to an object stored in `get_pdu_cache`. Our @@ -444,8 +484,10 @@ class FederationClient(FederationBase): event.room_version, ) - return event_copy + return PulledPduInfo(event_copy, pull_origin) + @trace + @tag_args async def get_room_state_ids( self, destination: str, room_id: str, event_id: str ) -> Tuple[List[str], List[str]]: @@ -465,6 +507,23 @@ class FederationClient(FederationBase): state_event_ids = result["pdu_ids"] auth_event_ids = result.get("auth_chain_ids", []) + set_tag( + SynapseTags.RESULT_PREFIX + "state_event_ids", + str(state_event_ids), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "state_event_ids.length", + str(len(state_event_ids)), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "auth_event_ids", + str(auth_event_ids), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "auth_event_ids.length", + str(len(auth_event_ids)), + ) + if not isinstance(state_event_ids, list) or not isinstance( auth_event_ids, list ): @@ -472,6 +531,8 @@ class FederationClient(FederationBase): return state_event_ids, auth_event_ids + @trace + @tag_args async def get_room_state( self, destination: str, @@ -521,23 +582,28 @@ class FederationClient(FederationBase): len(auth_event_map), ) - valid_auth_events = await self._check_sigs_and_hash_and_fetch( + valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, auth_event_map.values(), room_version ) - valid_state_events = await self._check_sigs_and_hash_and_fetch( - destination, state_event_map.values(), room_version + valid_state_events = ( + await self._check_sigs_and_hash_for_pulled_events_and_fetch( + destination, state_event_map.values(), room_version + ) ) return valid_state_events, valid_auth_events - async def _check_sigs_and_hash_and_fetch( + @trace + async def _check_sigs_and_hash_for_pulled_events_and_fetch( self, origin: str, pdus: Collection[EventBase], room_version: RoomVersion, ) -> List[EventBase]: - """Checks the signatures and hashes of a list of events. + """ + Checks the signatures and hashes of a list of pulled events we got from + federation and records any signature failures as failed pull attempts. If a PDU fails its signature check then we check if we have it in the database, and if not then request it from the sender's server (if that @@ -560,17 +626,27 @@ class FederationClient(FederationBase): Returns: A list of PDUs that have valid signatures and hashes. """ + set_tag( + SynapseTags.RESULT_PREFIX + "pdus.length", + str(len(pdus)), + ) # We limit how many PDUs we check at once, as if we try to do hundreds # of thousands of PDUs at once we see large memory spikes. - valid_pdus = [] + valid_pdus: List[EventBase] = [] + + async def _record_failure_callback(event: EventBase, cause: str) -> None: + await self.store.record_event_failed_pull_attempt( + event.room_id, event.event_id, cause + ) async def _execute(pdu: EventBase) -> None: valid_pdu = await self._check_sigs_and_hash_and_fetch_one( pdu=pdu, origin=origin, room_version=room_version, + record_failure_callback=_record_failure_callback, ) if valid_pdu: @@ -580,11 +656,16 @@ class FederationClient(FederationBase): return valid_pdus + @trace + @tag_args async def _check_sigs_and_hash_and_fetch_one( self, pdu: EventBase, origin: str, room_version: RoomVersion, + record_failure_callback: Optional[ + Callable[[EventBase, str], Awaitable[None]] + ] = None, ) -> Optional[EventBase]: """Takes a PDU and checks its signatures and hashes. @@ -601,6 +682,11 @@ class FederationClient(FederationBase): origin pdu room_version + record_failure_callback: A callback to run whenever the given event + fails signature or hash checks. This includes exceptions + that would be normally be thrown/raised but also things like + checking for event tampering where we just return the redacted + event. Returns: The PDU (possibly redacted) if it has valid signatures and hashes. @@ -608,29 +694,44 @@ class FederationClient(FederationBase): """ try: - return await self._check_sigs_and_hash(room_version, pdu) + return await self._check_sigs_and_hash( + room_version, pdu, record_failure_callback + ) except InvalidEventSignatureError as e: logger.warning( "Signature on retrieved event %s was invalid (%s). " - "Checking local store/orgin server", + "Checking local store/origin server", pdu.event_id, e, ) + log_kv( + { + "message": "Signature on retrieved event was invalid. " + "Checking local store/origin server", + "event_id": pdu.event_id, + "InvalidEventSignatureError": e, + } + ) # Check local db. res = await self.store.get_event( pdu.event_id, allow_rejected=True, allow_none=True ) + # If the PDU fails its signature check and we don't have it in our + # database, we then request it from sender's server (if that is not the + # same as `origin`). pdu_origin = get_domain_from_id(pdu.sender) if not res and pdu_origin != origin: try: - res = await self.get_pdu( + pulled_pdu_info = await self.get_pdu( destinations=[pdu_origin], event_id=pdu.event_id, room_version=room_version, timeout=10000, ) + if pulled_pdu_info is not None: + res = pulled_pdu_info.pdu except SynapseError: pass @@ -650,7 +751,7 @@ class FederationClient(FederationBase): auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]] - signed_auth = await self._check_sigs_and_hash_and_fetch( + signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, auth_chain, room_version=room_version ) @@ -732,6 +833,7 @@ class FederationClient(FederationBase): ) for destination in destinations: + # We don't want to ask our own server for information we don't have if destination == self.server_name: continue @@ -740,9 +842,21 @@ class FederationClient(FederationBase): except ( RequestSendFailed, InvalidResponseError, - NotRetryingDestination, ) as e: logger.warning("Failed to %s via %s: %s", description, destination, e) + # Skip to the next homeserver in the list to try. + continue + except NotRetryingDestination as e: + logger.info("%s: %s", description, e) + continue + except FederationDeniedError: + logger.info( + "%s: Not attempting to %s from %s because the homeserver is not on our federation whitelist", + description, + description, + destination, + ) + continue except UnsupportedRoomVersionError: raise except HttpResponseException as e: @@ -862,9 +976,6 @@ class FederationClient(FederationBase): # The protoevent received over the JSON wire may not have all # the required fields. Lets just gloss over that because # there's some we never care about - if "prev_state" not in pdu_dict: - pdu_dict["prev_state"] = [] - ev = builder.create_local_event_from_event_dict( self._clock, self.hostname, @@ -1146,7 +1257,7 @@ class FederationClient(FederationBase): # Otherwise, consider it a legitimate error and raise. err = e.to_synapse_error() if self._is_unknown_endpoint(e, err): - if room_version.event_format != EventFormatVersions.V1: + if room_version.event_format != EventFormatVersions.ROOM_V1_V2: raise SynapseError( 400, "User's homeserver does not support this room version", @@ -1223,7 +1334,7 @@ class FederationClient(FederationBase): return resp[1] async def send_knock(self, destinations: List[str], pdu: EventBase) -> JsonDict: - """Attempts to send a knock event to given a list of servers. Iterates + """Attempts to send a knock event to a given list of servers. Iterates through the list until one attempt succeeds. Doing so will cause the remote server to add the event to the graph, @@ -1360,7 +1471,7 @@ class FederationClient(FederationBase): event_from_pdu_json(e, room_version) for e in content.get("events", []) ] - signed_events = await self._check_sigs_and_hash_and_fetch( + signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, events, room_version=room_version ) except HttpResponseException as e: @@ -1538,6 +1649,54 @@ class FederationClient(FederationBase): return result async def timestamp_to_event( + self, *, destinations: List[str], room_id: str, timestamp: int, direction: str + ) -> Optional["TimestampToEventResponse"]: + """ + Calls each remote federating server from `destinations` asking for their closest + event to the given timestamp in the given direction until we get a response. + Also validates the response to always return the expected keys or raises an + error. + + Args: + destinations: The domains of homeservers to try fetching from + room_id: Room to fetch the event from + timestamp: The point in time (inclusive) we should navigate from in + the given direction to find the closest event. + direction: ["f"|"b"] to indicate whether we should navigate forward + or backward from the given timestamp to find the closest event. + + Returns: + A parsed TimestampToEventResponse including the closest event_id + and origin_server_ts or None if no destination has a response. + """ + + async def _timestamp_to_event_from_destination( + destination: str, + ) -> TimestampToEventResponse: + return await self._timestamp_to_event_from_destination( + destination, room_id, timestamp, direction + ) + + try: + # Loop through each homeserver candidate until we get a succesful response + timestamp_to_event_response = await self._try_destination_list( + "timestamp_to_event", + destinations, + # TODO: The requested timestamp may lie in a part of the + # event graph that the remote server *also* didn't have, + # in which case they will have returned another event + # which may be nowhere near the requested timestamp. In + # the future, we may need to reconcile that gap and ask + # other homeservers, and/or extend `/timestamp_to_event` + # to return events on *both* sides of the timestamp to + # help reconcile the gap faster. + _timestamp_to_event_from_destination, + ) + return timestamp_to_event_response + except SynapseError: + return None + + async def _timestamp_to_event_from_destination( self, destination: str, room_id: str, timestamp: int, direction: str ) -> "TimestampToEventResponse": """