summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-31 12:59:11 +0100
committerErik Johnston <erik@matrix.org>2024-07-31 13:00:13 +0100
commit31ec7b547cb3d9ecbc291e1e9fed163873742340 (patch)
treecc4169aa48ad7df0188971045047bbcdd6420f31
parentMerge branch 'erikj/ss_room_sub_timeline' into erikj/ss_hacks (diff)
parentSliding Sync: Update filters to be robust against remote invite rooms (#17450) (diff)
downloadsynapse-31ec7b547cb3d9ecbc291e1e9fed163873742340.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/ss_hacks
-rw-r--r--CHANGES.md46
-rw-r--r--Cargo.lock5
-rw-r--r--changelog.d/17450.bugfix1
-rw-r--r--debian/changelog12
-rw-r--r--poetry.lock131
-rw-r--r--pyproject.toml2
-rw-r--r--synapse/api/constants.py8
-rw-r--r--synapse/events/__init__.py19
-rw-r--r--synapse/events/utils.py29
-rw-r--r--synapse/handlers/sliding_sync.py435
-rw-r--r--synapse/handlers/stats.py4
-rw-r--r--synapse/http/proxy.py12
-rw-r--r--synapse/http/server.py4
-rw-r--r--synapse/http/site.py2
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/databases/main/cache.py22
-rw-r--r--synapse/storage/databases/main/state.py215
-rw-r--r--synapse/types/handlers/__init__.py2
-rw-r--r--tests/handlers/test_sliding_sync.py874
-rw-r--r--tests/rest/client/test_login.py5
-rw-r--r--tests/rest/client/test_sync.py794
-rw-r--r--tests/server.py26
-rw-r--r--tests/test_server.py9
23 files changed, 2197 insertions, 464 deletions
diff --git a/CHANGES.md b/CHANGES.md
index f869674ace..b4fddc3e5c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,30 @@
+# Synapse 1.112.0 (2024-07-30)
+
+This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7).
+
+Note that this security fix is also available as **Synapse 1.111.1**, which does not include the rest of the changes in Synapse 1.112.0.
+
+This issue means that, if multiple HTTP requests are pipelined in the same TCP connection, Synapse can send responses to the wrong HTTP request.
+If a reverse proxy was configured to use HTTP pipelining, this could result in responses being sent to the wrong user, severely harming confidentiality.
+
+With that said, despite being a high severity issue, **we consider it unlikely that Synapse installations will be affected**.
+The use of HTTP pipelining in this fashion would cause worse performance for clients (request-response latencies would be increased as users' responses would be artificially blocked behind other users' slow requests). Further, Nginx and Haproxy, two common reverse proxies, do not appear to support configuring their upstreams to use HTTP pipelining and thus would not be affected. For both of these reasons, we consider it unlikely that a Synapse deployment would be set up in such a configuration.
+
+Despite that, we cannot rule out that some installations may exist with this unusual setup and so we are releasing this security update today.
+
+**pip users:** Note that by default, upgrading Synapse using pip will not automatically upgrade Twisted. **Please manually install the new version of Twisted** using `pip install Twisted==24.7.0rc1`. Note also that even the `--upgrade-strategy=eager` flag to `pip install -U matrix-synapse` will not upgrade Twisted to a patched version because it is only a release candidate at this time.
+
+### Internal Changes
+
+- Upgrade locked dependency on Twisted to 24.7.0rc1. ([\#17502](https://github.com/element-hq/synapse/issues/17502))
+
+
 # Synapse 1.112.0rc1 (2024-07-23)
 
+Please note that this release candidate does not include the security dependency update
+included in version 1.111.1 as this version was released before 1.111.1.
+The same security fix can be found in the full release of 1.112.0.
+
 ### Features
 
 - Add to-device extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17416](https://github.com/element-hq/synapse/issues/17416))
@@ -56,6 +81,27 @@
 * Bump ulid from 1.1.2 to 1.1.3. ([\#17442](https://github.com/element-hq/synapse/issues/17442))
 * Bump zipp from 3.15.0 to 3.19.1. ([\#17427](https://github.com/element-hq/synapse/issues/17427))
 
+
+# Synapse 1.111.1 (2024-07-30)
+
+This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7).
+
+This issue means that, if multiple HTTP requests are pipelined in the same TCP connection, Synapse can send responses to the wrong HTTP request.
+If a reverse proxy was configured to use HTTP pipelining, this could result in responses being sent to the wrong user, severely harming confidentiality.
+
+With that said, despite being a high severity issue, **we consider it unlikely that Synapse installations will be affected**.
+The use of HTTP pipelining in this fashion would cause worse performance for clients (request-response latencies would be increased as users' responses would be artificially blocked behind other users' slow requests). Further, Nginx and Haproxy, two common reverse proxies, do not appear to support configuring their upstreams to use HTTP pipelining and thus would not be affected. For both of these reasons, we consider it unlikely that a Synapse deployment would be set up in such a configuration.
+
+Despite that, we cannot rule out that some installations may exist with this unusual setup and so we are releasing this security update today.
+
+**pip users:** Note that by default, upgrading Synapse using pip will not automatically upgrade Twisted. **Please manually install the new version of Twisted** using `pip install Twisted==24.7.0rc1`. Note also that even the `--upgrade-strategy=eager` flag to `pip install -U matrix-synapse` will not upgrade Twisted to a patched version because it is only a release candidate at this time.
+
+
+### Internal Changes
+
+- Upgrade locked dependency on Twisted to 24.7.0rc1. ([\#17502](https://github.com/element-hq/synapse/issues/17502))
+
+
 # Synapse 1.111.0 (2024-07-16)
 
 No significant changes since 1.111.0rc2.
diff --git a/Cargo.lock b/Cargo.lock
index e9adfcbdc3..333499e197 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -505,11 +505,12 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.120"
+version = "1.0.121"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5"
+checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609"
 dependencies = [
  "itoa",
+ "memchr",
  "ryu",
  "serde",
 ]
diff --git a/changelog.d/17450.bugfix b/changelog.d/17450.bugfix
new file mode 100644
index 0000000000..01a521da38
--- /dev/null
+++ b/changelog.d/17450.bugfix
@@ -0,0 +1 @@
+Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering.
diff --git a/debian/changelog b/debian/changelog
index 5209b9f5fd..e35750a35f 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,9 +1,21 @@
+matrix-synapse-py3 (1.112.0) stable; urgency=medium
+
+  * New Synapse release 1.112.0.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 30 Jul 2024 17:15:48 +0100
+
 matrix-synapse-py3 (1.112.0~rc1) stable; urgency=medium
 
   * New Synapse release 1.112.0rc1.
 
  -- Synapse Packaging team <packages@matrix.org>  Tue, 23 Jul 2024 08:58:55 -0600
 
+matrix-synapse-py3 (1.111.1) stable; urgency=medium
+
+  * New Synapse release 1.111.1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 30 Jul 2024 16:13:52 +0100
+
 matrix-synapse-py3 (1.111.0) stable; urgency=medium
 
   * New Synapse release 1.111.0.
diff --git a/poetry.lock b/poetry.lock
index 417f6850b8..7d8334515a 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
 
 [[package]]
 name = "annotated-types"
@@ -67,38 +67,38 @@ visualize = ["Twisted (>=16.1.1)", "graphviz (>0.5.1)"]
 
 [[package]]
 name = "bcrypt"
-version = "4.1.3"
+version = "4.2.0"
 description = "Modern password hashing for your software and your servers"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "bcrypt-4.1.3-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:48429c83292b57bf4af6ab75809f8f4daf52aa5d480632e53707805cc1ce9b74"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4a8bea4c152b91fd8319fef4c6a790da5c07840421c2b785084989bf8bbb7455"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d3b317050a9a711a5c7214bf04e28333cf528e0ed0ec9a4e55ba628d0f07c1a"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:094fd31e08c2b102a14880ee5b3d09913ecf334cd604af27e1013c76831f7b05"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:4fb253d65da30d9269e0a6f4b0de32bd657a0208a6f4e43d3e645774fb5457f3"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:193bb49eeeb9c1e2db9ba65d09dc6384edd5608d9d672b4125e9320af9153a15"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:8cbb119267068c2581ae38790e0d1fbae65d0725247a930fc9900c285d95725d"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6cac78a8d42f9d120b3987f82252bdbeb7e6e900a5e1ba37f6be6fe4e3848286"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01746eb2c4299dd0ae1670234bf77704f581dd72cc180f444bfe74eb80495b64"},
-    {file = "bcrypt-4.1.3-cp37-abi3-win32.whl", hash = "sha256:037c5bf7c196a63dcce75545c8874610c600809d5d82c305dd327cd4969995bf"},
-    {file = "bcrypt-4.1.3-cp37-abi3-win_amd64.whl", hash = "sha256:8a893d192dfb7c8e883c4576813bf18bb9d59e2cfd88b68b725990f033f1b978"},
-    {file = "bcrypt-4.1.3-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:0d4cf6ef1525f79255ef048b3489602868c47aea61f375377f0d00514fe4a78c"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5698ce5292a4e4b9e5861f7e53b1d89242ad39d54c3da451a93cac17b61921a"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec3c2e1ca3e5c4b9edb94290b356d082b721f3f50758bce7cce11d8a7c89ce84"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3a5be252fef513363fe281bafc596c31b552cf81d04c5085bc5dac29670faa08"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5f7cd3399fbc4ec290378b541b0cf3d4398e4737a65d0f938c7c0f9d5e686611"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:c4c8d9b3e97209dd7111bf726e79f638ad9224b4691d1c7cfefa571a09b1b2d6"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:31adb9cbb8737a581a843e13df22ffb7c84638342de3708a98d5c986770f2834"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:551b320396e1d05e49cc18dd77d970accd52b322441628aca04801bbd1d52a73"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:6717543d2c110a155e6821ce5670c1f512f602eabb77dba95717ca76af79867d"},
-    {file = "bcrypt-4.1.3-cp39-abi3-win32.whl", hash = "sha256:6004f5229b50f8493c49232b8e75726b568535fd300e5039e255d919fc3a07f2"},
-    {file = "bcrypt-4.1.3-cp39-abi3-win_amd64.whl", hash = "sha256:2505b54afb074627111b5a8dc9b6ae69d0f01fea65c2fcaea403448c503d3991"},
-    {file = "bcrypt-4.1.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:cb9c707c10bddaf9e5ba7cdb769f3e889e60b7d4fea22834b261f51ca2b89fed"},
-    {file = "bcrypt-4.1.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:9f8ea645eb94fb6e7bea0cf4ba121c07a3a182ac52876493870033141aa687bc"},
-    {file = "bcrypt-4.1.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:f44a97780677e7ac0ca393bd7982b19dbbd8d7228c1afe10b128fd9550eef5f1"},
-    {file = "bcrypt-4.1.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:d84702adb8f2798d813b17d8187d27076cca3cd52fe3686bb07a9083930ce650"},
-    {file = "bcrypt-4.1.3.tar.gz", hash = "sha256:2ee15dd749f5952fe3f0430d0ff6b74082e159c50332a1413d51b5689cf06623"},
+    {file = "bcrypt-4.2.0-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:096a15d26ed6ce37a14c1ac1e48119660f21b24cba457f160a4b830f3fe6b5cb"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c02d944ca89d9b1922ceb8a46460dd17df1ba37ab66feac4870f6862a1533c00"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d84cf6d877918620b687b8fd1bf7781d11e8a0998f576c7aa939776b512b98d"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1bb429fedbe0249465cdd85a58e8376f31bb315e484f16e68ca4c786dcc04291"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:655ea221910bcac76ea08aaa76df427ef8625f92e55a8ee44fbf7753dbabb328"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:1ee38e858bf5d0287c39b7a1fc59eec64bbf880c7d504d3a06a96c16e14058e7"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:0da52759f7f30e83f1e30a888d9163a81353ef224d82dc58eb5bb52efcabc399"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:3698393a1b1f1fd5714524193849d0c6d524d33523acca37cd28f02899285060"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:762a2c5fb35f89606a9fde5e51392dad0cd1ab7ae64149a8b935fe8d79dd5ed7"},
+    {file = "bcrypt-4.2.0-cp37-abi3-win32.whl", hash = "sha256:5a1e8aa9b28ae28020a3ac4b053117fb51c57a010b9f969603ed885f23841458"},
+    {file = "bcrypt-4.2.0-cp37-abi3-win_amd64.whl", hash = "sha256:8f6ede91359e5df88d1f5c1ef47428a4420136f3ce97763e31b86dd8280fbdf5"},
+    {file = "bcrypt-4.2.0-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:c52aac18ea1f4a4f65963ea4f9530c306b56ccd0c6f8c8da0c06976e34a6e841"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3bbbfb2734f0e4f37c5136130405332640a1e46e6b23e000eeff2ba8d005da68"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3413bd60460f76097ee2e0a493ccebe4a7601918219c02f503984f0a7ee0aebe"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8d7bb9c42801035e61c109c345a28ed7e84426ae4865511eb82e913df18f58c2"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3d3a6d28cb2305b43feac298774b997e372e56c7c7afd90a12b3dc49b189151c"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:9c1c4ad86351339c5f320ca372dfba6cb6beb25e8efc659bedd918d921956bae"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:27fe0f57bb5573104b5a6de5e4153c60814c711b29364c10a75a54bb6d7ff48d"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:8ac68872c82f1add6a20bd489870c71b00ebacd2e9134a8aa3f98a0052ab4b0e"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:cb2a8ec2bc07d3553ccebf0746bbf3d19426d1c6d1adbd4fa48925f66af7b9e8"},
+    {file = "bcrypt-4.2.0-cp39-abi3-win32.whl", hash = "sha256:77800b7147c9dc905db1cba26abe31e504d8247ac73580b4aa179f98e6608f34"},
+    {file = "bcrypt-4.2.0-cp39-abi3-win_amd64.whl", hash = "sha256:61ed14326ee023917ecd093ee6ef422a72f3aec6f07e21ea5f10622b735538a9"},
+    {file = "bcrypt-4.2.0-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:39e1d30c7233cfc54f5c3f2c825156fe044efdd3e0b9d309512cc514a263ec2a"},
+    {file = "bcrypt-4.2.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:f4f4acf526fcd1c34e7ce851147deedd4e26e6402369304220250598b26448db"},
+    {file = "bcrypt-4.2.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:1ff39b78a52cf03fdf902635e4c81e544714861ba3f0efc56558979dd4f09170"},
+    {file = "bcrypt-4.2.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:373db9abe198e8e2c70d12b479464e0d5092cc122b20ec504097b5f2297ed184"},
+    {file = "bcrypt-4.2.0.tar.gz", hash = "sha256:cf69eaf5185fd58f268f805b505ce31f9b9fc2d64b376642164e9244540c1221"},
 ]
 
 [package.extras]
@@ -821,18 +821,21 @@ testing = ["flake8 (<5)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-chec
 
 [[package]]
 name = "incremental"
-version = "22.10.0"
-description = "\"A small library that versions your Python projects.\""
+version = "24.7.2"
+description = "A small library that versions your Python projects."
 optional = false
-python-versions = "*"
+python-versions = ">=3.8"
 files = [
-    {file = "incremental-22.10.0-py2.py3-none-any.whl", hash = "sha256:b864a1f30885ee72c5ac2835a761b8fe8aa9c28b9395cacf27286602688d3e51"},
-    {file = "incremental-22.10.0.tar.gz", hash = "sha256:912feeb5e0f7e0188e6f42241d2f450002e11bbc0937c65865045854c24c0bd0"},
+    {file = "incremental-24.7.2-py3-none-any.whl", hash = "sha256:8cb2c3431530bec48ad70513931a760f446ad6c25e8333ca5d95e24b0ed7b8fe"},
+    {file = "incremental-24.7.2.tar.gz", hash = "sha256:fb4f1d47ee60efe87d4f6f0ebb5f70b9760db2b2574c59c8e8912be4ebd464c9"},
 ]
 
+[package.dependencies]
+setuptools = ">=61.0"
+tomli = {version = "*", markers = "python_version < \"3.11\""}
+
 [package.extras]
-mypy = ["click (>=6.0)", "mypy (==0.812)", "twisted (>=16.4.0)"]
-scripts = ["click (>=6.0)", "twisted (>=16.4.0)"]
+scripts = ["click (>=6.0)"]
 
 [[package]]
 name = "isort"
@@ -2711,13 +2714,13 @@ urllib3 = ">=1.26.0"
 
 [[package]]
 name = "twisted"
-version = "24.3.0"
+version = "24.7.0rc1"
 description = "An asynchronous networking framework written in Python"
 optional = false
 python-versions = ">=3.8.0"
 files = [
-    {file = "twisted-24.3.0-py3-none-any.whl", hash = "sha256:039f2e6a49ab5108abd94de187fa92377abe5985c7a72d68d0ad266ba19eae63"},
-    {file = "twisted-24.3.0.tar.gz", hash = "sha256:6b38b6ece7296b5e122c9eb17da2eeab3d98a198f50ca9efd00fb03e5b4fd4ae"},
+    {file = "twisted-24.7.0rc1-py3-none-any.whl", hash = "sha256:f37d6656fe4e2871fab29d8952ae90bd6ca8b48a9e4dfa1b348f4cd62e6ba0bb"},
+    {file = "twisted-24.7.0rc1.tar.gz", hash = "sha256:bbc4a2193ca34cfa32f626300746698a6d70fcd77d9c0b79a664c347e39634fc"},
 ]
 
 [package.dependencies]
@@ -2726,48 +2729,26 @@ automat = ">=0.8.0"
 constantly = ">=15.1"
 hyperlink = ">=17.1.1"
 idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
-incremental = ">=22.10.0"
+incremental = ">=24.7.0"
 pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
 service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
-twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
 typing-extensions = ">=4.2.0"
 zope-interface = ">=5"
 
 [package.extras]
-all-non-platform = ["twisted[conch,http2,serial,test,tls]", "twisted[conch,http2,serial,test,tls]"]
+all-non-platform = ["appdirs (>=1.4.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "hypothesis (>=6.56)", "idna (>=2.4)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "priority (>=1.1.0,<2.0)", "pyhamcrest (>=2)", "pyhamcrest (>=2)", "pyopenssl (>=21.0.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "service-identity (>=18.1.0)"]
 conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)"]
-dev = ["coverage (>=6b1,<7)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "twisted[dev-release]", "twistedchecker (>=0.7,<1.0)"]
+dev = ["coverage (>=7.5,<8.0)", "cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pydoctor (>=23.9.0,<23.10.0)", "pyflakes (>=2.2,<3.0)", "pyhamcrest (>=2)", "python-subunit (>=1.4,<2.0)", "sphinx (>=6,<7)", "sphinx-rtd-theme (>=1.3,<2.0)", "towncrier (>=23.6,<24.0)", "twistedchecker (>=0.7,<1.0)"]
 dev-release = ["pydoctor (>=23.9.0,<23.10.0)", "pydoctor (>=23.9.0,<23.10.0)", "sphinx (>=6,<7)", "sphinx (>=6,<7)", "sphinx-rtd-theme (>=1.3,<2.0)", "sphinx-rtd-theme (>=1.3,<2.0)", "towncrier (>=23.6,<24.0)", "towncrier (>=23.6,<24.0)"]
-gtk-platform = ["pygobject", "pygobject", "twisted[all-non-platform]", "twisted[all-non-platform]"]
+gtk-platform = ["appdirs (>=1.4.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "hypothesis (>=6.56)", "idna (>=2.4)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "priority (>=1.1.0,<2.0)", "pygobject", "pygobject", "pyhamcrest (>=2)", "pyhamcrest (>=2)", "pyopenssl (>=21.0.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "service-identity (>=18.1.0)"]
 http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
-macos-platform = ["pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "twisted[all-non-platform]", "twisted[all-non-platform]"]
-mypy = ["mypy (>=1.8,<2.0)", "mypy-zope (>=1.0.3,<1.1.0)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"]
-osx-platform = ["twisted[macos-platform]", "twisted[macos-platform]"]
+macos-platform = ["appdirs (>=1.4.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "hypothesis (>=6.56)", "idna (>=2.4)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "priority (>=1.1.0,<2.0)", "pyhamcrest (>=2)", "pyhamcrest (>=2)", "pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "pyopenssl (>=21.0.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "service-identity (>=18.1.0)"]
+mypy = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "coverage (>=7.5,<8.0)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "idna (>=2.4)", "mypy (>=1.8,<2.0)", "mypy-zope (>=1.0.3,<1.1.0)", "priority (>=1.1.0,<2.0)", "pydoctor (>=23.9.0,<23.10.0)", "pyflakes (>=2.2,<3.0)", "pyhamcrest (>=2)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "sphinx (>=6,<7)", "sphinx-rtd-theme (>=1.3,<2.0)", "towncrier (>=23.6,<24.0)", "twistedchecker (>=0.7,<1.0)", "types-pyopenssl", "types-setuptools"]
+osx-platform = ["appdirs (>=1.4.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "hypothesis (>=6.56)", "idna (>=2.4)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "priority (>=1.1.0,<2.0)", "pyhamcrest (>=2)", "pyhamcrest (>=2)", "pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "pyopenssl (>=21.0.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "service-identity (>=18.1.0)"]
 serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
 test = ["cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pyhamcrest (>=2)"]
 tls = ["idna (>=2.4)", "pyopenssl (>=21.0.0)", "service-identity (>=18.1.0)"]
-windows-platform = ["pywin32 (!=226)", "pywin32 (!=226)", "twisted[all-non-platform]", "twisted[all-non-platform]"]
-
-[[package]]
-name = "twisted-iocpsupport"
-version = "1.0.2"
-description = "An extension for use in the twisted I/O Completion Ports reactor."
-optional = false
-python-versions = "*"
-files = [
-    {file = "twisted-iocpsupport-1.0.2.tar.gz", hash = "sha256:72068b206ee809c9c596b57b5287259ea41ddb4774d86725b19f35bf56aa32a9"},
-    {file = "twisted_iocpsupport-1.0.2-cp310-cp310-win32.whl", hash = "sha256:985c06a33f5c0dae92c71a036d1ea63872ee86a21dd9b01e1f287486f15524b4"},
-    {file = "twisted_iocpsupport-1.0.2-cp310-cp310-win_amd64.whl", hash = "sha256:81b3abe3527b367da0220482820cb12a16c661672b7bcfcde328902890d63323"},
-    {file = "twisted_iocpsupport-1.0.2-cp36-cp36m-win32.whl", hash = "sha256:9dbb8823b49f06d4de52721b47de4d3b3026064ef4788ce62b1a21c57c3fff6f"},
-    {file = "twisted_iocpsupport-1.0.2-cp36-cp36m-win_amd64.whl", hash = "sha256:b9fed67cf0f951573f06d560ac2f10f2a4bbdc6697770113a2fc396ea2cb2565"},
-    {file = "twisted_iocpsupport-1.0.2-cp37-cp37m-win32.whl", hash = "sha256:b76b4eed9b27fd63ddb0877efdd2d15835fdcb6baa745cb85b66e5d016ac2878"},
-    {file = "twisted_iocpsupport-1.0.2-cp37-cp37m-win_amd64.whl", hash = "sha256:851b3735ca7e8102e661872390e3bce88f8901bece95c25a0c8bb9ecb8a23d32"},
-    {file = "twisted_iocpsupport-1.0.2-cp38-cp38-win32.whl", hash = "sha256:bf4133139d77fc706d8f572e6b7d82871d82ec7ef25d685c2351bdacfb701415"},
-    {file = "twisted_iocpsupport-1.0.2-cp38-cp38-win_amd64.whl", hash = "sha256:306becd6e22ab6e8e4f36b6bdafd9c92e867c98a5ce517b27fdd27760ee7ae41"},
-    {file = "twisted_iocpsupport-1.0.2-cp39-cp39-win32.whl", hash = "sha256:3c61742cb0bc6c1ac117a7e5f422c129832f0c295af49e01d8a6066df8cfc04d"},
-    {file = "twisted_iocpsupport-1.0.2-cp39-cp39-win_amd64.whl", hash = "sha256:b435857b9efcbfc12f8c326ef0383f26416272260455bbca2cd8d8eca470c546"},
-    {file = "twisted_iocpsupport-1.0.2-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:7d972cfa8439bdcb35a7be78b7ef86d73b34b808c74be56dfa785c8a93b851bf"},
-]
+windows-platform = ["appdirs (>=1.4.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)", "cryptography (>=3.3)", "cython-test-exception-raiser (>=1.0.2,<2)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.56)", "hypothesis (>=6.56)", "idna (>=2.4)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "priority (>=1.1.0,<2.0)", "pyhamcrest (>=2)", "pyhamcrest (>=2)", "pyopenssl (>=21.0.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)", "service-identity (>=18.1.0)", "twisted-iocpsupport (>=1.0.2)", "twisted-iocpsupport (>=1.0.2)"]
 
 [[package]]
 name = "txredisapi"
@@ -2894,13 +2875,13 @@ files = [
 
 [[package]]
 name = "types-pyopenssl"
-version = "24.1.0.20240425"
+version = "24.1.0.20240722"
 description = "Typing stubs for pyOpenSSL"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-pyOpenSSL-24.1.0.20240425.tar.gz", hash = "sha256:0a7e82626c1983dc8dc59292bf20654a51c3c3881bcbb9b337c1da6e32f0204e"},
-    {file = "types_pyOpenSSL-24.1.0.20240425-py3-none-any.whl", hash = "sha256:f51a156835555dd2a1f025621e8c4fbe7493470331afeef96884d1d29bf3a473"},
+    {file = "types-pyOpenSSL-24.1.0.20240722.tar.gz", hash = "sha256:47913b4678a01d879f503a12044468221ed8576263c1540dcb0484ca21b08c39"},
+    {file = "types_pyOpenSSL-24.1.0.20240722-py3-none-any.whl", hash = "sha256:6a7a5d2ec042537934cfb4c9d4deb0e16c4c6250b09358df1f083682fe6fda54"},
 ]
 
 [package.dependencies]
@@ -2934,13 +2915,13 @@ urllib3 = ">=2"
 
 [[package]]
 name = "types-setuptools"
-version = "70.1.0.20240627"
+version = "71.1.0.20240726"
 description = "Typing stubs for setuptools"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-setuptools-70.1.0.20240627.tar.gz", hash = "sha256:385907a47b5cf302b928ce07953cd91147d5de6f3da604c31905fdf0ec309e83"},
-    {file = "types_setuptools-70.1.0.20240627-py3-none-any.whl", hash = "sha256:c7bdf05cd0a8b66868b4774c7b3c079d01ae025d8c9562bfc8bf2ff44d263c9c"},
+    {file = "types-setuptools-71.1.0.20240726.tar.gz", hash = "sha256:85ba28e9461bb1be86ebba4db0f1c2408f2b11115b1966334ea9dc464e29303e"},
+    {file = "types_setuptools-71.1.0.20240726-py3-none-any.whl", hash = "sha256:a7775376f36e0ff09bcad236bf265777590a66b11623e48c20bfc30f1444ea36"},
 ]
 
 [[package]]
diff --git a/pyproject.toml b/pyproject.toml
index fe6289839c..c8373c6dbc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.112.0rc1"
+version = "1.112.0"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "AGPL-3.0-or-later"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 85001d9676..7dcb1e01fd 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -225,6 +225,11 @@ class EventContentFields:
     # This is deprecated in MSC2175.
     ROOM_CREATOR: Final = "creator"
 
+    # The version of the room for `m.room.create` events.
+    ROOM_VERSION: Final = "room_version"
+
+    ROOM_NAME: Final = "name"
+
     # Used in m.room.guest_access events.
     GUEST_ACCESS: Final = "guest_access"
 
@@ -237,6 +242,9 @@ class EventContentFields:
     # an unspecced field added to to-device messages to identify them uniquely-ish
     TO_DEVICE_MSGID: Final = "org.matrix.msgid"
 
+    # `m.room.encryption`` algorithm field
+    ENCRYPTION_ALGORITHM: Final = "algorithm"
+
 
 class EventUnsignedContentFields:
     """Fields found inside the 'unsigned' data on events"""
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 36e0f47e51..2e56b671f0 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -554,3 +554,22 @@ def relation_from_event(event: EventBase) -> Optional[_EventRelation]:
             aggregation_key = None
 
     return _EventRelation(parent_id, rel_type, aggregation_key)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class StrippedStateEvent:
+    """
+    A stripped down state event. Usually used for remote invite/knocks so the user can
+    make an informed decision on whether they want to join.
+
+    Attributes:
+        type: Event `type`
+        state_key: Event `state_key`
+        sender: Event `sender`
+        content: Event `content`
+    """
+
+    type: str
+    state_key: str
+    sender: str
+    content: Dict[str, Any]
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index f937fd4698..54f94add4d 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -49,7 +49,7 @@ from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersion
 from synapse.types import JsonDict, Requester
 
-from . import EventBase, make_event_from_dict
+from . import EventBase, StrippedStateEvent, make_event_from_dict
 
 if TYPE_CHECKING:
     from synapse.handlers.relations import BundledAggregations
@@ -854,3 +854,30 @@ def strip_event(event: EventBase) -> JsonDict:
         "content": event.content,
         "sender": event.sender,
     }
+
+
+def parse_stripped_state_event(raw_stripped_event: Any) -> Optional[StrippedStateEvent]:
+    """
+    Given a raw value from an event's `unsigned` field, attempt to parse it into a
+    `StrippedStateEvent`.
+    """
+    if isinstance(raw_stripped_event, dict):
+        # All of these fields are required
+        type = raw_stripped_event.get("type")
+        state_key = raw_stripped_event.get("state_key")
+        sender = raw_stripped_event.get("sender")
+        content = raw_stripped_event.get("content")
+        if (
+            isinstance(type, str)
+            and isinstance(state_key, str)
+            and isinstance(sender, str)
+            and isinstance(content, dict)
+        ):
+            return StrippedStateEvent(
+                type=type,
+                state_key=state_key,
+                sender=sender,
+                content=content,
+            )
+
+    return None
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index a38ff32a71..af5359123c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -17,6 +17,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import enum
 import logging
 from enum import Enum
 from itertools import chain
@@ -26,23 +27,35 @@ from typing import (
     Dict,
     Final,
     List,
+    Literal,
     Mapping,
     Optional,
     Sequence,
     Set,
     Tuple,
+    Union,
 )
 
 import attr
 from immutabledict import immutabledict
 from typing_extensions import assert_never
 
-from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
-from synapse.events import EventBase
-from synapse.events.utils import strip_event
+from synapse.api.constants import (
+    AccountDataTypes,
+    Direction,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
+from synapse.events import EventBase, StrippedStateEvent
+from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.state import (
+    ROOM_UNKNOWN_SENTINEL,
+    Sentinel as StateSentinel,
+)
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
@@ -50,6 +63,7 @@ from synapse.types import (
     JsonDict,
     JsonMapping,
     MultiWriterStreamToken,
+    MutableStateMap,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -71,6 +85,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class Sentinel(enum.Enum):
+    # defining a sentinel in this way allows mypy to correctly handle the
+    # type of a dictionary lookup and subsequent type narrowing.
+    UNSET_SENTINEL = object()
+
+
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
     EventTypes.Create,
@@ -494,8 +514,7 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
-        # Keep track of the rooms that we're going to display and need to fetch more
-        # info about
+        # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if has_lists and sync_config.lists is not None:
             sync_room_map = await self.filter_rooms_relevant_for_sync(
@@ -625,10 +644,11 @@ class SlidingSyncHandler:
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        all_rooms = set(relevant_room_map)
 
         # Filter out rooms that haven't received updates and we've sent down
         # previously.
+        # Keep track of the rooms that we're going to display and need to fetch more info about
+        relevant_rooms_to_send_map = relevant_room_map
         if from_token:
             rooms_should_send = set()
 
@@ -673,7 +693,7 @@ class SlidingSyncHandler:
                 relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
-            relevant_room_map = {
+            relevant_rooms_to_send_map = {
                 room_id: room_sync_config
                 for room_id, room_sync_config in relevant_room_map.items()
                 if room_id in rooms_should_send
@@ -685,7 +705,7 @@ class SlidingSyncHandler:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=relevant_room_map[room_id],
+                room_sync_config=relevant_rooms_to_send_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -697,14 +717,20 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        if relevant_room_map:
+        if relevant_rooms_to_send_map:
             with start_active_span("sliding_sync.generate_room_entries"):
-                await concurrently_execute(handle_room, relevant_room_map, 10)
+                await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
             actual_lists=lists,
-            actual_room_ids=all_rooms,
+            # We're purposely using `relevant_room_map` instead of
+            # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
+            # send regardless of whether they have an event update or not. The
+            # extensions care about more than just normal events in the rooms (like
+            # account data, read receipts, typing indicators, to-device messages, etc).
+            actual_room_ids=set(relevant_room_map.keys()),
+            actual_room_response_map=rooms,
             from_token=from_token,
             to_token=to_token,
         )
@@ -714,7 +740,7 @@ class SlidingSyncHandler:
                 sync_config=sync_config,
                 room_configs=relevant_room_map,
                 from_token=from_token,
-                sent_room_ids=relevant_room_map.keys(),
+                sent_room_ids=relevant_rooms_to_send_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
@@ -1179,6 +1205,265 @@ class SlidingSyncHandler:
 
         # return None
 
+    async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
+        self,
+        room_ids: StrCollection,
+        sync_room_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]:
+        """
+        Fetch stripped state for a list of room IDs. Stripped state is only
+        applicable to invite/knock rooms. Other rooms will have `None` as their
+        stripped state.
+
+        For invite rooms, we pull from `unsigned.invite_room_state`.
+        For knock rooms, we pull from `unsigned.knock_room_state`.
+
+        Args:
+            room_ids: Room IDs to fetch stripped state for
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+
+        Returns:
+            Mapping from room_id to mapping of (type, state_key) to stripped state
+            event.
+        """
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ] = {}
+
+        # Fetch what we haven't before
+        room_ids_to_fetch = [
+            room_id
+            for room_id in room_ids
+            if room_id not in room_id_to_stripped_state_map
+        ]
+
+        # Gather a list of event IDs we can grab stripped state from
+        invite_or_knock_event_ids: List[str] = []
+        for room_id in room_ids_to_fetch:
+            if sync_room_map[room_id].membership in (
+                Membership.INVITE,
+                Membership.KNOCK,
+            ):
+                event_id = sync_room_map[room_id].event_id
+                # If this is an invite/knock then there should be an event_id
+                assert event_id is not None
+                invite_or_knock_event_ids.append(event_id)
+            else:
+                room_id_to_stripped_state_map[room_id] = None
+
+        invite_or_knock_events = await self.store.get_events(invite_or_knock_event_ids)
+        for invite_or_knock_event in invite_or_knock_events.values():
+            room_id = invite_or_knock_event.room_id
+            membership = invite_or_knock_event.membership
+
+            raw_stripped_state_events = None
+            if membership == Membership.INVITE:
+                invite_room_state = invite_or_knock_event.unsigned.get(
+                    "invite_room_state"
+                )
+                raw_stripped_state_events = invite_room_state
+            elif membership == Membership.KNOCK:
+                knock_room_state = invite_or_knock_event.unsigned.get(
+                    "knock_room_state"
+                )
+                raw_stripped_state_events = knock_room_state
+            else:
+                raise AssertionError(
+                    f"Unexpected membership {membership} (this is a problem with Synapse itself)"
+                )
+
+            stripped_state_map: Optional[MutableStateMap[StrippedStateEvent]] = None
+            # Scrutinize unsigned things. `raw_stripped_state_events` should be a list
+            # of stripped events
+            if raw_stripped_state_events is not None:
+                stripped_state_map = {}
+                if isinstance(raw_stripped_state_events, list):
+                    for raw_stripped_event in raw_stripped_state_events:
+                        stripped_state_event = parse_stripped_state_event(
+                            raw_stripped_event
+                        )
+                        if stripped_state_event is not None:
+                            stripped_state_map[
+                                (
+                                    stripped_state_event.type,
+                                    stripped_state_event.state_key,
+                                )
+                            ] = stripped_state_event
+
+            room_id_to_stripped_state_map[room_id] = stripped_state_map
+
+        return room_id_to_stripped_state_map
+
+    async def _bulk_get_partial_current_state_content_for_rooms(
+        self,
+        content_type: Literal[
+            # `content.type` from `EventTypes.Create``
+            "room_type",
+            # `content.algorithm` from `EventTypes.RoomEncryption`
+            "room_encryption",
+        ],
+        room_ids: Set[str],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ],
+    ) -> Mapping[str, Union[Optional[str], StateSentinel]]:
+        """
+        Get the given state event content for a list of rooms. First we check the
+        current state of the room, then fallback to stripped state if available, then
+        historical state.
+
+        Args:
+            content_type: Which content to grab
+            room_ids: Room IDs to fetch the given content field for.
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+            to_token: We filter based on the state of the room at this token
+            room_id_to_stripped_state_map: This does not need to be filled in before
+                calling this function. Mapping from room_id to mapping of (type, state_key)
+                to stripped state event. Modified in place when we fetch new rooms so we can
+                save work next time this function is called.
+
+        Returns:
+            A mapping from room ID to the state event content if the room has
+            the given state event (event_type, ""), otherwise `None`. Rooms unknown to
+            this server will return `ROOM_UNKNOWN_SENTINEL`.
+        """
+        room_id_to_content: Dict[str, Union[Optional[str], StateSentinel]] = {}
+
+        # As a bulk shortcut, use the current state if the server is particpating in the
+        # room (meaning we have current state). Ideally, for leave/ban rooms, we would
+        # want the state at the time of the membership instead of current state to not
+        # leak anything but we consider the create/encryption stripped state events to
+        # not be a secret given they are often set at the start of the room and they are
+        # normally handed out on invite/knock.
+        #
+        # Be mindful to only use this for non-sensitive details. For example, even
+        # though the room name/avatar/topic are also stripped state, they seem a lot
+        # more senstive to leak the current state value of.
+        #
+        # Since this function is cached, we need to make a mutable copy via
+        # `dict(...)`.
+        event_type = ""
+        event_content_field = ""
+        if content_type == "room_type":
+            event_type = EventTypes.Create
+            event_content_field = EventContentFields.ROOM_TYPE
+            room_id_to_content = dict(await self.store.bulk_get_room_type(room_ids))
+        elif content_type == "room_encryption":
+            event_type = EventTypes.RoomEncryption
+            event_content_field = EventContentFields.ENCRYPTION_ALGORITHM
+            room_id_to_content = dict(
+                await self.store.bulk_get_room_encryption(room_ids)
+            )
+        else:
+            assert_never(content_type)
+
+        room_ids_with_results = [
+            room_id
+            for room_id, content_field in room_id_to_content.items()
+            if content_field is not ROOM_UNKNOWN_SENTINEL
+        ]
+
+        # We might not have current room state for remote invite/knocks if we are
+        # the first person on our server to see the room. The best we can do is look
+        # in the optional stripped state from the invite/knock event.
+        room_ids_without_results = room_ids.difference(
+            chain(
+                room_ids_with_results,
+                [
+                    room_id
+                    for room_id, stripped_state_map in room_id_to_stripped_state_map.items()
+                    if stripped_state_map is not None
+                ],
+            )
+        )
+        room_id_to_stripped_state_map.update(
+            await self._bulk_get_stripped_state_for_rooms_from_sync_room_map(
+                room_ids_without_results, sync_room_map
+            )
+        )
+
+        # Update our `room_id_to_content` map based on the stripped state
+        # (applies to invite/knock rooms)
+        rooms_ids_without_stripped_state: Set[str] = set()
+        for room_id in room_ids_without_results:
+            stripped_state_map = room_id_to_stripped_state_map.get(
+                room_id, Sentinel.UNSET_SENTINEL
+            )
+            assert stripped_state_map is not Sentinel.UNSET_SENTINEL, (
+                f"Stripped state left unset for room {room_id}. "
+                + "Make sure you're calling `_bulk_get_stripped_state_for_rooms_from_sync_room_map(...)` "
+                + "with that room_id. (this is a problem with Synapse itself)"
+            )
+
+            # If there is some stripped state, we assume the remote server passed *all*
+            # of the potential stripped state events for the room.
+            if stripped_state_map is not None:
+                create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
+                stripped_event = stripped_state_map.get((event_type, ""))
+                # Sanity check that we at-least have the create event
+                if create_stripped_event is not None:
+                    if stripped_event is not None:
+                        room_id_to_content[room_id] = stripped_event.content.get(
+                            event_content_field
+                        )
+                    else:
+                        # Didn't see the state event we're looking for in the stripped
+                        # state so we can assume relevant content field is `None`.
+                        room_id_to_content[room_id] = None
+            else:
+                rooms_ids_without_stripped_state.add(room_id)
+
+        # Last resort, we might not have current room state for rooms that the
+        # server has left (no one local is in the room) but we can look at the
+        # historical state.
+        #
+        # Update our `room_id_to_content` map based on the state at the time of
+        # the membership event.
+        for room_id in rooms_ids_without_stripped_state:
+            # TODO: It would be nice to look this up in a bulk way (N+1 queries)
+            #
+            # TODO: `get_state_at(...)` doesn't take into account the "current state".
+            room_state = await self.storage_controllers.state.get_state_at(
+                room_id=room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    sync_room_map[room_id].event_pos.to_room_stream_token(),
+                ),
+                state_filter=StateFilter.from_types(
+                    [
+                        (EventTypes.Create, ""),
+                        (event_type, ""),
+                    ]
+                ),
+                # Partially-stated rooms should have all state events except for
+                # remote membership events so we don't need to wait at all because
+                # we only want the create event and some non-member event.
+                await_full_state=False,
+            )
+            # We can use the create event as a canary to tell whether the server has
+            # seen the room before
+            create_event = room_state.get((EventTypes.Create, ""))
+            state_event = room_state.get((event_type, ""))
+
+            if create_event is None:
+                # Skip for unknown rooms
+                continue
+
+            if state_event is not None:
+                room_id_to_content[room_id] = state_event.content.get(
+                    event_content_field
+                )
+            else:
+                # Didn't see the state event we're looking for in the stripped
+                # state so we can assume relevant content field is `None`.
+                room_id_to_content[room_id] = None
+
+        return room_id_to_content
+
     @trace
     async def filter_rooms(
         self,
@@ -1201,6 +1486,10 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ] = {}
+
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
@@ -1220,31 +1509,34 @@ class SlidingSyncHandler:
                     if not sync_room_map[room_id].is_dm
                 }
 
-        if filters.spaces:
+        if filters.spaces is not None:
             raise NotImplementedError()
 
         # Filter for encrypted rooms
         if filters.is_encrypted is not None:
+            room_id_to_encryption = (
+                await self._bulk_get_partial_current_state_content_for_rooms(
+                    content_type="room_encryption",
+                    room_ids=filtered_room_id_set,
+                    to_token=to_token,
+                    sync_room_map=sync_room_map,
+                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                )
+            )
+
             # Make a copy so we don't run into an error: `Set changed size during
             # iteration`, when we filter out and remove items
             for room_id in filtered_room_id_set.copy():
-                state_at_to_token = await self.storage_controllers.state.get_state_at(
-                    room_id,
-                    to_token,
-                    state_filter=StateFilter.from_types(
-                        [(EventTypes.RoomEncryption, "")]
-                    ),
-                    # Partially-stated rooms should have all state events except for the
-                    # membership events so we don't need to wait because we only care
-                    # about retrieving the `EventTypes.RoomEncryption` state event here.
-                    # Plus we don't want to block the whole sync waiting for this one
-                    # room.
-                    await_full_state=False,
-                )
-                is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))
+                encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
+
+                # Just remove rooms if we can't determine their encryption status
+                if encryption is ROOM_UNKNOWN_SENTINEL:
+                    filtered_room_id_set.remove(room_id)
+                    continue
 
                 # If we're looking for encrypted rooms, filter out rooms that are not
                 # encrypted and vice versa
+                is_encrypted = encryption is not None
                 if (filters.is_encrypted and not is_encrypted) or (
                     not filters.is_encrypted and is_encrypted
                 ):
@@ -1270,15 +1562,26 @@ class SlidingSyncHandler:
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            room_to_type = await self.store.bulk_get_room_type(
-                {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    # We only know the room types for joined rooms
-                    if sync_room_map[room_id].membership == Membership.JOIN
-                }
+            room_id_to_type = (
+                await self._bulk_get_partial_current_state_content_for_rooms(
+                    content_type="room_type",
+                    room_ids=filtered_room_id_set,
+                    to_token=to_token,
+                    sync_room_map=sync_room_map,
+                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                )
             )
-            for room_id, room_type in room_to_type.items():
+
+            # Make a copy so we don't run into an error: `Set changed size during
+            # iteration`, when we filter out and remove items
+            for room_id in filtered_room_id_set.copy():
+                room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
+
+                # Just remove rooms if we can't determine their type
+                if room_type is ROOM_UNKNOWN_SENTINEL:
+                    filtered_room_id_set.remove(room_id)
+                    continue
+
                 if (
                     filters.room_types is not None
                     and room_type not in filters.room_types
@@ -1291,13 +1594,24 @@ class SlidingSyncHandler:
                 ):
                     filtered_room_id_set.remove(room_id)
 
-        if filters.room_name_like:
+        if filters.room_name_like is not None:
+            # TODO: The room name is a bit more sensitive to leak than the
+            # create/encryption event. Maybe we should consider a better way to fetch
+            # historical state before implementing this.
+            #
+            # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
+            #     content_type="room_name",
+            #     room_ids=filtered_room_id_set,
+            #     to_token=to_token,
+            #     sync_room_map=sync_room_map,
+            #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            # )
             raise NotImplementedError()
 
-        if filters.tags:
+        if filters.tags is not None:
             raise NotImplementedError()
 
-        if filters.not_tags:
+        if filters.not_tags is not None:
             raise NotImplementedError()
 
         # Assemble a new sync room map but only with the `filtered_room_id_set`
@@ -1378,14 +1692,17 @@ class SlidingSyncHandler:
                 in the room at the time of `to_token`.
             to_token: The point in the stream to sync up to.
         """
-        room_state_ids: StateMap[str]
+        state_ids: StateMap[str]
         # People shouldn't see past their leave/ban event
         if room_membership_for_user_at_to_token.membership in (
             Membership.LEAVE,
             Membership.BAN,
         ):
-            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
-            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current
+            # state". Maybe we need to use
+            # `get_forward_extremities_for_room_at_stream_ordering(...)` to "Fetch the
+            # current state at the time."
+            state_ids = await self.storage_controllers.state.get_state_ids_at(
                 room_id,
                 stream_position=to_token.copy_and_replace(
                     StreamKeyType.ROOM,
@@ -1404,7 +1721,7 @@ class SlidingSyncHandler:
             )
         # Otherwise, we can get the latest current state in the room
         else:
-            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+            state_ids = await self.storage_controllers.state.get_current_state_ids(
                 room_id,
                 state_filter,
                 # Partially-stated rooms should have all state events except for
@@ -1419,7 +1736,7 @@ class SlidingSyncHandler:
             )
             # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
 
-        return room_state_ids
+        return state_ids
 
     async def get_current_state_at(
         self,
@@ -1439,17 +1756,17 @@ class SlidingSyncHandler:
                 in the room at the time of `to_token`.
             to_token: The point in the stream to sync up to.
         """
-        room_state_ids = await self.get_current_state_ids_at(
+        state_ids = await self.get_current_state_ids_at(
             room_id=room_id,
             room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
             state_filter=state_filter,
             to_token=to_token,
         )
 
-        event_map = await self.store.get_events(list(room_state_ids.values()))
+        event_map = await self.store.get_events(list(state_ids.values()))
 
         state_map = {}
-        for key, event_id in room_state_ids.items():
+        for key, event_id in state_ids.items():
             event = event_map.get(event_id)
             if event:
                 state_map[key] = event
@@ -1958,6 +2275,7 @@ class SlidingSyncHandler:
         sync_config: SlidingSyncConfig,
         actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
         actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1968,6 +2286,8 @@ class SlidingSyncHandler:
             actual_lists: Sliding window API. A map of list key to list results in the
                 Sliding Sync response.
             actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -2009,6 +2329,7 @@ class SlidingSyncHandler:
                 sync_config=sync_config,
                 actual_lists=actual_lists,
                 actual_room_ids=actual_room_ids,
+                actual_room_response_map=actual_room_response_map,
                 receipts_request=sync_config.extensions.receipts,
                 to_token=to_token,
                 from_token=from_token,
@@ -2310,12 +2631,12 @@ class SlidingSyncHandler:
             account_data_by_room_map=account_data_by_room_map,
         )
 
-    @trace
     async def get_receipts_extension_response(
         self,
         sync_config: SlidingSyncConfig,
         actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
         actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -2327,6 +2648,8 @@ class SlidingSyncHandler:
             actual_lists: Sliding window API. A map of list key to list results in the
                 Sliding Sync response.
             actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -2364,6 +2687,24 @@ class SlidingSyncHandler:
                 room_id = receipt["room_id"]
                 type = receipt["type"]
                 content = receipt["content"]
+
+                room_result = actual_room_response_map.get(room_id)
+                if room_result is not None:
+                    if room_result.initial:
+                        # TODO: In the future, it would be good to fetch less receipts
+                        # out of the database in the first place but we would need to
+                        # add a new `event_id` index to `receipts_linearized`.
+                        relevant_event_ids = [
+                            event.event_id for event in room_result.timeline_events
+                        ]
+
+                        assert isinstance(content, dict)
+                        content = {
+                            event_id: content_value
+                            for event_id, content_value in content.items()
+                            if event_id in relevant_event_ids
+                        }
+
                 room_id_to_receipt_map[room_id] = {"type": type, "content": content}
 
         return SlidingSyncResult.Extensions.ReceiptsExtension(
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 1c94f3ca46..8f90c17060 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -293,7 +293,9 @@ class StatsHandler:
                     "history_visibility"
                 )
             elif delta.event_type == EventTypes.RoomEncryption:
-                room_state["encryption"] = event_content.get("algorithm")
+                room_state["encryption"] = event_content.get(
+                    EventContentFields.ENCRYPTION_ALGORITHM
+                )
             elif delta.event_type == EventTypes.Name:
                 room_state["name"] = event_content.get("name")
             elif delta.event_type == EventTypes.Topic:
diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py
index 5b5ded757b..97aa429e7d 100644
--- a/synapse/http/proxy.py
+++ b/synapse/http/proxy.py
@@ -62,6 +62,15 @@ HOP_BY_HOP_HEADERS = {
     "Upgrade",
 }
 
+if hasattr(Headers, "_canonicalNameCaps"):
+    # Twisted < 24.7.0rc1
+    _canonicalHeaderName = Headers()._canonicalNameCaps  # type: ignore[attr-defined]
+else:
+    # Twisted >= 24.7.0rc1
+    # But note that `_encodeName` still exists on prior versions,
+    # it just encodes differently
+    _canonicalHeaderName = Headers()._encodeName
+
 
 def parse_connection_header_value(
     connection_header_value: Optional[bytes],
@@ -85,11 +94,10 @@ def parse_connection_header_value(
         The set of header names that should not be copied over from the remote response.
         The keys are capitalized in canonical capitalization.
     """
-    headers = Headers()
     extra_headers_to_remove: Set[str] = set()
     if connection_header_value:
         extra_headers_to_remove = {
-            headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
+            _canonicalHeaderName(connection_option.strip()).decode("ascii")
             for connection_option in connection_header_value.split(b",")
         }
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 211795dc39..0d0c610b28 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -74,6 +74,7 @@ from synapse.api.errors import (
 from synapse.config.homeserver import HomeServerConfig
 from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
 from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
+from synapse.types import ISynapseReactor
 from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
 from synapse.util.cancellation import is_function_cancellable
@@ -868,7 +869,8 @@ async def _async_write_json_to_request_in_thread(
 
     with start_active_span("encode_json_response"):
         span = active_span()
-        json_str = await defer_to_thread(request.reactor, encode, span)
+        reactor: ISynapseReactor = request.reactor  # type: ignore
+        json_str = await defer_to_thread(reactor, encode, span)
 
     _write_bytes_to_request(request, json_str)
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index a5b5780679..af169ba51e 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -683,7 +683,7 @@ class SynapseSite(ProxySite):
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")
 
-    def log(self, request: SynapseRequest) -> None:
+    def log(self, request: SynapseRequest) -> None:  # type: ignore[override]
         pass
 
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 066f3d08ae..e12ab94576 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -127,6 +127,8 @@ class SQLBaseStore(metaclass=ABCMeta):
         # Purge other caches based on room state.
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
         self._attempt_to_invalidate_cache("get_partial_current_state_ids", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_type", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
 
     def _invalidate_state_caches_all(self, room_id: str) -> None:
         """Invalidates caches that are based on the current state, but does
@@ -153,6 +155,8 @@ class SQLBaseStore(metaclass=ABCMeta):
             "_get_rooms_for_local_user_where_membership_is_inner", None
         )
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_type", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
 
     def _attempt_to_invalidate_cache(
         self, cache_name: str, key: Optional[Collection[Any]]
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 8c2c0c5ab0..9fd50951fa 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -268,13 +268,23 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]
 
             if data.type == EventTypes.Member:
-                self.get_rooms_for_user.invalidate((data.state_key,))  # type: ignore[attr-defined]
+                self._attempt_to_invalidate_cache(
+                    "get_rooms_for_user", (data.state_key,)
+                )
+            elif data.type == EventTypes.RoomEncryption:
+                self._attempt_to_invalidate_cache(
+                    "get_room_encryption", (data.room_id,)
+                )
+            elif data.type == EventTypes.Create:
+                self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
         elif row.type == EventsStreamAllStateRow.TypeId:
             assert isinstance(data, EventsStreamAllStateRow)
             # Similar to the above, but the entire caches are invalidated. This is
             # unfortunate for the membership caches, but should recover quickly.
             self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]
-            self.get_rooms_for_user.invalidate_all()  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+            self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
+            self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
         else:
             raise Exception("Unknown events stream row type %s" % (row.type,))
 
@@ -348,6 +358,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._attempt_to_invalidate_cache(
                 "get_forgotten_rooms_for_user", (state_key,)
             )
+        elif etype == EventTypes.Create:
+            self._attempt_to_invalidate_cache("get_room_type", (room_id,))
+        elif etype == EventTypes.RoomEncryption:
+            self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
 
         if relates_to:
             self._attempt_to_invalidate_cache(
@@ -408,6 +422,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_thread_summary", None)
         self._attempt_to_invalidate_cache("get_thread_participated", None)
         self._attempt_to_invalidate_cache("get_threads", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_type", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
 
         self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
 
@@ -460,6 +476,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
         self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
         self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_type", (room_id,))
+        self._attempt_to_invalidate_cache("get_room_encryption", (room_id,))
 
         # And delete state caches.
 
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 5188b2f7a4..62bc4600fb 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -30,6 +30,7 @@ from typing import (
     Iterable,
     List,
     Mapping,
+    MutableMapping,
     Optional,
     Set,
     Tuple,
@@ -72,10 +73,18 @@ logger = logging.getLogger(__name__)
 
 _T = TypeVar("_T")
 
-
 MAX_STATE_DELTA_HOPS = 100
 
 
+# Freeze so it's immutable and we can use it as a cache value
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class Sentinel:
+    pass
+
+
+ROOM_UNKNOWN_SENTINEL = Sentinel()
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class EventMetadata:
     """Returned by `get_metadata_for_events`"""
@@ -300,51 +309,189 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     @cached(max_entries=10000)
     async def get_room_type(self, room_id: str) -> Optional[str]:
-        """Get the room type for a given room. The server must be joined to the
-        given room.
-        """
-
-        row = await self.db_pool.simple_select_one(
-            table="room_stats_state",
-            keyvalues={"room_id": room_id},
-            retcols=("room_type",),
-            allow_none=True,
-            desc="get_room_type",
-        )
-
-        if row is not None:
-            return row[0]
-
-        # If we haven't updated `room_stats_state` with the room yet, query the
-        # create event directly.
-        create_event = await self.get_create_event_for_room(room_id)
-        room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
-        return room_type
+        raise NotImplementedError()
 
     @cachedList(cached_method_name="get_room_type", list_name="room_ids")
     async def bulk_get_room_type(
         self, room_ids: Set[str]
-    ) -> Mapping[str, Optional[str]]:
-        """Bulk fetch room types for the given rooms, the server must be in all
-        the rooms given.
+    ) -> Mapping[str, Union[Optional[str], Sentinel]]:
         """
+        Bulk fetch room types for the given rooms (via current state).
 
-        rows = await self.db_pool.simple_select_many_batch(
-            table="room_stats_state",
-            column="room_id",
-            iterable=room_ids,
-            retcols=("room_id", "room_type"),
-            desc="bulk_get_room_type",
+        Since this function is cached, any missing values would be cached as `None`. In
+        order to distinguish between an unencrypted room that has `None` encryption and
+        a room that is unknown to the server where we might want to omit the value
+        (which would make it cached as `None`), instead we use the sentinel value
+        `ROOM_UNKNOWN_SENTINEL`.
+
+        Returns:
+            A mapping from room ID to the room's type (`None` is a valid room type).
+            Rooms unknown to this server will return `ROOM_UNKNOWN_SENTINEL`.
+        """
+
+        def txn(
+            txn: LoggingTransaction,
+        ) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "room_id", room_ids
+            )
+
+            # We can't rely on `room_stats_state.room_type` if the server has left the
+            # room because the `room_id` will still be in the table but everything will
+            # be set to `None` but `None` is a valid room type value. We join against
+            # the `room_stats_current` table which keeps track of the
+            # `current_state_events` count (and a proxy value `local_users_in_room`
+            # which can used to assume the server is participating in the room and has
+            # current state) to ensure that the data in `room_stats_state` is up-to-date
+            # with the current state.
+            #
+            # FIXME: Use `room_stats_current.current_state_events` instead of
+            # `room_stats_current.local_users_in_room` once
+            # https://github.com/element-hq/synapse/issues/17457 is fixed.
+            sql = f"""
+                SELECT room_id, room_type
+                FROM room_stats_state
+                INNER JOIN room_stats_current USING (room_id)
+                WHERE
+                    {clause}
+                    AND local_users_in_room > 0
+            """
+
+            txn.execute(sql, args)
+
+            room_id_to_type_map = {}
+            for row in txn:
+                room_id_to_type_map[row[0]] = row[1]
+
+            return room_id_to_type_map
+
+        results = await self.db_pool.runInteraction(
+            "bulk_get_room_type",
+            txn,
         )
 
         # If we haven't updated `room_stats_state` with the room yet, query the
         # create events directly. This should happen only rarely so we don't
         # mind if we do this in a loop.
-        results = dict(rows)
         for room_id in room_ids - results.keys():
-            create_event = await self.get_create_event_for_room(room_id)
-            room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
-            results[room_id] = room_type
+            try:
+                create_event = await self.get_create_event_for_room(room_id)
+                room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
+                results[room_id] = room_type
+            except NotFoundError:
+                # We use the sentinel value to distinguish between `None` which is a
+                # valid room type and a room that is unknown to the server so the value
+                # is just unset.
+                results[room_id] = ROOM_UNKNOWN_SENTINEL
+
+        return results
+
+    @cached(max_entries=10000)
+    async def get_room_encryption(self, room_id: str) -> Optional[str]:
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="get_room_encryption", list_name="room_ids")
+    async def bulk_get_room_encryption(
+        self, room_ids: Set[str]
+    ) -> Mapping[str, Union[Optional[str], Sentinel]]:
+        """
+        Bulk fetch room encryption for the given rooms (via current state).
+
+        Since this function is cached, any missing values would be cached as `None`. In
+        order to distinguish between an unencrypted room that has `None` encryption and
+        a room that is unknown to the server where we might want to omit the value
+        (which would make it cached as `None`), instead we use the sentinel value
+        `ROOM_UNKNOWN_SENTINEL`.
+
+        Returns:
+            A mapping from room ID to the room's encryption algorithm if the room is
+            encrypted, otherwise `None`. Rooms unknown to this server will return
+            `ROOM_UNKNOWN_SENTINEL`.
+        """
+
+        def txn(
+            txn: LoggingTransaction,
+        ) -> MutableMapping[str, Union[Optional[str], Sentinel]]:
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "room_id", room_ids
+            )
+
+            # We can't rely on `room_stats_state.encryption` if the server has left the
+            # room because the `room_id` will still be in the table but everything will
+            # be set to `None` but `None` is a valid encryption value. We join against
+            # the `room_stats_current` table which keeps track of the
+            # `current_state_events` count (and a proxy value `local_users_in_room`
+            # which can used to assume the server is participating in the room and has
+            # current state) to ensure that the data in `room_stats_state` is up-to-date
+            # with the current state.
+            #
+            # FIXME: Use `room_stats_current.current_state_events` instead of
+            # `room_stats_current.local_users_in_room` once
+            # https://github.com/element-hq/synapse/issues/17457 is fixed.
+            sql = f"""
+                SELECT room_id, encryption
+                FROM room_stats_state
+                INNER JOIN room_stats_current USING (room_id)
+                WHERE
+                    {clause}
+                    AND local_users_in_room > 0
+            """
+
+            txn.execute(sql, args)
+
+            room_id_to_encryption_map = {}
+            for row in txn:
+                room_id_to_encryption_map[row[0]] = row[1]
+
+            return room_id_to_encryption_map
+
+        results = await self.db_pool.runInteraction(
+            "bulk_get_room_encryption",
+            txn,
+        )
+
+        # If we haven't updated `room_stats_state` with the room yet, query the state
+        # directly. This should happen only rarely so we don't mind if we do this in a
+        # loop.
+        encryption_event_ids: List[str] = []
+        for room_id in room_ids - results.keys():
+            state_map = await self.get_partial_filtered_current_state_ids(
+                room_id,
+                state_filter=StateFilter.from_types(
+                    [
+                        (EventTypes.Create, ""),
+                        (EventTypes.RoomEncryption, ""),
+                    ]
+                ),
+            )
+            # We can use the create event as a canary to tell whether the server has
+            # seen the room before
+            create_event_id = state_map.get((EventTypes.Create, ""))
+            encryption_event_id = state_map.get((EventTypes.RoomEncryption, ""))
+
+            if create_event_id is None:
+                # We use the sentinel value to distinguish between `None` which is a
+                # valid room type and a room that is unknown to the server so the value
+                # is just unset.
+                results[room_id] = ROOM_UNKNOWN_SENTINEL
+                continue
+
+            if encryption_event_id is None:
+                results[room_id] = None
+            else:
+                encryption_event_ids.append(encryption_event_id)
+
+        encryption_event_map = await self.get_events(encryption_event_ids)
+
+        for encryption_event_id in encryption_event_ids:
+            encryption_event = encryption_event_map.get(encryption_event_id)
+            # If the curent state says there is an encryption event, we should have it
+            # in the database.
+            assert encryption_event is not None
+
+            results[encryption_event.room_id] = encryption_event.content.get(
+                EventContentFields.ENCRYPTION_ALGORITHM
+            )
 
         return results
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 12bdb94d3a..2f7e92665c 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -152,7 +152,7 @@ class SlidingSyncResult:
     Attributes:
         next_pos: The next position token in the sliding window to request (next_batch).
         lists: Sliding window API. A map of list key to list results.
-        rooms: Room subscription API. A map of room ID to room subscription to room results.
+        rooms: Room subscription API. A map of room ID to room results.
         extensions: Extensions API. A map of extension key to extension results.
     """
 
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index a7aa9bb8af..96da47f3b9 100644
--- a/tests/handlers/test_sliding_sync.py
+++ b/tests/handlers/test_sliding_sync.py
@@ -19,7 +19,7 @@
 #
 import logging
 from copy import deepcopy
-from typing import Dict, Optional
+from typing import Dict, List, Optional
 from unittest.mock import patch
 
 from parameterized import parameterized
@@ -35,7 +35,7 @@ from synapse.api.constants import (
     RoomTypes,
 )
 from synapse.api.room_versions import RoomVersions
-from synapse.events import make_event_from_dict
+from synapse.events import StrippedStateEvent, make_event_from_dict
 from synapse.events.snapshot import EventContext
 from synapse.handlers.sliding_sync import (
     RoomSyncConfig,
@@ -3093,6 +3093,78 @@ class FilterRoomsTestCase(HomeserverTestCase):
 
         return room_id
 
+    _remote_invite_count: int = 0
+
+    def _create_remote_invite_room_for_user(
+        self,
+        invitee_user_id: str,
+        unsigned_invite_room_state: Optional[List[StrippedStateEvent]],
+    ) -> str:
+        """
+        Create a fake invite for a remote room and persist it.
+
+        We don't have any state for these kind of rooms and can only rely on the
+        stripped state included in the unsigned portion of the invite event to identify
+        the room.
+
+        Args:
+            invitee_user_id: The person being invited
+            unsigned_invite_room_state: List of stripped state events to assist the
+                receiver in identifying the room.
+
+        Returns:
+            The room ID of the remote invite room
+        """
+        invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"
+
+        invite_event_dict = {
+            "room_id": invite_room_id,
+            "sender": "@inviter:remote_server",
+            "state_key": invitee_user_id,
+            "depth": 1,
+            "origin_server_ts": 1,
+            "type": EventTypes.Member,
+            "content": {"membership": Membership.INVITE},
+            "auth_events": [],
+            "prev_events": [],
+        }
+        if unsigned_invite_room_state is not None:
+            serialized_stripped_state_events = []
+            for stripped_event in unsigned_invite_room_state:
+                serialized_stripped_state_events.append(
+                    {
+                        "type": stripped_event.type,
+                        "state_key": stripped_event.state_key,
+                        "sender": stripped_event.sender,
+                        "content": stripped_event.content,
+                    }
+                )
+
+            invite_event_dict["unsigned"] = {
+                "invite_room_state": serialized_stripped_state_events
+            }
+
+        invite_event = make_event_from_dict(
+            invite_event_dict,
+            room_version=RoomVersions.V10,
+        )
+        invite_event.internal_metadata.outlier = True
+        invite_event.internal_metadata.out_of_band_membership = True
+
+        self.get_success(
+            self.store.maybe_store_room_on_outlier_membership(
+                room_id=invite_room_id, room_version=invite_event.room_version
+            )
+        )
+        context = EventContext.for_outlier(self.hs.get_storage_controllers())
+        persist_controller = self.hs.get_storage_controllers().persistence
+        assert persist_controller is not None
+        self.get_success(persist_controller.persist_event(invite_event, context))
+
+        self._remote_invite_count += 1
+
+        return invite_room_id
+
     def test_filter_dm_rooms(self) -> None:
         """
         Test `filter.is_dm` for DM rooms
@@ -3157,7 +3229,288 @@ class FilterRoomsTestCase(HomeserverTestCase):
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
-        # Create a normal room
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # Create an encrypted room
+        encrypted_room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.send_state(
+            encrypted_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user1_tok,
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id})
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
+
+    def test_filter_encrypted_server_left_room(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` against a room that everyone has left.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_rooms_token = self.event_sources.get_current_token()
+
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Leave the room
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        # Create an encrypted room
+        encrypted_room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.send_state(
+            encrypted_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user1_tok,
+        )
+        # Leave the room
+        self.helper.leave(encrypted_room_id, user1_id, tok=user1_tok)
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            # We're using a `from_token` so that the room is considered `newly_left` and
+            # appears in our list of relevant sync rooms
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id})
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
+
+    def test_filter_encrypted_server_left_room2(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` against a room that everyone has
+        left.
+
+        There is still someone local who is invited to the rooms but that doesn't affect
+        whether the server is participating in the room (users need to be joined).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        _user2_tok = self.login(user2_id, "pass")
+
+        before_rooms_token = self.event_sources.get_current_token()
+
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Invite user2
+        self.helper.invite(room_id, targ=user2_id, tok=user1_tok)
+        # User1 leaves the room
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        # Create an encrypted room
+        encrypted_room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.send_state(
+            encrypted_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user1_tok,
+        )
+        # Invite user2
+        self.helper.invite(encrypted_room_id, targ=user2_id, tok=user1_tok)
+        # User1 leaves the room
+        self.helper.leave(encrypted_room_id, user1_id, tok=user1_tok)
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            # We're using a `from_token` so that the room is considered `newly_left` and
+            # appears in our list of relevant sync rooms
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id})
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
+
+    def test_filter_encrypted_after_we_left(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` against a room that was encrypted
+        after we left the room (make sure we don't just use the current state)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_rooms_token = self.event_sources.get_current_token()
+
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        # Leave the room
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        # Create a room that will be encrypted
+        encrypted_after_we_left_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok
+        )
+        # Leave the room
+        self.helper.join(encrypted_after_we_left_room_id, user1_id, tok=user1_tok)
+        self.helper.leave(encrypted_after_we_left_room_id, user1_id, tok=user1_tok)
+
+        # Encrypt the room after we've left
+        self.helper.send_state(
+            encrypted_after_we_left_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user2_tok,
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            # We're using a `from_token` so that the room is considered `newly_left` and
+            # appears in our list of relevant sync rooms
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # Even though we left the room before it was encrypted, we still see it because
+        # someone else on our server is still participating in the room and we "leak"
+        # the current state to the left user. But we consider the room encryption status
+        # to not be a secret given it's often set at the start of the room and it's one
+        # of the stripped state events that is normally handed out.
+        self.assertEqual(
+            truthy_filtered_room_map.keys(), {encrypted_after_we_left_room_id}
+        )
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # Even though we left the room before it was encrypted... (see comment above)
+        self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
+
+    def test_filter_encrypted_with_remote_invite_room_no_stripped_state(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` filter against a remote invite
+        room without any `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room without any `unsigned.invite_room_state`
+        _remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id, None
+        )
+
+        # Create an unencrypted room
         room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
 
         # Create an encrypted room
@@ -3165,7 +3518,7 @@ class FilterRoomsTestCase(HomeserverTestCase):
         self.helper.send_state(
             encrypted_room_id,
             EventTypes.RoomEncryption,
-            {"algorithm": "m.megolm.v1.aes-sha2"},
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
             tok=user1_tok,
         )
 
@@ -3190,6 +3543,8 @@ class FilterRoomsTestCase(HomeserverTestCase):
             )
         )
 
+        # `remote_invite_room_id` should not appear because we can't figure out whether
+        # it is encrypted or not (no stripped state, `unsigned.invite_room_state`).
         self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id})
 
         # Try with `is_encrypted=False`
@@ -3204,8 +3559,179 @@ class FilterRoomsTestCase(HomeserverTestCase):
             )
         )
 
+        # `remote_invite_room_id` should not appear because we can't figure out whether
+        # it is encrypted or not (no stripped state, `unsigned.invite_room_state`).
+        self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
+
+    def test_filter_encrypted_with_remote_invite_encrypted_room(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` filter against a remote invite
+        encrypted room with some `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room with some `unsigned.invite_room_state`
+        # indicating that the room is encrypted.
+        remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id,
+            [
+                StrippedStateEvent(
+                    type=EventTypes.Create,
+                    state_key="",
+                    sender="@inviter:remote_server",
+                    content={
+                        EventContentFields.ROOM_CREATOR: "@inviter:remote_server",
+                        EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier,
+                    },
+                ),
+                StrippedStateEvent(
+                    type=EventTypes.RoomEncryption,
+                    state_key="",
+                    sender="@inviter:remote_server",
+                    content={
+                        EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2",
+                    },
+                ),
+            ],
+        )
+
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # Create an encrypted room
+        encrypted_room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.send_state(
+            encrypted_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user1_tok,
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should appear here because it is encrypted
+        # according to the stripped state
+        self.assertEqual(
+            truthy_filtered_room_map.keys(), {encrypted_room_id, remote_invite_room_id}
+        )
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear here because it is encrypted
+        # according to the stripped state
         self.assertEqual(falsy_filtered_room_map.keys(), {room_id})
 
+    def test_filter_encrypted_with_remote_invite_unencrypted_room(self) -> None:
+        """
+        Test that we can apply a `filter.is_encrypted` filter against a remote invite
+        unencrypted room with some `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room with some `unsigned.invite_room_state`
+        # but don't set any room encryption event.
+        remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id,
+            [
+                StrippedStateEvent(
+                    type=EventTypes.Create,
+                    state_key="",
+                    sender="@inviter:remote_server",
+                    content={
+                        EventContentFields.ROOM_CREATOR: "@inviter:remote_server",
+                        EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier,
+                    },
+                ),
+                # No room encryption event
+            ],
+        )
+
+        # Create an unencrypted room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # Create an encrypted room
+        encrypted_room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.send_state(
+            encrypted_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user1_tok,
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
+        )
+
+        # Try with `is_encrypted=True`
+        truthy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear here because it is unencrypted
+        # according to the stripped state
+        self.assertEqual(truthy_filtered_room_map.keys(), {encrypted_room_id})
+
+        # Try with `is_encrypted=False`
+        falsy_filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_encrypted=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should appear because it is unencrypted according to
+        # the stripped state
+        self.assertEqual(
+            falsy_filtered_room_map.keys(), {room_id, remote_invite_room_id}
+        )
+
     def test_filter_invite_rooms(self) -> None:
         """
         Test `filter.is_invite` for rooms that the user has been invited to
@@ -3461,48 +3987,160 @@ class FilterRoomsTestCase(HomeserverTestCase):
 
         self.assertEqual(filtered_room_map.keys(), {space_room_id})
 
-    def test_filter_room_types_with_invite_remote_room(self) -> None:
-        """Test that we can apply a room type filter, even if we have an invite
-        for a remote room.
+    def test_filter_room_types_server_left_room(self) -> None:
+        """
+        Test that we can apply a `filter.room_types` against a room that everyone has left.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_rooms_token = self.event_sources.get_current_token()
+
+        # Create a normal room (no room type)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Leave the room
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        # Create a space room
+        space_room_id = self.helper.create_room_as(
+            user1_id,
+            tok=user1_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+            },
+        )
+        # Leave the room
+        self.helper.leave(space_room_id, user1_id, tok=user1_tok)
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            # We're using a `from_token` so that the room is considered `newly_left` and
+            # appears in our list of relevant sync rooms
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
+        )
 
-        This is a regression test.
+        # Try finding only normal rooms
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(filtered_room_map.keys(), {room_id})
+
+        # Try finding only spaces
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(filtered_room_map.keys(), {space_room_id})
+
+    def test_filter_room_types_server_left_room2(self) -> None:
         """
+        Test that we can apply a `filter.room_types` against a room that everyone has left.
 
+        There is still someone local who is invited to the rooms but that doesn't affect
+        whether the server is participating in the room (users need to be joined).
+        """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        _user2_tok = self.login(user2_id, "pass")
 
-        # Create a fake remote invite and persist it.
-        invite_room_id = "!some:room"
-        invite_event = make_event_from_dict(
-            {
-                "room_id": invite_room_id,
-                "sender": "@user:test.serv",
-                "state_key": user1_id,
-                "depth": 1,
-                "origin_server_ts": 1,
-                "type": EventTypes.Member,
-                "content": {"membership": Membership.INVITE},
-                "auth_events": [],
-                "prev_events": [],
+        before_rooms_token = self.event_sources.get_current_token()
+
+        # Create a normal room (no room type)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Invite user2
+        self.helper.invite(room_id, targ=user2_id, tok=user1_tok)
+        # User1 leaves the room
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        # Create a space room
+        space_room_id = self.helper.create_room_as(
+            user1_id,
+            tok=user1_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
             },
-            room_version=RoomVersions.V10,
         )
-        invite_event.internal_metadata.outlier = True
-        invite_event.internal_metadata.out_of_band_membership = True
+        # Invite user2
+        self.helper.invite(space_room_id, targ=user2_id, tok=user1_tok)
+        # User1 leaves the room
+        self.helper.leave(space_room_id, user1_id, tok=user1_tok)
 
-        self.get_success(
-            self.store.maybe_store_room_on_outlier_membership(
-                room_id=invite_room_id, room_version=invite_event.room_version
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            # We're using a `from_token` so that the room is considered `newly_left` and
+            # appears in our list of relevant sync rooms
+            from_token=before_rooms_token,
+            to_token=after_rooms_token,
+        )
+
+        # Try finding only normal rooms
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]),
+                after_rooms_token,
             )
         )
-        context = EventContext.for_outlier(self.hs.get_storage_controllers())
-        persist_controller = self.hs.get_storage_controllers().persistence
-        assert persist_controller is not None
-        self.get_success(persist_controller.persist_event(invite_event, context))
+
+        self.assertEqual(filtered_room_map.keys(), {room_id})
+
+        # Try finding only spaces
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(filtered_room_map.keys(), {space_room_id})
+
+    def test_filter_room_types_with_remote_invite_room_no_stripped_state(self) -> None:
+        """
+        Test that we can apply a `filter.room_types` filter against a remote invite
+        room without any `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room without any `unsigned.invite_room_state`
+        _remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id, None
+        )
 
         # Create a normal room (no room type)
         room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
 
+        # Create a space room
+        space_room_id = self.helper.create_room_as(
+            user1_id,
+            tok=user1_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+            },
+        )
+
         after_rooms_token = self.event_sources.get_current_token()
 
         # Get the rooms the user should be syncing with
@@ -3512,18 +4150,186 @@ class FilterRoomsTestCase(HomeserverTestCase):
             to_token=after_rooms_token,
         )
 
+        # Try finding only normal rooms
         filtered_room_map = self.get_success(
             self.sliding_sync_handler.filter_rooms(
                 UserID.from_string(user1_id),
                 sync_room_map,
-                SlidingSyncConfig.SlidingSyncList.Filters(
-                    room_types=[None, RoomTypes.SPACE],
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear because we can't figure out what
+        # room type it is (no stripped state, `unsigned.invite_room_state`)
+        self.assertEqual(filtered_room_map.keys(), {room_id})
+
+        # Try finding only spaces
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear because we can't figure out what
+        # room type it is (no stripped state, `unsigned.invite_room_state`)
+        self.assertEqual(filtered_room_map.keys(), {space_room_id})
+
+    def test_filter_room_types_with_remote_invite_space(self) -> None:
+        """
+        Test that we can apply a `filter.room_types` filter against a remote invite
+        to a space room with some `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room with some `unsigned.invite_room_state` indicating
+        # that it is a space room
+        remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id,
+            [
+                StrippedStateEvent(
+                    type=EventTypes.Create,
+                    state_key="",
+                    sender="@inviter:remote_server",
+                    content={
+                        EventContentFields.ROOM_CREATOR: "@inviter:remote_server",
+                        EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier,
+                        # Specify that it is a space room
+                        EventContentFields.ROOM_TYPE: RoomTypes.SPACE,
+                    },
+                ),
+            ],
+        )
+
+        # Create a normal room (no room type)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # Create a space room
+        space_room_id = self.helper.create_room_as(
+            user1_id,
+            tok=user1_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+            },
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
+        )
+
+        # Try finding only normal rooms
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear here because it is a space room
+        # according to the stripped state
+        self.assertEqual(filtered_room_map.keys(), {room_id})
+
+        # Try finding only spaces
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should appear here because it is a space room
+        # according to the stripped state
+        self.assertEqual(
+            filtered_room_map.keys(), {space_room_id, remote_invite_room_id}
+        )
+
+    def test_filter_room_types_with_remote_invite_normal_room(self) -> None:
+        """
+        Test that we can apply a `filter.room_types` filter against a remote invite
+        to a normal room with some `unsigned.invite_room_state` (stripped state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote invite room with some `unsigned.invite_room_state`
+        # but the create event does not specify a room type (normal room)
+        remote_invite_room_id = self._create_remote_invite_room_for_user(
+            user1_id,
+            [
+                StrippedStateEvent(
+                    type=EventTypes.Create,
+                    state_key="",
+                    sender="@inviter:remote_server",
+                    content={
+                        EventContentFields.ROOM_CREATOR: "@inviter:remote_server",
+                        EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier,
+                        # No room type means this is a normal room
+                    },
                 ),
+            ],
+        )
+
+        # Create a normal room (no room type)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # Create a space room
+        space_room_id = self.helper.create_room_as(
+            user1_id,
+            tok=user1_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+            },
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Get the rooms the user should be syncing with
+        sync_room_map = self._get_sync_room_ids_for_user(
+            UserID.from_string(user1_id),
+            from_token=None,
+            to_token=after_rooms_token,
+        )
+
+        # Try finding only normal rooms
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]),
                 after_rooms_token,
             )
         )
 
-        self.assertEqual(filtered_room_map.keys(), {room_id, invite_room_id})
+        # `remote_invite_room_id` should appear here because it is a normal room
+        # according to the stripped state (no room type)
+        self.assertEqual(filtered_room_map.keys(), {room_id, remote_invite_room_id})
+
+        # Try finding only spaces
+        filtered_room_map = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                sync_room_map,
+                SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]),
+                after_rooms_token,
+            )
+        )
+
+        # `remote_invite_room_id` should not appear here because it is a normal room
+        # according to the stripped state (no room type)
+        self.assertEqual(filtered_room_map.keys(), {space_room_id})
 
 
 class SortRoomsTestCase(HomeserverTestCase):
diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py
index 3fb77fd9dd..2b1e44381b 100644
--- a/tests/rest/client/test_login.py
+++ b/tests/rest/client/test_login.py
@@ -969,9 +969,8 @@ class CASTestCase(unittest.HomeserverTestCase):
         # Test that the response is HTML.
         self.assertEqual(channel.code, 200, channel.result)
         content_type_header_value = ""
-        for header in channel.result.get("headers", []):
-            if header[0] == b"Content-Type":
-                content_type_header_value = header[1].decode("utf8")
+        for header in channel.headers.getRawHeaders("Content-Type", []):
+            content_type_header_value = header
 
         self.assertTrue(content_type_header_value.startswith("text/html"))
 
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 5cc4121f97..24a5fac0fa 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -37,6 +37,7 @@ from synapse.api.constants import (
     Membership,
     ReceiptTypes,
     RelationTypes,
+    RoomTypes,
 )
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase
@@ -1869,6 +1870,150 @@ class SlidingSyncTestCase(SlidingSyncBase):
             },
         )
 
+    def test_filter_regardless_of_membership_server_left_room(self) -> None:
+        """
+        Test that filters apply to rooms regardless of membership. We're also
+        compounding the problem by having all of the local users leave the room causing
+        our server to leave the room.
+
+        We want to make sure that if someone is filtering rooms, and leaves, you still
+        get that final update down sync that you left.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create a normal room
+        room_id = self.helper.create_room_as(user1_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+
+        # Create an encrypted space room
+        space_room_id = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
+            },
+        )
+        self.helper.send_state(
+            space_room_id,
+            EventTypes.RoomEncryption,
+            {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"},
+            tok=user2_tok,
+        )
+        self.helper.join(space_room_id, user1_id, tok=user1_tok)
+
+        # Make an initial Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "all-list": {
+                        "ranges": [[0, 99]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                        "filters": {},
+                    },
+                    "foo-list": {
+                        "ranges": [[0, 99]],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                        "filters": {
+                            "is_encrypted": True,
+                            "room_types": [RoomTypes.SPACE],
+                        },
+                    },
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        from_token = channel.json_body["pos"]
+
+        # Make sure the response has the lists we requested
+        self.assertListEqual(
+            list(channel.json_body["lists"].keys()),
+            ["all-list", "foo-list"],
+            channel.json_body["lists"].keys(),
+        )
+
+        # Make sure the lists have the correct rooms
+        self.assertListEqual(
+            list(channel.json_body["lists"]["all-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [space_room_id, room_id],
+                }
+            ],
+        )
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [space_room_id],
+                }
+            ],
+        )
+
+        # Everyone leaves the encrypted space room
+        self.helper.leave(space_room_id, user1_id, tok=user1_tok)
+        self.helper.leave(space_room_id, user2_id, tok=user2_tok)
+
+        # Make an incremental Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?pos={from_token}",
+            {
+                "lists": {
+                    "all-list": {
+                        "ranges": [[0, 99]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                        "filters": {},
+                    },
+                    "foo-list": {
+                        "ranges": [[0, 99]],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                        "filters": {
+                            "is_encrypted": True,
+                            "room_types": [RoomTypes.SPACE],
+                        },
+                    },
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make sure the lists have the correct rooms even though we `newly_left`
+        self.assertListEqual(
+            list(channel.json_body["lists"]["all-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [space_room_id, room_id],
+                }
+            ],
+        )
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [space_room_id],
+                }
+            ],
+        )
+
     def test_sort_list(self) -> None:
         """
         Test that the `lists` are sorted by `stream_ordering`
@@ -4476,6 +4621,225 @@ class SlidingSyncTestCase(SlidingSyncBase):
         # `world_readable` but currently we don't support this.
         self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
 
+    # Any extensions that use `lists`/`rooms` should be tested here
+    @parameterized.expand([("account_data",), ("receipts",)])
+    def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
+        """
+        With various extensions, test out requesting different variations of
+        `lists`/`rooms`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create some rooms
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        room_id_to_human_name_map = {
+            room_id1: "room1",
+            room_id2: "room2",
+            room_id3: "room3",
+            room_id4: "room4",
+            room_id5: "room5",
+        }
+
+        for room_id in room_id_to_human_name_map.keys():
+            if extension_name == "account_data":
+                # Add some account data to each room
+                self.get_success(
+                    self.account_data_handler.add_account_data_to_room(
+                        user_id=user1_id,
+                        room_id=room_id,
+                        account_data_type="org.matrix.roorarraz",
+                        content={"roo": "rar"},
+                    )
+                )
+            elif extension_name == "receipts":
+                event_response = self.helper.send(
+                    room_id, body="new event", tok=user1_tok
+                )
+                # Read last event
+                channel = self.make_request(
+                    "POST",
+                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
+                    {},
+                    access_token=user1_tok,
+                )
+                self.assertEqual(channel.code, 200, channel.json_body)
+            else:
+                raise AssertionError(f"Unknown extension name: {extension_name}")
+
+        main_sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+        }
+
+        # Mix lists and rooms
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["foo-list", "non-existent-list"],
+                    "rooms": [room_id1, room_id2, "!non-existent-room"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Requested via `rooms` and a room subscription exists
+        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+        # room3: ❌ Not requested
+        # room4: ✅ Shows up because requested via `lists` and list exists in the response
+        # room5: ✅ Shows up because requested via `lists` and list exists in the response
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try wildcards (this is the default)
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    # "lists": ["*"],
+                    # "rooms": ["*"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Empty list will return nothing
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ❌ Not requested
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            set(),
+            exact=True,
+        )
+
+        # Try wildcard and none
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["*"],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try requesting a room that is only in a list
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [room_id5],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ✅ Requested via `rooms` and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room5"},
+            exact=True,
+        )
+
     def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
         """Test that we only get state updates in incremental sync for rooms
         we've already seen (LIVE).
@@ -4943,225 +5307,6 @@ class SlidingSyncTestCase(SlidingSyncBase):
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
         self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
 
-    # Any extensions that use `lists`/`rooms` should be tested here
-    @parameterized.expand([("account_data",), ("receipts",)])
-    def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
-        """
-        With various extensions, test out requesting different variations of
-        `lists`/`rooms`.
-        """
-        user1_id = self.register_user("user1", "pass")
-        user1_tok = self.login(user1_id, "pass")
-
-        # Create some rooms
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
-
-        room_id_to_human_name_map = {
-            room_id1: "room1",
-            room_id2: "room2",
-            room_id3: "room3",
-            room_id4: "room4",
-            room_id5: "room5",
-        }
-
-        for room_id in room_id_to_human_name_map.keys():
-            if extension_name == "account_data":
-                # Add some account data to each room
-                self.get_success(
-                    self.account_data_handler.add_account_data_to_room(
-                        user_id=user1_id,
-                        room_id=room_id,
-                        account_data_type="org.matrix.roorarraz",
-                        content={"roo": "rar"},
-                    )
-                )
-            elif extension_name == "receipts":
-                event_response = self.helper.send(
-                    room_id, body="new event", tok=user1_tok
-                )
-                # Read last event
-                channel = self.make_request(
-                    "POST",
-                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
-                    {},
-                    access_token=user1_tok,
-                )
-                self.assertEqual(channel.code, 200, channel.json_body)
-            else:
-                raise AssertionError(f"Unknown extension name: {extension_name}")
-
-        main_sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
-                }
-            },
-        }
-
-        # Mix lists and rooms
-        sync_body = {
-            **main_sync_body,
-            "extensions": {
-                extension_name: {
-                    "enabled": True,
-                    "lists": ["foo-list", "non-existent-list"],
-                    "rooms": [room_id1, room_id2, "!non-existent-room"],
-                }
-            },
-        }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
-
-        # room1: ✅ Requested via `rooms` and a room subscription exists
-        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
-        # room3: ❌ Not requested
-        # room4: ✅ Shows up because requested via `lists` and list exists in the response
-        # room5: ✅ Shows up because requested via `lists` and list exists in the response
-        self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"][extension_name]
-                .get("rooms")
-                .keys()
-            },
-            {"room1", "room4", "room5"},
-            exact=True,
-        )
-
-        # Try wildcards (this is the default)
-        sync_body = {
-            **main_sync_body,
-            "extensions": {
-                extension_name: {
-                    "enabled": True,
-                    # "lists": ["*"],
-                    # "rooms": ["*"],
-                }
-            },
-        }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
-
-        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
-        self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"][extension_name]
-                .get("rooms")
-                .keys()
-            },
-            {"room1", "room3", "room4", "room5"},
-            exact=True,
-        )
-
-        # Empty list will return nothing
-        sync_body = {
-            **main_sync_body,
-            "extensions": {
-                extension_name: {
-                    "enabled": True,
-                    "lists": [],
-                    "rooms": [],
-                }
-            },
-        }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
-
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ❌ Not requested
-        # room4: ❌ Not requested
-        # room5: ❌ Not requested
-        self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"][extension_name]
-                .get("rooms")
-                .keys()
-            },
-            set(),
-            exact=True,
-        )
-
-        # Try wildcard and none
-        sync_body = {
-            **main_sync_body,
-            "extensions": {
-                extension_name: {
-                    "enabled": True,
-                    "lists": ["*"],
-                    "rooms": [],
-                }
-            },
-        }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
-
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
-        self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"][extension_name]
-                .get("rooms")
-                .keys()
-            },
-            {"room3", "room4", "room5"},
-            exact=True,
-        )
-
-        # Try requesting a room that is only in a list
-        sync_body = {
-            **main_sync_body,
-            "extensions": {
-                extension_name: {
-                    "enabled": True,
-                    "lists": [],
-                    "rooms": [room_id5],
-                }
-            },
-        }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
-
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ❌ Not requested
-        # room4: ❌ Not requested
-        # room5: ✅ Requested via `rooms` and is in a list
-        self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"][extension_name]
-                .get("rooms")
-                .keys()
-            },
-            {"room5"},
-            exact=True,
-        )
-
     def test_increasing_timeline_range_sends_more_messages(self) -> None:
         """
         Test that increasing the timeline limit via room subscriptions sends the
@@ -6430,10 +6575,12 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
             exact=True,
         )
 
-    def test_receipts_initial_sync(self) -> None:
+    def test_receipts_initial_sync_with_timeline(self) -> None:
         """
-        On initial sync, we return all receipts for a given room but only for
-        rooms that we request and are being returned in the Sliding Sync response.
+        On initial sync, we only return receipts for events in a given room's timeline.
+
+        We also make sure that we only return receipts for rooms that we request and are
+        already being returned in the Sliding Sync response.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -6441,16 +6588,24 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         user2_tok = self.login(user2_id, "pass")
         user3_id = self.register_user("user3", "pass")
         user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
 
         # Create a room
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
         self.helper.join(room_id1, user3_id, tok=user3_tok)
-        event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok)
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event1", tok=user2_tok
+        )
+        room1_event_response2 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
         # User1 reads the last event
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
             {},
             access_token=user1_tok,
         )
@@ -6458,29 +6613,40 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # User2 reads the last event
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
             {},
             access_token=user2_tok,
         )
         self.assertEqual(channel.code, 200, channel.json_body)
-        # User3 privately reads the last event (make sure this doesn't leak to the other users)
+        # User3 reads the first event
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response1['event_id']}",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
             {},
             access_token=user3_tok,
         )
         self.assertEqual(channel.code, 200, channel.json_body)
+        # User4 privately reads the last event (make sure this doesn't leak to the other users)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user4_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
 
         # Create another room
         room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id2, user1_id, tok=user1_tok)
         self.helper.join(room_id2, user3_id, tok=user3_tok)
-        event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok)
+        self.helper.join(room_id2, user4_id, tok=user4_tok)
+        room2_event_response1 = self.helper.send(
+            room_id2, body="new event2", tok=user2_tok
+        )
         # User1 reads the last event
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
             {},
             access_token=user1_tok,
         )
@@ -6488,17 +6654,17 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # User2 reads the last event
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
             {},
             access_token=user2_tok,
         )
         self.assertEqual(channel.code, 200, channel.json_body)
-        # User3 privately reads the last event (make sure this doesn't leak to the other users)
+        # User4 privately reads the last event (make sure this doesn't leak to the other users)
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}",
             {},
-            access_token=user3_tok,
+            access_token=user4_tok,
         )
         self.assertEqual(channel.code, 200, channel.json_body)
 
@@ -6508,7 +6674,8 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
-                    "timeline_limit": 0,
+                    # On initial sync, we only have receipts for events in the timeline
+                    "timeline_limit": 1,
                 }
             },
             "extensions": {
@@ -6520,6 +6687,17 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
+        # Only the latest event in the room is in the timelie because the `timeline_limit` is 1
+        self.assertIncludes(
+            {
+                event["event_id"]
+                for event in response_body["rooms"][room_id1].get("timeline", [])
+            },
+            {room1_event_response2["event_id"]},
+            exact=True,
+            message=str(response_body["rooms"][room_id1]),
+        )
+
         # Even though we requested room2, we only expect room1 to show up because that's
         # the only room in the Sliding Sync response (room2 is not one of our room
         # subscriptions or in a sliding window list).
@@ -6536,7 +6714,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # We can see user1 and user2 read receipts
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
-                event_response1["event_id"]
+                room1_event_response2["event_id"]
             ][ReceiptTypes.READ].keys(),
             {user1_id, user2_id},
             exact=True,
@@ -6545,7 +6723,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # private read receipts
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
-                event_response1["event_id"]
+                room1_event_response2["event_id"]
             ]
             .get(ReceiptTypes.READ_PRIVATE, {})
             .keys(),
@@ -6553,10 +6731,18 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
             exact=True,
         )
 
+        # We shouldn't see receipts for event2 since it wasn't in the timeline and this is an initial sync
+        self.assertIsNone(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"].get(
+                room1_event_response1["event_id"]
+            )
+        )
+
     def test_receipts_incremental_sync(self) -> None:
         """
-        On incremental sync, we return all receipts for a given room but only for
-        rooms that we request and are being returned in the Sliding Sync response.
+        On incremental sync, we return all receipts in the token range for a given room
+        but only for rooms that we request and are being returned in the Sliding Sync
+        response.
         """
 
         user1_id = self.register_user("user1", "pass")
@@ -6570,11 +6756,13 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
         self.helper.join(room_id1, user3_id, tok=user3_tok)
-        event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok)
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
         # User2 reads the last event (before the `from_token`)
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
             {},
             access_token=user2_tok,
         )
@@ -6583,11 +6771,13 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # Create room2
         room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id2, user1_id, tok=user1_tok)
-        event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok)
+        room2_event_response1 = self.helper.send(
+            room_id2, body="new event2", tok=user2_tok
+        )
         # User1 reads the last event (before the `from_token`)
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
             {},
             access_token=user1_tok,
         )
@@ -6597,7 +6787,9 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id3, user1_id, tok=user1_tok)
         self.helper.join(room_id3, user3_id, tok=user3_tok)
-        event_response3 = self.helper.send(room_id3, body="new event", tok=user2_tok)
+        room3_event_response1 = self.helper.send(
+            room_id3, body="new event", tok=user2_tok
+        )
 
         # Create room4
         room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok)
@@ -6643,7 +6835,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # User1 reads room1
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
             {},
             access_token=user1_tok,
         )
@@ -6651,7 +6843,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # User1 privately reads room2
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}",
             {},
             access_token=user1_tok,
         )
@@ -6659,7 +6851,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # User3 reads room3
         channel = self.make_request(
             "POST",
-            f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{event_response3['event_id']}",
+            f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{room3_event_response1['event_id']}",
             {},
             access_token=user3_tok,
         )
@@ -6688,7 +6880,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # We only see that user1 has read something in room1 since the `from_token`
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
-                event_response1["event_id"]
+                room1_event_response1["event_id"]
             ][ReceiptTypes.READ].keys(),
             {user1_id},
             exact=True,
@@ -6697,13 +6889,15 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # others' private read receipts
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
-                event_response1["event_id"]
+                room1_event_response1["event_id"]
             ]
             .get(ReceiptTypes.READ_PRIVATE, {})
             .keys(),
             set(),
             exact=True,
         )
+        # No events in the timeline since they were sent before the `from_token`
+        self.assertNotIn(room_id1, response_body["rooms"])
 
         # Check room3:
         #
@@ -6715,7 +6909,7 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # We only see that user3 has read something in room1 since the `from_token`
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
-                event_response3["event_id"]
+                room3_event_response1["event_id"]
             ][ReceiptTypes.READ].keys(),
             {user3_id},
             exact=True,
@@ -6724,13 +6918,113 @@ class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
         # others' private read receipts
         self.assertIncludes(
             response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
-                event_response3["event_id"]
+                room3_event_response1["event_id"]
             ]
             .get(ReceiptTypes.READ_PRIVATE, {})
             .keys(),
             set(),
             exact=True,
         )
+        # No events in the timeline since they were sent before the `from_token`
+        self.assertNotIn(room_id3, response_body["rooms"])
+
+    def test_receipts_incremental_sync_all_live_receipts(self) -> None:
+        """
+        On incremental sync, we return all receipts in the token range for a given room
+        even if they are not in the timeline.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create room1
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {},
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    # The timeline will only include event2
+                    "timeline_limit": 1,
+                },
+            },
+            "extensions": {
+                "receipts": {
+                    "enabled": True,
+                    "rooms": [room_id1],
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event1", tok=user2_tok
+        )
+        room1_event_response2 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
+
+        # User1 reads event1
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads event2
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # We should see room1 because it has receipts in the token range
+        self.assertIncludes(
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We should see all receipts in the token range regardless of whether the events
+        # are in the timeline
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response2["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user2_id},
+            exact=True,
+        )
+        # Only the latest event in the timeline because the `timeline_limit` is 1
+        self.assertIncludes(
+            {
+                event["event_id"]
+                for event in response_body["rooms"][room_id1].get("timeline", [])
+            },
+            {room1_event_response2["event_id"]},
+            exact=True,
+            message=str(response_body["rooms"][room_id1]),
+        )
 
     def test_wait_for_new_data(self) -> None:
         """
diff --git a/tests/server.py b/tests/server.py
index 85602e6953..3e377585ce 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -198,17 +198,35 @@ class FakeChannel:
     def headers(self) -> Headers:
         if not self.result:
             raise Exception("No result yet.")
-        h = Headers()
-        for i in self.result["headers"]:
-            h.addRawHeader(*i)
+
+        h = self.result["headers"]
+        assert isinstance(h, Headers)
         return h
 
     def writeHeaders(
-        self, version: bytes, code: bytes, reason: bytes, headers: Headers
+        self,
+        version: bytes,
+        code: bytes,
+        reason: bytes,
+        headers: Union[Headers, List[Tuple[bytes, bytes]]],
     ) -> None:
         self.result["version"] = version
         self.result["code"] = code
         self.result["reason"] = reason
+
+        if isinstance(headers, list):
+            # Support prior to Twisted 24.7.0rc1
+            new_headers = Headers()
+            for k, v in headers:
+                assert isinstance(k, bytes), f"key is not of type bytes: {k!r}"
+                assert isinstance(v, bytes), f"value is not of type bytes: {v!r}"
+                new_headers.addRawHeader(k, v)
+            headers = new_headers
+
+        assert isinstance(
+            headers, Headers
+        ), f"headers are of the wrong type: {headers!r}"
+
         self.result["headers"] = headers
 
     def write(self, data: bytes) -> None:
diff --git a/tests/test_server.py b/tests/test_server.py
index 0910ea5f28..9ff2589497 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -392,8 +392,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
         )
 
         self.assertEqual(channel.code, 301)
-        headers = channel.result["headers"]
-        location_headers = [v for k, v in headers if k == b"Location"]
+        location_headers = channel.headers.getRawHeaders(b"Location", [])
         self.assertEqual(location_headers, [b"/look/an/eagle"])
 
     def test_redirect_exception_with_cookie(self) -> None:
@@ -415,10 +414,10 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
         )
 
         self.assertEqual(channel.code, 304)
-        headers = channel.result["headers"]
-        location_headers = [v for k, v in headers if k == b"Location"]
+        headers = channel.headers
+        location_headers = headers.getRawHeaders(b"Location", [])
         self.assertEqual(location_headers, [b"/no/over/there"])
-        cookies_headers = [v for k, v in headers if k == b"Set-Cookie"]
+        cookies_headers = headers.getRawHeaders(b"Set-Cookie", [])
         self.assertEqual(cookies_headers, [b"session=yespls"])
 
     def test_head_request(self) -> None: