diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3fac256881..195057e841 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -75,6 +75,7 @@ from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.state import StateFilter
+from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.types import (
PersistedEventPosition,
RoomStreamToken,
@@ -644,9 +645,71 @@ class FederationEventHandler:
f"room {ev.room_id}, when we were backfilling in {room_id}"
)
+ # We expect the events from the `/backfill` response to start from
+ # `?v` and include events that preceded it (so the list will be
+ # newest -> oldest, reverse-chronological). The spec defines the
+ # response in this order, so we can rely on remote homeservers
+ # returning events this way for historical messages to sort correctly.
+ reverse_chronological_events = events
+ # Slicing with `[::-1]` returns a reversed copy of the list
+ chronological_events = reverse_chronological_events[::-1]
+
+ # We want to calculate the `stream_ordering` from newest -> oldest
+ # (reverse-chronological) (so MSC2716 historical events end up
+ # sorting in the correct order) and persist oldest -> newest
+ # (chronological) to get the least missing `prev_event` fetch
+ # thrashing.
+ # ------------------------------------------------------------------
+
+ # Since we have been configured to write, we ought to have id generators,
+ # rather than id trackers.
+ assert (
+ self._instance_name in self._config.worker.writers.events
+ ), "Can only write stream IDs on master"
+ assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
+ stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
+ len(reverse_chronological_events)
+ )
+ async with stream_ordering_manager as stream_orderings:
+ # Calculate the `stream_ordering` from newest -> oldest
+ # (reverse-chronological) (so historical events end up sorting
+ # in the correct order).
+ #
+ # Backfilled events start with `stream_ordering=-1` and
+ # decrement. For events that we backfill at the same `depth`
+ # (like chains of historical messages), to give them the best
+ # chance of ending up in the correct order, we assign
+ # `stream_ordering` over the assumed reverse-chronological list
+ # of events (so the newest events get their `stream_ordering`
+ # assigned first)
+ #
+ # depth : stream_ordering : event
+ # ----- : --------------- : -----------------------
+ # 1 : 1 : Event before 1
+ # 2 : 2 : Event before 2
# 3 : -5 : Historical message 1
+ # 3 : -4 : Historical message 2
+ # 3 : -3 : Historical message 3
+ # 3 : -2 : Historical message 4
+ # 3 : -1 : Historical message 5
+ # 3 : 3 : Event after 1
+ # 4 : 4 : Event after 2
+ #
+ for event, stream in zip(
+ reverse_chronological_events, stream_orderings
+ ):
+ event.internal_metadata.stream_ordering = stream
+
await self._process_pulled_events(
dest,
- events,
+ # Persist events from oldest -> newest (chronological) to get
+ # the least missing `prev_event` fetch thrashing.
+ # `_process_pulled_events` does some sorting of its own by
+ # `depth` but if we let it sort the reverse-chronological list
+ # of events, it naively orders events with the same depth in the
+ # opposite order we want. If we pass it a list that is already
+ # sorted by depth, everything lines up.
+ chronological_events,
backfilled=True,
)
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index bb489b8189..27d75dba72 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -209,7 +209,11 @@ class PersistEventsStore:
async with stream_ordering_manager as stream_orderings:
for (event, _), stream in zip(events_and_contexts, stream_orderings):
- event.internal_metadata.stream_ordering = stream
+ # If someone has already decided the stream_ordering for the
+ # event before, then just use that. This is done during backfill
+ # to help ordering of MSC2716 historical messages.
+ if event.internal_metadata.stream_ordering is None:
+ event.internal_metadata.stream_ordering = stream
await self.db_pool.runInteraction(
"persist_events",
|