Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--  synapse/handlers/federation.py  127
1 file changed, 109 insertions(+), 18 deletions(-)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 11a005f0bf..4bb4d09d4a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,7 @@ from typing import (
 )
 
 import attr
+from prometheus_client import Histogram
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -59,6 +60,7 @@ from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -78,6 +80,29 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_before_timer = Histogram(
+    "synapse_federation_backfill_processing_before_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.1,
+        0.5,
+        1.0,
+        2.5,
+        5.0,
+        7.5,
+        10.0,
+        15.0,
+        20.0,
+        30.0,
+        40.0,
+        60.0,
+        80.0,
+        "+Inf",
+    ),
+)
+
 
 def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
     """Get joined domains from state
@@ -137,6 +162,7 @@ class FederationHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
 
+        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -180,6 +206,7 @@ class FederationHandler:
                 "resume_sync_partial_state_room", self._resume_sync_partial_state_room
             )
 
+    @trace
     async def maybe_backfill(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
@@ -195,12 +222,39 @@ class FederationHandler:
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
         """
+        # Starting the processing time here so we can include the room backfill
+        # linearizer lock queue in the timing
+        processing_start_time = self.clock.time_msec()
+
         async with self._room_backfill.queue(room_id):
-            return await self._maybe_backfill_inner(room_id, current_depth, limit)
+            return await self._maybe_backfill_inner(
+                room_id,
+                current_depth,
+                limit,
+                processing_start_time=processing_start_time,
+            )
 
     async def _maybe_backfill_inner(
-        self, room_id: str, current_depth: int, limit: int
+        self,
+        room_id: str,
+        current_depth: int,
+        limit: int,
+        *,
+        processing_start_time: int,
     ) -> bool:
+        """
+        Checks whether the `current_depth` is at or approaching any backfill
+        points in the room and, if so, backfills. We only check backfill
+        points that happened before the `current_depth` (i.e. those with a
+        depth less than or equal to `current_depth`).
+
+        Args:
+            room_id: The room to backfill in.
+            current_depth: The depth to check at for any upcoming backfill points.
+            limit: The max number of events to request from the remote federated server.
+            processing_start_time: The time when `maybe_backfill` started
+                processing. Only used for timing.
+        """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
             for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
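
As the docstring and the comment in maybe_backfill note, processing_start_time is captured before entering the per-room linearizer, so any time spent queued behind another backfill for the same room counts towards the reported duration. A simplified sketch of that pattern, assuming Synapse's Linearizer and Clock utilities and compressing the work that happens between lock acquisition and the observe() call:

from synapse.util.async_helpers import Linearizer

_room_backfill = Linearizer(name="room_backfill")


async def maybe_backfill_sketch(clock, room_id: str) -> None:
    # Start the clock *before* waiting for the per-room lock so that the
    # queueing delay is included in the histogram.
    processing_start_time = clock.time_msec()
    async with _room_backfill.queue(room_id):
        # ... find backfill points, pick extremities to request, etc. ...
        processing_end_time = clock.time_msec()
        # The clock is in milliseconds; the histogram buckets are in seconds.
        backfill_processing_before_timer.observe(
            (processing_end_time - processing_start_time) / 1000
        )
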
@@ -368,6 +422,14 @@ class FederationHandler:
         logger.debug(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request",
+            str(extremities_to_request),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+            str(len(extremities_to_request)),
+        )
 
         # Now we need to decide which hosts to hit first.
 
@@ -423,6 +485,11 @@ class FederationHandler:
 
             return False
 
+        processing_end_time = self.clock.time_msec()
+        backfill_processing_before_timer.observe(
+            (processing_end_time - processing_start_time) / 1000
+        )
+
         success = await try_backfill(likely_domains)
         if success:
             return True
@@ -546,9 +613,9 @@ class FederationHandler:
             )
 
             if ret.partial_state:
-                # TODO(faster_joins): roll this back if we don't manage to start the
-                #   background resync (eg process_remote_join fails)
-                #   https://github.com/matrix-org/synapse/issues/12998
+                # Mark the room as having partial state.
+                # The background process is responsible for unmarking this flag,
+                # even if the join fails.
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             try:
@@ -574,17 +641,21 @@ class FederationHandler:
                     room_id,
                 )
                 raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
-
-            if ret.partial_state:
-                # Kick off the process of asynchronously fetching the state for this
-                # room.
-                run_as_background_process(
-                    desc="sync_partial_state_room",
-                    func=self._sync_partial_state_room,
-                    initial_destination=origin,
-                    other_destinations=ret.servers_in_room,
-                    room_id=room_id,
-                )
+            finally:
+                # Always kick off the background process that asynchronously fetches
+                # state for the room.
+                # If the join failed, the background process is responsible for
+                # cleaning up — including unmarking the room as a partial state room.
+                if ret.partial_state:
+                    # Kick off the process of asynchronously fetching the state for this
+                    # room.
+                    run_as_background_process(
+                        desc="sync_partial_state_room",
+                        func=self._sync_partial_state_room,
+                        initial_destination=origin,
+                        other_destinations=ret.servers_in_room,
+                        room_id=room_id,
+                    )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
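
The restructuring above wraps the join in try/finally so that the sync_partial_state_room background task is started whether or not the join attempt raised; since that task owns the cleanup of the partial-state flag, skipping it on failure would leave the room marked as partial state. The control flow roughly follows the shape below (do_join and resync are placeholder callables, simplified from the real handler):

from synapse.metrics.background_process_metrics import run_as_background_process


async def partial_state_join_sketch(store, room_id, origin, servers, do_join, resync):
    # Record that we only have partial state *before* attempting the join.
    await store.store_partial_state_room(room_id, servers)
    try:
        await do_join()  # may raise (e.g. LimitExceededError)
    finally:
        # Started even if the join failed: the background task is responsible
        # for completing the state or unmarking the partial-state flag.
        run_as_background_process(
            desc="sync_partial_state_room",
            func=resync,
            initial_destination=origin,
            other_destinations=servers,
            room_id=room_id,
        )
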
@@ -748,6 +819,23 @@ class FederationHandler:
         # (and return a 404 otherwise)
         room_version = await self.store.get_room_version(room_id)
 
+        if await self.store.is_partial_state_room(room_id):
+            # If our server is still only partially joined, we can't give a complete
+            # response to /make_join, so return a 404 as we would if we weren't in the
+            # room at all.
+            # The main reason we can't respond properly is that we need to know about
+            # the auth events for the join event that we would return.
+            # We also should not bother entertaining the /make_join since we cannot
+            # handle the /send_join.
+            logger.info(
+                "Rejecting /make_join to %s because it's a partial state room", room_id
+            )
+            raise SynapseError(
+                404,
+                "Unable to handle /make_join right now; this server is not fully joined.",
+                errcode=Codes.NOT_FOUND,
+            )
+
         # now check that we are *still* in the room
         is_in_room = await self._event_auth_handler.check_host_in_room(
             room_id, self.server_name
@@ -1071,6 +1159,8 @@ class FederationHandler:
 
         return event
 
+    @trace
+    @tag_args
     async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
         """Returns the state at the event. i.e. not including said event."""
         event = await self.store.get_event(event_id, check_room_id=room_id)
@@ -1552,15 +1642,16 @@ class FederationHandler:
 
         # Make an infinite iterator of destinations to try. Once we find a working
         # destination, we'll stick with it until it flakes.
+        destinations: Collection[str]
         if initial_destination is not None:
             # Move `initial_destination` to the front of the list.
             destinations = list(other_destinations)
             if initial_destination in destinations:
                 destinations.remove(initial_destination)
             destinations = [initial_destination] + destinations
-            destination_iter = itertools.cycle(destinations)
         else:
-            destination_iter = itertools.cycle(other_destinations)
+            destinations = other_destinations
+        destination_iter = itertools.cycle(destinations)
 
         # `destination` is the current remote homeserver we're pulling from.
         destination = next(destination_iter)
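
The last hunk gives destinations a single Collection[str] annotation and builds the itertools.cycle in one place rather than duplicating it in both branches; behaviour is unchanged, with initial_destination simply moved to the front when present. A standalone illustration of the resulting rotation, with invented server names:

import itertools
from typing import Collection, Iterator, Optional


def build_destination_cycle(
    initial_destination: Optional[str],
    other_destinations: Collection[str],
) -> Iterator[str]:
    destinations: Collection[str]
    if initial_destination is not None:
        # Move `initial_destination` to the front of the list.
        destinations = [initial_destination] + [
            d for d in other_destinations if d != initial_destination
        ]
    else:
        destinations = other_destinations
    return itertools.cycle(destinations)


servers = build_destination_cycle(
    "hs1.example.org", ["hs2.example.org", "hs3.example.org"]
)
print([next(servers) for _ in range(4)])
# ['hs1.example.org', 'hs2.example.org', 'hs3.example.org', 'hs1.example.org']
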