summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-10-29 02:43:17 -0500
committerEric Eastwood <erice@element.io>2021-10-29 02:43:17 -0500
commit6ea263b73ba1d0e275ef4fd266c4a9144198ebb9 (patch)
treeec3ce4ef0060aa423b29c40fe6b6e1ca87c03512
parentWIP: Sort events topologically when we receive them over backfill (diff)
downloadsynapse-6ea263b73ba1d0e275ef4fd266c4a9144198ebb9.tar.xz
Revert "WIP: Sort events topologically when we receive them over backfill"
This reverts commit 5afc264dd54221142b0acd0a56cbe07c3eac2113.
-rw-r--r--synapse/handlers/federation_event.py125
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/rest/client/room_batch.py2
-rw-r--r--synapse/storage/databases/main/room_batch.py2
4 files changed, 5 insertions, 126 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py

index 66d3da8719..610a4e48c5 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py
@@ -72,7 +72,7 @@ from synapse.types import ( get_domain_from_id, ) from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, sorted_topologically +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -665,130 +665,9 @@ class FederationEventHandler: notification to clients, and validation of device keys.) """ - logger.info( - "backfill events=%s", - [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" - % ( - event.event_id, - event.depth, - event.content.get("body", event.type), - event.prev_event_ids(), - ) - for event in events - ], - ) - # We want to sort these by depth so we process them and # tell clients about them in order. - # sorted_events = sorted(events, key=lambda x: x.depth) - - event_ids = [event.event_id for event in events] - event_map = {event.event_id: event for event in events} - - # Since the insertion event we try to reference later on might be in the - # backfill chunk itself, we need to make it easy to lookup. Maps a given - # batch_id to the insertion event. - batch_id_map = { - event.content.get( - EventContentFields.MSC2716_NEXT_BATCH_ID, None - ): event.event_id - for event in events - if event.type == EventTypes.MSC2716_INSERTION - } - - successor_event_id_map = {} - for event in events: - for prev_event_id in event.prev_event_ids(): - successor_event_id_map.setdefault(prev_event_id, []).append( - event.event_id - ) - - event_id_graph = {} - for event in events: - # Assign the real edges to the graph. - # Make a copy so we don't modify the actual prev_events when we extend them below. - event_id_graph.setdefault(event.event_id, []).extend( - event.prev_event_ids().copy() - ) - - # We need to make some fake edge connections from the batch event at - # the bottom of the historical batch to the insertion event. This - # way the historical batch topologically sorts in ahead-in-time of - # the event we branched off of. - batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID, None) - if event.type == EventTypes.MSC2716_BATCH and batch_id: - # Maybe we can get lucky and save ourselves a lookup - # by checking the events in the backfill first - insertion_event_id = batch_id_map[ - batch_id - ] or await self._store.get_insertion_event_id_by_batch_id( - event.room_id, batch_id - ) - - if insertion_event_id: - # Add the insertion event as a fake edge connection to the batch - # event so the historical batch topologically sorts below - # the "live" event we branched off of. - event_id_graph.setdefault(event.event_id, []).append( - insertion_event_id - ) - - # Maybe we can get lucky and save ourselves a lookup - # by checking the events in the backfill first - insertion_event = event_map[ - insertion_event_id - ] or await self._store.get_event( - insertion_event_id, allow_none=True - ) - - if insertion_event: - # Also add some fake edges to connect the insertion - # event to it's prev_event successors so it sorts - # topologically behind-in-time the successor. Nestled - # perfectly between the prev_event and the successor. - for insertion_prev_event_id in insertion_event.prev_event_ids(): - successor_event_ids = successor_event_id_map[ - insertion_prev_event_id - ] - logger.info( - "insertion_event_id=%s successor_event_ids=%s", - insertion_event_id, - successor_event_ids, - ) - if successor_event_ids: - - event_id_graph.setdefault( - insertion_event_id, [] - ).extend( - [ - successor_event_id - for successor_event_id in successor_event_ids - # Don't add itself back as a successor - if successor_event_id != insertion_event_id - ] - ) - - # We want to sort topologically so we process them and tell clients - # about them in order. - sorted_events = [] - for event_id in sorted_topologically(event_ids, event_id_graph): - sorted_events.append(event_map[event_id]) - sorted_events = reversed(sorted_events) - - logger.info( - "backfill sorted_events=%s", - [ - "event_id=%s,depth=%d,body=%s,prevs=%s\n" - % ( - event.event_id, - event.depth, - event.content.get("body", event.type), - event.prev_event_ids(), - ) - for event in sorted_events - ], - ) + sorted_events = sorted(events, key=lambda x: x.depth) for ev in sorted_events: with nested_logging_context(ev.event_id): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2f4b458d45..d6f0b99f58 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -1511,7 +1511,7 @@ class EventCreationHandler: EventContentFields.MSC2716_NEXT_BATCH_ID ) conflicting_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( + await self.store.get_insertion_event_by_batch_id( event.room_id, next_batch_id ) ) diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 5423d39efd..99f8156ad0 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py
@@ -112,7 +112,7 @@ class RoomBatchSendEventRestServlet(RestServlet): # and have the batch connected. if batch_id_from_query: corresponding_insertion_event_id = ( - await self.store.get_insertion_event_id_by_batch_id( + await self.store.get_insertion_event_by_batch_id( room_id, batch_id_from_query ) ) diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 97b2618437..dcbce8fdcf 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py
@@ -18,7 +18,7 @@ from synapse.storage._base import SQLBaseStore class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_id_by_batch_id( + async def get_insertion_event_by_batch_id( self, room_id: str, batch_id: str ) -> Optional[str]: """Retrieve a insertion event ID.