diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 842f5327c2..7ee2974bb1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -233,6 +234,8 @@ class FederationClient(FederationBase):
destination, content, timeout
)
+ @trace
+ @tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
@@ -335,6 +338,8 @@ class FederationClient(FederationBase):
return None
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Iterable[str],
@@ -403,9 +408,9 @@ class FederationClient(FederationBase):
# Prime the cache
self._get_pdu_cache[event.event_id] = event
- # FIXME: We should add a `break` here to avoid calling every
- # destination after we already found a PDU (will follow-up
- # in a separate PR)
+ # Now that we have an event, we can break out of this
+ # loop and stop asking other destinations.
+ break
except SynapseError as e:
logger.info(
@@ -446,6 +451,8 @@ class FederationClient(FederationBase):
return event_copy
+ @trace
+ @tag_args
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
@@ -465,6 +472,23 @@ class FederationClient(FederationBase):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids",
+ str(state_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+ str(len(state_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids",
+ str(auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+ str(len(auth_event_ids)),
+ )
+
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
@@ -472,6 +496,8 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
+ @trace
+ @tag_args
async def get_room_state(
self,
destination: str,
@@ -531,6 +557,7 @@ class FederationClient(FederationBase):
return valid_state_events, valid_auth_events
+ @trace
async def _check_sigs_and_hash_and_fetch(
self,
origin: str,
@@ -560,11 +587,15 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "pdus.length",
+ str(len(pdus)),
+ )
# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.
- valid_pdus = []
+ valid_pdus: List[EventBase] = []
async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -580,6 +611,8 @@ class FederationClient(FederationBase):
return valid_pdus
+ @trace
+ @tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
@@ -612,16 +645,27 @@ class FederationClient(FederationBase):
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
- "Checking local store/orgin server",
+ "Checking local store/origin server",
pdu.event_id,
e,
)
+ log_kv(
+ {
+ "message": "Signature on retrieved event was invalid. "
+ "Checking local store/origin server",
+ "event_id": pdu.event_id,
+ "InvalidEventSignatureError": e,
+ }
+ )
# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
+ # If the PDU fails its signature check and we don't have it in our
+ # database, we then request it from sender's server (if that is not the
+ # same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
@@ -725,6 +769,12 @@ class FederationClient(FederationBase):
if failover_errcodes is None:
failover_errcodes = ()
+ if not destinations:
+ # Give a bit of a clearer message if no servers were specified at all.
+ raise SynapseError(
+ 502, f"Failed to {description} via any server: No servers specified."
+ )
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -774,7 +824,7 @@ class FederationClient(FederationBase):
"Failed to %s via %s", description, destination, exc_info=True
)
- raise SynapseError(502, "Failed to %s via any server" % (description,))
+ raise SynapseError(502, f"Failed to {description} via any server")
async def make_membership_event(
self,
|