diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 3b5eaf5156..dd4b9f66d1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,7 @@ from typing import (
)
import attr
+from prometheus_client import Histogram
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
from unpaddedbase64 import decode_base64
@@ -59,6 +60,7 @@ from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.module_api import NOT_SPAM
from synapse.replication.http.federation import (
@@ -68,7 +70,7 @@ from synapse.replication.http.federation import (
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.retryutils import NotRetryingDestination
from synapse.visibility import filter_events_for_server
@@ -78,36 +80,28 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-
-def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
- """Get joined domains from state
-
- Args:
- state: State map from type/state key to event.
-
- Returns:
- Returns a list of servers with the lowest depth of their joins.
- Sorted by lowest depth first.
- """
- joined_users = [
- (state_key, int(event.depth))
- for (e_type, state_key), event in state.items()
- if e_type == EventTypes.Member and event.membership == Membership.JOIN
- ]
-
- joined_domains: Dict[str, int] = {}
- for u, d in joined_users:
- try:
- dom = get_domain_from_id(u)
- old_d = joined_domains.get(dom)
- if old_d:
- joined_domains[dom] = min(d, old_d)
- else:
- joined_domains[dom] = d
- except Exception:
- pass
-
- return sorted(joined_domains.items(), key=lambda d: d[1])
+# Added to debug performance and track progress on optimizations
+backfill_processing_before_timer = Histogram(
+ "synapse_federation_backfill_processing_before_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 30.0,
+ 40.0,
+ 60.0,
+ 80.0,
+ "+Inf",
+ ),
+)
class _BackfillPointType(Enum):
@@ -137,6 +131,7 @@ class FederationHandler:
def __init__(self, hs: "HomeServer"):
self.hs = hs
+ self.clock = hs.get_clock()
self.store = hs.get_datastores().main
self._storage_controllers = hs.get_storage_controllers()
self._state_storage_controller = self._storage_controllers.state
@@ -180,6 +175,7 @@ class FederationHandler:
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
)
+ @trace
async def maybe_backfill(
self, room_id: str, current_depth: int, limit: int
) -> bool:
@@ -195,12 +191,39 @@ class FederationHandler:
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
+ # Starting the processing time here so we can include the room backfill
+ # linearizer lock queue in the timing
+ processing_start_time = self.clock.time_msec()
+
async with self._room_backfill.queue(room_id):
- return await self._maybe_backfill_inner(room_id, current_depth, limit)
+ return await self._maybe_backfill_inner(
+ room_id,
+ current_depth,
+ limit,
+ processing_start_time=processing_start_time,
+ )
async def _maybe_backfill_inner(
- self, room_id: str, current_depth: int, limit: int
+ self,
+ room_id: str,
+ current_depth: int,
+ limit: int,
+ *,
+ processing_start_time: int,
) -> bool:
+ """
+ Checks whether the `current_depth` is at or approaching any backfill
+ points in the room and if so, will backfill. We only care about
+ checking backfill points that happened before the `current_depth`
+ (meaning less than or equal to the `current_depth`).
+
+ Args:
+ room_id: The room to backfill in.
+ current_depth: The depth to check at for any upcoming backfill points.
+ limit: The max number of events to request from the remote federated server.
+ processing_start_time: The time when `maybe_backfill` started
+ processing. Only used for timing.
+ """
backwards_extremities = [
_BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
@@ -368,23 +391,29 @@ class FederationHandler:
logger.debug(
"_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "extremities_to_request",
+ str(extremities_to_request),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+ str(len(extremities_to_request)),
+ )
# Now we need to decide which hosts to hit first.
-
- # First we try hosts that are already in the room
+ # First we try hosts that are already in the room.
# TODO: HEURISTIC ALERT.
+ likely_domains = (
+ await self._storage_controllers.state.get_current_hosts_in_room(room_id)
+ )
- curr_state = await self._storage_controllers.state.get_current_state(room_id)
-
- curr_domains = get_domains_from_state(curr_state)
-
- likely_domains = [
- domain for domain, depth in curr_domains if domain != self.server_name
- ]
-
- async def try_backfill(domains: List[str]) -> bool:
+ async def try_backfill(domains: Collection[str]) -> bool:
# TODO: Should we try multiple of these at a time?
for dom in domains:
+ # We don't want to ask our own server for information we don't have
+ if dom == self.server_name:
+ continue
+
try:
await self._federation_event_handler.backfill(
dom, room_id, limit=100, extremities=extremities_to_request
@@ -423,6 +452,11 @@ class FederationHandler:
return False
+ processing_end_time = self.clock.time_msec()
+ backfill_processing_before_timer.observe(
+ (processing_end_time - processing_start_time) / 1000
+ )
+
success = await try_backfill(likely_domains)
if success:
return True
@@ -546,9 +580,9 @@ class FederationHandler:
)
if ret.partial_state:
- # TODO(faster_joins): roll this back if we don't manage to start the
- # background resync (eg process_remote_join fails)
- # https://github.com/matrix-org/synapse/issues/12998
+ # Mark the room as having partial state.
+ # The background process is responsible for unmarking this flag,
+ # even if the join fails.
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
try:
@@ -574,17 +608,21 @@ class FederationHandler:
room_id,
)
raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
-
- if ret.partial_state:
- # Kick off the process of asynchronously fetching the state for this
- # room.
- run_as_background_process(
- desc="sync_partial_state_room",
- func=self._sync_partial_state_room,
- initial_destination=origin,
- other_destinations=ret.servers_in_room,
- room_id=room_id,
- )
+ finally:
+ # Always kick off the background process that asynchronously fetches
+ # state for the room.
+ # If the join failed, the background process is responsible for
+ # cleaning up — including unmarking the room as a partial state room.
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for this
+ # room.
+ run_as_background_process(
+ desc="sync_partial_state_room",
+ func=self._sync_partial_state_room,
+ initial_destination=origin,
+ other_destinations=ret.servers_in_room,
+ room_id=room_id,
+ )
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
@@ -748,6 +786,23 @@ class FederationHandler:
# (and return a 404 otherwise)
room_version = await self.store.get_room_version(room_id)
+ if await self.store.is_partial_state_room(room_id):
+ # If our server is still only partially joined, we can't give a complete
+ # response to /make_join, so return a 404 as we would if we weren't in the
+ # room at all.
+ # The main reason we can't respond properly is that we need to know about
+ # the auth events for the join event that we would return.
+ # We also should not bother entertaining the /make_join since we cannot
+ # handle the /send_join.
+ logger.info(
+ "Rejecting /make_join to %s because it's a partial state room", room_id
+ )
+ raise SynapseError(
+ 404,
+ "Unable to handle /make_join right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
# now check that we are *still* in the room
is_in_room = await self._event_auth_handler.check_host_in_room(
room_id, self.server_name
@@ -1058,6 +1113,8 @@ class FederationHandler:
return event
+ @trace
+ @tag_args
async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
"""Returns the state at the event. i.e. not including said event."""
event = await self.store.get_event(event_id, check_room_id=room_id)
@@ -1539,15 +1596,16 @@ class FederationHandler:
# Make an infinite iterator of destinations to try. Once we find a working
# destination, we'll stick with it until it flakes.
+ destinations: Collection[str]
if initial_destination is not None:
# Move `initial_destination` to the front of the list.
destinations = list(other_destinations)
if initial_destination in destinations:
destinations.remove(initial_destination)
destinations = [initial_destination] + destinations
- destination_iter = itertools.cycle(destinations)
else:
- destination_iter = itertools.cycle(other_destinations)
+ destinations = other_destinations
+ destination_iter = itertools.cycle(destinations)
# `destination` is the current remote homeserver we're pulling from.
destination = next(destination_iter)
|