summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_client.py70
1 files changed, 62 insertions, 8 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index 7d80ff6998..7c485aa7e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -68,12 +68,14 @@ from synapse.federation.federation_base import ( FederationBase, InvalidEventSignatureError, event_from_pdu_json, + parse_events_from_pdu_json, ) from synapse.federation.transport.client import SendJoinResponse from synapse.http.client import is_unknown_endpoint from synapse.http.types import QueryParams from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id +from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -349,7 +351,7 @@ class FederationClient(FederationBase): room_version = await self.store.get_room_version(room_id) - pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus] + pdus = parse_events_from_pdu_json(transaction_data_pdus, room_version) # Check signatures and hash of pdus, removing any from the list that fail checks pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch( @@ -393,9 +395,7 @@ class FederationClient(FederationBase): transaction_data, ) - pdu_list: List[EventBase] = [ - event_from_pdu_json(p, room_version) for p in transaction_data["pdus"] - ] + pdu_list = parse_events_from_pdu_json(transaction_data["pdus"], room_version) if pdu_list and pdu_list[0]: pdu = pdu_list[0] @@ -424,6 +424,62 @@ class FederationClient(FederationBase): @trace @tag_args + async def get_pdu_policy_recommendation( + self, destination: str, pdu: EventBase, timeout: Optional[int] = None + ) -> str: + """Requests that the destination server (typically a policy server) + check the event and return its recommendation on how to handle the + event. + + If the policy server could not be contacted or the policy server + returned an unknown recommendation, this returns an OK recommendation. + This type fixing behaviour is done because the typical caller will be + in a critical call path and would generally interpret a `None` or similar + response as "weird value; don't care; move on without taking action". We + just frontload that logic here. + + + Args: + destination: The remote homeserver to ask (a policy server) + pdu: The event to check + timeout: How long to try (in ms) the destination for before + giving up. None indicates no timeout. + + Returns: + The policy recommendation, or RECOMMENDATION_OK if the policy server was + uncontactable or returned an unknown recommendation. + """ + + logger.debug( + "get_pdu_policy_recommendation for event_id=%s from %s", + pdu.event_id, + destination, + ) + + try: + res = await self.transport_layer.get_policy_recommendation_for_pdu( + destination, pdu, timeout=timeout + ) + recommendation = res.get("recommendation") + if not isinstance(recommendation, str): + raise InvalidResponseError("recommendation is not a string") + if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM): + logger.warning( + "get_pdu_policy_recommendation: unknown recommendation: %s", + recommendation, + ) + return RECOMMENDATION_OK + return recommendation + except Exception as e: + logger.warning( + "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s", + destination, + e, + ) + return RECOMMENDATION_OK + + @trace + @tag_args async def get_pdu( self, destinations: Collection[str], @@ -809,7 +865,7 @@ class FederationClient(FederationBase): room_version = await self.store.get_room_version(room_id) - auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]] + auth_chain = parse_events_from_pdu_json(res["auth_chain"], room_version) signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, auth_chain, room_version=room_version @@ -1529,9 +1585,7 @@ class FederationClient(FederationBase): room_version = await self.store.get_room_version(room_id) - events = [ - event_from_pdu_json(e, room_version) for e in content.get("events", []) - ] + events = parse_events_from_pdu_json(content.get("events", []), room_version) signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch( destination, events, room_version=room_version