summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-18 12:51:37 +0100
committerErik Johnston <erik@matrix.org>2024-07-18 12:51:37 +0100
commit84d14b4aa82ba73907d4db746be230a58bf65bfc (patch)
treeb463749fda5c2fd8ba7d67c1e30d94df95afa542
parentMerge remote-tracking branch 'origin/release-v1.111' into matrix-org-hotfixes (diff)
parentAdd `m.room.create` to default bump event types (#17453) (diff)
downloadsynapse-84d14b4aa82ba73907d4db746be230a58bf65bfc.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
-rw-r--r--.github/workflows/tests.yml4
-rw-r--r--Cargo.lock8
-rw-r--r--changelog.d/17419.feature1
-rw-r--r--changelog.d/17424.misc1
-rw-r--r--changelog.d/17429.feature1
-rw-r--r--changelog.d/17432.feature1
-rw-r--r--changelog.d/17435.bugfix1
-rw-r--r--changelog.d/17438.bugfix1
-rw-r--r--changelog.d/17439.bugfix1
-rw-r--r--changelog.d/17449.bugfix1
-rw-r--r--changelog.d/17453.misc1
-rw-r--r--poetry.lock91
-rw-r--r--synapse/app/homeserver.py2
-rw-r--r--synapse/config/repository.py2
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/handlers/sliding_sync.py754
-rw-r--r--synapse/http/matrixfederationclient.py126
-rw-r--r--synapse/rest/client/sync.py32
-rw-r--r--synapse/storage/databases/main/roommember.py57
-rw-r--r--synapse/types/__init__.py65
-rw-r--r--synapse/types/handlers/__init__.py18
-rw-r--r--synapse/types/rest/client/__init__.py4
-rw-r--r--tests/handlers/test_sliding_sync.py1045
-rw-r--r--tests/media/test_media_storage.py49
-rw-r--r--tests/rest/client/test_media.py50
-rw-r--r--tests/rest/client/test_sync.py671
-rw-r--r--tests/server.py4
-rw-r--r--tests/storage/test_roommember.py403
-rw-r--r--tests/test_types.py71
-rw-r--r--tests/unittest.py51
-rw-r--r--tests/util/test_check_dependencies.py3
31 files changed, 2887 insertions, 638 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 767495101b..730f8552d9 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -305,7 +305,7 @@ jobs:
       - lint-readme
     runs-on: ubuntu-latest
     steps:
-      - uses: matrix-org/done-action@v2
+      - uses: matrix-org/done-action@v3
         with:
           needs: ${{ toJSON(needs) }}
 
@@ -737,7 +737,7 @@ jobs:
       - linting-done
     runs-on: ubuntu-latest
     steps:
-      - uses: matrix-org/done-action@v2
+      - uses: matrix-org/done-action@v3
         with:
           needs: ${{ toJSON(needs) }}
 
diff --git a/Cargo.lock b/Cargo.lock
index 3a8bf7a49c..e9adfcbdc3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
 
 [[package]]
 name = "bytes"
-version = "1.6.0"
+version = "1.6.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
+checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952"
 
 [[package]]
 name = "cfg-if"
@@ -597,9 +597,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
 
 [[package]]
 name = "ulid"
-version = "1.1.2"
+version = "1.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259"
+checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289"
 dependencies = [
  "getrandom",
  "rand",
diff --git a/changelog.d/17419.feature b/changelog.d/17419.feature
new file mode 100644
index 0000000000..186a27c470
--- /dev/null
+++ b/changelog.d/17419.feature
@@ -0,0 +1 @@
+Populate `heroes` and room summary fields (`joined_count`, `invited_count`) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17424.misc b/changelog.d/17424.misc
new file mode 100644
index 0000000000..d4a81c137f
--- /dev/null
+++ b/changelog.d/17424.misc
@@ -0,0 +1 @@
+Make sure we always use the right logic for enabling the media repo.
diff --git a/changelog.d/17429.feature b/changelog.d/17429.feature
new file mode 100644
index 0000000000..608b75d632
--- /dev/null
+++ b/changelog.d/17429.feature
@@ -0,0 +1 @@
+Populate `is_dm` room field in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17432.feature b/changelog.d/17432.feature
new file mode 100644
index 0000000000..c86f04c118
--- /dev/null
+++ b/changelog.d/17432.feature
@@ -0,0 +1 @@
+Add room subscriptions to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17435.bugfix b/changelog.d/17435.bugfix
new file mode 100644
index 0000000000..2d06a7c7fc
--- /dev/null
+++ b/changelog.d/17435.bugfix
@@ -0,0 +1 @@
+Order `heroes` by `stream_ordering` as the Matrix specification states (applies to `/sync`).
diff --git a/changelog.d/17438.bugfix b/changelog.d/17438.bugfix
new file mode 100644
index 0000000000..cff6eecd48
--- /dev/null
+++ b/changelog.d/17438.bugfix
@@ -0,0 +1 @@
+Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers.
diff --git a/changelog.d/17439.bugfix b/changelog.d/17439.bugfix
new file mode 100644
index 0000000000..f36c3ec255
--- /dev/null
+++ b/changelog.d/17439.bugfix
@@ -0,0 +1 @@
+Limit concurrent remote downloads to 6 per IP address, and decrement remote downloads without a content-length from the ratelimiter after the download is complete.
\ No newline at end of file
diff --git a/changelog.d/17449.bugfix b/changelog.d/17449.bugfix
new file mode 100644
index 0000000000..cd847a3d1c
--- /dev/null
+++ b/changelog.d/17449.bugfix
@@ -0,0 +1 @@
+Remove unnecessary call to resume producing in fake channel.
\ No newline at end of file
diff --git a/changelog.d/17453.misc b/changelog.d/17453.misc
new file mode 100644
index 0000000000..2978a52477
--- /dev/null
+++ b/changelog.d/17453.misc
@@ -0,0 +1 @@
+Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to bump room when it is created.
diff --git a/poetry.lock b/poetry.lock
index f578a33ed5..4092a41884 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -934,13 +934,13 @@ i18n = ["Babel (>=2.7)"]
 
 [[package]]
 name = "jsonschema"
-version = "4.22.0"
+version = "4.23.0"
 description = "An implementation of JSON Schema validation for Python"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "jsonschema-4.22.0-py3-none-any.whl", hash = "sha256:ff4cfd6b1367a40e7bc6411caec72effadd3db0bbe5017de188f2d6108335802"},
-    {file = "jsonschema-4.22.0.tar.gz", hash = "sha256:5b22d434a45935119af990552c862e5d6d564e8f6601206b305a61fdf661a2b7"},
+    {file = "jsonschema-4.23.0-py3-none-any.whl", hash = "sha256:fbadb6f8b144a8f8cf9f0b89ba94501d143e50411a1278633f56a7acf7fd5566"},
+    {file = "jsonschema-4.23.0.tar.gz", hash = "sha256:d71497fef26351a33265337fa77ffeb82423f3ea21283cd9467bb03999266bc4"},
 ]
 
 [package.dependencies]
@@ -953,7 +953,7 @@ rpds-py = ">=0.7.1"
 
 [package.extras]
 format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"]
-format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"]
+format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=24.6.0)"]
 
 [[package]]
 name = "jsonschema-specifications"
@@ -1389,38 +1389,38 @@ files = [
 
 [[package]]
 name = "mypy"
-version = "1.9.0"
+version = "1.10.1"
 description = "Optional static typing for Python"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"},
-    {file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"},
-    {file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"},
-    {file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"},
-    {file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"},
-    {file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"},
-    {file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"},
-    {file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"},
-    {file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"},
-    {file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"},
-    {file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"},
-    {file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"},
-    {file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"},
-    {file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"},
-    {file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"},
-    {file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"},
-    {file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"},
-    {file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"},
-    {file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"},
-    {file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"},
-    {file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"},
-    {file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"},
-    {file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"},
-    {file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"},
-    {file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"},
-    {file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"},
-    {file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"},
+    {file = "mypy-1.10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e36f229acfe250dc660790840916eb49726c928e8ce10fbdf90715090fe4ae02"},
+    {file = "mypy-1.10.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:51a46974340baaa4145363b9e051812a2446cf583dfaeba124af966fa44593f7"},
+    {file = "mypy-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:901c89c2d67bba57aaaca91ccdb659aa3a312de67f23b9dfb059727cce2e2e0a"},
+    {file = "mypy-1.10.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0cd62192a4a32b77ceb31272d9e74d23cd88c8060c34d1d3622db3267679a5d9"},
+    {file = "mypy-1.10.1-cp310-cp310-win_amd64.whl", hash = "sha256:a2cbc68cb9e943ac0814c13e2452d2046c2f2b23ff0278e26599224cf164e78d"},
+    {file = "mypy-1.10.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bd6f629b67bb43dc0d9211ee98b96d8dabc97b1ad38b9b25f5e4c4d7569a0c6a"},
+    {file = "mypy-1.10.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a1bbb3a6f5ff319d2b9d40b4080d46cd639abe3516d5a62c070cf0114a457d84"},
+    {file = "mypy-1.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8edd4e9bbbc9d7b79502eb9592cab808585516ae1bcc1446eb9122656c6066f"},
+    {file = "mypy-1.10.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6166a88b15f1759f94a46fa474c7b1b05d134b1b61fca627dd7335454cc9aa6b"},
+    {file = "mypy-1.10.1-cp311-cp311-win_amd64.whl", hash = "sha256:5bb9cd11c01c8606a9d0b83ffa91d0b236a0e91bc4126d9ba9ce62906ada868e"},
+    {file = "mypy-1.10.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d8681909f7b44d0b7b86e653ca152d6dff0eb5eb41694e163c6092124f8246d7"},
+    {file = "mypy-1.10.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:378c03f53f10bbdd55ca94e46ec3ba255279706a6aacaecac52ad248f98205d3"},
+    {file = "mypy-1.10.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bacf8f3a3d7d849f40ca6caea5c055122efe70e81480c8328ad29c55c69e93e"},
+    {file = "mypy-1.10.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:701b5f71413f1e9855566a34d6e9d12624e9e0a8818a5704d74d6b0402e66c04"},
+    {file = "mypy-1.10.1-cp312-cp312-win_amd64.whl", hash = "sha256:3c4c2992f6ea46ff7fce0072642cfb62af7a2484efe69017ed8b095f7b39ef31"},
+    {file = "mypy-1.10.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:604282c886497645ffb87b8f35a57ec773a4a2721161e709a4422c1636ddde5c"},
+    {file = "mypy-1.10.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:37fd87cab83f09842653f08de066ee68f1182b9b5282e4634cdb4b407266bade"},
+    {file = "mypy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8addf6313777dbb92e9564c5d32ec122bf2c6c39d683ea64de6a1fd98b90fe37"},
+    {file = "mypy-1.10.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5cc3ca0a244eb9a5249c7c583ad9a7e881aa5d7b73c35652296ddcdb33b2b9c7"},
+    {file = "mypy-1.10.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3a2ffce52cc4dbaeee4df762f20a2905aa171ef157b82192f2e2f368eec05d"},
+    {file = "mypy-1.10.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fe85ed6836165d52ae8b88f99527d3d1b2362e0cb90b005409b8bed90e9059b3"},
+    {file = "mypy-1.10.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c2ae450d60d7d020d67ab440c6e3fae375809988119817214440033f26ddf7bf"},
+    {file = "mypy-1.10.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6be84c06e6abd72f960ba9a71561c14137a583093ffcf9bbfaf5e613d63fa531"},
+    {file = "mypy-1.10.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2189ff1e39db399f08205e22a797383613ce1cb0cb3b13d8bcf0170e45b96cc3"},
+    {file = "mypy-1.10.1-cp39-cp39-win_amd64.whl", hash = "sha256:97a131ee36ac37ce9581f4220311247ab6cba896b4395b9c87af0675a13a755f"},
+    {file = "mypy-1.10.1-py3-none-any.whl", hash = "sha256:71d8ac0b906354ebda8ef1673e5fde785936ac1f29ff6987c7483cfbd5a4235a"},
+    {file = "mypy-1.10.1.tar.gz", hash = "sha256:1f8f492d7db9e3593ef42d4f115f04e556130f2819ad33ab84551403e97dd4c0"},
 ]
 
 [package.dependencies]
@@ -2504,19 +2504,18 @@ tests = ["coverage[toml] (>=5.0.2)", "pytest"]
 
 [[package]]
 name = "setuptools"
-version = "67.6.0"
+version = "70.0.0"
 description = "Easily download, build, install, upgrade, and uninstall Python packages"
 optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
 files = [
-    {file = "setuptools-67.6.0-py3-none-any.whl", hash = "sha256:b78aaa36f6b90a074c1fa651168723acbf45d14cb1196b6f02c0fd07f17623b2"},
-    {file = "setuptools-67.6.0.tar.gz", hash = "sha256:2ee892cd5f29f3373097f5a814697e397cf3ce313616df0af11231e2ad118077"},
+    {file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
+    {file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
 ]
 
 [package.extras]
-docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
-testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
-testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
+docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
+testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
 
 [[package]]
 name = "setuptools-rust"
@@ -2704,19 +2703,19 @@ docs = ["sphinx (<7.0.0)"]
 
 [[package]]
 name = "twine"
-version = "5.1.0"
+version = "5.1.1"
 description = "Collection of utilities for publishing packages on PyPI"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "twine-5.1.0-py3-none-any.whl", hash = "sha256:fe1d814395bfe50cfbe27783cb74efe93abeac3f66deaeb6c8390e4e92bacb43"},
-    {file = "twine-5.1.0.tar.gz", hash = "sha256:4d74770c88c4fcaf8134d2a6a9d863e40f08255ff7d8e2acb3cbbd57d25f6e9d"},
+    {file = "twine-5.1.1-py3-none-any.whl", hash = "sha256:215dbe7b4b94c2c50a7315c0275d2258399280fbb7d04182c7e55e24b5f93997"},
+    {file = "twine-5.1.1.tar.gz", hash = "sha256:9aa0825139c02b3434d913545c7b847a21c835e11597f5255842d457da2322db"},
 ]
 
 [package.dependencies]
 importlib-metadata = ">=3.6"
 keyring = ">=15.1"
-pkginfo = ">=1.8.1"
+pkginfo = ">=1.8.1,<1.11"
 readme-renderer = ">=35.0"
 requests = ">=2.20"
 requests-toolbelt = ">=0.8.0,<0.9.0 || >0.9.0"
@@ -2851,13 +2850,13 @@ files = [
 
 [[package]]
 name = "types-jsonschema"
-version = "4.22.0.20240610"
+version = "4.23.0.20240712"
 description = "Typing stubs for jsonschema"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-jsonschema-4.22.0.20240610.tar.gz", hash = "sha256:f82ab9fe756e3a2642ea9712c46b403ce61eb380b939b696cff3252af42f65b0"},
-    {file = "types_jsonschema-4.22.0.20240610-py3-none-any.whl", hash = "sha256:89996b9bd1928f820a0e252b2844be21cd2e55d062b6fa1048d88453006ad89e"},
+    {file = "types-jsonschema-4.23.0.20240712.tar.gz", hash = "sha256:b20db728dcf7ea3e80e9bdeb55e8b8420c6c040cda14e8cf284465adee71d217"},
+    {file = "types_jsonschema-4.23.0.20240712-py3-none-any.whl", hash = "sha256:8c33177ce95336241c1d61ccb56a9964d4361b99d5f1cd81a1ab4909b0dd7cf4"},
 ]
 
 [package.dependencies]
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b111847b7..e114ab7ec4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -217,7 +217,7 @@ class SynapseHomeServer(HomeServer):
             )
 
         if name in ["media", "federation", "client"]:
-            if self.config.server.enable_media_repo:
+            if self.config.media.can_load_media_repo:
                 media_repo = self.get_media_repository_resource()
                 resources.update(
                     {
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 1645470499..dc0e93ffa1 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config):
         # Only enable the media repo if either the media repo is enabled or the
         # current worker app is the media repo.
         if (
-            self.root.server.enable_media_repo is False
+            config.get("enable_media_repo", True) is False
             and config.get("worker_app") != "synapse.app.media_repository"
         ):
             self.can_load_media_repo = False
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a2b2305776..8bb97df175 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -395,12 +395,6 @@ class ServerConfig(Config):
                 self.presence_router_config,
             ) = load_module(presence_router_config, ("presence", "presence_router"))
 
-        # whether to enable the media repository endpoints. This should be set
-        # to false if the media repository is running as a separate endpoint;
-        # doing so ensures that we will not run cache cleanup jobs on the
-        # master, potentially causing inconsistency.
-        self.enable_media_repo = config.get("enable_media_repo", True)
-
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API.
         self.require_auth_for_profile_requests = config.get(
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8e6c2fb860..a23a6b9dd9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -19,7 +19,7 @@
 #
 import logging
 from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -28,7 +28,9 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     JsonDict,
     PersistedEventPosition,
@@ -51,6 +53,7 @@ logger = logging.getLogger(__name__)
 
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
     EventTypes.Message,
     EventTypes.Encrypted,
     EventTypes.Sticker,
@@ -60,32 +63,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
 }
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        room_id: The room ID of the membership event
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range and is still joined to the room at the end of this range.
+        newly_left: Whether the user newly left (or kicked) the room during the given
+            token range and is still "leave" at the end of this range.
+        is_dm: Whether this user considers this room as a direct-message (DM) room
+    """
+
+    room_id: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    event_id: Optional[str]
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set because `current_state_delta_stream` will note the position that the reset
+    # happened.
+    event_pos: PersistedEventPosition
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set to `LEAVE` because we can make that assumption based on the situaton (see
+    # `get_current_state_delta_membership_changes_for_user(...)`)
+    membership: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    sender: Optional[str]
+    newly_joined: bool
+    newly_left: bool
+    is_dm: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
+
+
 def filter_membership_for_sync(
-    *, membership: str, user_id: str, sender: Optional[str]
+    *, user_id: str, room_membership_for_user: _RoomMembershipForUser
 ) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
 
     Attributes:
-        membership: The membership state of the user in the room.
         user_id: The user ID that the membership applies to
-        sender: The person who sent the membership event
+        room_membership_for_user: Membership information for the user in the room
     """
 
-    # Everything except `Membership.LEAVE` because we want everything that's *still*
-    # relevant to the user. There are few more things to include in the sync response
-    # (newly_left) but those are handled separately.
+    membership = room_membership_for_user.membership
+    sender = room_membership_for_user.sender
+    newly_left = room_membership_for_user.newly_left
+
+    # We want to allow everything except rooms the user has left unless `newly_left`
+    # because we want everything that's *still* relevant to the user. We include
+    # `newly_left` rooms because the last event that the user should see is their own
+    # leave event.
     #
-    # This logic includes kicks (leave events where the sender is not the same user) and
-    # can be read as "anything that isn't a leave or a leave with a different sender".
+    # A leave != kick. This logic includes kicks (leave events where the sender is not
+    # the same user).
     #
-    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
-    # happened that removed the user from the room, or the user was the last person
-    # locally to leave the room which caused the server to leave the room. In both
-    # cases, we can just remove the rooms since they are no longer relevant to the user.
-    # They could still be added back later if they are `newly_left`.
-    return membership != Membership.LEAVE or sender not in (user_id, None)
+    # When `sender=None`, it means that a state reset happened that removed the user
+    # from the room without a corresponding leave event. We can just remove the rooms
+    # since they are no longer relevant to the user but will still appear if they are
+    # `newly_left`.
+    return (
+        # Anything except leave events
+        membership != Membership.LEAVE
+        # Unless...
+        or newly_left
+        # Allow kicks
+        or (membership == Membership.LEAVE and sender not in (user_id, None))
+    )
 
 
 # We can't freeze this class because we want to update it in place with the
@@ -279,29 +329,6 @@ class StateValues:
     LAZY: Final = "$LAZY"
 
 
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
-    """
-    Attributes:
-        event_id: The event ID of the membership event
-        event_pos: The stream position of the membership event
-        membership: The membership state of the user in the room
-        sender: The person who sent the membership event
-        newly_joined: Whether the user newly joined the room during the given token
-            range
-    """
-
-    room_id: str
-    event_id: Optional[str]
-    event_pos: PersistedEventPosition
-    membership: str
-    sender: Optional[str]
-    newly_joined: bool
-
-    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
-        return attr.evolve(self, **kwds)
-
-
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -420,18 +447,31 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+        has_room_subscriptions = (
+            sync_config.room_subscriptions is not None
+            and len(sync_config.room_subscriptions) > 0
+        )
+        if has_lists or has_room_subscriptions:
+            room_membership_for_user_map = (
+                await self.get_room_membership_for_user_at_to_token(
+                    user=sync_config.user,
+                    to_token=to_token,
+                    from_token=from_token,
+                )
+            )
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we're going to display and need to fetch more
         # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
-        if sync_config.lists:
-            # Get all of the room IDs that the user should be able to see in the sync
-            # response
-            sync_room_map = await self.get_sync_room_ids_for_user(
-                sync_config.user,
-                from_token=from_token,
-                to_token=to_token,
+        if has_lists and sync_config.lists is not None:
+            sync_room_map = await self.filter_rooms_relevant_for_sync(
+                user=sync_config.user,
+                room_membership_for_user_map=room_membership_for_user_map,
             )
 
             for list_key, list_config in sync_config.lists.items():
@@ -520,7 +560,35 @@ class SlidingSyncHandler:
                     ops=ops,
                 )
 
-        # TODO: if (sync_config.room_subscriptions):
+        # Handle room subscriptions
+        if has_room_subscriptions and sync_config.room_subscriptions is not None:
+            for room_id, room_subscription in sync_config.room_subscriptions.items():
+                room_membership_for_user_at_to_token = (
+                    await self.check_room_subscription_allowed_for_user(
+                        room_id=room_id,
+                        room_membership_for_user_map=room_membership_for_user_map,
+                        to_token=to_token,
+                    )
+                )
+
+                # Skip this room if the user isn't allowed to see it
+                if not room_membership_for_user_at_to_token:
+                    continue
+
+                room_membership_for_user_map[room_id] = (
+                    room_membership_for_user_at_to_token
+                )
+
+                # Take the superset of the `RoomSyncConfig` for each room.
+                #
+                # Update our `relevant_room_map` with the room we're going to display
+                # and need to fetch more info about.
+                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+                existing_room_sync_config = relevant_room_map.get(room_id)
+                if existing_room_sync_config is not None:
+                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
+                else:
+                    relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -529,7 +597,9 @@ class SlidingSyncHandler:
                 user=sync_config.user,
                 room_id=room_id,
                 room_sync_config=room_sync_config,
-                room_membership_for_user_at_to_token=sync_room_map[room_id],
+                room_membership_for_user_at_to_token=room_membership_for_user_map[
+                    room_id
+                ],
                 from_token=from_token,
                 to_token=to_token,
             )
@@ -547,28 +617,23 @@ class SlidingSyncHandler:
             extensions=extensions,
         )
 
-    async def get_sync_room_ids_for_user(
+    async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[StreamToken],
     ) -> Dict[str, _RoomMembershipForUser]:
         """
-        Fetch room IDs that should be listed for this user in the sync response (the
-        full room list that will be filtered, sorted, and sliced).
+        Fetch room IDs that the user has had membership in (the full room list including
+        long-lost left rooms that will be filtered, sorted, and sliced).
 
-        We're looking for rooms where the user has the following state in the token
-        range (> `from_token` and <= `to_token`):
+        We're looking for rooms where the user has had any sort of membership in the
+        token range (> `from_token` and <= `to_token`)
 
-        - `invite`, `join`, `knock`, `ban` membership events
-        - Kicks (`leave` membership events where `sender` is different from the
-          `user_id`/`state_key`)
-        - `newly_left` (rooms that were left during the given token range)
-        - In order for bans/kicks to not show up in sync, you need to `/forget` those
-          rooms. This doesn't modify the event itself though and only adds the
-          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
-          to tell when a room was forgotten at the moment so we can't factor it into the
-          from/to range.
+        In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+        doesn't modify the event itself though and only adds the `forgotten` flag to the
+        `room_memberships` table in Synapse. There isn't a way to tell when a room was
+        forgotten at the moment so we can't factor it into the token range.
 
         Args:
             user: User to fetch rooms for
@@ -576,8 +641,8 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
 
         Returns:
-            A dictionary of room IDs that should be listed in the sync response along
-            with membership information in that room at the time of `to_token`.
+            A dictionary of room IDs that the user has had membership in along with
+            membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -588,9 +653,6 @@ class SlidingSyncHandler:
             # We want to fetch any kind of membership (joined and left rooms) in order
             # to get the `event_pos` of the latest room membership event for the
             # user.
-            #
-            # We will filter out the rooms that don't belong below (see
-            # `filter_membership_for_sync`)
             membership_list=Membership.LIST,
             excluded_rooms=self.rooms_to_exclude_globally,
         )
@@ -610,7 +672,10 @@ class SlidingSyncHandler:
                 event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
                 sender=room_for_user.sender,
+                # We will update these fields below to be accurate
                 newly_joined=False,
+                newly_left=False,
+                is_dm=False,
             )
             for room_for_user in room_for_user_list
         }
@@ -635,10 +700,17 @@ class SlidingSyncHandler:
                 instance_to_max_stream_ordering_map[instance_name] = stream_ordering
 
         # Then assemble the `RoomStreamToken`
+        min_stream_pos = min(instance_to_max_stream_ordering_map.values())
         membership_snapshot_token = RoomStreamToken(
             # Minimum position in the `instance_map`
-            stream=min(instance_to_max_stream_ordering_map.values()),
-            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+            stream=min_stream_pos,
+            instance_map=immutabledict(
+                {
+                    instance_name: stream_pos
+                    for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+                    if stream_pos > min_stream_pos
+                }
+            ),
         )
 
         # Since we fetched the users room list at some point in time after the from/to
@@ -648,10 +720,9 @@ class SlidingSyncHandler:
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
         # - 1c) Update room membership events to the point in time of the `to_token`
-        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        # - 3) Figure out which rooms are `newly_joined`
-
-        # 1) -----------------------------------------------------
+        # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+        # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+        # - 4) Figure out which rooms are DM's
 
         # 1) Fetch membership changes that fall in the range from `to_token` up to
         # `membership_snapshot_token`
@@ -711,7 +782,10 @@ class SlidingSyncHandler:
                         event_pos=first_membership_change_after_to_token.prev_event_pos,
                         membership=first_membership_change_after_to_token.prev_membership,
                         sender=first_membership_change_after_to_token.prev_sender,
+                        # We will update these fields below to be accurate
                         newly_joined=False,
+                        newly_left=False,
+                        is_dm=False,
                     )
                 else:
                     # If we can't find the previous membership event, we shouldn't
@@ -719,22 +793,6 @@ class SlidingSyncHandler:
                     # exact membership state and shouldn't rely on the current snapshot.
                     sync_room_id_set.pop(room_id, None)
 
-        # Filter the rooms that that we have updated room membership events to the point
-        # in time of the `to_token` (from the "1)" fixups)
-        filtered_sync_room_id_set = {
-            room_id: room_membership_for_user
-            for room_id, room_membership_for_user in sync_room_id_set.items()
-            if filter_membership_for_sync(
-                membership=room_membership_for_user.membership,
-                user_id=user_id,
-                sender=room_membership_for_user.sender,
-            )
-        }
-
-        # 2) -----------------------------------------------------
-        # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out are newly_left in the following code
-
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
         current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
@@ -796,18 +854,40 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                 possibly_newly_joined_room_ids.add(room_id)
 
-            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
-            # include newly_left rooms because the last event that the user should see
-            # is their own leave event
+            # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
-                    room_id=room_id,
-                    event_id=last_membership_change_in_from_to_range.event_id,
-                    event_pos=last_membership_change_in_from_to_range.event_pos,
-                    membership=last_membership_change_in_from_to_range.membership,
-                    sender=last_membership_change_in_from_to_range.sender,
-                    newly_joined=False,
-                )
+                # 2) Mark this room as `newly_left`
+
+                # If we're seeing a membership change here, we should expect to already
+                # have it in our snapshot but if a state reset happens, it wouldn't have
+                # shown up in our snapshot but appear as a change here.
+                existing_sync_entry = sync_room_id_set.get(room_id)
+                if existing_sync_entry is not None:
+                    # Normal expected case
+                    sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+                        newly_left=True
+                    )
+                else:
+                    # State reset!
+                    logger.warn(
+                        "State reset detected for room_id %s with %s who is no longer in the room",
+                        room_id,
+                        user_id,
+                    )
+                    # Even though a state reset happened which removed the person from
+                    # the room, we still add it the list so the user knows they left the
+                    # room. Downstream code can check for a state reset by looking for
+                    # `event_id=None and membership is not None`.
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        room_id=room_id,
+                        event_id=last_membership_change_in_from_to_range.event_id,
+                        event_pos=last_membership_change_in_from_to_range.event_pos,
+                        membership=last_membership_change_in_from_to_range.membership,
+                        sender=last_membership_change_in_from_to_range.sender,
+                        newly_joined=False,
+                        newly_left=True,
+                        is_dm=False,
+                    )
 
         # 3) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:
@@ -818,9 +898,9 @@ class SlidingSyncHandler:
             # also some non-join in the range, we know they `newly_joined`.
             if has_non_join_in_from_to_range:
                 # We found a `newly_joined` room (we left and joined within the token range)
-                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                    room_id
-                ].copy_and_replace(newly_joined=True)
+                sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                    newly_joined=True
+                )
             else:
                 prev_event_id = first_membership_change_by_room_id_in_from_to_range[
                     room_id
@@ -832,7 +912,7 @@ class SlidingSyncHandler:
                 if prev_event_id is None:
                     # We found a `newly_joined` room (we are joining the room for the
                     # first time within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
                 # Last resort, we need to step back to the previous membership event
@@ -840,11 +920,150 @@ class SlidingSyncHandler:
                 elif prev_membership != Membership.JOIN:
                     # We found a `newly_joined` room (we left before the token range
                     # and joined within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
 
-        return filtered_sync_room_id_set
+        # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
+        #
+        # We're using global account data (`m.direct`) instead of checking for
+        # `is_direct` on membership events because that property only appears for
+        # the invitee membership event (doesn't show up for the inviter).
+        #
+        # We're unable to take `to_token` into account for global account data since
+        # we only keep track of the latest account data for the user.
+        dm_map = await self.store.get_global_account_data_by_type_for_user(
+            user_id, AccountDataTypes.DIRECT
+        )
+
+        # Flatten out the map. Account data is set by the client so it needs to be
+        # scrutinized.
+        dm_room_id_set = set()
+        if isinstance(dm_map, dict):
+            for room_ids in dm_map.values():
+                # Account data should be a list of room IDs. Ignore anything else
+                if isinstance(room_ids, list):
+                    for room_id in room_ids:
+                        if isinstance(room_id, str):
+                            dm_room_id_set.add(room_id)
+
+        # 4) Fixup
+        for room_id in sync_room_id_set:
+            sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                is_dm=room_id in dm_room_id_set
+            )
+
+        return sync_room_id_set
+
+    async def filter_rooms_relevant_for_sync(
+        self,
+        user: UserID,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Filter room IDs that should/can be listed for this user in the sync response (the
+        full room list that will be further filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+          to tell when a room was forgotten at the moment so we can't factor it into the
+          from/to range.
+
+        Args:
+            user: User that is syncing
+            room_membership_for_user_map: Room membership for the user
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
+        """
+        user_id = user.to_string()
+
+        # Filter rooms to only what we're interested to sync with
+        filtered_sync_room_map = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in room_membership_for_user_map.items()
+            if filter_membership_for_sync(
+                user_id=user_id,
+                room_membership_for_user=room_membership_for_user,
+            )
+        }
+
+        return filtered_sync_room_map
+
+    async def check_room_subscription_allowed_for_user(
+        self,
+        room_id: str,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+    ) -> Optional[_RoomMembershipForUser]:
+        """
+        Check whether the user is allowed to see the room based on whether they have
+        ever had membership in the room or if the room is `world_readable`.
+
+        Similar to `check_user_in_room_or_world_readable(...)`
+
+        Args:
+            room_id: Room to check
+            room_membership_for_user_map: Room membership for the user at the time of
+                the `to_token` (<= `to_token`).
+            to_token: The token to fetch rooms up to.
+
+        Returns:
+            The room membership for the user if they are allowed to subscribe to the
+            room else `None`.
+        """
+
+        # We can first check if they are already allowed to see the room based
+        # on our previous work to assemble the `room_membership_for_user_map`.
+        #
+        # If they have had any membership in the room over time (up to the `to_token`),
+        # let them subscribe and see what they can.
+        existing_membership_for_user = room_membership_for_user_map.get(room_id)
+        if existing_membership_for_user is not None:
+            return existing_membership_for_user
+
+        # TODO: Handle `world_readable` rooms
+        return None
+
+        # If the room is `world_readable`, it doesn't matter whether they can join,
+        # everyone can see the room.
+        # not_in_room_membership_for_user = _RoomMembershipForUser(
+        #     room_id=room_id,
+        #     event_id=None,
+        #     event_pos=None,
+        #     membership=None,
+        #     sender=None,
+        #     newly_joined=False,
+        #     newly_left=False,
+        #     is_dm=False,
+        # )
+        # room_state = await self.get_current_state_at(
+        #     room_id=room_id,
+        #     room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+        #     state_filter=StateFilter.from_types(
+        #         [(EventTypes.RoomHistoryVisibility, "")]
+        #     ),
+        #     to_token=to_token,
+        # )
+
+        # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+        # if (
+        #     visibility_event is not None
+        #     and visibility_event.content.get("history_visibility")
+        #     == HistoryVisibility.WORLD_READABLE
+        # ):
+        #     return not_in_room_membership_for_user
+
+        # return None
 
     async def filter_rooms(
         self,
@@ -867,41 +1086,24 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
-        user_id = user.to_string()
-
-        # TODO: Apply filters
-
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            # We're using global account data (`m.direct`) instead of checking for
-            # `is_direct` on membership events because that property only appears for
-            # the invitee membership event (doesn't show up for the inviter). Account
-            # data is set by the client so it needs to be scrutinized.
-            #
-            # We're unable to take `to_token` into account for global account data since
-            # we only keep track of the latest account data for the user.
-            dm_map = await self.store.get_global_account_data_by_type_for_user(
-                user_id, AccountDataTypes.DIRECT
-            )
-
-            # Flatten out the map
-            dm_room_id_set = set()
-            if isinstance(dm_map, dict):
-                for room_ids in dm_map.values():
-                    # Account data should be a list of room IDs. Ignore anything else
-                    if isinstance(room_ids, list):
-                        for room_id in room_ids:
-                            if isinstance(room_id, str):
-                                dm_room_id_set.add(room_id)
-
             if filters.is_dm:
                 # Only DM rooms please
-                filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if sync_room_map[room_id].is_dm
+                }
             else:
                 # Only non-DM rooms please
-                filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if not sync_room_map[room_id].is_dm
+                }
 
         if filters.spaces:
             raise NotImplementedError()
@@ -1043,6 +1245,102 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    async def get_current_state_ids_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[str]:
+        """
+        Get current state IDs for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids: StateMap[str]
+        # People shouldn't see past their leave/ban event
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
+            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+                room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+                ),
+                state_filter=state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+        # Otherwise, we can get the latest current state in the room
+        else:
+            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+                room_id,
+                state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+            # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+
+        return room_state_ids
+
+    async def get_current_state_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[EventBase]:
+        """
+        Get current state for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=state_filter,
+            to_token=to_token,
+        )
+
+        event_map = await self.store.get_events(list(room_state_ids.values()))
+
+        state_map = {}
+        for key, event_id in room_state_ids.items():
+            event = event_map.get(event_id)
+            if event:
+                state_map[key] = event
+
+        return state_map
+
     async def get_room_sync_data(
         self,
         user: UserID,
@@ -1074,7 +1372,7 @@ class SlidingSyncHandler:
         # membership. Currently, we have to make all of these optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        timeline_events: Optional[List[EventBase]] = None
+        timeline_events: List[EventBase] = []
         bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
         limited: Optional[bool] = None
         prev_batch_token: Optional[StreamToken] = None
@@ -1206,7 +1504,7 @@ class SlidingSyncHandler:
 
         # Figure out any stripped state events for invite/knocks. This allows the
         # potential joiner to identify the room.
-        stripped_state: Optional[List[JsonDict]] = None
+        stripped_state: List[JsonDict] = []
         if room_membership_for_user_at_to_token.membership in (
             Membership.INVITE,
             Membership.KNOCK,
@@ -1232,10 +1530,10 @@ class SlidingSyncHandler:
             stripped_state.append(strip_event(invite_or_knock_event))
 
         # TODO: Handle state resets. For example, if we see
-        # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
-        # `required_state` doesn't include it, we should indicate to the client that a
-        # state reset happened. Perhaps we should indicate this by setting `initial:
-        # True` and empty `required_state`.
+        # `room_membership_for_user_at_to_token.event_id=None and
+        # room_membership_for_user_at_to_token.membership is not None`, we should
+        # indicate to the client that a state reset happened. Perhaps we should indicate
+        # this by setting `initial: True` and empty `required_state`.
 
         # TODO: Since we can't determine whether we've already sent a room down this
         # Sliding Sync connection before (we plan to add this optimization in the
@@ -1243,6 +1541,44 @@ class SlidingSyncHandler:
         # updates.
         initial = True
 
+        # Check whether the room has a name set
+        name_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
+            to_token=to_token,
+        )
+        name_event_id = name_state_ids.get((EventTypes.Name, ""))
+
+        room_membership_summary: Mapping[str, MemberSummary]
+        empty_membership_summary = MemberSummary([], 0)
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: Figure out how to get the membership summary for left/banned rooms
+            room_membership_summary = {}
+        else:
+            room_membership_summary = await self.store.get_room_summary(room_id)
+            # TODO: Reverse/rewind back to the `to_token`
+
+        # `heroes` are required if the room name is not set.
+        #
+        # Note: When you're the first one on your server to be invited to a new room
+        # over federation, we only have access to some stripped state in
+        # `event.unsigned.invite_room_state` which currently doesn't include `heroes`,
+        # see https://github.com/matrix-org/matrix-spec/issues/380. This means that
+        # clients won't be able to calculate the room name when necessary and just a
+        # pitfall we have to deal with until that spec issue is resolved.
+        hero_user_ids: List[str] = []
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        if name_event_id is None:
+            hero_user_ids = extract_heroes_from_room_summary(
+                room_membership_summary, me=user.to_string()
+            )
+
         # Fetch the `required_state` for the room
         #
         # No `required_state` for invite/knock rooms (just `stripped_state`)
@@ -1253,13 +1589,11 @@ class SlidingSyncHandler:
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
         #
         # Calculate the `StateFilter` based on the `required_state` for the room
-        room_state: Optional[StateMap[EventBase]] = None
-        required_room_state: Optional[StateMap[EventBase]] = None
+        required_state_filter = StateFilter.none()
         if room_membership_for_user_at_to_token.membership not in (
             Membership.INVITE,
             Membership.KNOCK,
         ):
-            required_state_filter = StateFilter.none()
             # If we have a double wildcard ("*", "*") in the `required_state`, we need
             # to fetch all state for the room
             #
@@ -1325,86 +1659,65 @@ class SlidingSyncHandler:
 
                 required_state_filter = StateFilter.from_types(required_state_types)
 
-            # We need this base set of info for the response so let's just fetch it along
-            # with the `required_state` for the room
-            META_ROOM_STATE = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")]
+        # We need this base set of info for the response so let's just fetch it along
+        # with the `required_state` for the room
+        meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
+            (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
+        ]
+        state_filter = StateFilter.all()
+        if required_state_filter != StateFilter.all():
             state_filter = StateFilter(
                 types=StateFilter.from_types(
-                    chain(META_ROOM_STATE, required_state_filter.to_types())
+                    chain(meta_room_state, required_state_filter.to_types())
                 ).types,
                 include_others=required_state_filter.include_others,
             )
 
-            # We can return all of the state that was requested if this was the first
-            # time we've sent the room down this connection.
-            if initial:
-                # People shouldn't see past their leave/ban event
-                if room_membership_for_user_at_to_token.membership in (
-                    Membership.LEAVE,
-                    Membership.BAN,
-                ):
-                    room_state = await self.storage_controllers.state.get_state_at(
-                        room_id,
-                        stream_position=to_token.copy_and_replace(
-                            StreamKeyType.ROOM,
-                            room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
-                        ),
-                        state_filter=state_filter,
-                        # Partially-stated rooms should have all state events except for
-                        # remote membership events. Since we've already excluded
-                        # partially-stated rooms unless `required_state` only has
-                        # `["m.room.member", "$LAZY"]` for membership, we should be able to
-                        # retrieve everything requested. When we're lazy-loading, if there
-                        # are some remote senders in the timeline, we should also have their
-                        # membership event because we had to auth that timeline event. Plus
-                        # we don't want to block the whole sync waiting for this one room.
-                        await_full_state=False,
-                    )
-                # Otherwise, we can get the latest current state in the room
-                else:
-                    room_state = await self.storage_controllers.state.get_current_state(
-                        room_id,
-                        state_filter,
-                        # Partially-stated rooms should have all state events except for
-                        # remote membership events. Since we've already excluded
-                        # partially-stated rooms unless `required_state` only has
-                        # `["m.room.member", "$LAZY"]` for membership, we should be able to
-                        # retrieve everything requested. When we're lazy-loading, if there
-                        # are some remote senders in the timeline, we should also have their
-                        # membership event because we had to auth that timeline event. Plus
-                        # we don't want to block the whole sync waiting for this one room.
-                        await_full_state=False,
-                    )
-                    # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
-            else:
-                # TODO: Once we can figure out if we've sent a room down this connection before,
-                # we can return updates instead of the full required state.
-                raise NotImplementedError()
+        # We can return all of the state that was requested if this was the first
+        # time we've sent the room down this connection.
+        room_state: StateMap[EventBase] = {}
+        if initial:
+            room_state = await self.get_current_state_at(
+                room_id=room_id,
+                room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+                state_filter=state_filter,
+                to_token=to_token,
+            )
+        else:
+            # TODO: Once we can figure out if we've sent a room down this connection before,
+            # we can return updates instead of the full required state.
+            raise NotImplementedError()
 
-            if required_state_filter != StateFilter.none():
-                required_room_state = required_state_filter.filter_state(room_state)
+        required_room_state: StateMap[EventBase] = {}
+        if required_state_filter != StateFilter.none():
+            required_room_state = required_state_filter.filter_state(room_state)
 
         # Find the room name and avatar from the state
         room_name: Optional[str] = None
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        name_event = room_state.get((EventTypes.Name, ""))
+        if name_event is not None:
+            room_name = name_event.content.get("name")
+
         room_avatar: Optional[str] = None
-        if room_state is not None:
-            name_event = room_state.get((EventTypes.Name, ""))
-            if name_event is not None:
-                room_name = name_event.content.get("name")
-
-            avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
-            if avatar_event is not None:
-                room_avatar = avatar_event.content.get("url")
-        elif stripped_state is not None:
-            for event in stripped_state:
-                if event["type"] == EventTypes.Name:
-                    room_name = event.get("content", {}).get("name")
-                elif event["type"] == EventTypes.RoomAvatar:
-                    room_avatar = event.get("content", {}).get("url")
-
-                # Found everything so we can stop looking
-                if room_name is not None and room_avatar is not None:
-                    break
+        avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+        if avatar_event is not None:
+            room_avatar = avatar_event.content.get("url")
+
+        # Assemble heroes: extract the info from the state we just fetched
+        heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = []
+        for hero_user_id in hero_user_ids:
+            member_event = room_state.get((EventTypes.Member, hero_user_id))
+            if member_event is not None:
+                heroes.append(
+                    SlidingSyncResult.RoomResult.StrippedHero(
+                        user_id=hero_user_id,
+                        display_name=member_event.content.get("displayname"),
+                        avatar_url=member_event.content.get("avatar_url"),
+                    )
+                )
 
         # Figure out the last bump event in the room
         last_bump_event_result = (
@@ -1423,14 +1736,10 @@ class SlidingSyncHandler:
         return SlidingSyncResult.RoomResult(
             name=room_name,
             avatar=room_avatar,
-            # TODO: Dummy value
-            heroes=None,
-            # TODO: Dummy value
-            is_dm=False,
+            heroes=heroes,
+            is_dm=room_membership_for_user_at_to_token.is_dm,
             initial=initial,
-            required_state=(
-                list(required_room_state.values()) if required_room_state else None
-            ),
+            required_state=list(required_room_state.values()),
             timeline_events=timeline_events,
             bundled_aggregations=bundled_aggregations,
             stripped_state=stripped_state,
@@ -1438,9 +1747,12 @@ class SlidingSyncHandler:
             limited=limited,
             num_live=num_live,
             bump_stamp=bump_stamp,
-            # TODO: Dummy values
-            joined_count=0,
-            invited_count=0,
+            joined_count=room_membership_summary.get(
+                Membership.JOIN, empty_membership_summary
+            ).count,
+            invited_count=room_membership_summary.get(
+                Membership.INVITE, empty_membership_summary
+            ).count,
             # TODO: These are just dummy values. We could potentially just remove these
             # since notifications can only really be done correctly on the client anyway
             # (encrypted rooms).
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 749b01dd0e..6fd75fd381 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.util.stringutils import parse_and_validate_server_name
 
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
             use_proxy=True,
         )
 
+        self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
+
     def wake_destination(self, destination: str) -> None:
         """Called when the remote server may have come back online."""
 
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         try:
-            # add a byte of headroom to max size as function errs at >=
-            d = read_body_with_max_size(response, output_stream, expected_size + 1)
-            d.addTimeout(self.default_timeout_seconds, self.reactor)
-            length = await make_deferred_yieldable(d)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as function errs at >=
+                d = read_body_with_max_size(response, output_stream, expected_size + 1)
+                d.addTimeout(self.default_timeout_seconds, self.reactor)
+                length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers
 
     async def federation_get_file(
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         # this should be a multipart/mixed response with the boundary string in the header
         try:
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
             raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
 
         try:
-            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
-            deferred = read_multipart_response(
-                response, output_stream, boundary, expected_size + 1
-            )
-            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+                deferred = read_multipart_response(
+                    response, output_stream, boundary, expected_size + 1
+                )
+                deferred.addTimeout(self.default_timeout_seconds, self.reactor)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers, multipart_response.json
 
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 94d5faf9f7..1d8cbfdf00 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -997,8 +997,21 @@ class SlidingSyncRestServlet(RestServlet):
             if room_result.avatar:
                 serialized_rooms[room_id]["avatar"] = room_result.avatar
 
-            if room_result.heroes:
-                serialized_rooms[room_id]["heroes"] = room_result.heroes
+            if room_result.heroes is not None and len(room_result.heroes) > 0:
+                serialized_heroes = []
+                for hero in room_result.heroes:
+                    serialized_hero = {
+                        "user_id": hero.user_id,
+                    }
+                    if hero.display_name is not None:
+                        # Not a typo, just how "displayname" is spelled in the spec
+                        serialized_hero["displayname"] = hero.display_name
+
+                    if hero.avatar_url is not None:
+                        serialized_hero["avatar_url"] = hero.avatar_url
+
+                    serialized_heroes.append(serialized_hero)
+                serialized_rooms[room_id]["heroes"] = serialized_heroes
 
             # We should only include the `initial` key if it's `True` to save bandwidth.
             # The absense of this flag means `False`.
@@ -1006,7 +1019,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["initial"] = room_result.initial
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.required_state is not None:
+            if (
+                room_result.required_state is not None
+                and len(room_result.required_state) > 0
+            ):
                 serialized_required_state = (
                     await self.event_serializer.serialize_events(
                         room_result.required_state,
@@ -1017,7 +1033,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["required_state"] = serialized_required_state
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.timeline_events is not None:
+            if (
+                room_result.timeline_events is not None
+                and len(room_result.timeline_events) > 0
+            ):
                 serialized_timeline = await self.event_serializer.serialize_events(
                     room_result.timeline_events,
                     time_now,
@@ -1045,7 +1064,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["is_dm"] = room_result.is_dm
 
             # Stripped state only applies to invite/knock rooms
-            if room_result.stripped_state is not None:
+            if (
+                room_result.stripped_state is not None
+                and len(room_result.stripped_state) > 0
+            ):
                 # TODO: `knocked_state` but that isn't specced yet.
                 #
                 # TODO: Instead of adding `knocked_state`, it would be good to rename
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 5d2fd08495..f62d9f705d 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -279,8 +279,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
     @cached(max_entries=100000)  # type: ignore[synapse-@cached-mutable]
     async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]:
-        """Get the details of a room roughly suitable for use by the room
+        """
+        Get the details of a room roughly suitable for use by the room
         summary extension to /sync. Useful when lazy loading room members.
+
+        Returns the total count of members in the room by membership type, and a
+        truncated list of members (the heroes). This will be the first 6 members of the
+        room:
+        - We want 5 heroes plus 1, in case one of them is the
+        calling user.
+        - They are ordered by `stream_ordering`, which are joined or
+        invited. When no joined or invited members are available, this also includes
+        banned and left users.
+
         Args:
             room_id: The room ID to query
         Returns:
@@ -308,23 +319,36 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             for count, membership in txn:
                 res.setdefault(membership, MemberSummary([], count))
 
-            # we order by membership and then fairly arbitrarily by event_id so
-            # heroes are consistent
-            # Note, rejected events will have a null membership field, so
-            # we we manually filter them out.
+            # Order by membership (joins -> invites -> leave (former insiders) ->
+            # everything else (outsiders like bans/knocks), then by `stream_ordering` so
+            # the first members in the room show up first and to make the sort stable
+            # (consistent heroes).
+            #
+            # Note: rejected events will have a null membership field, so we we manually
+            # filter them out.
             sql = """
                 SELECT state_key, membership, event_id
                 FROM current_state_events
                 WHERE type = 'm.room.member' AND room_id = ?
                     AND membership IS NOT NULL
                 ORDER BY
-                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                    event_id ASC
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 WHEN ? THEN 3 ELSE 4 END ASC,
+                    event_stream_ordering ASC
                 LIMIT ?
             """
 
-            # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
-            txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    # Sort order
+                    Membership.JOIN,
+                    Membership.INVITE,
+                    Membership.LEAVE,
+                    # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
+                    6,
+                ),
+            )
             for user_id, membership, event_id in txn:
                 summary = res[membership]
                 # we will always have a summary for this membership type at this
@@ -1509,10 +1533,19 @@ def extract_heroes_from_room_summary(
 ) -> List[str]:
     """Determine the users that represent a room, from the perspective of the `me` user.
 
+    This function expects `MemberSummary.members` to already be sorted by
+    `stream_ordering` like the results from `get_room_summary(...)`.
+
     The rules which say which users we select are specified in the "Room Summary"
     section of
     https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync
 
+
+    Args:
+        details: Mapping from membership type to member summary. We expect
+            `MemberSummary.members` to already be sorted by `stream_ordering`.
+        me: The user for whom we are determining the heroes for.
+
     Returns a list (possibly empty) of heroes' mxids.
     """
     empty_ms = MemberSummary([], 0)
@@ -1527,11 +1560,11 @@ def extract_heroes_from_room_summary(
         r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
     ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
 
-    # FIXME: order by stream ordering rather than as returned by SQL
+    # We expect `MemberSummary.members` to already be sorted by `stream_ordering`
     if joined_user_ids or invited_user_ids:
-        return sorted(joined_user_ids + invited_user_ids)[0:5]
+        return (joined_user_ids + invited_user_ids)[0:5]
     else:
-        return sorted(gone_user_ids)[0:5]
+        return gone_user_ids[0:5]
 
 
 @attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index b22a13ef01..3962ecc996 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -20,6 +20,7 @@
 #
 #
 import abc
+import logging
 import re
 import string
 from enum import Enum
@@ -74,6 +75,9 @@ if TYPE_CHECKING:
     from synapse.storage.databases.main import DataStore, PurgeEventsStore
     from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
 
+
+logger = logging.getLogger(__name__)
+
 # Define a state map type from type/state_key to T (usually an event ID or
 # event)
 T = TypeVar("T")
@@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     represented by a default `stream` attribute and a map of instance name to
     stream position of any writers that are ahead of the default stream
     position.
+
+    The values in `instance_map` must be greater than the `stream` attribute.
     """
 
     stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
@@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
         kw_only=True,
     )
 
+    def __attrs_post_init__(self) -> None:
+        # Enforce that all instances have a value greater than the min stream
+        # position.
+        for i, v in self.instance_map.items():
+            if v <= self.stream:
+                raise ValueError(
+                    f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}"
+                )
+
     @classmethod
     @abc.abstractmethod
     async def parse(cls, store: "DataStore", string: str) -> "Self":
@@ -494,6 +509,9 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
             for instance in set(self.instance_map).union(other.instance_map)
         }
 
+        # Filter out any redundant entries.
+        instance_map = {i: s for i, s in instance_map.items() if s > max_stream}
+
         return attr.evolve(
             self, stream=max_stream, instance_map=immutabledict(instance_map)
         )
@@ -539,10 +557,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     def bound_stream_token(self, max_stream: int) -> "Self":
         """Bound the stream positions to a maximum value"""
 
+        min_pos = min(self.stream, max_stream)
         return type(self)(
-            stream=min(self.stream, max_stream),
+            stream=min_pos,
             instance_map=immutabledict(
-                {k: min(s, max_stream) for k, s in self.instance_map.items()}
+                {
+                    k: min(s, max_stream)
+                    for k, s in self.instance_map.items()
+                    if min(s, max_stream) > min_pos
+                }
             ),
         )
 
@@ -637,6 +660,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
             )
 
+        super().__attrs_post_init__()
+
     @classmethod
     async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
         try:
@@ -651,6 +676,11 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -666,7 +696,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid room stream token %r" % (string,))
 
     @classmethod
@@ -713,6 +746,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         return self.instance_map.get(instance_name, self.stream)
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         elif self.instance_map:
@@ -727,8 +762,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return f"s{self.stream}"
         else:
             return "s%d" % (self.stream,)
 
@@ -756,6 +793,11 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -770,10 +812,15 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid stream token %r" % (string,))
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.instance_map:
             entries = []
             for name, pos in self.instance_map.items():
@@ -786,8 +833,10 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return str(self.stream)
         else:
             return str(self.stream)
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index a8a3a8f242..409120470a 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -200,18 +200,24 @@ class SlidingSyncResult:
                 flag set. (same as sync v2)
         """
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class StrippedHero:
+            user_id: str
+            display_name: Optional[str]
+            avatar_url: Optional[str]
+
         name: Optional[str]
         avatar: Optional[str]
-        heroes: Optional[List[EventBase]]
+        heroes: Optional[List[StrippedHero]]
         is_dm: bool
         initial: bool
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        required_state: Optional[List[EventBase]]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        timeline_events: Optional[List[EventBase]]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        required_state: List[EventBase]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        timeline_events: List[EventBase]
         bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
         # Optional because it's only relevant to invite/knock rooms
-        stripped_state: Optional[List[JsonDict]]
+        stripped_state: List[JsonDict]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
         prev_batch: Optional[StreamToken]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 1e8fe76c99..dbe37bc712 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -200,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
                     }
 
             timeline_limit: The maximum number of timeline events to return per response.
-            include_heroes: Return a stripped variant of membership events (containing
-                `user_id` and optionally `avatar_url` and `displayname`) for the users used
-                to calculate the room name.
             filters: Filters to apply to the list before sorting.
         """
 
@@ -270,7 +267,6 @@ class SlidingSyncBody(RequestBodyModel):
         else:
             ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None  # type: ignore[valid-type]
         slow_get_all_rooms: Optional[StrictBool] = False
-        include_heroes: Optional[StrictBool] = False
         filters: Optional[Filters] = None
 
     class RoomSubscription(CommonRoomParameters):
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index eb4b0a05c7..a7aa9bb8af 100644
--- a/tests/handlers/test_sliding_sync.py
+++ b/tests/handlers/test_sliding_sync.py
@@ -19,7 +19,7 @@
 #
 import logging
 from copy import deepcopy
-from typing import Optional
+from typing import Dict, Optional
 from unittest.mock import patch
 
 from parameterized import parameterized
@@ -37,12 +37,16 @@ from synapse.api.constants import (
 from synapse.api.room_versions import RoomVersions
 from synapse.events import make_event_from_dict
 from synapse.events.snapshot import EventContext
-from synapse.handlers.sliding_sync import RoomSyncConfig, StateValues
+from synapse.handlers.sliding_sync import (
+    RoomSyncConfig,
+    StateValues,
+    _RoomMembershipForUser,
+)
 from synapse.rest import admin
 from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import JsonDict, UserID
+from synapse.types import JsonDict, StreamToken, UserID
 from synapse.types.handlers import SlidingSyncConfig
 from synapse.util import Clock
 
@@ -581,9 +585,9 @@ class RoomSyncConfigTestCase(TestCase):
         self._assert_room_config_equal(room_sync_config_b, expected, "A into B")
 
 
-class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
+class GetRoomMembershipForUserAtToTokenTestCase(HomeserverTestCase):
     """
-    Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns
+    Tests Sliding Sync handler `get_room_membership_for_user_at_to_token()` to make sure it returns
     the correct list of rooms IDs.
     """
 
@@ -616,7 +620,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         now_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=now_token,
                 to_token=now_token,
@@ -643,7 +647,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_room_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room_token,
                 to_token=after_room_token,
@@ -657,9 +661,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[room_id].event_id,
             join_response["event_id"],
         )
+        self.assertEqual(room_id_results[room_id].membership, Membership.JOIN)
         # We should be considered `newly_joined` because we joined during the token
         # range
         self.assertEqual(room_id_results[room_id].newly_joined, True)
+        self.assertEqual(room_id_results[room_id].newly_left, False)
 
     def test_get_already_joined_room(self) -> None:
         """
@@ -676,7 +682,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_room_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room_token,
                 to_token=after_room_token,
@@ -690,8 +696,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[room_id].event_id,
             join_response["event_id"],
         )
+        self.assertEqual(room_id_results[room_id].membership, Membership.JOIN)
         # We should *NOT* be `newly_joined` because we joined before the token range
         self.assertEqual(room_id_results[room_id].newly_joined, False)
+        self.assertEqual(room_id_results[room_id].newly_left, False)
 
     def test_get_invited_banned_knocked_room(self) -> None:
         """
@@ -748,7 +756,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_room_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room_token,
                 to_token=after_room_token,
@@ -770,19 +778,25 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[invited_room_id].event_id,
             invite_response["event_id"],
         )
+        self.assertEqual(room_id_results[invited_room_id].membership, Membership.INVITE)
+        self.assertEqual(room_id_results[invited_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[invited_room_id].newly_left, False)
+
         self.assertEqual(
             room_id_results[ban_room_id].event_id,
             ban_response["event_id"],
         )
+        self.assertEqual(room_id_results[ban_room_id].membership, Membership.BAN)
+        self.assertEqual(room_id_results[ban_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[ban_room_id].newly_left, False)
+
         self.assertEqual(
             room_id_results[knock_room_id].event_id,
             knock_room_membership_state_event.event_id,
         )
-        # We should *NOT* be `newly_joined` because we were not joined at the the time
-        # of the `to_token`.
-        self.assertEqual(room_id_results[invited_room_id].newly_joined, False)
-        self.assertEqual(room_id_results[ban_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[knock_room_id].membership, Membership.KNOCK)
         self.assertEqual(room_id_results[knock_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[knock_room_id].newly_left, False)
 
     def test_get_kicked_room(self) -> None:
         """
@@ -814,7 +828,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_kick_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_kick_token,
                 to_token=after_kick_token,
@@ -828,9 +842,12 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[kick_room_id].event_id,
             kick_response["event_id"],
         )
+        self.assertEqual(room_id_results[kick_room_id].membership, Membership.LEAVE)
+        self.assertNotEqual(room_id_results[kick_room_id].sender, user1_id)
         # We should *NOT* be `newly_joined` because we were not joined at the the time
         # of the `to_token`.
         self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[kick_room_id].newly_left, False)
 
     def test_forgotten_rooms(self) -> None:
         """
@@ -904,7 +921,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.assertEqual(channel.code, 200, channel.result)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room_forgets,
                 to_token=before_room_forgets,
@@ -914,52 +931,58 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # We shouldn't see the room because it was forgotten
         self.assertEqual(room_id_results.keys(), set())
 
-    def test_only_newly_left_rooms_show_up(self) -> None:
+    def test_newly_left_rooms(self) -> None:
         """
-        Test that newly_left rooms still show up in the sync response but rooms that
-        were left before the `from_token` don't show up. See condition "2)" comments in
-        the `get_sync_room_ids_for_user` method.
+        Test that newly_left are marked properly
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
         # Leave before we calculate the `from_token`
         room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Leave during the from_token/to_token range (newly_left)
         room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
 
         after_room2_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room2_token,
             )
         )
 
-        # Only the newly_left room should show up
-        self.assertEqual(room_id_results.keys(), {room_id2})
-        # It should be pointing to the latest membership event in the from/to range but
-        # the `event_id` is `None` because we left the room causing the server to leave
-        # the room because no other local users are in it (quirk of the
-        # `current_state_delta_stream` table that we source things from)
+        self.assertEqual(room_id_results.keys(), {room_id1, room_id2})
+
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_response1["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined` or `newly_left` because that happened before
+        # the from/to range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
+
         self.assertEqual(
             room_id_results[room_id2].event_id,
-            None,  # _leave_response2["event_id"],
+            leave_response2["event_id"],
         )
+        self.assertEqual(room_id_results[room_id2].membership, Membership.LEAVE)
         # We should *NOT* be `newly_joined` because we are instead `newly_left`
         self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, True)
 
     def test_no_joins_after_to_token(self) -> None:
         """
         Rooms we join after the `to_token` should *not* show up. See condition "1b)"
-        comments in the `get_sync_room_ids_for_user()` method.
+        comments in the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -978,7 +1001,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.join(room_id2, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -991,14 +1014,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[room_id1].event_id,
             join_response1["event_id"],
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_join_during_range_and_left_room_after_to_token(self) -> None:
         """
         Room still shows up if we left the room but were joined during the
         from_token/to_token. See condition "1a)" comments in the
-        `get_sync_room_ids_for_user()` method.
+        `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1016,7 +1041,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1038,14 +1063,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_join_before_range_and_left_room_after_to_token(self) -> None:
         """
         Room still shows up if we left the room but were joined before the `from_token`
         so it should show up. See condition "1a)" comments in the
-        `get_sync_room_ids_for_user()` method.
+        `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1061,7 +1088,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
@@ -1082,14 +1109,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should *NOT* be `newly_joined` because we joined before the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_kicked_before_range_and_left_after_to_token(self) -> None:
         """
         Room still shows up if we left the room but were kicked before the `from_token`
         so it should show up. See condition "1a)" comments in the
-        `get_sync_room_ids_for_user()` method.
+        `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1123,7 +1152,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response = self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_kick_token,
                 to_token=after_kick_token,
@@ -1146,14 +1175,17 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[kick_room_id].membership, Membership.LEAVE)
+        self.assertNotEqual(room_id_results[kick_room_id].sender, user1_id)
         # We should *NOT* be `newly_joined` because we were kicked
         self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[kick_room_id].newly_left, False)
 
     def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
         """
         Newly left room should show up. But we're also testing that joining and leaving
         after the `to_token` doesn't mess with the results. See condition "2)" and "1a)"
-        comments in the `get_sync_room_ids_for_user()` method.
+        comments in the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1176,7 +1208,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1199,14 +1231,17 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
-        # We should *NOT* be `newly_joined` because we left during the token range
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined` because we are actually `newly_left` during
+        # the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, True)
 
     def test_newly_left_during_range_and_join_after_to_token(self) -> None:
         """
         Newly left room should show up. But we're also testing that joining after the
         `to_token` doesn't mess with the results. See condition "2)" and "1b)" comments
-        in the `get_sync_room_ids_for_user()` method.
+        in the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1228,7 +1263,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1250,16 +1285,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
-        # We should *NOT* be `newly_joined` because we left during the token range
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined` because we are actually `newly_left` during
+        # the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, True)
 
     def test_no_from_token(self) -> None:
         """
-        Test that if we don't provide a `from_token`, we get all the rooms that we we're
-        joined up to the `to_token`.
+        Test that if we don't provide a `from_token`, we get all the rooms that we had
+        membership in up to the `to_token`.
 
-        Providing `from_token` only really has the effect that it adds `newly_left`
-        rooms to the response.
+        Providing `from_token` only really has the effect that it marks rooms as
+        `newly_left` in the response.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1276,7 +1314,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Join and leave the room2 before the `to_token`
         self.helper.join(room_id2, user1_id, tok=user1_tok)
-        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
@@ -1284,7 +1322,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.join(room_id2, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=None,
                 to_token=after_room1_token,
@@ -1292,15 +1330,31 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         # Only rooms we were joined to before the `to_token` should show up
-        self.assertEqual(room_id_results.keys(), {room_id1})
+        self.assertEqual(room_id_results.keys(), {room_id1, room_id2})
+
+        # Room1
         # It should be pointing to the latest membership event in the from/to range
         self.assertEqual(
             room_id_results[room_id1].event_id,
             join_response1["event_id"],
         )
-        # We should *NOT* be `newly_joined` because there is no `from_token` to
-        # define a "live" range to compare against
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
+        # We should *NOT* be `newly_joined`/`newly_left` because there is no
+        # `from_token` to define a "live" range to compare against
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
+
+        # Room2
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            leave_response2["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id2].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined`/`newly_left` because there is no
+        # `from_token` to define a "live" range to compare against
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, False)
 
     def test_from_token_ahead_of_to_token(self) -> None:
         """
@@ -1319,28 +1373,28 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
 
-        # Join room1 before `before_room_token`
-        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Join room1 before `to_token`
+        join_room1_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        # Join and leave the room2 before `before_room_token`
-        self.helper.join(room_id2, user1_id, tok=user1_tok)
-        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        # Join and leave the room2 before `to_token`
+        _join_room2_response1 = self.helper.join(room_id2, user1_id, tok=user1_tok)
+        leave_room2_response1 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
 
         # Note: These are purposely swapped. The `from_token` should come after
         # the `to_token` in this test
         to_token = self.event_sources.get_current_token()
 
-        # Join room2 after `before_room_token`
-        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        # Join room2 after `to_token`
+        _join_room2_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
 
         # --------
 
-        # Join room3 after `before_room_token`
-        self.helper.join(room_id3, user1_id, tok=user1_tok)
+        # Join room3 after `to_token`
+        _join_room3_response1 = self.helper.join(room_id3, user1_id, tok=user1_tok)
 
-        # Join and leave the room4 after `before_room_token`
-        self.helper.join(room_id4, user1_id, tok=user1_tok)
-        self.helper.leave(room_id4, user1_id, tok=user1_tok)
+        # Join and leave the room4 after `to_token`
+        _join_room4_response1 = self.helper.join(room_id4, user1_id, tok=user1_tok)
+        _leave_room4_response1 = self.helper.leave(room_id4, user1_id, tok=user1_tok)
 
         # Note: These are purposely swapped. The `from_token` should come after the
         # `to_token` in this test
@@ -1350,31 +1404,59 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.join(room_id4, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=from_token,
                 to_token=to_token,
             )
         )
 
-        # Only rooms we were joined to before the `to_token` should show up
-        #
-        # There won't be any newly_left rooms because the `from_token` is ahead of the
-        # `to_token` and that range will give no membership changes to check.
-        self.assertEqual(room_id_results.keys(), {room_id1})
+        # In the "current" state snapshot, we're joined to all of the rooms but in the
+        # from/to token range...
+        self.assertIncludes(
+            room_id_results.keys(),
+            {
+                # Included because we were joined before both tokens
+                room_id1,
+                # Included because we had membership before the to_token
+                room_id2,
+                # Excluded because we joined after the `to_token`
+                # room_id3,
+                # Excluded because we joined after the `to_token`
+                # room_id4,
+            },
+            exact=True,
+        )
+
+        # Room1
         # It should be pointing to the latest membership event in the from/to range
         self.assertEqual(
             room_id_results[room_id1].event_id,
-            join_response1["event_id"],
+            join_room1_response1["event_id"],
         )
-        # We should *NOT* be `newly_joined` because we joined `room1` before either of the tokens
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
+        # We should *NOT* be `newly_joined`/`newly_left` because we joined `room1`
+        # before either of the tokens
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
+
+        # Room2
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            leave_room2_response1["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id2].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined`/`newly_left` because we joined and left
+        # `room1` before either of the tokens
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, False)
 
     def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
         """
-        Old left room shouldn't show up. But we're also testing that joining and leaving
-        after the `to_token` doesn't mess with the results. See condition "1a)" comments
-        in the `get_sync_room_ids_for_user()` method.
+        Test old left rooms. But we're also testing that joining and leaving after the
+        `to_token` doesn't mess with the results. See condition "1a)" comments in the
+        `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1386,7 +1468,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join and leave the room before the from/to range
         self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
@@ -1395,21 +1477,30 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
             )
         )
 
-        # Room shouldn't show up because it was left before the `from_token`
-        self.assertEqual(room_id_results.keys(), set())
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined`/`newly_left` because we joined and left
+        # `room1` before either of the tokens
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_leave_before_range_and_join_after_to_token(self) -> None:
         """
-        Old left room shouldn't show up. But we're also testing that joining after the
-        `to_token` doesn't mess with the results. See condition "1b)" comments in the
-        `get_sync_room_ids_for_user()` method.
+        Test old left room. But we're also testing that joining after the `to_token`
+        doesn't mess with the results. See condition "1b)" comments in the
+        `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1421,7 +1512,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join and leave the room before the from/to range
         self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
@@ -1429,24 +1520,32 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
             )
         )
 
-        # Room shouldn't show up because it was left before the `from_token`
-        self.assertEqual(room_id_results.keys(), set())
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined`/`newly_left` because we joined and left
+        # `room1` before either of the tokens
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_join_leave_multiple_times_during_range_and_after_to_token(
         self,
     ) -> None:
         """
         Join and leave multiple times shouldn't affect rooms from showing up. It just
-        matters that we were joined or newly_left in the from/to range. But we're also
-        testing that joining and leaving after the `to_token` doesn't mess with the
-        results.
+        matters that we had membership in the from/to range. But we're also testing that
+        joining and leaving after the `to_token` doesn't mess with the results.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1458,7 +1557,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # We create the room with user2 so the room isn't left with no members when we
         # leave and can still re-join.
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
-        # Join, leave, join back to the room before the from/to range
+        # Join, leave, join back to the room during the from/to range
         join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
         leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
         join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
@@ -1471,7 +1570,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1496,15 +1595,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        # We should *NOT* be `newly_left` because we joined during the token range and
+        # was still joined at the end of the range
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_join_leave_multiple_times_before_range_and_after_to_token(
         self,
     ) -> None:
         """
         Join and leave multiple times before the from/to range shouldn't affect rooms
-        from showing up. It just matters that we were joined or newly_left in the
+        from showing up. It just matters that we had membership in the
         from/to range. But we're also testing that joining and leaving after the
         `to_token` doesn't mess with the results.
         """
@@ -1529,7 +1632,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
@@ -1554,8 +1657,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should *NOT* be `newly_joined` because we joined before the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_invite_before_range_and_join_leave_after_to_token(
         self,
@@ -1563,7 +1668,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         Make it look like we joined after the token range but we were invited before the
         from/to range so the room should still show up. See condition "1a)" comments in
-        the `get_sync_room_ids_for_user()` method.
+        the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1586,7 +1691,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
@@ -1608,9 +1713,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.INVITE)
         # We should *NOT* be `newly_joined` because we were only invited before the
         # token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_join_and_display_name_changes_in_token_range(
         self,
@@ -1658,7 +1765,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1684,8 +1791,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_display_name_changes_in_token_range(
         self,
@@ -1721,7 +1830,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_change1_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_change1_token,
@@ -1744,8 +1853,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should *NOT* be `newly_joined` because we joined before the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_display_name_changes_before_and_after_token_range(
         self,
@@ -1791,7 +1902,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_room1_token,
@@ -1817,8 +1928,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should *NOT* be `newly_joined` because we joined before the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_display_name_changes_leave_after_token_range(
         self,
@@ -1828,7 +1941,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         if there are multiple `join` membership events in a row indicating
         `displayname`/`avatar_url` updates and we leave after the `to_token`.
 
-        See condition "1a)" comments in the `get_sync_room_ids_for_user()` method.
+        See condition "1a)" comments in the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1871,7 +1984,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1897,8 +2010,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_display_name_changes_join_after_token_range(
         self,
@@ -1908,7 +2023,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         indicating `displayname`/`avatar_url` updates doesn't affect the results (we
         joined after the token range so it shouldn't show up)
 
-        See condition "1b)" comments in the `get_sync_room_ids_for_user()` method.
+        See condition "1b)" comments in the `get_room_membership_for_user_at_to_token()` method.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1937,7 +2052,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -1973,7 +2088,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_more_changes_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
                 to_token=after_more_changes_token,
@@ -1987,9 +2102,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             room_id_results[room_id1].event_id,
             join_response2["event_id"],
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be considered `newly_joined` because there is some non-join event in
         # between our latest join event.
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_newly_joined_only_joins_during_token_range(
         self,
@@ -2036,7 +2153,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         after_room1_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room1_token,
                 to_token=after_room1_token,
@@ -2062,8 +2179,10 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 }
             ),
         )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
         # We should be `newly_joined` because we first joined during the token range
         self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
 
     def test_multiple_rooms_are_not_confused(
         self,
@@ -2086,16 +2205,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Invited and left the room before the token
         self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_room1_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
         # Invited to room2
-        self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok)
+        invite_room2_response = self.helper.invite(
+            room_id2, src=user2_id, targ=user1_id, tok=user2_tok
+        )
 
         before_room3_token = self.event_sources.get_current_token()
 
         # Invited and left room3 during the from/to range
         room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         self.helper.invite(room_id3, src=user2_id, targ=user1_id, tok=user2_tok)
-        self.helper.leave(room_id3, user1_id, tok=user1_tok)
+        leave_room3_response = self.helper.leave(room_id3, user1_id, tok=user1_tok)
 
         after_room3_token = self.event_sources.get_current_token()
 
@@ -2108,7 +2229,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.helper.leave(room_id3, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_room3_token,
                 to_token=after_room3_token,
@@ -2118,19 +2239,158 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.assertEqual(
             room_id_results.keys(),
             {
-                # `room_id1` shouldn't show up because we left before the from/to range
-                #
-                # Room should show up because we were invited before the from/to range
+                # Left before the from/to range
+                room_id1,
+                # Invited before the from/to range
                 room_id2,
-                # Room should show up because it was newly_left during the from/to range
+                # `newly_left` during the from/to range
                 room_id3,
             },
         )
 
+        # Room1
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_room1_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined`/`newly_left` because we were invited and left
+        # before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
+
+        # Room2
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            invite_room2_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id2].membership, Membership.INVITE)
+        # We should *NOT* be `newly_joined`/`newly_left` because we were invited before
+        # the token range
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, False)
+
+        # Room3
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id3].event_id,
+            leave_room3_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id3].membership, Membership.LEAVE)
+        # We should be `newly_left` because we were invited and left during
+        # the token range
+        self.assertEqual(room_id_results[room_id3].newly_joined, False)
+        self.assertEqual(room_id_results[room_id3].newly_left, True)
+
+    def test_state_reset(self) -> None:
+        """
+        Test a state reset scenario where the user gets removed from the room (when
+        there is no corresponding leave event)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # The room where the state reset will happen
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Join another room so we don't hit the short-circuit and return early if they
+        # have no room membership
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+        before_reset_token = self.event_sources.get_current_token()
+
+        # Send another state event to make a position for the state reset to happen at
+        dummy_state_response = self.helper.send_state(
+            room_id1,
+            event_type="foobarbaz",
+            state_key="",
+            body={"foo": "bar"},
+            tok=user2_tok,
+        )
+        dummy_state_pos = self.get_success(
+            self.store.get_position_for_event(dummy_state_response["event_id"])
+        )
+
+        # Mock a state reset removing the membership for user1 in the current state
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="current_state_events",
+                keyvalues={
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                },
+                desc="state reset user in current_state_events",
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="local_current_membership",
+                keyvalues={
+                    "room_id": room_id1,
+                    "user_id": user1_id,
+                },
+                desc="state reset user in local_current_membership",
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="current_state_delta_stream",
+                values={
+                    "stream_id": dummy_state_pos.stream,
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                    "event_id": None,
+                    "prev_event_id": join_response1["event_id"],
+                    "instance_name": dummy_state_pos.instance_name,
+                },
+                desc="state reset user in current_state_delta_stream",
+            )
+        )
+
+        # Manually bust the cache since we we're just manually messing with the database
+        # and not causing an actual state reset.
+        self.store._membership_stream_cache.entity_has_changed(
+            user1_id, dummy_state_pos.stream
+        )
+
+        after_reset_token = self.event_sources.get_current_token()
+
+        # The function under test
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
+                UserID.from_string(user1_id),
+                from_token=before_reset_token,
+                to_token=after_reset_token,
+            )
+        )
 
-class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
+        # Room1 should show up because it was `newly_left` via state reset during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1, room_id2})
+        # It should be pointing to no event because we were removed from the room
+        # without a corresponding leave event
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            None,
+        )
+        # State reset caused us to leave the room and there is no corresponding leave event
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        # We should be `newly_left` because we were removed via state reset during the from/to range
+        self.assertEqual(room_id_results[room_id1].newly_left, True)
+
+
+class GetRoomMembershipForUserAtToTokenShardTestCase(BaseMultiWorkerStreamTestCase):
     """
-    Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with
+    Tests Sliding Sync handler `get_room_membership_for_user_at_to_token()` to make sure it works with
     sharded event stream_writers enabled
     """
 
@@ -2189,7 +2449,7 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
 
         We then send some events to advance the stream positions of worker1 and worker3
         but worker2 is lagging behind because it's stuck. We are specifically testing
-        that `get_sync_room_ids_for_user(from_token=xxx, to_token=xxx)` should work
+        that `get_room_membership_for_user_at_to_token(from_token=xxx, to_token=xxx)` should work
         correctly in these adverse conditions.
         """
         user1_id = self.register_user("user1", "pass")
@@ -2228,7 +2488,7 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
         join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
         join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
         # Leave room2
-        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        leave_room2_response = self.helper.leave(room_id2, user1_id, tok=user1_tok)
         join_response3 = self.helper.join(room_id3, user1_id, tok=user1_tok)
         # Leave room3
         self.helper.leave(room_id3, user1_id, tok=user1_tok)
@@ -2265,7 +2525,7 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
         # For room_id1/worker1: leave and join the room to advance the stream position
         # and generate membership changes.
         self.helper.leave(room_id1, user1_id, tok=user1_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_room1_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
         # For room_id2/worker2: which is currently stuck, join the room.
         join_on_worker2_response = self.helper.join(room_id2, user1_id, tok=user1_tok)
         # For room_id3/worker3: leave and join the room to advance the stream position
@@ -2319,7 +2579,7 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
 
         # The function under test
         room_id_results = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
                 UserID.from_string(user1_id),
                 from_token=before_stuck_activity_token,
                 to_token=stuck_activity_token,
@@ -2330,18 +2590,411 @@ class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
             room_id_results.keys(),
             {
                 room_id1,
-                # room_id2 shouldn't show up because we left before the from/to range
-                # and the join event during the range happened while worker2 was stuck.
-                # This means that from the perspective of the master, where the
-                # `stuck_activity_token` is generated, the stream position for worker2
-                # wasn't advanced to the join yet. Looking at the `instance_map`, the
-                # join technically comes after `stuck_activity_token``.
-                #
-                # room_id2,
+                room_id2,
                 room_id3,
             },
         )
 
+        # Room1
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_room1_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id1].membership, Membership.JOIN)
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
+        self.assertEqual(room_id_results[room_id1].newly_left, False)
+
+        # Room2
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            leave_room2_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id2].membership, Membership.LEAVE)
+        # room_id2 should *NOT* be considered `newly_left` because we left before the
+        # from/to range and the join event during the range happened while worker2 was
+        # stuck. This means that from the perspective of the master, where the
+        # `stuck_activity_token` is generated, the stream position for worker2 wasn't
+        # advanced to the join yet. Looking at the `instance_map`, the join technically
+        # comes after `stuck_activity_token`.
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, False)
+
+        # Room3
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id3].event_id,
+            join_on_worker3_response["event_id"],
+        )
+        self.assertEqual(room_id_results[room_id3].membership, Membership.JOIN)
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id3].newly_joined, True)
+        self.assertEqual(room_id_results[room_id3].newly_left, False)
+
+
+class FilterRoomsRelevantForSyncTestCase(HomeserverTestCase):
+    """
+    Tests Sliding Sync handler `filter_rooms_relevant_for_sync()` to make sure it returns
+    the correct list of rooms IDs.
+    """
+
+    servlets = [
+        admin.register_servlets,
+        knock.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.storage_controllers = hs.get_storage_controllers()
+
+    def _get_sync_room_ids_for_user(
+        self,
+        user: UserID,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Get the rooms the user should be syncing with
+        """
+        room_membership_for_user_map = self.get_success(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
+                user=user,
+                from_token=from_token,
+                to_token=to_token,
+            )
+        )
+        filtered_sync_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms_relevant_for_sync(
+                user=user,
+                room_membership_for_user_map=room_membership_for_user_map,
+            )
+        )
+
+        return filtered_sync_room_map
+
+    def test_no_rooms(self) -> None:
+        """
+        Test when the user has never joined any rooms before
+        """
+        user1_id = self.register_user("user1", "pass")
+        # user1_tok = self.login(user1_id, "pass")
+
+        now_token = self.event_sources.get_current_token()
+
+        room_id_results = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=now_token,
+            to_token=now_token,
+        )
+
+        self.assertEqual(room_id_results.keys(), set())
+
+    def test_basic_rooms(self) -> None:
+        """
+        Test that rooms that the user is joined to, invited to, banned from, and knocked
+        on show up.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room_token = self.event_sources.get_current_token()
+
+        join_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(join_room_id, user1_id, tok=user1_tok)
+
+        # Setup the invited room (user2 invites user1 to the room)
+        invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        invite_response = self.helper.invite(
+            invited_room_id, targ=user1_id, tok=user2_tok
+        )
+
+        # Setup the ban room (user2 bans user1 from the room)
+        ban_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(ban_room_id, user1_id, tok=user1_tok)
+        ban_response = self.helper.ban(
+            ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok
+        )
+
+        # Setup the knock room (user1 knocks on the room)
+        knock_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, room_version=RoomVersions.V7.identifier
+        )
+        self.helper.send_state(
+            knock_room_id,
+            EventTypes.JoinRules,
+            {"join_rule": JoinRules.KNOCK},
+            tok=user2_tok,
+        )
+        # User1 knocks on the room
+        knock_channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/knock/%s" % (knock_room_id,),
+            b"{}",
+            user1_tok,
+        )
+        self.assertEqual(knock_channel.code, 200, knock_channel.result)
+        knock_room_membership_state_event = self.get_success(
+            self.storage_controllers.state.get_current_state_event(
+                knock_room_id, EventTypes.Member, user1_id
+            )
+        )
+        assert knock_room_membership_state_event is not None
+
+        after_room_token = self.event_sources.get_current_token()
+
+        room_id_results = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=before_room_token,
+            to_token=after_room_token,
+        )
+
+        # Ensure that the invited, ban, and knock rooms show up
+        self.assertEqual(
+            room_id_results.keys(),
+            {
+                join_room_id,
+                invited_room_id,
+                ban_room_id,
+                knock_room_id,
+            },
+        )
+        # It should be pointing to the the respective membership event (latest
+        # membership event in the from/to range)
+        self.assertEqual(
+            room_id_results[join_room_id].event_id,
+            join_response["event_id"],
+        )
+        self.assertEqual(room_id_results[join_room_id].membership, Membership.JOIN)
+        self.assertEqual(room_id_results[join_room_id].newly_joined, True)
+        self.assertEqual(room_id_results[join_room_id].newly_left, False)
+
+        self.assertEqual(
+            room_id_results[invited_room_id].event_id,
+            invite_response["event_id"],
+        )
+        self.assertEqual(room_id_results[invited_room_id].membership, Membership.INVITE)
+        self.assertEqual(room_id_results[invited_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[invited_room_id].newly_left, False)
+
+        self.assertEqual(
+            room_id_results[ban_room_id].event_id,
+            ban_response["event_id"],
+        )
+        self.assertEqual(room_id_results[ban_room_id].membership, Membership.BAN)
+        self.assertEqual(room_id_results[ban_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[ban_room_id].newly_left, False)
+
+        self.assertEqual(
+            room_id_results[knock_room_id].event_id,
+            knock_room_membership_state_event.event_id,
+        )
+        self.assertEqual(room_id_results[knock_room_id].membership, Membership.KNOCK)
+        self.assertEqual(room_id_results[knock_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[knock_room_id].newly_left, False)
+
+    def test_only_newly_left_rooms_show_up(self) -> None:
+        """
+        Test that `newly_left` rooms still show up in the sync response but rooms that
+        were left before the `from_token` don't show up. See condition "2)" comments in
+        the `get_room_membership_for_user_at_to_token()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Leave before we calculate the `from_token`
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave during the from_token/to_token range (newly_left)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+        after_room2_token = self.event_sources.get_current_token()
+
+        room_id_results = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=after_room1_token,
+            to_token=after_room2_token,
+        )
+
+        # Only the `newly_left` room should show up
+        self.assertEqual(room_id_results.keys(), {room_id2})
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            _leave_response2["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because we are instead `newly_left`
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
+        self.assertEqual(room_id_results[room_id2].newly_left, True)
+
+    def test_get_kicked_room(self) -> None:
+        """
+        Test that a room that the user was kicked from still shows up. When the user
+        comes back to their client, they should see that they were kicked.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the kick room (user2 kicks user1 from the room)
+        kick_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        # Kick user1 from the room
+        kick_response = self.helper.change_membership(
+            room=kick_room_id,
+            src=user2_id,
+            targ=user1_id,
+            tok=user2_tok,
+            membership=Membership.LEAVE,
+            extra_data={
+                "reason": "Bad manners",
+            },
+        )
+
+        after_kick_token = self.event_sources.get_current_token()
+
+        room_id_results = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=after_kick_token,
+            to_token=after_kick_token,
+        )
+
+        # The kicked room should show up
+        self.assertEqual(room_id_results.keys(), {kick_room_id})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[kick_room_id].event_id,
+            kick_response["event_id"],
+        )
+        self.assertEqual(room_id_results[kick_room_id].membership, Membership.LEAVE)
+        self.assertNotEqual(room_id_results[kick_room_id].sender, user1_id)
+        # We should *NOT* be `newly_joined` because we were not joined at the the time
+        # of the `to_token`.
+        self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[kick_room_id].newly_left, False)
+
+    def test_state_reset(self) -> None:
+        """
+        Test a state reset scenario where the user gets removed from the room (when
+        there is no corresponding leave event)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # The room where the state reset will happen
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Join another room so we don't hit the short-circuit and return early if they
+        # have no room membership
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+        before_reset_token = self.event_sources.get_current_token()
+
+        # Send another state event to make a position for the state reset to happen at
+        dummy_state_response = self.helper.send_state(
+            room_id1,
+            event_type="foobarbaz",
+            state_key="",
+            body={"foo": "bar"},
+            tok=user2_tok,
+        )
+        dummy_state_pos = self.get_success(
+            self.store.get_position_for_event(dummy_state_response["event_id"])
+        )
+
+        # Mock a state reset removing the membership for user1 in the current state
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="current_state_events",
+                keyvalues={
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                },
+                desc="state reset user in current_state_events",
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="local_current_membership",
+                keyvalues={
+                    "room_id": room_id1,
+                    "user_id": user1_id,
+                },
+                desc="state reset user in local_current_membership",
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="current_state_delta_stream",
+                values={
+                    "stream_id": dummy_state_pos.stream,
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                    "event_id": None,
+                    "prev_event_id": join_response1["event_id"],
+                    "instance_name": dummy_state_pos.instance_name,
+                },
+                desc="state reset user in current_state_delta_stream",
+            )
+        )
+
+        # Manually bust the cache since we we're just manually messing with the database
+        # and not causing an actual state reset.
+        self.store._membership_stream_cache.entity_has_changed(
+            user1_id, dummy_state_pos.stream
+        )
+
+        after_reset_token = self.event_sources.get_current_token()
+
+        # The function under test
+        room_id_results = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=before_reset_token,
+            to_token=after_reset_token,
+        )
+
+        # Room1 should show up because it was `newly_left` via state reset during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1, room_id2})
+        # It should be pointing to no event because we were removed from the room
+        # without a corresponding leave event
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            None,
+        )
+        # State reset caused us to leave the room and there is no corresponding leave event
+        self.assertEqual(room_id_results[room_id1].membership, Membership.LEAVE)
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+        # We should be `newly_left` because we were removed via state reset during the from/to range
+        self.assertEqual(room_id_results[room_id1].newly_left, True)
+
 
 class FilterRoomsTestCase(HomeserverTestCase):
     """
@@ -2367,6 +3020,31 @@ class FilterRoomsTestCase(HomeserverTestCase):
         self.store = self.hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
 
+    def _get_sync_room_ids_for_user(
+        self,
+        user: UserID,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Get the rooms the user should be syncing with
+        """
+        room_membership_for_user_map = self.get_success(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
+                user=user,
+                from_token=from_token,
+                to_token=to_token,
+            )
+        )
+        filtered_sync_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms_relevant_for_sync(
+                user=user,
+                room_membership_for_user_map=room_membership_for_user_map,
+            )
+        )
+
+        return filtered_sync_room_map
+
     def _create_dm_room(
         self,
         inviter_user_id: str,
@@ -2438,12 +3116,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Try with `is_dm=True`
@@ -2496,12 +3172,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Try with `is_encrypted=True`
@@ -2552,12 +3226,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Try with `is_invite=True`
@@ -2621,12 +3293,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Try finding only normal rooms
@@ -2714,12 +3384,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Try finding *NOT* normal rooms
@@ -2838,12 +3506,10 @@ class FilterRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         filtered_room_map = self.get_success(
@@ -2884,6 +3550,31 @@ class SortRoomsTestCase(HomeserverTestCase):
         self.store = self.hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
 
+    def _get_sync_room_ids_for_user(
+        self,
+        user: UserID,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Get the rooms the user should be syncing with
+        """
+        room_membership_for_user_map = self.get_success(
+            self.sliding_sync_handler.get_room_membership_for_user_at_to_token(
+                user=user,
+                from_token=from_token,
+                to_token=to_token,
+            )
+        )
+        filtered_sync_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms_relevant_for_sync(
+                user=user,
+                room_membership_for_user_map=room_membership_for_user_map,
+            )
+        )
+
+        return filtered_sync_room_map
+
     def test_sort_activity_basic(self) -> None:
         """
         Rooms with newer activity are sorted first.
@@ -2903,12 +3594,10 @@ class SortRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Sort the rooms (what we're testing)
@@ -2986,12 +3675,10 @@ class SortRoomsTestCase(HomeserverTestCase):
         self.helper.send(room_id3, "activity in room3", tok=user2_tok)
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=before_rooms_token,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
         )
 
         # Sort the rooms (what we're testing)
@@ -3052,12 +3739,10 @@ class SortRoomsTestCase(HomeserverTestCase):
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
-        sync_room_map = self.get_success(
-            self.sliding_sync_handler.get_sync_room_ids_for_user(
-                UserID.from_string(user1_id),
-                from_token=None,
-                to_token=after_rooms_token,
-            )
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
         )
 
         # Sort the rooms (what we're testing)
diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py
index 70912e22f8..e55001fb40 100644
--- a/tests/media/test_media_storage.py
+++ b/tests/media/test_media_storage.py
@@ -1057,13 +1057,15 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel.code == 200
 
+    @override_config({"remote_media_download_burst_count": "87M"})
     @patch(
         "synapse.http.matrixfederationclient.read_body_with_max_size",
         read_body_with_max_size_30MiB,
     )
-    def test_download_ratelimit_max_size_sub(self) -> None:
+    def test_download_ratelimit_unknown_length(self) -> None:
         """
-        Test that if no content-length is provided, the default max size is applied instead
+        Test that if no content-length is provided, ratelimit will still be applied after
+        download once length is known
         """
 
         # mock out actually sending the request
@@ -1077,19 +1079,48 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
 
         self.client._send_request = _send_request  # type: ignore
 
-        # ten requests should go through using the max size (500MB/50MB)
-        for i in range(10):
-            channel2 = self.make_request(
+        # 3 requests should go through (note 3rd one would technically violate ratelimit but
+        # is applied *after* download - the next one will be ratelimited)
+        for i in range(3):
+            channel = self.make_request(
                 "GET",
                 f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}",
                 shorthand=False,
             )
-            assert channel2.code == 200
+            assert channel.code == 200
 
-        # eleventh will hit ratelimit
-        channel3 = self.make_request(
+        # 4th will hit ratelimit
+        channel2 = self.make_request(
             "GET",
             "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx",
             shorthand=False,
         )
-        assert channel3.code == 429
+        assert channel2.code == 429
+
+    @override_config({"max_upload_size": "29M"})
+    @patch(
+        "synapse.http.matrixfederationclient.read_body_with_max_size",
+        read_body_with_max_size_30MiB,
+    )
+    def test_max_download_respected(self) -> None:
+        """
+        Test that the max download size is enforced - note that max download size is determined
+        by the max_upload_size
+        """
+
+        # mock out actually sending the request
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        channel = self.make_request(
+            "GET", "/_matrix/media/v3/download/remote.org/abcd", shorthand=False
+        )
+        assert channel.code == 502
+        assert channel.json_body["errcode"] == "M_TOO_LARGE"
diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py
index 7f2caed7d5..466c5a0b70 100644
--- a/tests/rest/client/test_media.py
+++ b/tests/rest/client/test_media.py
@@ -1809,13 +1809,19 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel.code == 200
 
+    @override_config(
+        {
+            "remote_media_download_burst_count": "87M",
+        }
+    )
     @patch(
         "synapse.http.matrixfederationclient.read_multipart_response",
         read_multipart_response_30MiB,
     )
-    def test_download_ratelimit_max_size_sub(self) -> None:
+    def test_download_ratelimit_unknown_length(self) -> None:
         """
-        Test that if no content-length is provided, the default max size is applied instead
+        Test that if no content-length is provided, ratelimiting is still applied after
+        media is downloaded and length is known
         """
 
         # mock out actually sending the request
@@ -1831,8 +1837,9 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
 
         self.client._send_request = _send_request  # type: ignore
 
-        # ten requests should go through using the max size (500MB/50MB)
-        for i in range(10):
+        # first 3 will go through (note that 3rd request technically violates rate limit but
+        # that since the ratelimiting is applied *after* download it goes through, but next one fails)
+        for i in range(3):
             channel2 = self.make_request(
                 "GET",
                 f"/_matrix/client/v1/media/download/remote.org/abc{i}",
@@ -1841,7 +1848,7 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
             )
             assert channel2.code == 200
 
-        # eleventh will hit ratelimit
+        # 4th will hit ratelimit
         channel3 = self.make_request(
             "GET",
             "/_matrix/client/v1/media/download/remote.org/abcd",
@@ -1850,6 +1857,39 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
         )
         assert channel3.code == 429
 
+    @override_config({"max_upload_size": "29M"})
+    @patch(
+        "synapse.http.matrixfederationclient.read_multipart_response",
+        read_multipart_response_30MiB,
+    )
+    def test_max_download_respected(self) -> None:
+        """
+        Test that the max download size is enforced - note that max download size is determined
+        by the max_upload_size
+        """
+
+        # mock out actually sending the request, returns a 30MiB response
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers(
+                {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]}
+            )
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        channel = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/remote.org/abcd",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        assert channel.code == 502
+        assert channel.json_body["errcode"] == "M_TOO_LARGE"
+
     def test_file_download(self) -> None:
         content = io.BytesIO(b"file_to_stream")
         content_uri = self.get_success(
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 304c0d4d3d..a008ee465b 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -20,7 +20,8 @@
 #
 import json
 import logging
-from typing import AbstractSet, Any, Dict, Iterable, List, Optional
+from http import HTTPStatus
+from typing import Any, Dict, Iterable, List
 
 from parameterized import parameterized, parameterized_class
 
@@ -1259,7 +1260,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         exact: bool = False,
     ) -> None:
         """
-        Wrapper around `_assertIncludes` to give slightly better looking diff error
+        Wrapper around `assertIncludes` to give slightly better looking diff error
         messages that include some context "$event_id (type, state_key)".
 
         Args:
@@ -1275,7 +1276,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         for event in actual_required_state:
             assert isinstance(event, dict)
 
-        self._assertIncludes(
+        self.assertIncludes(
             {
                 f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
                 for event in actual_required_state
@@ -1289,56 +1290,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             message=str(actual_required_state),
         )
 
-    def _assertIncludes(
-        self,
-        actual_items: AbstractSet[str],
-        expected_items: AbstractSet[str],
-        exact: bool = False,
-        message: Optional[str] = None,
-    ) -> None:
-        """
-        Assert that all of the `expected_items` are included in the `actual_items`.
-
-        This assert could also be called `assertContains`, `assertItemsInSet`
-
-        Args:
-            actual_items: The container
-            expected_items: The items to check for in the container
-            exact: Whether the actual state should be exactly equal to the expected
-                state (no extras).
-            message: Optional message to include in the failure message.
-        """
-        # Check that each set has the same items
-        if exact and actual_items == expected_items:
-            return
-        # Check for a superset
-        elif not exact and actual_items >= expected_items:
-            return
-
-        expected_lines: List[str] = []
-        for expected_item in expected_items:
-            is_expected_in_actual = expected_item in actual_items
-            expected_lines.append(
-                "{}  {}".format(" " if is_expected_in_actual else "?", expected_item)
-            )
-
-        actual_lines: List[str] = []
-        for actual_item in actual_items:
-            is_actual_in_expected = actual_item in expected_items
-            actual_lines.append(
-                "{}  {}".format("+" if is_actual_in_expected else " ", actual_item)
-            )
-
-        newline = "\n"
-        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
-        actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
-        first_message = (
-            "Items must match exactly" if exact else "Some expected items are missing."
-        )
-        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
-
-        self.fail(f"{diff_message}\n{message}")
-
     def _add_new_dm_to_global_account_data(
         self, source_user_id: str, target_user_id: str, target_room_id: str
     ) -> None:
@@ -1662,6 +1613,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             list(channel.json_body["lists"]["room-invites"]),
         )
 
+        # Ensure DM's are correctly marked
+        self.assertDictEqual(
+            {
+                room_id: room.get("is_dm")
+                for room_id, room in channel.json_body["rooms"].items()
+            },
+            {
+                invite_room_id: None,
+                room_id: None,
+                invited_dm_room_id: True,
+                joined_dm_room_id: True,
+            },
+        )
+
     def test_sort_list(self) -> None:
         """
         Test that the `lists` are sorted by `stream_ordering`
@@ -1813,8 +1778,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
     def test_rooms_meta_when_joined(self) -> None:
         """
-        Test that the `rooms` `name` and `avatar` (soon to test `heroes`) are included
-        in the response when the user is joined to the room.
+        Test that the `rooms` `name` and `avatar` are included in the response and
+        reflect the current state of the room when the user is joined to the room.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1866,11 +1831,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             "mxc://DUMMY_MEDIA_ID",
             channel.json_body["rooms"][room_id1],
         )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
 
     def test_rooms_meta_when_invited(self) -> None:
         """
-        Test that the `rooms` `name` and `avatar` (soon to test `heroes`) are included
-        in the response when the user is invited to the room.
+        Test that the `rooms` `name` and `avatar` are included in the response and
+        reflect the current state of the room when the user is invited to the room.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1892,7 +1868,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             tok=user2_tok,
         )
 
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User1 is invited to the room
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
 
         # Update the room name after user1 has left
         self.helper.send_state(
@@ -1938,11 +1915,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             "mxc://UPDATED_DUMMY_MEDIA_ID",
             channel.json_body["rooms"][room_id1],
         )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            1,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            1,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
 
     def test_rooms_meta_when_banned(self) -> None:
         """
-        Test that the `rooms` `name` and `avatar` (soon to test `heroes`) reflect the
-        state of the room when the user was banned (do not leak current state).
+        Test that the `rooms` `name` and `avatar` reflect the state of the room when the
+        user was banned (do not leak current state).
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -2010,6 +1998,273 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             "mxc://DUMMY_MEDIA_ID",
             channel.json_body["rooms"][room_id1],
         )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            # FIXME: The actual number should be "1" (user2) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
+
+    def test_rooms_meta_heroes(self) -> None:
+        """
+        Test that the `rooms` `heroes` are included in the response when the room
+        doesn't have a room name set.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "name": "my super room",
+            },
+        )
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id1, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        room_id2 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room2",
+            },
+        )
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id2, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room1 has a name so we shouldn't see any `heroes` which the client would use
+        # the calculate the room name themselves.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["name"],
+            "my super room",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("heroes"))
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            1,
+        )
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id2].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id2].get("heroes", [])
+            ],
+            # Heroes shouldn't include the user themselves (we shouldn't see user1)
+            [user2_id, user3_id],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id2]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id2]["invited_count"],
+            1,
+        )
+
+        # We didn't request any state so we shouldn't see any `required_state`
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+        self.assertIsNone(channel.json_body["rooms"][room_id2].get("required_state"))
+
+    def test_rooms_meta_heroes_max(self) -> None:
+        """
+        Test that the `rooms` `heroes` only includes the first 5 users (not including
+        yourself).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room",
+            },
+        )
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        self.helper.join(room_id1, user5_id, tok=user5_tok)
+        self.helper.join(room_id1, user6_id, tok=user6_tok)
+        self.helper.join(room_id1, user7_id, tok=user7_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+            ],
+            # Heroes should be the first 5 users in the room (excluding the user
+            # themselves, we shouldn't see `user1`)
+            [user2_id, user3_id, user4_id, user5_id, user6_id],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            7,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+
+        # We didn't request any state so we shouldn't see any `required_state`
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+
+    def test_rooms_meta_heroes_when_banned(self) -> None:
+        """
+        Test that the `rooms` `heroes` are included in the response when the room
+        doesn't have a room name set but doesn't leak information past their ban.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        _user5_tok = self.login(user5_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room",
+            },
+        )
+        # User1 joins the room
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id1, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        # User1 is banned from the room
+        self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # User4 joins the room after user1 is banned
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        # User5 is invited after user1 is banned
+        self.helper.invite(room_id1, src=user2_id, targ=user5_id, tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+            ],
+            # Heroes shouldn't include the user themselves (we shouldn't see user1). We
+            # also shouldn't see user4 since they joined after user1 was banned.
+            #
+            # FIXME: The actual result should be `[user2_id, user3_id]` but we currently
+            # don't support this for rooms where the user has left/been banned.
+            [],
+        )
+
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            # FIXME: The actual number should be "1" (user2) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            # We shouldn't see user5 since they were invited after user1 was banned.
+            #
+            # FIXME: The actual number should be "1" (user3) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
 
     def test_rooms_limited_initial_sync(self) -> None:
         """
@@ -3081,11 +3336,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, 200, channel.json_body)
 
         # Nothing to see for this banned user in the room in the token range
-        self.assertEqual(
-            channel.json_body["rooms"][room_id1]["timeline"],
-            [],
-            channel.json_body["rooms"][room_id1]["timeline"],
-        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("timeline"))
         # No events returned in the timeline so nothing is "live"
         self.assertEqual(
             channel.json_body["rooms"][room_id1]["num_live"],
@@ -3565,6 +3816,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             body={"foo": "bar"},
             tok=user2_tok,
         )
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.bar_state",
+            state_key="",
+            body={"bar": "qux"},
+            tok=user2_tok,
+        )
 
         # Make the Sliding Sync request with wildcards for the `state_key`
         channel = self.make_request(
@@ -3588,16 +3846,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                         ],
                         "timeline_limit": 0,
                     },
-                }
-                # TODO: Room subscription should also combine with the `required_state`
-                # "room_subscriptions": {
-                #     room_id1: {
-                #         "required_state": [
-                #             ["org.matrix.bar_state", ""]
-                #         ],
-                #         "timeline_limit": 0,
-                #     }
-                # }
+                },
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [["org.matrix.bar_state", ""]],
+                        "timeline_limit": 0,
+                    }
+                },
             },
             access_token=user1_tok,
         )
@@ -3614,6 +3869,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                 state_map[(EventTypes.Member, user1_id)],
                 state_map[(EventTypes.Member, user2_id)],
                 state_map[("org.matrix.foo_state", "")],
+                state_map[("org.matrix.bar_state", "")],
             },
             exact=True,
         )
@@ -3706,6 +3962,271 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             channel.json_body["lists"]["foo-list"],
         )
 
+    def test_room_subscriptions_with_join_membership(self) -> None:
+        """
+        Test `room_subscriptions` with a joined room should give us timeline and current
+        state events.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # We should see some state
+        self._assertRequiredStateIncludes(
+            channel.json_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+            },
+            exact=True,
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+
+        # We should see some events
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                join_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No "live" events in an initial sync (no `from_token` to define the "live"
+        # range)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There are more events to paginate to
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_room_subscriptions_with_leave_membership(self) -> None:
+        """
+        Test `room_subscriptions` with a leave room should give us timeline and state
+        events up to the leave event.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo_state",
+            state_key="",
+            body={"foo": "bar"},
+            tok=user2_tok,
+        )
+
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # Send some events after user1 leaves
+        self.helper.send(room_id1, "activity after leave", tok=user2_tok)
+        # Update state after user1 leaves
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo_state",
+            state_key="",
+            body={"foo": "qux"},
+            tok=user2_tok,
+        )
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            ["org.matrix.foo_state", ""],
+                        ],
+                        "timeline_limit": 2,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see the state at the time of the leave
+        self._assertRequiredStateIncludes(
+            channel.json_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[("org.matrix.foo_state", "")],
+            },
+            exact=True,
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+
+        # We should see some before we left (nothing after)
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                join_response["event_id"],
+                leave_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No "live" events in an initial sync (no `from_token` to define the "live"
+        # range)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There are more events to paginate to
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_room_subscriptions_no_leak_private_room(self) -> None:
+        """
+        Test `room_subscriptions` with a private room we have never been in should not
+        leak any data to the user.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=False)
+
+        # We should not be able to join the private room
+        self.helper.join(
+            room_id1, user1_id, tok=user1_tok, expect_code=HTTPStatus.FORBIDDEN
+        )
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should not see the room at all (we're not in it)
+        self.assertIsNone(
+            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+        )
+
+    def test_room_subscriptions_world_readable(self) -> None:
+        """
+        Test `room_subscriptions` with a room that has `world_readable` history visibility
+
+        FIXME: We should be able to see the room timeline and state
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create a room with `world_readable` history visibility
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "preset": "public_chat",
+                "initial_state": [
+                    {
+                        "content": {
+                            "history_visibility": HistoryVisibility.WORLD_READABLE
+                        },
+                        "state_key": "",
+                        "type": EventTypes.RoomHistoryVisibility,
+                    }
+                ],
+            },
+        )
+        # Ensure we're testing with a room with `world_readable` history visibility
+        # which means events are visible to anyone even without membership.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.WORLD_READABLE,
+        )
+
+        # Note: We never join the room
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # FIXME: In the future, we should be able to see the room because it's
+        # `world_readable` but currently we don't support this.
+        self.assertIsNone(
+            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+        )
+
 
 class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
     """Tests for the to-device sliding sync extension"""
diff --git a/tests/server.py b/tests/server.py
index f1cd0f76be..85602e6953 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -289,10 +289,6 @@ class FakeChannel:
         self._reactor.run()
 
         while not self.is_finished():
-            # If there's a producer, tell it to resume producing so we get content
-            if self._producer:
-                self._producer.resumeProducing()
-
             if self._reactor.seconds() > end_time:
                 raise TimedOutException("Timed out waiting for request to finish.")
 
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 882f3bbbdc..418b556108 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -19,20 +19,28 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import logging
 from typing import List, Optional, Tuple, cast
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.room_versions import RoomVersions
+from synapse.rest import admin
 from synapse.rest.admin import register_servlets_for_client_rest_resource
-from synapse.rest.client import login, room
+from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.roommember import MemberSummary
 from synapse.types import UserID, create_requester
 from synapse.util import Clock
 
 from tests import unittest
 from tests.server import TestHomeServer
 from tests.test_utils import event_injection
+from tests.unittest import skip_unless
+
+logger = logging.getLogger(__name__)
 
 
 class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
@@ -240,6 +248,397 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
         )
 
 
+class RoomSummaryTestCase(unittest.HomeserverTestCase):
+    """
+    Test `/sync` room summary related logic like `get_room_summary(...)` and
+    `extract_heroes_from_room_summary(...)`
+    """
+
+    servlets = [
+        admin.register_servlets,
+        knock.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+
+    def _assert_member_summary(
+        self,
+        actual_member_summary: MemberSummary,
+        expected_member_list: List[str],
+        *,
+        expected_member_count: Optional[int] = None,
+    ) -> None:
+        """
+        Assert that the `MemberSummary` object has the expected members.
+        """
+        self.assertListEqual(
+            [
+                user_id
+                for user_id, _membership_event_id in actual_member_summary.members
+            ],
+            expected_member_list,
+        )
+        self.assertEqual(
+            actual_member_summary.count,
+            (
+                expected_member_count
+                if expected_member_count is not None
+                else len(expected_member_list)
+            ),
+        )
+
+    def test_get_room_summary_membership(self) -> None:
+        """
+        Test that `get_room_summary(...)` gets every kind of membership when there
+        aren't that many members in the room.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+
+        # Setup a room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+        empty_ms = MemberSummary([], 0)
+
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.JOIN, empty_ms),
+            [user1_id, user5_id],
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.INVITE, empty_ms), [user3_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.LEAVE, empty_ms), [user4_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.BAN, empty_ms), [user2_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.KNOCK, empty_ms),
+            [
+                # No one knocked
+            ],
+        )
+
+    def test_get_room_summary_membership_order(self) -> None:
+        """
+        Test that `get_room_summary(...)` stacks our limit of 6 in this order: joins ->
+        invites -> leave -> everything else (bans/knocks)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # We expect the order to be joins -> invites -> leave -> bans so setup the users
+        # *NOT* in that same order to make sure we're actually sorting them.
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5, User6, User7 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+        self.helper.join(room_id, user6_id, tok=user6_tok)
+        self.helper.join(room_id, user7_id, tok=user7_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+        empty_ms = MemberSummary([], 0)
+
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.JOIN, empty_ms),
+            [user1_id, user5_id, user6_id, user7_id],
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.INVITE, empty_ms), [user3_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.LEAVE, empty_ms), [user4_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.BAN, empty_ms),
+            [
+                # The banned user is not in the summary because the summary can only fit
+                # 6 members and prefers everything else before bans
+                #
+                # user2_id
+            ],
+            # But we still see the count of banned users
+            expected_member_count=1,
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.KNOCK, empty_ms),
+            [
+                # No one knocked
+            ],
+        )
+
+    def test_extract_heroes_from_room_summary_excludes_self(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` does not include the user
+        itself.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 joins
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        # We first ask from the perspective of a random fake user
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Make sure user1 is in the room (ensure our test setup is correct)
+        self.assertListEqual(hero_user_ids, [user1_id, user2_id])
+
+        # Now, we ask for the room summary from the perspective of user1
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me=user1_id
+        )
+
+        # User1 should not be included in the list of heroes because they are the one
+        # asking
+        self.assertListEqual(hero_user_ids, [user2_id])
+
+    def test_extract_heroes_from_room_summary_first_five_joins(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` returns the first 5 joins.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 -> User7 joins
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+        self.helper.join(room_id, user6_id, tok=user6_tok)
+        self.helper.join(room_id, user7_id, tok=user7_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakuser"
+        )
+
+        # First 5 users to join the room
+        self.assertListEqual(
+            hero_user_ids, [user1_id, user2_id, user3_id, user4_id, user5_id]
+        )
+
+    def test_extract_heroes_from_room_summary_membership_order(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` prefers joins/invites over
+        everything else.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # We expect the order to be joins -> invites -> leave -> bans so setup the users
+        # *NOT* in that same order to make sure we're actually sorting them.
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Prefer joins -> invites, over everything else
+        self.assertListEqual(
+            hero_user_ids,
+            [
+                # The joins
+                user1_id,
+                user5_id,
+                # The invites
+                user3_id,
+            ],
+        )
+
+    @skip_unless(
+        False,
+        "Test is not possible because when everyone leaves the room, "
+        + "the server is `no_longer_in_room` and we don't have any `current_state_events` to query",
+    )
+    def test_extract_heroes_from_room_summary_fallback_leave_ban(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` falls back to leave/ban if
+        there aren't any joins/invites.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 leaves
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+        self.helper.leave(room_id, user3_id, tok=user3_tok)
+
+        # User1 leaves (we're doing this last because they're the room creator)
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Fallback to people who left -> banned
+        self.assertListEqual(
+            hero_user_ids,
+            [user3_id, user1_id, user3_id],
+        )
+
+    def test_extract_heroes_from_room_summary_excludes_knocks(self) -> None:
+        """
+        People who knock on the room have (potentially) never been in the room before
+        and are total outsiders. Plus the spec doesn't mention them at all for heroes.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the knock room (user1 is the creator and is joined to the room)
+        knock_room_id = self.helper.create_room_as(
+            user1_id, tok=user1_tok, room_version=RoomVersions.V7.identifier
+        )
+        self.helper.send_state(
+            knock_room_id,
+            EventTypes.JoinRules,
+            {"join_rule": JoinRules.KNOCK},
+            tok=user1_tok,
+        )
+
+        # User2 knocks on the room
+        knock_channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/knock/%s" % (knock_room_id,),
+            b"{}",
+            user2_tok,
+        )
+        self.assertEqual(knock_channel.code, 200, knock_channel.result)
+
+        room_membership_summary = self.get_success(
+            self.store.get_room_summary(knock_room_id)
+        )
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # user1 is the creator and is joined to the room (should show up as a hero)
+        # user2 is knocking on the room (should not show up as a hero)
+        self.assertListEqual(
+            hero_user_ids,
+            [user1_id],
+        )
+
+
 class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
diff --git a/tests/test_types.py b/tests/test_types.py
index 944aa784fc..00adc65a5a 100644
--- a/tests/test_types.py
+++ b/tests/test_types.py
@@ -19,9 +19,18 @@
 #
 #
 
+from typing import Type
+from unittest import skipUnless
+
+from immutabledict import immutabledict
+from parameterized import parameterized_class
+
 from synapse.api.errors import SynapseError
 from synapse.types import (
+    AbstractMultiWriterStreamToken,
+    MultiWriterStreamToken,
     RoomAlias,
+    RoomStreamToken,
     UserID,
     get_domain_from_id,
     get_localpart_from_id,
@@ -29,6 +38,7 @@ from synapse.types import (
 )
 
 from tests import unittest
+from tests.utils import USE_POSTGRES_FOR_TESTS
 
 
 class IsMineIDTests(unittest.HomeserverTestCase):
@@ -127,3 +137,64 @@ class MapUsernameTestCase(unittest.TestCase):
         # this should work with either a unicode or a bytes
         self.assertEqual(map_username_to_mxid_localpart("têst"), "t=c3=aast")
         self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast")
+
+
+@parameterized_class(
+    ("token_type",),
+    [
+        (MultiWriterStreamToken,),
+        (RoomStreamToken,),
+    ],
+    class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{params_dict['token_type'].__name__}",
+)
+class MultiWriterTokenTestCase(unittest.HomeserverTestCase):
+    """Tests for the different types of multi writer tokens."""
+
+    token_type: Type[AbstractMultiWriterStreamToken]
+
+    def test_basic_token(self) -> None:
+        """Test that a simple stream token can be serialized and unserialized"""
+        store = self.hs.get_datastores().main
+
+        token = self.token_type(stream=5)
+
+        string_token = self.get_success(token.to_string(store))
+
+        if isinstance(token, RoomStreamToken):
+            self.assertEqual(string_token, "s5")
+        else:
+            self.assertEqual(string_token, "5")
+
+        parsed_token = self.get_success(self.token_type.parse(store, string_token))
+        self.assertEqual(parsed_token, token)
+
+    @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres")
+    def test_instance_map(self) -> None:
+        """Test for stream token with instance map"""
+        store = self.hs.get_datastores().main
+
+        token = self.token_type(stream=5, instance_map=immutabledict({"foo": 6}))
+
+        string_token = self.get_success(token.to_string(store))
+        self.assertEqual(string_token, "m5~1.6")
+
+        parsed_token = self.get_success(self.token_type.parse(store, string_token))
+        self.assertEqual(parsed_token, token)
+
+    def test_instance_map_assertion(self) -> None:
+        """Test that we assert values in the instance map are greater than the
+        min stream position"""
+
+        with self.assertRaises(ValueError):
+            self.token_type(stream=5, instance_map=immutabledict({"foo": 4}))
+
+        with self.assertRaises(ValueError):
+            self.token_type(stream=5, instance_map=immutabledict({"foo": 5}))
+
+    def test_parse_bad_token(self) -> None:
+        """Test that we can parse tokens produced by a bug in Synapse of the
+        form `m5~`"""
+        store = self.hs.get_datastores().main
+
+        parsed_token = self.get_success(self.token_type.parse(store, "m5~"))
+        self.assertEqual(parsed_token, self.token_type(stream=5))
diff --git a/tests/unittest.py b/tests/unittest.py
index a7c20556a0..4aa7f56106 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -28,6 +28,7 @@ import logging
 import secrets
 import time
 from typing import (
+    AbstractSet,
     Any,
     Awaitable,
     Callable,
@@ -269,6 +270,56 @@ class TestCase(unittest.TestCase):
                 required[key], actual[key], msg="%s mismatch. %s" % (key, actual)
             )
 
+    def assertIncludes(
+        self,
+        actual_items: AbstractSet[str],
+        expected_items: AbstractSet[str],
+        exact: bool = False,
+        message: Optional[str] = None,
+    ) -> None:
+        """
+        Assert that all of the `expected_items` are included in the `actual_items`.
+
+        This assert could also be called `assertContains`, `assertItemsInSet`
+
+        Args:
+            actual_items: The container
+            expected_items: The items to check for in the container
+            exact: Whether the actual state should be exactly equal to the expected
+                state (no extras).
+            message: Optional message to include in the failure message.
+        """
+        # Check that each set has the same items
+        if exact and actual_items == expected_items:
+            return
+        # Check for a superset
+        elif not exact and actual_items >= expected_items:
+            return
+
+        expected_lines: List[str] = []
+        for expected_item in expected_items:
+            is_expected_in_actual = expected_item in actual_items
+            expected_lines.append(
+                "{}  {}".format(" " if is_expected_in_actual else "?", expected_item)
+            )
+
+        actual_lines: List[str] = []
+        for actual_item in actual_items:
+            is_actual_in_expected = actual_item in expected_items
+            actual_lines.append(
+                "{}  {}".format("+" if is_actual_in_expected else " ", actual_item)
+            )
+
+        newline = "\n"
+        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
+        actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
+        first_message = (
+            "Items must match exactly" if exact else "Some expected items are missing."
+        )
+        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
+
+        self.fail(f"{diff_message}\n{message}")
+
 
 def DEBUG(target: TV) -> TV:
     """A decorator to set the .loglevel attribute to logging.DEBUG.
diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py
index fb67146c69..13a4e6ddaa 100644
--- a/tests/util/test_check_dependencies.py
+++ b/tests/util/test_check_dependencies.py
@@ -21,6 +21,7 @@
 
 from contextlib import contextmanager
 from os import PathLike
+from pathlib import Path
 from typing import Generator, Optional, Union
 from unittest.mock import patch
 
@@ -41,7 +42,7 @@ class DummyDistribution(metadata.Distribution):
     def version(self) -> str:
         return self._version
 
-    def locate_file(self, path: Union[str, PathLike]) -> PathLike:
+    def locate_file(self, path: Union[str, PathLike]) -> Path:
         raise NotImplementedError()
 
     def read_text(self, filename: str) -> None: