summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_base.py22
-rw-r--r--synapse/federation/federation_client.py62
-rw-r--r--synapse/federation/federation_server.py30
-rw-r--r--synapse/federation/transport/server/_base.py2
4 files changed, 107 insertions, 9 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 2522bf78fc..4269a98db2 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -23,6 +23,7 @@ from synapse.crypto.keyring import Keyring
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.utils import prune_event, validate_canonicaljson
 from synapse.http.servlet import assert_params_in_dict
+from synapse.logging.opentracing import log_kv, trace
 from synapse.types import JsonDict, get_domain_from_id
 
 if TYPE_CHECKING:
@@ -55,6 +56,7 @@ class FederationBase:
         self._clock = hs.get_clock()
         self._storage_controllers = hs.get_storage_controllers()
 
+    @trace
     async def _check_sigs_and_hash(
         self, room_version: RoomVersion, pdu: EventBase
     ) -> EventBase:
@@ -97,17 +99,36 @@ class FederationBase:
                     "Event %s seems to have been redacted; using our redacted copy",
                     pdu.event_id,
                 )
+                log_kv(
+                    {
+                        "message": "Event seems to have been redacted; using our redacted copy",
+                        "event_id": pdu.event_id,
+                    }
+                )
             else:
                 logger.warning(
                     "Event %s content has been tampered, redacting",
                     pdu.event_id,
                 )
+                log_kv(
+                    {
+                        "message": "Event content has been tampered, redacting",
+                        "event_id": pdu.event_id,
+                    }
+                )
             return redacted_event
 
         spam_check = await self.spam_checker.check_event_for_spam(pdu)
 
         if spam_check != self.spam_checker.NOT_SPAM:
             logger.warning("Event contains spam, soft-failing %s", pdu.event_id)
+            log_kv(
+                {
+                    "message": "Event contains spam, redacting (to save disk space) "
+                    "as well as soft-failing (to stop using the event in prev_events)",
+                    "event_id": pdu.event_id,
+                }
+            )
             # we redact (to save disk space) as well as soft-failing (to stop
             # using the event in prev_events).
             redacted_event = prune_event(pdu)
@@ -117,6 +138,7 @@ class FederationBase:
         return pdu
 
 
+@trace
 async def _check_sigs_on_pdu(
     keyring: Keyring, room_version: RoomVersion, pdu: EventBase
 ) -> None:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 842f5327c2..7ee2974bb1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
 )
 from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.types import QueryParams
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, tag_args, trace
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -233,6 +234,8 @@ class FederationClient(FederationBase):
             destination, content, timeout
         )
 
+    @trace
+    @tag_args
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> Optional[List[EventBase]]:
@@ -335,6 +338,8 @@ class FederationClient(FederationBase):
 
         return None
 
+    @trace
+    @tag_args
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -403,9 +408,9 @@ class FederationClient(FederationBase):
                         # Prime the cache
                         self._get_pdu_cache[event.event_id] = event
 
-                        # FIXME: We should add a `break` here to avoid calling every
-                        # destination after we already found a PDU (will follow-up
-                        # in a separate PR)
+                        # Now that we have an event, we can break out of this
+                        # loop and stop asking other destinations.
+                        break
 
                 except SynapseError as e:
                     logger.info(
@@ -446,6 +451,8 @@ class FederationClient(FederationBase):
 
         return event_copy
 
+    @trace
+    @tag_args
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
     ) -> Tuple[List[str], List[str]]:
@@ -465,6 +472,23 @@ class FederationClient(FederationBase):
         state_event_ids = result["pdu_ids"]
         auth_event_ids = result.get("auth_chain_ids", [])
 
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "state_event_ids",
+            str(state_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+            str(len(state_event_ids)),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids",
+            str(auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+            str(len(auth_event_ids)),
+        )
+
         if not isinstance(state_event_ids, list) or not isinstance(
             auth_event_ids, list
         ):
@@ -472,6 +496,8 @@ class FederationClient(FederationBase):
 
         return state_event_ids, auth_event_ids
 
+    @trace
+    @tag_args
     async def get_room_state(
         self,
         destination: str,
@@ -531,6 +557,7 @@ class FederationClient(FederationBase):
 
         return valid_state_events, valid_auth_events
 
+    @trace
     async def _check_sigs_and_hash_and_fetch(
         self,
         origin: str,
@@ -560,11 +587,15 @@ class FederationClient(FederationBase):
         Returns:
             A list of PDUs that have valid signatures and hashes.
         """
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "pdus.length",
+            str(len(pdus)),
+        )
 
         # We limit how many PDUs we check at once, as if we try to do hundreds
         # of thousands of PDUs at once we see large memory spikes.
 
-        valid_pdus = []
+        valid_pdus: List[EventBase] = []
 
         async def _execute(pdu: EventBase) -> None:
             valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -580,6 +611,8 @@ class FederationClient(FederationBase):
 
         return valid_pdus
 
+    @trace
+    @tag_args
     async def _check_sigs_and_hash_and_fetch_one(
         self,
         pdu: EventBase,
@@ -612,16 +645,27 @@ class FederationClient(FederationBase):
         except InvalidEventSignatureError as e:
             logger.warning(
                 "Signature on retrieved event %s was invalid (%s). "
-                "Checking local store/orgin server",
+                "Checking local store/origin server",
                 pdu.event_id,
                 e,
             )
+            log_kv(
+                {
+                    "message": "Signature on retrieved event was invalid. "
+                    "Checking local store/origin server",
+                    "event_id": pdu.event_id,
+                    "InvalidEventSignatureError": e,
+                }
+            )
 
         # Check local db.
         res = await self.store.get_event(
             pdu.event_id, allow_rejected=True, allow_none=True
         )
 
+        # If the PDU fails its signature check and we don't have it in our
+        # database, we then request it from sender's server (if that is not the
+        # same as `origin`).
         pdu_origin = get_domain_from_id(pdu.sender)
         if not res and pdu_origin != origin:
             try:
@@ -725,6 +769,12 @@ class FederationClient(FederationBase):
         if failover_errcodes is None:
             failover_errcodes = ()
 
+        if not destinations:
+            # Give a bit of a clearer message if no servers were specified at all.
+            raise SynapseError(
+                502, f"Failed to {description} via any server: No servers specified."
+            )
+
         for destination in destinations:
             if destination == self.server_name:
                 continue
@@ -774,7 +824,7 @@ class FederationClient(FederationBase):
                     "Failed to %s via %s", description, destination, exc_info=True
                 )
 
-        raise SynapseError(502, "Failed to %s via any server" % (description,))
+        raise SynapseError(502, f"Failed to {description} via any server")
 
     async def make_membership_event(
         self,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ae550d3f4d..75fbc6073d 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,12 @@ from synapse.logging.context import (
     nested_logging_context,
     run_in_background,
 )
-from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.opentracing import (
+    log_kv,
+    start_active_span_from_edu,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -469,7 +474,7 @@ class FederationServer(FederationBase):
                     )
                     for pdu in pdus_by_room[room_id]:
                         event_id = pdu.event_id
-                        pdu_results[event_id] = e.error_dict()
+                        pdu_results[event_id] = e.error_dict(self.hs.config)
                     return
 
                 for pdu in pdus_by_room[room_id]:
@@ -547,6 +552,8 @@ class FederationServer(FederationBase):
 
         return 200, resp
 
+    @trace
+    @tag_args
     async def on_state_ids_request(
         self, origin: str, room_id: str, event_id: str
     ) -> Tuple[int, JsonDict]:
@@ -569,6 +576,8 @@ class FederationServer(FederationBase):
 
         return 200, resp
 
+    @trace
+    @tag_args
     async def _on_state_ids_request_compute(
         self, room_id: str, event_id: str
     ) -> JsonDict:
@@ -843,8 +852,25 @@ class FederationServer(FederationBase):
                 Codes.BAD_JSON,
             )
 
+        # Note that get_room_version throws if the room does not exist here.
         room_version = await self.store.get_room_version(room_id)
 
+        if await self.store.is_partial_state_room(room_id):
+            # If our server is still only partially joined, we can't give a complete
+            # response to /send_join, /send_knock or /send_leave.
+            # This is because we will not be able to provide the server list (for partial
+            # joins) or the full state (for full joins).
+            # Return a 404 as we would if we weren't in the room at all.
+            logger.info(
+                f"Rejecting /send_{membership_type} to %s because it's a partial state room",
+                room_id,
+            )
+            raise SynapseError(
+                404,
+                f"Unable to handle /send_{membership_type} right now; this server is not fully joined.",
+                errcode=Codes.NOT_FOUND,
+            )
+
         if membership_type == Membership.KNOCK and not room_version.msc2403_knocking:
             raise SynapseError(
                 403,
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 84100a5a52..bb0f8d6b7b 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -309,7 +309,7 @@ class BaseFederationServlet:
                 raise
 
             # update the active opentracing span with the authenticated entity
-            set_tag("authenticated_entity", origin)
+            set_tag("authenticated_entity", str(origin))
 
             # if the origin is authenticated and whitelisted, use its span context
             # as the parent.