diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7d80ff6998..7c485aa7e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -68,12 +68,14 @@ from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
event_from_pdu_json,
+ parse_events_from_pdu_json,
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.client import is_unknown_endpoint
from synapse.http.types import QueryParams
from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, StrCollection, UserID, get_domain_from_id
+from synapse.types.handlers.policy_server import RECOMMENDATION_OK, RECOMMENDATION_SPAM
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.retryutils import NotRetryingDestination
@@ -349,7 +351,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
+ pdus = parse_events_from_pdu_json(transaction_data_pdus, room_version)
# Check signatures and hash of pdus, removing any from the list that fail checks
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
@@ -393,9 +395,7 @@ class FederationClient(FederationBase):
transaction_data,
)
- pdu_list: List[EventBase] = [
- event_from_pdu_json(p, room_version) for p in transaction_data["pdus"]
- ]
+ pdu_list = parse_events_from_pdu_json(transaction_data["pdus"], room_version)
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
@@ -424,6 +424,62 @@ class FederationClient(FederationBase):
@trace
@tag_args
+ async def get_pdu_policy_recommendation(
+ self, destination: str, pdu: EventBase, timeout: Optional[int] = None
+ ) -> str:
+ """Requests that the destination server (typically a policy server)
+ check the event and return its recommendation on how to handle the
+ event.
+
+ If the policy server could not be contacted or the policy server
+ returned an unknown recommendation, this returns an OK recommendation.
+ This type fixing behaviour is done because the typical caller will be
+ in a critical call path and would generally interpret a `None` or similar
+ response as "weird value; don't care; move on without taking action". We
+ just frontload that logic here.
+
+
+ Args:
+ destination: The remote homeserver to ask (a policy server)
+ pdu: The event to check
+ timeout: How long to try (in ms) the destination for before
+ giving up. None indicates no timeout.
+
+ Returns:
+ The policy recommendation, or RECOMMENDATION_OK if the policy server was
+ uncontactable or returned an unknown recommendation.
+ """
+
+ logger.debug(
+ "get_pdu_policy_recommendation for event_id=%s from %s",
+ pdu.event_id,
+ destination,
+ )
+
+ try:
+ res = await self.transport_layer.get_policy_recommendation_for_pdu(
+ destination, pdu, timeout=timeout
+ )
+ recommendation = res.get("recommendation")
+ if not isinstance(recommendation, str):
+ raise InvalidResponseError("recommendation is not a string")
+ if recommendation not in (RECOMMENDATION_OK, RECOMMENDATION_SPAM):
+ logger.warning(
+ "get_pdu_policy_recommendation: unknown recommendation: %s",
+ recommendation,
+ )
+ return RECOMMENDATION_OK
+ return recommendation
+ except Exception as e:
+ logger.warning(
+ "get_pdu_policy_recommendation: server %s responded with error, assuming OK recommendation: %s",
+ destination,
+ e,
+ )
+ return RECOMMENDATION_OK
+
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Collection[str],
@@ -809,7 +865,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
+ auth_chain = parse_events_from_pdu_json(res["auth_chain"], room_version)
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, auth_chain, room_version=room_version
@@ -1529,9 +1585,7 @@ class FederationClient(FederationBase):
room_version = await self.store.get_room_version(room_id)
- events = [
- event_from_pdu_json(e, room_version) for e in content.get("events", [])
- ]
+ events = parse_events_from_pdu_json(content.get("events", []), room_version)
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
destination, events, room_version=room_version
|