summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10245.feature1
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/room_versions.py27
-rw-r--r--synapse/event_auth.py38
-rw-r--r--synapse/events/utils.py3
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/storage/databases/main/event_federation.py88
-rw-r--r--synapse/storage/databases/main/events.py91
-rw-r--r--synapse/storage/databases/main/state.py50
-rw-r--r--synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql49
12 files changed, 338 insertions, 26 deletions
diff --git a/changelog.d/10245.feature b/changelog.d/10245.feature
new file mode 100644
index 0000000000..b3c48cc2cc
--- /dev/null
+++ b/changelog.d/10245.feature
@@ -0,0 +1 @@
+Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716).
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index a40d6d7961..a986fdb47a 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -206,9 +206,6 @@ class EventContentFields:
     MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
     # For "marker" events
     MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
-    MSC2716_MARKER_INSERTION_PREV_EVENTS = (
-        "org.matrix.msc2716.marker.insertion_prev_events"
-    )
 
 
 class RoomTypes:
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index 697319e52d..bc678efe49 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -73,6 +73,9 @@ class RoomVersion:
     # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
     # m.room.membership event with membership 'knock'.
     msc2403_knocking = attr.ib(type=bool)
+    # MSC2716: Adds m.room.power_levels -> content.historical field to control
+    # whether "insertion", "chunk", "marker" events can be sent
+    msc2716_historical = attr.ib(type=bool)
 
 
 class RoomVersions:
@@ -88,6 +91,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V2 = RoomVersion(
         "2",
@@ -101,6 +105,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V3 = RoomVersion(
         "3",
@@ -114,6 +119,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V4 = RoomVersion(
         "4",
@@ -127,6 +133,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V5 = RoomVersion(
         "5",
@@ -140,6 +147,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V6 = RoomVersion(
         "6",
@@ -153,6 +161,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     MSC2176 = RoomVersion(
         "org.matrix.msc2176",
@@ -166,6 +175,7 @@ class RoomVersions:
         msc2176_redaction_rules=True,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     MSC3083 = RoomVersion(
         "org.matrix.msc3083.v2",
@@ -179,6 +189,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=True,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V7 = RoomVersion(
         "7",
@@ -192,6 +203,21 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=True,
+        msc2716_historical=False,
+    )
+    MSC2716 = RoomVersion(
+        "org.matrix.msc2716",
+        RoomDisposition.STABLE,
+        EventFormatVersions.V3,
+        StateResolutionVersions.V2,
+        enforce_key_validity=True,
+        special_case_aliases_auth=False,
+        strict_canonicaljson=True,
+        limit_notifications_power_levels=True,
+        msc2176_redaction_rules=False,
+        msc3083_join_rules=False,
+        msc2403_knocking=True,
+        msc2716_historical=True,
     )
 
 
@@ -207,6 +233,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
         RoomVersions.MSC2176,
         RoomVersions.MSC3083,
         RoomVersions.V7,
+        RoomVersions.MSC2716,
     )
 }
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index cc92d35477..0fa7ffc99f 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -205,6 +205,13 @@ def check(
     if event.type == EventTypes.Redaction:
         check_redaction(room_version_obj, event, auth_events)
 
+    if (
+        event.type == EventTypes.MSC2716_INSERTION
+        or event.type == EventTypes.MSC2716_CHUNK
+        or event.type == EventTypes.MSC2716_MARKER
+    ):
+        check_historical(room_version_obj, event, auth_events)
+
     logger.debug("Allowing! %s", event)
 
 
@@ -539,6 +546,37 @@ def check_redaction(
     raise AuthError(403, "You don't have permission to redact events")
 
 
+def check_historical(
+    room_version_obj: RoomVersion,
+    event: EventBase,
+    auth_events: StateMap[EventBase],
+) -> None:
+    """Check whether the event sender is allowed to send historical related
+    events like "insertion", "chunk", and "marker".
+
+    Returns:
+        None
+
+    Raises:
+        AuthError if the event sender is not allowed to send historical related events
+        ("insertion", "chunk", and "marker").
+    """
+    # Ignore the auth checks in room versions that do not support historical
+    # events
+    if not room_version_obj.msc2716_historical:
+        return
+
+    user_level = get_user_power_level(event.user_id, auth_events)
+
+    historical_level = get_named_level(auth_events, "historical", 100)
+
+    if user_level < historical_level:
+        raise AuthError(
+            403,
+            'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
+        )
+
+
 def _check_power_levels(
     room_version_obj: RoomVersion,
     event: EventBase,
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index f4da9e0923..a0c07f62f4 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -126,6 +126,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
         if room_version.msc2176_redaction_rules:
             add_fields("invite")
 
+        if room_version.msc2716_historical:
+            add_fields("historical")
+
     elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
         add_fields("aliases")
     elif event_type == EventTypes.RoomHistoryVisibility:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index aba095d2e1..8197b60b76 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2748,9 +2748,11 @@ class FederationHandler(BaseHandler):
                             event.event_id,
                             e.event_id,
                         )
-                        context = await self.state_handler.compute_event_context(e)
+                        missing_auth_event_context = (
+                            await self.state_handler.compute_event_context(e)
+                        )
                         await self._auth_and_persist_event(
-                            origin, e, context, auth_events=auth
+                            origin, e, missing_auth_event_context, auth_events=auth
                         )
 
                         if e.event_id in event_auth_events:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 370561e549..b33fe09f77 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -951,6 +951,7 @@ class RoomCreationHandler(BaseHandler):
                 "kick": 50,
                 "redact": 50,
                 "invite": 50,
+                "historical": 100,
             }
 
             if config["original_invitees_have_ops"]:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 25ba52c624..502a917588 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -504,7 +504,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
 
         events_to_create = body["events"]
 
-        prev_event_ids = prev_events_from_query
         inherited_depth = await self._inherit_depth_from_prev_ids(
             prev_events_from_query
         )
@@ -516,6 +515,10 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
         chunk_id_to_connect_to = chunk_id_from_query
         base_insertion_event = None
         if chunk_id_from_query:
+            #  All but the first base insertion event should point at a fake
+            #  event, which causes the HS to ask for the state at the start of
+            #  the chunk later.
+            prev_event_ids = [fake_prev_event_id]
             # TODO: Verify the chunk_id_from_query corresponds to an insertion event
             pass
         # Otherwise, create an insertion event to act as a starting point.
@@ -526,6 +529,8 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
         # an insertion event), in which case we just create a new insertion event
         # that can then get pointed to by a "marker" event later.
         else:
+            prev_event_ids = prev_events_from_query
+
             base_insertion_event_dict = self._create_insertion_event_dict(
                 sender=requester.user.to_string(),
                 room_id=room_id,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f4a00b0736..547e43ab98 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
 
-        query = (
-            "SELECT depth, prev_event_id FROM event_edges"
-            " INNER JOIN events"
-            " ON prev_event_id = events.event_id"
-            " WHERE event_edges.event_id = ?"
-            " AND event_edges.is_state = ?"
-            " LIMIT ?"
-        )
+        # Look for the prev_event_id connected to the given event_id
+        query = """
+            SELECT depth, prev_event_id FROM event_edges
+            /* Get the depth of the prev_event_id from the events table */
+            INNER JOIN events
+            ON prev_event_id = events.event_id
+            /* Find an event which matches the given event_id */
+            WHERE event_edges.event_id = ?
+            AND event_edges.is_state = ?
+            LIMIT ?
+        """
+
+        # Look for the "insertion" events connected to the given event_id
+        connected_insertion_event_query = """
+            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+            /* Get the depth of the insertion event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which points via prev_events to the given event_id */
+            WHERE i.insertion_prev_event_id = ?
+            LIMIT ?
+        """
+
+        # Find any chunk connections of a given insertion event
+        chunk_connection_query = """
+            SELECT e.depth, c.event_id FROM insertion_events AS i
+            /* Find the chunk that connects to the given insertion event */
+            INNER JOIN chunk_events AS c
+            ON i.next_chunk_id = c.chunk_id
+            /* Get the depth of the chunk start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
+        """
 
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest and
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()
 
         for event_id in event_list:
@@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             event_results.add(event_id)
 
+            # Try and find any potential historical chunks of message history.
+            #
+            # First we look for an insertion event connected to the current
+            # event (by prev_event). If we find any, we need to go and try to
+            # find any chunk events connected to the insertion event (by
+            # chunk_id). If we find any, we'll add them to the queue and
+            # navigate up the DAG like normal in the next iteration of the loop.
+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                connected_insertion_event_depth = row[0]
+                connected_insertion_event = row[1]
+                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+                # Find any chunk connections for the given insertion event
+                txn.execute(
+                    chunk_connection_query,
+                    (connected_insertion_event, limit - len(event_results)),
+                )
+                chunk_start_event_id_results = txn.fetchall()
+                logger.debug(
+                    "_get_backfill_events: chunk_start_event_id_results %s",
+                    chunk_start_event_id_results,
+                )
+                for row in chunk_start_event_id_results:
+                    if row[1] not in event_results:
+                        queue.put((-row[0], row[1]))
+
+            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+            )
 
-            for row in txn:
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
+            self._handle_insertion_event(txn, event)
+            self._handle_chunk_event(txn, event)
+
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
+    def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles keeping track of insertion events and edges/connections.
+        Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_INSERTION:
+            # Not a insertion event
+            return
+
+        # Skip processing a insertion event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        logger.debug(
+            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+        )
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
+        for prev_event_id in event.prev_events:
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="insertion_event_edges",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "insertion_prev_event_id": prev_event_id,
+                },
+            )
+
+    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the chunk edges/connections between the chunk event
+        and an insertion event. Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_CHUNK:
+            # Not a chunk event
+            return
+
+        # Skip processing a chunk event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # Invalid chunk event without a chunk ID
+            return
+
+        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a68..8e22da99ae 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
-
         Raises:
             NotFoundError: if the room is unknown
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_txn",
+            self.get_room_version_txn,
+            room_id,
+        )
 
+    def get_room_version_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> RoomVersion:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
+        Raises:
+            NotFoundError: if the room is unknown
             UnsupportedRoomVersionError: if the room uses an unknown room version.
                 Typically this happens if support for the room's version has been
                 removed from Synapse.
         """
-        room_version_id = await self.get_room_version_id(room_id)
+        room_version_id = self.get_room_version_id_txn(txn, room_id)
         v = KNOWN_ROOM_VERSIONS.get(room_version_id)
 
         if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     @cached(max_entries=10000)
     async def get_room_version_id(self, room_id: str) -> str:
         """Get the room_version of a given room
+        Raises:
+            NotFoundError: if the room is unknown
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_id_txn",
+            self.get_room_version_id_txn,
+            room_id,
+        )
 
+    def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
         Raises:
             NotFoundError: if the room is unknown
         """
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         # First we try looking up room version from the database, but for old
         # rooms we might not have added the room version to it yet so we fall
         # back to previous behaviour and look in current state events.
-
+        #
         # We really should have an entry in the rooms table for every room we
         # care about, but let's be a bit paranoid (at least while the background
         # update is happening) to avoid breaking existing rooms.
-        version = await self.db_pool.simple_select_one_onecol(
+        room_version = self.db_pool.simple_select_one_onecol_txn(
+            txn,
             table="rooms",
             keyvalues={"room_id": room_id},
             retcol="room_version",
-            desc="get_room_version",
             allow_none=True,
         )
 
-        if version is not None:
-            return version
+        if room_version is None:
+            raise NotFoundError("Could not room_version for %s" % (room_id,))
 
-        # Retrieve the room's create event
-        create_event = await self.get_create_event_for_room(room_id)
-        return create_event.content.get("room_version", "1")
+        return room_version
 
     async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
         """Get the predecessor of an upgraded room if it exists.
diff --git a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
new file mode 100644
index 0000000000..7d7bafc631
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
@@ -0,0 +1,49 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of "insertion" events and
+-- their next_chunk_id's so we can navigate to the next chunk of history.
+CREATE TABLE IF NOT EXISTS insertion_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    next_chunk_id TEXT NOT NULL
+);
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
+
+-- Add a table that keeps track of all of the events we are inserting between.
+-- We use this when navigating the DAG and when we hit an event which matches
+-- `insertion_prev_event_id`, it should backfill from the "insertion" event and
+-- navigate the historical messages from there.
+CREATE TABLE IF NOT EXISTS insertion_event_edges(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    insertion_prev_event_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
+
+-- Add a table that keeps track of how each chunk is labeled. The chunks are
+-- connected together based on an insertion events `next_chunk_id`.
+CREATE TABLE IF NOT EXISTS chunk_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    chunk_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id);
+CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);