summary refs log tree commit diff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py91
1 files changed, 91 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
+            self._handle_insertion_event(txn, event)
+            self._handle_chunk_event(txn, event)
+
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
+    def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles keeping track of insertion events and edges/connections.
+        Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_INSERTION:
+            # Not a insertion event
+            return
+
+        # Skip processing a insertion event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        logger.debug(
+            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+        )
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
+        for prev_event_id in event.prev_events:
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="insertion_event_edges",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "insertion_prev_event_id": prev_event_id,
+                },
+            )
+
+    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the chunk edges/connections between the chunk event
+        and an insertion event. Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_CHUNK:
+            # Not a chunk event
+            return
+
+        # Skip processing a chunk event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # Invalid chunk event without a chunk ID
+            return
+
+        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.