diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 91d1439191..f7223b03c3 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
Tuple,
)
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import (
@@ -44,6 +44,7 @@ from synapse.api.errors import (
AuthError,
Codes,
FederationError,
+ FederationPullAttemptBackoffError,
HttpResponseException,
RequestSendFailed,
SynapseError,
@@ -57,8 +58,15 @@ from synapse.event_auth import (
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.federation.federation_client import InvalidResponseError
+from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import (
+ SynapseTags,
+ set_tag,
+ start_active_span,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
@@ -91,6 +99,36 @@ soft_failed_event_counter = Counter(
"Events received over federation that we marked as soft_failed",
)
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+ "synapse_federation_backfill_processing_after_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.25,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 25.0,
+ 30.0,
+ 40.0,
+ 50.0,
+ 60.0,
+ 80.0,
+ 100.0,
+ 120.0,
+ 150.0,
+ 180.0,
+ "+Inf",
+ ),
+)
+
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -201,7 +239,7 @@ class FederationEventHandler:
#
# Note that if we were never in the room then we would have already
# dropped the event, since we wouldn't know the room version.
- is_in_room = await self._event_auth_handler.check_host_in_room(
+ is_in_room = await self._event_auth_handler.is_host_in_room(
room_id, self._server_name
)
if not is_in_room:
@@ -377,7 +415,9 @@ class FederationEventHandler:
# First, precalculate the joined hosts so that the federation sender doesn't
# need to.
- await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
+ await self._event_creation_handler.cache_joined_hosts_for_events(
+ [(event, context)]
+ )
await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context)
@@ -409,6 +449,7 @@ class FederationEventHandler:
prev_member_event,
)
+ @trace
async def process_remote_join(
self,
origin: str,
@@ -527,6 +568,9 @@ class FederationEventHandler:
event: partial-state event to be de-partial-stated
Raises:
+ FederationPullAttemptBackoffError if we are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to request state from the remote server.
"""
logger.info("Updating state for %s", event.event_id)
@@ -566,6 +610,7 @@ class FederationEventHandler:
event.event_id
)
+ @trace
async def backfill(
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> None:
@@ -595,21 +640,23 @@ class FederationEventHandler:
if not events:
return
- # if there are any events in the wrong room, the remote server is buggy and
- # should not be trusted.
- for ev in events:
- if ev.room_id != room_id:
- raise InvalidResponseError(
- f"Remote server {dest} returned event {ev.event_id} which is in "
- f"room {ev.room_id}, when we were backfilling in {room_id}"
- )
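+ # Time how long we spend processing the events we just pulled, feeding the
+ # `backfill_processing_after_timer` histogram defined above.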
+ with backfill_processing_after_timer.time():
+ # if there are any events in the wrong room, the remote server is buggy and
+ # should not be trusted.
+ for ev in events:
+ if ev.room_id != room_id:
+ raise InvalidResponseError(
+ f"Remote server {dest} returned event {ev.event_id} which is in "
+ f"room {ev.room_id}, when we were backfilling in {room_id}"
+ )
- await self._process_pulled_events(
- dest,
- events,
- backfilled=True,
- )
+ await self._process_pulled_events(
+ dest,
+ events,
+ backfilled=True,
+ )
+ @trace
async def _get_missing_events_for_pdu(
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
) -> None:
@@ -710,8 +757,9 @@ class FederationEventHandler:
logger.info("Got %d prev_events", len(missing_events))
await self._process_pulled_events(origin, missing_events, backfilled=False)
+ @trace
async def _process_pulled_events(
- self, origin: str, events: Iterable[EventBase], backfilled: bool
+ self, origin: str, events: Collection[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server
@@ -726,6 +774,15 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str([event.event_id for event in events]),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
@@ -741,13 +798,48 @@ class FederationEventHandler:
],
)
+ # Check if we already have any of these events.
+ # Note: we currently look these events up directly in the database, rather
+ # than checking the event cache, due to:
+ # https://github.com/matrix-org/synapse/issues/13476
+ existing_events_map = await self._store._get_events_from_db(
+ [event.event_id for event in events]
+ )
+
+ new_events = []
+ for event in events:
+ event_id = event.event_id
+
+ # If we've already seen this event ID...
+ if event_id in existing_events_map:
+ existing_event = existing_events_map[event_id]
+
+ # ...and the event itself was not previously stored as an outlier...
+ if not existing_event.event.internal_metadata.is_outlier():
+ # ...then there's no need to persist it. We have it already.
+ logger.info(
+ "_process_pulled_event: Ignoring received event %s which we "
+ "have already seen",
+ event.event_id,
+ )
+ continue
+
+ # While we have seen this event before, it was stored as an outlier.
+ # We'll now persist it as a non-outlier.
+ logger.info("De-outliering event %s", event_id)
+
+ # Continue on with the events that are new to us.
+ new_events.append(event)
+
# We want to sort these by depth so we process them and
# tell clients about them in order.
- sorted_events = sorted(events, key=lambda x: x.depth)
+ sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
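+ # `@trace` wraps the function in an opentracing span named after it, and
+ # `@tag_args` records the function's arguments as tags on that span.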
+ @trace
+ @tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
@@ -793,22 +885,13 @@ class FederationEventHandler:
event_id = event.event_id
- existing = await self._store.get_event(
- event_id, allow_none=True, allow_rejected=True
- )
- if existing:
- if not existing.internal_metadata.is_outlier():
- logger.info(
- "_process_pulled_event: Ignoring received event %s which we have already seen",
- event_id,
- )
- return
- logger.info("De-outliering event %s", event_id)
-
try:
self._sanity_check_event(event)
except SynapseError as err:
logger.warning("Event %s failed sanity check: %s", event_id, err)
+ await self._store.record_event_failed_pull_attempt(
+ event.room_id, event_id, str(err)
+ )
return
try:
@@ -843,12 +926,29 @@ class FederationEventHandler:
context,
backfilled=backfilled,
)
+ except FederationPullAttemptBackoffError as exc:
+ # Log a warning about why we failed to process the event (the error message
+ # for `FederationPullAttemptBackoffError` is pretty good)
+ logger.warning("_process_pulled_event: %s", exc)
+ # We do not record a failed pull attempt when we backoff fetching a missing
+ # `prev_event` because not being able to fetch the `prev_events` just means
+ # we won't be able to de-outlier the pulled event. But we can still use an
+ # `outlier` in the state/auth chain for another event. So we shouldn't stop
+ # a downstream event from trying to pull it.
+ #
+ # This avoids a cascade of backoff for all events in the DAG downstream from
+ # a single event backing off upstream.
except FederationError as e:
+ await self._store.record_event_failed_pull_attempt(
+ event.room_id, event_id, str(e)
+ )
+
if e.code == 403:
logger.warning("Pulled event %s failed history check.", event_id)
else:
raise
+ @trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
@@ -884,6 +984,9 @@ class FederationEventHandler:
The event context.
Raises:
+ FederationPullAttemptBackoffError if we are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
@@ -894,6 +997,18 @@ class FederationEventHandler:
seen = await self._store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen
+ # If we've already recently attempted to pull this missing event, don't
+ # try it again so soon. Since we have to fetch all of the prev_events, we can
+ # bail early here if we find any to ignore.
+ prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
+ room_id, missing_prevs
+ )
+ if len(prevs_to_ignore) > 0:
+ raise FederationPullAttemptBackoffError(
+ event_ids=prevs_to_ignore,
+ message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+ )
+
if not missing_prevs:
return await self._state_handler.compute_event_context(event)
@@ -950,10 +1065,9 @@ class FederationEventHandler:
state_res_store=StateResolutionStore(self._store),
)
- except Exception:
+ except Exception as e:
logger.warning(
- "Error attempting to resolve state at missing prev_events",
- exc_info=True,
+ "Error attempting to resolve state at missing prev_events: %s", e
)
raise FederationError(
"ERROR",
@@ -965,6 +1079,8 @@ class FederationEventHandler:
event, state_ids_before_event=state_map, partial_state=partial_state
)
+ @trace
+ @tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -985,6 +1101,14 @@ class FederationEventHandler:
InvalidResponseError: if the remote homeserver's response contains fields
of the wrong type.
"""
+
+ # It would be better if we could query the difference from our known
+ # state to the given `event_id` so the sending server doesn't have to
+ # send as much and we don't have to process as many events. For example
+ # in a room like #matrix:matrix.org, we get 200k events (77k state_events, 122k
+ # auth_events) from this call.
+ #
+ # Tracked by https://github.com/matrix-org/synapse/issues/13618
(
state_event_ids,
auth_event_ids,
@@ -1004,10 +1128,10 @@ class FederationEventHandler:
logger.debug("Fetching %i events from cache/store", len(desired_events))
have_events = await self._store.have_seen_events(room_id, desired_events)
- missing_desired_events = desired_events - have_events
+ missing_desired_event_ids = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
- len(missing_desired_events),
+ len(missing_desired_event_ids),
len(have_events),
)
@@ -1019,13 +1143,30 @@ class FederationEventHandler:
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.
- missing_auth_events = set(auth_event_ids) - have_events
- missing_auth_events.difference_update(
- await self._store.have_seen_events(room_id, missing_auth_events)
+ missing_auth_event_ids = set(auth_event_ids) - have_events
+ missing_auth_event_ids.difference_update(
+ await self._store.have_seen_events(room_id, missing_auth_event_ids)
)
- logger.debug("We are also missing %i auth events", len(missing_auth_events))
+ logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
- missing_events = missing_desired_events | missing_auth_events
+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+ str(missing_auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+ str(len(missing_auth_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+ str(missing_desired_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+ str(len(missing_desired_event_ids)),
+ )
# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
@@ -1036,13 +1177,13 @@ class FederationEventHandler:
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
- if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+ if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
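+ # That is: if at least 1 in 10 of the state/auth events are missing,
+ # fetch the complete room state instead of individual events.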
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
- logger.debug("Fetching %i events from remote", len(missing_events))
+ logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=missing_events
+ destination=destination, room_id=room_id, event_ids=missing_event_ids
)
# We now need to fill out the state map, which involves fetching the
@@ -1099,6 +1240,14 @@ class FederationEventHandler:
event_id,
failed_to_fetch,
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+ str(failed_to_fetch),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+ str(len(failed_to_fetch)),
+ )
if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
@@ -1107,6 +1256,8 @@ class FederationEventHandler:
return state_map
+ @trace
+ @tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1128,6 +1279,7 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
+ @trace
async def _process_received_pdu(
self,
origin: str,
@@ -1278,6 +1430,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
+ @trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1309,7 +1462,7 @@ class FederationEventHandler:
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_MARKER_INSERTION
+ EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
@@ -1363,8 +1516,8 @@ class FederationEventHandler:
)
async def backfill_event_id(
- self, destination: str, room_id: str, event_id: str
- ) -> EventBase:
+ self, destinations: List[str], room_id: str, event_id: str
+ ) -> PulledPduInfo:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.
@@ -1376,24 +1529,21 @@ class FederationEventHandler:
Raises:
FederationError if we are unable to find the event from the destination
"""
- logger.info(
- "backfill_event_id: event_id=%s from destination=%s", event_id, destination
- )
+ logger.info("backfill_event_id: event_id=%s", event_id)
room_version = await self._store.get_room_version(room_id)
- event_from_response = await self._federation_client.get_pdu(
- [destination],
+ pulled_pdu_info = await self._federation_client.get_pdu(
+ destinations,
event_id,
room_version,
)
- if not event_from_response:
+ if not pulled_pdu_info:
raise FederationError(
"ERROR",
404,
- "Unable to find event_id=%s from destination=%s to backfill."
- % (event_id, destination),
+ f"Unable to find event_id={event_id} from remote servers to backfill.",
affected=event_id,
)
@@ -1401,14 +1551,16 @@ class FederationEventHandler:
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
await self._process_pulled_events(
- destination,
- [event_from_response],
+ pulled_pdu_info.pull_origin,
+ [pulled_pdu_info.pdu],
# Prevent notifications going to clients
backfilled=True,
)
- return event_from_response
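+ # Return the PulledPduInfo rather than the bare event so that callers can
+ # see which server the PDU was actually pulled from (`pull_origin`).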
+ return pulled_pdu_info
+ @trace
+ @tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1428,19 +1580,19 @@ class FederationEventHandler:
async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
try:
- event = await self._federation_client.get_pdu(
+ pulled_pdu_info = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
- if event is None:
+ if pulled_pdu_info is None:
logger.warning(
"Server %s didn't return event %s",
destination,
event_id,
)
return
- events.append(event)
+ events.append(pulled_pdu_info.pdu)
except Exception as e:
logger.warning(
@@ -1454,6 +1606,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
+ @trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1472,6 +1625,16 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
+ event_ids = event_map.keys()
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str(event_ids),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(event_ids)),
+ )
+
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1588,6 +1751,7 @@ class FederationEventHandler:
backfilled=True,
)
+ @trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
@@ -1626,6 +1790,14 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+ str([ev.event_id for ev in claimed_auth_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+ str(len(claimed_auth_events)),
+ )
# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1723,6 +1895,7 @@ class FederationEventHandler:
)
context.rejected = RejectedReason.AUTH_ERROR
+ @trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1930,6 +2103,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+ @trace
+ @tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1958,6 +2133,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
+ @trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -1991,8 +2167,8 @@ class FederationEventHandler:
min_depth,
)
else:
- await self._bulk_push_rule_evaluator.action_for_event_by_user(
- event, context
+ await self._bulk_push_rule_evaluator.action_for_events_by_user(
+ [(event, context)]
)
try:
@@ -2034,6 +2210,7 @@ class FederationEventHandler:
if instance != self._instance_name:
# Limit the number of events sent over replication. We choose 200
# here as that is what we default to in `max_request_body_size(..)`
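+ # Ensure `result` is always bound, even if `event_and_contexts` is empty
+ # and the loop below never runs.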
+ result = {}
try:
for batch in batch_iter(event_and_contexts, 200):
result = await self._send_events(
@@ -2066,8 +2243,17 @@ class FederationEventHandler:
self._message_handler.maybe_schedule_expiry(event)
if not backfilled: # Never notify for backfilled events
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
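+ # Group the per-event notifications under a single tracing span so they
+ # are easy to pick out in a trace.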
+ with start_active_span("notify_persisted_events"):
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids",
+ str([ev.event_id for ev in events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ for event in events:
+ await self._notify_persisted_event(event, max_stream_token)
return max_stream_token.stream
@@ -2104,8 +2290,8 @@ class FederationEventHandler:
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
- await self._notifier.on_new_room_event(
- event, event_pos, max_stream_token, extra_users=extra_users
+ await self._notifier.on_new_room_events(
+ [(event, event_pos)], max_stream_token, extra_users=extra_users
)
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
|