 synapse/handlers/federation_event.py | 119 +++++++++++++++++++---------------
 1 file changed, 64 insertions(+), 55 deletions(-)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py

index 66d3da8719..ab2ed53bce 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -648,42 +648,9 @@ class FederationEventHandler:
         logger.info("Got %d prev_events", len(missing_events))
         await self._process_pulled_events(origin, missing_events, backfilled=False)
 
-    async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
-    ) -> None:
-        """Process a batch of events we have pulled from a remote server
-
-        Pulls in any events required to auth the events, persists the received events,
-        and notifies clients, if appropriate.
-
-        Assumes the events have already had their signatures and hashes checked.
-
-        Params:
-            origin: The server we received these events from
-            events: The received events.
-            backfilled: True if this is part of a historical batch of events (inhibits
-                notification to clients, and validation of device keys.)
-        """
-
-        logger.info(
-            "backfill events=%s",
-            [
-                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
-                % (
-                    event.event_id,
-                    event.depth,
-                    event.content.get("body", event.type),
-                    event.prev_event_ids(),
-                )
-                for event in events
-            ],
-        )
-
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        # sorted_events = sorted(events, key=lambda x: x.depth)
-
-        event_ids = [event.event_id for event in events]
+    async def generateEventIdGraphFromEvents(
+        self, events: Iterable[EventBase]
+    ) -> Dict[str, Iterable[str]]:
         event_map = {event.event_id: event for event in events}
 
         # Since the insertion event we try to reference later on might be in the
@@ -697,6 +664,7 @@ class FederationEventHandler:
             if event.type == EventTypes.MSC2716_INSERTION
         }
 
+        # Map a given event to its successors (backwards prev_events)
         successor_event_id_map = {}
         for event in events:
             for prev_event_id in event.prev_event_ids():
@@ -727,11 +695,11 @@ class FederationEventHandler:
                 )
 
                 if insertion_event_id:
-                    # Add the insertion event as a fake edge connection to the batch
-                    # event so the historical batch topologically sorts below
-                    # the "live" event we branched off of.
-                    event_id_graph.setdefault(event.event_id, []).append(
-                        insertion_event_id
+                    # Connect the insertion event via a fake edge pointing to the
+                    # batch event so the historical batch topologically sorts
+                    # behind-in-time the insertion event.
+                    event_id_graph.setdefault(insertion_event_id, []).append(
+                        event.event_id
                     )
 
                     # Maybe we can get lucky and save ourselves a lookup
@@ -743,10 +711,11 @@ class FederationEventHandler:
                     )
                     if insertion_event:
-                        # Also add some fake edges to connect the insertion
-                        # event to it's prev_event successors so it sorts
-                        # topologically behind-in-time the successor. Nestled
-                        # perfectly between the prev_event and the successor.
+                        # Connect the insertion event's `prev_event` successors
+                        # via fake edges pointing to the insertion event itself
+                        # so the insertion event sorts topologically
+                        # behind-in-time the successor. Nestled perfectly
+                        # between the prev_event and the successor.
                         for insertion_prev_event_id in insertion_event.prev_event_ids():
                             successor_event_ids = successor_event_id_map[
                                 insertion_prev_event_id
                             ]
@@ -757,21 +726,61 @@ class FederationEventHandler:
                                 successor_event_ids,
                             )
                             if successor_event_ids:
-                                event_id_graph.setdefault(
-                                    insertion_event_id, []
-                                ).extend(
-                                    [
-                                        successor_event_id
-                                        for successor_event_id in successor_event_ids
-                                        # Don't add itself back as a successor
-                                        if successor_event_id != insertion_event_id
-                                    ]
-                                )
+                                for successor_event_id in successor_event_ids:
+                                    # Don't add itself back as a successor
+                                    if successor_event_id != insertion_event_id:
+                                        # Fake edge to point the successor back
+                                        # at the insertion event
+                                        event_id_graph.setdefault(
+                                            successor_event_id, []
+                                        ).append(insertion_event_id)
+
+        # TODO: We also need to add fake edges to connect the oldest-in-time messages
+        # in the batch to the event we branched off of, see https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985
+        return event_id_graph
+
+    async def _process_pulled_events(
+        self, origin: str, events: Iterable[EventBase], backfilled: bool
+    ) -> None:
+        """Process a batch of events we have pulled from a remote server
+
+        Pulls in any events required to auth the events, persists the received events,
+        and notifies clients, if appropriate.
+
+        Assumes the events have already had their signatures and hashes checked.
+
+        Params:
+            origin: The server we received these events from
+            events: The received events.
+            backfilled: True if this is part of a historical batch of events (inhibits
+                notification to clients, and validation of device keys.)
+        """
+
+        logger.info(
+            "backfill events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        # sorted_events = sorted(events, key=lambda x: x.depth)
 
 
         # We want to sort topologically so we process them and tell clients
         # about them in order.
         sorted_events = []
+        event_ids = [event.event_id for event in events]
+        event_map = {event.event_id: event for event in events}
+        event_id_graph = await self.generateEventIdGraphFromEvents(events)
         for event_id in sorted_topologically(event_ids, event_id_graph):
             sorted_events.append(event_map[event_id])
         sorted_events = reversed(sorted_events)
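
Illustrative sketch (not part of the commit): the fake-edge construction above can be exercised on a toy batch. Everything below is hypothetical scaffolding: FakeEvent stands in for EventBase, the event IDs are invented, and the graph is seeded with plain prev_event edges on the assumption that the unchanged middle of the method does the same. It shows only the successor fake edges; the real handler also resolves insertion events via batch/marker lookups.

from dataclasses import dataclass, field
from typing import Dict, List


# Hypothetical stand-in for EventBase, just enough to drive the graph logic.
@dataclass
class FakeEvent:
    event_id: str
    type: str = "m.room.message"
    prev_events: List[str] = field(default_factory=list)

    def prev_event_ids(self) -> List[str]:
        return self.prev_events


INSERTION = "org.matrix.msc2716.insertion"  # EventTypes.MSC2716_INSERTION

# Toy batch: a live event $A, an insertion event $I branching off $A, and a
# historical message $H whose only real prev_event is also $A.
events = [
    FakeEvent("$A"),
    FakeEvent("$I", type=INSERTION, prev_events=["$A"]),
    FakeEvent("$H", prev_events=["$A"]),
]

# Map a given event to its successors (the reverse of prev_events).
successor_event_id_map: Dict[str, List[str]] = {}
for event in events:
    for prev_event_id in event.prev_event_ids():
        successor_event_id_map.setdefault(prev_event_id, []).append(event.event_id)

# Seed the graph with the real prev_event edges: graph[x] lists the events
# that must sort before x.
event_id_graph: Dict[str, List[str]] = {
    event.event_id: list(event.prev_event_ids()) for event in events
}

# Fake edges: every successor of the insertion event's prev_event points back
# at the insertion event, so the insertion event nestles between the two.
for event in events:
    if event.type == INSERTION:
        for prev_id in event.prev_event_ids():
            for successor_id in successor_event_id_map.get(prev_id, []):
                if successor_id != event.event_id:
                    event_id_graph.setdefault(successor_id, []).append(event.event_id)

print(event_id_graph)
# {'$A': [], '$I': ['$A'], '$H': ['$A', '$I']}: $H now sorts after $I as well.

Reading graph[x] as "x sorts after these", the insertion event $I now sits between its prev_event $A and $A's other successor $H, which is the "nestled perfectly between" property the patch comments describe.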
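
For reference, sorted_topologically lives in synapse.util.iterutils and, per its docstring, treats graph[x] as the nodes that must be yielded before x. The Kahn-style stand-in below is a minimal sketch under that assumption, not Synapse's implementation, showing how _process_pulled_events consumes the graph:

from typing import Dict, Generator, Iterable, List


def sorted_topologically_sketch(
    nodes: Iterable[str], graph: Dict[str, List[str]]
) -> Generator[str, None, None]:
    """Kahn-style stand-in for synapse.util.iterutils.sorted_topologically:
    yields each node only after everything in graph[node] has been yielded.
    (Nodes caught in a cycle are silently dropped in this sketch.)"""
    pending = set(nodes)
    # How many of each node's predecessors are still waiting to be yielded.
    degree = {n: sum(1 for p in graph.get(n, ()) if p in pending) for n in pending}
    # reverse[p] = the nodes that are waiting on p.
    reverse: Dict[str, List[str]] = {}
    for n in pending:
        for p in graph.get(n, ()):
            if p in pending:
                reverse.setdefault(p, []).append(n)
    ready = sorted(n for n, d in degree.items() if d == 0)
    while ready:
        node = ready.pop(0)
        yield node
        for waiter in reverse.get(node, ()):
            degree[waiter] -= 1
            if degree[waiter] == 0:
                ready.append(waiter)


# The graph from the sketch above: $I must sort after $A, and $H after both.
event_id_graph = {"$A": [], "$I": ["$A"], "$H": ["$A", "$I"]}
sorted_event_ids = list(
    sorted_topologically_sketch(["$A", "$I", "$H"], event_id_graph)
)
print(sorted_event_ids)                  # ['$A', '$I', '$H']
print(list(reversed(sorted_event_ids)))  # ['$H', '$I', '$A']

The final reversed() mirrors the last line of _process_pulled_events: the handler flips the topologically sorted list before iterating over it for processing.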