diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index da319943cc..66aca2f864 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -43,7 +43,9 @@ from synapse.api.constants import (
from synapse.api.errors import (
AuthError,
Codes,
+ EventSizeError,
FederationError,
+ FederationPullAttemptBackoffError,
HttpResponseException,
RequestSendFailed,
SynapseError,
@@ -57,7 +59,7 @@ from synapse.event_auth import (
)
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
-from synapse.federation.federation_client import InvalidResponseError
+from synapse.federation.federation_client import InvalidResponseError, PulledPduInfo
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import (
SynapseTags,
@@ -74,7 +76,6 @@ from synapse.replication.http.federation import (
from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.storage.state import StateFilter
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
@@ -82,6 +83,7 @@ from synapse.types import (
UserID,
get_domain_from_id,
)
+from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
@@ -414,7 +416,9 @@ class FederationEventHandler:
# First, precalculate the joined hosts so that the federation sender doesn't
# need to.
- await self._event_creation_handler.cache_joined_hosts_for_event(event, context)
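+ # The helper is now batched, so wrap the single (event, context) pair in a list.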
+ await self._event_creation_handler.cache_joined_hosts_for_events(
+ [(event, context)]
+ )
await self._check_for_soft_fail(event, context=context, origin=origin)
await self._run_push_actions_and_persist_event(event, context)
@@ -565,6 +569,9 @@ class FederationEventHandler:
event: partial-state event to be de-partial-stated
Raises:
+ FederationPullAttemptBackoffError if we are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to request state from the remote server.
"""
logger.info("Updating state for %s", event.event_id)
@@ -792,9 +799,42 @@ class FederationEventHandler:
],
)
+ # Check if we already have any of these events.
+ # Note: we currently make a lookup in the database directly here rather than
+ # checking the event cache, due to:
+ # https://github.com/matrix-org/synapse/issues/13476
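+ # (`_get_events_from_db` returns a map of event ID to cache entry; the
+ # `EventBase` itself is on the entry's `.event` attribute, as used below.)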
+ existing_events_map = await self._store._get_events_from_db(
+ [event.event_id for event in events]
+ )
+
+ new_events = []
+ for event in events:
+ event_id = event.event_id
+
+ # If we've already seen this event ID...
+ if event_id in existing_events_map:
+ existing_event = existing_events_map[event_id]
+
+ # ...and the event itself was not previously stored as an outlier...
+ if not existing_event.event.internal_metadata.is_outlier():
+ # ...then there's no need to persist it. We have it already.
+ logger.info(
+ "_process_pulled_event: Ignoring received event %s which we "
+ "have already seen",
+ event_id,
+ )
+ continue
+
+ # While we have seen this event before, it was stored as an outlier.
+ # We'll now persist it as a non-outlier.
+ logger.info("De-outliering event %s", event_id)
+
+ # Continue on with the events that are new to us.
+ new_events.append(event)
+
# We want to sort these by depth so we process them and
# tell clients about them in order.
- sorted_events = sorted(events, key=lambda x: x.depth)
+ sorted_events = sorted(new_events, key=lambda x: x.depth)
for ev in sorted_events:
with nested_logging_context(ev.event_id):
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -846,18 +886,6 @@ class FederationEventHandler:
event_id = event.event_id
- existing = await self._store.get_event(
- event_id, allow_none=True, allow_rejected=True
- )
- if existing:
- if not existing.internal_metadata.is_outlier():
- logger.info(
- "_process_pulled_event: Ignoring received event %s which we have already seen",
- event_id,
- )
- return
- logger.info("De-outliering event %s", event_id)
-
try:
self._sanity_check_event(event)
except SynapseError as err:
@@ -899,6 +927,18 @@ class FederationEventHandler:
context,
backfilled=backfilled,
)
+ except FederationPullAttemptBackoffError as exc:
+ # Log a warning about why we failed to process the event (the error message
+ # for `FederationPullAttemptBackoffError` is already descriptive).
+ logger.warning("_process_pulled_event: %s", exc)
+ # We do not record a failed pull attempt when we backoff fetching a missing
+ # `prev_event` because not being able to fetch the `prev_events` just means
+ # we won't be able to de-outlier the pulled event. But we can still use an
+ # `outlier` in the state/auth chain for another event. So we shouldn't stop
+ # a downstream event from trying to pull it.
+ #
+ # This avoids a cascade of backoff for all events in the DAG downstream from
+ # a single backed-off event upstream.
except FederationError as e:
await self._store.record_event_failed_pull_attempt(
event.room_id, event_id, str(e)
@@ -945,6 +985,9 @@ class FederationEventHandler:
The event context.
Raises:
+ FederationPullAttemptBackoffError if we are deliberately not attempting
+ to pull the given event over federation because we've already done so
+ recently and are backing off.
FederationError if we fail to get the state from the remote server after any
missing `prev_event`s.
"""
@@ -955,6 +998,18 @@ class FederationEventHandler:
seen = await self._store.have_events_in_timeline(prevs)
missing_prevs = prevs - seen
+ # If we've already recently attempted to pull any of these missing
+ # prev_events, don't try again so soon. Since we have to fetch all of the
+ # prev_events, we can bail early here if we find any to ignore.
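+ # (`get_event_ids_to_not_pull_from_backoff` returns the subset of the
+ # given event IDs that are still inside their backoff window from earlier
+ # failed pull attempts.)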
+ prevs_to_ignore = await self._store.get_event_ids_to_not_pull_from_backoff(
+ room_id, missing_prevs
+ )
+ if len(prevs_to_ignore) > 0:
+ raise FederationPullAttemptBackoffError(
+ event_ids=prevs_to_ignore,
+ message=f"While computing context for event={event_id}, not attempting to pull missing prev_event={prevs_to_ignore[0]} because we already tried to pull recently (backing off).",
+ )
+
if not missing_prevs:
return await self._state_handler.compute_event_context(event)
@@ -1011,10 +1066,9 @@ class FederationEventHandler:
state_res_store=StateResolutionStore(self._store),
)
- except Exception:
+ except Exception as e:
logger.warning(
- "Error attempting to resolve state at missing prev_events",
- exc_info=True,
+ "Error attempting to resolve state at missing prev_events: %s", e
)
raise FederationError(
"ERROR",
@@ -1463,8 +1517,8 @@ class FederationEventHandler:
)
async def backfill_event_id(
- self, destination: str, room_id: str, event_id: str
- ) -> EventBase:
+ self, destinations: List[str], room_id: str, event_id: str
+ ) -> PulledPduInfo:
"""Backfill a single event and persist it as a non-outlier which means
we also pull in all of the state and auth events necessary for it.
@@ -1476,24 +1530,21 @@ class FederationEventHandler:
Raises:
- FederationError if we are unable to find the event from the destination
+ FederationError if we are unable to find the event from any of the given destinations
"""
- logger.info(
- "backfill_event_id: event_id=%s from destination=%s", event_id, destination
- )
+ logger.info("backfill_event_id: event_id=%s", event_id)
room_version = await self._store.get_room_version(room_id)
- event_from_response = await self._federation_client.get_pdu(
- [destination],
+ pulled_pdu_info = await self._federation_client.get_pdu(
+ destinations,
event_id,
room_version,
)
- if not event_from_response:
+ if not pulled_pdu_info:
raise FederationError(
"ERROR",
404,
- "Unable to find event_id=%s from destination=%s to backfill."
- % (event_id, destination),
+ f"Unable to find event_id={event_id} from remote servers to backfill.",
affected=event_id,
)
@@ -1501,13 +1552,13 @@ class FederationEventHandler:
# and auth events to de-outlier it. This also sets up the necessary
# `state_groups` for the event.
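+ # Note: `pulled_pdu_info.pull_origin` is the server the PDU was actually
+ # pulled from, which may differ from the first entry in `destinations`.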
await self._process_pulled_events(
- destination,
- [event_from_response],
+ pulled_pdu_info.pull_origin,
+ [pulled_pdu_info.pdu],
# Prevent notifications going to clients
backfilled=True,
)
- return event_from_response
+ return pulled_pdu_info
@trace
@tag_args
@@ -1530,19 +1581,19 @@ class FederationEventHandler:
async def get_event(event_id: str) -> None:
with nested_logging_context(event_id):
try:
- event = await self._federation_client.get_pdu(
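+ # `get_pdu` now returns a `PulledPduInfo` wrapper (or None) rather than
+ # a bare event; unwrap `.pdu` below before collecting.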
+ pulled_pdu_info = await self._federation_client.get_pdu(
[destination],
event_id,
room_version,
)
- if event is None:
+ if pulled_pdu_info is None:
logger.warning(
"Server %s didn't return event %s",
destination,
event_id,
)
return
- events.append(event)
+ events.append(pulled_pdu_info.pdu)
except Exception as e:
logger.warning(
@@ -1686,6 +1737,15 @@ class FederationEventHandler:
except AuthError as e:
logger.warning("Rejecting %r because %s", event, e)
context.rejected = RejectedReason.AUTH_ERROR
+ except EventSizeError as e:
+ if e.unpersistable:
+ # This event is completely unpersistable.
+ raise e
+ # Otherwise, we are somewhat lenient and just persist the event
+ # as rejected, for moderate compatibility with older Synapse
+ # versions.
+ logger.warning("While validating received event %r: %s", event, e)
+ context.rejected = RejectedReason.OVERSIZED_EVENT
events_and_contexts_to_persist.append((event, context))
@@ -1731,6 +1791,16 @@ class FederationEventHandler:
# TODO: use a different rejected reason here?
context.rejected = RejectedReason.AUTH_ERROR
return
+ except EventSizeError as e:
+ if e.unpersistable:
+ # This event is completely unpersistable.
+ raise e
+ # Otherwise, we are somewhat lenient and just persist the event
+ # as rejected, for moderate compatibility with older Synapse
+ # versions.
+ logger.warning("While validating received event %r: %s", event, e)
+ context.rejected = RejectedReason.OVERSIZED_EVENT
+ return
# next, check that we have all of the event's auth events.
#
@@ -2117,8 +2187,8 @@ class FederationEventHandler:
min_depth,
)
else:
- await self._bulk_push_rule_evaluator.action_for_event_by_user(
- event, context
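+ # `action_for_events_by_user` is the batched successor to the removed
+ # single-event method; pass a one-element list here.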
+ await self._bulk_push_rule_evaluator.action_for_events_by_user(
+ [(event, context)]
)
try:
|