summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-17 14:03:59 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 14:03:59 +0100
commitff1b13d94c7ccad7298044e313b157d2f8f2f83a (patch)
treeb4e5d0fa194bf7aa56079f5471b0b964384d40bb
parentMerge remote-tracking branch 'origin/develop' into erikj/ss_hacks (diff)
parentNewsfile (diff)
downloadsynapse-ff1b13d94c7ccad7298044e313b157d2f8f2f83a.tar.xz
Merge branch 'erikj/ss_room_store' into erikj/ss_hacks
-rw-r--r--.github/workflows/tests.yml4
-rw-r--r--Cargo.lock8
-rw-r--r--changelog.d/17424.misc1
-rw-r--r--changelog.d/17438.bugfix1
-rw-r--r--changelog.d/17439.bugfix1
-rw-r--r--changelog.d/17447.feature1
-rw-r--r--changelog.d/17449.bugfix1
-rw-r--r--changelog.d/17452.misc1
-rw-r--r--poetry.lock91
-rw-r--r--synapse/app/homeserver.py2
-rw-r--r--synapse/config/repository.py2
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/handlers/sliding_sync.py323
-rw-r--r--synapse/http/matrixfederationclient.py126
-rw-r--r--synapse/rest/client/sync.py12
-rw-r--r--synapse/storage/databases/main/state_deltas.py38
-rw-r--r--synapse/types/__init__.py102
-rw-r--r--synapse/types/handlers/__init__.py40
-rw-r--r--synapse/types/rest/client/__init__.py5
-rw-r--r--tests/media/test_media_storage.py49
-rw-r--r--tests/rest/client/test_media.py50
-rw-r--r--tests/rest/client/test_sync.py167
-rw-r--r--tests/server.py4
-rw-r--r--tests/test_types.py71
-rw-r--r--tests/util/test_check_dependencies.py3
25 files changed, 892 insertions, 217 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 767495101b..730f8552d9 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -305,7 +305,7 @@ jobs:
       - lint-readme
     runs-on: ubuntu-latest
     steps:
-      - uses: matrix-org/done-action@v2
+      - uses: matrix-org/done-action@v3
         with:
           needs: ${{ toJSON(needs) }}
 
@@ -737,7 +737,7 @@ jobs:
       - linting-done
     runs-on: ubuntu-latest
     steps:
-      - uses: matrix-org/done-action@v2
+      - uses: matrix-org/done-action@v3
         with:
           needs: ${{ toJSON(needs) }}
 
diff --git a/Cargo.lock b/Cargo.lock
index 3a8bf7a49c..e9adfcbdc3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
 
 [[package]]
 name = "bytes"
-version = "1.6.0"
+version = "1.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
+checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952"
 
 [[package]]
 name = "cfg-if"
@@ -597,9 +597,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
 
 [[package]]
 name = "ulid"
-version = "1.1.2"
+version = "1.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259"
+checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289"
 dependencies = [
  "getrandom",
  "rand",
diff --git a/changelog.d/17424.misc b/changelog.d/17424.misc
new file mode 100644
index 0000000000..d4a81c137f
--- /dev/null
+++ b/changelog.d/17424.misc
@@ -0,0 +1 @@
+Make sure we always use the right logic for enabling the media repo.
diff --git a/changelog.d/17438.bugfix b/changelog.d/17438.bugfix
new file mode 100644
index 0000000000..cff6eecd48
--- /dev/null
+++ b/changelog.d/17438.bugfix
@@ -0,0 +1 @@
+Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers.
diff --git a/changelog.d/17439.bugfix b/changelog.d/17439.bugfix
new file mode 100644
index 0000000000..f36c3ec255
--- /dev/null
+++ b/changelog.d/17439.bugfix
@@ -0,0 +1 @@
+Limit concurrent remote downloads to 6 per IP address, and decrement remote downloads without a content-length from the ratelimiter after the download is complete.
\ No newline at end of file
diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature
new file mode 100644
index 0000000000..6f80e298ae
--- /dev/null
+++ b/changelog.d/17447.feature
@@ -0,0 +1 @@
+Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17449.bugfix b/changelog.d/17449.bugfix
new file mode 100644
index 0000000000..cd847a3d1c
--- /dev/null
+++ b/changelog.d/17449.bugfix
@@ -0,0 +1 @@
+Remove unnecessary call to resume producing in fake channel.
\ No newline at end of file
diff --git a/changelog.d/17452.misc b/changelog.d/17452.misc
new file mode 100644
index 0000000000..4fd07f617b
--- /dev/null
+++ b/changelog.d/17452.misc
@@ -0,0 +1 @@
+Change sliding sync to use their own token format in preparation for storing per-connection state.
diff --git a/poetry.lock b/poetry.lock
index f578a33ed5..4092a41884 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -934,13 +934,13 @@ i18n = ["Babel (>=2.7)"]
 
 [[package]]
 name = "jsonschema"
-version = "4.22.0"
+version = "4.23.0"
 description = "An implementation of JSON Schema validation for Python"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "jsonschema-4.22.0-py3-none-any.whl", hash = "sha256:ff4cfd6b1367a40e7bc6411caec72effadd3db0bbe5017de188f2d6108335802"},
-    {file = "jsonschema-4.22.0.tar.gz", hash = "sha256:5b22d434a45935119af990552c862e5d6d564e8f6601206b305a61fdf661a2b7"},
+    {file = "jsonschema-4.23.0-py3-none-any.whl", hash = "sha256:fbadb6f8b144a8f8cf9f0b89ba94501d143e50411a1278633f56a7acf7fd5566"},
+    {file = "jsonschema-4.23.0.tar.gz", hash = "sha256:d71497fef26351a33265337fa77ffeb82423f3ea21283cd9467bb03999266bc4"},
 ]
 
 [package.dependencies]
@@ -953,7 +953,7 @@ rpds-py = ">=0.7.1"
 
 [package.extras]
 format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"]
-format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"]
+format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=24.6.0)"]
 
 [[package]]
 name = "jsonschema-specifications"
@@ -1389,38 +1389,38 @@ files = [
 
 [[package]]
 name = "mypy"
-version = "1.9.0"
+version = "1.10.1"
 description = "Optional static typing for Python"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"},
-    {file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"},
-    {file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"},
-    {file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"},
-    {file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"},
-    {file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"},
-    {file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"},
-    {file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"},
-    {file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"},
-    {file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"},
-    {file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"},
-    {file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"},
-    {file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"},
-    {file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"},
-    {file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"},
-    {file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"},
-    {file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"},
-    {file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"},
-    {file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"},
-    {file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"},
-    {file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"},
-    {file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"},
-    {file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"},
-    {file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"},
-    {file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"},
-    {file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"},
-    {file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"},
+    {file = "mypy-1.10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e36f229acfe250dc660790840916eb49726c928e8ce10fbdf90715090fe4ae02"},
+    {file = "mypy-1.10.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:51a46974340baaa4145363b9e051812a2446cf583dfaeba124af966fa44593f7"},
+    {file = "mypy-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:901c89c2d67bba57aaaca91ccdb659aa3a312de67f23b9dfb059727cce2e2e0a"},
+    {file = "mypy-1.10.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0cd62192a4a32b77ceb31272d9e74d23cd88c8060c34d1d3622db3267679a5d9"},
+    {file = "mypy-1.10.1-cp310-cp310-win_amd64.whl", hash = "sha256:a2cbc68cb9e943ac0814c13e2452d2046c2f2b23ff0278e26599224cf164e78d"},
+    {file = "mypy-1.10.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bd6f629b67bb43dc0d9211ee98b96d8dabc97b1ad38b9b25f5e4c4d7569a0c6a"},
+    {file = "mypy-1.10.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a1bbb3a6f5ff319d2b9d40b4080d46cd639abe3516d5a62c070cf0114a457d84"},
+    {file = "mypy-1.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8edd4e9bbbc9d7b79502eb9592cab808585516ae1bcc1446eb9122656c6066f"},
+    {file = "mypy-1.10.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6166a88b15f1759f94a46fa474c7b1b05d134b1b61fca627dd7335454cc9aa6b"},
+    {file = "mypy-1.10.1-cp311-cp311-win_amd64.whl", hash = "sha256:5bb9cd11c01c8606a9d0b83ffa91d0b236a0e91bc4126d9ba9ce62906ada868e"},
+    {file = "mypy-1.10.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d8681909f7b44d0b7b86e653ca152d6dff0eb5eb41694e163c6092124f8246d7"},
+    {file = "mypy-1.10.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:378c03f53f10bbdd55ca94e46ec3ba255279706a6aacaecac52ad248f98205d3"},
+    {file = "mypy-1.10.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bacf8f3a3d7d849f40ca6caea5c055122efe70e81480c8328ad29c55c69e93e"},
+    {file = "mypy-1.10.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:701b5f71413f1e9855566a34d6e9d12624e9e0a8818a5704d74d6b0402e66c04"},
+    {file = "mypy-1.10.1-cp312-cp312-win_amd64.whl", hash = "sha256:3c4c2992f6ea46ff7fce0072642cfb62af7a2484efe69017ed8b095f7b39ef31"},
+    {file = "mypy-1.10.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:604282c886497645ffb87b8f35a57ec773a4a2721161e709a4422c1636ddde5c"},
+    {file = "mypy-1.10.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:37fd87cab83f09842653f08de066ee68f1182b9b5282e4634cdb4b407266bade"},
+    {file = "mypy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8addf6313777dbb92e9564c5d32ec122bf2c6c39d683ea64de6a1fd98b90fe37"},
+    {file = "mypy-1.10.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5cc3ca0a244eb9a5249c7c583ad9a7e881aa5d7b73c35652296ddcdb33b2b9c7"},
+    {file = "mypy-1.10.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3a2ffce52cc4dbaeee4df762f20a2905aa171ef157b82192f2e2f368eec05d"},
+    {file = "mypy-1.10.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fe85ed6836165d52ae8b88f99527d3d1b2362e0cb90b005409b8bed90e9059b3"},
+    {file = "mypy-1.10.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c2ae450d60d7d020d67ab440c6e3fae375809988119817214440033f26ddf7bf"},
+    {file = "mypy-1.10.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6be84c06e6abd72f960ba9a71561c14137a583093ffcf9bbfaf5e613d63fa531"},
+    {file = "mypy-1.10.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2189ff1e39db399f08205e22a797383613ce1cb0cb3b13d8bcf0170e45b96cc3"},
+    {file = "mypy-1.10.1-cp39-cp39-win_amd64.whl", hash = "sha256:97a131ee36ac37ce9581f4220311247ab6cba896b4395b9c87af0675a13a755f"},
+    {file = "mypy-1.10.1-py3-none-any.whl", hash = "sha256:71d8ac0b906354ebda8ef1673e5fde785936ac1f29ff6987c7483cfbd5a4235a"},
+    {file = "mypy-1.10.1.tar.gz", hash = "sha256:1f8f492d7db9e3593ef42d4f115f04e556130f2819ad33ab84551403e97dd4c0"},
 ]
 
 [package.dependencies]
@@ -2504,19 +2504,18 @@ tests = ["coverage[toml] (>=5.0.2)", "pytest"]
 
 [[package]]
 name = "setuptools"
-version = "67.6.0"
+version = "70.0.0"
 description = "Easily download, build, install, upgrade, and uninstall Python packages"
 optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
 files = [
-    {file = "setuptools-67.6.0-py3-none-any.whl", hash = "sha256:b78aaa36f6b90a074c1fa651168723acbf45d14cb1196b6f02c0fd07f17623b2"},
-    {file = "setuptools-67.6.0.tar.gz", hash = "sha256:2ee892cd5f29f3373097f5a814697e397cf3ce313616df0af11231e2ad118077"},
+    {file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
+    {file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
 ]
 
 [package.extras]
-docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
-testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
-testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
+docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
+testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
 
 [[package]]
 name = "setuptools-rust"
@@ -2704,19 +2703,19 @@ docs = ["sphinx (<7.0.0)"]
 
 [[package]]
 name = "twine"
-version = "5.1.0"
+version = "5.1.1"
 description = "Collection of utilities for publishing packages on PyPI"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "twine-5.1.0-py3-none-any.whl", hash = "sha256:fe1d814395bfe50cfbe27783cb74efe93abeac3f66deaeb6c8390e4e92bacb43"},
-    {file = "twine-5.1.0.tar.gz", hash = "sha256:4d74770c88c4fcaf8134d2a6a9d863e40f08255ff7d8e2acb3cbbd57d25f6e9d"},
+    {file = "twine-5.1.1-py3-none-any.whl", hash = "sha256:215dbe7b4b94c2c50a7315c0275d2258399280fbb7d04182c7e55e24b5f93997"},
+    {file = "twine-5.1.1.tar.gz", hash = "sha256:9aa0825139c02b3434d913545c7b847a21c835e11597f5255842d457da2322db"},
 ]
 
 [package.dependencies]
 importlib-metadata = ">=3.6"
 keyring = ">=15.1"
-pkginfo = ">=1.8.1"
+pkginfo = ">=1.8.1,<1.11"
 readme-renderer = ">=35.0"
 requests = ">=2.20"
 requests-toolbelt = ">=0.8.0,<0.9.0 || >0.9.0"
@@ -2851,13 +2850,13 @@ files = [
 
 [[package]]
 name = "types-jsonschema"
-version = "4.22.0.20240610"
+version = "4.23.0.20240712"
 description = "Typing stubs for jsonschema"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-jsonschema-4.22.0.20240610.tar.gz", hash = "sha256:f82ab9fe756e3a2642ea9712c46b403ce61eb380b939b696cff3252af42f65b0"},
-    {file = "types_jsonschema-4.22.0.20240610-py3-none-any.whl", hash = "sha256:89996b9bd1928f820a0e252b2844be21cd2e55d062b6fa1048d88453006ad89e"},
+    {file = "types-jsonschema-4.23.0.20240712.tar.gz", hash = "sha256:b20db728dcf7ea3e80e9bdeb55e8b8420c6c040cda14e8cf284465adee71d217"},
+    {file = "types_jsonschema-4.23.0.20240712-py3-none-any.whl", hash = "sha256:8c33177ce95336241c1d61ccb56a9964d4361b99d5f1cd81a1ab4909b0dd7cf4"},
 ]
 
 [package.dependencies]
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b111847b7..e114ab7ec4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -217,7 +217,7 @@ class SynapseHomeServer(HomeServer):
             )
 
         if name in ["media", "federation", "client"]:
-            if self.config.server.enable_media_repo:
+            if self.config.media.can_load_media_repo:
                 media_repo = self.get_media_repository_resource()
                 resources.update(
                     {
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 1645470499..dc0e93ffa1 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config):
         # Only enable the media repo if either the media repo is enabled or the
         # current worker app is the media repo.
         if (
-            self.root.server.enable_media_repo is False
+            config.get("enable_media_repo", True) is False
             and config.get("worker_app") != "synapse.app.media_repository"
         ):
             self.can_load_media_repo = False
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a2b2305776..8bb97df175 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -395,12 +395,6 @@ class ServerConfig(Config):
                 self.presence_router_config,
             ) = load_module(presence_router_config, ("presence", "presence_router"))
 
-        # whether to enable the media repository endpoints. This should be set
-        # to false if the media repository is running as a separate endpoint;
-        # doing so ensures that we will not run cache cleanup jobs on the
-        # master, potentially causing inconsistency.
-        self.enable_media_repo = config.get("enable_media_repo", True)
-
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API.
         self.require_auth_for_profile_requests = config.get(
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 239967fa73..6f2988e64c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,11 +18,13 @@
 #
 #
 import logging
+from enum import Enum
 from itertools import chain
 from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
 
 import attr
 from immutabledict import immutabledict
+from typing_extensions import assert_never
 
 from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
@@ -37,7 +39,9 @@ from synapse.types import (
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    SlidingSyncStreamToken,
     StateMap,
+    StrCollection,
     StreamKeyType,
     StreamToken,
     UserID,
@@ -343,11 +347,13 @@ class SlidingSyncHandler:
         self.relations_handler = hs.get_relations_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
+        self.connection_store = SlidingSyncConnectionStore()
+
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
         sync_config: SlidingSyncConfig,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
         timeout_ms: int = 0,
     ) -> SlidingSyncResult:
         """
@@ -382,7 +388,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -420,7 +426,7 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token,
+                from_token=from_token.stream_token,
             )
 
         return result
@@ -430,7 +436,7 @@ class SlidingSyncHandler:
         self,
         sync_config: SlidingSyncConfig,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
     ) -> SlidingSyncResult:
         """
         Generates the response body of a Sliding Sync result, represented as a
@@ -451,6 +457,12 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        await self.connection_store.mark_token_seen(
+            user_id,
+            conn_id=sync_config.connection_id(),
+            from_token=from_token,
+        )
+
         # Get all of the room IDs that the user should be able to see in the sync
         # response
         has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -463,7 +475,7 @@ class SlidingSyncHandler:
                 await self.get_room_membership_for_user_at_to_token(
                     user=sync_config.user,
                     to_token=to_token,
-                    from_token=from_token,
+                    from_token=from_token.stream_token if from_token else None,
                 )
             )
 
@@ -605,7 +617,7 @@ class SlidingSyncHandler:
         @tag_args
         async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
-                user=sync_config.user,
+                sync_config=sync_config,
                 room_id=room_id,
                 room_sync_config=room_sync_config,
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -624,8 +636,21 @@ class SlidingSyncHandler:
             sync_config=sync_config, to_token=to_token
         )
 
+        if has_lists or has_room_subscriptions:
+            connection_token = await self.connection_store.record_rooms(
+                user_id,
+                conn_id=sync_config.connection_id(),
+                from_token=from_token,
+                sent_room_ids=relevant_room_map.keys(),
+                unsent_room_ids=[],  # TODO: We currently ssume that we have sent down all updates.
+            )
+        elif from_token:
+            connection_token = from_token.connection_token
+        else:
+            connection_token = 0
+
         return SlidingSyncResult(
-            next_pos=to_token,
+            next_pos=SlidingSyncStreamToken(to_token, connection_token),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -713,10 +738,17 @@ class SlidingSyncHandler:
                 instance_to_max_stream_ordering_map[instance_name] = stream_ordering
 
         # Then assemble the `RoomStreamToken`
+        min_stream_pos = min(instance_to_max_stream_ordering_map.values())
         membership_snapshot_token = RoomStreamToken(
             # Minimum position in the `instance_map`
-            stream=min(instance_to_max_stream_ordering_map.values()),
-            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+            stream=min_stream_pos,
+            instance_map=immutabledict(
+                {
+                    instance_name: stream_pos
+                    for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+                    if stream_pos > min_stream_pos
+                }
+            ),
         )
 
         # Since we fetched the users room list at some point in time after the from/to
@@ -1359,11 +1391,11 @@ class SlidingSyncHandler:
 
     async def get_room_sync_data(
         self,
-        user: UserID,
+        sync_config: SlidingSyncConfig,
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
         to_token: StreamToken,
     ) -> SlidingSyncResult.RoomResult:
         """
@@ -1381,6 +1413,38 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
             to_token: The point in the stream to sync up to.
         """
+        user = sync_config.user
+
+        # Determine whether we should limit the timeline to the token range.
+        #
+        # We should return historical messages (before token range) in the
+        # following cases because we want clients to be able to show a basic
+        # screen of information:
+        #  - Initial sync (because no `from_token` to limit us anyway)
+        #  - When users `newly_joined`
+        #  - For an incremental sync where we haven't sent it down this
+        #    connection before
+        to_bound = None
+        initial = True
+        if from_token and not room_membership_for_user_at_to_token.newly_joined:
+            room_status = await self.connection_store.have_sent_room(
+                user_id=user.to_string(),
+                conn_id=sync_config.connection_id(),
+                connection_token=from_token.connection_token,
+                room_id=room_id,
+            )
+            if room_status.status == HaveSentRoomFlag.LIVE:
+                to_bound = from_token.stream_token.room_key
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+                assert room_status.last_token is not None
+                to_bound = room_status.last_token
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.NEVER:
+                to_bound = None
+                initial = True
+            else:
+                assert_never(room_status.status)
 
         # Assemble the list of timeline events
         #
@@ -1417,22 +1481,6 @@ class SlidingSyncHandler:
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
-            # Determine whether we should limit the timeline to the token range.
-            #
-            # We should return historical messages (before token range) in the
-            # following cases because we want clients to be able to show a basic
-            # screen of information:
-            #  - Initial sync (because no `from_token` to limit us anyway)
-            #  - When users `newly_joined`
-            #  - TODO: For an incremental sync where we haven't sent it down this
-            #    connection before
-            to_bound = (
-                from_token.room_key
-                if from_token is not None
-                and not room_membership_for_user_at_to_token.newly_joined
-                else None
-            )
-
             fiddled_timeline_limit = room_sync_config.timeline_limit
             if to_bound:
                 fiddled_timeline_limit = max(fiddled_timeline_limit, 10)
@@ -1498,7 +1546,9 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(from_token.room_key):
+                    if persisted_position.persisted_after(
+                        from_token.stream_token.room_key
+                    ):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in
@@ -1555,12 +1605,6 @@ class SlidingSyncHandler:
         # indicate to the client that a state reset happened. Perhaps we should indicate
         # this by setting `initial: True` and empty `required_state`.
 
-        # TODO: Since we can't determine whether we've already sent a room down this
-        # Sliding Sync connection before (we plan to add this optimization in the
-        # future), we're always returning the requested room state instead of
-        # updates.
-        initial = True
-
         # Check whether the room has a name set
         name_state_ids = await self.get_current_state_ids_at(
             room_id=room_id,
@@ -1711,9 +1755,17 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
         else:
-            # TODO: Once we can figure out if we've sent a room down this connection before,
-            # we can return updates instead of the full required state.
-            raise NotImplementedError()
+            assert to_bound is not None
+
+            deltas = await self.store.get_current_state_deltas_for_room(
+                room_id, to_bound, to_token.room_key
+            )
+            # TODO: Filter room state before fetching events
+            # TODO: Handle state resets where event_id is None
+            events = await self.store.get_events(
+                [d.event_id for d in deltas if d.event_id]
+            )
+            room_state = {(s.type, s.state_key): s for s in events.values()}
 
         required_room_state: StateMap[EventBase] = {}
         if required_state_filter != StateFilter.none():
@@ -1841,7 +1893,7 @@ class SlidingSyncHandler:
         """
 
         user_id = sync_config.user.to_string()
-        device_id = sync_config.device_id
+        device_id = sync_config.requester.device_id
 
         # Check that this request has a valid device ID (not all requests have
         # to belong to a device, and so device_id is None), and that the
@@ -1898,3 +1950,198 @@ class SlidingSyncHandler:
             next_batch=f"{stream_id}",
             events=messages,
         )
+
+
+class HaveSentRoomFlag(Enum):
+    """Flag for whether we have sent the room down a sliding sync connection.
+
+    The valid state changes here are:
+        NEVER -> LIVE
+        LIVE -> PREVIOUSLY
+        PREVIOUSLY -> LIVE
+    """
+
+    # The room has never been sent down (or we have forgotten we have sent it
+    # down).
+    NEVER = 1
+
+    # We have previously sent the room down, but there are updates that we
+    # haven't sent down.
+    PREVIOUSLY = 2
+
+    # We have sent the room down and the client has received all updates.
+    LIVE = 3
+
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+    """Whether we have sent the room down a sliding sync connection.
+
+    Attributes:
+        status: Flag of if we have or haven't sent down the room
+        last_token: If the flag is `PREVIOUSLY` then this is non-null and
+            contains the last stream token of the last updates we sent down
+            the room, i.e. we still need to send everything since then to the
+            client.
+    """
+
+    status: HaveSentRoomFlag
+    last_token: Optional[RoomStreamToken]
+
+    @staticmethod
+    def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+        """Constructor for `PREVIOUSLY` flag."""
+        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+    """In-memory store of per-connection state, including what rooms we have
+    previously sent down a sliding sync connection.
+
+    Note: This is NOT safe to run in a worker setup.
+
+    The complication here is that we need to handle requests being resent, i.e.
+    if we sent down a room in a response that the client received, we must
+    consider the room *not* sent when we get the request again.
+
+    This is handled by using an integer "token", which is returned to the client
+    as part of the sync token. For each connection we store a mapping from
+    tokens to the room states, and create a new entry when we send down new
+    rooms.
+
+    Note that for any given sliding sync connection we will only store a maximum
+    of two different tokens: the previous token from the request and a new token
+    sent in the response. When we receive a request with a given token, we then
+    clear out all other entries with a different token.
+
+    Attributes:
+        _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+            to mapping of room ID to `HaveSentRoom`.
+    """
+
+    # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+    _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+        attr.Factory(dict)
+    )
+
+    async def have_sent_room(
+        self, user_id: str, conn_id: str, connection_token: int, room_id: str
+    ) -> HaveSentRoom:
+        """Whether for the given user_id/conn_id/token, return whether we have
+        previously sent the room down
+        """
+
+        sync_statuses = self._connections.setdefault((user_id, conn_id), {})
+        room_status = sync_statuses.get(connection_token, {}).get(
+            room_id, HAVE_SENT_ROOM_NEVER
+        )
+
+        return room_status
+
+    async def record_rooms(
+        self,
+        user_id: str,
+        conn_id: str,
+        from_token: Optional[SlidingSyncStreamToken],
+        *,
+        sent_room_ids: StrCollection,
+        unsent_room_ids: StrCollection,
+    ) -> int:
+        """Record which rooms we have/haven't sent down in a new response
+
+        Attributes:
+            user_id
+            conn_id
+            from_token: The since token from the request, if any
+            sent_room_ids: The set of room IDs that we have sent down as
+                part of this request (only needs to be ones we didn't
+                previously sent down).
+            unsent_room_ids: The set of room IDs that have had updates
+                since the `last_room_token`, but which were not included in
+                this request
+        """
+        prev_connection_token = 0
+        if from_token is not None:
+            prev_connection_token = from_token.connection_token
+
+        # If there are no changes then this is a noop.
+        if not sent_room_ids and not unsent_room_ids:
+            return prev_connection_token
+
+        sync_statuses = self._connections.setdefault((user_id, conn_id), {})
+
+        # Generate a new token, removing any existing entries in that token
+        # (which can happen if requests get resent).
+        new_store_token = prev_connection_token + 1
+        sync_statuses.pop(new_store_token, None)
+
+        # Copy over and update the room mappings.
+        new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+        # Whether we have updated the `new_room_statuses`, if we don't by the
+        # end we can treat this as a noop.
+        have_updated = False
+        for room_id in sent_room_ids:
+            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+            have_updated = True
+
+        # Whether we add/update the entries for unsent rooms depends on the
+        # existing entry:
+        #   - LIVE: We have previously sent down everything up to
+        #     `last_room_token, so we update the entry to be `PREVIOUSLY` with
+        #     `last_room_token`.
+        #   - PREVIOUSLY: We have previously sent down everything up to *a*
+        #     given token, so we don't need to update the entry.
+        #   - NEVER: We have never previously sent down the room, and we haven't
+        #     sent anything down this time either so we leave it as NEVER.
+
+        # Work out the new state for unsent rooms that were `LIVE`.
+        if from_token:
+            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+        else:
+            new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+        for room_id in unsent_room_ids:
+            prev_state = new_room_statuses.get(room_id)
+            if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+                new_room_statuses[room_id] = new_unsent_state
+                have_updated = True
+
+        if not have_updated:
+            return prev_connection_token
+
+        sync_statuses[new_store_token] = new_room_statuses
+
+        return new_store_token
+
+    async def mark_token_seen(
+        self,
+        user_id: str,
+        conn_id: str,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> None:
+        """We have received a request with the given token, so we can clear out
+        any other tokens associated with the connection.
+
+        If there is no from token then we have started afresh, and so we delete
+        all tokens associated with the device.
+        """
+        # Clear out any tokens for the connection that doesn't match the one
+        # from the request.
+
+        sync_statuses = self._connections.pop((user_id, conn_id), {})
+        if from_token is None:
+            return
+
+        sync_statuses = {
+            i: room_statuses
+            for i, room_statuses in sync_statuses.items()
+            if i == from_token.connection_token
+        }
+        if sync_statuses:
+            self._connections[(user_id, conn_id)] = sync_statuses
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 749b01dd0e..6fd75fd381 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.util.stringutils import parse_and_validate_server_name
 
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
             use_proxy=True,
         )
 
+        self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
+
     def wake_destination(self, destination: str) -> None:
         """Called when the remote server may have come back online."""
 
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         try:
-            # add a byte of headroom to max size as function errs at >=
-            d = read_body_with_max_size(response, output_stream, expected_size + 1)
-            d.addTimeout(self.default_timeout_seconds, self.reactor)
-            length = await make_deferred_yieldable(d)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as function errs at >=
+                d = read_body_with_max_size(response, output_stream, expected_size + 1)
+                d.addTimeout(self.default_timeout_seconds, self.reactor)
+                length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers
 
     async def federation_get_file(
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         # this should be a multipart/mixed response with the boundary string in the header
         try:
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
             raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
 
         try:
-            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
-            deferred = read_multipart_response(
-                response, output_stream, boundary, expected_size + 1
-            )
-            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+                deferred = read_multipart_response(
+                    response, output_stream, boundary, expected_size + 1
+                )
+                deferred.addTimeout(self.default_timeout_seconds, self.reactor)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers, multipart_response.json
 
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index b7e2c99455..7c91b15cef 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -54,7 +54,7 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
 from synapse.rest.admin.experimental_features import ExperimentalFeature
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
 from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
 from synapse.util.caches.lrucache import LruCache
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
         )
 
         user = requester.user
-        device_id = requester.device_id
 
         timeout = parse_integer(request, "timeout", default=0)
         # Position in the stream
@@ -889,7 +888,9 @@ class SlidingSyncRestServlet(RestServlet):
 
         from_token = None
         if from_token_string is not None:
-            from_token = await StreamToken.from_string(self.store, from_token_string)
+            from_token = await SlidingSyncStreamToken.from_string(
+                self.store, from_token_string
+            )
 
         # TODO: We currently don't know whether we're going to use sticky params or
         # maybe some filters like sync v2  where they are built up once and referenced
@@ -904,11 +905,12 @@ class SlidingSyncRestServlet(RestServlet):
 
         sync_config = SlidingSyncConfig(
             user=user,
-            device_id=device_id,
+            requester=requester,
             # FIXME: Currently, we're just manually copying the fields from the
-            # `SlidingSyncBody` into the config. How can we gurantee into the future
+            # `SlidingSyncBody` into the config. How can we guarantee into the future
             # that we don't forget any? I would like something more structured like
             # `copy_attributes(from=body, to=config)`
+            conn_id=body.conn_id,
             lists=body.lists,
             room_subscriptions=body.room_subscriptions,
             extensions=body.extensions,
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..cd6cb2c7a9 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
@@ -156,3 +158,39 @@ class StateDeltasStore(SQLBaseStore):
             "get_max_stream_id_in_current_state_deltas",
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
+
+    async def get_current_state_deltas_for_room(
+        self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+    ) -> List[StateDelta]:
+        """Get the state deltas between that have happened between two
+        tokens."""
+
+        def get_current_state_deltas_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> List[StateDelta]:
+            sql = """
+                SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(
+                sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+            )
+
+            return [
+                StateDelta(
+                    stream_id=row[1],
+                    room_id=room_id,
+                    event_type=row[2],
+                    state_key=row[3],
+                    event_id=row[4],
+                    prev_event_id=row[5],
+                )
+                for row in txn
+                if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+            ]
+
+        return await self.db_pool.runInteraction(
+            "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+        )
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index b22a13ef01..23ac1842f8 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -20,6 +20,7 @@
 #
 #
 import abc
+import logging
 import re
 import string
 from enum import Enum
@@ -74,6 +75,9 @@ if TYPE_CHECKING:
     from synapse.storage.databases.main import DataStore, PurgeEventsStore
     from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
 
+
+logger = logging.getLogger(__name__)
+
 # Define a state map type from type/state_key to T (usually an event ID or
 # event)
 T = TypeVar("T")
@@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     represented by a default `stream` attribute and a map of instance name to
     stream position of any writers that are ahead of the default stream
     position.
+
+    The values in `instance_map` must be greater than the `stream` attribute.
     """
 
     stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
@@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
         kw_only=True,
     )
 
+    def __attrs_post_init__(self) -> None:
+        # Enforce that all instances have a value greater than the min stream
+        # position.
+        for i, v in self.instance_map.items():
+            if v <= self.stream:
+                raise ValueError(
+                    f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}"
+                )
+
     @classmethod
     @abc.abstractmethod
     async def parse(cls, store: "DataStore", string: str) -> "Self":
@@ -494,6 +509,9 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
             for instance in set(self.instance_map).union(other.instance_map)
         }
 
+        # Filter out any redundant entries.
+        instance_map = {i: s for i, s in instance_map.items() if s > max_stream}
+
         return attr.evolve(
             self, stream=max_stream, instance_map=immutabledict(instance_map)
         )
@@ -539,10 +557,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     def bound_stream_token(self, max_stream: int) -> "Self":
         """Bound the stream positions to a maximum value"""
 
+        min_pos = min(self.stream, max_stream)
         return type(self)(
-            stream=min(self.stream, max_stream),
+            stream=min_pos,
             instance_map=immutabledict(
-                {k: min(s, max_stream) for k, s in self.instance_map.items()}
+                {
+                    k: min(s, max_stream)
+                    for k, s in self.instance_map.items()
+                    if min(s, max_stream) > min_pos
+                }
             ),
         )
 
@@ -637,6 +660,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
             )
 
+        super().__attrs_post_init__()
+
     @classmethod
     async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
         try:
@@ -651,6 +676,11 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -666,7 +696,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid room stream token %r" % (string,))
 
     @classmethod
@@ -713,6 +746,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         return self.instance_map.get(instance_name, self.stream)
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         elif self.instance_map:
@@ -727,8 +762,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return f"s{self.stream}"
         else:
             return "s%d" % (self.stream,)
 
@@ -756,6 +793,11 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -770,10 +812,15 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid stream token %r" % (string,))
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.instance_map:
             entries = []
             for name, pos in self.instance_map.items():
@@ -786,8 +833,10 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return str(self.stream)
         else:
             return str(self.stream)
 
@@ -1089,6 +1138,43 @@ StreamToken.START = StreamToken(
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncStreamToken:
+    """The same as a `StreamToken`, but includes an extra field at the start for
+    the sliding sync connection token (separated by a '/'). This is used to
+    store per-connection state.
+
+    This then looks something like:
+        5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+    """
+
+    stream_token: StreamToken
+    connection_token: int
+
+    @staticmethod
+    @cancellable
+    async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
+        """Creates a SlidingSyncStreamToken from its textual representation."""
+        try:
+            connection_token_str, stream_token_str = string.split("/", 1)
+            connection_token = int(connection_token_str)
+            stream_token = await StreamToken.from_string(store, stream_token_str)
+
+            return SlidingSyncStreamToken(
+                stream_token=stream_token,
+                connection_token=connection_token,
+            )
+        except CancelledError:
+            raise
+        except Exception:
+            raise SynapseError(400, "Invalid stream token")
+
+    async def to_string(self, store: "DataStore") -> str:
+        """Serializes the token to a string"""
+        stream_token_str = await self.stream_token.to_string(store)
+        return f"{self.connection_token}/{stream_token_str}"
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class PersistedPosition:
     """Position of a newly persisted row with instance that persisted it."""
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 409120470a..0c2ab13c93 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,7 +31,14 @@ else:
     from pydantic import Extra
 
 from synapse.events import EventBase
-from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    JsonMapping,
+    Requester,
+    SlidingSyncStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.types.rest.client import SlidingSyncBody
 
 if TYPE_CHECKING:
@@ -102,7 +109,7 @@ class SlidingSyncConfig(SlidingSyncBody):
     """
 
     user: UserID
-    device_id: Optional[str]
+    requester: Requester
 
     # Pydantic config
     class Config:
@@ -113,6 +120,31 @@ class SlidingSyncConfig(SlidingSyncBody):
         # Allow custom types like `UserID` to be used in the model
         arbitrary_types_allowed = True
 
+    def connection_id(self) -> str:
+        """Return a string identifier for this connection. May clash with
+        connection IDs from different users.
+
+        This is generally a combination of device ID and conn_id. However, both
+        these two are optional (e.g. puppet access tokens don't have device
+        IDs), so this handles those edge cases.
+        """
+
+        # `conn_id` can be null, in which case we default to the empty string
+        # (if conn ID is empty then the client can't have multiple sync loops)
+        conn_id = self.conn_id or ""
+
+        if self.requester.device_id:
+            return f"D/{self.requester.device_id}/{conn_id}"
+
+        if self.requester.access_token_id:
+            # If we don't have a device, then the access token ID should be a
+            # stable ID.
+            return f"A/{self.requester.access_token_id}/{conn_id}"
+
+        # If we have neither then its likely an AS or some weird token. Either
+        # way we can just fail here.
+        raise Exception("Cannot use sliding sync with access token type")
+
 
 class OperationType(Enum):
     """
@@ -287,7 +319,7 @@ class SlidingSyncResult:
         def __bool__(self) -> bool:
             return bool(self.to_device)
 
-    next_pos: StreamToken
+    next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
     rooms: Dict[str, RoomResult]
     extensions: Extensions
@@ -300,7 +332,7 @@ class SlidingSyncResult:
         return bool(self.lists or self.rooms or self.extensions)
 
     @staticmethod
-    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
         "Return a new empty result"
         return SlidingSyncResult(
             next_pos=next_pos,
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index dbe37bc712..5be8cf5389 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
     Sliding Sync API request body.
 
     Attributes:
+        conn_id: An optional string to identify this connection to the server. If this
+            is missing, only 1 sliding sync connection can be made to the server at
+            any one time.
         lists: Sliding window API. A map of list key to list information
             (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
             arbitrary strings which the client is using to refer to the list. Keep this
@@ -315,6 +318,8 @@ class SlidingSyncBody(RequestBodyModel):
 
         to_device: Optional[ToDeviceExtension] = None
 
+    conn_id: Optional[str]
+
     # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
     if TYPE_CHECKING:
         lists: Optional[Dict[str, SlidingSyncList]] = None
diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py
index 70912e22f8..e55001fb40 100644
--- a/tests/media/test_media_storage.py
+++ b/tests/media/test_media_storage.py
@@ -1057,13 +1057,15 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel.code == 200
 
+    @override_config({"remote_media_download_burst_count": "87M"})
     @patch(
         "synapse.http.matrixfederationclient.read_body_with_max_size",
         read_body_with_max_size_30MiB,
     )
-    def test_download_ratelimit_max_size_sub(self) -> None:
+    def test_download_ratelimit_unknown_length(self) -> None:
         """
-        Test that if no content-length is provided, the default max size is applied instead
+        Test that if no content-length is provided, ratelimit will still be applied after
+        download once length is known
         """
 
         # mock out actually sending the request
@@ -1077,19 +1079,48 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
 
         self.client._send_request = _send_request  # type: ignore
 
-        # ten requests should go through using the max size (500MB/50MB)
-        for i in range(10):
-            channel2 = self.make_request(
+        # 3 requests should go through (note 3rd one would technically violate ratelimit but
+        # is applied *after* download - the next one will be ratelimited)
+        for i in range(3):
+            channel = self.make_request(
                 "GET",
                 f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}",
                 shorthand=False,
             )
-            assert channel2.code == 200
+            assert channel.code == 200
 
-        # eleventh will hit ratelimit
-        channel3 = self.make_request(
+        # 4th will hit ratelimit
+        channel2 = self.make_request(
             "GET",
             "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx",
             shorthand=False,
         )
-        assert channel3.code == 429
+        assert channel2.code == 429
+
+    @override_config({"max_upload_size": "29M"})
+    @patch(
+        "synapse.http.matrixfederationclient.read_body_with_max_size",
+        read_body_with_max_size_30MiB,
+    )
+    def test_max_download_respected(self) -> None:
+        """
+        Test that the max download size is enforced - note that max download size is determined
+        by the max_upload_size
+        """
+
+        # mock out actually sending the request
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        channel = self.make_request(
+            "GET", "/_matrix/media/v3/download/remote.org/abcd", shorthand=False
+        )
+        assert channel.code == 502
+        assert channel.json_body["errcode"] == "M_TOO_LARGE"
diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py
index 7f2caed7d5..466c5a0b70 100644
--- a/tests/rest/client/test_media.py
+++ b/tests/rest/client/test_media.py
@@ -1809,13 +1809,19 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel.code == 200
 
+    @override_config(
+        {
+            "remote_media_download_burst_count": "87M",
+        }
+    )
     @patch(
         "synapse.http.matrixfederationclient.read_multipart_response",
         read_multipart_response_30MiB,
     )
-    def test_download_ratelimit_max_size_sub(self) -> None:
+    def test_download_ratelimit_unknown_length(self) -> None:
         """
-        Test that if no content-length is provided, the default max size is applied instead
+        Test that if no content-length is provided, ratelimiting is still applied after
+        media is downloaded and length is known
         """
 
         # mock out actually sending the request
@@ -1831,8 +1837,9 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
 
         self.client._send_request = _send_request  # type: ignore
 
-        # ten requests should go through using the max size (500MB/50MB)
-        for i in range(10):
+        # first 3 will go through (note that 3rd request technically violates rate limit but
+        # that since the ratelimiting is applied *after* download it goes through, but next one fails)
+        for i in range(3):
             channel2 = self.make_request(
                 "GET",
                 f"/_matrix/client/v1/media/download/remote.org/abc{i}",
@@ -1841,7 +1848,7 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
             )
             assert channel2.code == 200
 
-        # eleventh will hit ratelimit
+        # 4th will hit ratelimit
         channel3 = self.make_request(
             "GET",
             "/_matrix/client/v1/media/download/remote.org/abcd",
@@ -1850,6 +1857,39 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel3.code == 429
 
+    @override_config({"max_upload_size": "29M"})
+    @patch(
+        "synapse.http.matrixfederationclient.read_multipart_response",
+        read_multipart_response_30MiB,
+    )
+    def test_max_download_respected(self) -> None:
+        """
+        Test that the max download size is enforced - note that max download size is determined
+        by the max_upload_size
+        """
+
+        # mock out actually sending the request, returns a 30MiB response
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers(
+                {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
+            )
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcd",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 502
+        assert channel.json_body["errcode"] == "M_TOO_LARGE"
+
     def test_file_download(self) -> None:
         content = io.BytesIO(b"file_to_stream")
         content_uri = self.get_success(
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index f5d57e689c..b8d2a0a2d0 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -50,7 +50,14 @@ from synapse.rest.client import (
     sync,
 )
 from synapse.server import HomeServer
-from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    RoomStreamToken,
+    SlidingSyncStreamToken,
+    StreamKeyType,
+    StreamToken,
+    UserID,
+)
 from synapse.util import Clock
 
 from tests import unittest
@@ -1448,7 +1455,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         future_position_token_serialized = self.get_success(
-            future_position_token.to_string(self.store)
+            SlidingSyncStreamToken(future_position_token, 0).to_string(self.store)
         )
 
         # Make the Sliding Sync request
@@ -2608,7 +2615,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             room_id1, "activity before token2", tok=user2_tok
         )
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         # Join the room after the `from_token` which will make us consider this room as
         # `newly_joined`.
@@ -2630,8 +2652,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Make an incremental Sliding Sync request (what we're trying to test)
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -2817,7 +2838,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
         self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         self.helper.send(room_id1, "activity after token5", tok=user2_tok)
         self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
@@ -2825,8 +2861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -3074,7 +3109,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
         self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         self.helper.send(room_id1, "activity after token5", tok=user2_tok)
         self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
@@ -3082,8 +3132,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -3239,7 +3288,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity before2", tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
         event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
@@ -3255,8 +3319,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -3316,15 +3379,29 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         self.helper.send(room_id1, "activity after3", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         self.helper.send(room_id1, "activity after4", tok=user2_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -3451,13 +3528,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        after_room_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        after_room_token = channel.json_body["pos"]
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(after_room_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={after_room_token}",
             {
                 "lists": {
                     "foo-list": {
@@ -3476,21 +3567,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id1)
-        )
-
-        # The returned state doesn't change from initial to incremental sync. In the
-        # future, we will only return updates but only if we've sent the room down the
+        # We only return updates but only if we've sent the room down the
         # connection before.
-        self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
-            {
-                state_map[(EventTypes.Create, "")],
-                state_map[(EventTypes.RoomHistoryVisibility, "")],
-            },
-            exact=True,
-        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
         self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard(self) -> None:
@@ -3729,7 +3808,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         user3_id = self.register_user("user3", "pass")
         user3_tok = self.login(user3_id, "pass")
 
-        from_token = self.event_sources.get_current_token()
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
 
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
@@ -3767,8 +3861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Make the Sliding Sync request with lazy loading for the room members
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            self.sync_endpoint + f"?pos={from_token}",
             {
                 "lists": {
                     "foo-list": {
diff --git a/tests/server.py b/tests/server.py
index f1cd0f76be..85602e6953 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -289,10 +289,6 @@ class FakeChannel:
         self._reactor.run()
 
         while not self.is_finished():
-            # If there's a producer, tell it to resume producing so we get content
-            if self._producer:
-                self._producer.resumeProducing()
-
             if self._reactor.seconds() > end_time:
                 raise TimedOutException("Timed out waiting for request to finish.")
 
diff --git a/tests/test_types.py b/tests/test_types.py
index 944aa784fc..00adc65a5a 100644
--- a/tests/test_types.py
+++ b/tests/test_types.py
@@ -19,9 +19,18 @@
 #
 #
 
+from typing import Type
+from unittest import skipUnless
+
+from immutabledict import immutabledict
+from parameterized import parameterized_class
+
 from synapse.api.errors import SynapseError
 from synapse.types import (
+    AbstractMultiWriterStreamToken,
+    MultiWriterStreamToken,
     RoomAlias,
+    RoomStreamToken,
     UserID,
     get_domain_from_id,
     get_localpart_from_id,
@@ -29,6 +38,7 @@ from synapse.types import (
 )
 
 from tests import unittest
+from tests.utils import USE_POSTGRES_FOR_TESTS
 
 
 class IsMineIDTests(unittest.HomeserverTestCase):
@@ -127,3 +137,64 @@ class MapUsernameTestCase(unittest.TestCase):
         # this should work with either a unicode or a bytes
         self.assertEqual(map_username_to_mxid_localpart("têst"), "t=c3=aast")
         self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast")
+
+
+@parameterized_class(
+    ("token_type",),
+    [
+        (MultiWriterStreamToken,),
+        (RoomStreamToken,),
+    ],
+    class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{params_dict['token_type'].__name__}",
+)
+class MultiWriterTokenTestCase(unittest.HomeserverTestCase):
+    """Tests for the different types of multi writer tokens."""
+
+    token_type: Type[AbstractMultiWriterStreamToken]
+
+    def test_basic_token(self) -> None:
+        """Test that a simple stream token can be serialized and unserialized"""
+        store = self.hs.get_datastores().main
+
+        token = self.token_type(stream=5)
+
+        string_token = self.get_success(token.to_string(store))
+
+        if isinstance(token, RoomStreamToken):
+            self.assertEqual(string_token, "s5")
+        else:
+            self.assertEqual(string_token, "5")
+
+        parsed_token = self.get_success(self.token_type.parse(store, string_token))
+        self.assertEqual(parsed_token, token)
+
+    @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
+    def test_instance_map(self) -> None:
+        """Test for stream token with instance map"""
+        store = self.hs.get_datastores().main
+
+        token = self.token_type(stream=5, instance_map=immutabledict({"foo": 6}))
+
+        string_token = self.get_success(token.to_string(store))
+        self.assertEqual(string_token, "m5~1.6")
+
+        parsed_token = self.get_success(self.token_type.parse(store, string_token))
+        self.assertEqual(parsed_token, token)
+
+    def test_instance_map_assertion(self) -> None:
+        """Test that we assert values in the instance map are greater than the
+        min stream position"""
+
+        with self.assertRaises(ValueError):
+            self.token_type(stream=5, instance_map=immutabledict({"foo": 4}))
+
+        with self.assertRaises(ValueError):
+            self.token_type(stream=5, instance_map=immutabledict({"foo": 5}))
+
+    def test_parse_bad_token(self) -> None:
+        """Test that we can parse tokens produced by a bug in Synapse of the
+        form `m5~`"""
+        store = self.hs.get_datastores().main
+
+        parsed_token = self.get_success(self.token_type.parse(store, "m5~"))
+        self.assertEqual(parsed_token, self.token_type(stream=5))
diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py
index fb67146c69..13a4e6ddaa 100644
--- a/tests/util/test_check_dependencies.py
+++ b/tests/util/test_check_dependencies.py
@@ -21,6 +21,7 @@
 
 from contextlib import contextmanager
 from os import PathLike
+from pathlib import Path
 from typing import Generator, Optional, Union
 from unittest.mock import patch
 
@@ -41,7 +42,7 @@ class DummyDistribution(metadata.Distribution):
     def version(self) -> str:
         return self._version
 
-    def locate_file(self, path: Union[str, PathLike]) -> PathLike:
+    def locate_file(self, path: Union[str, PathLike]) -> Path:
         raise NotImplementedError()
 
     def read_text(self, filename: str) -> None: