summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-07-28 10:46:37 -0500
committerGitHub <noreply@github.com>2021-07-28 10:46:37 -0500
commitd0b294ad974c05621426369a00be6bf05c4af997 (patch)
tree1a4ab4505665684c30ea50dddde1890af4a928b6
parentMerge tag 'v1.39.0rc3' into develop (diff)
downloadsynapse-d0b294ad974c05621426369a00be6bf05c4af997.tar.xz
Make historical events discoverable from backfill for servers without any scrollback history (MSC2716) (#10245)
* Make historical messages available to federated servers

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

Follow-up to https://github.com/matrix-org/synapse/pull/9247

* Debug message not available on federation

* Add base starting insertion point when no chunk ID is provided

* Fix messages from multiple senders in historical chunk

Follow-up to https://github.com/matrix-org/synapse/pull/9247

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

---

Previously, Synapse would throw a 403,
`Cannot force another user to join.`,
because we were trying to use `?user_id` from a single virtual user
which did not match with messages from other users in the chunk.

* Remove debug lines

* Messing with selecting insertion event extremeties

* Move db schema change to new version

* Add more better comments

* Make a fake requester with just what we need

See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080

* Store insertion events in table

* Make base insertion event float off on its own

See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889

Conflicts:
	synapse/rest/client/v1/room.py

* Validate that the app service can actually control the given user

See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455

Conflicts:
	synapse/rest/client/v1/room.py

* Add some better comments on what we're trying to check for

* Continue debugging

* Share validation logic

* Add inserted historical messages to /backfill response

* Remove debug sql queries

* Some marker event implemntation trials

* Clean up PR

* Rename insertion_event_id to just event_id

* Add some better sql comments

* More accurate description

* Add changelog

* Make it clear what MSC the change is part of

* Add more detail on which insertion event came through

* Address review and improve sql queries

* Only use event_id as unique constraint

* Fix test case where insertion event is already in the normal DAG

* Remove debug changes

* Switch to chunk events so we can auth via power_levels

Previously, we were using `content.chunk_id` to connect one
chunk to another. But these events can be from any `sender`
and we can't tell who should be able to send historical events.
We know we only want the application service to do it but these
events have the sender of a real historical message, not the
application service user ID as the sender. Other federated homeservers
also have no indicator which senders are an application service on
the originating homeserver.

So we want to auth all of the MSC2716 events via power_levels
and have them be sent by the application service with proper
PL levels in the room.

* Switch to chunk events for federation

* Add unstable room version to support new historical PL

* Fix federated events being rejected for no state_groups

Add fix from https://github.com/matrix-org/synapse/pull/10439
until it merges.

* Only connect base insertion event to prev_event_ids

Per discussion with @erikjohnston,
https://matrix.to/#/!UytJQHLQYfvYWsGrGY:jki.re/$12bTUiObDFdHLAYtT7E-BvYRp3k_xv8w0dUQHibasJk?via=jki.re&via=matrix.org

* Make it possible to get the room_version with txn

* Allow but ignore historical events in unsupported room version

See https://github.com/matrix-org/synapse/pull/10245#discussion_r675592489

We can't reject historical events on unsupported room versions because homeservers without knowledge of MSC2716 or the new room version don't reject historical events either.

Since we can't rely on the auth check here to stop historical events on unsupported room versions, I've added some additional checks in the processing/persisting code (`synapse/storage/databases/main/events.py` ->  `_handle_insertion_event` and `_handle_chunk_event`). I've had to do some refactoring so there is method to fetch the room version by `txn`.

* Move to unique index syntax

See https://github.com/matrix-org/synapse/pull/10245#discussion_r675638509

* High-level document how the insertion->chunk lookup works

* Remove create_event fallback for room_versions

See https://github.com/matrix-org/synapse/pull/10245/files#r677641879

* Use updated method name
-rw-r--r--changelog.d/10245.feature1
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/room_versions.py27
-rw-r--r--synapse/event_auth.py38
-rw-r--r--synapse/events/utils.py3
-rw-r--r--synapse/handlers/federation.py6
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/rest/client/v1/room.py7
-rw-r--r--synapse/storage/databases/main/event_federation.py88
-rw-r--r--synapse/storage/databases/main/events.py91
-rw-r--r--synapse/storage/databases/main/state.py50
-rw-r--r--synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql49
12 files changed, 338 insertions, 26 deletions
diff --git a/changelog.d/10245.feature b/changelog.d/10245.feature
new file mode 100644
index 0000000000..b3c48cc2cc
--- /dev/null
+++ b/changelog.d/10245.feature
@@ -0,0 +1 @@
+Make historical events discoverable from backfill for servers without any scrollback history (part of MSC2716).
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index a40d6d7961..a986fdb47a 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -206,9 +206,6 @@ class EventContentFields:
     MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
     # For "marker" events
     MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
-    MSC2716_MARKER_INSERTION_PREV_EVENTS = (
-        "org.matrix.msc2716.marker.insertion_prev_events"
-    )
 
 
 class RoomTypes:
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index 697319e52d..bc678efe49 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -73,6 +73,9 @@ class RoomVersion:
     # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
     # m.room.membership event with membership 'knock'.
     msc2403_knocking = attr.ib(type=bool)
+    # MSC2716: Adds m.room.power_levels -> content.historical field to control
+    # whether "insertion", "chunk", "marker" events can be sent
+    msc2716_historical = attr.ib(type=bool)
 
 
 class RoomVersions:
@@ -88,6 +91,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V2 = RoomVersion(
         "2",
@@ -101,6 +105,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V3 = RoomVersion(
         "3",
@@ -114,6 +119,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V4 = RoomVersion(
         "4",
@@ -127,6 +133,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V5 = RoomVersion(
         "5",
@@ -140,6 +147,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V6 = RoomVersion(
         "6",
@@ -153,6 +161,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     MSC2176 = RoomVersion(
         "org.matrix.msc2176",
@@ -166,6 +175,7 @@ class RoomVersions:
         msc2176_redaction_rules=True,
         msc3083_join_rules=False,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     MSC3083 = RoomVersion(
         "org.matrix.msc3083.v2",
@@ -179,6 +189,7 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=True,
         msc2403_knocking=False,
+        msc2716_historical=False,
     )
     V7 = RoomVersion(
         "7",
@@ -192,6 +203,21 @@ class RoomVersions:
         msc2176_redaction_rules=False,
         msc3083_join_rules=False,
         msc2403_knocking=True,
+        msc2716_historical=False,
+    )
+    MSC2716 = RoomVersion(
+        "org.matrix.msc2716",
+        RoomDisposition.STABLE,
+        EventFormatVersions.V3,
+        StateResolutionVersions.V2,
+        enforce_key_validity=True,
+        special_case_aliases_auth=False,
+        strict_canonicaljson=True,
+        limit_notifications_power_levels=True,
+        msc2176_redaction_rules=False,
+        msc3083_join_rules=False,
+        msc2403_knocking=True,
+        msc2716_historical=True,
     )
 
 
@@ -207,6 +233,7 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
         RoomVersions.MSC2176,
         RoomVersions.MSC3083,
         RoomVersions.V7,
+        RoomVersions.MSC2716,
     )
 }
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index cc92d35477..0fa7ffc99f 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -205,6 +205,13 @@ def check(
     if event.type == EventTypes.Redaction:
         check_redaction(room_version_obj, event, auth_events)
 
+    if (
+        event.type == EventTypes.MSC2716_INSERTION
+        or event.type == EventTypes.MSC2716_CHUNK
+        or event.type == EventTypes.MSC2716_MARKER
+    ):
+        check_historical(room_version_obj, event, auth_events)
+
     logger.debug("Allowing! %s", event)
 
 
@@ -539,6 +546,37 @@ def check_redaction(
     raise AuthError(403, "You don't have permission to redact events")
 
 
+def check_historical(
+    room_version_obj: RoomVersion,
+    event: EventBase,
+    auth_events: StateMap[EventBase],
+) -> None:
+    """Check whether the event sender is allowed to send historical related
+    events like "insertion", "chunk", and "marker".
+
+    Returns:
+        None
+
+    Raises:
+        AuthError if the event sender is not allowed to send historical related events
+        ("insertion", "chunk", and "marker").
+    """
+    # Ignore the auth checks in room versions that do not support historical
+    # events
+    if not room_version_obj.msc2716_historical:
+        return
+
+    user_level = get_user_power_level(event.user_id, auth_events)
+
+    historical_level = get_named_level(auth_events, "historical", 100)
+
+    if user_level < historical_level:
+        raise AuthError(
+            403,
+            'You don\'t have permission to send send historical related events ("insertion", "chunk", and "marker")',
+        )
+
+
 def _check_power_levels(
     room_version_obj: RoomVersion,
     event: EventBase,
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index f4da9e0923..a0c07f62f4 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -126,6 +126,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: dict) -> dict:
         if room_version.msc2176_redaction_rules:
             add_fields("invite")
 
+        if room_version.msc2716_historical:
+            add_fields("historical")
+
     elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
         add_fields("aliases")
     elif event_type == EventTypes.RoomHistoryVisibility:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index aba095d2e1..8197b60b76 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2748,9 +2748,11 @@ class FederationHandler(BaseHandler):
                             event.event_id,
                             e.event_id,
                         )
-                        context = await self.state_handler.compute_event_context(e)
+                        missing_auth_event_context = (
+                            await self.state_handler.compute_event_context(e)
+                        )
                         await self._auth_and_persist_event(
-                            origin, e, context, auth_events=auth
+                            origin, e, missing_auth_event_context, auth_events=auth
                         )
 
                         if e.event_id in event_auth_events:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 370561e549..b33fe09f77 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -951,6 +951,7 @@ class RoomCreationHandler(BaseHandler):
                 "kick": 50,
                 "redact": 50,
                 "invite": 50,
+                "historical": 100,
             }
 
             if config["original_invitees_have_ops"]:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 25ba52c624..502a917588 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -504,7 +504,6 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
 
         events_to_create = body["events"]
 
-        prev_event_ids = prev_events_from_query
         inherited_depth = await self._inherit_depth_from_prev_ids(
             prev_events_from_query
         )
@@ -516,6 +515,10 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
         chunk_id_to_connect_to = chunk_id_from_query
         base_insertion_event = None
         if chunk_id_from_query:
+            #  All but the first base insertion event should point at a fake
+            #  event, which causes the HS to ask for the state at the start of
+            #  the chunk later.
+            prev_event_ids = [fake_prev_event_id]
             # TODO: Verify the chunk_id_from_query corresponds to an insertion event
             pass
         # Otherwise, create an insertion event to act as a starting point.
@@ -526,6 +529,8 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet):
         # an insertion event), in which case we just create a new insertion event
         # that can then get pointed to by a "marker" event later.
         else:
+            prev_event_ids = prev_events_from_query
+
             base_insertion_event_dict = self._create_insertion_event_dict(
                 sender=requester.user.to_string(),
                 room_id=room_id,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f4a00b0736..547e43ab98 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -936,15 +936,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # We want to make sure that we do a breadth-first, "depth" ordered
         # search.
 
-        query = (
-            "SELECT depth, prev_event_id FROM event_edges"
-            " INNER JOIN events"
-            " ON prev_event_id = events.event_id"
-            " WHERE event_edges.event_id = ?"
-            " AND event_edges.is_state = ?"
-            " LIMIT ?"
-        )
+        # Look for the prev_event_id connected to the given event_id
+        query = """
+            SELECT depth, prev_event_id FROM event_edges
+            /* Get the depth of the prev_event_id from the events table */
+            INNER JOIN events
+            ON prev_event_id = events.event_id
+            /* Find an event which matches the given event_id */
+            WHERE event_edges.event_id = ?
+            AND event_edges.is_state = ?
+            LIMIT ?
+        """
+
+        # Look for the "insertion" events connected to the given event_id
+        connected_insertion_event_query = """
+            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+            /* Get the depth of the insertion event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which points via prev_events to the given event_id */
+            WHERE i.insertion_prev_event_id = ?
+            LIMIT ?
+        """
+
+        # Find any chunk connections of a given insertion event
+        chunk_connection_query = """
+            SELECT e.depth, c.event_id FROM insertion_events AS i
+            /* Find the chunk that connects to the given insertion event */
+            INNER JOIN chunk_events AS c
+            ON i.next_chunk_id = c.chunk_id
+            /* Get the depth of the chunk start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
+        """
 
+        # In a PriorityQueue, the lowest valued entries are retrieved first.
+        # We're using depth as the priority in the queue.
+        # Depth is lowest at the oldest-in-time message and highest and
+        # newest-in-time message. We add events to the queue with a negative depth so that
+        # we process the newest-in-time messages first going backwards in time.
         queue = PriorityQueue()
 
         for event_id in event_list:
@@ -970,9 +1001,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
             event_results.add(event_id)
 
+            # Try and find any potential historical chunks of message history.
+            #
+            # First we look for an insertion event connected to the current
+            # event (by prev_event). If we find any, we need to go and try to
+            # find any chunk events connected to the insertion event (by
+            # chunk_id). If we find any, we'll add them to the queue and
+            # navigate up the DAG like normal in the next iteration of the loop.
+            txn.execute(
+                connected_insertion_event_query, (event_id, limit - len(event_results))
+            )
+            connected_insertion_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: connected_insertion_event_query %s",
+                connected_insertion_event_id_results,
+            )
+            for row in connected_insertion_event_id_results:
+                connected_insertion_event_depth = row[0]
+                connected_insertion_event = row[1]
+                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+                # Find any chunk connections for the given insertion event
+                txn.execute(
+                    chunk_connection_query,
+                    (connected_insertion_event, limit - len(event_results)),
+                )
+                chunk_start_event_id_results = txn.fetchall()
+                logger.debug(
+                    "_get_backfill_events: chunk_start_event_id_results %s",
+                    chunk_start_event_id_results,
+                )
+                for row in chunk_start_event_id_results:
+                    if row[1] not in event_results:
+                        queue.put((-row[0], row[1]))
+
+            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
+            prev_event_id_results = txn.fetchall()
+            logger.debug(
+                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+            )
 
-            for row in txn:
+            for row in prev_event_id_results:
                 if row[1] not in event_results:
                     queue.put((-row[0], row[1]))
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201d4..86baf397fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
+            self._handle_insertion_event(txn, event)
+            self._handle_chunk_event(txn, event)
+
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
         if rel_type == RelationTypes.REPLACE:
             txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
+    def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles keeping track of insertion events and edges/connections.
+        Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_INSERTION:
+            # Not a insertion event
+            return
+
+        # Skip processing a insertion event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+        if next_chunk_id is None:
+            # Invalid insertion event without next chunk ID
+            return
+
+        logger.debug(
+            "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+        )
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="insertion_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "next_chunk_id": next_chunk_id,
+            },
+        )
+
+        # Insert an edge for every prev_event connection
+        for prev_event_id in event.prev_events:
+            self.db_pool.simple_insert_txn(
+                txn,
+                table="insertion_event_edges",
+                values={
+                    "event_id": event.event_id,
+                    "room_id": event.room_id,
+                    "insertion_prev_event_id": prev_event_id,
+                },
+            )
+
+    def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+        """Handles inserting the chunk edges/connections between the chunk event
+        and an insertion event. Part of MSC2716.
+
+        Args:
+            txn: The database transaction object
+            event: The event to process
+        """
+
+        if event.type != EventTypes.MSC2716_CHUNK:
+            # Not a chunk event
+            return
+
+        # Skip processing a chunk event if the room version doesn't
+        # support it.
+        room_version = self.store.get_room_version_txn(txn, event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+        if chunk_id is None:
+            # Invalid chunk event without a chunk ID
+            return
+
+        logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+        # Keep track of the insertion event and the chunk ID
+        self.db_pool.simple_insert_txn(
+            txn,
+            table="chunk_events",
+            values={
+                "event_id": event.event_id,
+                "room_id": event.room_id,
+                "chunk_id": chunk_id,
+            },
+        )
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a68..8e22da99ae 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
-
         Raises:
             NotFoundError: if the room is unknown
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_txn",
+            self.get_room_version_txn,
+            room_id,
+        )
 
+    def get_room_version_txn(
+        self, txn: LoggingTransaction, room_id: str
+    ) -> RoomVersion:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
+        Raises:
+            NotFoundError: if the room is unknown
             UnsupportedRoomVersionError: if the room uses an unknown room version.
                 Typically this happens if support for the room's version has been
                 removed from Synapse.
         """
-        room_version_id = await self.get_room_version_id(room_id)
+        room_version_id = self.get_room_version_id_txn(txn, room_id)
         v = KNOWN_ROOM_VERSIONS.get(room_version_id)
 
         if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     @cached(max_entries=10000)
     async def get_room_version_id(self, room_id: str) -> str:
         """Get the room_version of a given room
+        Raises:
+            NotFoundError: if the room is unknown
+        """
+        return await self.db_pool.runInteraction(
+            "get_room_version_id_txn",
+            self.get_room_version_id_txn,
+            room_id,
+        )
 
+    def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+        """Get the room_version of a given room
+        Args:
+            txn: Transaction object
+            room_id: The room_id of the room you are trying to get the version for
         Raises:
             NotFoundError: if the room is unknown
         """
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         # First we try looking up room version from the database, but for old
         # rooms we might not have added the room version to it yet so we fall
         # back to previous behaviour and look in current state events.
-
+        #
         # We really should have an entry in the rooms table for every room we
         # care about, but let's be a bit paranoid (at least while the background
         # update is happening) to avoid breaking existing rooms.
-        version = await self.db_pool.simple_select_one_onecol(
+        room_version = self.db_pool.simple_select_one_onecol_txn(
+            txn,
             table="rooms",
             keyvalues={"room_id": room_id},
             retcol="room_version",
-            desc="get_room_version",
             allow_none=True,
         )
 
-        if version is not None:
-            return version
+        if room_version is None:
+            raise NotFoundError("Could not room_version for %s" % (room_id,))
 
-        # Retrieve the room's create event
-        create_event = await self.get_create_event_for_room(room_id)
-        return create_event.content.get("room_version", "1")
+        return room_version
 
     async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
         """Get the predecessor of an upgraded room if it exists.
diff --git a/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
new file mode 100644
index 0000000000..7d7bafc631
--- /dev/null
+++ b/synapse/storage/schema/main/delta/61/01insertion_event_lookups.sql
@@ -0,0 +1,49 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a table that keeps track of "insertion" events and
+-- their next_chunk_id's so we can navigate to the next chunk of history.
+CREATE TABLE IF NOT EXISTS insertion_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    next_chunk_id TEXT NOT NULL
+);
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_events_event_id ON insertion_events(event_id);
+CREATE INDEX IF NOT EXISTS insertion_events_next_chunk_id ON insertion_events(next_chunk_id);
+
+-- Add a table that keeps track of all of the events we are inserting between.
+-- We use this when navigating the DAG and when we hit an event which matches
+-- `insertion_prev_event_id`, it should backfill from the "insertion" event and
+-- navigate the historical messages from there.
+CREATE TABLE IF NOT EXISTS insertion_event_edges(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    insertion_prev_event_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_edges_event_id ON insertion_event_edges(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_room_id ON insertion_event_edges(room_id);
+CREATE INDEX IF NOT EXISTS insertion_event_edges_insertion_prev_event_id ON insertion_event_edges(insertion_prev_event_id);
+
+-- Add a table that keeps track of how each chunk is labeled. The chunks are
+-- connected together based on an insertion events `next_chunk_id`.
+CREATE TABLE IF NOT EXISTS chunk_events(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    chunk_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS chunk_events_event_id ON chunk_events(event_id);
+CREATE INDEX IF NOT EXISTS chunk_events_chunk_id ON chunk_events(chunk_id);