diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9a08618da5..42141d3670 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -865,7 +865,7 @@ class FederationEventHandler:
[event.event_id for event in events]
)
- new_events = []
+ new_events: List[EventBase] = []
for event in events:
event_id = event.event_id
@@ -895,12 +895,66 @@ class FederationEventHandler:
str(len(new_events)),
)
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- sorted_events = sorted(new_events, key=lambda x: x.depth)
- for ev in sorted_events:
- with nested_logging_context(ev.event_id):
- await self._process_pulled_event(origin, ev, backfilled=backfilled)
+ @trace
+ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+ # We want to sort these by depth so we process them and tell clients about
+ # them in order. It's also more efficient to backfill this way (`depth`
+ # ascending) because one backfill event is likely to be the `prev_event` of
+ # the next event we're going to process.
+ sorted_events = sorted(new_events, key=lambda x: x.depth)
+ for ev in sorted_events:
+ with nested_logging_context(ev.event_id):
+ await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+ # Check if we've already tried to process these events at some point in the
+ # past. We aren't concerned with the expontntial backoff here, just whether it
+ # has failed to be processed before.
+ event_ids_with_failed_pull_attempts = (
+ await self._store.get_event_ids_with_failed_pull_attempts(
+ [event.event_id for event in new_events]
+ )
+ )
+
+ # We construct the event lists in source order from `/backfill` response because
+ # it's a) easiest, but also b) the order in which we process things matters for
+ # MSC2716 historical batches because many historical events are all at the same
+ # `depth` and we rely on the tenuous sort that the other server gave us and hope
+ # they're doing their best. The brittle nature of this ordering for historical
+ # messages over federation is one of the reasons why we don't want to continue
+ # on MSC2716 until we have online topological ordering.
+ events_with_failed_pull_attempts, fresh_events = partition(
+ new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
+ str(event_ids_with_failed_pull_attempts),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
+ str(len(events_with_failed_pull_attempts)),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
+ str([event.event_id for event in fresh_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "fresh_events.length",
+ str(len(fresh_events)),
+ )
+
+ # Process previously failed backfill events in the background to not waste
+ # time on something that is likely to fail again.
+ if len(events_with_failed_pull_attempts) > 0:
+ run_as_background_process(
+ "_process_new_pulled_events_with_failed_pull_attempts",
+ _process_new_pulled_events,
+ events_with_failed_pull_attempts,
+ )
+
+ # We can optimistically try to process and wait for the event to be fully
+ # persisted if we've never tried before.
+ if len(fresh_events) > 0:
+ await _process_new_pulled_events(fresh_events)
@trace
@tag_args
|