diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ad475a913b..7ee2974bb1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -53,7 +53,7 @@ from synapse.api.room_versions import (
RoomVersion,
RoomVersions,
)
-from synapse.events import EventBase, builder
+from synapse.events import EventBase, builder, make_event_from_dict
from synapse.federation.federation_base import (
FederationBase,
InvalidEventSignatureError,
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -217,7 +218,7 @@ class FederationClient(FederationBase):
)
async def claim_client_keys(
- self, destination: str, content: JsonDict, timeout: int
+ self, destination: str, content: JsonDict, timeout: Optional[int]
) -> JsonDict:
"""Claims one-time keys for a device hosted on a remote server.
@@ -233,6 +234,8 @@ class FederationClient(FederationBase):
destination, content, timeout
)
+ @trace
+ @tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
@@ -299,7 +302,8 @@ class FederationClient(FederationBase):
moving to the next destination. None indicates no timeout.
Returns:
- The requested PDU, or None if we were unable to find it.
+ A copy of the requested PDU that is safe to modify, or None if we
+ were unable to find it.
Raises:
SynapseError, NotRetryingDestination, FederationDeniedError
@@ -309,7 +313,7 @@ class FederationClient(FederationBase):
)
logger.debug(
- "retrieved event id %s from %s: %r",
+ "get_pdu_from_destination_raw: retrieved event id %s from %s: %r",
event_id,
destination,
transaction_data,
@@ -334,6 +338,8 @@ class FederationClient(FederationBase):
return None
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Iterable[str],
@@ -358,55 +364,95 @@ class FederationClient(FederationBase):
The requested PDU, or None if we were unable to find it.
"""
- # TODO: Rate limit the number of times we try and get the same event.
+ logger.debug(
+ "get_pdu: event_id=%s from destinations=%s", event_id, destinations
+ )
- ev = self._get_pdu_cache.get(event_id)
- if ev:
- return ev
+ # TODO: Rate limit the number of times we try and get the same event.
- pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+ # We might need the same event multiple times in quick succession (before
+ # it gets persisted to the database), so we cache the results of the lookup.
+ # Note that this is separate to the regular get_event cache which caches
+ # events once they have been persisted.
+ event = self._get_pdu_cache.get(event_id)
+
+ # If we don't see the event in the cache, go try to fetch it from the
+ # provided remote federated destinations
+ if not event:
+ pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+
+ for destination in destinations:
+ now = self._clock.time_msec()
+ last_attempt = pdu_attempts.get(destination, 0)
+ if last_attempt + PDU_RETRY_TIME_MS > now:
+ logger.debug(
+ "get_pdu: skipping destination=%s because we tried it recently last_attempt=%s and we only check every %s (now=%s)",
+ destination,
+ last_attempt,
+ PDU_RETRY_TIME_MS,
+ now,
+ )
+ continue
+
+ try:
+ event = await self.get_pdu_from_destination_raw(
+ destination=destination,
+ event_id=event_id,
+ room_version=room_version,
+ timeout=timeout,
+ )
- signed_pdu = None
- for destination in destinations:
- now = self._clock.time_msec()
- last_attempt = pdu_attempts.get(destination, 0)
- if last_attempt + PDU_RETRY_TIME_MS > now:
- continue
+ pdu_attempts[destination] = now
- try:
- signed_pdu = await self.get_pdu_from_destination_raw(
- destination=destination,
- event_id=event_id,
- room_version=room_version,
- timeout=timeout,
- )
+ if event:
+ # Prime the cache
+ self._get_pdu_cache[event.event_id] = event
- pdu_attempts[destination] = now
+ # Now that we have an event, we can break out of this
+ # loop and stop asking other destinations.
+ break
- except SynapseError as e:
- logger.info(
- "Failed to get PDU %s from %s because %s", event_id, destination, e
- )
- continue
- except NotRetryingDestination as e:
- logger.info(str(e))
- continue
- except FederationDeniedError as e:
- logger.info(str(e))
- continue
- except Exception as e:
- pdu_attempts[destination] = now
+ except SynapseError as e:
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id,
+ destination,
+ e,
+ )
+ continue
+ except NotRetryingDestination as e:
+ logger.info(str(e))
+ continue
+ except FederationDeniedError as e:
+ logger.info(str(e))
+ continue
+ except Exception as e:
+ pdu_attempts[destination] = now
+
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id,
+ destination,
+ e,
+ )
+ continue
- logger.info(
- "Failed to get PDU %s from %s because %s", event_id, destination, e
- )
- continue
+ if not event:
+ return None
- if signed_pdu:
- self._get_pdu_cache[event_id] = signed_pdu
+ # `event` now refers to an object stored in `get_pdu_cache`. Our
+ # callers may need to modify the returned object (eg to set
+ # `event.internal_metadata.outlier = True`), so we return a copy
+ # rather than the original object.
+ event_copy = make_event_from_dict(
+ event.get_pdu_json(),
+ event.room_version,
+ )
- return signed_pdu
+ return event_copy
+ @trace
+ @tag_args
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
@@ -426,6 +472,23 @@ class FederationClient(FederationBase):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids",
+ str(state_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+ str(len(state_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids",
+ str(auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+ str(len(auth_event_ids)),
+ )
+
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
@@ -433,6 +496,8 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
+ @trace
+ @tag_args
async def get_room_state(
self,
destination: str,
@@ -492,6 +557,7 @@ class FederationClient(FederationBase):
return valid_state_events, valid_auth_events
+ @trace
async def _check_sigs_and_hash_and_fetch(
self,
origin: str,
@@ -521,11 +587,15 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "pdus.length",
+ str(len(pdus)),
+ )
# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.
- valid_pdus = []
+ valid_pdus: List[EventBase] = []
async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -541,6 +611,8 @@ class FederationClient(FederationBase):
return valid_pdus
+ @trace
+ @tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
@@ -573,16 +645,27 @@ class FederationClient(FederationBase):
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
- "Checking local store/orgin server",
+ "Checking local store/origin server",
pdu.event_id,
e,
)
+ log_kv(
+ {
+ "message": "Signature on retrieved event was invalid. "
+ "Checking local store/origin server",
+ "event_id": pdu.event_id,
+ "InvalidEventSignatureError": e,
+ }
+ )
# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
+ # If the PDU fails its signature check and we don't have it in our
+ # database, we then request it from the sender's server (if that is not the
+ # same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
@@ -686,6 +769,12 @@ class FederationClient(FederationBase):
if failover_errcodes is None:
failover_errcodes = ()
+ if not destinations:
+ # Give a bit of a clearer message if no servers were specified at all.
+ raise SynapseError(
+ 502, f"Failed to {description} via any server: No servers specified."
+ )
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -735,7 +824,7 @@ class FederationClient(FederationBase):
"Failed to %s via %s", description, destination, exc_info=True
)
- raise SynapseError(502, "Failed to %s via any server" % (description,))
+ raise SynapseError(502, f"Failed to {description} via any server")
async def make_membership_event(
self,
@@ -1642,10 +1731,6 @@ def _validate_hierarchy_event(d: JsonDict) -> None:
if not isinstance(event_type, str):
raise ValueError("Invalid event: 'event_type' must be a str")
- room_id = d.get("room_id")
- if not isinstance(room_id, str):
- raise ValueError("Invalid event: 'room_id' must be a str")
-
state_key = d.get("state_key")
if not isinstance(state_key, str):
raise ValueError("Invalid event: 'state_key' must be a str")
|