summary refs log tree commit diff
diff options
context:
space:
mode:
author: Eric Eastwood <erice@element.io> 2022-09-29 23:45:35 -0500
committer: Eric Eastwood <erice@element.io> 2022-09-29 23:53:41 -0500
commit: 68ae0fd5c59e9a7a005c85f0fb69415e39febf7e (patch)
tree: 148b388d0bbcbdc32f004b1e517094362998314c
parent: Don't require `setuptools_rust` at runtime (#13952) (diff)
download: synapse-github/madlittlemods/test-backfill-and-messages-still-works-with-many-batches.tar.xz
Pulled from the scratch changes in:
https://github.com/matrix-org/synapse/pull/13864
-rw-r--r--  synapse/handlers/federation_event.py  65
-rw-r--r--  synapse/storage/databases/main/events.py  6
2 files changed, 69 insertions, 2 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3fac256881..195057e841 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -75,6 +75,7 @@ from synapse.state import StateResolutionStore
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
+from synapse.storage.util.id_generators import AbstractStreamIdGenerator
 from synapse.types import (
     PersistedEventPosition,
     RoomStreamToken,
@@ -644,9 +645,71 @@ class FederationEventHandler:
                         f"room {ev.room_id}, when we were backfilling in {room_id}"
                     )
 
+            # We expect the events from the `/backfill` response to start from
+            # `?v` and include events that preceded it (so the list will be
+            # newest -> oldest, reverse-chronological). It's described in the
+            # spec this way so we can rely on people doing it the right way for
+            # the historical messages to show up correctly.
+            reverse_chronological_events = events
+            # `[::-1]` is just syntax to reverse the list and give us a copy
+            chronological_events = reverse_chronological_events[::-1]
+
+            # We want to calculate the `stream_ordering` from newest -> oldest
+            # (reverse-chronological) (so MSC2716 historical events end up
+            # sorting in the correct order) and persist oldest -> newest
+            # (chronological) to get the least missing `prev_event` fetch
+            # thrashing.
+            # ------------------------------------------------------------------
+
+            # Since we have been configured to write, we ought to have id generators,
+            # rather than id trackers.
+            assert (
+                self._instance_name in self._config.worker.writers.events
+            ), "Can only write stream IDs on master"
+            assert isinstance(self._store._backfill_id_gen, AbstractStreamIdGenerator)
+            stream_ordering_manager = self._store._backfill_id_gen.get_next_mult(
+                len(reverse_chronological_events)
+            )
+            async with stream_ordering_manager as stream_orderings:
+                # Calculate the `stream_ordering` from newest -> oldest
+                # (reverse-chronological) (so historical events end up sorting
+                # in the correct order).
+                #
+                # Backfilled events start with `stream_ordering=-1` and
+                # decrement. For events, that we backfill at the same `depth`
+                # (like chains of historical messages) in order for them to have
+                # the best chance of ending up in the correct order, assign
+                # `stream_ordering` to the assumed reverse-chronological list of
+                # events to backfill (where the newest events get
+                # stream_ordering assigned first)
+                #
+                # depth : stream_ordering : event
+                # ----- : --------------- : -----------------------
+                # 1     :  1              : Event before 1
+                # 2     :  2              : Event before 2
+                # 3     : -5              : Historical message 1
+                # 3     : -4              : Historical message 2
+                # 3     : -3              : Historical message 3
+                # 3     : -2              : Historical message 4
+                # 3     : -1              : Historical message 5
+                # 3     :  3              : Event after 1
+                # 4     :  4              : Event after 2
+                #
+                for event, stream in zip(
+                    reverse_chronological_events, stream_orderings
+                ):
+                    event.internal_metadata.stream_ordering = stream
+
             await self._process_pulled_events(
                 dest,
-                events,
+                # Persist events from oldest -> newest (chronological) to get
+                # the least missing `prev_event` fetch thrashing.
+                # `_process_pulled_events` does some sorting of its own by
+                # `depth` but if we let it sort the reverse-chronological list
+                # of events, it naively orders events with the same depth in the
+                # opposite order we want. If we pass it an already sorted by
+                # depth list, then everything lines up.
+                chronological_events,
                 backfilled=True,
             )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index bb489b8189..27d75dba72 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -209,7 +209,11 @@ class PersistEventsStore:
 
         async with stream_ordering_manager as stream_orderings:
             for (event, _), stream in zip(events_and_contexts, stream_orderings):
-                event.internal_metadata.stream_ordering = stream
+                # If someone has already decided the stream_ordering for the
+                # event before, then just use that. This is done during backfill
+                # to help ordering of MSC2716 historical messages.
+                if event.internal_metadata.stream_ordering is None:
+                    event.internal_metadata.stream_ordering = stream
 
             await self.db_pool.runInteraction(
                 "persist_events",