summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/tests.yml4
-rw-r--r--Cargo.lock4
-rw-r--r--changelog.d/15233.misc1
-rw-r--r--changelog.d/15680.bugfix1
-rw-r--r--changelog.d/15737.feature1
-rw-r--r--changelog.d/15743.misc1
-rw-r--r--changelog.d/15748.removal1
-rw-r--r--changelog.d/15755.misc1
-rw-r--r--changelog.d/15758.bugfix1
-rw-r--r--changelog.d/15770.bugfix1
-rw-r--r--changelog.d/15772.doc1
-rw-r--r--changelog.d/15776.bugfix1
-rw-r--r--changelog.d/15781.bugfix1
-rw-r--r--changelog.d/15788.bugfix1
-rw-r--r--changelog.d/15804.bugfix1
-rw-r--r--changelog.d/15805.doc1
-rw-r--r--changelog.d/15806.misc1
-rw-r--r--contrib/lnav/synapse-log-format.json2
-rw-r--r--docker/complement/conf/workers-shared-extra.yaml.j22
-rwxr-xr-xdocker/configure_workers_and_start.py1
-rw-r--r--docs/admin_api/rooms.md2
-rw-r--r--docs/workers.md1
-rw-r--r--poetry.lock223
-rw-r--r--pyproject.toml2
-rw-r--r--rust/src/push/base_rules.rs4
-rwxr-xr-xscripts-dev/complement.sh4
-rwxr-xr-xscripts-dev/federation_client.py4
-rw-r--r--synapse/api/constants.py14
-rw-r--r--synapse/api/room_versions.py61
-rw-r--r--synapse/app/generic_worker.py2
-rw-r--r--synapse/config/experimental.py3
-rw-r--r--synapse/event_auth.py40
-rw-r--r--synapse/events/__init__.py9
-rw-r--r--synapse/events/snapshot.py159
-rw-r--r--synapse/events/utils.py9
-rw-r--r--synapse/federation/federation_client.py5
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/federation/sender/__init__.py34
-rw-r--r--synapse/handlers/federation.py33
-rw-r--r--synapse/handlers/federation_event.py109
-rw-r--r--synapse/handlers/message.py168
-rw-r--r--synapse/handlers/pagination.py137
-rw-r--r--synapse/handlers/room_batch.py466
-rw-r--r--synapse/handlers/room_member.py56
-rw-r--r--synapse/http/federation/matrix_federation_agent.py14
-rw-r--r--synapse/http/matrixfederationclient.py9
-rw-r--r--synapse/media/_base.py15
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py1
-rw-r--r--synapse/push/clientformat.py26
-rw-r--r--synapse/rest/__init__.py2
-rw-r--r--synapse/rest/client/room_batch.py254
-rw-r--r--synapse/rest/client/versions.py2
-rw-r--r--synapse/server.py5
-rw-r--r--synapse/storage/controllers/persist_events.py5
-rw-r--r--synapse/storage/database.py2
-rw-r--r--synapse/storage/databases/main/__init__.py2
-rw-r--r--synapse/storage/databases/main/event_federation.py211
-rw-r--r--synapse/storage/databases/main/events.py140
-rw-r--r--synapse/storage/databases/main/events_worker.py2
-rw-r--r--synapse/storage/databases/main/room_batch.py47
-rw-r--r--synapse/util/__init__.py5
-rw-r--r--synapse/util/caches/lrucache.py8
-rw-r--r--tests/events/test_snapshot.py3
-rw-r--r--tests/federation/test_federation_catch_up.py22
-rw-r--r--tests/federation/test_federation_client.py4
-rw-r--r--tests/http/federation/test_matrix_federation_agent.py38
-rw-r--r--tests/media/test_base.py12
-rw-r--r--tests/media/test_media_storage.py20
-rw-r--r--tests/rest/client/test_push_rule_attrs.py67
-rw-r--r--tests/rest/client/test_room_batch.py302
-rw-r--r--tests/storage/databases/main/test_events_worker.py49
-rw-r--r--tests/storage/test_event_chain.py5
-rw-r--r--tests/storage/test_event_federation.py211
-rw-r--r--tests/test_state.py11
74 files changed, 667 insertions, 2404 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index a0d1c24e90..6c22984997 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -399,8 +399,8 @@ jobs:
       env:
         SYTEST_BRANCH: ${{ github.head_ref }}
         POSTGRES: ${{ matrix.job.postgres && 1}}
-        MULTI_POSTGRES: ${{ (matrix.job.postgres == 'multi-postgres') && 1}}
-        ASYNCIO_REACTOR: ${{ (matrix.job.reactor == 'asyncio') && 1 }}
+        MULTI_POSTGRES: ${{ (matrix.job.postgres == 'multi-postgres') || '' }}
+        ASYNCIO_REACTOR: ${{ (matrix.job.reactor == 'asyncio') || '' }}
         WORKERS: ${{ matrix.job.workers && 1 }}
         BLACKLIST: ${{ matrix.job.workers && 'synapse-blacklist-with-workers' }}
         TOP: ${{ github.workspace }}
diff --git a/Cargo.lock b/Cargo.lock
index 9bb8225226..51ff26ec1b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -340,9 +340,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.96"
+version = "1.0.97"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "057d394a50403bcac12672b2b18fb387ab6d289d957dab67dd201875391e52f1"
+checksum = "bdf3bf93142acad5821c99197022e170842cdbc1c30482b98750c688c640842a"
 dependencies = [
  "itoa",
  "ryu",
diff --git a/changelog.d/15233.misc b/changelog.d/15233.misc
new file mode 100644
index 0000000000..1dff00bf3c
--- /dev/null
+++ b/changelog.d/15233.misc
@@ -0,0 +1 @@
+Replace `EventContext` fields `prev_group` and `delta_ids` with field `state_group_deltas`.
diff --git a/changelog.d/15680.bugfix b/changelog.d/15680.bugfix
new file mode 100644
index 0000000000..04ac19b4ec
--- /dev/null
+++ b/changelog.d/15680.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where media files were served in an unsafe manner. Contributed by @joshqou.
diff --git a/changelog.d/15737.feature b/changelog.d/15737.feature
new file mode 100644
index 0000000000..9a547b5ebd
--- /dev/null
+++ b/changelog.d/15737.feature
@@ -0,0 +1 @@
+Improve `/messages` response time by avoiding backfill when we already have messages to return.
diff --git a/changelog.d/15743.misc b/changelog.d/15743.misc
new file mode 100644
index 0000000000..b95eed929e
--- /dev/null
+++ b/changelog.d/15743.misc
@@ -0,0 +1 @@
+Regularly try to send transactions to other servers after they failed instead of waiting for a new event to be available before trying.
diff --git a/changelog.d/15748.removal b/changelog.d/15748.removal
new file mode 100644
index 0000000000..dcb9780178
--- /dev/null
+++ b/changelog.d/15748.removal
@@ -0,0 +1 @@
+Remove experimental [MSC2716](https://github.com/matrix-org/matrix-spec-proposals/pull/2716) implementation to incrementally import history into existing rooms.
diff --git a/changelog.d/15755.misc b/changelog.d/15755.misc
new file mode 100644
index 0000000000..a65340d380
--- /dev/null
+++ b/changelog.d/15755.misc
@@ -0,0 +1 @@
+Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).
diff --git a/changelog.d/15758.bugfix b/changelog.d/15758.bugfix
new file mode 100644
index 0000000000..cabe25ca24
--- /dev/null
+++ b/changelog.d/15758.bugfix
@@ -0,0 +1 @@
+Avoid invalidating a cache that was just prefilled.
diff --git a/changelog.d/15770.bugfix b/changelog.d/15770.bugfix
new file mode 100644
index 0000000000..a65340d380
--- /dev/null
+++ b/changelog.d/15770.bugfix
@@ -0,0 +1 @@
+Fix requesting multiple keys at once over federation, related to [MSC3983](https://github.com/matrix-org/matrix-spec-proposals/pull/3983).
diff --git a/changelog.d/15772.doc b/changelog.d/15772.doc
new file mode 100644
index 0000000000..4d6c933c71
--- /dev/null
+++ b/changelog.d/15772.doc
@@ -0,0 +1 @@
+Document `looping_call()` functionality that will wait for the given function to finish before scheduling another.
diff --git a/changelog.d/15776.bugfix b/changelog.d/15776.bugfix
new file mode 100644
index 0000000000..f146a85f1a
--- /dev/null
+++ b/changelog.d/15776.bugfix
@@ -0,0 +1 @@
+Fix joining rooms through aliases where the alias server isn't a real homeserver. Contributed by @tulir @ Beeper.
diff --git a/changelog.d/15781.bugfix b/changelog.d/15781.bugfix
new file mode 100644
index 0000000000..5faf59afee
--- /dev/null
+++ b/changelog.d/15781.bugfix
@@ -0,0 +1 @@
+Fix a bug in push rules handling leading to an invalid (per spec) `is_user_mention` rule sent to clients. Also fix wrong rule names for `is_user_mention` and `is_room_mention`.
\ No newline at end of file
diff --git a/changelog.d/15788.bugfix b/changelog.d/15788.bugfix
new file mode 100644
index 0000000000..d22aae7baf
--- /dev/null
+++ b/changelog.d/15788.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.57.0 where the wrong table would be locked on updating database rows when using SQLite as the database backend.
\ No newline at end of file
diff --git a/changelog.d/15804.bugfix b/changelog.d/15804.bugfix
new file mode 100644
index 0000000000..7c8b954397
--- /dev/null
+++ b/changelog.d/15804.bugfix
@@ -0,0 +1 @@
+Fix Sytest environmental variable evaluation in CI.
diff --git a/changelog.d/15805.doc b/changelog.d/15805.doc
new file mode 100644
index 0000000000..446f627cfc
--- /dev/null
+++ b/changelog.d/15805.doc
@@ -0,0 +1 @@
+Fix a typo in the [Admin API](https://matrix-org.github.io/synapse/latest/usage/administration/admin_api/index.html).
diff --git a/changelog.d/15806.misc b/changelog.d/15806.misc
new file mode 100644
index 0000000000..80d0eb2f8f
--- /dev/null
+++ b/changelog.d/15806.misc
@@ -0,0 +1 @@
+Switch from `matrix://` to `matrix-federation://` scheme for internal Synapse routing of outbound federation traffic.
diff --git a/contrib/lnav/synapse-log-format.json b/contrib/lnav/synapse-log-format.json
index ad7017ee5e..649cd623e8 100644
--- a/contrib/lnav/synapse-log-format.json
+++ b/contrib/lnav/synapse-log-format.json
@@ -29,7 +29,7 @@
         "level": "error"
       },
       {
-        "line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
+        "line": "my-matrix-server-federation-sender-1 | 2023-01-25 20:56:20,995 - synapse.http.matrixfederationclient - 709 - WARNING - federation_transaction_transmission_loop-3 - {PUT-O-3} [example.com] Request failed: PUT matrix-federation://example.com/_matrix/federation/v1/send/1674680155797: HttpResponseException('403: Forbidden')",
         "level": "warning"
       },
       {
diff --git a/docker/complement/conf/workers-shared-extra.yaml.j2 b/docker/complement/conf/workers-shared-extra.yaml.j2
index 63acf86a46..2b11b487f6 100644
--- a/docker/complement/conf/workers-shared-extra.yaml.j2
+++ b/docker/complement/conf/workers-shared-extra.yaml.j2
@@ -92,8 +92,6 @@ allow_device_name_lookup_over_federation: true
 ## Experimental Features ##
 
 experimental_features:
-  # Enable history backfilling support
-  msc2716_enabled: true
   # client-side support for partial state in /send_join responses
   faster_joins: true
   # Enable support for polls
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 87a740e3d4..62fb88daab 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -244,7 +244,6 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_matrix/client/(api/v1|r0|v3|unstable)/join/",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
-            "^/_matrix/client/(v1|unstable/org.matrix.msc2716)/rooms/.*/batch_send",
         ],
         "shared_extra_conf": {},
         "worker_extra_conf": "",
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index 66b29e82dc..90b06045a8 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -419,7 +419,7 @@ The following query parameters are available:
 
 * `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch
   or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint.
-* `to` - The token to spot returning events at.
+* `to` - The token to stop returning events at.
 * `limit` - The maximum number of events to return. Defaults to `10`.
 * `filter` - A JSON RoomEventFilter to filter returned events with.
 * `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting
diff --git a/docs/workers.md b/docs/workers.md
index 991814c0bc..735128762a 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -232,7 +232,6 @@ information.
     ^/_matrix/client/v1/rooms/.*/hierarchy$
     ^/_matrix/client/(v1|unstable)/rooms/.*/relations/
     ^/_matrix/client/v1/rooms/.*/threads$
-    ^/_matrix/client/unstable/org.matrix.msc2716/rooms/.*/batch_send$
     ^/_matrix/client/unstable/im.nheko.summary/rooms/.*/summary$
     ^/_matrix/client/(r0|v3|unstable)/account/3pid$
     ^/_matrix/client/(r0|v3|unstable)/account/whoami$
diff --git a/poetry.lock b/poetry.lock
index cf4a89c85a..88f1e9548a 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -32,21 +32,24 @@ wrapt = [
 
 [[package]]
 name = "attrs"
-version = "22.2.0"
+version = "23.1.0"
 description = "Classes Without Boilerplate"
 optional = false
-python-versions = ">=3.6"
+python-versions = ">=3.7"
 files = [
-    {file = "attrs-22.2.0-py3-none-any.whl", hash = "sha256:29e95c7f6778868dbd49170f98f8818f78f3dc5e0e37c0b1f474e3561b240836"},
-    {file = "attrs-22.2.0.tar.gz", hash = "sha256:c9227bfc2f01993c03f68db37d1d15c9690188323c067c641f1a35ca58185f99"},
+    {file = "attrs-23.1.0-py3-none-any.whl", hash = "sha256:1f28b4522cdc2fb4256ac1a020c78acf9cba2c6b461ccd2c126f3aa8e8335d04"},
+    {file = "attrs-23.1.0.tar.gz", hash = "sha256:6279836d581513a26f1bf235f9acd333bc9115683f14f7e8fae46c98fc50e015"},
 ]
 
+[package.dependencies]
+importlib-metadata = {version = "*", markers = "python_version < \"3.8\""}
+
 [package.extras]
-cov = ["attrs[tests]", "coverage-enable-subprocess", "coverage[toml] (>=5.3)"]
-dev = ["attrs[docs,tests]"]
-docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope.interface"]
-tests = ["attrs[tests-no-zope]", "zope.interface"]
-tests-no-zope = ["cloudpickle", "cloudpickle", "hypothesis", "hypothesis", "mypy (>=0.971,<0.990)", "mypy (>=0.971,<0.990)", "pympler", "pympler", "pytest (>=4.3.0)", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-mypy-plugins", "pytest-xdist[psutil]", "pytest-xdist[psutil]"]
+cov = ["attrs[tests]", "coverage[toml] (>=5.3)"]
+dev = ["attrs[docs,tests]", "pre-commit"]
+docs = ["furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier", "zope-interface"]
+tests = ["attrs[tests-no-zope]", "zope-interface"]
+tests-no-zope = ["cloudpickle", "hypothesis", "mypy (>=1.1.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"]
 
 [[package]]
 name = "authlib"
@@ -725,89 +728,89 @@ files = [
 
 [[package]]
 name = "ijson"
-version = "3.2.0.post0"
+version = "3.2.1"
 description = "Iterative JSON parser with standard Python iterator interfaces"
 optional = false
 python-versions = "*"
 files = [
-    {file = "ijson-3.2.0.post0-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:5809752045ef74c26adf159ed03df7fb7e7a8d656992fd7562663ed47d6d39d9"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:ce4be2beece2629bd24bcab147741d1532bd5ed40fb52f2b4fcde5c5bf606df0"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:5d365df54d18076f1d5f2ffb1eef2ac7f0d067789838f13d393b5586fbb77b02"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c93ae4d49d8cf8accfedc8a8e7815851f56ceb6e399b0c186754a68fed22844"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:47a56e3628c227081a2aa58569cbf2af378bad8af648aa904080e87cd6644cfb"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a8af68fe579f6f0b9a8b3f033d10caacfed6a4b89b8c7a1d9478a8f5d8aba4a1"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:6eed1ddd3147de49226db4f213851cf7860493a7b6c7bd5e62516941c007094c"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:9ecbf85a6d73fc72f6534c38f7d92ed15d212e29e0dbe9810a465d61c8a66d23"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:fd218b338ac68213c997d4c88437c0e726f16d301616bf837e1468901934042c"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-win32.whl", hash = "sha256:4e7c4fdc7d24747c8cc7d528c145afda4de23210bf4054bd98cd63bf07e4882d"},
-    {file = "ijson-3.2.0.post0-cp310-cp310-win_amd64.whl", hash = "sha256:4d4e143908f47307042c9678803d27706e0e2099d0a6c1988c6cae1da07760bf"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:56500dac8f52989ef7c0075257a8b471cbea8ef77f1044822742b3cbf2246e8b"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:535665a77408b6bea56eb828806fae125846dff2e2e0ed4cb2e0a8e36244d753"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a4465c90b25ca7903410fabe4145e7b45493295cc3b84ec1216653fbe9021276"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:efee1e9b4f691e1086730f3010e31c55625bc2e0f7db292a38a2cdf2774c2e13"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6fd55f7a46429de95383fc0d0158c1bfb798e976d59d52830337343c2d9bda5c"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:25919b444426f58dcc62f763d1c6be6297f309da85ecab55f51da6ca86fc9fdf"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:c85892d68895ba7a0b16a0e6b7d9f9a0e30e86f2b1e0f6986243473ba8735432"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:27409ba44cfd006901971063d37699f72e092b5efaa1586288b5067d80c6b5bd"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:11dfd64633fe1382c4237477ac3836f682ca17e25e0d0799e84737795b0611df"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-win32.whl", hash = "sha256:41e955e173f77f54337fecaaa58a35c464b75e232b1f939b282497134a4d4f0e"},
-    {file = "ijson-3.2.0.post0-cp311-cp311-win_amd64.whl", hash = "sha256:b3bdd2e12d9b9a18713dd6f3c5ef3734fdab25b79b177054ba9e35ecc746cb6e"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:26b57838e712b8852c40ec6d74c6de8bb226446440e1af1354c077a6f81b9142"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f6464242f7895268d3086d7829ef031b05c77870dad1e13e51ef79d0a9cfe029"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b3c6cf18b61b94db9590f86af0dd60edbccb36e151643152b8688066f677fbc9"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:992e9e68003df32e2aa0f31eb82c0a94f21286203ab2f2b2c666410e17b59d2f"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:d3e255ef05b434f20fc9d4b18ea15733d1038bec3e4960d772b06216fa79e82d"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:424232c2bf3e8181f1b572db92c179c2376b57eba9fc8931453fba975f48cb80"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:bced6cd5b09d4d002dda9f37292dd58d26eb1c4d0d179b820d3708d776300bb4"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-win32.whl", hash = "sha256:a8c84dff2d60ae06d5280ec87cd63050bbd74a90c02bfc7c390c803cfc8ac8fc"},
-    {file = "ijson-3.2.0.post0-cp36-cp36m-win_amd64.whl", hash = "sha256:a340413a9bf307fafd99254a4dd4ac6c567b91a205bf896dde18888315fd7fcd"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b3456cd5b16ec9db3ef23dd27f37bf5a14f765e8272e9af3e3de9ee9a4cba867"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0eb838b4e4360e65c00aa13c78b35afc2477759d423b602b60335af5bed3de5b"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fe7f414edd69dd9199b0dfffa0ada22f23d8009e10fe2a719e0993b7dcc2e6e2"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:183841b8d033ca95457f61fb0719185dc7f51a616070bdf1dcaf03473bed05b2"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:1302dc6490da7d44c3a76a5f0b87d8bec9f918454c6d6e6bf4ed922e47da58bb"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:3b21b1ecd20ed2f918f6f99cdfa68284a416c0f015ffa64b68fa933df1b24d40"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:e97e6e07851cefe7baa41f1ebf5c0899d2d00d94bfef59825752e4c784bebbe8"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-win32.whl", hash = "sha256:cd0450e76b9c629b7f86e7d5b91b7cc9c281dd719630160a992b19a856f7bdbd"},
-    {file = "ijson-3.2.0.post0-cp37-cp37m-win_amd64.whl", hash = "sha256:bed8dcb7dbfdb98e647ad47676045e0891f610d38095dcfdae468e1e1efb2766"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:a7698bc480df76073067017f73ba4139dbaae20f7a6c9a0c7855b9c5e9a62124"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:2f204f6d4cedeb28326c230a0b046968b5263c234c65a5b18cee22865800fff7"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:9829a17f6f78d7f4d0aeff28c126926a1e5f86828ebb60d6a0acfa0d08457f9f"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f470f3d750e00df86e03254fdcb422d2f726f4fb3a0d8eeee35e81343985e58a"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:eb167ee21d9c413d6b0ab65ec12f3e7ea0122879da8b3569fa1063526f9f03a8"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:84eed88177f6c243c52b280cb094f751de600d98d2221e0dec331920894889ec"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:53f1a13eb99ab514c562869513172135d4b55a914b344e6518ba09ad3ef1e503"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:f6785ba0f65eb64b1ce3b7fcfec101085faf98f4e77b234f14287fd4138ffb25"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:79b94662c2e9d366ab362c2c5858097eae0da100dea0dfd340db09ab28c8d5e8"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-win32.whl", hash = "sha256:5242cb2313ba3ece307b426efa56424ac13cc291c36f292b501d412a98ad0703"},
-    {file = "ijson-3.2.0.post0-cp38-cp38-win_amd64.whl", hash = "sha256:775444a3b647350158d0b3c6c39c88b4a0995643a076cb104bf25042c9aedcf8"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:1d64ffaab1d006a4fa9584a4c723e95cc9609bf6c3365478e250cd0bffaaadf3"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:434e57e7ec5c334ccb0e67bb4d9e60c264dcb2a3843713dbeb12cb19fe42a668"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:158494bfe89ccb32618d0e53b471364080ceb975462ec464d9f9f37d9832b653"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:8f20072376e338af0e51ccecb02335b4e242d55a9218a640f545be7fc64cca99"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b3e8d46c1004afcf2bf513a8fb575ee2ec3d8009a2668566b5926a2dcf7f1a45"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:986a0347fe19e5117a5241276b72add570839e5bcdc7a6dac4b538c5928eeff5"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:535a59d61b9aef6fc2a3d01564c1151e38e5a44b92cd6583cb4e8ccf0f58043f"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:830de03f391f7e72b8587bb178c22d534da31153e9ee4234d54ef82cde5ace5e"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:6def9ac8d73b76cb02e9e9837763f27f71e5e67ec0afae5f1f4cf8f61c39b1ac"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-win32.whl", hash = "sha256:11bb84a53c37e227e733c6dffad2037391cf0b3474bff78596dc4373b02008a0"},
-    {file = "ijson-3.2.0.post0-cp39-cp39-win_amd64.whl", hash = "sha256:f349bee14d0a4a72ba41e1b1cce52af324ebf704f5066c09e3dd04cfa6f545f0"},
-    {file = "ijson-3.2.0.post0-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:5418066666b25b05f2b8ae2698408daa0afa68f07b0b217f2ab24465b7e9cbd9"},
-    {file = "ijson-3.2.0.post0-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3ccc4d4b947549f9c431651c02b95ef571412c78f88ded198612a41d5c5701a0"},
-    {file = "ijson-3.2.0.post0-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:dcec67fc15e5978ad286e8cc2a3f9347076e28e0e01673b5ace18c73da64e3ff"},
-    {file = "ijson-3.2.0.post0-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6ee9537e8a8aa15dd2d0912737aeb6265e781e74f7f7cad8165048fcb5f39230"},
-    {file = "ijson-3.2.0.post0-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:03dfd4c8ed19e704d04b0ad4f34f598dc569fd3f73089f80eed698e7f6069233"},
-    {file = "ijson-3.2.0.post0-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:2d50b2ad9c6c51ca160aa60de7f4dacd1357c38d0e503f51aed95c1c1945ff53"},
-    {file = "ijson-3.2.0.post0-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:51c1db80d7791fb761ad9a6c70f521acd2c4b0e5afa2fe0d813beb2140d16c37"},
-    {file = "ijson-3.2.0.post0-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:13f2939db983327dd0492f6c1c0e77be3f2cbf9b620c92c7547d1d2cd6ef0486"},
-    {file = "ijson-3.2.0.post0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2f9d449f86f8971c24609e319811f7f3b6b734f0218c4a0e799debe19300d15b"},
-    {file = "ijson-3.2.0.post0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:7e0d1713a9074a7677eb8e43f424b731589d1c689d4676e2f57a5ce59d089e89"},
-    {file = "ijson-3.2.0.post0-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c8646eb81eec559d7d8b1e51a5087299d06ecab3bc7da54c01f7df94350df135"},
-    {file = "ijson-3.2.0.post0-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:09fe3a53e00c59de33b825ba8d6d39f544a7d7180983cd3d6bd2c3794ae35442"},
-    {file = "ijson-3.2.0.post0-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:93aaec00cbde65c192f15c21f3ee44d2ab0c11eb1a35020b5c4c2676f7fe01d0"},
-    {file = "ijson-3.2.0.post0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:00594ed3ef2218fee8c652d9e7f862fb39f8251b67c6379ef12f7e044bf6bbf3"},
-    {file = "ijson-3.2.0.post0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:1a75cfb34217b41136b714985be645f12269e4345da35d7b48aabd317c82fd10"},
-    {file = "ijson-3.2.0.post0.tar.gz", hash = "sha256:80a5bd7e9923cab200701f67ad2372104328b99ddf249dbbe8834102c852d316"},
+    {file = "ijson-3.2.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6f827f6961f093e1055a2be0c3137f0e7d667979da455ac9648f72d4a2bb8970"},
+    {file = "ijson-3.2.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b6e51f4497065cd0d09f5e906cd538a8d22609eab716e3c883769acf147ab1b6"},
+    {file = "ijson-3.2.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f022686c40bff3e340627a5a0c9212718d529e787ada3b76ba546d47a9ecdbbd"},
+    {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4105c15a13fa1dc24ebd3bf2e679fa14dcbfcc48bc39138a0fa3f4ddf6cc09b"},
+    {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:404423e666f185dfb753ddc92705c84dffdc4cc872aaf825bbe0607893cb5b02"},
+    {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:39e71f32830827cf21d0233a814092e5a23668e18f52eca5cac4f670d9df1240"},
+    {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:43af7ed5292caa1452747e2b62485b6c0ece4bcbc5bf6f2758abd547e4124a14"},
+    {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:e805aa6897a11b0f73f1f6bca078981df8960aeeccf527a214f240409c742bab"},
+    {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:5b2df0bd84889e9017a670328fe3e82ec509fd6744c7ac2c99c7ee2300d76afa"},
+    {file = "ijson-3.2.1-cp310-cp310-win32.whl", hash = "sha256:675259c7ea7f51ffaf8cb9e79bf875e28bb09622892943f4f415588fd7ab7bec"},
+    {file = "ijson-3.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:90d4b2eb771a3585c8186820fe50e3282ef62477b865e765a50a8295674abeac"},
+    {file = "ijson-3.2.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:fc581a61e210bf6013c1fa6536566e51127be1cfbd69539b63d8b813206d2fe0"},
+    {file = "ijson-3.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75cdf7ad4c00a8f5ac94ff27e3b7c1bf5ac463f125bca2be1744c5bc9600db5c"},
+    {file = "ijson-3.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:85a2bf4636ace4d92e7c5d857a1c5694f42407c868953cf2927f18127bcd0d58"},
+    {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9fe0cb66e7dd4aa11da5fff60bdf5ee04819a5e6a57acf7ca12c65f7fc009afc"},
+    {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c6f7957ad38cb714378944032f2c2ee9c6531b5b0b38c5ccd08cedbb0ceddd02"},
+    {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13283d264cca8a63e5bad91e82eec39711e95893e7e8d4a419799a8c5f85203a"},
+    {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:12c24cde850fe79bc806be0e9fc38b47dd5ac0a223070ccb12e9b695425e2936"},
+    {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2ce8eed838e5a0791cb5948117b5453f2b3b3c28d93d06ee2bbf2c198c47881c"},
+    {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:b81c2589f191b0dc741f532be00b4bea617297dd9698431c8053e2d28272d4db"},
+    {file = "ijson-3.2.1-cp311-cp311-win32.whl", hash = "sha256:ba2beac56ac96f728d0f2430e4c667c66819a423d321bb9db9ebdebd803e1b5b"},
+    {file = "ijson-3.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:c71614ed4bbc6a32ff1e42d7ce92a176fb67d658913343792d2c4567aa130817"},
+    {file = "ijson-3.2.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:683fc8b0ea085e75ea34044fdc70649b37367d494f132a2bd1e59d7135054d89"},
+    {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deeaecec2f4e20e8bec20b0a5cdc34daebe7903f2e700f7dcaef68b5925d35ea"},
+    {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:11923ac3188877f19dbb7051f7345202701cc39bf8e5ac44f8ae536c9eca8c82"},
+    {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:400deefcdae21e90fc39c1dcfc6ba2df24537e8c65bd57b763ed5256b73ba64d"},
+    {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:56bc4bad53770710a3a91944fe640fdeb269987a14352b74ebbad2aa55801c00"},
+    {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:f5a179523e085126844c6161aabcd193dbb5747bd01fadb68e92abf048f32ec9"},
+    {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:ee24655986e4415fbb7a0cf51445fff3072ceac0e219f4bbbd5c53535a3c5680"},
+    {file = "ijson-3.2.1-cp36-cp36m-win32.whl", hash = "sha256:4a5c672b0540005c1bb0bba97aa559a87a2e4ee409fc68e2f5ba5b30f009ac99"},
+    {file = "ijson-3.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:cfaf1d89b0e122e69c87a15db6d6f44feb9db96d2af7fe88cdc464177a257b5d"},
+    {file = "ijson-3.2.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1cbd052eb67c1b3611f25974ba967886e89391faaf55afec93808c19f06ca612"},
+    {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f13ffc491886e5d7bde7d68712d168bce0141b2a918db1164bc8599c0123e293"},
+    {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bc4c4fc6bafc777f8422fe36edb1cbd72a13cb29695893a064c9c95776a4bdf9"},
+    {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a42fcb2bf9748c26f004690b2feb6e13e4875bb7c9d83535f887c21e0a982a7c"},
+    {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:0c92f7bc2f3a947c2ba7f7aa48382c36079f8259c930e81d9164341f9b853c45"},
+    {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:fd497042db562379339660e787bc8679ed3abaa740768d39bc3746e769e7c7a5"},
+    {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7d61c7cd8ddd75dcef818ff5a111a31b902a6a0e410ee0c2b2ecaa6dac92658a"},
+    {file = "ijson-3.2.1-cp37-cp37m-win32.whl", hash = "sha256:36caf624d263fc40e7e805d759d09ea368d8cf497aecb3241ac2f0a286ad8eca"},
+    {file = "ijson-3.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:32f9ed25ff80942e433119600bca13b86a8f9b8b0966edbc1d91a48ccbdd4d54"},
+    {file = "ijson-3.2.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e89bbd747140eac3a3c9e7e5835b90d85c4a02763fc5134861bfc1ea03b66ae7"},
+    {file = "ijson-3.2.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d69b4b1d509de36ec42a0e4af30ede39fb754e4039b2928ef7282ebc2125ffdd"},
+    {file = "ijson-3.2.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e7feb0771f50deabe6ce85b210fa9e005843d3d3c60fb3315d69e1f9d0d75e0c"},
+    {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5fd8148a363888054ff06eaaa1103f2f98720ab39666084a214e4fedfc13cf64"},
+    {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:598638dcc5141e9ae269903901877103f5362e0db4443e34721df8f8d34577b4"},
+    {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e979190b7d0fabca20d6b7224ab1c1aa461ad1ab72ba94f1bb1e5894cd59f342"},
+    {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:bc810eb80b4f486c7957201ba2a53f53ddc9b3233af67e4359e29371bf04883b"},
+    {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:26e758584611dfe826dd18ffd94dc0d8a062ce56e41674ad3bfa371c7b78c4b5"},
+    {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:24e9ae5b35b85ea094b6c36495bc856089254aed6a48bada8d7eec5a04f74439"},
+    {file = "ijson-3.2.1-cp38-cp38-win32.whl", hash = "sha256:4b5dc7b5b4b8cb3087d188f37911cd67e26672d33d3571e73440de3f0a86f7e6"},
+    {file = "ijson-3.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:1af94ff40609270bbb3eac47e072582bb578f5023fac8408cccd80fe5892d221"},
+    {file = "ijson-3.2.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:2dda67affceebc52c8bc5fe72c3a4a1e338e4d4b0497dbac5089c2d3862df214"},
+    {file = "ijson-3.2.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:bd780303ddfedc8d57cdb9f2d53a8cea2f2f4a6fb857bf8fe5a0c3ab1d4ca901"},
+    {file = "ijson-3.2.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4fbab6af1bab88a8e46beda08cf44610eed0adb8d157a1a60b4bb6c3a121c6de"},
+    {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c97a07988a1e0ce2bc8e8a62eb5f25195a3bd58a939ac353cbc6018a548cc08d"},
+    {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a65671a6826ae723837143914c823ad7bcc0d1a3e38d87c71df897a2556fb48f"},
+    {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a1806372008bbed9ee92db5747e38c047fa1c4ee89cb2dd5daaa57feb46ce50a"},
+    {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:91e5a8e96f78a59e2520078c227a4fec5bf91c13adeded9e33fb13981cb823c3"},
+    {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:1f820fce8ef093718f2319ff6f1322390664659b783775919dadccb1b470153d"},
+    {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:bca3e8c91a1076a20620dbaa6a2848772b0e8a4055e86d42d3fa39221b53ed1a"},
+    {file = "ijson-3.2.1-cp39-cp39-win32.whl", hash = "sha256:de87f137b7438d43840f4339a37d4e6a58c987f4bb2a70609969f854f8ae20f3"},
+    {file = "ijson-3.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0caebb8350b47266a58b766ec08e1de441d6d160702c428b5cf7504d93c832c4"},
+    {file = "ijson-3.2.1-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:37389785c1abd27fcc24800fcfa9a6b1022743413e4056507fd32356b623ff33"},
+    {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4b364b82231d51cbeae52468c3b27e8a042e544ab764c8f3975e912cf010603f"},
+    {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a5999d0ec28a8ec47cf20c736fd4f895dc077bf6441bf237b00b074315a295d"},
+    {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8bd481857a39305517fb6f1313d558c2dc4e78c9e9384cc5bc1c3e28f1afbedf"},
+    {file = "ijson-3.2.1-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:545f62f12f89350d4d73f2a779cb269198ae578fac080085a1927148b803e602"},
+    {file = "ijson-3.2.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4d5622505d01c2f3d7b9638c1eb8c747eb550936b505225893704289ff28576f"},
+    {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:20293bb36423b129fad3753858ccf7b2ccb5b2c0d3759efe810d0b9d79633a7e"},
+    {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7cd8a4921b852fd2cb5b0c985540c97ff6893139a57fe7121d510ec5d1c0ca44"},
+    {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc902ff1ae1efed7d526294d7a9dd3df66d29b2cdc05fb5479838fef1327a534"},
+    {file = "ijson-3.2.1-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2925a7978d8170146a9cb49a15a982b71fbbf21980bf2e16cd90c528545b7c02"},
+    {file = "ijson-3.2.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c21c6509f6944939399f3630c5dc424d30d71d375f6cd58f9af56158fdf7251c"},
+    {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5729fc7648bc972d70922d7dad15459cca3a9e5ed0328eb9ae3ffa004066194"},
+    {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:805a2d5ed5a15d60327bc9347f2d4125ab621fb18071db98b1c598f1ee99e8f1"},
+    {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d0220a4b6c63f44589e429157174e3f4b8d1e534d5fb82bdb43a7f8dd77ae4b"},
+    {file = "ijson-3.2.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:271d9b7c083f65c58ff0afd9dbb5d2f3d445f734632aebfef4a261b0a337abdb"},
+    {file = "ijson-3.2.1.tar.gz", hash = "sha256:8574bf19f31fab870488769ad919a80f130825236ac8bde9a733f69c2961d7a7"},
 ]
 
 [[package]]
@@ -1583,13 +1586,13 @@ files = [
 
 [[package]]
 name = "phonenumbers"
-version = "8.13.13"
+version = "8.13.14"
 description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
 optional = false
 python-versions = "*"
 files = [
-    {file = "phonenumbers-8.13.13-py2.py3-none-any.whl", hash = "sha256:55657adb607484aba6d56270b8a1f9b302f35496076e6c02051d06ed366374d9"},
-    {file = "phonenumbers-8.13.13.tar.gz", hash = "sha256:4bdf8c989aff0cdb105aef170ad2c21f14b4537bcb32cf349f1f710df992a40a"},
+    {file = "phonenumbers-8.13.14-py2.py3-none-any.whl", hash = "sha256:a4b20b6ba7dd402728f5cc8e86e1f29b1a873af45f5381dbee7e3083af497ff6"},
+    {file = "phonenumbers-8.13.14.tar.gz", hash = "sha256:5fa952b4abf9fccdaf1f130d96114a520c48890d4091b50a064e22c0fdc12dec"},
 ]
 
 [[package]]
@@ -2242,28 +2245,28 @@ jupyter = ["ipywidgets (>=7.5.1,<9)"]
 
 [[package]]
 name = "ruff"
-version = "0.0.265"
+version = "0.0.272"
 description = "An extremely fast Python linter, written in Rust."
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.0.265-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:30ddfe22de6ce4eb1260408f4480bbbce998f954dbf470228a21a9b2c45955e4"},
-    {file = "ruff-0.0.265-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:a11bd0889e88d3342e7bc514554bb4461bf6cc30ec115821c2425cfaac0b1b6a"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:2a9b38bdb40a998cbc677db55b6225a6c4fadcf8819eb30695e1b8470942426b"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a8b44a245b60512403a6a03a5b5212da274d33862225c5eed3bcf12037eb19bb"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b279fa55ea175ef953208a6d8bfbcdcffac1c39b38cdb8c2bfafe9222add70bb"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:5028950f7af9b119d43d91b215d5044976e43b96a0d1458d193ef0dd3c587bf8"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:4057eb539a1d88eb84e9f6a36e0a999e0f261ed850ae5d5817e68968e7b89ed9"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d586e69ab5cbf521a1910b733412a5735936f6a610d805b89d35b6647e2a66aa"},
-    {file = "ruff-0.0.265-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aa17b13cd3f29fc57d06bf34c31f21d043735cc9a681203d634549b0e41047d1"},
-    {file = "ruff-0.0.265-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:9ac13b11d9ad3001de9d637974ec5402a67cefdf9fffc3929ab44c2fcbb850a1"},
-    {file = "ruff-0.0.265-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:62a9578b48cfd292c64ea3d28681dc16b1aa7445b7a7709a2884510fc0822118"},
-    {file = "ruff-0.0.265-py3-none-musllinux_1_2_i686.whl", hash = "sha256:d0f9967f84da42d28e3d9d9354cc1575f96ed69e6e40a7d4b780a7a0418d9409"},
-    {file = "ruff-0.0.265-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1d5a8de2fbaf91ea5699451a06f4074e7a312accfa774ad9327cde3e4fda2081"},
-    {file = "ruff-0.0.265-py3-none-win32.whl", hash = "sha256:9e9db5ccb810742d621f93272e3cc23b5f277d8d00c4a79668835d26ccbe48dd"},
-    {file = "ruff-0.0.265-py3-none-win_amd64.whl", hash = "sha256:f54facf286103006171a00ce20388d88ed1d6732db3b49c11feb9bf3d46f90e9"},
-    {file = "ruff-0.0.265-py3-none-win_arm64.whl", hash = "sha256:c78470656e33d32ddc54e8482b1b0fc6de58f1195586731e5ff1405d74421499"},
-    {file = "ruff-0.0.265.tar.gz", hash = "sha256:53c17f0dab19ddc22b254b087d1381b601b155acfa8feed514f0d6a413d0ab3a"},
+    {file = "ruff-0.0.272-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:ae9b57546e118660175d45d264b87e9b4c19405c75b587b6e4d21e6a17bf4fdf"},
+    {file = "ruff-0.0.272-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:1609b864a8d7ee75a8c07578bdea0a7db75a144404e75ef3162e0042bfdc100d"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ee76b4f05fcfff37bd6ac209d1370520d509ea70b5a637bdf0a04d0c99e13dff"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:48eccf225615e106341a641f826b15224b8a4240b84269ead62f0afd6d7e2d95"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:677284430ac539bb23421a2b431b4ebc588097ef3ef918d0e0a8d8ed31fea216"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:9c4bfb75456a8e1efe14c52fcefb89cfb8f2a0d31ed8d804b82c6cf2dc29c42c"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:86bc788245361a8148ff98667da938a01e1606b28a45e50ac977b09d3ad2c538"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:27b2ea68d2aa69fff1b20b67636b1e3e22a6a39e476c880da1282c3e4bf6ee5a"},
+    {file = "ruff-0.0.272-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd2bbe337a3f84958f796c77820d55ac2db1e6753f39d1d1baed44e07f13f96d"},
+    {file = "ruff-0.0.272-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:d5a208f8ef0e51d4746930589f54f9f92f84bb69a7d15b1de34ce80a7681bc00"},
+    {file = "ruff-0.0.272-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:905ff8f3d6206ad56fcd70674453527b9011c8b0dc73ead27618426feff6908e"},
+    {file = "ruff-0.0.272-py3-none-musllinux_1_2_i686.whl", hash = "sha256:19643d448f76b1eb8a764719072e9c885968971bfba872e14e7257e08bc2f2b7"},
+    {file = "ruff-0.0.272-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:691d72a00a99707a4e0b2846690961157aef7b17b6b884f6b4420a9f25cd39b5"},
+    {file = "ruff-0.0.272-py3-none-win32.whl", hash = "sha256:dc406e5d756d932da95f3af082814d2467943631a587339ee65e5a4f4fbe83eb"},
+    {file = "ruff-0.0.272-py3-none-win_amd64.whl", hash = "sha256:a37ec80e238ead2969b746d7d1b6b0d31aa799498e9ba4281ab505b93e1f4b28"},
+    {file = "ruff-0.0.272-py3-none-win_arm64.whl", hash = "sha256:06b8ee4eb8711ab119db51028dd9f5384b44728c23586424fd6e241a5b9c4a3b"},
+    {file = "ruff-0.0.272.tar.gz", hash = "sha256:273a01dc8c3c4fd4c2af7ea7a67c8d39bb09bce466e640dd170034da75d14cab"},
 ]
 
 [[package]]
@@ -3291,4 +3294,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.7.1"
-content-hash = "7ad11e62a675e09444cf33ca2de3216fc4efc5874a2575e54d95d577a52439d3"
+content-hash = "090924370b17fd265407b5a3f9cbc00997308f575b455399b39a48e3ca1a5a8e"
diff --git a/pyproject.toml b/pyproject.toml
index 3626be9797..90812de019 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -311,7 +311,7 @@ all = [
 # We pin black so that our tests don't start failing on new releases.
 isort = ">=5.10.1"
 black = ">=22.3.0"
-ruff = "0.0.265"
+ruff = "0.0.272"
 
 # Typechecking
 lxml-stubs = ">=0.4.0"
diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs
index 9d6c304d92..7eea9313f0 100644
--- a/rust/src/push/base_rules.rs
+++ b/rust/src/push/base_rules.rs
@@ -142,7 +142,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
         default_enabled: true,
     },
     PushRule {
-        rule_id: Cow::Borrowed("global/override/.m.is_user_mention"),
+        rule_id: Cow::Borrowed("global/override/.m.rule.is_user_mention"),
         priority_class: 5,
         conditions: Cow::Borrowed(&[Condition::Known(
             KnownCondition::ExactEventPropertyContainsType(EventPropertyIsTypeCondition {
@@ -163,7 +163,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[
         default_enabled: true,
     },
     PushRule {
-        rule_id: Cow::Borrowed("global/override/.m.is_room_mention"),
+        rule_id: Cow::Borrowed("global/override/.m.rule.is_room_mention"),
         priority_class: 5,
         conditions: Cow::Borrowed(&[
             Condition::Known(KnownCondition::EventPropertyIs(EventPropertyIsCondition {
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 131f26234e..24b83cfeb6 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -246,10 +246,6 @@ else
   else
     export PASS_SYNAPSE_COMPLEMENT_DATABASE=sqlite
   fi
-
-  # The tests for importing historical messages (MSC2716)
-  # only pass with monoliths, currently.
-  test_tags="$test_tags,msc2716"
 fi
 
 if [[ -n "$ASYNCIO_REACTOR" ]]; then
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index b1d5e2e616..63f0b25ddd 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -136,11 +136,11 @@ def request(
         authorization_headers.append(header)
         print("Authorization: %s" % header, file=sys.stderr)
 
-    dest = "matrix://%s%s" % (destination, path)
+    dest = "matrix-federation://%s%s" % (destination, path)
     print("Requesting %s" % dest, file=sys.stderr)
 
     s = requests.Session()
-    s.mount("matrix://", MatrixConnectionAdapter())
+    s.mount("matrix-federation://", MatrixConnectionAdapter())
 
     headers: Dict[str, str] = {
         "Authorization": authorization_headers[0],
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index faf0770c66..dc32553d0c 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -123,10 +123,6 @@ class EventTypes:
     SpaceChild: Final = "m.space.child"
     SpaceParent: Final = "m.space.parent"
 
-    MSC2716_INSERTION: Final = "org.matrix.msc2716.insertion"
-    MSC2716_BATCH: Final = "org.matrix.msc2716.batch"
-    MSC2716_MARKER: Final = "org.matrix.msc2716.marker"
-
     Reaction: Final = "m.reaction"
 
 
@@ -222,16 +218,6 @@ class EventContentFields:
     # Used in m.room.guest_access events.
     GUEST_ACCESS: Final = "guest_access"
 
-    # Used on normal messages to indicate they were historically imported after the fact
-    MSC2716_HISTORICAL: Final = "org.matrix.msc2716.historical"
-    # For "insertion" events to indicate what the next batch ID should be in
-    # order to connect to it
-    MSC2716_NEXT_BATCH_ID: Final = "next_batch_id"
-    # Used on "batch" events to indicate which insertion event it connects to
-    MSC2716_BATCH_ID: Final = "batch_id"
-    # For "marker" events
-    MSC2716_INSERTION_EVENT_REFERENCE: Final = "insertion_event_reference"
-
     # The authorising user for joining a restricted room.
     AUTHORISING_USER: Final = "join_authorised_via_users_server"
 
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index c5c71e242f..25c105a4c8 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -91,11 +91,6 @@ class RoomVersion:
     # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
     # m.room.membership event with membership 'knock'.
     msc2403_knocking: bool
-    # MSC2716: Adds m.room.power_levels -> content.historical field to control
-    # whether "insertion", "chunk", "marker" events can be sent
-    msc2716_historical: bool
-    # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events
-    msc2716_redactions: bool
     # MSC3389: Protect relation information from redaction.
     msc3389_relation_redactions: bool
     # MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
@@ -130,8 +125,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -153,8 +146,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -176,8 +167,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -199,8 +188,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -222,8 +209,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -245,8 +230,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -268,8 +251,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=False,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -291,8 +272,6 @@ class RoomVersions:
         msc3083_join_rules=False,
         msc3375_redaction_rules=False,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -314,8 +293,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=False,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -337,8 +314,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -360,8 +335,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=False,
@@ -383,8 +356,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=False,
         msc3667_int_only_power_levels=False,
@@ -406,8 +377,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -415,29 +384,6 @@ class RoomVersions:
         msc3931_push_features=(),
         msc3989_redaction_rules=False,
     )
-    MSC2716v4 = RoomVersion(
-        "org.matrix.msc2716v4",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=True,
-        msc2716_historical=True,
-        msc2716_redactions=True,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=False,
-    )
     MSC1767v10 = RoomVersion(
         # MSC1767 (Extensible Events) based on room version "10"
         "org.matrix.msc1767.10",
@@ -453,8 +399,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -476,8 +420,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -500,8 +442,6 @@ class RoomVersions:
         msc3083_join_rules=True,
         msc3375_redaction_rules=True,
         msc2403_knocking=True,
-        msc2716_historical=False,
-        msc2716_redactions=False,
         msc3389_relation_redactions=False,
         msc3787_knock_restricted_join_rule=True,
         msc3667_int_only_power_levels=True,
@@ -526,7 +466,6 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
         RoomVersions.V9,
         RoomVersions.MSC3787,
         RoomVersions.V10,
-        RoomVersions.MSC2716v4,
         RoomVersions.MSC3989,
         RoomVersions.MSC3820opt2,
     )
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 909ebccf78..7406c3948c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -83,7 +83,6 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.registration import RegistrationWorkerStore
 from synapse.storage.databases.main.relations import RelationsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
-from synapse.storage.databases.main.room_batch import RoomBatchStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.session import SessionStore
@@ -120,7 +119,6 @@ class GenericWorkerStore(
     # the races it creates aren't too bad.
     KeyStore,
     RoomWorkerStore,
-    RoomBatchStore,
     DirectoryWorkerStore,
     PushRulesWorkerStore,
     ApplicationServiceTransactionWorkerStore,
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 1d5b5ded45..8e0f5356b4 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -247,9 +247,6 @@ class ExperimentalConfig(Config):
         # MSC3026 (busy presence state)
         self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
 
-        # MSC2716 (importing historical messages)
-        self.msc2716_enabled: bool = experimental.get("msc2716_enabled", False)
-
         # MSC3244 (room version capabilities)
         self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index b4b43ec4d7..3aaf53dfbd 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -339,13 +339,6 @@ def check_state_dependent_auth_rules(
     if event.type == EventTypes.Redaction:
         check_redaction(event.room_version, event, auth_dict)
 
-    if (
-        event.type == EventTypes.MSC2716_INSERTION
-        or event.type == EventTypes.MSC2716_BATCH
-        or event.type == EventTypes.MSC2716_MARKER
-    ):
-        check_historical(event.room_version, event, auth_dict)
-
     logger.debug("Allowing! %s", event)
 
 
@@ -365,7 +358,6 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
     RoomVersions.V9,
     RoomVersions.MSC3787,
     RoomVersions.V10,
-    RoomVersions.MSC2716v4,
     RoomVersions.MSC1767v10,
 }
 
@@ -823,38 +815,6 @@ def check_redaction(
     raise AuthError(403, "You don't have permission to redact events")
 
 
-def check_historical(
-    room_version_obj: RoomVersion,
-    event: "EventBase",
-    auth_events: StateMap["EventBase"],
-) -> None:
-    """Check whether the event sender is allowed to send historical related
-    events like "insertion", "batch", and "marker".
-
-    Returns:
-        None
-
-    Raises:
-        AuthError if the event sender is not allowed to send historical related events
-        ("insertion", "batch", and "marker").
-    """
-    # Ignore the auth checks in room versions that do not support historical
-    # events
-    if not room_version_obj.msc2716_historical:
-        return
-
-    user_level = get_user_power_level(event.user_id, auth_events)
-
-    historical_level = get_named_level(auth_events, "historical", 100)
-
-    if user_level < historical_level:
-        raise UnstableSpecAuthError(
-            403,
-            'You don\'t have permission to send send historical related events ("insertion", "batch", and "marker")',
-            errcode=Codes.INSUFFICIENT_POWER,
-        )
-
-
 def _check_power_levels(
     room_version_obj: RoomVersion,
     event: "EventBase",
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index de7e5be42b..75b62adb33 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -198,7 +198,6 @@ class _EventInternalMetadata:
     soft_failed: DictProperty[bool] = DictProperty("soft_failed")
     proactively_send: DictProperty[bool] = DictProperty("proactively_send")
     redacted: DictProperty[bool] = DictProperty("redacted")
-    historical: DictProperty[bool] = DictProperty("historical")
 
     txn_id: DictProperty[str] = DictProperty("txn_id")
     """The transaction ID, if it was set when the event was created."""
@@ -288,14 +287,6 @@ class _EventInternalMetadata:
         """
         return self._dict.get("redacted", False)
 
-    def is_historical(self) -> bool:
-        """Whether this is a historical message.
-        This is used by the batchsend historical message endpoint and
-        is needed to and mark the event as backfilled and skip some checks
-        like push notifications.
-        """
-        return self._dict.get("historical", False)
-
     def is_notifiable(self) -> bool:
         """Whether this event can trigger a push notification"""
         return not self.is_outlier() or self.is_out_of_band_membership()
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index e7e8225b8e..a43498ed4d 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -107,33 +107,32 @@ class EventContext(UnpersistedEventContextBase):
         state_delta_due_to_event: If `state_group` and `state_group_before_event` are not None
             then this is the delta of the state between the two groups.
 
-        prev_group: If it is known, ``state_group``'s prev_group. Note that this being
-            None does not necessarily mean that ``state_group`` does not have
-            a prev_group!
+        state_group_deltas: If not empty, this is a dict collecting a mapping of the state
+            difference between state groups.
 
-            If the event is a state event, this is normally the same as
-            ``state_group_before_event``.
+            The keys are a tuple of two integers: the initial group and final state group.
+            The corresponding value is a state map representing the state delta between
+            these state groups.
 
-            If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
-            will always also be ``None``.
+            The dictionary is expected to have at most two entries with state groups of:
 
-            Note that this *not* (necessarily) the state group associated with
-            ``_prev_state_ids``.
+            1. The state group before the event and after the event.
+            2. The state group preceding the state group before the event and the
+               state group before the event.
 
-        delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
-            and ``state_group``.
+            This information is collected and stored as part of an optimization for persisting
+            events.
 
         partial_state: if True, we may be storing this event with a temporary,
             incomplete state.
     """
 
     _storage: "StorageControllers"
+    state_group_deltas: Dict[Tuple[int, int], StateMap[str]]
     rejected: Optional[str] = None
     _state_group: Optional[int] = None
     state_group_before_event: Optional[int] = None
     _state_delta_due_to_event: Optional[StateMap[str]] = None
-    prev_group: Optional[int] = None
-    delta_ids: Optional[StateMap[str]] = None
     app_service: Optional[ApplicationService] = None
 
     partial_state: bool = False
@@ -145,16 +144,14 @@ class EventContext(UnpersistedEventContextBase):
         state_group_before_event: Optional[int],
         state_delta_due_to_event: Optional[StateMap[str]],
         partial_state: bool,
-        prev_group: Optional[int] = None,
-        delta_ids: Optional[StateMap[str]] = None,
+        state_group_deltas: Dict[Tuple[int, int], StateMap[str]],
     ) -> "EventContext":
         return EventContext(
             storage=storage,
             state_group=state_group,
             state_group_before_event=state_group_before_event,
             state_delta_due_to_event=state_delta_due_to_event,
-            prev_group=prev_group,
-            delta_ids=delta_ids,
+            state_group_deltas=state_group_deltas,
             partial_state=partial_state,
         )
 
@@ -163,7 +160,7 @@ class EventContext(UnpersistedEventContextBase):
         storage: "StorageControllers",
     ) -> "EventContext":
         """Return an EventContext instance suitable for persisting an outlier event"""
-        return EventContext(storage=storage)
+        return EventContext(storage=storage, state_group_deltas={})
 
     async def persist(self, event: EventBase) -> "EventContext":
         return self
@@ -183,13 +180,15 @@ class EventContext(UnpersistedEventContextBase):
             "state_group": self._state_group,
             "state_group_before_event": self.state_group_before_event,
             "rejected": self.rejected,
-            "prev_group": self.prev_group,
+            "state_group_deltas": _encode_state_group_delta(self.state_group_deltas),
             "state_delta_due_to_event": _encode_state_dict(
                 self._state_delta_due_to_event
             ),
-            "delta_ids": _encode_state_dict(self.delta_ids),
             "app_service_id": self.app_service.id if self.app_service else None,
             "partial_state": self.partial_state,
+            # add dummy delta_ids and prev_group for backwards compatibility
+            "delta_ids": None,
+            "prev_group": None,
         }
 
     @staticmethod
@@ -204,17 +203,24 @@ class EventContext(UnpersistedEventContextBase):
         Returns:
             The event context.
         """
+        # workaround for backwards/forwards compatibility: if the input doesn't have a value
+        # for "state_group_deltas" just assign an empty dict
+        state_group_deltas = input.get("state_group_deltas", None)
+        if state_group_deltas:
+            state_group_deltas = _decode_state_group_delta(state_group_deltas)
+        else:
+            state_group_deltas = {}
+
         context = EventContext(
             # We use the state_group and prev_state_id stuff to pull the
             # current_state_ids out of the DB and construct prev_state_ids.
             storage=storage,
             state_group=input["state_group"],
             state_group_before_event=input["state_group_before_event"],
-            prev_group=input["prev_group"],
+            state_group_deltas=state_group_deltas,
             state_delta_due_to_event=_decode_state_dict(
                 input["state_delta_due_to_event"]
             ),
-            delta_ids=_decode_state_dict(input["delta_ids"]),
             rejected=input["rejected"],
             partial_state=input.get("partial_state", False),
         )
@@ -349,7 +355,7 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
     _storage: "StorageControllers"
     state_group_before_event: Optional[int]
     state_group_after_event: Optional[int]
-    state_delta_due_to_event: Optional[dict]
+    state_delta_due_to_event: Optional[StateMap[str]]
     prev_group_for_state_group_before_event: Optional[int]
     delta_ids_to_state_group_before_event: Optional[StateMap[str]]
     partial_state: bool
@@ -380,26 +386,16 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
 
         events_and_persisted_context = []
         for event, unpersisted_context in amended_events_and_context:
-            if event.is_state():
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.state_group_before_event,
-                    delta_ids=unpersisted_context.state_delta_due_to_event,
-                )
-            else:
-                context = EventContext(
-                    storage=unpersisted_context._storage,
-                    state_group=unpersisted_context.state_group_after_event,
-                    state_group_before_event=unpersisted_context.state_group_before_event,
-                    state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
-                    partial_state=unpersisted_context.partial_state,
-                    prev_group=unpersisted_context.prev_group_for_state_group_before_event,
-                    delta_ids=unpersisted_context.delta_ids_to_state_group_before_event,
-                )
+            state_group_deltas = unpersisted_context._build_state_group_deltas()
+
+            context = EventContext(
+                storage=unpersisted_context._storage,
+                state_group=unpersisted_context.state_group_after_event,
+                state_group_before_event=unpersisted_context.state_group_before_event,
+                state_delta_due_to_event=unpersisted_context.state_delta_due_to_event,
+                partial_state=unpersisted_context.partial_state,
+                state_group_deltas=state_group_deltas,
+            )
             events_and_persisted_context.append((event, context))
         return events_and_persisted_context
 
@@ -452,11 +448,11 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
 
         # if the event isn't a state event the state group doesn't change
         if not self.state_delta_due_to_event:
-            state_group_after_event = self.state_group_before_event
+            self.state_group_after_event = self.state_group_before_event
 
         # otherwise if it is a state event we need to get a state group for it
         else:
-            state_group_after_event = await self._storage.state.store_state_group(
+            self.state_group_after_event = await self._storage.state.store_state_group(
                 event.event_id,
                 event.room_id,
                 prev_group=self.state_group_before_event,
@@ -464,16 +460,81 @@ class UnpersistedEventContext(UnpersistedEventContextBase):
                 current_state_ids=None,
             )
 
+        state_group_deltas = self._build_state_group_deltas()
+
         return EventContext.with_state(
             storage=self._storage,
-            state_group=state_group_after_event,
+            state_group=self.state_group_after_event,
             state_group_before_event=self.state_group_before_event,
             state_delta_due_to_event=self.state_delta_due_to_event,
+            state_group_deltas=state_group_deltas,
             partial_state=self.partial_state,
-            prev_group=self.state_group_before_event,
-            delta_ids=self.state_delta_due_to_event,
         )
 
+    def _build_state_group_deltas(self) -> Dict[Tuple[int, int], StateMap]:
+        """
+        Collect deltas between the state groups associated with this context
+        """
+        state_group_deltas = {}
+
+        # if we know the state group before the event and after the event, add them and the
+        # state delta between them to state_group_deltas
+        if self.state_group_before_event and self.state_group_after_event:
+            # if we have the state groups we should have the delta
+            assert self.state_delta_due_to_event is not None
+            state_group_deltas[
+                (
+                    self.state_group_before_event,
+                    self.state_group_after_event,
+                )
+            ] = self.state_delta_due_to_event
+
+        # the state group before the event may also have a state group which precedes it, if
+        # we have that and the state group before the event, add them and the state
+        # delta between them to state_group_deltas
+        if (
+            self.prev_group_for_state_group_before_event
+            and self.state_group_before_event
+        ):
+            # if we have both state groups we should have the delta between them
+            assert self.delta_ids_to_state_group_before_event is not None
+            state_group_deltas[
+                (
+                    self.prev_group_for_state_group_before_event,
+                    self.state_group_before_event,
+                )
+            ] = self.delta_ids_to_state_group_before_event
+
+        return state_group_deltas
+
+
+def _encode_state_group_delta(
+    state_group_delta: Dict[Tuple[int, int], StateMap[str]]
+) -> List[Tuple[int, int, Optional[List[Tuple[str, str, str]]]]]:
+    if not state_group_delta:
+        return []
+
+    state_group_delta_encoded = []
+    for key, value in state_group_delta.items():
+        state_group_delta_encoded.append((key[0], key[1], _encode_state_dict(value)))
+
+    return state_group_delta_encoded
+
+
+def _decode_state_group_delta(
+    input: List[Tuple[int, int, List[Tuple[str, str, str]]]]
+) -> Dict[Tuple[int, int], StateMap[str]]:
+    if not input:
+        return {}
+
+    state_group_deltas = {}
+    for state_group_1, state_group_2, state_dict in input:
+        state_map = _decode_state_dict(state_dict)
+        assert state_map is not None
+        state_group_deltas[(state_group_1, state_group_2)] = state_map
+
+    return state_group_deltas
+
 
 def _encode_state_dict(
     state_dict: Optional[StateMap[str]],
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index e7b7b78b84..a55efcca56 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -164,21 +164,12 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
         if room_version.msc2176_redaction_rules:
             add_fields("invite")
 
-        if room_version.msc2716_historical:
-            add_fields("historical")
-
     elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
         add_fields("aliases")
     elif event_type == EventTypes.RoomHistoryVisibility:
         add_fields("history_visibility")
     elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
         add_fields("redacts")
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_INSERTION:
-        add_fields(EventContentFields.MSC2716_NEXT_BATCH_ID)
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_BATCH:
-        add_fields(EventContentFields.MSC2716_BATCH_ID)
-    elif room_version.msc2716_redactions and event_type == EventTypes.MSC2716_MARKER:
-        add_fields(EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE)
 
     # Protect the rel_type and event_id fields under the m.relates_to field.
     if room_version.msc3389_relation_redactions:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a2cf3a96c6..e5359ca558 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -260,7 +260,9 @@ class FederationClient(FederationBase):
         use_unstable = False
         for user_id, one_time_keys in query.items():
             for device_id, algorithms in one_time_keys.items():
-                if any(count > 1 for count in algorithms.values()):
+                # If more than one algorithm is requested, attempt to use the unstable
+                # endpoint.
+                if sum(algorithms.values()) > 1:
                     use_unstable = True
                 if algorithms:
                     # For the stable query, choose only the first algorithm.
@@ -296,6 +298,7 @@ class FederationClient(FederationBase):
         else:
             logger.debug("Skipping unstable claim client keys API")
 
+        # TODO Potentially attempt multiple queries and combine the results?
         return await self.transport_layer.claim_client_keys(
             user, destination, content, timeout
         )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9425b32507..61fa3b30af 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -1016,7 +1016,9 @@ class FederationServer(FederationBase):
             for user_id, device_keys in result.items():
                 for device_id, keys in device_keys.items():
                     for key_id, key in keys.items():
-                        json_result.setdefault(user_id, {})[device_id] = {key_id: key}
+                        json_result.setdefault(user_id, {}).setdefault(device_id, {})[
+                            key_id
+                        ] = key
 
         logger.info(
             "Claimed one-time-keys: %s",
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index f3bdc5a4d2..97abbdee18 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -109,10 +109,8 @@ was enabled*, Catch-Up Mode is exited and we return to `_transaction_transmissio
 
 If a remote server is unreachable over federation, we back off from that server,
 with an exponentially-increasing retry interval.
-Whilst we don't automatically retry after the interval, we prevent making new attempts
-until such time as the back-off has cleared.
-Once the back-off is cleared and a new PDU or EDU arrives for transmission, the transmission
-loop resumes and empties the queue by making federation requests.
+We automatically retry after the retry interval expires (roughly, the logic to do so
+being triggered every minute).
 
 If the backoff grows too large (> 1 hour), the in-memory queue is emptied (to prevent
 unbounded growth) and Catch-Up Mode is entered.
@@ -145,7 +143,6 @@ from prometheus_client import Counter
 from typing_extensions import Literal
 
 from twisted.internet import defer
-from twisted.internet.interfaces import IDelayedCall
 
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
@@ -184,14 +181,18 @@ sent_pdus_destination_dist_total = Counter(
     "Total number of PDUs queued for sending across all destinations",
 )
 
-# Time (in s) after Synapse's startup that we will begin to wake up destinations
-# that have catch-up outstanding.
-CATCH_UP_STARTUP_DELAY_SEC = 15
+# Time (in s) to wait before trying to wake up destinations that have
+# catch-up outstanding. This will also be the delay applied at startup
+# before trying the same.
+# Please note that rate limiting still applies, so while the loop is
+# executed every X seconds the destinations may not be wake up because
+# they are being rate limited following previous attempt failures.
+WAKEUP_RETRY_PERIOD_SEC = 60
 
 # Time (in s) to wait in between waking up each destination, i.e. one destination
-# will be woken up every <x> seconds after Synapse's startup until we have woken
-# every destination has outstanding catch-up.
-CATCH_UP_STARTUP_INTERVAL_SEC = 5
+# will be woken up every <x> seconds until we have woken every destination
+# has outstanding catch-up.
+WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC = 5
 
 
 class AbstractFederationSender(metaclass=abc.ABCMeta):
@@ -415,12 +416,10 @@ class FederationSender(AbstractFederationSender):
             / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
         )
 
-        # wake up destinations that have outstanding PDUs to be caught up
-        self._catchup_after_startup_timer: Optional[
-            IDelayedCall
-        ] = self.clock.call_later(
-            CATCH_UP_STARTUP_DELAY_SEC,
+        # Regularly wake up destinations that have outstanding PDUs to be caught up
+        self.clock.looping_call(
             run_as_background_process,
+            WAKEUP_RETRY_PERIOD_SEC * 1000.0,
             "wake_destinations_needing_catchup",
             self._wake_destinations_needing_catchup,
         )
@@ -966,7 +965,6 @@ class FederationSender(AbstractFederationSender):
 
             if not destinations_to_wake:
                 # finished waking all destinations!
-                self._catchup_after_startup_timer = None
                 break
 
             last_processed = destinations_to_wake[-1]
@@ -983,4 +981,4 @@ class FederationSender(AbstractFederationSender):
                     last_processed,
                 )
                 self.wake_destination(destination)
-                await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
+                await self.clock.sleep(WAKEUP_INTERVAL_BETWEEN_DESTINATIONS_SEC)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b7b5e21020..cc5ed97730 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -105,14 +105,12 @@ backfill_processing_before_timer = Histogram(
 )
 
 
+# TODO: We can refactor this away now that there is only one backfill point again
 class _BackfillPointType(Enum):
     # a regular backwards extremity (ie, an event which we don't yet have, but which
     # is referred to by other events in the DAG)
     BACKWARDS_EXTREMITY = enum.auto()
 
-    # an MSC2716 "insertion event"
-    INSERTION_PONT = enum.auto()
-
 
 @attr.s(slots=True, auto_attribs=True, frozen=True)
 class _BackfillPoint:
@@ -273,32 +271,10 @@ class FederationHandler:
             )
         ]
 
-        insertion_events_to_be_backfilled: List[_BackfillPoint] = []
-        if self.hs.config.experimental.msc2716_enabled:
-            insertion_events_to_be_backfilled = [
-                _BackfillPoint(event_id, depth, _BackfillPointType.INSERTION_PONT)
-                for event_id, depth in await self.store.get_insertion_event_backward_extremities_in_room(
-                    room_id=room_id,
-                    current_depth=current_depth,
-                    # We only need to end up with 5 extremities combined with
-                    # the backfill points to make the `/backfill` request ...
-                    # (see the other comment above for more context).
-                    limit=50,
-                )
-            ]
-        logger.debug(
-            "_maybe_backfill_inner: backwards_extremities=%s insertion_events_to_be_backfilled=%s",
-            backwards_extremities,
-            insertion_events_to_be_backfilled,
-        )
-
         # we now have a list of potential places to backpaginate from. We prefer to
         # start with the most recent (ie, max depth), so let's sort the list.
         sorted_backfill_points: List[_BackfillPoint] = sorted(
-            itertools.chain(
-                backwards_extremities,
-                insertion_events_to_be_backfilled,
-            ),
+            backwards_extremities,
             key=lambda e: -int(e.depth),
         )
 
@@ -411,10 +387,7 @@ class FederationHandler:
             #   event but not anything before it. This would require looking at the
             #   state *before* the event, ignoring the special casing certain event
             #   types have.
-            if bp.type == _BackfillPointType.INSERTION_PONT:
-                event_ids_to_check = [bp.event_id]
-            else:
-                event_ids_to_check = await self.store.get_successor_events(bp.event_id)
+            event_ids_to_check = await self.store.get_successor_events(bp.event_id)
 
             events_to_check = await self.store.get_events_as_list(
                 event_ids_to_check,
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 42141d3670..d32d224d56 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -601,18 +601,6 @@ class FederationEventHandler:
                 room_id, [(event, context)]
             )
 
-            # If we're joining the room again, check if there is new marker
-            # state indicating that there is new history imported somewhere in
-            # the DAG. Multiple markers can exist in the current state with
-            # unique state_keys.
-            #
-            # Do this after the state from the remote join was persisted (via
-            # `persist_events_and_notify`). Otherwise we can run into a
-            # situation where the create event doesn't exist yet in the
-            # `current_state_events`
-            for e in state:
-                await self._handle_marker_event(origin, e)
-
             return stream_id_after_persist
 
     async def update_state_for_partial_state_event(
@@ -915,13 +903,6 @@ class FederationEventHandler:
             )
         )
 
-        # We construct the event lists in source order from `/backfill` response because
-        # it's a) easiest, but also b) the order in which we process things matters for
-        # MSC2716 historical batches because many historical events are all at the same
-        # `depth` and we rely on the tenuous sort that the other server gave us and hope
-        # they're doing their best. The brittle nature of this ordering for historical
-        # messages over federation is one of the reasons why we don't want to continue
-        # on MSC2716 until we have online topological ordering.
         events_with_failed_pull_attempts, fresh_events = partition(
             new_events, lambda e: e.event_id in event_ids_with_failed_pull_attempts
         )
@@ -1460,8 +1441,6 @@ class FederationEventHandler:
 
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
-        await self._handle_marker_event(origin, event)
-
         if backfilled or context.rejected:
             return
 
@@ -1559,94 +1538,6 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
-    @trace
-    async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
-        """Handles backfilling the insertion event when we receive a marker
-        event that points to one.
-
-        Args:
-            origin: Origin of the event. Will be called to get the insertion event
-            marker_event: The event to process
-        """
-
-        if marker_event.type != EventTypes.MSC2716_MARKER:
-            # Not a marker event
-            return
-
-        if marker_event.rejected_reason is not None:
-            # Rejected event
-            return
-
-        # Skip processing a marker event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = await self._store.get_room_version(marker_event.room_id)
-        create_event = await self._store.get_create_event_for_room(marker_event.room_id)
-        if not room_version.msc2175_implicit_room_creator:
-            room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
-        else:
-            room_creator = create_event.sender
-        if not room_version.msc2716_historical and (
-            not self._config.experimental.msc2716_enabled
-            or marker_event.sender != room_creator
-        ):
-            return
-
-        logger.debug("_handle_marker_event: received %s", marker_event)
-
-        insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
-        )
-
-        if insertion_event_id is None:
-            # Nothing to retrieve then (invalid marker)
-            return
-
-        already_seen_insertion_event = await self._store.have_seen_event(
-            marker_event.room_id, insertion_event_id
-        )
-        if already_seen_insertion_event:
-            # No need to process a marker again if we have already seen the
-            # insertion event that it was pointing to
-            return
-
-        logger.debug(
-            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
-        )
-
-        await self._get_events_and_persist(
-            origin,
-            marker_event.room_id,
-            [insertion_event_id],
-        )
-
-        insertion_event = await self._store.get_event(
-            insertion_event_id, allow_none=True
-        )
-        if insertion_event is None:
-            logger.warning(
-                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
-                origin,
-                insertion_event_id,
-                marker_event.event_id,
-            )
-            return
-
-        logger.debug(
-            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-        await self._store.insert_insertion_extremity(
-            insertion_event_id, marker_event.room_id
-        )
-
-        logger.debug(
-            "_handle_marker_event: insertion extremity added for %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
     async def backfill_event_id(
         self, destinations: List[str], room_id: str, event_id: str
     ) -> PulledPduInfo:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 0b61c2272b..4292b47037 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -60,7 +60,6 @@ from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
-    MutableStateMap,
     PersistedEventPosition,
     Requester,
     RoomAlias,
@@ -573,7 +572,6 @@ class EventCreationHandler:
         state_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
-        historical: bool = False,
         depth: Optional[int] = None,
         state_map: Optional[StateMap[str]] = None,
         for_batch: bool = False,
@@ -599,7 +597,7 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -614,13 +612,10 @@ class EventCreationHandler:
                 If non-None, prev_event_ids must also be provided.
 
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
 
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
@@ -629,10 +624,6 @@ class EventCreationHandler:
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
 
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
-
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -717,8 +708,6 @@ class EventCreationHandler:
 
         builder.internal_metadata.outlier = outlier
 
-        builder.internal_metadata.historical = historical
-
         event, unpersisted_context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
@@ -947,7 +936,6 @@ class EventCreationHandler:
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
         outlier: bool = False,
-        historical: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
         """
@@ -961,19 +949,16 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids:
                 The event IDs to use as the prev events.
                 Should normally be left as None to automatically request them
                 from the database.
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -981,9 +966,6 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -1053,7 +1035,6 @@ class EventCreationHandler:
                     prev_event_ids=prev_event_ids,
                     state_event_ids=state_event_ids,
                     outlier=outlier,
-                    historical=historical,
                     depth=depth,
                 )
                 context = await unpersisted_context.persist(event)
@@ -1145,7 +1126,7 @@ class EventCreationHandler:
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -1158,13 +1139,10 @@ class EventCreationHandler:
                 based on the room state at the prev_events.
 
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is with insertion events which float at
-                the beginning of a historical batch and don't have any `prev_events` to
-                derive from; we add all of these state events as the explicit state so the
-                rest of the historical batch can inherit the same state and state_group.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
 
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
@@ -1261,52 +1239,6 @@ class EventCreationHandler:
             if builder.internal_metadata.outlier:
                 event.internal_metadata.outlier = True
                 context = EventContext.for_outlier(self._storage_controllers)
-            elif (
-                event.type == EventTypes.MSC2716_INSERTION
-                and state_event_ids
-                and builder.internal_metadata.is_historical()
-            ):
-                # Add explicit state to the insertion event so it has state to derive
-                # from even though it's floating with no `prev_events`. The rest of
-                # the batch can derive from this state and state_group.
-                #
-                # TODO(faster_joins): figure out how this works, and make sure that the
-                #   old state is complete.
-                #   https://github.com/matrix-org/synapse/issues/13003
-                metadata = await self.store.get_metadata_for_events(state_event_ids)
-
-                state_map_for_event: MutableStateMap[str] = {}
-                for state_id in state_event_ids:
-                    data = metadata.get(state_id)
-                    if data is None:
-                        # We're trying to persist a new historical batch of events
-                        # with the given state, e.g. via
-                        # `RoomBatchSendEventRestServlet`. The state can be inferred
-                        # by Synapse or set directly by the client.
-                        #
-                        # Either way, we should have persisted all the state before
-                        # getting here.
-                        raise Exception(
-                            f"State event {state_id} not found in DB,"
-                            " Synapse should have persisted it before using it."
-                        )
-
-                    if data.state_key is None:
-                        raise Exception(
-                            f"Trying to set non-state event {state_id} as state"
-                        )
-
-                    state_map_for_event[(data.event_type, data.state_key)] = state_id
-
-                # TODO(faster_joins): check how MSC2716 works and whether we can have
-                #   partial state here
-                #   https://github.com/matrix-org/synapse/issues/13003
-                context = await self.state.calculate_context_info(
-                    event,
-                    state_ids_before_event=state_map_for_event,
-                    partial_state=False,
-                )
-
             else:
                 context = await self.state.calculate_context_info(event)
 
@@ -1876,28 +1808,6 @@ class EventCreationHandler:
                             403, "Redacting server ACL events is not permitted"
                         )
 
-                    # Add a little safety stop-gap to prevent people from trying to
-                    # redact MSC2716 related events when they're in a room version
-                    # which does not support it yet. We allow people to use MSC2716
-                    # events in existing room versions but only from the room
-                    # creator since it does not require any changes to the auth
-                    # rules and in effect, the redaction algorithm . In the
-                    # supported room version, we add the `historical` power level to
-                    # auth the MSC2716 related events and adjust the redaction
-                    # algorthim to keep the `historical` field around (redacting an
-                    # event should only strip fields which don't affect the
-                    # structural protocol level).
-                    is_msc2716_event = (
-                        original_event.type == EventTypes.MSC2716_INSERTION
-                        or original_event.type == EventTypes.MSC2716_BATCH
-                        or original_event.type == EventTypes.MSC2716_MARKER
-                    )
-                    if not room_version_obj.msc2716_historical and is_msc2716_event:
-                        raise AuthError(
-                            403,
-                            "Redacting MSC2716 events is not supported in this room version",
-                        )
-
                 event_types = event_auth.auth_types_for_event(event.room_version, event)
                 prev_state_ids = await context.get_prev_state_ids(
                     StateFilter.from_types(event_types)
@@ -1935,58 +1845,12 @@ class EventCreationHandler:
                 if prev_state_ids:
                     raise AuthError(403, "Changing the room create event is forbidden")
 
-            if event.type == EventTypes.MSC2716_INSERTION:
-                room_version = await self.store.get_room_version_id(event.room_id)
-                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-                create_event = await self.store.get_create_event_for_room(event.room_id)
-                if not room_version_obj.msc2175_implicit_room_creator:
-                    room_creator = create_event.content.get(
-                        EventContentFields.ROOM_CREATOR
-                    )
-                else:
-                    room_creator = create_event.sender
-
-                # Only check an insertion event if the room version
-                # supports it or the event is from the room creator.
-                if room_version_obj.msc2716_historical or (
-                    self.config.experimental.msc2716_enabled
-                    and event.sender == room_creator
-                ):
-                    next_batch_id = event.content.get(
-                        EventContentFields.MSC2716_NEXT_BATCH_ID
-                    )
-                    conflicting_insertion_event_id = None
-                    if next_batch_id:
-                        conflicting_insertion_event_id = (
-                            await self.store.get_insertion_event_id_by_batch_id(
-                                event.room_id, next_batch_id
-                            )
-                        )
-                    if conflicting_insertion_event_id is not None:
-                        # The current insertion event that we're processing is invalid
-                        # because an insertion event already exists in the room with the
-                        # same next_batch_id. We can't allow multiple because the batch
-                        # pointing will get weird, e.g. we can't determine which insertion
-                        # event the batch event is pointing to.
-                        raise SynapseError(
-                            HTTPStatus.BAD_REQUEST,
-                            "Another insertion event already exists with the same next_batch_id",
-                            errcode=Codes.INVALID_PARAM,
-                        )
-
-            # Mark any `m.historical` messages as backfilled so they don't appear
-            # in `/sync` and have the proper decrementing `stream_ordering` as we import
-            backfilled = False
-            if event.internal_metadata.is_historical():
-                backfilled = True
-
         assert self._storage_controllers.persistence is not None
         (
             persisted_events,
             max_stream_token,
         ) = await self._storage_controllers.persistence.persist_events(
-            events_and_context, backfilled=backfilled
+            events_and_context,
         )
 
         events_and_pos = []
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d5257acb7d..19b8728db9 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -40,6 +40,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# How many single event gaps we tolerate returning in a `/messages` response before we
+# backfill and try to fill in the history. This is an arbitrarily picked number so feel
+# free to tune it in the future.
+BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+
 
 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
@@ -486,35 +491,35 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if not use_admin_priviledge and membership == Membership.LEAVE:
-                    # If they have left the room then clamp the token to be before
-                    # they left the room, to save the effort of loading from the
-                    # database.
-
-                    # This is only None if the room is world_readable, in which
-                    # case "JOIN" would have been returned.
-                    assert member_event_id
+            # If they have left the room then clamp the token to be before
+            # they left the room, to save the effort of loading from the
+            # database.
+            if (
+                pagin_config.direction == Direction.BACKWARDS
+                and not use_admin_priviledge
+                and membership == Membership.LEAVE
+            ):
+                # This is only None if the room is world_readable, in which case
+                # "Membership.JOIN" would have been returned and we should never hit
+                # this branch.
+                assert member_event_id
+
+                leave_token = await self.store.get_topological_token_for_event(
+                    member_event_id
+                )
+                assert leave_token.topological is not None
 
-                    leave_token = await self.store.get_topological_token_for_event(
-                        member_event_id
+                if leave_token.topological < curr_topo:
+                    from_token = from_token.copy_and_replace(
+                        StreamKeyType.ROOM, leave_token
                     )
-                    assert leave_token.topological is not None
-
-                    if leave_token.topological < curr_topo:
-                        from_token = from_token.copy_and_replace(
-                            StreamKeyType.ROOM, leave_token
-                        )
-
-                await self.hs.get_federation_handler().maybe_backfill(
-                    room_id,
-                    curr_topo,
-                    limit=pagin_config.limit,
-                )
 
             to_room_key = None
             if pagin_config.to_token:
                 to_room_key = pagin_config.to_token.room_key
 
+            # Initially fetch the events from the database. With any luck, we can return
+            # these without blocking on backfill (handled below).
             events, next_key = await self.store.paginate_room_events(
                 room_id=room_id,
                 from_key=from_token.room_key,
@@ -524,6 +529,94 @@ class PaginationHandler:
                 event_filter=event_filter,
             )
 
+            if pagin_config.direction == Direction.BACKWARDS:
+                # We use a `Set` because there can be multiple events at a given depth
+                # and we only care about looking at the unique continum of depths to
+                # find gaps.
+                event_depths: Set[int] = {event.depth for event in events}
+                sorted_event_depths = sorted(event_depths)
+
+                # Inspect the depths of the returned events to see if there are any gaps
+                found_big_gap = False
+                number_of_gaps = 0
+                previous_event_depth = (
+                    sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
+                )
+                for event_depth in sorted_event_depths:
+                    # We don't expect a negative depth but we'll just deal with it in
+                    # any case by taking the absolute value to get the true gap between
+                    # any two integers.
+                    depth_gap = abs(event_depth - previous_event_depth)
+                    # A `depth_gap` of 1 is a normal continuous chain to the next event
+                    # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+                    # also possible there is no event at a given depth but we can't ever
+                    # know that for sure)
+                    if depth_gap > 1:
+                        number_of_gaps += 1
+
+                    # We only tolerate a small number single-event long gaps in the
+                    # returned events because those are most likely just events we've
+                    # failed to pull in the past. Anything longer than that is probably
+                    # a sign that we're missing a decent chunk of history and we should
+                    # try to backfill it.
+                    #
+                    # XXX: It's possible we could tolerate longer gaps if we checked
+                    # that a given events `prev_events` is one that has failed pull
+                    # attempts and we could just treat it like a dead branch of history
+                    # for now or at least something that we don't need the block the
+                    # client on to try pulling.
+                    #
+                    # XXX: If we had something like MSC3871 to indicate gaps in the
+                    # timeline to the client, we could also get away with any sized gap
+                    # and just have the client refetch the holes as they see fit.
+                    if depth_gap > 2:
+                        found_big_gap = True
+                        break
+                    previous_event_depth = event_depth
+
+                # Backfill in the foreground if we found a big gap, have too many holes,
+                # or we don't have enough events to fill the limit that the client asked
+                # for.
+                missing_too_many_events = (
+                    number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+                )
+                not_enough_events_to_fill_response = len(events) < pagin_config.limit
+                if (
+                    found_big_gap
+                    or missing_too_many_events
+                    or not_enough_events_to_fill_response
+                ):
+                    did_backfill = (
+                        await self.hs.get_federation_handler().maybe_backfill(
+                            room_id,
+                            curr_topo,
+                            limit=pagin_config.limit,
+                        )
+                    )
+
+                    # If we did backfill something, refetch the events from the database to
+                    # catch anything new that might have been added since we last fetched.
+                    if did_backfill:
+                        events, next_key = await self.store.paginate_room_events(
+                            room_id=room_id,
+                            from_key=from_token.room_key,
+                            to_key=to_room_key,
+                            direction=pagin_config.direction,
+                            limit=pagin_config.limit,
+                            event_filter=event_filter,
+                        )
+                else:
+                    # Otherwise, we can backfill in the background for eventual
+                    # consistency's sake but we don't need to block the client waiting
+                    # for a costly federation call and processing.
+                    run_as_background_process(
+                        "maybe_backfill_in_the_background",
+                        self.hs.get_federation_handler().maybe_backfill,
+                        room_id,
+                        curr_topo,
+                        limit=pagin_config.limit,
+                    )
+
             next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
 
         # if no events are returned from pagination, that implies
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
deleted file mode 100644
index bf9df60218..0000000000
--- a/synapse/handlers/room_batch.py
+++ /dev/null
@@ -1,466 +0,0 @@
-import logging
-from typing import TYPE_CHECKING, List, Tuple
-
-from synapse.api.constants import EventContentFields, EventTypes
-from synapse.appservice import ApplicationService
-from synapse.http.servlet import assert_params_in_dict
-from synapse.types import JsonDict, Requester, UserID, create_requester
-from synapse.util.stringutils import random_string
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class RoomBatchHandler:
-    def __init__(self, hs: "HomeServer"):
-        self.hs = hs
-        self.store = hs.get_datastores().main
-        self._state_storage_controller = hs.get_storage_controllers().state
-        self.event_creation_handler = hs.get_event_creation_handler()
-        self.room_member_handler = hs.get_room_member_handler()
-        self.auth = hs.get_auth()
-
-    async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int:
-        """Finds the depth which would sort it after the most-recent
-        prev_event_id but before the successors of those events. If no
-        successors are found, we assume it's an historical extremity part of the
-        current batch and use the same depth of the prev_event_ids.
-
-        Args:
-            prev_event_ids: List of prev event IDs
-
-        Returns:
-            Inherited depth
-        """
-        (
-            most_recent_prev_event_id,
-            most_recent_prev_event_depth,
-        ) = await self.store.get_max_depth_of(prev_event_ids)
-
-        # We want to insert the historical event after the `prev_event` but before the successor event
-        #
-        # We inherit depth from the successor event instead of the `prev_event`
-        # because events returned from `/messages` are first sorted by `topological_ordering`
-        # which is just the `depth` and then tie-break with `stream_ordering`.
-        #
-        # We mark these inserted historical events as "backfilled" which gives them a
-        # negative `stream_ordering`. If we use the same depth as the `prev_event`,
-        # then our historical event will tie-break and be sorted before the `prev_event`
-        # when it should come after.
-        #
-        # We want to use the successor event depth so they appear after `prev_event` because
-        # it has a larger `depth` but before the successor event because the `stream_ordering`
-        # is negative before the successor event.
-        assert most_recent_prev_event_id is not None
-        successor_event_ids = await self.store.get_successor_events(
-            most_recent_prev_event_id
-        )
-
-        # If we can't find any successor events, then it's a forward extremity of
-        # historical messages and we can just inherit from the previous historical
-        # event which we can already assume has the correct depth where we want
-        # to insert into.
-        if not successor_event_ids:
-            depth = most_recent_prev_event_depth
-        else:
-            (
-                _,
-                oldest_successor_depth,
-            ) = await self.store.get_min_depth_of(successor_event_ids)
-
-            depth = oldest_successor_depth
-
-        return depth
-
-    def create_insertion_event_dict(
-        self, sender: str, room_id: str, origin_server_ts: int
-    ) -> JsonDict:
-        """Creates an event dict for an "insertion" event with the proper fields
-        and a random batch ID.
-
-        Args:
-            sender: The event author MXID
-            room_id: The room ID that the event belongs to
-            origin_server_ts: Timestamp when the event was sent
-
-        Returns:
-            The new event dictionary to insert.
-        """
-
-        next_batch_id = random_string(8)
-        insertion_event = {
-            "type": EventTypes.MSC2716_INSERTION,
-            "sender": sender,
-            "room_id": room_id,
-            "content": {
-                EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
-            "origin_server_ts": origin_server_ts,
-        }
-
-        return insertion_event
-
-    async def create_requester_for_user_id_from_app_service(
-        self, user_id: str, app_service: ApplicationService
-    ) -> Requester:
-        """Creates a new requester for the given user_id
-        and validates that the app service is allowed to control
-        the given user.
-
-        Args:
-            user_id: The author MXID that the app service is controlling
-            app_service: The app service that controls the user
-
-        Returns:
-            Requester object
-        """
-
-        await self.auth.validate_appservice_can_control_user_id(app_service, user_id)
-
-        return create_requester(user_id, app_service=app_service)
-
-    async def get_most_recent_full_state_ids_from_event_id_list(
-        self, event_ids: List[str]
-    ) -> List[str]:
-        """Find the most recent event_id and grab the full state at that event.
-        We will use this as a base to auth our historical messages against.
-
-        Args:
-            event_ids: List of event ID's to look at
-
-        Returns:
-            List of event ID's
-        """
-
-        (
-            most_recent_event_id,
-            _,
-        ) = await self.store.get_max_depth_of(event_ids)
-        # mapping from (type, state_key) -> state_event_id
-        assert most_recent_event_id is not None
-        prev_state_map = await self._state_storage_controller.get_state_ids_for_event(
-            most_recent_event_id
-        )
-        # List of state event ID's
-        full_state_ids = list(prev_state_map.values())
-
-        return full_state_ids
-
-    async def persist_state_events_at_start(
-        self,
-        state_events_at_start: List[JsonDict],
-        room_id: str,
-        initial_state_event_ids: List[str],
-        app_service_requester: Requester,
-    ) -> List[str]:
-        """Takes all `state_events_at_start` event dictionaries and creates/persists
-        them in a floating state event chain which don't resolve into the current room
-        state. They are floating because they reference no prev_events which disconnects
-        them from the normal DAG.
-
-        Args:
-            state_events_at_start:
-            room_id: Room where you want the events persisted in.
-            initial_state_event_ids:
-                The base set of state for the historical batch which the floating
-                state chain will derive from. This should probably be the state
-                from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
-            app_service_requester: The requester of an application service.
-
-        Returns:
-            List of state event ID's we just persisted
-        """
-        assert app_service_requester.app_service
-
-        state_event_ids_at_start = []
-        state_event_ids = initial_state_event_ids.copy()
-
-        # Make the state events float off on their own by specifying no
-        # prev_events for the first one in the chain so we don't have a bunch of
-        # `@mxid joined the room` noise between each batch.
-        prev_event_ids_for_state_chain: List[str] = []
-
-        for index, state_event in enumerate(state_events_at_start):
-            assert_params_in_dict(
-                state_event, ["type", "origin_server_ts", "content", "sender"]
-            )
-
-            logger.debug(
-                "RoomBatchSendEventRestServlet inserting state_event=%s", state_event
-            )
-
-            event_dict = {
-                "type": state_event["type"],
-                "origin_server_ts": state_event["origin_server_ts"],
-                "content": state_event["content"],
-                "room_id": room_id,
-                "sender": state_event["sender"],
-                "state_key": state_event["state_key"],
-            }
-
-            # Mark all events as historical
-            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
-            # TODO: This is pretty much the same as some other code to handle inserting state in this file
-            if event_dict["type"] == EventTypes.Member:
-                membership = event_dict["content"].get("membership", None)
-                event_id, _ = await self.room_member_handler.update_membership(
-                    await self.create_requester_for_user_id_from_app_service(
-                        state_event["sender"], app_service_requester.app_service
-                    ),
-                    target=UserID.from_string(event_dict["state_key"]),
-                    room_id=room_id,
-                    action=membership,
-                    content=event_dict["content"],
-                    historical=True,
-                    # Only the first event in the state chain should be floating.
-                    # The rest should hang off each other in a chain.
-                    allow_no_prev_events=index == 0,
-                    prev_event_ids=prev_event_ids_for_state_chain,
-                    # The first event in the state chain is floating with no
-                    # `prev_events` which means it can't derive state from
-                    # anywhere automatically. So we need to set some state
-                    # explicitly.
-                    #
-                    # Make sure to use a copy of this list because we modify it
-                    # later in the loop here. Otherwise it will be the same
-                    # reference and also update in the event when we append
-                    # later.
-                    state_event_ids=state_event_ids.copy(),
-                )
-            else:
-                (
-                    event,
-                    _,
-                ) = await self.event_creation_handler.create_and_send_nonmember_event(
-                    await self.create_requester_for_user_id_from_app_service(
-                        state_event["sender"], app_service_requester.app_service
-                    ),
-                    event_dict,
-                    historical=True,
-                    # Only the first event in the state chain should be floating.
-                    # The rest should hang off each other in a chain.
-                    allow_no_prev_events=index == 0,
-                    prev_event_ids=prev_event_ids_for_state_chain,
-                    # The first event in the state chain is floating with no
-                    # `prev_events` which means it can't derive state from
-                    # anywhere automatically. So we need to set some state
-                    # explicitly.
-                    #
-                    # Make sure to use a copy of this list because we modify it
-                    # later in the loop here. Otherwise it will be the same
-                    # reference and also update in the event when we append later.
-                    state_event_ids=state_event_ids.copy(),
-                )
-                event_id = event.event_id
-
-            state_event_ids_at_start.append(event_id)
-            state_event_ids.append(event_id)
-            # Connect all the state in a floating chain
-            prev_event_ids_for_state_chain = [event_id]
-
-        return state_event_ids_at_start
-
-    async def persist_historical_events(
-        self,
-        events_to_create: List[JsonDict],
-        room_id: str,
-        inherited_depth: int,
-        initial_state_event_ids: List[str],
-        app_service_requester: Requester,
-    ) -> List[str]:
-        """Create and persists all events provided sequentially. Handles the
-        complexity of creating events in chronological order so they can
-        reference each other by prev_event but still persists in
-        reverse-chronoloical order so they have the correct
-        (topological_ordering, stream_ordering) and sort correctly from
-        /messages.
-
-        Args:
-            events_to_create: List of historical events to create in JSON
-                dictionary format.
-            room_id: Room where you want the events persisted in.
-            inherited_depth: The depth to create the events at (you will
-                probably by calling inherit_depth_from_prev_ids(...)).
-            initial_state_event_ids:
-                This is used to set explicit state for the insertion event at
-                the start of the historical batch since it's floating with no
-                prev_events to derive state from automatically.
-            app_service_requester: The requester of an application service.
-
-        Returns:
-            List of persisted event IDs
-        """
-        assert app_service_requester.app_service
-
-        # We expect the first event in a historical batch to be an insertion event
-        assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
-        # We expect the last event in a historical batch to be an batch event
-        assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
-
-        # Make the historical event chain float off on its own by specifying no
-        # prev_events for the first event in the chain which causes the HS to
-        # ask for the state at the start of the batch later.
-        prev_event_ids: List[str] = []
-
-        event_ids = []
-        events_to_persist = []
-        for index, ev in enumerate(events_to_create):
-            assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
-
-            assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
-                ev["sender"],
-            )
-
-            event_dict = {
-                "type": ev["type"],
-                "origin_server_ts": ev["origin_server_ts"],
-                "content": ev["content"],
-                "room_id": room_id,
-                "sender": ev["sender"],  # requester.user.to_string(),
-                "prev_events": prev_event_ids.copy(),
-            }
-
-            # Mark all events as historical
-            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
-            event, unpersisted_context = await self.event_creation_handler.create_event(
-                await self.create_requester_for_user_id_from_app_service(
-                    ev["sender"], app_service_requester.app_service
-                ),
-                event_dict,
-                # Only the first event (which is the insertion event) in the
-                # chain should be floating. The rest should hang off each other
-                # in a chain.
-                allow_no_prev_events=index == 0,
-                prev_event_ids=event_dict.get("prev_events"),
-                # Since the first event (which is the insertion event) in the
-                # chain is floating with no `prev_events`, it can't derive state
-                # from anywhere automatically. So we need to set some state
-                # explicitly.
-                state_event_ids=initial_state_event_ids if index == 0 else None,
-                historical=True,
-                depth=inherited_depth,
-            )
-            context = await unpersisted_context.persist(event)
-            assert context._state_group
-
-            # Normally this is done when persisting the event but we have to
-            # pre-emptively do it here because we create all the events first,
-            # then persist them in another pass below. And we want to share
-            # state_groups across the whole batch so this lookup needs to work
-            # for the next event in the batch in this loop.
-            await self.store.store_state_group_id_for_event_id(
-                event_id=event.event_id,
-                state_group_id=context._state_group,
-            )
-
-            logger.debug(
-                "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s",
-                event,
-                prev_event_ids,
-            )
-
-            events_to_persist.append((event, context))
-            event_id = event.event_id
-
-            event_ids.append(event_id)
-            prev_event_ids = [event_id]
-
-        # Persist events in reverse-chronological order so they have the
-        # correct stream_ordering as they are backfilled (which decrements).
-        # Events are sorted by (topological_ordering, stream_ordering)
-        # where topological_ordering is just depth.
-        for event, context in reversed(events_to_persist):
-            # This call can't raise `PartialStateConflictError` since we forbid
-            # use of the historical batch API during partial state
-            await self.event_creation_handler.handle_new_client_event(
-                await self.create_requester_for_user_id_from_app_service(
-                    event.sender, app_service_requester.app_service
-                ),
-                events_and_context=[(event, context)],
-            )
-
-        return event_ids
-
-    async def handle_batch_of_events(
-        self,
-        events_to_create: List[JsonDict],
-        room_id: str,
-        batch_id_to_connect_to: str,
-        inherited_depth: int,
-        initial_state_event_ids: List[str],
-        app_service_requester: Requester,
-    ) -> Tuple[List[str], str]:
-        """
-        Handles creating and persisting all of the historical events as well as
-        insertion and batch meta events to make the batch navigable in the DAG.
-
-        Args:
-            events_to_create: List of historical events to create in JSON
-                dictionary format.
-            room_id: Room where you want the events created in.
-            batch_id_to_connect_to: The batch_id from the insertion event you
-                want this batch to connect to.
-            inherited_depth: The depth to create the events at (you will
-                probably by calling inherit_depth_from_prev_ids(...)).
-            initial_state_event_ids:
-                This is used to set explicit state for the insertion event at
-                the start of the historical batch since it's floating with no
-                prev_events to derive state from automatically. This should
-                probably be the state from the `prev_event` defined by
-                `/batch_send?prev_event_id=$abc` plus the outcome of
-                `persist_state_events_at_start`
-            app_service_requester: The requester of an application service.
-
-        Returns:
-            Tuple containing a list of created events and the next_batch_id
-        """
-
-        # Connect this current batch to the insertion event from the previous batch
-        last_event_in_batch = events_to_create[-1]
-        batch_event = {
-            "type": EventTypes.MSC2716_BATCH,
-            "sender": app_service_requester.user.to_string(),
-            "room_id": room_id,
-            "content": {
-                EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
-            # Since the batch event is put at the end of the batch,
-            # where the newest-in-time event is, copy the origin_server_ts from
-            # the last event we're inserting
-            "origin_server_ts": last_event_in_batch["origin_server_ts"],
-        }
-        # Add the batch event to the end of the batch (newest-in-time)
-        events_to_create.append(batch_event)
-
-        # Add an "insertion" event to the start of each batch (next to the oldest-in-time
-        # event in the batch) so the next batch can be connected to this one.
-        insertion_event = self.create_insertion_event_dict(
-            sender=app_service_requester.user.to_string(),
-            room_id=room_id,
-            # Since the insertion event is put at the start of the batch,
-            # where the oldest-in-time event is, copy the origin_server_ts from
-            # the first event we're inserting
-            origin_server_ts=events_to_create[0]["origin_server_ts"],
-        )
-        next_batch_id = insertion_event["content"][
-            EventContentFields.MSC2716_NEXT_BATCH_ID
-        ]
-        # Prepend the insertion event to the start of the batch (oldest-in-time)
-        events_to_create = [insertion_event] + events_to_create
-
-        # Create and persist all of the historical events
-        event_ids = await self.persist_historical_events(
-            events_to_create=events_to_create,
-            room_id=room_id,
-            inherited_depth=inherited_depth,
-            initial_state_event_ids=initial_state_event_ids,
-            app_service_requester=app_service_requester,
-        )
-
-        return event_ids, next_batch_id
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index af0ca5c26d..82e4fa7363 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -362,7 +362,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         content: Optional[dict] = None,
         require_consent: bool = True,
         outlier: bool = False,
-        historical: bool = False,
         origin_server_ts: Optional[int] = None,
     ) -> Tuple[str, int]:
         """
@@ -378,16 +377,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids: The event IDs to use as the prev events
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is the historical `state_events_at_start`;
-                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
-                have any `state_ids` set and therefore can't derive any state even though the
-                prev_events are set so we need to set them ourself via this argument.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -400,9 +396,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
             origin_server_ts: The origin_server_ts to use if a new event is created. Uses
                 the current timestamp if set to None.
 
@@ -477,7 +470,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                     depth=depth,
                     require_consent=require_consent,
                     outlier=outlier,
-                    historical=historical,
                 )
                 context = await unpersisted_context.persist(event)
                 prev_state_ids = await context.get_prev_state_ids(
@@ -585,7 +577,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         new_room: bool = False,
         require_consent: bool = True,
         outlier: bool = False,
-        historical: bool = False,
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
@@ -610,22 +601,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids: The event IDs to use as the prev events
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is the historical `state_events_at_start`;
-                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
-                have any `state_ids` set and therefore can't derive any state even though the
-                prev_events are set so we need to set them ourself via this argument.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -667,7 +652,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                         new_room=new_room,
                         require_consent=require_consent,
                         outlier=outlier,
-                        historical=historical,
                         allow_no_prev_events=allow_no_prev_events,
                         prev_event_ids=prev_event_ids,
                         state_event_ids=state_event_ids,
@@ -691,7 +675,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         new_room: bool = False,
         require_consent: bool = True,
         outlier: bool = False,
-        historical: bool = False,
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
@@ -718,22 +701,16 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
-            historical: Indicates whether the message is being inserted
-                back in time around some existing events. This is used to skip
-                a few checks and mark the event as backfilled.
             allow_no_prev_events: Whether to allow this event to be created an empty
                 list of prev_events. Normally this is prohibited just because most
                 events should have a prev_event and we should only use this in special
-                cases like MSC2716.
+                cases (previously useful for MSC2716).
             prev_event_ids: The event IDs to use as the prev events
             state_event_ids:
-                The full state at a given event. This is used particularly by the MSC2716
-                /batch_send endpoint. One use case is the historical `state_events_at_start`;
-                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
-                have any `state_ids` set and therefore can't derive any state even though the
-                prev_events are set so we need to set them ourself via this argument.
-                This should normally be left as None, which will cause the auth_event_ids
-                to be calculated based on the room state at the prev_events.
+                The full state at a given event. This was previously used particularly
+                by the MSC2716 /batch_send endpoint. This should normally be left as
+                None, which will cause the auth_event_ids to be calculated based on the
+                room state at the prev_events.
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -877,7 +854,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 content=content,
                 require_consent=require_consent,
                 outlier=outlier,
-                historical=historical,
                 origin_server_ts=origin_server_ts,
             )
 
@@ -1498,7 +1474,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # put the server which owns the alias at the front of the server list.
         if room_alias.domain in servers:
             servers.remove(room_alias.domain)
-        servers.insert(0, room_alias.domain)
+            servers.insert(0, room_alias.domain)
 
         return RoomID.from_string(room_id), servers
 
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 7e8cf31682..91a24efcd0 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -51,8 +51,10 @@ logger = logging.getLogger(__name__)
 @implementer(IAgent)
 class MatrixFederationAgent:
     """An Agent-like thing which provides a `request` method which correctly
-    handles resolving matrix server names when using matrix://. Handles standard
-    https URIs as normal.
+    handles resolving matrix server names when using `matrix-federation://`. Handles
+    standard https URIs as normal. The `matrix-federation://` scheme is internal to
+    Synapse and we purposely want to avoid colliding with the `matrix://` URL scheme
+    which is now specced.
 
     Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
 
@@ -167,14 +169,14 @@ class MatrixFederationAgent:
         # There must be a valid hostname.
         assert parsed_uri.hostname
 
-        # If this is a matrix:// URI check if the server has delegated matrix
+        # If this is a matrix-federation:// URI check if the server has delegated matrix
         # traffic using well-known delegation.
         #
         # We have to do this here and not in the endpoint as we need to rewrite
         # the host header with the delegated server name.
         delegated_server = None
         if (
-            parsed_uri.scheme == b"matrix"
+            parsed_uri.scheme == b"matrix-federation"
             and not _is_ip_literal(parsed_uri.hostname)
             and not parsed_uri.port
         ):
@@ -250,7 +252,7 @@ class MatrixHostnameEndpointFactory:
 
 @implementer(IStreamClientEndpoint)
 class MatrixHostnameEndpoint:
-    """An endpoint that resolves matrix:// URLs using Matrix server name
+    """An endpoint that resolves matrix-federation:// URLs using Matrix server name
     resolution (i.e. via SRV). Does not check for well-known delegation.
 
     Args:
@@ -379,7 +381,7 @@ class MatrixHostnameEndpoint:
         connect to.
         """
 
-        if self._parsed_uri.scheme != b"matrix":
+        if self._parsed_uri.scheme != b"matrix-federation":
             return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
 
         # Note: We don't do well-known lookup as that needs to have happened
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index abb5ae5815..fc0101808d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -174,7 +174,14 @@ class MatrixFederationRequest:
 
         # The object is frozen so we can pre-compute this.
         uri = urllib.parse.urlunparse(
-            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+            (
+                b"matrix-federation",
+                destination_bytes,
+                path_bytes,
+                None,
+                query_bytes,
+                b"",
+            )
         )
         object.__setattr__(self, "uri", uri)
 
diff --git a/synapse/media/_base.py b/synapse/media/_base.py
index ef8334ae25..20cb8b9010 100644
--- a/synapse/media/_base.py
+++ b/synapse/media/_base.py
@@ -152,6 +152,9 @@ def add_file_headers(
         content_type = media_type
 
     request.setHeader(b"Content-Type", content_type.encode("UTF-8"))
+
+    # Use a Content-Disposition of attachment to force download of media.
+    disposition = "attachment"
     if upload_name:
         # RFC6266 section 4.1 [1] defines both `filename` and `filename*`.
         #
@@ -173,11 +176,17 @@ def add_file_headers(
         # correctly interpret those as of 0.99.2 and (b) they are a bit of a pain and we
         # may as well just do the filename* version.
         if _can_encode_filename_as_token(upload_name):
-            disposition = "inline; filename=%s" % (upload_name,)
+            disposition = "%s; filename=%s" % (
+                disposition,
+                upload_name,
+            )
         else:
-            disposition = "inline; filename*=utf-8''%s" % (_quote(upload_name),)
+            disposition = "%s; filename*=utf-8''%s" % (
+                disposition,
+                _quote(upload_name),
+            )
 
-        request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
+    request.setHeader(b"Content-Disposition", disposition.encode("ascii"))
 
     # cache for at least a day.
     # XXX: we might want to turn this off for data we don't want to
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 33002cc0f2..67377c647b 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -322,7 +322,6 @@ class BulkPushRuleEvaluator:
     ) -> None:
         if (
             not event.internal_metadata.is_notifiable()
-            or event.internal_metadata.is_historical()
             or event.room_id in self.hs.config.server.rooms_to_exclude_from_sync
         ):
             # Push rules for events that aren't notifiable can't be processed by this and
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 88b52c26a0..735cef0aed 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -41,12 +41,7 @@ def format_push_rules_for_user(
 
         rulearray.append(template_rule)
 
-        for type_key in ("pattern", "value"):
-            type_value = template_rule.pop(f"{type_key}_type", None)
-            if type_value == "user_id":
-                template_rule[type_key] = user.to_string()
-            elif type_value == "user_localpart":
-                template_rule[type_key] = user.localpart
+        _convert_type_to_value(template_rule, user)
 
         template_rule["enabled"] = enabled
 
@@ -63,19 +58,20 @@ def format_push_rules_for_user(
         for c in template_rule["conditions"]:
             c.pop("_cache_key", None)
 
-            pattern_type = c.pop("pattern_type", None)
-            if pattern_type == "user_id":
-                c["pattern"] = user.to_string()
-            elif pattern_type == "user_localpart":
-                c["pattern"] = user.localpart
-
-            sender_type = c.pop("sender_type", None)
-            if sender_type == "user_id":
-                c["sender"] = user.to_string()
+            _convert_type_to_value(c, user)
 
     return rules
 
 
+def _convert_type_to_value(rule_or_cond: Dict[str, Any], user: UserID) -> None:
+    for type_key in ("pattern", "value"):
+        type_value = rule_or_cond.pop(f"{type_key}_type", None)
+        if type_value == "user_id":
+            rule_or_cond[type_key] = user.to_string()
+        elif type_value == "user_localpart":
+            rule_or_cond[type_key] = user.localpart
+
+
 def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]:
     for pc in PRIORITY_CLASS_MAP.keys():
         d[pc] = []
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 1af8d99d20..df0845edb2 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -48,7 +48,6 @@ from synapse.rest.client import (
     rendezvous,
     report_event,
     room,
-    room_batch,
     room_keys,
     room_upgrade_rest_servlet,
     sendtodevice,
@@ -132,7 +131,6 @@ class ClientRestResource(JsonResource):
         user_directory.register_servlets(hs, client_resource)
         if is_main_process:
             room_upgrade_rest_servlet.register_servlets(hs, client_resource)
-        room_batch.register_servlets(hs, client_resource)
         capabilities.register_servlets(hs, client_resource)
         if is_main_process:
             account_validity.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
deleted file mode 100644
index 69f85112d8..0000000000
--- a/synapse/rest/client/room_batch.py
+++ /dev/null
@@ -1,254 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import re
-from http import HTTPStatus
-from typing import TYPE_CHECKING, Tuple
-
-from synapse.api.constants import EventContentFields
-from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.http.server import HttpServer
-from synapse.http.servlet import (
-    RestServlet,
-    assert_params_in_dict,
-    parse_json_object_from_request,
-    parse_string,
-    parse_strings_from_args,
-)
-from synapse.http.site import SynapseRequest
-from synapse.types import JsonDict
-
-if TYPE_CHECKING:
-    from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class RoomBatchSendEventRestServlet(RestServlet):
-    """
-    API endpoint which can insert a batch of events historically back in time
-    next to the given `prev_event`.
-
-    `batch_id` comes from `next_batch_id `in the response of the batch send
-    endpoint and is derived from the "insertion" events added to each batch.
-    It's not required for the first batch send.
-
-    `state_events_at_start` is used to define the historical state events
-    needed to auth the events like join events. These events will float
-    outside of the normal DAG as outlier's and won't be visible in the chat
-    history which also allows us to insert multiple batches without having a bunch
-    of `@mxid joined the room` noise between each batch.
-
-    `events` is chronological list of events you want to insert.
-    There is a reverse-chronological constraint on batches so once you insert
-    some messages, you can only insert older ones after that.
-    tldr; Insert batches from your most recent history -> oldest history.
-
-    POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID>
-    {
-        "events": [ ... ],
-        "state_events_at_start": [ ... ]
-    }
-    """
-
-    PATTERNS = (
-        re.compile(
-            "^/_matrix/client/unstable/org.matrix.msc2716"
-            "/rooms/(?P<room_id>[^/]*)/batch_send$"
-        ),
-    )
-    CATEGORY = "Client API requests"
-
-    def __init__(self, hs: "HomeServer"):
-        super().__init__()
-        self.store = hs.get_datastores().main
-        self.event_creation_handler = hs.get_event_creation_handler()
-        self.auth = hs.get_auth()
-        self.room_batch_handler = hs.get_room_batch_handler()
-
-    async def on_POST(
-        self, request: SynapseRequest, room_id: str
-    ) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request, allow_guest=False)
-
-        if not requester.app_service:
-            raise AuthError(
-                HTTPStatus.FORBIDDEN,
-                "Only application services can use the /batchsend endpoint",
-            )
-
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ["state_events_at_start", "events"])
-
-        assert request.args is not None
-        prev_event_ids_from_query = parse_strings_from_args(
-            request.args, "prev_event_id"
-        )
-        batch_id_from_query = parse_string(request, "batch_id")
-
-        if prev_event_ids_from_query is None:
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "prev_event query parameter is required when inserting historical messages back in time",
-                errcode=Codes.MISSING_PARAM,
-            )
-
-        if await self.store.is_partial_state_room(room_id):
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "Cannot insert history batches until we have fully joined the room",
-                errcode=Codes.UNABLE_DUE_TO_PARTIAL_STATE,
-            )
-
-        # Verify the batch_id_from_query corresponds to an actual insertion event
-        # and have the batch connected.
-        if batch_id_from_query:
-            corresponding_insertion_event_id = (
-                await self.store.get_insertion_event_id_by_batch_id(
-                    room_id, batch_id_from_query
-                )
-            )
-            if corresponding_insertion_event_id is None:
-                raise SynapseError(
-                    HTTPStatus.BAD_REQUEST,
-                    "No insertion event corresponds to the given ?batch_id",
-                    errcode=Codes.INVALID_PARAM,
-                )
-
-        # Make sure that the prev_event_ids exist and aren't outliers - ie, they are
-        # regular parts of the room DAG where we know the state.
-        non_outlier_prev_events = await self.store.have_events_in_timeline(
-            prev_event_ids_from_query
-        )
-        for prev_event_id in prev_event_ids_from_query:
-            if prev_event_id not in non_outlier_prev_events:
-                raise SynapseError(
-                    HTTPStatus.BAD_REQUEST,
-                    "prev_event %s does not exist, or is an outlier" % (prev_event_id,),
-                    errcode=Codes.INVALID_PARAM,
-                )
-
-        # For the event we are inserting next to (`prev_event_ids_from_query`),
-        # find the most recent state events that allowed that message to be
-        # sent. We will use that as a base to auth our historical messages
-        # against.
-        state_event_ids = await self.room_batch_handler.get_most_recent_full_state_ids_from_event_id_list(
-            prev_event_ids_from_query
-        )
-
-        state_event_ids_at_start = []
-        # Create and persist all of the state events that float off on their own
-        # before the batch. These will most likely be all of the invite/member
-        # state events used to auth the upcoming historical messages.
-        if body["state_events_at_start"]:
-            state_event_ids_at_start = (
-                await self.room_batch_handler.persist_state_events_at_start(
-                    state_events_at_start=body["state_events_at_start"],
-                    room_id=room_id,
-                    initial_state_event_ids=state_event_ids,
-                    app_service_requester=requester,
-                )
-            )
-            # Update our ongoing auth event ID list with all of the new state we
-            # just created
-            state_event_ids.extend(state_event_ids_at_start)
-
-        inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
-            prev_event_ids_from_query
-        )
-
-        events_to_create = body["events"]
-
-        # Figure out which batch to connect to. If they passed in
-        # batch_id_from_query let's use it. The batch ID passed in comes
-        # from the batch_id in the "insertion" event from the previous batch.
-        last_event_in_batch = events_to_create[-1]
-        base_insertion_event = None
-        if batch_id_from_query:
-            batch_id_to_connect_to = batch_id_from_query
-        # Otherwise, create an insertion event to act as a starting point.
-        #
-        # We don't always have an insertion event to start hanging more history
-        # off of (ideally there would be one in the main DAG, but that's not the
-        # case if we're wanting to add history to e.g. existing rooms without
-        # an insertion event), in which case we just create a new insertion event
-        # that can then get pointed to by a "marker" event later.
-        else:
-            base_insertion_event_dict = (
-                self.room_batch_handler.create_insertion_event_dict(
-                    sender=requester.user.to_string(),
-                    room_id=room_id,
-                    origin_server_ts=last_event_in_batch["origin_server_ts"],
-                )
-            )
-            base_insertion_event_dict["prev_events"] = prev_event_ids_from_query.copy()
-
-            (
-                base_insertion_event,
-                _,
-            ) = await self.event_creation_handler.create_and_send_nonmember_event(
-                await self.room_batch_handler.create_requester_for_user_id_from_app_service(
-                    base_insertion_event_dict["sender"],
-                    requester.app_service,
-                ),
-                base_insertion_event_dict,
-                prev_event_ids=base_insertion_event_dict.get("prev_events"),
-                # Also set the explicit state here because we want to resolve
-                # any `state_events_at_start` here too. It's not strictly
-                # necessary to accomplish anything but if someone asks for the
-                # state at this point, we probably want to show them the
-                # historical state that was part of this batch.
-                state_event_ids=state_event_ids,
-                historical=True,
-                depth=inherited_depth,
-            )
-
-            batch_id_to_connect_to = base_insertion_event.content[
-                EventContentFields.MSC2716_NEXT_BATCH_ID
-            ]
-
-        # Create and persist all of the historical events as well as insertion
-        # and batch meta events to make the batch navigable in the DAG.
-        event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
-            events_to_create=events_to_create,
-            room_id=room_id,
-            batch_id_to_connect_to=batch_id_to_connect_to,
-            inherited_depth=inherited_depth,
-            initial_state_event_ids=state_event_ids,
-            app_service_requester=requester,
-        )
-
-        insertion_event_id = event_ids[0]
-        batch_event_id = event_ids[-1]
-        historical_event_ids = event_ids[1:-1]
-
-        response_dict = {
-            "state_event_ids": state_event_ids_at_start,
-            "event_ids": historical_event_ids,
-            "next_batch_id": next_batch_id,
-            "insertion_event_id": insertion_event_id,
-            "batch_event_id": batch_event_id,
-        }
-        if base_insertion_event is not None:
-            response_dict["base_insertion_event_id"] = base_insertion_event.event_id
-
-        return HTTPStatus.OK, response_dict
-
-
-def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    msc2716_enabled = hs.config.experimental.msc2716_enabled
-
-    if msc2716_enabled:
-        RoomBatchSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 1910648755..95400ba570 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -102,8 +102,6 @@ class VersionsRestServlet(RestServlet):
                     "org.matrix.msc2285.stable": True,  # TODO: Remove when MSC2285 becomes a part of the spec
                     # Supports filtering of /publicRooms by room type as per MSC3827
                     "org.matrix.msc3827.stable": True,
-                    # Adds support for importing historical messages as per MSC2716
-                    "org.matrix.msc2716": self.config.experimental.msc2716_enabled,
                     # Adds support for thread relations, per MSC3440.
                     "org.matrix.msc3440.stable": True,  # TODO: remove when "v1.3" is added above
                     # Support for thread read receipts & notification counts.
diff --git a/synapse/server.py b/synapse/server.py
index 0f36ef69cb..b72b76a38b 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -91,7 +91,6 @@ from synapse.handlers.room import (
     RoomShutdownHandler,
     TimestampLookupHandler,
 )
-from synapse.handlers.room_batch import RoomBatchHandler
 from synapse.handlers.room_list import RoomListHandler
 from synapse.handlers.room_member import (
     RoomForgetterHandler,
@@ -493,10 +492,6 @@ class HomeServer(metaclass=abc.ABCMeta):
         return RoomCreationHandler(self)
 
     @cache_in_self
-    def get_room_batch_handler(self) -> RoomBatchHandler:
-        return RoomBatchHandler(self)
-
-    @cache_in_self
     def get_room_shutdown_handler(self) -> RoomShutdownHandler:
         return RoomShutdownHandler(self)
 
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c91..35c0680365 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -839,9 +839,8 @@ class EventsPersistenceStorageController:
                         "group" % (ev.event_id,)
                     )
                 continue
-
-            if ctx.prev_group:
-                state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+            if ctx.state_group_deltas:
+                state_group_deltas.update(ctx.state_group_deltas)
 
         # We need to map the event_ids to their state groups. First, let's
         # check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 10fa6c4802..7e49ae11bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1529,7 +1529,7 @@ class DatabasePool:
         # Lock the table just once, to prevent it being done once per row.
         # Note that, according to Postgres' documentation, once obtained,
         # the lock is held for the remainder of the current transaction.
-        self.engine.lock_table(txn, "user_ips")
+        self.engine.lock_table(txn, table)
 
         for keyv, valv in zip(key_values, value_values):
             _keys = dict(zip(key_names, keyv))
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0032a92f49..3a10c265c9 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -61,7 +61,6 @@ from .registration import RegistrationStore
 from .rejections import RejectionsStore
 from .relations import RelationsStore
 from .room import RoomStore
-from .room_batch import RoomBatchStore
 from .roommember import RoomMemberStore
 from .search import SearchStore
 from .session import SessionStore
@@ -87,7 +86,6 @@ class DataStore(
     DeviceStore,
     RoomMemberStore,
     RoomStore,
-    RoomBatchStore,
     RegistrationStore,
     ProfileStore,
     PresenceStore,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 2681917d0b..8b6e3c1dc7 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -31,7 +31,7 @@ from typing import (
 import attr
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH, EventTypes
+from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -891,124 +891,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
-    @trace
-    async def get_insertion_event_backward_extremities_in_room(
-        self,
-        room_id: str,
-        current_depth: int,
-        limit: int,
-    ) -> List[Tuple[str, int]]:
-        """
-        Get the insertion events we know about that we haven't backfilled yet
-        along with the approximate depth. Only returns insertion events that are
-        at a depth lower than or equal to the `current_depth`. Sorted by depth,
-        highest to lowest (descending) so the closest events to the
-        `current_depth` are first in the list.
-
-        We ignore insertion events that are newer than the user's current scroll
-        position (ie, those with depth greater than `current_depth`) as:
-            1. we don't really care about getting events that have happened
-               after our current position; and
-            2. by the nature of paginating and scrolling back, we have likely
-               previously tried and failed to backfill from that insertion event, so
-               to avoid getting "stuck" requesting the same backfill repeatedly
-               we drop those insertion event.
-
-        Args:
-            room_id: Room where we want to find the oldest events
-            current_depth: The depth at the user's current scrollback position
-            limit: The max number of insertion event extremities to return
-
-        Returns:
-            List of (event_id, depth) tuples. Sorted by depth, highest to lowest
-            (descending) so the closest events to the `current_depth` are first
-            in the list.
-        """
-
-        def get_insertion_event_backward_extremities_in_room_txn(
-            txn: LoggingTransaction, room_id: str
-        ) -> List[Tuple[str, int]]:
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "LEAST"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "MIN"
-            else:
-                raise RuntimeError("Unknown database engine")
-
-            sql = f"""
-                SELECT
-                    insertion_event_extremity.event_id, event.depth
-                /* We only want insertion events that are also marked as backwards extremities */
-                FROM insertion_event_extremities AS insertion_event_extremity
-                /* Get the depth of the insertion event from the events table */
-                INNER JOIN events AS event USING (event_id)
-                /**
-                 * We use this info to make sure we don't retry to use a backfill point
-                 * if we've already attempted to backfill from it recently.
-                 */
-                LEFT JOIN event_failed_pull_attempts AS failed_backfill_attempt_info
-                ON
-                    failed_backfill_attempt_info.room_id = insertion_event_extremity.room_id
-                    AND failed_backfill_attempt_info.event_id = insertion_event_extremity.event_id
-                WHERE
-                    insertion_event_extremity.room_id = ?
-                    /**
-                     * We only want extremities that are older than or at
-                     * the same position of the given `current_depth` (where older
-                     * means less than the given depth) because we're looking backwards
-                     * from the `current_depth` when backfilling.
-                     *
-                     *                         current_depth (ignore events that come after this, ignore 2-4)
-                     *                         |
-                     *                         ▼
-                     * <oldest-in-time> [0]<--[1]<--[2]<--[3]<--[4] <newest-in-time>
-                     */
-                    AND event.depth <= ? /* current_depth */
-                    /**
-                     * Exponential back-off (up to the upper bound) so we don't retry the
-                     * same backfill point over and over. ex. 2hr, 4hr, 8hr, 16hr, etc
-                     *
-                     * We use `1 << n` as a power of 2 equivalent for compatibility
-                     * with older SQLites. The left shift equivalent only works with
-                     * powers of 2 because left shift is a binary operation (base-2).
-                     * Otherwise, we would use `power(2, n)` or the power operator, `2^n`.
-                     */
-                    AND (
-                        failed_backfill_attempt_info.event_id IS NULL
-                        OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
-                            (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
-                            * ? /* step */
-                        )
-                    )
-                /**
-                 * Sort from highest (closest to the `current_depth`) to the lowest depth
-                 * because the closest are most relevant to backfill from first.
-                 * Then tie-break on alphabetical order of the event_ids so we get a
-                 * consistent ordering which is nice when asserting things in tests.
-                 */
-                ORDER BY event.depth DESC, insertion_event_extremity.event_id DESC
-                LIMIT ?
-            """
-
-            txn.execute(
-                sql,
-                (
-                    room_id,
-                    current_depth,
-                    self._clock.time_msec(),
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
-                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
-                    limit,
-                ),
-            )
-            return cast(List[Tuple[str, int]], txn.fetchall())
-
-        return await self.db_pool.runInteraction(
-            "get_insertion_event_backward_extremities_in_room",
-            get_insertion_event_backward_extremities_in_room_txn,
-            room_id,
-        )
-
     async def get_max_depth_of(
         self, event_ids: Collection[str]
     ) -> Tuple[Optional[str], int]:
@@ -1280,50 +1162,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return event_ids
 
-    def _get_connected_batch_event_backfill_results_txn(
-        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
-    ) -> List[BackfillQueueNavigationItem]:
-        """
-        Find any batch connections of a given insertion event.
-        A batch event points at a insertion event via:
-        batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
-
-        Args:
-            txn: The database transaction to use
-            insertion_event_id: The event ID to navigate from. We will find
-                batch events that point back at this insertion event.
-            limit: Max number of event ID's to query for and return
-
-        Returns:
-            List of batch events that the backfill queue can process
-        """
-        batch_connection_query = """
-            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e ON c.event_id = e.event_id
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
-        """
-
-        # Find any batch connections for the given insertion event
-        txn.execute(
-            batch_connection_query,
-            (insertion_event_id, limit),
-        )
-        return [
-            BackfillQueueNavigationItem(
-                depth=row[0],
-                stream_ordering=row[1],
-                event_id=row[2],
-                type=row[3],
-            )
-            for row in txn
-        ]
-
     def _get_connected_prev_event_backfill_results_txn(
         self, txn: LoggingTransaction, event_id: str, limit: int
     ) -> List[BackfillQueueNavigationItem]:
@@ -1472,40 +1310,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
             event_id_results.add(event_id)
 
-            # Try and find any potential historical batches of message history.
-            if self.hs.config.experimental.msc2716_enabled:
-                # We need to go and try to find any batch events connected
-                # to a given insertion event (by batch_id). If we find any, we'll
-                # add them to the queue and navigate up the DAG like normal in the
-                # next iteration of the loop.
-                if event_type == EventTypes.MSC2716_INSERTION:
-                    # Find any batch connections for the given insertion event
-                    connected_batch_event_backfill_results = (
-                        self._get_connected_batch_event_backfill_results_txn(
-                            txn, event_id, limit - len(event_id_results)
-                        )
-                    )
-                    logger.debug(
-                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
-                        room_id,
-                        connected_batch_event_backfill_results,
-                    )
-                    for (
-                        connected_batch_event_backfill_item
-                    ) in connected_batch_event_backfill_results:
-                        if (
-                            connected_batch_event_backfill_item.event_id
-                            not in event_id_results
-                        ):
-                            queue.put(
-                                (
-                                    -connected_batch_event_backfill_item.depth,
-                                    -connected_batch_event_backfill_item.stream_ordering,
-                                    connected_batch_event_backfill_item.event_id,
-                                    connected_batch_event_backfill_item.type,
-                                )
-                            )
-
             # Now we just look up the DAG by prev_events as normal
             connected_prev_event_backfill_results = (
                 self._get_connected_prev_event_backfill_results_txn(
@@ -1748,19 +1552,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
-    @trace
-    async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
-        await self.db_pool.simple_upsert(
-            table="insertion_event_extremities",
-            keyvalues={"event_id": event_id},
-            values={
-                "event_id": event_id,
-                "room_id": room_id,
-            },
-            insertion_values={},
-            desc="insert_insertion_extremity",
-        )
-
     async def insert_received_event_to_staging(
         self, origin: str, event: EventBase
     ) -> None:
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index e2e6eb479f..5c9db7554e 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1664,9 +1664,6 @@ class PersistEventsStore:
 
             self._handle_event_relations(txn, event)
 
-            self._handle_insertion_event(txn, event)
-            self._handle_batch_event(txn, event)
-
             # Store the labels for this event.
             labels = event.content.get(EventContentFields.LABELS)
             if labels:
@@ -1729,13 +1726,22 @@ class PersistEventsStore:
             if not row["rejects"] and not row["redacts"]:
                 to_prefill.append(EventCacheEntry(event=event, redacted_event=None))
 
-        async def prefill() -> None:
+        async def external_prefill() -> None:
+            for cache_entry in to_prefill:
+                await self.store._get_event_cache.set_external(
+                    (cache_entry.event.event_id,), cache_entry
+                )
+
+        def local_prefill() -> None:
             for cache_entry in to_prefill:
-                await self.store._get_event_cache.set(
+                self.store._get_event_cache.set_local(
                     (cache_entry.event.event_id,), cache_entry
                 )
 
-        txn.async_call_after(prefill)
+        # The order these are called here is not as important as knowing that after the
+        # transaction is finished, the async_call_after will run before the call_after.
+        txn.async_call_after(external_prefill)
+        txn.call_after(local_prefill)
 
     def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
         assert event.redacts is not None
@@ -1918,128 +1924,6 @@ class PersistEventsStore:
                 ),
             )
 
-    def _handle_insertion_event(
-        self, txn: LoggingTransaction, event: EventBase
-    ) -> None:
-        """Handles keeping track of insertion events and edges/connections.
-        Part of MSC2716.
-
-        Args:
-            txn: The database transaction object
-            event: The event to process
-        """
-
-        if event.type != EventTypes.MSC2716_INSERTION:
-            # Not a insertion event
-            return
-
-        # Skip processing an insertion event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = self.store.get_room_version_txn(txn, event.room_id)
-        room_creator = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="rooms",
-            keyvalues={"room_id": event.room_id},
-            retcol="creator",
-            allow_none=True,
-        )
-        if not room_version.msc2716_historical and (
-            not self.hs.config.experimental.msc2716_enabled
-            or event.sender != room_creator
-        ):
-            return
-
-        next_batch_id = event.content.get(EventContentFields.MSC2716_NEXT_BATCH_ID)
-        if next_batch_id is None:
-            # Invalid insertion event without next batch ID
-            return
-
-        logger.debug(
-            "_handle_insertion_event (next_batch_id=%s) %s", next_batch_id, event
-        )
-
-        # Keep track of the insertion event and the batch ID
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="insertion_events",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "next_batch_id": next_batch_id,
-            },
-        )
-
-        # Insert an edge for every prev_event connection
-        for prev_event_id in event.prev_event_ids():
-            self.db_pool.simple_insert_txn(
-                txn,
-                table="insertion_event_edges",
-                values={
-                    "event_id": event.event_id,
-                    "room_id": event.room_id,
-                    "insertion_prev_event_id": prev_event_id,
-                },
-            )
-
-    def _handle_batch_event(self, txn: LoggingTransaction, event: EventBase) -> None:
-        """Handles inserting the batch edges/connections between the batch event
-        and an insertion event. Part of MSC2716.
-
-        Args:
-            txn: The database transaction object
-            event: The event to process
-        """
-
-        if event.type != EventTypes.MSC2716_BATCH:
-            # Not a batch event
-            return
-
-        # Skip processing a batch event if the room version doesn't
-        # support it or the event is not from the room creator.
-        room_version = self.store.get_room_version_txn(txn, event.room_id)
-        room_creator = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="rooms",
-            keyvalues={"room_id": event.room_id},
-            retcol="creator",
-            allow_none=True,
-        )
-        if not room_version.msc2716_historical and (
-            not self.hs.config.experimental.msc2716_enabled
-            or event.sender != room_creator
-        ):
-            return
-
-        batch_id = event.content.get(EventContentFields.MSC2716_BATCH_ID)
-        if batch_id is None:
-            # Invalid batch event without a batch ID
-            return
-
-        logger.debug("_handle_batch_event batch_id=%s %s", batch_id, event)
-
-        # Keep track of the insertion event and the batch ID
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="batch_events",
-            values={
-                "event_id": event.event_id,
-                "room_id": event.room_id,
-                "batch_id": batch_id,
-            },
-        )
-
-        # When we receive an event with a `batch_id` referencing the
-        # `next_batch_id` of the insertion event, we can remove it from the
-        # `insertion_event_extremities` table.
-        sql = """
-            DELETE FROM insertion_event_extremities WHERE event_id IN (
-                SELECT event_id FROM insertion_events
-                WHERE next_batch_id = ?
-            )
-        """
-
-        txn.execute(sql, (batch_id,))
-
     def _handle_redact_relations(
         self, txn: LoggingTransaction, room_id: str, redacted_event_id: str
     ) -> None:
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index d93ffc4efa..7e7648c951 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -883,7 +883,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
         """
-        Invalidates an event in the asyncronous get event cache, which may be remote.
+        Invalidates an event in the asynchronous get event cache, which may be remote.
 
         Arguments:
             event_id: the event ID to invalidate
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
deleted file mode 100644
index 131f357d04..0000000000
--- a/synapse/storage/databases/main/room_batch.py
+++ /dev/null
@@ -1,47 +0,0 @@
-# Copyright 2021 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import Optional
-
-from synapse.storage._base import SQLBaseStore
-
-
-class RoomBatchStore(SQLBaseStore):
-    async def get_insertion_event_id_by_batch_id(
-        self, room_id: str, batch_id: str
-    ) -> Optional[str]:
-        """Retrieve a insertion event ID.
-
-        Args:
-            batch_id: The batch ID of the insertion event to retrieve.
-
-        Returns:
-            The event_id of an insertion event, or None if there is no known
-            insertion event for the given insertion event.
-        """
-        return await self.db_pool.simple_select_one_onecol(
-            table="insertion_events",
-            keyvalues={"room_id": room_id, "next_batch_id": batch_id},
-            retcol="event_id",
-            allow_none=True,
-        )
-
-    async def store_state_group_id_for_event_id(
-        self, event_id: str, state_group_id: int
-    ) -> None:
-        await self.db_pool.simple_upsert(
-            table="event_to_state_groups",
-            keyvalues={"event_id": event_id},
-            values={"state_group": state_group_id, "event_id": event_id},
-        )
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 7ea0c4c36b..9f3b8741c1 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -116,6 +116,11 @@ class Clock:
 
         Waits `msec` initially before calling `f` for the first time.
 
+        If the function given to `looping_call` returns an awaitable/deferred, the next
+        call isn't scheduled until after the returned awaitable has finished. We get
+        this functionality thanks to this function being a thin wrapper around
+        `twisted.internet.task.LoopingCall`.
+
         Note that the function will be called with no logcontext, so if it is anything
         other than trivial, you probably want to wrap it in run_as_background_process.
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 6137c85e10..be6554319a 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -842,7 +842,13 @@ class AsyncLruCache(Generic[KT, VT]):
         return self._lru_cache.get(key, update_metrics=update_metrics)
 
     async def set(self, key: KT, value: VT) -> None:
-        self._lru_cache.set(key, value)
+        # This will add the entries in the correct order, local first external second
+        self.set_local(key, value)
+        await self.set_external(key, value)
+
+    async def set_external(self, key: KT, value: VT) -> None:
+        # This method should add an entry to any configured external cache, in this case noop.
+        pass
 
     def set_local(self, key: KT, value: VT) -> None:
         self._lru_cache.set(key, value)
diff --git a/tests/events/test_snapshot.py b/tests/events/test_snapshot.py
index 6687c28e8f..b5e42f9600 100644
--- a/tests/events/test_snapshot.py
+++ b/tests/events/test_snapshot.py
@@ -101,8 +101,7 @@ class TestEventContext(unittest.HomeserverTestCase):
         self.assertEqual(
             context.state_group_before_event, d_context.state_group_before_event
         )
-        self.assertEqual(context.prev_group, d_context.prev_group)
-        self.assertEqual(context.delta_ids, d_context.delta_ids)
+        self.assertEqual(context.state_group_deltas, d_context.state_group_deltas)
         self.assertEqual(context.app_service, d_context.app_service)
 
         self.assertEqual(
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 391ae51707..b290b020a2 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -431,28 +431,24 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         # ACT: call _wake_destinations_needing_catchup
 
         # patch wake_destination to just count the destinations instead
-        woken = []
+        woken = set()
 
         def wake_destination_track(destination: str) -> None:
-            woken.append(destination)
+            woken.add(destination)
 
         self.federation_sender.wake_destination = wake_destination_track  # type: ignore[assignment]
 
-        # cancel the pre-existing timer for _wake_destinations_needing_catchup
-        # this is because we are calling it manually rather than waiting for it
-        # to be called automatically
-        assert self.federation_sender._catchup_after_startup_timer is not None
-        self.federation_sender._catchup_after_startup_timer.cancel()
-
-        self.get_success(
-            self.federation_sender._wake_destinations_needing_catchup(), by=5.0
-        )
+        # We wait quite long so that all dests can be woken up, since there is a delay
+        # between them.
+        self.pump(by=5.0)
 
         # ASSERT (_wake_destinations_needing_catchup):
         # - all remotes are woken up, save for zzzerver
         self.assertNotIn("zzzerver", woken)
-        # - all destinations are woken exactly once; they appear once in woken.
-        self.assertCountEqual(woken, server_names[:-1])
+        # - all destinations are woken, potentially more than once, since the
+        # wake up is called regularly and we don't ack in this test that a transaction
+        # has been successfully sent.
+        self.assertCountEqual(woken, set(server_names[:-1]))
 
     def test_not_latest_event(self) -> None:
         """Test that we send the latest event in the room even if its not ours."""
diff --git a/tests/federation/test_federation_client.py b/tests/federation/test_federation_client.py
index 91694e4fca..a45ab83683 100644
--- a/tests/federation/test_federation_client.py
+++ b/tests/federation/test_federation_client.py
@@ -124,7 +124,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
         # check the right call got made to the agent
         self._mock_agent.request.assert_called_once_with(
             b"GET",
-            b"matrix://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id",
+            b"matrix-federation://yet.another.server/_matrix/federation/v1/state/%21room_id?event_id=event_id",
             headers=mock.ANY,
             bodyProducer=None,
         )
@@ -232,7 +232,7 @@ class FederationClientTest(FederatingHomeserverTestCase):
         # check the right call got made to the agent
         self._mock_agent.request.assert_called_once_with(
             b"GET",
-            b"matrix://yet.another.server/_matrix/federation/v1/event/event_id",
+            b"matrix-federation://yet.another.server/_matrix/federation/v1/event/event_id",
             headers=mock.ANY,
             bodyProducer=None,
         )
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index 105b4caefa..aed2a4c07a 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -292,7 +292,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.agent = self._make_agent()
 
         self.reactor.lookups["testserv"] = "1.2.3.4"
-        test_d = self._make_get_request(b"matrix://testserv:8448/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv:8448/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -393,7 +393,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
 
         self.reactor.lookups["testserv"] = "1.2.3.4"
         self.reactor.lookups["proxy.com"] = "9.9.9.9"
-        test_d = self._make_get_request(b"matrix://testserv:8448/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv:8448/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -532,7 +532,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # there will be a getaddrinfo on the IP
         self.reactor.lookups["1.2.3.4"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://1.2.3.4/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://1.2.3.4/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -568,7 +568,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # there will be a getaddrinfo on the IP
         self.reactor.lookups["::1"] = "::1"
 
-        test_d = self._make_get_request(b"matrix://[::1]/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://[::1]/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -604,7 +604,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # there will be a getaddrinfo on the IP
         self.reactor.lookups["::1"] = "::1"
 
-        test_d = self._make_get_request(b"matrix://[::1]:80/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://[::1]:80/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -639,7 +639,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.mock_resolver.resolve_service.side_effect = generate_resolve_service([])
         self.reactor.lookups["testserv1"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://testserv1/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv1/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -693,7 +693,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # there will be a getaddrinfo on the IP
         self.reactor.lookups["1.2.3.5"] = "1.2.3.5"
 
-        test_d = self._make_get_request(b"matrix://1.2.3.5/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://1.2.3.5/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -725,7 +725,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.mock_resolver.resolve_service.side_effect = generate_resolve_service([])
         self.reactor.lookups["testserv"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -780,7 +780,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.lookups["testserv"] = "1.2.3.4"
         self.reactor.lookups["target-server"] = "1::f"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -844,7 +844,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.lookups["testserv"] = "1.2.3.4"
         self.reactor.lookups["target-server"] = "1::f"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -933,7 +933,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.mock_resolver.resolve_service.side_effect = generate_resolve_service([])
         self.reactor.lookups["testserv"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1009,7 +1009,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
             ),
         )
 
-        test_d = agent.request(b"GET", b"matrix://testserv/foo/bar")
+        test_d = agent.request(b"GET", b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1042,7 +1042,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         )
         self.reactor.lookups["srvtarget"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1082,7 +1082,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.lookups["testserv"] = "1.2.3.4"
         self.reactor.lookups["srvtarget"] = "5.6.7.8"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1143,7 +1143,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.lookups["xn--bcher-kva.com"] = "1.2.3.4"
 
         # this is idna for bücher.com
-        test_d = self._make_get_request(b"matrix://xn--bcher-kva.com/foo/bar")
+        test_d = self._make_get_request(
+            b"matrix-federation://xn--bcher-kva.com/foo/bar"
+        )
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1204,7 +1206,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
         )
         self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://xn--bcher-kva.com/foo/bar")
+        test_d = self._make_get_request(
+            b"matrix-federation://xn--bcher-kva.com/foo/bar"
+        )
 
         # Nothing happened yet
         self.assertNoResult(test_d)
@@ -1411,7 +1415,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         )
         self.reactor.lookups["target.com"] = "1.2.3.4"
 
-        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
 
         # Nothing happened yet
         self.assertNoResult(test_d)
diff --git a/tests/media/test_base.py b/tests/media/test_base.py
index 66498c744d..4728c80969 100644
--- a/tests/media/test_base.py
+++ b/tests/media/test_base.py
@@ -20,12 +20,12 @@ from tests import unittest
 class GetFileNameFromHeadersTests(unittest.TestCase):
     # input -> expected result
     TEST_CASES = {
-        b"inline; filename=abc.txt": "abc.txt",
-        b'inline; filename="azerty"': "azerty",
-        b'inline; filename="aze%20rty"': "aze%20rty",
-        b'inline; filename="aze"rty"': 'aze"rty',
-        b'inline; filename="azer;ty"': "azer;ty",
-        b"inline; filename*=utf-8''foo%C2%A3bar": "foo£bar",
+        b"attachment; filename=abc.txt": "abc.txt",
+        b'attachment; filename="azerty"': "azerty",
+        b'attachment; filename="aze%20rty"': "aze%20rty",
+        b'attachment; filename="aze"rty"': 'aze"rty',
+        b'attachment; filename="azer;ty"': "azer;ty",
+        b"attachment; filename*=utf-8''foo%C2%A3bar": "foo£bar",
     }
 
     def tests(self) -> None:
diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py
index f0f2da65db..ea0051dde4 100644
--- a/tests/media/test_media_storage.py
+++ b/tests/media/test_media_storage.py
@@ -317,7 +317,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
 
     def test_handle_missing_content_type(self) -> None:
         channel = self._req(
-            b"inline; filename=out" + self.test_image.extension,
+            b"attachment; filename=out" + self.test_image.extension,
             include_content_type=False,
         )
         headers = channel.headers
@@ -331,7 +331,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         If the filename is filename=<ascii> then Synapse will decode it as an
         ASCII string, and use filename= in the response.
         """
-        channel = self._req(b"inline; filename=out" + self.test_image.extension)
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
 
         headers = channel.headers
         self.assertEqual(
@@ -339,7 +339,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         )
         self.assertEqual(
             headers.getRawHeaders(b"Content-Disposition"),
-            [b"inline; filename=out" + self.test_image.extension],
+            [b"attachment; filename=out" + self.test_image.extension],
         )
 
     def test_disposition_filenamestar_utf8escaped(self) -> None:
@@ -350,7 +350,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         """
         filename = parse.quote("\u2603".encode()).encode("ascii")
         channel = self._req(
-            b"inline; filename*=utf-8''" + filename + self.test_image.extension
+            b"attachment; filename*=utf-8''" + filename + self.test_image.extension
         )
 
         headers = channel.headers
@@ -359,13 +359,13 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         )
         self.assertEqual(
             headers.getRawHeaders(b"Content-Disposition"),
-            [b"inline; filename*=utf-8''" + filename + self.test_image.extension],
+            [b"attachment; filename*=utf-8''" + filename + self.test_image.extension],
         )
 
     def test_disposition_none(self) -> None:
         """
-        If there is no filename, one isn't passed on in the Content-Disposition
-        of the request.
+        If there is no filename, Content-Disposition should only
+        be a disposition type.
         """
         channel = self._req(None)
 
@@ -373,7 +373,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         self.assertEqual(
             headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type]
         )
-        self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), None)
+        self.assertEqual(headers.getRawHeaders(b"Content-Disposition"), [b"attachment"])
 
     def test_thumbnail_crop(self) -> None:
         """Test that a cropped remote thumbnail is available."""
@@ -612,7 +612,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         Tests that the `X-Robots-Tag` header is present, which informs web crawlers
         to not index, archive, or follow links in media.
         """
-        channel = self._req(b"inline; filename=out" + self.test_image.extension)
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
 
         headers = channel.headers
         self.assertEqual(
@@ -625,7 +625,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
         Test that the Cross-Origin-Resource-Policy header is set to "cross-origin"
         allowing web clients to embed media from the downloads API.
         """
-        channel = self._req(b"inline; filename=out" + self.test_image.extension)
+        channel = self._req(b"attachment; filename=out" + self.test_image.extension)
 
         headers = channel.headers
 
diff --git a/tests/rest/client/test_push_rule_attrs.py b/tests/rest/client/test_push_rule_attrs.py
index 4f875b9289..5aca74475f 100644
--- a/tests/rest/client/test_push_rule_attrs.py
+++ b/tests/rest/client/test_push_rule_attrs.py
@@ -412,3 +412,70 @@ class PushRuleAttributesTestCase(HomeserverTestCase):
         )
         self.assertEqual(channel.code, 404)
         self.assertEqual(channel.json_body["errcode"], Codes.NOT_FOUND)
+
+    def test_contains_user_name(self) -> None:
+        """
+        Tests that `contains_user_name` rule is present and have proper value in `pattern`.
+        """
+        username = "bob"
+        self.register_user(username, "pass")
+        token = self.login(username, "pass")
+
+        channel = self.make_request(
+            "GET",
+            "/pushrules/global/content/.m.rule.contains_user_name",
+            access_token=token,
+        )
+
+        self.assertEqual(channel.code, 200)
+
+        self.assertEqual(
+            {
+                "rule_id": ".m.rule.contains_user_name",
+                "default": True,
+                "enabled": True,
+                "pattern": username,
+                "actions": [
+                    "notify",
+                    {"set_tweak": "highlight"},
+                    {"set_tweak": "sound", "value": "default"},
+                ],
+            },
+            channel.json_body,
+        )
+
+    def test_is_user_mention(self) -> None:
+        """
+        Tests that `is_user_mention` rule is present and have proper value in `value`.
+        """
+        user = self.register_user("bob", "pass")
+        token = self.login("bob", "pass")
+
+        channel = self.make_request(
+            "GET",
+            "/pushrules/global/override/.m.rule.is_user_mention",
+            access_token=token,
+        )
+
+        self.assertEqual(channel.code, 200)
+
+        self.assertEqual(
+            {
+                "rule_id": ".m.rule.is_user_mention",
+                "default": True,
+                "enabled": True,
+                "conditions": [
+                    {
+                        "kind": "event_property_contains",
+                        "key": "content.m\\.mentions.user_ids",
+                        "value": user,
+                    }
+                ],
+                "actions": [
+                    "notify",
+                    {"set_tweak": "highlight"},
+                    {"set_tweak": "sound", "value": "default"},
+                ],
+            },
+            channel.json_body,
+        )
diff --git a/tests/rest/client/test_room_batch.py b/tests/rest/client/test_room_batch.py
deleted file mode 100644
index 9d5cb60d16..0000000000
--- a/tests/rest/client/test_room_batch.py
+++ /dev/null
@@ -1,302 +0,0 @@
-import logging
-from typing import List, Tuple
-from unittest.mock import Mock, patch
-
-from twisted.test.proto_helpers import MemoryReactor
-
-from synapse.api.constants import EventContentFields, EventTypes
-from synapse.appservice import ApplicationService
-from synapse.rest import admin
-from synapse.rest.client import login, register, room, room_batch, sync
-from synapse.server import HomeServer
-from synapse.types import JsonDict, RoomStreamToken
-from synapse.util import Clock
-
-from tests import unittest
-
-logger = logging.getLogger(__name__)
-
-
-def _create_join_state_events_for_batch_send_request(
-    virtual_user_ids: List[str],
-    insert_time: int,
-) -> List[JsonDict]:
-    return [
-        {
-            "type": EventTypes.Member,
-            "sender": virtual_user_id,
-            "origin_server_ts": insert_time,
-            "content": {
-                "membership": "join",
-                "displayname": "display-name-for-%s" % (virtual_user_id,),
-            },
-            "state_key": virtual_user_id,
-        }
-        for virtual_user_id in virtual_user_ids
-    ]
-
-
-def _create_message_events_for_batch_send_request(
-    virtual_user_id: str, insert_time: int, count: int
-) -> List[JsonDict]:
-    return [
-        {
-            "type": EventTypes.Message,
-            "sender": virtual_user_id,
-            "origin_server_ts": insert_time,
-            "content": {
-                "msgtype": "m.text",
-                "body": "Historical %d" % (i),
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
-        }
-        for i in range(count)
-    ]
-
-
-class RoomBatchTestCase(unittest.HomeserverTestCase):
-    """Test importing batches of historical messages."""
-
-    servlets = [
-        admin.register_servlets_for_client_rest_resource,
-        room_batch.register_servlets,
-        room.register_servlets,
-        register.register_servlets,
-        login.register_servlets,
-        sync.register_servlets,
-    ]
-
-    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        config = self.default_config()
-
-        self.appservice = ApplicationService(
-            token="i_am_an_app_service",
-            id="1234",
-            namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]},
-            # Note: this user does not have to match the regex above
-            sender="@as_main:test",
-        )
-
-        mock_load_appservices = Mock(return_value=[self.appservice])
-        with patch(
-            "synapse.storage.databases.main.appservice.load_appservices",
-            mock_load_appservices,
-        ):
-            hs = self.setup_test_homeserver(config=config)
-        return hs
-
-    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-        self.clock = clock
-        self._storage_controllers = hs.get_storage_controllers()
-
-        self.virtual_user_id, _ = self.register_appservice_user(
-            "as_user_potato", self.appservice.token
-        )
-
-    def _create_test_room(self) -> Tuple[str, str, str, str]:
-        room_id = self.helper.create_room_as(
-            self.appservice.sender, tok=self.appservice.token
-        )
-
-        res_a = self.helper.send_event(
-            room_id=room_id,
-            type=EventTypes.Message,
-            content={
-                "msgtype": "m.text",
-                "body": "A",
-            },
-            tok=self.appservice.token,
-        )
-        event_id_a = res_a["event_id"]
-
-        res_b = self.helper.send_event(
-            room_id=room_id,
-            type=EventTypes.Message,
-            content={
-                "msgtype": "m.text",
-                "body": "B",
-            },
-            tok=self.appservice.token,
-        )
-        event_id_b = res_b["event_id"]
-
-        res_c = self.helper.send_event(
-            room_id=room_id,
-            type=EventTypes.Message,
-            content={
-                "msgtype": "m.text",
-                "body": "C",
-            },
-            tok=self.appservice.token,
-        )
-        event_id_c = res_c["event_id"]
-
-        return room_id, event_id_a, event_id_b, event_id_c
-
-    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
-    def test_same_state_groups_for_whole_historical_batch(self) -> None:
-        """Make sure that when using the `/batch_send` endpoint to import a
-        bunch of historical messages, it re-uses the same `state_group` across
-        the whole batch. This is an easy optimization to make sure we're getting
-        right because the state for the whole batch is contained in
-        `state_events_at_start` and can be shared across everything.
-        """
-
-        time_before_room = int(self.clock.time_msec())
-        room_id, event_id_a, _, _ = self._create_test_room()
-
-        channel = self.make_request(
-            "POST",
-            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
-            % (room_id, event_id_a),
-            content={
-                "events": _create_message_events_for_batch_send_request(
-                    self.virtual_user_id, time_before_room, 3
-                ),
-                "state_events_at_start": _create_join_state_events_for_batch_send_request(
-                    [self.virtual_user_id], time_before_room
-                ),
-            },
-            access_token=self.appservice.token,
-        )
-        self.assertEqual(channel.code, 200, channel.result)
-
-        # Get the historical event IDs that we just imported
-        historical_event_ids = channel.json_body["event_ids"]
-        self.assertEqual(len(historical_event_ids), 3)
-
-        # Fetch the state_groups
-        state_group_map = self.get_success(
-            self._storage_controllers.state.get_state_groups_ids(
-                room_id, historical_event_ids
-            )
-        )
-
-        # We expect all of the historical events to be using the same state_group
-        # so there should only be a single state_group here!
-        self.assertEqual(
-            len(state_group_map.keys()),
-            1,
-            "Expected a single state_group to be returned by saw state_groups=%s"
-            % (state_group_map.keys(),),
-        )
-
-    @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
-    def test_sync_while_batch_importing(self) -> None:
-        """
-        Make sure that /sync correctly returns full room state when a user joins
-        during ongoing batch backfilling.
-        See: https://github.com/matrix-org/synapse/issues/12281
-        """
-        # Create user who will be invited & join room
-        user_id = self.register_user("beep", "test")
-        user_tok = self.login("beep", "test")
-
-        time_before_room = int(self.clock.time_msec())
-
-        # Create a room with some events
-        room_id, _, _, _ = self._create_test_room()
-        # Invite the user
-        self.helper.invite(
-            room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
-        )
-
-        # Create another room, send a bunch of events to advance the stream token
-        other_room_id = self.helper.create_room_as(
-            self.appservice.sender, tok=self.appservice.token
-        )
-        for _ in range(5):
-            self.helper.send_event(
-                room_id=other_room_id,
-                type=EventTypes.Message,
-                content={"msgtype": "m.text", "body": "C"},
-                tok=self.appservice.token,
-            )
-
-        # Join the room as the normal user
-        self.helper.join(room_id, user_id, tok=user_tok)
-
-        # Create an event to hang the historical batch from - In order to see
-        # the failure case originally reported in #12281, the historical batch
-        # must be hung from the most recent event in the room so the base
-        # insertion event ends up with the highest `topogological_ordering`
-        # (`depth`) in the room but will have a negative `stream_ordering`
-        # because it's a `historical` event. Previously, when assembling the
-        # `state` for the `/sync` response, the bugged logic would sort by
-        # `topological_ordering` descending and pick up the base insertion
-        # event because it has a negative `stream_ordering` below the given
-        # pagination token. Now we properly sort by `stream_ordering`
-        # descending which puts `historical` events with a negative
-        # `stream_ordering` way at the bottom and aren't selected as expected.
-        response = self.helper.send_event(
-            room_id=room_id,
-            type=EventTypes.Message,
-            content={
-                "msgtype": "m.text",
-                "body": "C",
-            },
-            tok=self.appservice.token,
-        )
-        event_to_hang_id = response["event_id"]
-
-        channel = self.make_request(
-            "POST",
-            "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
-            % (room_id, event_to_hang_id),
-            content={
-                "events": _create_message_events_for_batch_send_request(
-                    self.virtual_user_id, time_before_room, 3
-                ),
-                "state_events_at_start": _create_join_state_events_for_batch_send_request(
-                    [self.virtual_user_id], time_before_room
-                ),
-            },
-            access_token=self.appservice.token,
-        )
-        self.assertEqual(channel.code, 200, channel.result)
-
-        # Now we need to find the invite + join events stream tokens so we can sync between
-        main_store = self.hs.get_datastores().main
-        events, next_key = self.get_success(
-            main_store.get_recent_events_for_room(
-                room_id,
-                50,
-                end_token=main_store.get_room_max_token(),
-            ),
-        )
-        invite_event_position = None
-        for event in events:
-            if (
-                event.type == "m.room.member"
-                and event.content["membership"] == "invite"
-            ):
-                invite_event_position = self.get_success(
-                    main_store.get_topological_token_for_event(event.event_id)
-                )
-                break
-
-        assert invite_event_position is not None, "No invite event found"
-
-        # Remove the topological order from the token by re-creating w/stream only
-        invite_event_position = RoomStreamToken(None, invite_event_position.stream)
-
-        # Sync everything after this token
-        since_token = self.get_success(invite_event_position.to_string(main_store))
-        sync_response = self.make_request(
-            "GET",
-            f"/sync?since={since_token}",
-            access_token=user_tok,
-        )
-
-        # Assert that, for this room, the user was considered to have joined and thus
-        # receives the full state history
-        state_event_types = [
-            event["type"]
-            for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
-                "events"
-            ]
-        ]
-
-        assert (
-            "m.room.create" in state_event_types
-        ), "Missing room full state in sync response"
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 788500e38f..b223dc750b 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -139,6 +139,55 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
             # That should result in a single db query to lookup
             self.assertEqual(ctx.get_resource_usage().db_txn_count, 1)
 
+    def test_persisting_event_prefills_get_event_cache(self) -> None:
+        """
+        Test to make sure that the `_get_event_cache` is prefilled after we persist an
+        event and returns the updated value.
+        """
+        event, event_context = self.get_success(
+            create_event(
+                self.hs,
+                room_id=self.room_id,
+                sender=self.user,
+                type="test_event_type",
+                content={"body": "conflabulation"},
+            )
+        )
+
+        # First, check `_get_event_cache` for the event we just made
+        # to verify it's not in the cache.
+        res = self.store._get_event_cache.get_local((event.event_id,))
+        self.assertEqual(res, None, "Event was cached when it should not have been.")
+
+        with LoggingContext(name="test") as ctx:
+            # Persist the event which should invalidate then prefill the
+            # `_get_event_cache` so we don't return stale values.
+            # Side Note: Apparently, persisting an event isn't a transaction in the
+            # sense that it is recorded in the LoggingContext
+            persistence = self.hs.get_storage_controllers().persistence
+            assert persistence is not None
+            self.get_success(
+                persistence.persist_event(
+                    event,
+                    event_context,
+                )
+            )
+
+            # Check `_get_event_cache` again and we should see the updated fact
+            # that we now have the event cached after persisting it.
+            res = self.store._get_event_cache.get_local((event.event_id,))
+            self.assertEqual(res.event, event, "Event not cached as expected.")  # type: ignore
+
+            # Try and fetch the event from the database.
+            self.get_success(self.store.get_event(event.event_id))
+
+            # Verify that the database hit was avoided.
+            self.assertEqual(
+                ctx.get_resource_usage().evt_db_fetch_count,
+                0,
+                "Database was hit, which would not happen if event was cached.",
+            )
+
     def test_invalidate_cache_by_room_id(self) -> None:
         """
         Test to make sure that all events associated with the given `(room_id,)`
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index e39b63edac..48ebfadaab 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -401,7 +401,10 @@ class EventChainStoreTestCase(HomeserverTestCase):
             assert persist_events_store is not None
             persist_events_store._store_event_txn(
                 txn,
-                [(e, EventContext(self.hs.get_storage_controllers())) for e in events],
+                [
+                    (e, EventContext(self.hs.get_storage_controllers(), {}))
+                    for e in events
+                ],
             )
 
             # Actually call the function that calculates the auth chain stuff.
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 4b8d8328d7..0f3b0744f1 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -20,7 +20,6 @@ from parameterized import parameterized
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.constants import EventTypes
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
     EventFormatVersions,
@@ -924,216 +923,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])
 
-    def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
-        """
-        Sets up a room with various insertion event backward extremities to test
-        backfill functions against.
-
-        Returns:
-            _BackfillSetupInfo including the `room_id` to test against and
-            `depth_map` of events in the room
-        """
-        room_id = "!backfill-room-test:some-host"
-
-        depth_map: Dict[str, int] = {
-            "1": 1,
-            "2": 2,
-            "insertion_eventA": 3,
-            "3": 4,
-            "insertion_eventB": 5,
-            "4": 6,
-            "5": 7,
-        }
-
-        def populate_db(txn: LoggingTransaction) -> None:
-            # Insert the room to satisfy the foreign key constraint of
-            # `event_failed_pull_attempts`
-            self.store.db_pool.simple_insert_txn(
-                txn,
-                "rooms",
-                {
-                    "room_id": room_id,
-                    "creator": "room_creator_user_id",
-                    "is_public": True,
-                    "room_version": "6",
-                },
-            )
-
-            # Insert our server events
-            stream_ordering = 0
-            for event_id, depth in depth_map.items():
-                self.store.db_pool.simple_insert_txn(
-                    txn,
-                    table="events",
-                    values={
-                        "event_id": event_id,
-                        "type": EventTypes.MSC2716_INSERTION
-                        if event_id.startswith("insertion_event")
-                        else "test_regular_type",
-                        "room_id": room_id,
-                        "depth": depth,
-                        "topological_ordering": depth,
-                        "stream_ordering": stream_ordering,
-                        "processed": True,
-                        "outlier": False,
-                    },
-                )
-
-                if event_id.startswith("insertion_event"):
-                    self.store.db_pool.simple_insert_txn(
-                        txn,
-                        table="insertion_event_extremities",
-                        values={
-                            "event_id": event_id,
-                            "room_id": room_id,
-                        },
-                    )
-
-                stream_ordering += 1
-
-        self.get_success(
-            self.store.db_pool.runInteraction(
-                "_setup_room_for_insertion_backfill_tests_populate_db",
-                populate_db,
-            )
-        )
-
-        return _BackfillSetupInfo(room_id=room_id, depth_map=depth_map)
-
-    def test_get_insertion_event_backward_extremities_in_room(self) -> None:
-        """
-        Test to make sure only insertion event backward extremities that are
-        older and come before the `current_depth` are returned.
-        """
-        setup_info = self._setup_room_for_insertion_backfill_tests()
-        room_id = setup_info.room_id
-        depth_map = setup_info.depth_map
-
-        # Try at "insertion_eventB"
-        backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(
-                room_id, depth_map["insertion_eventB"], limit=100
-            )
-        )
-        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"])
-
-        # Try at "insertion_eventA"
-        backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(
-                room_id, depth_map["insertion_eventA"], limit=100
-            )
-        )
-        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        # Event "2" has a depth of 2 but is not included here because we only
-        # know the approximate depth of 5 from our event "3".
-        self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
-
-    def test_get_insertion_event_backward_extremities_in_room_excludes_events_we_have_attempted(
-        self,
-    ) -> None:
-        """
-        Test to make sure that insertion events we have attempted to backfill
-        (and within backoff timeout duration) do not show up as an event to
-        backfill again.
-        """
-        setup_info = self._setup_room_for_insertion_backfill_tests()
-        room_id = setup_info.room_id
-        depth_map = setup_info.depth_map
-
-        # Record some attempts to backfill these events which will make
-        # `get_insertion_event_backward_extremities_in_room` exclude them
-        # because we haven't passed the backoff interval.
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventA", "fake cause"
-            )
-        )
-
-        # No time has passed since we attempted to backfill ^
-
-        # Try at "insertion_eventB"
-        backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(
-                room_id, depth_map["insertion_eventB"], limit=100
-            )
-        )
-        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        # Only the backfill points that we didn't record earlier exist here.
-        self.assertEqual(backfill_event_ids, ["insertion_eventB"])
-
-    def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
-        self,
-    ) -> None:
-        """
-        Test to make sure after we fake attempt to backfill event
-        "insertion_eventA" many times, we can see retry and see the
-        "insertion_eventA" again after the backoff timeout duration has
-        exceeded.
-        """
-        setup_info = self._setup_room_for_insertion_backfill_tests()
-        room_id = setup_info.room_id
-        depth_map = setup_info.depth_map
-
-        # Record some attempts to backfill these events which will make
-        # `get_backfill_points_in_room` exclude them because we
-        # haven't passed the backoff interval.
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventB", "fake cause"
-            )
-        )
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventA", "fake cause"
-            )
-        )
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventA", "fake cause"
-            )
-        )
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventA", "fake cause"
-            )
-        )
-        self.get_success(
-            self.store.record_event_failed_pull_attempt(
-                room_id, "insertion_eventA", "fake cause"
-            )
-        )
-
-        # Now advance time by 2 hours and we should only be able to see
-        # "insertion_eventB" because we have waited long enough for the single
-        # attempt (2^1 hours) but we still shouldn't see "insertion_eventA"
-        # because we haven't waited long enough for this many attempts.
-        self.reactor.advance(datetime.timedelta(hours=2).total_seconds())
-
-        # Try at "insertion_eventA" and make sure that "insertion_eventA" is not
-        # in the list because we've already attempted many times
-        backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(
-                room_id, depth_map["insertion_eventA"], limit=100
-            )
-        )
-        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertEqual(backfill_event_ids, [])
-
-        # Now advance time by 20 hours (above 2^4 because we made 4 attemps) and
-        # see if we can now backfill it
-        self.reactor.advance(datetime.timedelta(hours=20).total_seconds())
-
-        # Try at "insertion_eventA" again after we advanced enough time and we
-        # should see "insertion_eventA" again
-        backfill_points = self.get_success(
-            self.store.get_insertion_event_backward_extremities_in_room(
-                room_id, depth_map["insertion_eventA"], limit=100
-            )
-        )
-        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertEqual(backfill_event_ids, ["insertion_eventA"])
-
     def test_get_event_ids_with_failed_pull_attempts(self) -> None:
         """
         Test to make sure we properly get event_ids based on whether they have any
diff --git a/tests/test_state.py b/tests/test_state.py
index 7a49b87953..eded38c766 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -555,10 +555,15 @@ class StateTestCase(unittest.TestCase):
             (e.event_id for e in old_state + [event]), current_state_ids.values()
         )
 
-        self.assertIsNotNone(context.state_group_before_event)
+        assert context.state_group_before_event is not None
+        assert context.state_group is not None
+        self.assertEqual(
+            context.state_group_deltas.get(
+                (context.state_group_before_event, context.state_group)
+            ),
+            {(event.type, event.state_key): event.event_id},
+        )
         self.assertNotEqual(context.state_group_before_event, context.state_group)
-        self.assertEqual(context.state_group_before_event, context.prev_group)
-        self.assertEqual({("state", ""): event.event_id}, context.delta_ids)
 
     @defer.inlineCallbacks
     def test_trivial_annotate_message(