diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
+ self._handle_insertion_event(txn, event)
+ self._handle_chunk_event(txn, event)
+
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
+ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles keeping track of insertion events and edges/connections.
+ Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_INSERTION:
+ # Not a insertion event
+ return
+
+ # Skip processing a insertion event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+ if next_chunk_id is None:
+ # Invalid insertion event without next chunk ID
+ return
+
+ logger.debug(
+ "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ )
+
+ # Keep track of the insertion event and the chunk ID
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "next_chunk_id": next_chunk_id,
+ },
+ )
+
+ # Insert an edge for every prev_event connection
+ for prev_event_id in event.prev_events:
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_event_edges",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "insertion_prev_event_id": prev_event_id,
+ },
+ )
+
+ def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the chunk edges/connections between the chunk event
+ and an insertion event. Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_CHUNK:
+ # Not a chunk event
+ return
+
+ # Skip processing a chunk event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+ if chunk_id is None:
+ # Invalid chunk event without a chunk ID
+ return
+
+ logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+ # Keep track of the insertion event and the chunk ID
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="chunk_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "chunk_id": chunk_id,
+ },
+ )
+
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
|