diff --git a/changelog.d/15585.feature b/changelog.d/15585.feature
new file mode 100644
index 0000000000..1adcfb69ee
--- /dev/null
+++ b/changelog.d/15585.feature
@@ -0,0 +1 @@
+Process previously failed backfill events in the background to avoid blocking requests for something that is bound to fail again.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9a08618da5..42141d3670 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@ from synapse.types import (
)
from synapse.types.state import StateFilter
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, partition
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -865,7 +865,7 @@ class FederationEventHandler:
[event.event_id for event in events]
)
- new_events = []
+ new_events: List[EventBase] = []
for event in events:
event_id = event.event_id
@@ -895,12 +895,66 @@ class FederationEventHandler:
str(len(new_events)),
)
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- sorted_events = sorted(new_events, key=lambda x: x.depth)
- for ev in sorted_events:
- with nested_logging_context(ev.event_id):
- await self._process_pulled_event(origin, ev, backfilled=backfilled)
+ @trace
+ async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+ # We want to sort these by depth so we process them and tell clients about
+ # them in order. It's also more efficient to backfill this way (`depth`
+ # ascending) because one backfill event is likely to be the `prev_event` of
+ # the next event we're going to process.
+ sorted_events = sorted(new_events, key=lambda x: x.depth)
+ for ev in sorted_events:
+ with nested_logging_context(ev.event_id):
+ await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+ # Check if we've already tried to process these events at some point in the
+ # past. We aren't concerned with the expontntial backoff here, just whether it
+ # has failed to be processed before.
+ event_ids_with_failed_pull_attempts = (
+ await self._store.get_event_ids_with_failed_pull_attempts(
+ [event.event_id for event in new_events]
+ )
+ )
+
+ # We construct the event lists in source order from `/backfill` response because
+ # it's a) easiest, but also b) the order in which we process things matters for
+ # MSC2716 historical batches because many historical events are all at the same
+ # `depth` and we rely on the tenuous sort that the other server gave us and hope
+ # they're doing their best. The brittle nature of this ordering for historical
+ # messages over federation is one of the reasons why we don't want to continue
+ # on MSC2716 until we have online topological ordering.
+ events_with_failed_pull_attempts, fresh_events = partition(
+ new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
+ str(event_ids_with_failed_pull_attempts),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
+ str(len(events_with_failed_pull_attempts)),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
+ str([event.event_id for event in fresh_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "fresh_events.length",
+ str(len(fresh_events)),
+ )
+
+ # Process previously failed backfill events in the background to not waste
+ # time on something that is likely to fail again.
+ if len(events_with_failed_pull_attempts) > 0:
+ run_as_background_process(
+ "_process_new_pulled_events_with_failed_pull_attempts",
+ _process_new_pulled_events,
+ events_with_failed_pull_attempts,
+ )
+
+ # We can optimistically try to process and wait for the event to be fully
+ # persisted if we've never tried before.
+ if len(fresh_events) > 0:
+ await _process_new_pulled_events(fresh_events)
@trace
@tag_args
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ac19de183c..2681917d0b 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.signatures import SignatureWorkerStore
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
@@ -1584,6 +1584,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
@trace
+ async def get_event_ids_with_failed_pull_attempts(
+ self, event_ids: StrCollection
+ ) -> Set[str]:
+ """
+ Filter the given list of `event_ids` and return events which have any failed
+ pull attempts.
+
+ Args:
+ event_ids: A list of events to filter down.
+
+ Returns:
+ A filtered down list of `event_ids` that have previous failed pull attempts.
+ """
+
+ rows = await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("event_id",),
+ desc="get_event_ids_with_failed_pull_attempts",
+ )
+ event_ids_with_failed_pull_attempts: Set[str] = {
+ row["event_id"] for row in rows
+ }
+
+ return event_ids_with_failed_pull_attempts
+
+ @trace
async def get_event_ids_to_not_pull_from_backoff(
self,
room_id: str,
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 4938ddf703..a0efb96d3b 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -15,11 +15,13 @@
import heapq
from itertools import islice
from typing import (
+ Callable,
Collection,
Dict,
Generator,
Iterable,
Iterator,
+ List,
Mapping,
Set,
Sized,
@@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))
+def partition(
+ iterable: Iterable[T], predicate: Callable[[T], bool]
+) -> Tuple[List[T], List[T]]:
+ """
+ Separate a given iterable into two lists based on the result of a predicate function.
+
+ Args:
+ iterable: the iterable to partition (separate)
+ predicate: a function that takes an item from the iterable and returns a boolean
+
+ Returns:
+ A tuple of two lists, the first containing all items for which the predicate
+ returned True, the second containing all items for which the predicate returned
+ False
+ """
+ true_results = []
+ false_results = []
+ for item in iterable:
+ if predicate(item):
+ true_results.append(item)
+ else:
+ false_results.append(item)
+ return true_results, false_results
+
+
def sorted_topologically(
nodes: Iterable[T],
graph: Mapping[T, Collection[T]],
diff --git a/tests/handlers/test_federation_event.py b/tests/handlers/test_federation_event.py
index c067e5bfe3..23f1b33b2f 100644
--- a/tests/handlers/test_federation_event.py
+++ b/tests/handlers/test_federation_event.py
@@ -664,6 +664,101 @@ class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
StoreError,
)
+ def test_backfill_process_previously_failed_pull_attempt_event_in_the_background(
+ self,
+ ) -> None:
+ """
+ Sanity check that events are still processed even if it is in the background
+ for events that already have failed pull attempts.
+ """
+ OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
+ main_store = self.hs.get_datastores().main
+
+ # Create the room
+ user_id = self.register_user("kermit", "test")
+ tok = self.login("kermit", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+ room_version = self.get_success(main_store.get_room_version(room_id))
+
+ # Allow the remote user to send state events
+ self.helper.send_state(
+ room_id,
+ "m.room.power_levels",
+ {"events_default": 0, "state_default": 0},
+ tok=tok,
+ )
+
+ # Add the remote user to the room
+ member_event = self.get_success(
+ event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
+ )
+
+ initial_state_map = self.get_success(
+ main_store.get_partial_current_state_ids(room_id)
+ )
+
+ auth_event_ids = [
+ initial_state_map[("m.room.create", "")],
+ initial_state_map[("m.room.power_levels", "")],
+ member_event.event_id,
+ ]
+
+ # Create a regular event that should process
+ pulled_event = make_event_from_dict(
+ self.add_hashes_and_signatures_from_other_server(
+ {
+ "type": "test_regular_type",
+ "room_id": room_id,
+ "sender": OTHER_USER,
+ "prev_events": [
+ member_event.event_id,
+ ],
+ "auth_events": auth_event_ids,
+ "origin_server_ts": 1,
+ "depth": 12,
+ "content": {"body": "pulled_event"},
+ }
+ ),
+ room_version,
+ )
+
+ # Record a failed pull attempt for this event which will cause us to backfill it
+ # in the background from here on out.
+ self.get_success(
+ main_store.record_event_failed_pull_attempt(
+ room_id, pulled_event.event_id, "fake cause"
+ )
+ )
+
+ # We expect an outbound request to /backfill, so stub that out
+ self.mock_federation_transport_client.backfill.return_value = make_awaitable(
+ {
+ "origin": self.OTHER_SERVER_NAME,
+ "origin_server_ts": 123,
+ "pdus": [
+ pulled_event.get_pdu_json(),
+ ],
+ }
+ )
+
+ # The function under test: try to backfill and process the pulled event
+ with LoggingContext("test"):
+ self.get_success(
+ self.hs.get_federation_event_handler().backfill(
+ self.OTHER_SERVER_NAME,
+ room_id,
+ limit=1,
+ extremities=["$some_extremity"],
+ )
+ )
+
+ # Ensure `run_as_background_process(...)` has a chance to run (essentially
+ # `wait_for_background_processes()`)
+ self.reactor.pump((0.1,))
+
+ # Make sure we processed and persisted the pulled event
+ self.get_success(main_store.get_event(pulled_event.event_id, allow_none=False))
+
def test_process_pulled_event_with_rejected_missing_state(self) -> None:
"""Ensure that we correctly handle pulled events with missing state containing a
rejected state event
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 81e50bdd55..4b8d8328d7 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -1134,6 +1134,43 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
self.assertEqual(backfill_event_ids, ["insertion_eventA"])
+ def test_get_event_ids_with_failed_pull_attempts(self) -> None:
+ """
+ Test to make sure we properly get event_ids based on whether they have any
+ failed pull attempts.
+ """
+ # Create the room
+ user_id = self.register_user("alice", "test")
+ tok = self.login("alice", "test")
+ room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
+
+ self.get_success(
+ self.store.record_event_failed_pull_attempt(
+ room_id, "$failed_event_id1", "fake cause"
+ )
+ )
+ self.get_success(
+ self.store.record_event_failed_pull_attempt(
+ room_id, "$failed_event_id2", "fake cause"
+ )
+ )
+
+ event_ids_with_failed_pull_attempts = self.get_success(
+ self.store.get_event_ids_with_failed_pull_attempts(
+ event_ids=[
+ "$failed_event_id1",
+ "$fresh_event_id1",
+ "$failed_event_id2",
+ "$fresh_event_id2",
+ ]
+ )
+ )
+
+ self.assertEqual(
+ event_ids_with_failed_pull_attempts,
+ {"$failed_event_id1", "$failed_event_id2"},
+ )
+
def test_get_event_ids_to_not_pull_from_backoff(self) -> None:
"""
Test to make sure only event IDs we should backoff from are returned.
|