diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 610a4e48c5..66d3da8719 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -72,7 +72,7 @@ from synapse.types import (
get_domain_from_id,
)
from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.util.iterutils import batch_iter, sorted_topologically
from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
@@ -665,9 +665,130 @@ class FederationEventHandler:
notification to clients, and validation of device keys.)
"""
+        logger.debug(
+ "backfill events=%s",
+ [
+ "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ % (
+ event.event_id,
+ event.depth,
+ event.content.get("body", event.type),
+ event.prev_event_ids(),
+ )
+ for event in events
+ ],
+ )
+
# We want to sort these by depth so we process them and
# tell clients about them in order.
- sorted_events = sorted(events, key=lambda x: x.depth)
+        # Sorting by depth alone is not sufficient once MSC2716 historical batches are involved.
+
+ event_ids = [event.event_id for event in events]
+ event_map = {event.event_id: event for event in events}
+
+ # Since the insertion event we try to reference later on might be in the
+ # backfill chunk itself, we need to make it easy to lookup. Maps a given
+ # batch_id to the insertion event.
+ batch_id_map = {
+ event.content.get(
+ EventContentFields.MSC2716_NEXT_BATCH_ID, None
+ ): event.event_id
+ for event in events
+ if event.type == EventTypes.MSC2716_INSERTION
+ }
+
+ successor_event_id_map = {}
+ for event in events:
+ for prev_event_id in event.prev_event_ids():
+ successor_event_id_map.setdefault(prev_event_id, []).append(
+ event.event_id
+ )
+
+ event_id_graph = {}
+ for event in events:
+ # Assign the real edges to the graph.
+ # Make a copy so we don't modify the actual prev_events when we extend them below.
+ event_id_graph.setdefault(event.event_id, []).extend(
+ event.prev_event_ids().copy()
+ )
+
+ # We need to make some fake edge connections from the batch event at
+ # the bottom of the historical batch to the insertion event. This
+ # way the historical batch topologically sorts in ahead-in-time of
+ # the event we branched off of.
+ batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None)
+ if event.type == EventTypes.MSC2716_BATCH and batch_id:
+ # Maybe we can get lucky and save ourselves a lookup
+ # by checking the events in the backfill first
+                insertion_event_id = batch_id_map.get(
+                    batch_id
+                ) or await self._store.get_insertion_event_id_by_batch_id(
+                    event.room_id, batch_id
+                )
+
+ if insertion_event_id:
+ # Add the insertion event as a fake edge connection to the batch
+ # event so the historical batch topologically sorts below
+ # the "live" event we branched off of.
+ event_id_graph.setdefault(event.event_id, []).append(
+ insertion_event_id
+ )
+
+ # Maybe we can get lucky and save ourselves a lookup
+ # by checking the events in the backfill first
+                    insertion_event = event_map.get(
+                        insertion_event_id
+                    ) or await self._store.get_event(
+                        insertion_event_id, allow_none=True
+                    )
+
+ if insertion_event:
+ # Also add some fake edges to connect the insertion
+ # event to it's prev_event successors so it sorts
+ # topologically behind-in-time the successor. Nestled
+ # perfectly between the prev_event and the successor.
+ for insertion_prev_event_id in insertion_event.prev_event_ids():
+                            successor_event_ids = successor_event_id_map.get(
+                                insertion_prev_event_id, []
+                            )
+                            logger.debug(
+ "insertion_event_id=%s successor_event_ids=%s",
+ insertion_event_id,
+ successor_event_ids,
+ )
+ if successor_event_ids:
+
+ event_id_graph.setdefault(
+ insertion_event_id, []
+ ).extend(
+ [
+ successor_event_id
+ for successor_event_id in successor_event_ids
+ # Don't add itself back as a successor
+ if successor_event_id != insertion_event_id
+ ]
+ )
+
+ # We want to sort topologically so we process them and tell clients
+ # about them in order.
+ sorted_events = []
+ for event_id in sorted_topologically(event_ids, event_id_graph):
+ sorted_events.append(event_map[event_id])
+        sorted_events = list(reversed(sorted_events))
+
+        logger.debug(
+ "backfill sorted_events=%s",
+ [
+ "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ % (
+ event.event_id,
+ event.depth,
+ event.content.get("body", event.type),
+ event.prev_event_ids(),
+ )
+ for event in sorted_events
+ ],
+ )
for ev in sorted_events:
with nested_logging_context(ev.event_id):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d6f0b99f58..2f4b458d45 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1511,7 +1511,7 @@ class EventCreationHandler:
EventContentFields.MSC2716_NEXT_BATCH_ID
)
conflicting_insertion_event_id = (
- await self.store.get_insertion_event_by_batch_id(
+ await self.store.get_insertion_event_id_by_batch_id(
event.room_id, next_batch_id
)
)
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 99f8156ad0..5423d39efd 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -112,7 +112,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
# and have the batch connected.
if batch_id_from_query:
corresponding_insertion_event_id = (
- await self.store.get_insertion_event_by_batch_id(
+ await self.store.get_insertion_event_id_by_batch_id(
room_id, batch_id_from_query
)
)
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index dcbce8fdcf..97b2618437 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,7 +18,7 @@ from synapse.storage._base import SQLBaseStore
class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_by_batch_id(
+ async def get_insertion_event_id_by_batch_id(
self, room_id: str, batch_id: str
) -> Optional[str]:
"""Retrieve a insertion event ID.
|