summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2023-05-24 23:22:24 -0500
committerGitHub <noreply@github.com>2023-05-24 23:22:24 -0500
commit77156a4bc1f87e98754e3f7f86e52a84a4253a10 (patch)
treecb1c5f894d6a34b92e6bc2404b0b28f654e6ed8f /synapse
parentAdd requesting user id parameter to key claim methods in `TransportLayerClien... (diff)
downloadsynapse-77156a4bc1f87e98754e3f7f86e52a84a4253a10.tar.xz
Process previously failed backfill events in the background (#15585)
Process previously failed backfill events in the background because they are bound to fail again and we don't need to waste time holding up the request for something that is bound to fail again.

Fix https://github.com/matrix-org/synapse/issues/13623

Follow-up to https://github.com/matrix-org/synapse/issues/13621 and https://github.com/matrix-org/synapse/issues/13622

Part of making `/messages` faster: https://github.com/matrix-org/synapse/issues/13356
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation_event.py70
-rw-r--r--synapse/storage/databases/main/event_federation.py31
-rw-r--r--synapse/util/iterutils.py27
3 files changed, 119 insertions, 9 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9a08618da5..42141d3670 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -88,7 +88,7 @@ from synapse.types import (
 )
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, partition
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 
@@ -865,7 +865,7 @@ class FederationEventHandler:
             [event.event_id for event in events]
         )
 
-        new_events = []
+        new_events: List[EventBase] = []
         for event in events:
             event_id = event.event_id
 
@@ -895,12 +895,66 @@ class FederationEventHandler:
             str(len(new_events)),
         )
 
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        sorted_events = sorted(new_events, key=lambda x: x.depth)
-        for ev in sorted_events:
-            with nested_logging_context(ev.event_id):
-                await self._process_pulled_event(origin, ev, backfilled=backfilled)
+        @trace
+        async def _process_new_pulled_events(new_events: Collection[EventBase]) -> None:
+            # We want to sort these by depth so we process them and tell clients about
+            # them in order. It's also more efficient to backfill this way (`depth`
+            # ascending) because one backfill event is likely to be the `prev_event` of
+            # the next event we're going to process.
+            sorted_events = sorted(new_events, key=lambda x: x.depth)
+            for ev in sorted_events:
+                with nested_logging_context(ev.event_id):
+                    await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+        # Check if we've already tried to process these events at some point in the
+        # past. We aren't concerned with the expontntial backoff here, just whether it
+        # has failed to be processed before.
+        event_ids_with_failed_pull_attempts = (
+            await self._store.get_event_ids_with_failed_pull_attempts(
+                [event.event_id for event in new_events]
+            )
+        )
+
+        # We construct the event lists in source order from `/backfill` response because
+        # it's a) easiest, but also b) the order in which we process things matters for
+        # MSC2716 historical batches because many historical events are all at the same
+        # `depth` and we rely on the tenuous sort that the other server gave us and hope
+        # they're doing their best. The brittle nature of this ordering for historical
+        # messages over federation is one of the reasons why we don't want to continue
+        # on MSC2716 until we have online topological ordering.
+        events_with_failed_pull_attempts, fresh_events = partition(
+            new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "events_with_failed_pull_attempts",
+            str(event_ids_with_failed_pull_attempts),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "events_with_failed_pull_attempts.length",
+            str(len(events_with_failed_pull_attempts)),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "fresh_events",
+            str([event.event_id for event in fresh_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "fresh_events.length",
+            str(len(fresh_events)),
+        )
+
+        # Process previously failed backfill events in the background to not waste
+        # time on something that is likely to fail again.
+        if len(events_with_failed_pull_attempts) > 0:
+            run_as_background_process(
+                "_process_new_pulled_events_with_failed_pull_attempts",
+                _process_new_pulled_events,
+                events_with_failed_pull_attempts,
+            )
+
+        # We can optimistically try to process and wait for the event to be fully
+        # persisted if we've never tried before.
+        if len(fresh_events) > 0:
+            await _process_new_pulled_events(fresh_events)
 
     @trace
     @tag_args
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ac19de183c..2681917d0b 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -46,7 +46,7 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
@@ -1584,6 +1584,35 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         txn.execute(sql, (room_id, event_id, 1, self._clock.time_msec(), cause))
 
     @trace
+    async def get_event_ids_with_failed_pull_attempts(
+        self, event_ids: StrCollection
+    ) -> Set[str]:
+        """
+        Filter the given list of `event_ids` and return events which have any failed
+        pull attempts.
+
+        Args:
+            event_ids: A list of events to filter down.
+
+        Returns:
+            A filtered down list of `event_ids` that have previous failed pull attempts.
+        """
+
+        rows = await self.db_pool.simple_select_many_batch(
+            table="event_failed_pull_attempts",
+            column="event_id",
+            iterable=event_ids,
+            keyvalues={},
+            retcols=("event_id",),
+            desc="get_event_ids_with_failed_pull_attempts",
+        )
+        event_ids_with_failed_pull_attempts: Set[str] = {
+            row["event_id"] for row in rows
+        }
+
+        return event_ids_with_failed_pull_attempts
+
+    @trace
     async def get_event_ids_to_not_pull_from_backoff(
         self,
         room_id: str,
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 4938ddf703..a0efb96d3b 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -15,11 +15,13 @@
 import heapq
 from itertools import islice
 from typing import (
+    Callable,
     Collection,
     Dict,
     Generator,
     Iterable,
     Iterator,
+    List,
     Mapping,
     Set,
     Sized,
@@ -71,6 +73,31 @@ def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
     return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen))
 
 
+def partition(
+    iterable: Iterable[T], predicate: Callable[[T], bool]
+) -> Tuple[List[T], List[T]]:
+    """
+    Separate a given iterable into two lists based on the result of a predicate function.
+
+    Args:
+        iterable: the iterable to partition (separate)
+        predicate: a function that takes an item from the iterable and returns a boolean
+
+    Returns:
+        A tuple of two lists, the first containing all items for which the predicate
+        returned True, the second containing all items for which the predicate returned
+        False
+    """
+    true_results = []
+    false_results = []
+    for item in iterable:
+        if predicate(item):
+            true_results.append(item)
+        else:
+            false_results.append(item)
+    return true_results, false_results
+
+
 def sorted_topologically(
     nodes: Iterable[T],
     graph: Mapping[T, Collection[T]],