diff options
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r-- | synapse/handlers/federation.py | 178 |
1 files changed, 118 insertions, 60 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3b5eaf5156..dd4b9f66d1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -32,6 +32,7 @@ from typing import ( ) import attr +from prometheus_client import Histogram from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -59,6 +60,7 @@ from synapse.events.validator import EventValidator from synapse.federation.federation_client import InvalidResponseError from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import nested_logging_context +from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.module_api import NOT_SPAM from synapse.replication.http.federation import ( @@ -68,7 +70,7 @@ from synapse.replication.http.federation import ( from synapse.storage.databases.main.events import PartialStateConflictError from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.state import StateFilter -from synapse.types import JsonDict, StateMap, get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -78,36 +80,28 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) - -def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]: - """Get joined domains from state - - Args: - state: State map from type/state key to event. - - Returns: - Returns a list of servers with the lowest depth of their joins. - Sorted by lowest depth first. - """ - joined_users = [ - (state_key, int(event.depth)) - for (e_type, state_key), event in state.items() - if e_type == EventTypes.Member and event.membership == Membership.JOIN - ] - - joined_domains: Dict[str, int] = {} - for u, d in joined_users: - try: - dom = get_domain_from_id(u) - old_d = joined_domains.get(dom) - if old_d: - joined_domains[dom] = min(d, old_d) - else: - joined_domains[dom] = d - except Exception: - pass - - return sorted(joined_domains.items(), key=lambda d: d[1]) +# Added to debug performance and track progress on optimizations +backfill_processing_before_timer = Histogram( + "synapse_federation_backfill_processing_before_time_seconds", + "sec", + [], + buckets=( + 0.1, + 0.5, + 1.0, + 2.5, + 5.0, + 7.5, + 10.0, + 15.0, + 20.0, + 30.0, + 40.0, + 60.0, + 80.0, + "+Inf", + ), +) class _BackfillPointType(Enum): @@ -137,6 +131,7 @@ class FederationHandler: def __init__(self, hs: "HomeServer"): self.hs = hs + self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state @@ -180,6 +175,7 @@ class FederationHandler: "resume_sync_partial_state_room", self._resume_sync_partial_state_room ) + @trace async def maybe_backfill( self, room_id: str, current_depth: int, limit: int ) -> bool: @@ -195,12 +191,39 @@ class FederationHandler: return. This is used as part of the heuristic to decide if we should back paginate. """ + # Starting the processing time here so we can include the room backfill + # linearizer lock queue in the timing + processing_start_time = self.clock.time_msec() + async with self._room_backfill.queue(room_id): - return await self._maybe_backfill_inner(room_id, current_depth, limit) + return await self._maybe_backfill_inner( + room_id, + current_depth, + limit, + processing_start_time=processing_start_time, + ) async def _maybe_backfill_inner( - self, room_id: str, current_depth: int, limit: int + self, + room_id: str, + current_depth: int, + limit: int, + *, + processing_start_time: int, ) -> bool: + """ + Checks whether the `current_depth` is at or approaching any backfill + points in the room and if so, will backfill. We only care about + checking backfill points that happened before the `current_depth` + (meaning less than or equal to the `current_depth`). + + Args: + room_id: The room to backfill in. + current_depth: The depth to check at for any upcoming backfill points. + limit: The max number of events to request from the remote federated server. + processing_start_time: The time when `maybe_backfill` started + processing. Only used for timing. + """ backwards_extremities = [ _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY) for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room( @@ -368,23 +391,29 @@ class FederationHandler: logger.debug( "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request ) + set_tag( + SynapseTags.RESULT_PREFIX + "extremities_to_request", + str(extremities_to_request), + ) + set_tag( + SynapseTags.RESULT_PREFIX + "extremities_to_request.length", + str(len(extremities_to_request)), + ) # Now we need to decide which hosts to hit first. - - # First we try hosts that are already in the room + # First we try hosts that are already in the room. # TODO: HEURISTIC ALERT. + likely_domains = ( + await self._storage_controllers.state.get_current_hosts_in_room(room_id) + ) - curr_state = await self._storage_controllers.state.get_current_state(room_id) - - curr_domains = get_domains_from_state(curr_state) - - likely_domains = [ - domain for domain, depth in curr_domains if domain != self.server_name - ] - - async def try_backfill(domains: List[str]) -> bool: + async def try_backfill(domains: Collection[str]) -> bool: # TODO: Should we try multiple of these at a time? for dom in domains: + # We don't want to ask our own server for information we don't have + if dom == self.server_name: + continue + try: await self._federation_event_handler.backfill( dom, room_id, limit=100, extremities=extremities_to_request @@ -423,6 +452,11 @@ class FederationHandler: return False + processing_end_time = self.clock.time_msec() + backfill_processing_before_timer.observe( + (processing_end_time - processing_start_time) / 1000 + ) + success = await try_backfill(likely_domains) if success: return True @@ -546,9 +580,9 @@ class FederationHandler: ) if ret.partial_state: - # TODO(faster_joins): roll this back if we don't manage to start the - # background resync (eg process_remote_join fails) - # https://github.com/matrix-org/synapse/issues/12998 + # Mark the room as having partial state. + # The background process is responsible for unmarking this flag, + # even if the join fails. await self.store.store_partial_state_room(room_id, ret.servers_in_room) try: @@ -574,17 +608,21 @@ class FederationHandler: room_id, ) raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) - - if ret.partial_state: - # Kick off the process of asynchronously fetching the state for this - # room. - run_as_background_process( - desc="sync_partial_state_room", - func=self._sync_partial_state_room, - initial_destination=origin, - other_destinations=ret.servers_in_room, - room_id=room_id, - ) + finally: + # Always kick off the background process that asynchronously fetches + # state for the room. + # If the join failed, the background process is responsible for + # cleaning up — including unmarking the room as a partial state room. + if ret.partial_state: + # Kick off the process of asynchronously fetching the state for this + # room. + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + initial_destination=origin, + other_destinations=ret.servers_in_room, + room_id=room_id, + ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. @@ -748,6 +786,23 @@ class FederationHandler: # (and return a 404 otherwise) room_version = await self.store.get_room_version(room_id) + if await self.store.is_partial_state_room(room_id): + # If our server is still only partially joined, we can't give a complete + # response to /make_join, so return a 404 as we would if we weren't in the + # room at all. + # The main reason we can't respond properly is that we need to know about + # the auth events for the join event that we would return. + # We also should not bother entertaining the /make_join since we cannot + # handle the /send_join. + logger.info( + "Rejecting /make_join to %s because it's a partial state room", room_id + ) + raise SynapseError( + 404, + "Unable to handle /make_join right now; this server is not fully joined.", + errcode=Codes.NOT_FOUND, + ) + # now check that we are *still* in the room is_in_room = await self._event_auth_handler.check_host_in_room( room_id, self.server_name @@ -1058,6 +1113,8 @@ class FederationHandler: return event + @trace + @tag_args async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]: """Returns the state at the event. i.e. not including said event.""" event = await self.store.get_event(event_id, check_room_id=room_id) @@ -1539,15 +1596,16 @@ class FederationHandler: # Make an infinite iterator of destinations to try. Once we find a working # destination, we'll stick with it until it flakes. + destinations: Collection[str] if initial_destination is not None: # Move `initial_destination` to the front of the list. destinations = list(other_destinations) if initial_destination in destinations: destinations.remove(initial_destination) destinations = [initial_destination] + destinations - destination_iter = itertools.cycle(destinations) else: - destination_iter = itertools.cycle(other_destinations) + destinations = other_destinations + destination_iter = itertools.cycle(destinations) # `destination` is the current remote homeserver we're pulling from. destination = next(destination_iter) |