diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 66d3da8719..ab2ed53bce 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -648,42 +648,9 @@ class FederationEventHandler:
logger.info("Got %d prev_events", len(missing_events))
await self._process_pulled_events(origin, missing_events, backfilled=False)
- async def _process_pulled_events(
- self, origin: str, events: Iterable[EventBase], backfilled: bool
- ) -> None:
- """Process a batch of events we have pulled from a remote server
-
- Pulls in any events required to auth the events, persists the received events,
- and notifies clients, if appropriate.
-
- Assumes the events have already had their signatures and hashes checked.
-
- Params:
- origin: The server we received these events from
- events: The received events.
- backfilled: True if this is part of a historical batch of events (inhibits
- notification to clients, and validation of device keys.)
- """
-
- logger.info(
- "backfill events=%s",
- [
- "event_id=%s,depth=%d,body=%s,prevs=%s\n"
- % (
- event.event_id,
- event.depth,
- event.content.get("body", event.type),
- event.prev_event_ids(),
- )
- for event in events
- ],
- )
-
- # We want to sort these by depth so we process them and
- # tell clients about them in order.
- # sorted_events = sorted(events, key=lambda x: x.depth)
-
- event_ids = [event.event_id for event in events]
+ async def generateEventIdGraphFromEvents(
+ self, events: Iterable[EventBase]
+ ) -> Dict[str, Iterable[str]]:
event_map = {event.event_id: event for event in events}
# Since the insertion event we try to reference later on might be in the
@@ -697,6 +664,7 @@ class FederationEventHandler:
if event.type == EventTypes.MSC2716_INSERTION
}
+ # Map a given event to it's successors (backwards prev_events)
successor_event_id_map = {}
for event in events:
for prev_event_id in event.prev_event_ids():
@@ -727,11 +695,11 @@ class FederationEventHandler:
)
if insertion_event_id:
- # Add the insertion event as a fake edge connection to the batch
- # event so the historical batch topologically sorts below
- # the "live" event we branched off of.
- event_id_graph.setdefault(event.event_id, []).append(
- insertion_event_id
+ # Connect the insertion event via a fake edge pointing to the
+ # batch event so the historical batch topologically sorts
+ # behind-in-time the insertion event.
+ event_id_graph.setdefault(insertion_event_id, []).append(
+ event.event_id
)
# Maybe we can get lucky and save ourselves a lookup
@@ -743,10 +711,11 @@ class FederationEventHandler:
)
if insertion_event:
- # Also add some fake edges to connect the insertion
- # event to it's prev_event successors so it sorts
- # topologically behind-in-time the successor. Nestled
- # perfectly between the prev_event and the successor.
+ # Connect the insertion events' `prev_event` successors
+ # via fake edges pointing to the insertion event itself
+ # so the insertion event sorts topologically
+ # behind-in-time the successor. Nestled perfectly
+ # between the prev_event and the successor.
for insertion_prev_event_id in insertion_event.prev_event_ids():
successor_event_ids = successor_event_id_map[
insertion_prev_event_id
@@ -757,21 +726,61 @@ class FederationEventHandler:
successor_event_ids,
)
if successor_event_ids:
+ for successor_event_id in successor_event_ids:
+ # Don't add itself back as a successor
+ if successor_event_id != insertion_event_id:
+ # Fake edge to point the successor back
+ # at the insertion event
+ event_id_graph.setdefault(
+ successor_event_id, []
+ ).append(insertion_event_id)
+
+ # TODO: We also need to add fake edges to connect the oldest-in-time messages
+ # in the batch to the event we branched off of, see https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985
- event_id_graph.setdefault(
- insertion_event_id, []
- ).extend(
- [
- successor_event_id
- for successor_event_id in successor_event_ids
- # Don't add itself back as a successor
- if successor_event_id != insertion_event_id
- ]
- )
+ return event_id_graph
+
+ async def _process_pulled_events(
+ self, origin: str, events: Iterable[EventBase], backfilled: bool
+ ) -> None:
+ """Process a batch of events we have pulled from a remote server
+
+ Pulls in any events required to auth the events, persists the received events,
+ and notifies clients, if appropriate.
+
+ Assumes the events have already had their signatures and hashes checked.
+
+ Params:
+ origin: The server we received these events from
+ events: The received events.
+ backfilled: True if this is part of a historical batch of events (inhibits
+ notification to clients, and validation of device keys.)
+ """
+
+ logger.info(
+ "backfill events=%s",
+ [
+ "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+ % (
+ event.event_id,
+ event.depth,
+ event.content.get("body", event.type),
+ event.prev_event_ids(),
+ )
+ for event in events
+ ],
+ )
+
+ # We want to sort these by depth so we process them and
+ # tell clients about them in order.
+ # sorted_events = sorted(events, key=lambda x: x.depth)
# We want to sort topologically so we process them and tell clients
# about them in order.
sorted_events = []
+ event_ids = [event.event_id for event in events]
+ event_map = {event.event_id: event for event in events}
+ event_id_graph = await self.generateEventIdGraphFromEvents(events)
for event_id in sorted_topologically(event_ids, event_id_graph):
sorted_events.append(event_map[event_id])
sorted_events = reversed(sorted_events)
|