summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-10-30 00:46:02 -0500
committerEric Eastwood <erice@element.io>2021-10-30 00:46:02 -0500
commit76d454f81dbfe71da063bc33b17dc09870ee53fd (patch)
treea68e46ee6e319123a3a4e1d8bb2fb402c1dd85b8
parentFix backfill being able to cleanly branch into history and back to "live" (diff)
downloadsynapse-76d454f81dbfe71da063bc33b17dc09870ee53fd.tar.xz
Some backfill receive sorting fixes but not using it yet
-rw-r--r--synapse/handlers/federation_event.py104
1 files changed, 51 insertions, 53 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py

index c9060a594f..f219a0f42a 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -702,41 +702,37 @@ class FederationEventHandler: event.event_id ) - # # Maybe we can get lucky and save ourselves a lookup - # # by checking the events in the backfill first - # insertion_event = event_map[ - # insertion_event_id - # ] or await self._store.get_event( - # insertion_event_id, allow_none=True - # ) - - # if insertion_event: - # # Connect the insertion events' `prev_event` successors - # # via fake edges pointing to the insertion event itself - # # so the insertion event sorts topologically - # # behind-in-time the successor. Nestled perfectly - # # between the prev_event and the successor. - # for insertion_prev_event_id in insertion_event.prev_event_ids(): - # successor_event_ids = successor_event_id_map[ - # insertion_prev_event_id - # ] - # logger.info( - # "insertion_event_id=%s successor_event_ids=%s", - # insertion_event_id, - # successor_event_ids, - # ) - # if successor_event_ids: - # for successor_event_id in successor_event_ids: - # # Don't add itself back as a successor - # if successor_event_id != insertion_event_id: - # # Fake edge to point the successor back - # # at the insertion event - # event_id_graph.setdefault( - # successor_event_id, [] - # ).append(insertion_event_id) - - # TODO: We also need to add fake edges to connect the oldest-in-time messages - # in the batch to the event we branched off of, see https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985 + # Maybe we can get lucky and save ourselves a lookup + # by checking the events in the backfill first + insertion_event = event_map[ + insertion_event_id + ] or await self._store.get_event( + insertion_event_id, allow_none=True + ) + + if insertion_event: + # Connect the insertion events' `prev_event` successors + # via fake edges pointing to the insertion event itself + # so the insertion event sorts topologically + # behind-in-time the successor. Nestled perfectly + # between the prev_event and the successor. + for insertion_prev_event_id in insertion_event.prev_event_ids(): + successor_event_ids = successor_event_id_map[ + insertion_prev_event_id + ] + if successor_event_ids: + for successor_event_id in successor_event_ids: + # Don't add itself back as a successor + if successor_event_id != insertion_event_id: + # Fake edge to point the successor back + # at the insertion event + event_id_graph.setdefault( + successor_event_id, [] + ).append(insertion_event_id) + + # TODO: We also need to add fake edges to connect insertion events -> to + # the base event in the "live" DAG we branched off of, see scenario 2 + # https://github.com/matrix-org/synapse/pull/11114#discussion_r739300985 return event_id_graph @@ -774,6 +770,9 @@ class FederationEventHandler: # We want to sort these by depth so we process them and # tell clients about them in order. sorted_events = sorted(events, key=lambda x: x.depth) + for ev in sorted_events: + with nested_logging_context(ev.event_id): + await self._process_pulled_event(origin, ev, backfilled=backfilled) # # We want to sort topologically so we process them and tell clients # # about them in order. @@ -783,25 +782,24 @@ class FederationEventHandler: # event_id_graph = await self.generateEventIdGraphFromEvents(events) # for event_id in sorted_topologically(event_ids, event_id_graph): # sorted_events.append(event_map[event_id]) - # sorted_events = reversed(sorted_events) - - logger.info( - "backfill sorted_events=%s", - [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" - % ( - event.event_id, - event.depth, - event.content.get("body", event.type), - event.prev_event_ids(), - ) - for event in sorted_events - ], - ) - for ev in sorted_events: - with nested_logging_context(ev.event_id): - await self._process_pulled_event(origin, ev, backfilled=backfilled) + # logger.info( + # "backfill sorted_events=%s", + # [ + # "event_id=%s,depth=%d,body=%s,prevs=%s\n" + # % ( + # event.event_id, + # event.depth, + # event.content.get("body", event.type), + # event.prev_event_ids(), + # ) + # for event in reversed(sorted_events) + # ], + # ) + + # for ev in reversed(sorted_events): + # with nested_logging_context(ev.event_id): + # await self._process_pulled_event(origin, ev, backfilled=backfilled) async def _process_pulled_event( self, origin: str, event: EventBase, backfilled: bool