diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2522bf78fc..4269a98db2 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
from synapse.events import EventBase, make_event_from_dict
from synapse.events.utils import prune_event, validate_canonicaljson
from synapse.http.servlet import assert_params_in_dict
+from synapse.logging.opentracing import log_kv, trace
from synapse.types import JsonDict, get_domain_from_id
if TYPE_CHECKING:
@@ -55,6 +56,7 @@ class FederationBase:
self._clock = hs.get_clock()
self._storage_controllers = hs.get_storage_controllers()
+ @trace
async def _check_sigs_and_hash(
self, room_version: RoomVersion, pdu: EventBase
) -> EventBase:
@@ -97,17 +99,36 @@ class FederationBase:
"Event %s seems to have been redacted; using our redacted copy",
pdu.event_id,
)
+ log_kv(
+ {
+ "message": "Event seems to have been redacted; using our redacted copy",
+ "event_id": pdu.event_id,
+ }
+ )
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id,
)
+ log_kv(
+ {
+ "message": "Event content has been tampered, redacting",
+ "event_id": pdu.event_id,
+ }
+ )
return redacted_event
spam_check = await self.spam_checker.check_event_for_spam(pdu)
if spam_check != self.spam_checker.NOT_SPAM:
logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
+ log_kv(
+ {
+ "message": "Event contains spam, redacting (to save disk space) "
+ "as well as soft-failing (to stop using the event in prev_events)",
+ "event_id": pdu.event_id,
+ }
+ )
# we redact (to save disk space) as well as soft-failing (to stop
# using the event in prev_events).
redacted_event = prune_event(pdu)
@@ -117,6 +138,7 @@ class FederationBase:
return pdu
+@trace
async def _check_sigs_on_pdu(
keyring: Keyring, room_version: RoomVersion, pdu: EventBase
) -> None:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 842f5327c2..7ee2974bb1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.transport.client import SendJoinResponse
from synapse.http.types import QueryParams
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
from synapse.types import JsonDict, UserID, get_domain_from_id
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@@ -233,6 +234,8 @@ class FederationClient(FederationBase):
destination, content, timeout
)
+ @trace
+ @tag_args
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> Optional[List[EventBase]]:
@@ -335,6 +338,8 @@ class FederationClient(FederationBase):
return None
+ @trace
+ @tag_args
async def get_pdu(
self,
destinations: Iterable[str],
@@ -403,9 +408,9 @@ class FederationClient(FederationBase):
# Prime the cache
self._get_pdu_cache[event.event_id] = event
- # FIXME: We should add a `break` here to avoid calling every
- # destination after we already found a PDU (will follow-up
- # in a separate PR)
+ # Now that we have an event, we can break out of this
+ # loop and stop asking other destinations.
+ break
except SynapseError as e:
logger.info(
@@ -446,6 +451,8 @@ class FederationClient(FederationBase):
return event_copy
+ @trace
+ @tag_args
async def get_room_state_ids(
self, destination: str, room_id: str, event_id: str
) -> Tuple[List[str], List[str]]:
@@ -465,6 +472,23 @@ class FederationClient(FederationBase):
state_event_ids = result["pdu_ids"]
auth_event_ids = result.get("auth_chain_ids", [])
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids",
+ str(state_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+ str(len(state_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids",
+ str(auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+ str(len(auth_event_ids)),
+ )
+
if not isinstance(state_event_ids, list) or not isinstance(
auth_event_ids, list
):
@@ -472,6 +496,8 @@ class FederationClient(FederationBase):
return state_event_ids, auth_event_ids
+ @trace
+ @tag_args
async def get_room_state(
self,
destination: str,
@@ -531,6 +557,7 @@ class FederationClient(FederationBase):
return valid_state_events, valid_auth_events
+ @trace
async def _check_sigs_and_hash_and_fetch(
self,
origin: str,
@@ -560,11 +587,15 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "pdus.length",
+ str(len(pdus)),
+ )
# We limit how many PDUs we check at once, as if we try to do hundreds
# of thousands of PDUs at once we see large memory spikes.
- valid_pdus = []
+ valid_pdus: List[EventBase] = []
async def _execute(pdu: EventBase) -> None:
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -580,6 +611,8 @@ class FederationClient(FederationBase):
return valid_pdus
+ @trace
+ @tag_args
async def _check_sigs_and_hash_and_fetch_one(
self,
pdu: EventBase,
@@ -612,16 +645,27 @@ class FederationClient(FederationBase):
except InvalidEventSignatureError as e:
logger.warning(
"Signature on retrieved event %s was invalid (%s). "
- "Checking local store/orgin server",
+ "Checking local store/origin server",
pdu.event_id,
e,
)
+ log_kv(
+ {
+ "message": "Signature on retrieved event was invalid. "
+ "Checking local store/origin server",
+ "event_id": pdu.event_id,
+ "InvalidEventSignatureError": e,
+ }
+ )
# Check local db.
res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True
)
+ # If the PDU fails its signature check and we don't have it in our
+ # database, we then request it from sender's server (if that is not the
+ # same as `origin`).
pdu_origin = get_domain_from_id(pdu.sender)
if not res and pdu_origin != origin:
try:
@@ -725,6 +769,12 @@ class FederationClient(FederationBase):
if failover_errcodes is None:
failover_errcodes = ()
+ if not destinations:
+ # Give a bit of a clearer message if no servers were specified at all.
+ raise SynapseError(
+ 502, f"Failed to {description} via any server: No servers specified."
+ )
+
for destination in destinations:
if destination == self.server_name:
continue
@@ -774,7 +824,7 @@ class FederationClient(FederationBase):
"Failed to %s via %s", description, destination, exc_info=True
)
- raise SynapseError(502, "Failed to %s via any server" % (description,))
+ raise SynapseError(502, f"Failed to {description} via any server")
async def make_membership_event(
self,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ae550d3f4d..75fbc6073d 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,12 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
-from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.opentracing import (
+ log_kv,
+ start_active_span_from_edu,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -469,7 +474,7 @@ class FederationServer(FederationBase):
)
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- pdu_results[event_id] = e.error_dict()
+ pdu_results[event_id] = e.error_dict(self.hs.config)
return
for pdu in pdus_by_room[room_id]:
@@ -547,6 +552,8 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def on_state_ids_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
@@ -569,6 +576,8 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def _on_state_ids_request_compute(
self, room_id: str, event_id: str
) -> JsonDict:
@@ -843,8 +852,25 @@ class FederationServer(FederationBase):
Codes.BAD_JSON,
)
+ # Note that get_room_version throws if the room does not exist here.
room_version = await self.store.get_room_version(room_id)
+ if await self.store.is_partial_state_room(room_id):
+ # If our server is still only partially joined, we can't give a complete
+ # response to /send_join, /send_knock or /send_leave.
+ # This is because we will not be able to provide the server list (for partial
+ # joins) or the full state (for full joins).
+ # Return a 404 as we would if we weren't in the room at all.
+ logger.info(
+ f"Rejecting /send_{membership_type} to %s because it's a partial state room",
+ room_id,
+ )
+ raise SynapseError(
+ 404,
+ f"Unable to handle /send_{membership_type} right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
if membership_type == Membership.KNOCK and not room_version.msc2403_knocking:
raise SynapseError(
403,
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 84100a5a52..bb0f8d6b7b 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -309,7 +309,7 @@ class BaseFederationServlet:
raise
# update the active opentracing span with the authenticated entity
- set_tag("authenticated_entity", origin)
+ set_tag("authenticated_entity", str(origin))
# if the origin is authenticated and whitelisted, use its span context
# as the parent.
|