author    Eric Eastwood <erice@element.io>    2021-08-04 12:07:57 -0500
committer GitHub <noreply@github.com>    2021-08-04 12:07:57 -0500
commit    684d19a11c3b93c9dd5fb90f43d38aa7e8c6005f
tree      d55571250ea653f0819e2ff697d417de280bccf1
parent    Improve event caching code (#10119)
Add support for MSC2716 marker events (#10498)
* Make historical messages available to federated servers

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

Follow-up to https://github.com/matrix-org/synapse/pull/9247

* Debug message not available on federation

* Add base starting insertion point when no chunk ID is provided

* Fix messages from multiple senders in historical chunk

Follow-up to https://github.com/matrix-org/synapse/pull/9247

Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716

---

Previously, Synapse would throw a 403,
`Cannot force another user to join.`,
because we were trying to use `?user_id` from a single virtual user
which did not match with messages from other users in the chunk.

* Remove debug lines

* Messing with selecting insertion event extremities

* Move db schema change to new version

* Add better comments

* Make a fake requester with just what we need

See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080
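
A minimal sketch of the idea (not the exact code from this PR;
`make_historical_requester` and its `app_service` argument are
illustrative, but `create_requester` is a real helper in `synapse.types`):

```python
from synapse.types import Requester, UserID, create_requester

def make_historical_requester(user_id: str, app_service) -> Requester:
    """Build a lightweight Requester for a historical message's sender
    instead of running the full auth flow for each event."""
    return create_requester(
        UserID.from_string(user_id),
        app_service=app_service,
    )
```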

* Store insertion events in table

* Make base insertion event float off on its own

See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889

Conflicts:
	synapse/rest/client/v1/room.py

* Validate that the app service can actually control the given user

See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455

Conflicts:
	synapse/rest/client/v1/room.py
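
A rough sketch of the kind of check this refers to (illustrative only;
`is_interested_in_user` is a real `ApplicationService` method that matches
a user ID against the service's registered namespaces and its sender):

```python
from synapse.api.errors import AuthError

def assert_app_service_can_control_user(app_service, user_id: str) -> None:
    """Reject requests where the application service tries to act as a
    user outside of its registered namespaces."""
    if not app_service.is_interested_in_user(user_id):
        raise AuthError(
            403,
            "Application service cannot masquerade as this user (%s)." % user_id,
        )
```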

* Add some better comments on what we're trying to check for

* Continue debugging

* Share validation logic

* Add inserted historical messages to /backfill response

* Remove debug sql queries

* Some marker event implementation trials

* Clean up PR

* Rename insertion_event_id to just event_id

* Add some better sql comments

* More accurate description

* Add changelog

* Make it clear what MSC the change is part of

* Add more detail on which insertion event came through

* Address review and improve sql queries

* Only use event_id as unique constraint

* Fix test case where insertion event is already in the normal DAG

* Remove debug changes

* Add support for MSC2716 marker events

* Process markers when we receive them over federation
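
For reference, a rough sketch of what a marker event looks like (field names
taken from the unstable MSC2716 draft at the time, matching the
`EventContentFields.MSC2716_MARKER_INSERTION` constant used in the diff below;
the values are made up):

```python
# A marker event lives in the normal DAG and points at an insertion event,
# telling other servers there are historical messages to go backfill.
marker_event = {
    "type": "org.matrix.msc2716.marker",
    "sender": "@appservice:example.org",
    "content": {
        "org.matrix.msc2716.marker.insertion": "$insertion_event_id",
    },
}
```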

* WIP: make hs2 backfill historical messages after marker event

* Make hs2 ask for the insertion event extremity properly

But running into the `sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group`
error

* Add insertion_event_extremities table

* Switch to chunk events so we can auth via power_levels

Previously, we were using `content.chunk_id` to connect one
chunk to another. But these events can come from any `sender`,
and we can't tell who should be able to send historical events.
We know we only want the application service to do it, but these
events have the sender of a real historical message, not the
application service user ID, as the sender. Other federated homeservers
also have no indication of which senders belong to an application
service on the originating homeserver.

So we want to auth all of the MSC2716 events via power_levels
and have them be sent by the application service with the proper
power levels in the room.
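
For reference, a rough sketch of the event shapes involved (field names from
the unstable MSC2716 draft at the time; they may have changed since, and the
values here are made up):

```python
# An insertion event opens a spot where a chunk of history can be attached.
insertion_event = {
    "type": "org.matrix.msc2716.insertion",
    "sender": "@appservice:example.org",  # sent by the AS so power_levels can auth it
    "content": {
        "org.matrix.msc2716.next_chunk_id": "chunk-2",
    },
}

# A chunk event connects a batch of historical messages to that insertion
# event by echoing its next_chunk_id.
chunk_event = {
    "type": "org.matrix.msc2716.chunk",
    "sender": "@appservice:example.org",
    "content": {
        "org.matrix.msc2716.chunk_id": "chunk-2",
    },
}
```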

* Switch to chunk events for federation

* Add unstable room version to support new historical PL

* Messy: Fix undefined state_group for federated historical events

```
2021-07-13 02:27:57,810 - synapse.handlers.federation - 1248 - ERROR - GET-4 - Failed to backfill from hs1 because NOT NULL constraint failed: event_to_state_groups.state_group
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1216, in try_backfill
    await self.backfill(
  File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1035, in backfill
    await self._auth_and_persist_event(dest, event, context, backfilled=True)
  File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2222, in _auth_and_persist_event
    await self._run_push_actions_and_persist_event(event, context, backfilled)
  File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2244, in _run_push_actions_and_persist_event
    await self.persist_events_and_notify(
  File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 3290, in persist_events_and_notify
    events, max_stream_token = await self.storage.persistence.persist_events(
  File "/usr/local/lib/python3.8/site-packages/synapse/logging/opentracing.py", line 774, in _trace_inner
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 320, in persist_events
    ret_vals = await yieldable_gather_results(enqueue, partitioned.items())
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 237, in handle_queue_loop
    ret = await self._per_item_callback(
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 577, in _persist_event_batch
    await self.persist_events_store._persist_events_and_state_updates(
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 176, in _persist_events_and_state_updates
    await self.db_pool.runInteraction(
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 681, in runInteraction
    result = await self.runWithConnection(
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 770, in runWithConnection
    return await make_deferred_yieldable(
  File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 238, in inContext
    result = inContext.theWork()  # type: ignore[attr-defined]
  File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 254, in <lambda>
    inContext.theWork = lambda: context.call(  # type: ignore[attr-defined]
  File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 118, in callWithContext
    return self.currentContext().callWithContext(ctx, func, *args, **kw)
  File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 83, in callWithContext
    return func(*args, **kw)
  File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 293, in _runWithConnection
    compat.reraise(excValue, excTraceback)
  File "/usr/local/lib/python3.8/site-packages/twisted/python/deprecate.py", line 298, in deprecatedFunction
    return function(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/twisted/python/compat.py", line 403, in reraise
    raise exception.with_traceback(traceback)
  File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 284, in _runWithConnection
    result = func(conn, *args, **kw)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 765, in inner_func
    return func(db_conn, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 549, in new_transaction
    r = func(cursor, *args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/synapse/logging/utils.py", line 69, in wrapped
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 385, in _persist_events_txn
    self._store_event_state_mappings_txn(txn, events_and_contexts)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 2065, in _store_event_state_mappings_txn
    self.db_pool.simple_insert_many_txn(
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 923, in simple_insert_many_txn
    txn.execute_batch(sql, vals)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 280, in execute_batch
    self.executemany(sql, args)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 300, in executemany
    self._do_execute(self.txn.executemany, sql, *args)
  File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 330, in _do_execute
    return func(sql, *args)
sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group
```

* Revert "Messy: Fix undefined state_group for federated historical events"

This reverts commit 187ab28611546321e02770944c86f30ee2bc742a.

* Fix federated events being rejected for no state_groups

Add fix from https://github.com/matrix-org/synapse/pull/10439
until it merges.

* Adapting to experimental room version

* Some log cleanup

* Add better comments around the extremity fetching code and why it's needed

* Rename to be more accurate to what the function returns

* Add changelog

* Ignore rejected events

* Use simplified upsert

* Add Erik's explanation of extra event checks

See https://github.com/matrix-org/synapse/pull/10498#discussion_r680880332

* Clarify that the depth is not directly correlated to the backwards extremity that we return

See https://github.com/matrix-org/synapse/pull/10498#discussion_r681725404

* `lock` only matters for SQLite

See https://github.com/matrix-org/synapse/pull/10498#discussion_r681728061
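
For illustration, the upsert added in this PR (see the diff below) relies on
the UNIQUE index on `insertion_event_extremities(event_id)`, so it can pass
`lock=False`; on databases with native upsert support the flag is ignored
anyway:

```python
async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
    # Safe to skip the emulated lock: a conflicting insert will hit the
    # UNIQUE index and be retried, and native upserts don't need the lock.
    await self.db_pool.simple_upsert(
        table="insertion_event_extremities",
        keyvalues={"event_id": event_id},
        values={"event_id": event_id, "room_id": room_id},
        desc="insert_insertion_extremity",
        lock=False,
    )
```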

* Move new SQL changes to their own delta file

* Clean up upsert docstring

* Bump database schema version (62)
-rw-r--r--  changelog.d/10498.feature                                                 1
-rwxr-xr-x  scripts-dev/complement.sh                                                 2
-rw-r--r--  synapse/handlers/federation.py                                          119
-rw-r--r--  synapse/storage/database.py                                              14
-rw-r--r--  synapse/storage/databases/main/event_federation.py                      114
-rw-r--r--  synapse/storage/databases/main/events.py                                 24
-rw-r--r--  synapse/storage/schema/__init__.py                                        2
-rw-r--r--  synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql   24
8 files changed, 265 insertions, 35 deletions
diff --git a/changelog.d/10498.feature b/changelog.d/10498.feature
new file mode 100644
index 0000000000..5df896572d
--- /dev/null
+++ b/changelog.d/10498.feature
@@ -0,0 +1 @@
+Add support for "marker" events which makes historical events discoverable for servers that already have all of the scrollback history (part of MSC2716).
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index cba015d942..5d0ef8dd3a 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi
 
 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2403,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/...
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 8197b60b76..8b602e3813 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -42,6 +42,7 @@ from twisted.internet import defer
 
 from synapse import event_auth
 from synapse.api.constants import (
+    EventContentFields,
     EventTypes,
     Membership,
     RejectedReason,
@@ -262,7 +263,12 @@ class FederationHandler(BaseHandler):
 
         state = None
 
-        # Get missing pdus if necessary.
+        # Check that the event passes auth based on the state at the event. This is
+        # done for events that are to be added to the timeline (non-outliers).
+        #
+        # Get missing pdus if necessary:
+        #  - Fetching any missing prev events to fill in gaps in the graph
+        #  - Fetching state if we have a hole in the graph
         if not pdu.internal_metadata.is_outlier():
             # We only backfill backwards to the min depth.
             min_depth = await self.get_min_depth_for_context(pdu.room_id)
@@ -432,6 +438,13 @@ class FederationHandler(BaseHandler):
                         affected=event_id,
                     )
 
+        # A second round of checks for all events. Check that the event passes auth
+        # based on `auth_events`; this allows us to assert that the event would
+        # have been allowed at some point. If an event passes this check it's OK
+        # for it to be used as part of a returned `/state` request, as either
+        # a) we received the event as part of the original join and so trust it, or
+        # b) we'll do a state resolution with existing state before it becomes
+        # part of the "current state", which adds more protection.
         await self._process_received_pdu(origin, pdu, state=state)
 
     async def _get_missing_events_for_pdu(
@@ -889,6 +902,79 @@ class FederationHandler(BaseHandler):
                     "resync_device_due_to_pdu", self._resync_device, event.sender
                 )
 
+        await self._handle_marker_event(origin, event)
+
+    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
+        """Handles backfilling the insertion event when we receive a marker
+        event that points to one.
+
+        Args:
+            origin: Origin of the event. Will be queried to get the insertion event
+            marker_event: The event to process
+        """
+
+        if marker_event.type != EventTypes.MSC2716_MARKER:
+            # Not a marker event
+            return
+
+        if marker_event.rejected_reason is not None:
+            # Rejected event
+            return
+
+        # Skip processing a marker event if the room version doesn't
+        # support it.
+        room_version = await self.store.get_room_version(marker_event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        logger.debug("_handle_marker_event: received %s", marker_event)
+
+        insertion_event_id = marker_event.content.get(
+            EventContentFields.MSC2716_MARKER_INSERTION
+        )
+
+        if insertion_event_id is None:
+            # Nothing to retrieve then (invalid marker)
+            return
+
+        logger.debug(
+            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+        )
+
+        await self._get_events_and_persist(
+            origin,
+            marker_event.room_id,
+            [insertion_event_id],
+        )
+
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
+        if insertion_event is None:
+            logger.warning(
+                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+                origin,
+                insertion_event_id,
+                marker_event.event_id,
+            )
+            return
+
+        logger.debug(
+            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+        await self.store.insert_insertion_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
+        logger.debug(
+            "_handle_marker_event: insertion extremity added for %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
@@ -1057,9 +1143,19 @@ class FederationHandler(BaseHandler):
     async def _maybe_backfill_inner(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
-        extremities = await self.store.get_oldest_events_with_depth_in_room(room_id)
+        oldest_events_with_depth = (
+            await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
+        )
+        insertion_events_to_be_backfilled = (
+            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
+        )
+        logger.debug(
+            "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
+            oldest_events_with_depth,
+            insertion_events_to_be_backfilled,
+        )
 
-        if not extremities:
+        if not oldest_events_with_depth and not insertion_events_to_be_backfilled:
             logger.debug("Not backfilling as no extremeties found.")
             return False
 
@@ -1089,10 +1185,12 @@ class FederationHandler(BaseHandler):
         #   state *before* the event, ignoring the special casing certain event
         #   types have.
 
-        forward_events = await self.store.get_successor_events(list(extremities))
+        forward_event_ids = await self.store.get_successor_events(
+            list(oldest_events_with_depth)
+        )
 
         extremities_events = await self.store.get_events(
-            forward_events,
+            forward_event_ids,
             redact_behaviour=EventRedactBehaviour.AS_IS,
             get_prev_content=False,
         )
@@ -1106,10 +1204,19 @@ class FederationHandler(BaseHandler):
             redact=False,
             check_history_visibility_only=True,
         )
+        logger.debug(
+            "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities
+        )
 
-        if not filtered_extremities:
+        if not filtered_extremities and not insertion_events_to_be_backfilled:
             return False
 
+        extremities = {
+            **oldest_events_with_depth,
+            # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks
+            **insertion_events_to_be_backfilled,
+        }
+
         # Check if we reached a point where we should start backfilling.
         sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1]))
         max_depth = sorted_extremeties_tuple[0][1]
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c8015a3848..95d2caff62 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -941,13 +941,13 @@ class DatabasePool:
 
         `lock` should generally be set to True (the default), but can be set
         to False if either of the following are true:
-
-        * there is a UNIQUE INDEX on the key columns. In this case a conflict
-          will cause an IntegrityError in which case this function will retry
-          the update.
-
-        * we somehow know that we are the only thread which will be updating
-          this table.
+            1. there is a UNIQUE INDEX on the key columns. In this case a conflict
+            will cause an IntegrityError in which case this function will retry
+            the update.
+            2. we somehow know that we are the only thread which will be updating
+            this table.
+        As an additional note, this parameter only matters for old SQLite versions
+        because we will use native upserts otherwise.
 
         Args:
             table: The table to upsert into
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 44018c1c31..bddf5ef192 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -671,27 +671,97 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    async def get_oldest_events_with_depth_in_room(self, room_id):
+    async def get_oldest_event_ids_with_depth_in_room(self, room_id) -> Dict[str, int]:
+        """Gets the oldest events(backwards extremities) in the room along with the
+        aproximate depth.
+
+        We use this function so that we can compare and see if someones current
+        depth at their current scrollback is within pagination range of the
+        event extremeties. If the current depth is close to the depth of given
+        oldest event, we can trigger a backfill.
+
+        Args:
+            room_id: Room where we want to find the oldest events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_oldest_event_ids_with_depth_in_room_txn(txn, room_id):
+            # Assemble a dictionary with event_id -> depth for the oldest events
+            # we know of in the room. Backwards extremities are the oldest
+            # events we know of in the room, but we only know of them because
+            # some other event referenced them by prev_event; they aren't persisted
+            # in our database yet (meaning we don't know their depth
+            # specifically). So we need to look for the approximate depth from
+            # the events connected to the current backwards extremities.
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM events as e
+                /**
+                 * Get the edge connections from the event_edges table
+                 * so we can see whether this event's prev_events points
+                 * to a backward extremity in the next join.
+                 */
+                INNER JOIN event_edges as g
+                ON g.event_id = e.event_id
+                /**
+                 * We find the "oldest" events in the room by looking for
+                 * events connected to backwards extremities (oldest events
+                 * in the room that we know of so far).
+                 */
+                INNER JOIN event_backward_extremities as b
+                ON g.prev_event_id = b.event_id
+                WHERE b.room_id = ? AND g.is_state is ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id, False))
+
+            return dict(txn)
+
         return await self.db_pool.runInteraction(
-            "get_oldest_events_with_depth_in_room",
-            self.get_oldest_events_with_depth_in_room_txn,
+            "get_oldest_event_ids_with_depth_in_room",
+            get_oldest_event_ids_with_depth_in_room_txn,
             room_id,
         )
 
-    def get_oldest_events_with_depth_in_room_txn(self, txn, room_id):
-        sql = (
-            "SELECT b.event_id, MAX(e.depth) FROM events as e"
-            " INNER JOIN event_edges as g"
-            " ON g.event_id = e.event_id"
-            " INNER JOIN event_backward_extremities as b"
-            " ON g.prev_event_id = b.event_id"
-            " WHERE b.room_id = ? AND g.is_state is ?"
-            " GROUP BY b.event_id"
-        )
+    async def get_insertion_event_backwards_extremities_in_room(
+        self, room_id
+    ) -> Dict[str, int]:
+        """Get the insertion events we know about that we haven't backfilled yet.
 
-        txn.execute(sql, (room_id, False))
+        We use this function so that we can compare and see if someone's current
+        depth at their current scrollback is within pagination range of the
+        insertion event. If the current depth is close to the depth of a given
+        insertion event, we can trigger a backfill.
 
-        return dict(txn)
+        Args:
+            room_id: Room where we want to find the insertion events
+
+        Returns:
+            Map from event_id to depth
+        """
+
+        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+            sql = """
+                SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
+                /* We only want insertion events that are also marked as backwards extremities */
+                INNER JOIN insertion_event_extremities as b USING (event_id)
+                /* Get the depth of the insertion event from the events table */
+                INNER JOIN events AS e USING (event_id)
+                WHERE b.room_id = ?
+                GROUP BY b.event_id
+            """
+
+            txn.execute(sql, (room_id,))
+
+            return dict(txn)
+
+        return await self.db_pool.runInteraction(
+            "get_insertion_event_backwards_extremities_in_room",
+            get_insertion_event_backwards_extremities_in_room_txn,
+            room_id,
+        )
 
     async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
         """Returns the event ID and depth for the event that has the max depth from a set of event IDs
@@ -1041,7 +1111,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
                     if row[1] not in event_results:
                         queue.put((-row[0], row[1]))
 
-            # Navigate up the DAG by prev_event
             txn.execute(query, (event_id, False, limit - len(event_results)))
             prev_event_id_results = txn.fetchall()
             logger.debug(
@@ -1136,6 +1205,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
+        await self.db_pool.simple_upsert(
+            table="insertion_event_extremities",
+            keyvalues={"event_id": event_id},
+            values={
+                "event_id": event_id,
+                "room_id": room_id,
+            },
+            insertion_values={},
+            desc="insert_insertion_extremity",
+            lock=False,
+        )
+
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 86baf397fb..40b53274fb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1845,6 +1845,18 @@ class PersistEventsStore:
             },
         )
 
+        # When we receive an event with a `chunk_id` referencing the
+        # `next_chunk_id` of the insertion event, we can remove it from the
+        # `insertion_event_extremities` table.
+        sql = """
+            DELETE FROM insertion_event_extremities WHERE event_id IN (
+                SELECT event_id FROM insertion_events
+                WHERE next_chunk_id = ?
+            )
+        """
+
+        txn.execute(sql, (chunk_id,))
+
     def _handle_redaction(self, txn, redacted_event_id):
         """Handles receiving a redaction and checking whether we need to remove
         any redacted relations from the database.
@@ -2101,15 +2113,17 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
+        # From the events passed in, add all of the prev events as backwards extremities.
+        # Ignore any events that are already backwards extremities or outliers.
         query = (
             "INSERT INTO event_backward_extremities (event_id, room_id)"
             " SELECT ?, ? WHERE NOT EXISTS ("
-            " SELECT 1 FROM event_backward_extremities"
-            " WHERE event_id = ? AND room_id = ?"
+            "   SELECT 1 FROM event_backward_extremities"
+            "   WHERE event_id = ? AND room_id = ?"
             " )"
             " AND NOT EXISTS ("
-            " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            " AND outlier = ?"
+            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+            "   AND outlier = ?"
             " )"
         )
 
@@ -2123,6 +2137,8 @@ class PersistEventsStore:
             ],
         )
 
+        # Delete all of these events from the backwards extremities table now
+        # that we've fetched them; their prev events are the new backwards
+        # extremities.
         query = (
             "DELETE FROM event_backward_extremities"
             " WHERE event_id = ? AND room_id = ?"
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 36340a652a..fd4dd67d91 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 61
+SCHEMA_VERSION = 62
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
diff --git a/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
new file mode 100644
index 0000000000..b731ef284a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/62/01insertion_event_extremities.sql
@@ -0,0 +1,24 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Add a table that keeps track of which "insertion" events need to be backfilled
+CREATE TABLE IF NOT EXISTS insertion_event_extremities(
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS insertion_event_extremities_event_id ON insertion_event_extremities(event_id);
+CREATE INDEX IF NOT EXISTS insertion_event_extremities_room_id ON insertion_event_extremities(room_id);