summary refs log tree commit diff
path: root/synapse/handlers/federation_event.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r--synapse/handlers/federation_event.py70
1 files changed, 62 insertions, 8 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9a08618da5..42141d3670 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@ from synapse.types import (
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, partition
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 
@@ -865,7 +865,7 @@ class FederationEventHandler:
             [event.event_id for event in events]
         )
 
-        new_events = []
+        new_events: List[EventBase] = []
         for event in events:
             event_id = event.event_id
 
@@ -895,12 +895,66 @@ class FederationEventHandler:
             str(len(new_events)),
         )
 
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        sorted_events = sorted(new_events, key=lambda x: x.depth)
-        for ev in sorted_events:
-            with nested_logging_context(ev.event_id):
-                await self._process_pulled_event(origin, ev, backfilled=backfilled)
+        @trace
+        async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+            # We want to sort these by depth so we process them and tell clients about
+            # them in order. It's also more efficient to backfill this way (`depth`
+            # ascending) because one backfill event is likely to be the `prev_event` of
+            # the next event we're going to process.
+            sorted_events = sorted(new_events, key=lambda x: x.depth)
+            for ev in sorted_events:
+                with nested_logging_context(ev.event_id):
+                    await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+        # Check if we've already tried to process these events at some point in the
+        # past. We aren't concerned with the expontntial backoff here, just whether it
+        # has failed to be processed before.
+        event_ids_with_failed_pull_attempts = (
+            await self._store.get_event_ids_with_failed_pull_attempts(
+                [event.event_id for event in new_events]
+            )
+        )
+
+        # We construct the event lists in source order from `/backfill` response because
+        # it's a) easiest, but also b) the order in which we process things matters for
+        # MSC2716 historical batches because many historical events are all at the same
+        # `depth` and we rely on the tenuous sort that the other server gave us and hope
+        # they're doing their best. The brittle nature of this ordering for historical
+        # messages over federation is one of the reasons why we don't want to continue
+        # on MSC2716 until we have online topological ordering.
+        events_with_failed_pull_attempts, fresh_events = partition(
+            new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
+            str(event_ids_with_failed_pull_attempts),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
+            str(len(events_with_failed_pull_attempts)),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
+            str([event.event_id for event in fresh_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "fresh_events.length",
+            str(len(fresh_events)),
+        )
+
+        # Process previously failed backfill events in the background to not waste
+        # time on something that is likely to fail again.
+        if len(events_with_failed_pull_attempts) > 0:
+            run_as_background_process(
+                "_process_new_pulled_events_with_failed_pull_attempts",
+                _process_new_pulled_events,
+                events_with_failed_pull_attempts,
+            )
+
+        # We can optimistically try to process and wait for the event to be fully
+        # persisted if we've never tried before.
+        if len(fresh_events) > 0:
+            await _process_new_pulled_events(fresh_events)
 
     @trace
     @tag_args