-rw-r--r--  CHANGES.md                                         |    1
-rw-r--r--  Cargo.lock                                         |    5
-rw-r--r--  changelog.d/17447.feature                          |    1
-rw-r--r--  changelog.d/17452.misc                             |    1
-rw-r--r--  changelog.d/17476.doc                              |    1
-rw-r--r--  changelog.d/17477.feature                          |    1
-rw-r--r--  changelog.d/17478.misc                             |    1
-rw-r--r--  changelog.d/17479.misc                             |    1
-rw-r--r--  changelog.d/17481.misc                             |    1
-rw-r--r--  changelog.d/17482.misc                             |    1
-rw-r--r--  changelog.d/17499.bugfix                           |    1
-rw-r--r--  changelog.d/17501.misc                             |    1
-rw-r--r--  debian/templates                                   |    2
-rw-r--r--  docs/usage/configuration/config_documentation.md   |    7
-rw-r--r--  poetry.lock                                        |  108
-rw-r--r--  pyproject.toml                                     |    6
-rw-r--r--  synapse/handlers/e2e_keys.py                       |   26
-rw-r--r--  synapse/handlers/sliding_sync.py                   |  587
-rw-r--r--  synapse/rest/client/sync.py                        |   51
-rw-r--r--  synapse/server.py                                  |    1
-rw-r--r--  synapse/storage/databases/main/state_deltas.py     |   37
-rw-r--r--  synapse/storage/databases/main/stream.py           |   10
-rw-r--r--  synapse/types/__init__.py                          |   43
-rw-r--r--  synapse/types/handlers/__init__.py                 |   55
-rw-r--r--  synapse/types/rest/client/__init__.py              |   23
-rw-r--r--  tests/handlers/test_e2e_keys.py                    |   59
-rw-r--r--  tests/rest/client/test_sync.py                     | 3713
27 files changed, 3252 insertions(+), 1492 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index a17f0b020e..b4fddc3e5c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -101,6 +101,7 @@ Despite that, we cannot rule out that some installations may exist with this unu
 
 - Upgrade locked dependency on Twisted to 24.7.0rc1. ([\#17502](https://github.com/element-hq/synapse/issues/17502))
 
+
 # Synapse 1.111.0 (2024-07-16)
 
 No significant changes since 1.111.0rc2.
diff --git a/Cargo.lock b/Cargo.lock
index e9adfcbdc3..333499e197 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -505,11 +505,12 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.120"
+version = "1.0.121"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5"
+checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609"
 dependencies = [
  "itoa",
+ "memchr",
  "ryu",
  "serde",
 ]
diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature
new file mode 100644
index 0000000000..6f80e298ae
--- /dev/null
+++ b/changelog.d/17447.feature
@@ -0,0 +1 @@
+Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17452.misc b/changelog.d/17452.misc
new file mode 100644
index 0000000000..4fd07f617b
--- /dev/null
+++ b/changelog.d/17452.misc
@@ -0,0 +1 @@
+Change sliding sync to use its own token format in preparation for storing per-connection state.
diff --git a/changelog.d/17476.doc b/changelog.d/17476.doc
new file mode 100644
index 0000000000..89d8d490bb
--- /dev/null
+++ b/changelog.d/17476.doc
@@ -0,0 +1 @@
+Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example.
diff --git a/changelog.d/17477.feature b/changelog.d/17477.feature
new file mode 100644
index 0000000000..9785a2ef7b
--- /dev/null
+++ b/changelog.d/17477.feature
@@ -0,0 +1 @@
+Add Account Data extension support to the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17478.misc b/changelog.d/17478.misc
new file mode 100644
index 0000000000..5406c82742
--- /dev/null
+++ b/changelog.d/17478.misc
@@ -0,0 +1 @@
+Ensure we don't send down a negative `bump_stamp` in the experimental sliding sync endpoint.
diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc
new file mode 100644
index 0000000000..4502f71662
--- /dev/null
+++ b/changelog.d/17479.misc
@@ -0,0 +1 @@
+Do not send empty room entries down the experimental sliding sync endpoint.
diff --git a/changelog.d/17481.misc b/changelog.d/17481.misc
new file mode 100644
index 0000000000..ac55538424
--- /dev/null
+++ b/changelog.d/17481.misc
@@ -0,0 +1 @@
+Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/changelog.d/17482.misc b/changelog.d/17482.misc
new file mode 100644
index 0000000000..ac55538424
--- /dev/null
+++ b/changelog.d/17482.misc
@@ -0,0 +1 @@
+Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/changelog.d/17499.bugfix b/changelog.d/17499.bugfix
new file mode 100644
index 0000000000..5cb7b3c30e
--- /dev/null
+++ b/changelog.d/17499.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients.
diff --git a/changelog.d/17501.misc b/changelog.d/17501.misc
new file mode 100644
index 0000000000..ba96472acb
--- /dev/null
+++ b/changelog.d/17501.misc
@@ -0,0 +1 @@
+Add some opentracing tags and logging to the experimental sliding sync implementation.
diff --git a/debian/templates b/debian/templates
index cab05715d0..7bfd3c2e9f 100644
--- a/debian/templates
+++ b/debian/templates
@@ -5,7 +5,7 @@ _Description: Name of the server:
  servers via federation. This is normally the public hostname of the
  server running synapse, but can be different if you set up delegation.
  Please refer to the delegation documentation in this case:
- https://github.com/element-hq/synapse/blob/master/docs/delegate.md.
+ https://element-hq.github.io/synapse/latest/delegate.html.
 
 Template: matrix-synapse/report-stats
 Type: boolean
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 649f4f71c7..40f64be856 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -2386,7 +2386,7 @@ enable_registration_without_verification: true
 ---
 ### `registrations_require_3pid`
 
-If this is set, users must provide all of the specified types of 3PID when registering an account.
+If this is set, users must provide all of the specified types of [3PID](https://spec.matrix.org/latest/appendices/#3pid-types) when registering an account.
 
 Note that [`enable_registration`](#enable_registration) must also be set to allow account registration.
 
@@ -2411,6 +2411,9 @@ disable_msisdn_registration: true
 
 Mandate that users are only allowed to associate certain formats of
 3PIDs with accounts on this server, as specified by the `medium` and `pattern` sub-options.
+`pattern` is a [Perl-like regular expression](https://docs.python.org/3/library/re.html#module-re).
+
+More information about 3PIDs, allowed `medium` types and their `address` syntax can be found [in the Matrix spec](https://spec.matrix.org/latest/appendices/#3pid-types).
 
 Example configuration:
 ```yaml
@@ -2420,7 +2423,7 @@ allowed_local_3pids:
   - medium: email
     pattern: '^[^@]+@vector\.im$'
   - medium: msisdn
-    pattern: '\+44'
+    pattern: '^44\d{10}$'
 ```
 ---
 ### `enable_3pid_lookup`
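Why the old msisdn example never matched: per the Matrix spec appendices, an msisdn address is stored in canonical form, i.e. country code plus number with no leading `+`, so a pattern anchored on `\+44` cannot match a valid address. A minimal sketch of the check, assuming plain `re.match` semantics as the linked Python documentation suggests:

    import re

    OLD_PATTERN = r"\+44"        # the previous (broken) example
    NEW_PATTERN = r"^44\d{10}$"  # the corrected example

    # Canonical msisdn per the Matrix spec appendices: country code plus
    # number, no leading '+', e.g. a UK mobile number:
    address = "447700900123"

    print(bool(re.match(OLD_PATTERN, address)))  # False -- the '+' never appears
    print(bool(re.match(NEW_PATTERN, address)))  # True
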
diff --git a/poetry.lock b/poetry.lock
index 7359930983..7d8334515a 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -67,38 +67,38 @@ visualize = ["Twisted (>=16.1.1)", "graphviz (>0.5.1)"]
 
 [[package]]
 name = "bcrypt"
-version = "4.1.3"
+version = "4.2.0"
 description = "Modern password hashing for your software and your servers"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "bcrypt-4.1.3-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:48429c83292b57bf4af6ab75809f8f4daf52aa5d480632e53707805cc1ce9b74"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4a8bea4c152b91fd8319fef4c6a790da5c07840421c2b785084989bf8bbb7455"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d3b317050a9a711a5c7214bf04e28333cf528e0ed0ec9a4e55ba628d0f07c1a"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:094fd31e08c2b102a14880ee5b3d09913ecf334cd604af27e1013c76831f7b05"},
-    {file = "bcrypt-4.1.3-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:4fb253d65da30d9269e0a6f4b0de32bd657a0208a6f4e43d3e645774fb5457f3"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:193bb49eeeb9c1e2db9ba65d09dc6384edd5608d9d672b4125e9320af9153a15"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:8cbb119267068c2581ae38790e0d1fbae65d0725247a930fc9900c285d95725d"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:6cac78a8d42f9d120b3987f82252bdbeb7e6e900a5e1ba37f6be6fe4e3848286"},
-    {file = "bcrypt-4.1.3-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:01746eb2c4299dd0ae1670234bf77704f581dd72cc180f444bfe74eb80495b64"},
-    {file = "bcrypt-4.1.3-cp37-abi3-win32.whl", hash = "sha256:037c5bf7c196a63dcce75545c8874610c600809d5d82c305dd327cd4969995bf"},
-    {file = "bcrypt-4.1.3-cp37-abi3-win_amd64.whl", hash = "sha256:8a893d192dfb7c8e883c4576813bf18bb9d59e2cfd88b68b725990f033f1b978"},
-    {file = "bcrypt-4.1.3-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:0d4cf6ef1525f79255ef048b3489602868c47aea61f375377f0d00514fe4a78c"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5698ce5292a4e4b9e5861f7e53b1d89242ad39d54c3da451a93cac17b61921a"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ec3c2e1ca3e5c4b9edb94290b356d082b721f3f50758bce7cce11d8a7c89ce84"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:3a5be252fef513363fe281bafc596c31b552cf81d04c5085bc5dac29670faa08"},
-    {file = "bcrypt-4.1.3-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5f7cd3399fbc4ec290378b541b0cf3d4398e4737a65d0f938c7c0f9d5e686611"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:c4c8d9b3e97209dd7111bf726e79f638ad9224b4691d1c7cfefa571a09b1b2d6"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:31adb9cbb8737a581a843e13df22ffb7c84638342de3708a98d5c986770f2834"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:551b320396e1d05e49cc18dd77d970accd52b322441628aca04801bbd1d52a73"},
-    {file = "bcrypt-4.1.3-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:6717543d2c110a155e6821ce5670c1f512f602eabb77dba95717ca76af79867d"},
-    {file = "bcrypt-4.1.3-cp39-abi3-win32.whl", hash = "sha256:6004f5229b50f8493c49232b8e75726b568535fd300e5039e255d919fc3a07f2"},
-    {file = "bcrypt-4.1.3-cp39-abi3-win_amd64.whl", hash = "sha256:2505b54afb074627111b5a8dc9b6ae69d0f01fea65c2fcaea403448c503d3991"},
-    {file = "bcrypt-4.1.3-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:cb9c707c10bddaf9e5ba7cdb769f3e889e60b7d4fea22834b261f51ca2b89fed"},
-    {file = "bcrypt-4.1.3-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:9f8ea645eb94fb6e7bea0cf4ba121c07a3a182ac52876493870033141aa687bc"},
-    {file = "bcrypt-4.1.3-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:f44a97780677e7ac0ca393bd7982b19dbbd8d7228c1afe10b128fd9550eef5f1"},
-    {file = "bcrypt-4.1.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:d84702adb8f2798d813b17d8187d27076cca3cd52fe3686bb07a9083930ce650"},
-    {file = "bcrypt-4.1.3.tar.gz", hash = "sha256:2ee15dd749f5952fe3f0430d0ff6b74082e159c50332a1413d51b5689cf06623"},
+    {file = "bcrypt-4.2.0-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:096a15d26ed6ce37a14c1ac1e48119660f21b24cba457f160a4b830f3fe6b5cb"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c02d944ca89d9b1922ceb8a46460dd17df1ba37ab66feac4870f6862a1533c00"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:1d84cf6d877918620b687b8fd1bf7781d11e8a0998f576c7aa939776b512b98d"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:1bb429fedbe0249465cdd85a58e8376f31bb315e484f16e68ca4c786dcc04291"},
+    {file = "bcrypt-4.2.0-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:655ea221910bcac76ea08aaa76df427ef8625f92e55a8ee44fbf7753dbabb328"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:1ee38e858bf5d0287c39b7a1fc59eec64bbf880c7d504d3a06a96c16e14058e7"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:0da52759f7f30e83f1e30a888d9163a81353ef224d82dc58eb5bb52efcabc399"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:3698393a1b1f1fd5714524193849d0c6d524d33523acca37cd28f02899285060"},
+    {file = "bcrypt-4.2.0-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:762a2c5fb35f89606a9fde5e51392dad0cd1ab7ae64149a8b935fe8d79dd5ed7"},
+    {file = "bcrypt-4.2.0-cp37-abi3-win32.whl", hash = "sha256:5a1e8aa9b28ae28020a3ac4b053117fb51c57a010b9f969603ed885f23841458"},
+    {file = "bcrypt-4.2.0-cp37-abi3-win_amd64.whl", hash = "sha256:8f6ede91359e5df88d1f5c1ef47428a4420136f3ce97763e31b86dd8280fbdf5"},
+    {file = "bcrypt-4.2.0-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:c52aac18ea1f4a4f65963ea4f9530c306b56ccd0c6f8c8da0c06976e34a6e841"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3bbbfb2734f0e4f37c5136130405332640a1e46e6b23e000eeff2ba8d005da68"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3413bd60460f76097ee2e0a493ccebe4a7601918219c02f503984f0a7ee0aebe"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:8d7bb9c42801035e61c109c345a28ed7e84426ae4865511eb82e913df18f58c2"},
+    {file = "bcrypt-4.2.0-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:3d3a6d28cb2305b43feac298774b997e372e56c7c7afd90a12b3dc49b189151c"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:9c1c4ad86351339c5f320ca372dfba6cb6beb25e8efc659bedd918d921956bae"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:27fe0f57bb5573104b5a6de5e4153c60814c711b29364c10a75a54bb6d7ff48d"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:8ac68872c82f1add6a20bd489870c71b00ebacd2e9134a8aa3f98a0052ab4b0e"},
+    {file = "bcrypt-4.2.0-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:cb2a8ec2bc07d3553ccebf0746bbf3d19426d1c6d1adbd4fa48925f66af7b9e8"},
+    {file = "bcrypt-4.2.0-cp39-abi3-win32.whl", hash = "sha256:77800b7147c9dc905db1cba26abe31e504d8247ac73580b4aa179f98e6608f34"},
+    {file = "bcrypt-4.2.0-cp39-abi3-win_amd64.whl", hash = "sha256:61ed14326ee023917ecd093ee6ef422a72f3aec6f07e21ea5f10622b735538a9"},
+    {file = "bcrypt-4.2.0-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:39e1d30c7233cfc54f5c3f2c825156fe044efdd3e0b9d309512cc514a263ec2a"},
+    {file = "bcrypt-4.2.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:f4f4acf526fcd1c34e7ce851147deedd4e26e6402369304220250598b26448db"},
+    {file = "bcrypt-4.2.0-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:1ff39b78a52cf03fdf902635e4c81e544714861ba3f0efc56558979dd4f09170"},
+    {file = "bcrypt-4.2.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:373db9abe198e8e2c70d12b479464e0d5092cc122b20ec504097b5f2297ed184"},
+    {file = "bcrypt-4.2.0.tar.gz", hash = "sha256:cf69eaf5185fd58f268f805b505ce31f9b9fc2d64b376642164e9244540c1221"},
 ]
 
 [package.extras]
@@ -2361,29 +2361,29 @@ files = [
 
 [[package]]
 name = "ruff"
-version = "0.5.4"
+version = "0.5.5"
 description = "An extremely fast Python linter and code formatter, written in Rust."
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "ruff-0.5.4-py3-none-linux_armv6l.whl", hash = "sha256:82acef724fc639699b4d3177ed5cc14c2a5aacd92edd578a9e846d5b5ec18ddf"},
-    {file = "ruff-0.5.4-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:da62e87637c8838b325e65beee485f71eb36202ce8e3cdbc24b9fcb8b99a37be"},
-    {file = "ruff-0.5.4-py3-none-macosx_11_0_arm64.whl", hash = "sha256:e98ad088edfe2f3b85a925ee96da652028f093d6b9b56b76fc242d8abb8e2059"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4c55efbecc3152d614cfe6c2247a3054cfe358cefbf794f8c79c8575456efe19"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f9b85eaa1f653abd0a70603b8b7008d9e00c9fa1bbd0bf40dad3f0c0bdd06793"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0cf497a47751be8c883059c4613ba2f50dd06ec672692de2811f039432875278"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:09c14ed6a72af9ccc8d2e313d7acf7037f0faff43cde4b507e66f14e812e37f7"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:628f6b8f97b8bad2490240aa84f3e68f390e13fabc9af5c0d3b96b485921cd60"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3520a00c0563d7a7a7c324ad7e2cde2355733dafa9592c671fb2e9e3cd8194c1"},
-    {file = "ruff-0.5.4-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:93789f14ca2244fb91ed481456f6d0bb8af1f75a330e133b67d08f06ad85b516"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:029454e2824eafa25b9df46882f7f7844d36fd8ce51c1b7f6d97e2615a57bbcc"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:9492320eed573a13a0bc09a2957f17aa733fff9ce5bf00e66e6d4a88ec33813f"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_i686.whl", hash = "sha256:a6e1f62a92c645e2919b65c02e79d1f61e78a58eddaebca6c23659e7c7cb4ac7"},
-    {file = "ruff-0.5.4-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:768fa9208df2bec4b2ce61dbc7c2ddd6b1be9fb48f1f8d3b78b3332c7d71c1ff"},
-    {file = "ruff-0.5.4-py3-none-win32.whl", hash = "sha256:e1e7393e9c56128e870b233c82ceb42164966f25b30f68acbb24ed69ce9c3a4e"},
-    {file = "ruff-0.5.4-py3-none-win_amd64.whl", hash = "sha256:58b54459221fd3f661a7329f177f091eb35cf7a603f01d9eb3eb11cc348d38c4"},
-    {file = "ruff-0.5.4-py3-none-win_arm64.whl", hash = "sha256:bd53da65f1085fb5b307c38fd3c0829e76acf7b2a912d8d79cadcdb4875c1eb7"},
-    {file = "ruff-0.5.4.tar.gz", hash = "sha256:2795726d5f71c4f4e70653273d1c23a8182f07dd8e48c12de5d867bfb7557eed"},
+    {file = "ruff-0.5.5-py3-none-linux_armv6l.whl", hash = "sha256:605d589ec35d1da9213a9d4d7e7a9c761d90bba78fc8790d1c5e65026c1b9eaf"},
+    {file = "ruff-0.5.5-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:00817603822a3e42b80f7c3298c8269e09f889ee94640cd1fc7f9329788d7bf8"},
+    {file = "ruff-0.5.5-py3-none-macosx_11_0_arm64.whl", hash = "sha256:187a60f555e9f865a2ff2c6984b9afeffa7158ba6e1eab56cb830404c942b0f3"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fe26fc46fa8c6e0ae3f47ddccfbb136253c831c3289bba044befe68f467bfb16"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:4ad25dd9c5faac95c8e9efb13e15803cd8bbf7f4600645a60ffe17c73f60779b"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f70737c157d7edf749bcb952d13854e8f745cec695a01bdc6e29c29c288fc36e"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:cfd7de17cef6ab559e9f5ab859f0d3296393bc78f69030967ca4d87a541b97a0"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a09b43e02f76ac0145f86a08e045e2ea452066f7ba064fd6b0cdccb486f7c3e7"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d0b856cb19c60cd40198be5d8d4b556228e3dcd545b4f423d1ad812bfdca5884"},
+    {file = "ruff-0.5.5-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3687d002f911e8a5faf977e619a034d159a8373514a587249cc00f211c67a091"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ac9dc814e510436e30d0ba535f435a7f3dc97f895f844f5b3f347ec8c228a523"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:af9bdf6c389b5add40d89b201425b531e0a5cceb3cfdcc69f04d3d531c6be74f"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_i686.whl", hash = "sha256:d40a8533ed545390ef8315b8e25c4bb85739b90bd0f3fe1280a29ae364cc55d8"},
+    {file = "ruff-0.5.5-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:cab904683bf9e2ecbbe9ff235bfe056f0eba754d0168ad5407832928d579e7ab"},
+    {file = "ruff-0.5.5-py3-none-win32.whl", hash = "sha256:696f18463b47a94575db635ebb4c178188645636f05e934fdf361b74edf1bb2d"},
+    {file = "ruff-0.5.5-py3-none-win_amd64.whl", hash = "sha256:50f36d77f52d4c9c2f1361ccbfbd09099a1b2ea5d2b2222c586ab08885cf3445"},
+    {file = "ruff-0.5.5-py3-none-win_arm64.whl", hash = "sha256:3191317d967af701f1b73a31ed5788795936e423b7acce82a2b63e26eb3e89d6"},
+    {file = "ruff-0.5.5.tar.gz", hash = "sha256:cc5516bdb4858d972fbc31d246bdb390eab8df1a26e2353be2dbc0c2d7f5421a"},
 ]
 
 [[package]]
@@ -2875,13 +2875,13 @@ files = [
 
 [[package]]
 name = "types-pyopenssl"
-version = "24.1.0.20240425"
+version = "24.1.0.20240722"
 description = "Typing stubs for pyOpenSSL"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-pyOpenSSL-24.1.0.20240425.tar.gz", hash = "sha256:0a7e82626c1983dc8dc59292bf20654a51c3c3881bcbb9b337c1da6e32f0204e"},
-    {file = "types_pyOpenSSL-24.1.0.20240425-py3-none-any.whl", hash = "sha256:f51a156835555dd2a1f025621e8c4fbe7493470331afeef96884d1d29bf3a473"},
+    {file = "types-pyOpenSSL-24.1.0.20240722.tar.gz", hash = "sha256:47913b4678a01d879f503a12044468221ed8576263c1540dcb0484ca21b08c39"},
+    {file = "types_pyOpenSSL-24.1.0.20240722-py3-none-any.whl", hash = "sha256:6a7a5d2ec042537934cfb4c9d4deb0e16c4c6250b09358df1f083682fe6fda54"},
 ]
 
 [package.dependencies]
@@ -2915,13 +2915,13 @@ urllib3 = ">=2"
 
 [[package]]
 name = "types-setuptools"
-version = "70.1.0.20240627"
+version = "71.1.0.20240726"
 description = "Typing stubs for setuptools"
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "types-setuptools-70.1.0.20240627.tar.gz", hash = "sha256:385907a47b5cf302b928ce07953cd91147d5de6f3da604c31905fdf0ec309e83"},
-    {file = "types_setuptools-70.1.0.20240627-py3-none-any.whl", hash = "sha256:c7bdf05cd0a8b66868b4774c7b3c079d01ae025d8c9562bfc8bf2ff44d263c9c"},
+    {file = "types-setuptools-71.1.0.20240726.tar.gz", hash = "sha256:85ba28e9461bb1be86ebba4db0f1c2408f2b11115b1966334ea9dc464e29303e"},
+    {file = "types_setuptools-71.1.0.20240726-py3-none-any.whl", hash = "sha256:a7775376f36e0ff09bcad236bf265777590a66b11623e48c20bfc30f1444ea36"},
 ]
 
 [[package]]
@@ -3196,4 +3196,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.0"
-content-hash = "e65fbd044230964cae8810c84289bcf0bc43b27532ea5a5ef8843eab4f6514af"
+content-hash = "5f458ce53b7469844af2e0c5a9c5ef720736de5f080c4eb8d3a0e60286424f44"
diff --git a/pyproject.toml b/pyproject.toml
index 521b279390..c8373c6dbc 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -201,8 +201,8 @@ netaddr = ">=0.7.18"
 # add a lower bound to the Jinja2 dependency.
 Jinja2 = ">=3.0"
 bleach = ">=1.4.3"
-# We use `Self`, which were added in `typing-extensions` 4.0.
-typing-extensions = ">=4.0"
+# We use `assert_never`, which was added in `typing-extensions` 4.1.
+typing-extensions = ">=4.1"
 # We enforce that we have a `cryptography` version that bundles an `openssl`
 # with the latest security patches.
 cryptography = ">=3.4.7"
@@ -322,7 +322,7 @@ all = [
 # This helps prevents merge conflicts when running a batch of dependabot updates.
 isort = ">=5.10.1"
 black = ">=22.7.0"
-ruff = "0.5.4"
+ruff = "0.5.5"
 # Type checking only works with the pydantic.v1 compat module from pydantic v2
 pydantic = "^2"
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 668cec513b..f78e66ad0a 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -291,13 +291,20 @@ class E2eKeysHandler:
 
             # Only try and fetch keys for destinations that are not marked as
             # down.
-            filtered_destinations = await filter_destinations_by_retry_limiter(
-                remote_queries_not_in_cache.keys(),
-                self.clock,
-                self.store,
-                # Let's give an arbitrary grace period for those hosts that are
-                # only recently down
-                retry_due_within_ms=60 * 1000,
+            unfiltered_destinations = remote_queries_not_in_cache.keys()
+            filtered_destinations = set(
+                await filter_destinations_by_retry_limiter(
+                    unfiltered_destinations,
+                    self.clock,
+                    self.store,
+                    # Let's give an arbitrary grace period for those hosts that are
+                    # only recently down
+                    retry_due_within_ms=60 * 1000,
+                )
+            )
+            failures.update(
+                (dest, _NOT_READY_FOR_RETRY_FAILURE)
+                for dest in (unfiltered_destinations - filtered_destinations)
             )
 
             await concurrently_execute(
@@ -1641,6 +1648,9 @@ def _check_device_signature(
         raise SynapseError(400, "Invalid signature", Codes.INVALID_SIGNATURE)
 
 
+_NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"}
+
+
 def _exception_to_failure(e: Exception) -> JsonDict:
     if isinstance(e, SynapseError):
         return {"status": e.code, "errcode": e.errcode, "message": str(e)}
@@ -1649,7 +1659,7 @@ def _exception_to_failure(e: Exception) -> JsonDict:
         return {"status": e.code, "message": str(e)}
 
     if isinstance(e, NotRetryingDestination):
-        return {"status": 503, "message": "Not ready for retry"}
+        return _NOT_READY_FOR_RETRY_FAILURE
 
     # include ConnectionRefused and other errors
     #
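The e2e_keys change above stops silently dropping destinations that the retry limiter filters out: it now records the same `{"status": 503, "message": "Not ready for retry"}` failure for them that the `NotRetryingDestination` exception path produces, so `/keys/query` callers can see which homeservers were skipped. A minimal sketch of the set arithmetic involved (the helper name below is illustrative, not Synapse's):

    from typing import Dict, Iterable, Set

    _NOT_READY_FOR_RETRY_FAILURE = {"status": 503, "message": "Not ready for retry"}

    def mark_skipped_destinations(
        unfiltered: Iterable[str],
        ready: Set[str],
        failures: Dict[str, dict],
    ) -> None:
        # Destinations dropped by the retry limiter are never queried, so
        # report the same failure the exception path would have produced.
        failures.update(
            (dest, _NOT_READY_FOR_RETRY_FAILURE)
            for dest in set(unfiltered) - ready
        )

    failures: Dict[str, dict] = {}
    mark_skipped_destinations({"a.example", "b.example"}, {"a.example"}, failures)
    print(failures)  # {'b.example': {'status': 503, 'message': 'Not ready for retry'}}
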
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 554ab59bf3..73414dbf69 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,6 +18,7 @@
 #
 #
 import logging
+from enum import Enum
 from itertools import chain
 from typing import (
     TYPE_CHECKING,
@@ -34,22 +35,26 @@ from typing import (
 
 import attr
 from immutabledict import immutabledict
+from typing_extensions import assert_never
 
 from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
     JsonDict,
+    JsonMapping,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    SlidingSyncStreamToken,
     StateMap,
+    StrCollection,
     StreamKeyType,
     StreamToken,
     UserID,
@@ -356,13 +361,16 @@ class SlidingSyncHandler:
         self.event_sources = hs.get_event_sources()
         self.relations_handler = hs.get_relations_handler()
         self.device_handler = hs.get_device_handler()
+        self.push_rules_handler = hs.get_push_rules_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
+        self.connection_store = SlidingSyncConnectionStore()
+
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
         sync_config: SlidingSyncConfig,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
         timeout_ms: int = 0,
     ) -> SlidingSyncResult:
         """
@@ -393,7 +401,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -431,16 +439,17 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token,
+                from_token=from_token.stream_token,
             )
 
         return result
 
+    @trace
     async def current_sync_for_user(
         self,
         sync_config: SlidingSyncConfig,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
     ) -> SlidingSyncResult:
         """
         Generates the response body of a Sliding Sync result, represented as a
@@ -461,6 +470,11 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        await self.connection_store.mark_token_seen(
+            sync_config=sync_config,
+            from_token=from_token,
+        )
+
         # Get all of the room IDs that the user should be able to see in the sync
         # response
         has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -473,7 +487,7 @@ class SlidingSyncHandler:
                 await self.get_room_membership_for_user_at_to_token(
                     user=sync_config.user,
                     to_token=to_token,
-                    from_token=from_token,
+                    from_token=from_token.stream_token if from_token else None,
                 )
             )
 
@@ -606,11 +620,56 @@ class SlidingSyncHandler:
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
 
+        # Filter out rooms that we have previously sent down and that haven't
+        # received any updates since.
+        if from_token:
+            rooms_should_send = set()
+
+            # First we check if there are rooms that match a list/room
+            # subscription and have updates we need to send (i.e. either because
+            # we haven't sent the room down, or we have but there are missing
+            # updates).
+            for room_id in relevant_room_map:
+                status = await self.connection_store.have_sent_room(
+                    sync_config,
+                    from_token.connection_position,
+                    room_id,
+                )
+                if (
+                    # The room was never sent down before so the client needs to know
+                    # about it regardless of any updates.
+                    status.status == HaveSentRoomFlag.NEVER
+                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                    # there are updates we haven't sent down" so we already know this
+                    # room has updates.
+                    or status.status == HaveSentRoomFlag.PREVIOUSLY
+                ):
+                    rooms_should_send.add(room_id)
+                elif status.status == HaveSentRoomFlag.LIVE:
+                    # We know that we've sent all updates up until `from_token`,
+                    # so we just need to check if there have been updates since
+                    # then.
+                    pass
+                else:
+                    assert_never(status.status)
+
+            # We only need to check for new events since any state changes
+            # will also come down as new events.
+            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                relevant_room_map.keys(), from_token.stream_token.room_key
+            )
+            rooms_should_send.update(rooms_that_have_updates)
+            relevant_room_map = {
+                room_id: room_sync_config
+                for room_id, room_sync_config in relevant_room_map.items()
+                if room_id in rooms_should_send
+            }
+
         @trace
         @tag_args
         async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
-                user=sync_config.user,
+                sync_config=sync_config,
                 room_id=room_id,
                 room_sync_config=relevant_room_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -620,19 +679,37 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
 
-            rooms[room_id] = room_sync_result
+            # Filter out empty room results during incremental sync
+            if room_sync_result or not from_token:
+                rooms[room_id] = room_sync_result
 
-        with start_active_span("sliding_sync.generate_room_entries"):
-            await concurrently_execute(handle_room, relevant_room_map, 10)
+        if relevant_room_map:
+            with start_active_span("sliding_sync.generate_room_entries"):
+                await concurrently_execute(handle_room, relevant_room_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
+            lists=lists,
             from_token=from_token,
             to_token=to_token,
         )
 
+        if has_lists or has_room_subscriptions:
+            connection_position = await self.connection_store.record_rooms(
+                sync_config=sync_config,
+                from_token=from_token,
+                sent_room_ids=relevant_room_map.keys(),
+                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
+                unsent_room_ids=[],
+            )
+        elif from_token:
+            connection_position = from_token.connection_position
+        else:
+            # Initial sync without a `from_token` starts at `0`
+            connection_position = 0
+
         return SlidingSyncResult(
-            next_pos=to_token,
+            next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
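`next_pos` now pairs the ordinary stream token with an integer connection position (the full `SlidingSyncStreamToken` definition lives in the `synapse/types/__init__.py` hunk, which is not shown here). A minimal sketch of what such a combined token might look like; the string wire format below is an assumption for illustration:

    import attr

    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class SlidingSyncStreamToken:
        # A plain str stands in for Synapse's full StreamToken type.
        stream_token: str
        connection_position: int

        def to_string(self) -> str:
            # Assumed format: "<connection_position>/<stream_token>"
            return f"{self.connection_position}/{self.stream_token}"

        @classmethod
        def from_string(cls, string: str) -> "SlidingSyncStreamToken":
            position, _, stream = string.partition("/")
            return cls(stream_token=stream, connection_position=int(position))

    token = SlidingSyncStreamToken(stream_token="s2633508_17", connection_position=5)
    assert SlidingSyncStreamToken.from_string(token.to_string()) == token
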
@@ -1086,6 +1163,7 @@ class SlidingSyncHandler:
 
         # return None
 
+    @trace
     async def filter_rooms(
         self,
         user: UserID,
@@ -1209,6 +1287,7 @@ class SlidingSyncHandler:
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
 
+    @trace
     async def sort_rooms(
         self,
         sync_room_map: Dict[str, _RoomMembershipForUser],
@@ -1363,11 +1442,11 @@ class SlidingSyncHandler:
 
     async def get_room_sync_data(
         self,
-        user: UserID,
+        sync_config: SlidingSyncConfig,
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
         to_token: StreamToken,
     ) -> SlidingSyncResult.RoomResult:
         """
@@ -1385,6 +1464,41 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
             to_token: The point in the stream to sync up to.
         """
+        user = sync_config.user
+
+        # Determine whether we should limit the timeline to the token range.
+        #
+        # We should return historical messages (before token range) in the
+        # following cases because we want clients to be able to show a basic
+        # screen of information:
+        #  - Initial sync (because no `from_token` to limit us anyway)
+        #  - When users `newly_joined`
+        #  - For an incremental sync where we haven't sent it down this
+        #    connection before
+        from_bound = None
+        initial = True
+        if from_token and not room_membership_for_user_at_to_token.newly_joined:
+            room_status = await self.connection_store.have_sent_room(
+                sync_config=sync_config,
+                connection_token=from_token.connection_position,
+                room_id=room_id,
+            )
+            if room_status.status == HaveSentRoomFlag.LIVE:
+                from_bound = from_token.stream_token.room_key
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+                assert room_status.last_token is not None
+                from_bound = room_status.last_token
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.NEVER:
+                from_bound = None
+                initial = True
+            else:
+                assert_never(room_status.status)
+
+            log_kv({"sliding_sync.room_status": room_status})
+
+        log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
 
         # Assemble the list of timeline events
         #
@@ -1411,36 +1525,23 @@ class SlidingSyncHandler:
             prev_batch_token = to_token
 
             # We're going to paginate backwards from the `to_token`
-            from_bound = to_token.room_key
+            to_bound = to_token.room_key
             # People shouldn't see past their leave/ban event
             if room_membership_for_user_at_to_token.membership in (
                 Membership.LEAVE,
                 Membership.BAN,
             ):
-                from_bound = (
+                to_bound = (
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
-            # Determine whether we should limit the timeline to the token range.
-            #
-            # We should return historical messages (before token range) in the
-            # following cases because we want clients to be able to show a basic
-            # screen of information:
-            #  - Initial sync (because no `from_token` to limit us anyway)
-            #  - When users `newly_joined`
-            #  - TODO: For an incremental sync where we haven't sent it down this
-            #    connection before
-            to_bound = (
-                from_token.room_key
-                if from_token is not None
-                and not room_membership_for_user_at_to_token.newly_joined
-                else None
-            )
-
             timeline_events, new_room_key = await self.store.paginate_room_events(
                 room_id=room_id,
-                from_key=from_bound,
-                to_key=to_bound,
+                # The bounds are reversed so we can paginate backwards
+                # (from newer to older events) starting at `to_bound`.
+                # This ensures we fill the `limit` with the newest events first.
+                from_key=to_bound,
+                to_key=from_bound,
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
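The rename above makes the pagination bounds read correctly: for `Direction.BACKWARDS` the walk starts at `to_bound` (the newest point, capped at the leave/ban event) and stops at `from_bound`, so the `limit` is filled with the newest events first. A toy model of that behaviour over a sorted list, not Synapse's actual `paginate_room_events`:

    from typing import List, Optional, Tuple

    def paginate_backwards(
        events: List[Tuple[int, str]],  # (stream_position, event_id), ascending
        to_bound: int,
        from_bound: Optional[int],
        limit: int,
    ) -> List[Tuple[int, str]]:
        # Walk from the newest event at or before `to_bound` towards
        # `from_bound` (exclusive), keeping the newest events if we hit `limit`.
        selected: List[Tuple[int, str]] = []
        for pos, event_id in reversed(events):
            if pos > to_bound:
                continue
            if from_bound is not None and pos <= from_bound:
                break
            selected.append((pos, event_id))
            if len(selected) == limit:
                break
        return selected

    timeline = [(1, "$a"), (2, "$b"), (3, "$c"), (4, "$d")]
    print(paginate_backwards(timeline, to_bound=4, from_bound=1, limit=2))
    # [(4, '$d'), (3, '$c')] -- newest first
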
@@ -1498,7 +1599,9 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(from_token.room_key):
+                    if persisted_position.persisted_after(
+                        from_token.stream_token.room_key
+                    ):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in
@@ -1555,12 +1658,6 @@ class SlidingSyncHandler:
         # indicate to the client that a state reset happened. Perhaps we should indicate
         # this by setting `initial: True` and empty `required_state`.
 
-        # TODO: Since we can't determine whether we've already sent a room down this
-        # Sliding Sync connection before (we plan to add this optimization in the
-        # future), we're always returning the requested room state instead of
-        # updates.
-        initial = True
-
         # Check whether the room has a name set
         name_state_ids = await self.get_current_state_ids_at(
             room_id=room_id,
@@ -1706,9 +1803,22 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
         else:
-            # TODO: Once we can figure out if we've sent a room down this connection before,
-            # we can return updates instead of the full required state.
-            raise NotImplementedError()
+            assert from_bound is not None
+
+            # TODO: Limit the number of state events we're about to send down
+            # the room, if it's too many we should change this to an
+            # `initial=True`?
+            deltas = await self.store.get_current_state_deltas_for_room(
+                room_id=room_id,
+                from_token=from_bound,
+                to_token=to_token.room_key,
+            )
+            # TODO: Filter room state before fetching events
+            # TODO: Handle state resets where event_id is None
+            events = await self.store.get_events(
+                [d.event_id for d in deltas if d.event_id]
+            )
+            room_state = {(s.type, s.state_key): s for s in events.values()}
 
         required_room_state: StateMap[EventBase] = {}
         if required_state_filter != StateFilter.none():
@@ -1752,8 +1862,14 @@ class SlidingSyncHandler:
         bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
         # But if we found a bump event, use that instead
         if last_bump_event_result is not None:
-            _, bump_event_pos = last_bump_event_result
-            bump_stamp = bump_event_pos.stream
+            _, new_bump_event_pos = last_bump_event_result
+
+            # If we've just joined a remote room, then the last bump event may
+            # have been backfilled (and so have a negative stream ordering).
+            # These negative stream orderings can't sensibly be compared, so
+            # instead we use the membership event position.
+            if new_bump_event_pos.stream > 0:
+                bump_stamp = new_bump_event_pos.stream
 
         return SlidingSyncResult.RoomResult(
             name=room_name,
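The guard above exists because backfilled events are persisted with negative stream orderings, which cannot meaningfully be compared with live positions; falling back to the membership event position avoids the negative `bump_stamp` mentioned in changelog.d/17478.misc. A small illustration:

    def choose_bump_stamp(membership_pos: int, bump_event_pos: int) -> int:
        # Backfilled events carry negative stream orderings, so only trust
        # the bump event's position when it is a live (positive) one.
        if bump_event_pos > 0:
            return bump_event_pos
        return membership_pos

    print(choose_bump_stamp(membership_pos=1203, bump_event_pos=-42))   # 1203
    print(choose_bump_stamp(membership_pos=1203, bump_event_pos=1500))  # 1500
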
@@ -1782,16 +1898,19 @@ class SlidingSyncHandler:
             highlight_count=0,
         )
 
+    @trace
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
+        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
         to_token: StreamToken,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
         """Handle extension requests.
 
         Args:
             sync_config: Sync configuration
+            lists: Sliding window API. A map of list key to list results.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -1816,11 +1935,23 @@ class SlidingSyncHandler:
                 from_token=from_token,
             )
 
+        account_data_response = None
+        if sync_config.extensions.account_data is not None:
+            account_data_response = await self.get_account_data_extension_response(
+                sync_config=sync_config,
+                lists=lists,
+                account_data_request=sync_config.extensions.account_data,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
         return SlidingSyncResult.Extensions(
             to_device=to_device_response,
             e2ee=e2ee_response,
+            account_data=account_data_response,
         )
 
+    @trace
     async def get_to_device_extension_response(
         self,
         sync_config: SlidingSyncConfig,
@@ -1835,7 +1966,7 @@ class SlidingSyncHandler:
             to_token: The point in the stream to sync up to.
         """
         user_id = sync_config.user.to_string()
-        device_id = sync_config.device_id
+        device_id = sync_config.requester.device_id
 
         # Skip if the extension is not enabled
         if not to_device_request.enabled:
@@ -1895,12 +2026,13 @@ class SlidingSyncHandler:
             events=messages,
         )
 
+    @trace
     async def get_e2ee_extension_response(
         self,
         sync_config: SlidingSyncConfig,
         e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
         to_token: StreamToken,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
     ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
         """Handle E2EE device extension (MSC3884)
 
@@ -1911,7 +2043,7 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
         """
         user_id = sync_config.user.to_string()
-        device_id = sync_config.device_id
+        device_id = sync_config.requester.device_id
 
         # Skip if the extension is not enabled
         if not e2ee_request.enabled:
@@ -1922,7 +2054,7 @@ class SlidingSyncHandler:
             # TODO: This should take into account the `from_token` and `to_token`
             device_list_updates = await self.device_handler.get_user_ids_changed(
                 user_id=user_id,
-                from_token=from_token,
+                from_token=from_token.stream_token,
             )
 
         device_one_time_keys_count: Mapping[str, int] = {}
@@ -1944,3 +2076,358 @@ class SlidingSyncHandler:
             device_one_time_keys_count=device_one_time_keys_count,
             device_unused_fallback_key_types=device_unused_fallback_key_types,
         )
+
+    @trace
+    async def get_account_data_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
+        """Handle Account Data extension (MSC3959)
+
+        Args:
+            sync_config: Sync configuration
+            lists: Sliding window API. A map of list key to list results.
+            account_data_request: The account_data extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        user_id = sync_config.user.to_string()
+
+        # Skip if the extension is not enabled
+        if not account_data_request.enabled:
+            return None
+
+        global_account_data_map: Mapping[str, JsonMapping] = {}
+        if from_token is not None:
+            global_account_data_map = (
+                await self.store.get_updated_global_account_data_for_user(
+                    user_id, from_token.stream_token.account_data_key
+                )
+            )
+
+            have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
+                user_id, from_token.stream_token.push_rules_key
+            )
+            if have_push_rules_changed:
+                global_account_data_map = dict(global_account_data_map)
+                global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+                    await self.push_rules_handler.push_rules_for_user(sync_config.user)
+                )
+        else:
+            all_global_account_data = await self.store.get_global_account_data_for_user(
+                user_id
+            )
+
+            global_account_data_map = dict(all_global_account_data)
+            global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+                await self.push_rules_handler.push_rules_for_user(sync_config.user)
+            )
+
+        # We only want to include account data for rooms that are already in the sliding
+        # sync response AND that were requested in the account data request.
+        relevant_room_ids: Set[str] = set()
+
+        # See what rooms from the room subscriptions we should get account data for
+        if (
+            account_data_request.rooms is not None
+            and sync_config.room_subscriptions is not None
+        ):
+            actual_room_ids = sync_config.room_subscriptions.keys()
+
+            for room_id in account_data_request.rooms:
+                # A wildcard means we process all rooms from the room subscriptions
+                if room_id == "*":
+                    relevant_room_ids.update(sync_config.room_subscriptions.keys())
+                    break
+
+                if room_id in actual_room_ids:
+                    relevant_room_ids.add(room_id)
+
+        # See what rooms from the sliding window lists we should get account data for
+        if account_data_request.lists is not None:
+            for list_key in account_data_request.lists:
+                # Just some typing because we share the variable name in multiple places
+                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+                # A wildcard means we process rooms from all lists
+                if list_key == "*":
+                    for actual_list in lists.values():
+                        # We only expect a single SYNC operation for any list
+                        assert len(actual_list.ops) == 1
+                        sync_op = actual_list.ops[0]
+                        assert sync_op.op == OperationType.SYNC
+
+                        relevant_room_ids.update(sync_op.room_ids)
+
+                    break
+
+                actual_list = lists.get(list_key)
+                if actual_list is not None:
+                    # We only expect a single SYNC operation for any list
+                    assert len(actual_list.ops) == 1
+                    sync_op = actual_list.ops[0]
+                    assert sync_op.op == OperationType.SYNC
+
+                    relevant_room_ids.update(sync_op.room_ids)
+
+        # Fetch room account data
+        account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+        if len(relevant_room_ids) > 0:
+            if from_token is not None:
+                account_data_by_room_map = (
+                    await self.store.get_updated_room_account_data_for_user(
+                        user_id, from_token.stream_token.account_data_key
+                    )
+                )
+            else:
+                account_data_by_room_map = (
+                    await self.store.get_room_account_data_for_user(user_id)
+                )
+
+        # Filter down to the relevant rooms
+        account_data_by_room_map = {
+            room_id: account_data_map
+            for room_id, account_data_map in account_data_by_room_map.items()
+            if room_id in relevant_room_ids
+        }
+
+        return SlidingSyncResult.Extensions.AccountDataExtension(
+            global_account_data_map=global_account_data_map,
+            account_data_by_room_map=account_data_by_room_map,
+        )
+
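The room-scoping rules implemented above (explicit room IDs, list keys, and a "*" wildcard for each) boil down to a small amount of set logic. A simplified sketch with plain containers standing in for the request and list types:

    from typing import Dict, List, Optional, Set

    def resolve_relevant_rooms(
        requested_rooms: Optional[List[str]],
        requested_lists: Optional[List[str]],
        room_subscriptions: Set[str],
        list_results: Dict[str, List[str]],  # list key -> room IDs in its SYNC op
    ) -> Set[str]:
        relevant: Set[str] = set()
        if requested_rooms is not None:
            for room_id in requested_rooms:
                if room_id == "*":
                    # Wildcard: all subscribed rooms.
                    relevant.update(room_subscriptions)
                    break
                if room_id in room_subscriptions:
                    relevant.add(room_id)
        if requested_lists is not None:
            for list_key in requested_lists:
                if list_key == "*":
                    # Wildcard: rooms from every list.
                    for room_ids in list_results.values():
                        relevant.update(room_ids)
                    break
                relevant.update(list_results.get(list_key, []))
        return relevant

    print(sorted(resolve_relevant_rooms(["*"], ["dms"], {"!a:x"}, {"dms": ["!b:x"]})))
    # ['!a:x', '!b:x']
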
+
+class HaveSentRoomFlag(Enum):
+    """Flag for whether we have sent the room down a sliding sync connection.
+
+    The valid state changes here are:
+        NEVER -> LIVE
+        LIVE -> PREVIOUSLY
+        PREVIOUSLY -> LIVE
+    """
+
+    # The room has never been sent down (or we have forgotten we have sent it
+    # down).
+    NEVER = 1
+
+    # We have previously sent the room down, but there are updates that we
+    # haven't sent down.
+    PREVIOUSLY = 2
+
+    # We have sent the room down and the client has received all updates.
+    LIVE = 3
+
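The transitions listed in the docstring can be checked mechanically. A small self-contained sketch (the enum is repeated here so the snippet runs on its own; the checker itself is not part of Synapse):

    from enum import Enum

    class HaveSentRoomFlag(Enum):  # mirrors the enum above
        NEVER = 1
        PREVIOUSLY = 2
        LIVE = 3

    _ALLOWED_TRANSITIONS = {
        (HaveSentRoomFlag.NEVER, HaveSentRoomFlag.LIVE),
        (HaveSentRoomFlag.LIVE, HaveSentRoomFlag.PREVIOUSLY),
        (HaveSentRoomFlag.PREVIOUSLY, HaveSentRoomFlag.LIVE),
    }

    def check_transition(old: HaveSentRoomFlag, new: HaveSentRoomFlag) -> None:
        # e.g. NEVER -> PREVIOUSLY is impossible: a room cannot have "updates
        # we haven't sent down" before it has ever been sent down at all.
        if (old, new) not in _ALLOWED_TRANSITIONS:
            raise ValueError(f"invalid transition {old} -> {new}")

    check_transition(HaveSentRoomFlag.NEVER, HaveSentRoomFlag.LIVE)  # ok
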
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+    """Whether we have sent the room down a sliding sync connection.
+
+    Attributes:
+        status: Flag of if we have or haven't sent down the room
+        last_token: If the flag is `PREVIOUSLY` then this is non-null and
+            contains the last stream token of the last updates we sent down
+            the room, i.e. we still need to send everything since then to the
+            client.
+    """
+
+    status: HaveSentRoomFlag
+    last_token: Optional[RoomStreamToken]
+
+    @staticmethod
+    def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+        """Constructor for `PREVIOUSLY` flag."""
+        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+    """In-memory store of per-connection state, including what rooms we have
+    previously sent down a sliding sync connection.
+
+    Note: This is NOT safe to run in a worker setup because connection positions will
+    point to different sets of rooms on different workers. e.g. for the same connection,
+    a connection position of 5 might have totally different states on worker A and
+    worker B.
+
+    One complication that we need to deal with here is handling resent
+    requests: if we sent down a room in a response that the client never
+    received, we must consider the room *not* sent when we get the request
+    again.
+
+    This is handled by using an integer "token", which is returned to the client
+    as part of the sync token. For each connection we store a mapping from
+    tokens to the room states, and create a new entry when we send down new
+    rooms.
+
+    Note that for any given sliding sync connection we will only store a maximum
+    of two different tokens: the previous token from the request and a new token
+    sent in the response. When we receive a request with a given token, we then
+    clear out all other entries with a different token.
+
+    Attributes:
+        _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+            to mapping of room ID to `HaveSentRoom`.
+    """
+
+    # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+    _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+        attr.Factory(dict)
+    )
+
+    async def have_sent_room(
+        self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
+    ) -> HaveSentRoom:
+        """For the given user_id/conn_id/token, return whether we have
+        previously sent the room down
+        """
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.setdefault(conn_key, {})
+        room_status = sync_statuses.get(connection_token, {}).get(
+            room_id, HAVE_SENT_ROOM_NEVER
+        )
+
+        return room_status
+
+    async def record_rooms(
+        self,
+        sync_config: SlidingSyncConfig,
+        from_token: Optional[SlidingSyncStreamToken],
+        *,
+        sent_room_ids: StrCollection,
+        unsent_room_ids: StrCollection,
+    ) -> int:
+        """Record which rooms we have/haven't sent down in a new response
+
+        Args:
+            sync_config
+            from_token: The since token from the request, if any
+            sent_room_ids: The set of room IDs that we have sent down as
+                part of this request (only needs to be ones we hadn't
+                previously sent down).
+            unsent_room_ids: The set of room IDs that have had updates
+                since the `from_token`, but which were not included in
+                this request
+        """
+        prev_connection_token = 0
+        if from_token is not None:
+            prev_connection_token = from_token.connection_position
+
+        # If there are no changes then this is a noop.
+        if not sent_room_ids and not unsent_room_ids:
+            return prev_connection_token
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.setdefault(conn_key, {})
+
+        # Generate a new token, removing any existing entries in that token
+        # (which can happen if requests get resent).
+        new_store_token = prev_connection_token + 1
+        sync_statuses.pop(new_store_token, None)
+
+        # Copy over and update the room mappings.
+        new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+        # Whether we have updated `new_room_statuses`; if we haven't by the
+        # end, we can treat this as a noop.
+        have_updated = False
+        for room_id in sent_room_ids:
+            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+            have_updated = True
+
+        # Whether we add/update the entries for unsent rooms depends on the
+        # existing entry:
+        #   - LIVE: We have previously sent down everything up to
+        #     `last_room_token`, so we update the entry to be `PREVIOUSLY` with
+        #     `last_room_token`.
+        #   - PREVIOUSLY: We have previously sent down everything up to *a*
+        #     given token, so we don't need to update the entry.
+        #   - NEVER: We have never previously sent down the room, and we haven't
+        #     sent anything down this time either so we leave it as NEVER.
+
+        # Work out the new state for unsent rooms that were `LIVE`.
+        if from_token:
+            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+        else:
+            new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+        for room_id in unsent_room_ids:
+            prev_state = new_room_statuses.get(room_id)
+            if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+                new_room_statuses[room_id] = new_unsent_state
+                have_updated = True
+
+        if not have_updated:
+            return prev_connection_token
+
+        sync_statuses[new_store_token] = new_room_statuses
+
+        return new_store_token
+
+    async def mark_token_seen(
+        self,
+        sync_config: SlidingSyncConfig,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> None:
+        """We have received a request with the given token, so we can clear out
+        any other tokens associated with the connection.
+
+        If there is no from token then we have started afresh, and so we delete
+        all tokens associated with the device.
+        """
+        # Clear out any tokens for the connection that don't match the one
+        # from the request.
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.pop(conn_key, {})
+        if from_token is None:
+            return
+
+        sync_statuses = {
+            connection_token: room_statuses
+            for connection_token, room_statuses in sync_statuses.items()
+            if connection_token == from_token.connection_position
+        }
+        if sync_statuses:
+            self._connections[conn_key] = sync_statuses
+
+    @staticmethod
+    def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
+        """Return a unique identifier for this connection.
+
+        The first part is simply the user ID.
+
+        The second part is generally a combination of device ID and conn_id.
+        However, both these two are optional (e.g. puppet access tokens don't
+        have device IDs), so this handles those edge cases.
+
+        We use this over the raw `conn_id` to avoid clashes between different
+        clients that use the same `conn_id`. Imagine a user uses a web client
+        that uses `conn_id: main_sync_loop` and an Android client that also has
+        a `conn_id: main_sync_loop`.
+        """
+
+        user_id = sync_config.user.to_string()
+
+        # Only one sliding sync connection is allowed per given conn_id (empty
+        # or not).
+        conn_id = sync_config.conn_id or ""
+
+        if sync_config.requester.device_id:
+            return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
+
+        if sync_config.requester.access_token_id:
+            # If we don't have a device, then the access token ID should be a
+            # stable ID.
+            return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
+
+        # If we have neither then it's likely an AS or some weird token. Either
+        # way we can just fail here.
+        raise Exception("Cannot use sliding sync with this access token type")
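To make the two-token bookkeeping above concrete, here is a minimal, self-contained sketch of the same scheme. Plain dicts and strings stand in for the real `HaveSentRoom` values and connection keys; this is illustrative only, not the Synapse implementation:

LIVE, PREVIOUSLY, NEVER = "live", "previously", "never"

class ConnectionStoreSketch:
    def __init__(self) -> None:
        # token -> room_id -> status, for a single connection
        self.tokens: dict = {}

    def record_rooms(self, prev_token: int, sent: list, unsent: list) -> int:
        new_token = prev_token + 1
        # Drop any stale entry for the new token (handles resent requests),
        # then copy forward the statuses from the previous token.
        self.tokens.pop(new_token, None)
        statuses = dict(self.tokens.get(prev_token, {}))
        for room_id in sent:
            statuses[room_id] = LIVE
        for room_id in unsent:
            # Only LIVE entries regress; PREVIOUSLY/NEVER stay as they are.
            if statuses.get(room_id) == LIVE:
                statuses[room_id] = PREVIOUSLY
        self.tokens[new_token] = statuses
        return new_token

    def mark_token_seen(self, token: int) -> None:
        # Keep only the token the client has proven it received.
        self.tokens = {token: self.tokens[token]} if token in self.tokens else {}

store = ConnectionStoreSketch()
t1 = store.record_rooms(0, sent=["!a:x"], unsent=[])   # response 1 sends !a:x
t2 = store.record_rooms(t1, sent=[], unsent=["!a:x"])  # response 2 skips an update
store.mark_token_seen(t1)                              # client resends token t1
assert t1 in store.tokens and t2 not in store.tokens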
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 93fe1d439e..ccfce6bd53 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -52,9 +52,9 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import trace_with_opname
+from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname
 from synapse.rest.admin.experimental_features import ExperimentalFeature
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
 from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
 from synapse.util.caches.lrucache import LruCache
@@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet):
         )
 
         user = requester.user
-        device_id = requester.device_id
 
         timeout = parse_integer(request, "timeout", default=0)
         # Position in the stream
@@ -889,22 +888,41 @@ class SlidingSyncRestServlet(RestServlet):
 
         from_token = None
         if from_token_string is not None:
-            from_token = await StreamToken.from_string(self.store, from_token_string)
+            from_token = await SlidingSyncStreamToken.from_string(
+                self.store, from_token_string
+            )
 
         # TODO: We currently don't know whether we're going to use sticky params or
         # maybe some filters like sync v2  where they are built up once and referenced
         # by filter ID. For now, we will just prototype with always passing everything
         # in.
         body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
-        logger.info("Sliding sync request: %r", body)
+
+        # Tag and log useful data to differentiate requests.
+        set_tag("sliding_sync.conn_id", body.conn_id or "")
+        log_kv(
+            {
+                "sliding_sync.lists": {
+                    list_name: {
+                        "ranges": list_config.ranges,
+                        "timeline_limit": list_config.timeline_limit,
+                    }
+                    for list_name, list_config in (body.lists or {}).items()
+                },
+                "sliding_sync.room_subscriptions": list(
+                    (body.room_subscriptions or {}).keys()
+                ),
+            }
+        )
 
         sync_config = SlidingSyncConfig(
             user=user,
-            device_id=device_id,
+            requester=requester,
             # FIXME: Currently, we're just manually copying the fields from the
-            # `SlidingSyncBody` into the config. How can we gurantee into the future
+            # `SlidingSyncBody` into the config. How can we guarantee into the future
             # that we don't forget any? I would like something more structured like
             # `copy_attributes(from=body, to=config)`
+            conn_id=body.conn_id,
             lists=body.lists,
             room_subscriptions=body.room_subscriptions,
             extensions=body.extensions,
@@ -927,7 +945,6 @@ class SlidingSyncRestServlet(RestServlet):
 
         return 200, response_content
 
-    # TODO: Is there a better way to encode things?
     async def encode_response(
         self,
         requester: Requester,
@@ -1115,6 +1132,24 @@ class SlidingSyncRestServlet(RestServlet):
                     extensions.e2ee.device_list_updates.left
                 )
 
+        if extensions.account_data is not None:
+            serialized_extensions["account_data"] = {
+                # Same as the top-level `account_data.events` field in Sync v2.
+                "global": [
+                    {"type": account_data_type, "content": content}
+                    for account_data_type, content in extensions.account_data.global_account_data_map.items()
+                ],
+                # Same as the joined room's account_data field in Sync v2, e.g. the path
+                # `rooms.join["!foo:bar"].account_data.events`.
+                "rooms": {
+                    room_id: [
+                        {"type": account_data_type, "content": content}
+                        for account_data_type, content in event_map.items()
+                    ]
+                    for room_id, event_map in extensions.account_data.account_data_by_room_map.items()
+                },
+            }
+
         return serialized_extensions
 
 
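For reference, the extension block serialized above yields a response fragment shaped like this (illustrative values only; `m.push_rules` and `m.fully_read` are just plausible account data types):

serialized_extensions = {
    "account_data": {
        # Mirrors the top-level `account_data.events` field in sync v2.
        "global": [{"type": "m.push_rules", "content": {"global": {}}}],
        # Mirrors `rooms.join["!foo:bar"].account_data.events` in sync v2.
        "rooms": {
            "!foo:bar": [
                {"type": "m.fully_read", "content": {"event_id": "$abc"}},
            ],
        },
    },
}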
diff --git a/synapse/server.py b/synapse/server.py
index 4a3f9ff934..46b9d83a04 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -559,6 +559,7 @@ class HomeServer(metaclass=abc.ABCMeta):
     def get_sync_handler(self) -> SyncHandler:
         return SyncHandler(self)
 
+    @cache_in_self
     def get_sliding_sync_handler(self) -> SlidingSyncHandler:
         return SlidingSyncHandler(self)
 
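Adding `@cache_in_self` matters here because the sliding sync handler now owns in-memory per-connection state: without memoisation, each call to `get_sliding_sync_handler()` would construct a fresh handler and silently lose that state. A minimal sketch of what such a memoising decorator does (illustrative; the real decorator already exists in `synapse/server.py`):

import functools

def cache_in_self_sketch(builder):
    """Memoise a zero-argument HomeServer getter on the instance."""
    attr_name = "_cached_" + builder.__name__

    @functools.wraps(builder)
    def wrapper(self):
        if not hasattr(self, attr_name):
            setattr(self, attr_name, builder(self))
        return getattr(self, attr_name)

    return wrapper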
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 036972ac25..da3ebe66b8 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -26,6 +26,8 @@ import attr
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases.main.stream import _filter_results_by_stream
+from synapse.types import RoomStreamToken
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 logger = logging.getLogger(__name__)
@@ -156,3 +158,38 @@ class StateDeltasStore(SQLBaseStore):
             "get_max_stream_id_in_current_state_deltas",
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
+
+    async def get_current_state_deltas_for_room(
+        self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
+    ) -> List[StateDelta]:
+        """Get the state deltas between two tokens."""
+
+        def get_current_state_deltas_for_room_txn(
+            txn: LoggingTransaction,
+        ) -> List[StateDelta]:
+            sql = """
+                SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id
+                FROM current_state_delta_stream
+                WHERE room_id = ? AND ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            txn.execute(
+                sql, (room_id, from_token.stream, to_token.get_max_stream_pos())
+            )
+
+            return [
+                StateDelta(
+                    stream_id=row[1],
+                    room_id=room_id,
+                    event_type=row[2],
+                    state_key=row[3],
+                    event_id=row[4],
+                    prev_event_id=row[5],
+                )
+                for row in txn
+                if _filter_results_by_stream(from_token, to_token, row[0], row[1])
+            ]
+
+        return await self.db_pool.runInteraction(
+            "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
+        )
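Note the half-open range in the SQL above: the `from_token` position is excluded while the `to_token` position is included, and `_filter_results_by_stream` then re-checks each row against the multi-writer token bounds. The boundary behaviour as a sketch:

def in_delta_range(stream_id: int, from_stream: int, to_stream: int) -> bool:
    # Mirrors `? < stream_id AND stream_id <= ?` in the query above.
    return from_stream < stream_id <= to_stream

assert not in_delta_range(3, from_stream=3, to_stream=7)  # `from` is exclusive
assert in_delta_range(7, from_stream=3, to_stream=7)      # `to` is inclusive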
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b034361aec..4207e73c7f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -2104,3 +2104,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return RoomStreamToken(stream=last_position.stream - 1)
 
         return None
+
+    def get_rooms_that_might_have_updates(
+        self, room_ids: StrCollection, from_token: RoomStreamToken
+    ) -> StrCollection:
+        """Filters given room IDs down to those that might have updates, i.e.
+        removes rooms that definitely do not have updates.
+        """
+        return self._events_stream_cache.get_entities_changed(
+            room_ids, from_token.stream
+        )
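The hedged name ("might have updates") is deliberate: a `StreamChangeCache` only remembers changes back to a bounded horizon, so for older positions it cannot rule any room out. A rough sketch of that contract (illustrative, not the real cache):

from typing import AbstractSet, Collection

def get_entities_changed_sketch(
    room_ids: Collection[str],
    changed_rooms: AbstractSet[str],
    earliest_known_stream: int,
    from_stream: int,
) -> Collection[str]:
    if from_stream < earliest_known_stream:
        # Cache horizon exceeded: conservatively report every room.
        return room_ids
    # Otherwise only rooms the cache saw change can have updates
    # (false positives allowed, false negatives are not).
    return [room_id for room_id in room_ids if room_id in changed_rooms]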
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index c0d30ac2a3..5259550f1c 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1161,6 +1161,49 @@ StreamToken.START = StreamToken(
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncStreamToken:
+    """The same as a `StreamToken`, but includes an extra field at the start for
+    the sliding sync connection token (separated by a '/'). This is used to
+    store per-connection state.
+
+    This then looks something like:
+        5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+
+    Attributes:
+        stream_token: Token representing the position of all the standard
+            streams.
+        connection_position: Token used by sliding sync to track updates to any
+            per-connection state stored by Synapse.
+    """
+
+    stream_token: StreamToken
+    connection_position: int
+
+    @staticmethod
+    @cancellable
+    async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
+        """Creates a SlidingSyncStreamToken from its textual representation."""
+        try:
+            connection_position_str, stream_token_str = string.split("/", 1)
+            connection_position = int(connection_position_str)
+            stream_token = await StreamToken.from_string(store, stream_token_str)
+
+            return SlidingSyncStreamToken(
+                stream_token=stream_token,
+                connection_position=connection_position,
+            )
+        except CancelledError:
+            raise
+        except Exception:
+            raise SynapseError(400, "Invalid stream token")
+
+    async def to_string(self, store: "DataStore") -> str:
+        """Serializes the token to a string"""
+        stream_token_str = await self.stream_token.to_string(store)
+        return f"{self.connection_position}/{stream_token_str}"
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class PersistedPosition:
     """Position of a newly persisted row with instance that persisted it."""
 
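A standalone round-trip of the `SlidingSyncStreamToken` framing described above: the connection position and the ordinary stream token are joined on the first '/', with the stream token kept opaque here:

from typing import Tuple

def parse_sliding_sync_token(string: str) -> Tuple[int, str]:
    # Split on the first "/" only, mirroring `from_string` above.
    connection_position_str, stream_token_str = string.split("/", 1)
    return int(connection_position_str), stream_token_str

token = "5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379"
pos, stream = parse_sliding_sync_token(token)
assert pos == 5
assert f"{pos}/{stream}" == token  # `to_string` is the inverse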
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 4c6c42db04..f26cc0e903 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,7 +31,15 @@ else:
     from pydantic import Extra
 
 from synapse.events import EventBase
-from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, StreamToken, UserID
+from synapse.types import (
+    DeviceListUpdates,
+    JsonDict,
+    JsonMapping,
+    Requester,
+    SlidingSyncStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.types.rest.client import SlidingSyncBody
 
 if TYPE_CHECKING:
@@ -102,7 +110,7 @@ class SlidingSyncConfig(SlidingSyncBody):
     """
 
     user: UserID
-    device_id: Optional[str]
+    requester: Requester
 
     # Pydantic config
     class Config:
@@ -230,6 +238,17 @@ class SlidingSyncResult:
         notification_count: int
         highlight_count: int
 
+        def __bool__(self) -> bool:
+            return (
+                # If this is the first time the client is seeing the room, we should not filter it out
+                # under any circumstance.
+                self.initial
+                # We need to let the client know if there are any new events
+                or bool(self.required_state)
+                or bool(self.timeline_events)
+                or bool(self.stripped_state)
+            )
+
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class SlidingWindowList:
         """
@@ -323,13 +342,33 @@ class SlidingSyncResult:
                     or self.device_unused_fallback_key_types
                 )
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class AccountDataExtension:
+            """The Account Data extension (MSC3959)
+
+            Attributes:
+                global_account_data_map: Mapping from `type` to `content` of global account
+                    data events.
+                account_data_by_room_map: Mapping from room_id to mapping of `type` to
+                    `content` of room account data events.
+            """
+
+            global_account_data_map: Mapping[str, JsonMapping]
+            account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]]
+
+            def __bool__(self) -> bool:
+                return bool(
+                    self.global_account_data_map or self.account_data_by_room_map
+                )
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
+        account_data: Optional[AccountDataExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device or self.e2ee)
+            return bool(self.to_device or self.e2ee or self.account_data)
 
-    next_pos: StreamToken
+    next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
     rooms: Dict[str, RoomResult]
     extensions: Extensions
@@ -339,10 +378,14 @@ class SlidingSyncResult:
         to tell if the notifier needs to wait for more events when polling for
         events.
         """
-        return bool(self.lists or self.rooms or self.extensions)
+        # We don't include `self.lists` here, as a) `lists` is always non-empty even if
+        # there are no changes, and b) since we're sorting rooms by `stream_ordering` of
+        # the latest activity, anything that would cause the order to change would end
+        # up in `self.rooms` and cause us to send down the change.
+        return bool(self.rooms or self.extensions)
 
     @staticmethod
-    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
         "Return a new empty result"
         return SlidingSyncResult(
             next_pos=next_pos,
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index f3c45a0d6a..dfe3b1e0f7 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel):
     Sliding Sync API request body.
 
     Attributes:
+        conn_id: An optional string to identify this connection to the server.
+            Only one sliding sync connection is allowed per given conn_id (empty
+            or not).
         lists: Sliding window API. A map of list key to list information
             (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
             arbitrary strings which the client is using to refer to the list. Keep this
@@ -322,8 +325,28 @@ class SlidingSyncBody(RequestBodyModel):
 
             enabled: Optional[StrictBool] = False
 
+        class AccountDataExtension(RequestBodyModel):
+            """The Account Data extension (MSC3959)
+
+            Attributes:
+                enabled
+                lists: List of list keys (from the Sliding Window API) to apply this
+                    extension to.
+                rooms: List of room IDs (from the Room Subscription API) to apply this
+                    extension to.
+            """
+
+            enabled: Optional[StrictBool] = False
+            # Process all lists defined in the Sliding Window API. (This is the default.)
+            lists: Optional[List[StrictStr]] = ["*"]
+            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+            rooms: Optional[List[StrictStr]] = ["*"]
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
+        account_data: Optional[AccountDataExtension] = None
+
+    conn_id: Optional[str]
 
     # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
     if TYPE_CHECKING:
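Putting the new request fields together, a body using `conn_id` and the account data extension might look like the following (illustrative; `lists` and `rooms` default to `["*"]` when omitted, as noted above):

sync_body = {
    "conn_id": "main_sync_loop",
    "lists": {
        "foo-list": {
            "ranges": [[0, 99]],
            "required_state": [],
            "timeline_limit": 1,
        },
    },
    "extensions": {
        "account_data": {
            "enabled": True,
            # Apply to every list, but only one explicit room subscription.
            "lists": ["*"],
            "rooms": ["!foo:bar"],
        },
    },
}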
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 0e6352ff4b..8a3dfdcf75 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -43,9 +43,7 @@ from tests.unittest import override_config
 class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.appservice_api = mock.AsyncMock()
-        return self.setup_test_homeserver(
-            federation_client=mock.Mock(), application_service_api=self.appservice_api
-        )
+        return self.setup_test_homeserver(application_service_api=self.appservice_api)
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.handler = hs.get_e2e_keys_handler()
@@ -1224,6 +1222,61 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             },
         )
 
+    def test_query_devices_remote_down(self) -> None:
+        """Tests that querying keys for a remote user on an unreachable server returns
+        results in the "failures" property
+        """
+
+        remote_user_id = "@test:other"
+        local_user_id = "@test:test"
+
+        # The backoff code treats time zero as special
+        self.reactor.advance(5)
+
+        self.hs.get_federation_http_client().agent.request = mock.AsyncMock(  # type: ignore[method-assign]
+            side_effect=Exception("boop")
+        )
+
+        e2e_handler = self.hs.get_e2e_keys_handler()
+
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(
+            query_result["failures"],
+            {
+                "other": {
+                    "message": "Failed to send request: Exception: boop",
+                    "status": 503,
+                }
+            },
+        )
+
+        # Do it again: we should hit the backoff
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(
+            query_result["failures"],
+            {"other": {"message": "Not ready for retry", "status": 503}},
+        )
+
     @parameterized.expand(
         [
             # The remote homeserver's response indicates that this user has 0/1/2 devices.
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 2628869de6..5abf1041be 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -21,7 +21,7 @@
 import json
 import logging
 from http import HTTPStatus
-from typing import Any, Dict, Iterable, List
+from typing import Any, Dict, Iterable, List, Literal, Optional, Tuple
 
 from parameterized import parameterized, parameterized_class
 
@@ -37,6 +37,7 @@ from synapse.api.constants import (
     ReceiptTypes,
     RelationTypes,
 )
+from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase
 from synapse.handlers.sliding_sync import StateValues
 from synapse.rest.client import (
@@ -50,16 +51,24 @@ from synapse.rest.client import (
     sync,
 )
 from synapse.server import HomeServer
-from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    RoomStreamToken,
+    SlidingSyncStreamToken,
+    StreamKeyType,
+    StreamToken,
+    UserID,
+)
+from synapse.types.handlers import SlidingSyncConfig
 from synapse.util import Clock
+from synapse.util.stringutils import random_string
 
 from tests import unittest
 from tests.federation.transport.test_knocking import (
     KnockingStrippedStateEventHelperMixin,
 )
-from tests.server import FakeChannel, TimedOutException
-from tests.test_utils.event_injection import mark_event_as_partial_state
-from tests.unittest import skip_unless
+from tests.server import TimedOutException
+from tests.test_utils.event_injection import create_event, mark_event_as_partial_state
 
 logger = logging.getLogger(__name__)
 
@@ -1225,7 +1234,131 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
         self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])
 
 
-class SlidingSyncTestCase(unittest.HomeserverTestCase):
+class SlidingSyncBase(unittest.HomeserverTestCase):
+    """Base class for sliding sync test cases"""
+
+    sync_endpoint = "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def do_sync(
+        self, sync_body: JsonDict, *, since: Optional[str] = None, tok: str
+    ) -> Tuple[JsonDict, str]:
+        """Do a sliding sync request with given body.
+
+        Asserts the request was successful.
+
+        Args:
+            sync_body: The full request body to use
+            since: Optional since token
+            tok: Access token to use
+
+        Returns:
+            A tuple of the response body and the `pos` field.
+        """
+
+        sync_path = self.sync_endpoint
+        if since:
+            sync_path += f"?pos={since}"
+
+        channel = self.make_request(
+            method="POST",
+            path=sync_path,
+            content=sync_body,
+            access_token=tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        return channel.json_body, channel.json_body["pos"]
+
+    def _bump_notifier_wait_for_events(
+        self,
+        user_id: str,
+        wake_stream_key: Literal[
+            StreamKeyType.ACCOUNT_DATA,
+            StreamKeyType.PRESENCE,
+        ],
+    ) -> None:
+        """
+        Wake up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+        Sync results.
+
+        Args:
+            user_id: The user ID to wake up the notifier for
+            wake_stream_key: The stream key to wake up. This will create an actual new
+                entity in that stream so it's best to choose one that won't affect the
+                Sliding Sync results you're testing for. In other words, if you're testing
+                account data, choose `StreamKeyType.PRESENCE` instead. We support two
+                possible stream keys because you're probably testing one or the other so
+                one is always a "safe" option.
+        """
+        # We're expecting some new activity from this point onwards
+        from_token = self.hs.get_event_sources().get_current_token()
+
+        triggered_notifier_wait_for_events = False
+
+        async def _on_new_activity(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> bool:
+            nonlocal triggered_notifier_wait_for_events
+            triggered_notifier_wait_for_events = True
+            return True
+
+        notifier = self.hs.get_notifier()
+
+        # Listen for some new activity for the user. We're just trying to confirm that
+        # our bump below actually does what we think it does (triggers new activity for
+        # the user).
+        result_awaitable = notifier.wait_for_events(
+            user_id,
+            1000,
+            _on_new_activity,
+            from_token=from_token,
+        )
+
+        # Update the account data or presence so that `notifier.wait_for_events(...)`
+        # wakes up. We chose these two options because they're least likely to show up
+        # in the Sliding Sync response so they won't affect whether we have results.
+        if wake_stream_key == StreamKeyType.ACCOUNT_DATA:
+            self.get_success(
+                self.hs.get_account_data_handler().add_account_data_for_user(
+                    user_id,
+                    "org.matrix.foobarbaz",
+                    {"foo": "bar"},
+                )
+            )
+        elif wake_stream_key == StreamKeyType.PRESENCE:
+            sending_user_id = self.register_user(
+                "user_bump_notifier_wait_for_events_" + random_string(10), "pass"
+            )
+            sending_user_tok = self.login(sending_user_id, "pass")
+            test_msg = {"foo": "bar"}
+            chan = self.make_request(
+                "PUT",
+                "/_matrix/client/r0/sendToDevice/m.test/1234",
+                content={"messages": {user_id: {"d1": test_msg}}},
+                access_token=sending_user_tok,
+            )
+            self.assertEqual(chan.code, 200, chan.result)
+        else:
+            raise AssertionError(
+                "Unable to wake that stream in _bump_notifier_wait_for_events(...)"
+            )
+
+        # Wait for our notifier result
+        self.get_success(result_awaitable)
+
+        if not triggered_notifier_wait_for_events:
+            raise AssertionError(
+                "Expected `notifier.wait_for_events(...)` to be triggered"
+            )
+
+
+class SlidingSyncTestCase(SlidingSyncBase):
     """
     Tests regarding MSC3575 Sliding Sync `/sync` endpoint.
     """
@@ -1238,22 +1371,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         devices.register_servlets,
     ]
 
-    def default_config(self) -> JsonDict:
-        config = super().default_config()
-        # Enable sliding sync
-        config["experimental_features"] = {"msc3575_enabled": True}
-        return config
-
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
-        self.sync_endpoint = (
-            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
-        )
-        self.store = hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
         self.storage_controllers = hs.get_storage_controllers()
-        self.account_data_handler = hs.get_account_data_handler()
-        self.notifier = hs.get_notifier()
 
     def _assertRequiredStateIncludes(
         self,
@@ -1379,94 +1500,41 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         return room_id
 
-    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
-        """
-        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
-        Sync results.
-        """
-        # We're expecting some new activity from this point onwards
-        from_token = self.event_sources.get_current_token()
-
-        triggered_notifier_wait_for_events = False
-
-        async def _on_new_acivity(
-            before_token: StreamToken, after_token: StreamToken
-        ) -> bool:
-            nonlocal triggered_notifier_wait_for_events
-            triggered_notifier_wait_for_events = True
-            return True
-
-        # Listen for some new activity for the user. We're just trying to confirm that
-        # our bump below actually does what we think it does (triggers new activity for
-        # the user).
-        result_awaitable = self.notifier.wait_for_events(
-            user_id,
-            1000,
-            _on_new_acivity,
-            from_token=from_token,
-        )
-
-        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
-        # We're bumping account data because it won't show up in the Sliding Sync
-        # response so it won't affect whether we have results.
-        self.get_success(
-            self.account_data_handler.add_account_data_for_user(
-                user_id,
-                "org.matrix.foobarbaz",
-                {"foo": "bar"},
-            )
-        )
-
-        # Wait for our notifier result
-        self.get_success(result_awaitable)
-
-        if not triggered_notifier_wait_for_events:
-            raise AssertionError(
-                "Expected `notifier.wait_for_events(...)` to be triggered"
-            )
-
     def test_sync_list(self) -> None:
         """
         Test that room IDs show up in the Sliding Sync `lists`
         """
-        alice_user_id = self.register_user("alice", "correcthorse")
-        alice_access_token = self.login(alice_user_id, "correcthorse")
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
 
-        room_id = self.helper.create_room_as(
-            alice_user_id, tok=alice_access_token, is_public=True
-        )
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 99]],
-                        "required_state": [
-                            ["m.room.join_rules", ""],
-                            ["m.room.history_visibility", ""],
-                            ["m.space.child", "*"],
-                        ],
-                        "timeline_limit": 1,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 99]],
+                    "required_state": [
+                        ["m.room.join_rules", ""],
+                        ["m.room.history_visibility", ""],
+                        ["m.space.child", "*"],
+                    ],
+                    "timeline_limit": 1,
                 }
-            },
-            access_token=alice_access_token,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["foo-list"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
 
         # Make sure the list includes the room we are joined to
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1474,15 +1542,15 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
     def test_wait_for_sync_token(self) -> None:
         """
         Test that worker will wait until it catches up to the given token
         """
-        alice_user_id = self.register_user("alice", "correcthorse")
-        alice_access_token = self.login(alice_user_id, "correcthorse")
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
 
         # Create a future token that will cause us to wait. Since we never send a new
         # event to reach that future stream_ordering, the worker will wait until the
@@ -1496,27 +1564,28 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         future_position_token_serialized = self.get_success(
-            future_position_token.to_string(self.store)
+            SlidingSyncStreamToken(future_position_token, 0).to_string(self.store)
         )
 
         # Make the Sliding Sync request
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 99]],
+                    "required_state": [
+                        ["m.room.join_rules", ""],
+                        ["m.room.history_visibility", ""],
+                        ["m.space.child", "*"],
+                    ],
+                    "timeline_limit": 1,
+                }
+            }
+        }
         channel = self.make_request(
             "POST",
             self.sync_endpoint + f"?pos={future_position_token_serialized}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 99]],
-                        "required_state": [
-                            ["m.room.join_rules", ""],
-                            ["m.room.history_visibility", ""],
-                            ["m.space.child", "*"],
-                        ],
-                        "timeline_limit": 1,
-                    }
-                }
-            },
-            access_token=alice_access_token,
+            content=sync_body,
+            access_token=user1_tok,
             await_result=False,
         )
         # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
@@ -1544,23 +1613,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id, user1_id, tok=user1_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 0]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                    }
-                }
-            },
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -1587,12 +1655,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             channel.json_body["rooms"][room_id]["timeline"],
         )
 
-    # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
-    # check if there are any updates since the `from_token`.
-    @skip_unless(
-        False,
-        "Once we remove ops from the Sliding Sync response, this test should pass",
-    )
     def test_wait_for_new_data_timeout(self) -> None:
         """
         Test to make sure that the Sliding Sync request waits for new data to arrive but
@@ -1607,23 +1669,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id, user1_id, tok=user1_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 0]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                    }
-                }
-            },
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -1632,7 +1693,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             channel.await_result(timeout_ms=5000)
         # Wake-up `notifier.wait_for_events(...)` that will cause us test
         # `SlidingSyncResult.__bool__` for new results.
-        self._bump_notifier_wait_for_events(user1_id)
+        self._bump_notifier_wait_for_events(
+            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+        )
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=4000)
@@ -1641,12 +1704,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # We still see rooms because that's how Sliding Sync lists work but we reached
-        # the timeout before seeing them
-        self.assertEqual(
-            [event["event_id"] for event in channel.json_body["rooms"].keys()],
-            [room_id],
-        )
+        # There should be no room sent down.
+        self.assertFalse(channel.json_body["rooms"])
 
     def test_filter_list(self) -> None:
         """
@@ -1682,55 +1741,50 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.invite(invite_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    # Absense of filters does not imply "False" values
-                    "all": {
-                        "ranges": [[0, 99]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                        "filters": {},
-                    },
-                    # Test single truthy filter
-                    "dms": {
-                        "ranges": [[0, 99]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                        "filters": {"is_dm": True},
-                    },
-                    # Test single falsy filter
-                    "non-dms": {
-                        "ranges": [[0, 99]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                        "filters": {"is_dm": False},
-                    },
-                    # Test how multiple filters should stack (AND'd together)
-                    "room-invites": {
-                        "ranges": [[0, 99]],
-                        "required_state": [],
-                        "timeline_limit": 1,
-                        "filters": {"is_dm": False, "is_invite": True},
-                    },
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        sync_body = {
+            "lists": {
+                # Absence of filters does not imply "False" values
+                "all": {
+                    "ranges": [[0, 99]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                    "filters": {},
+                },
+                # Test single truthy filter
+                "dms": {
+                    "ranges": [[0, 99]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                    "filters": {"is_dm": True},
+                },
+                # Test single falsy filter
+                "non-dms": {
+                    "ranges": [[0, 99]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                    "filters": {"is_dm": False},
+                },
+                # Test how multiple filters should stack (AND'd together)
+                "room-invites": {
+                    "ranges": [[0, 99]],
+                    "required_state": [],
+                    "timeline_limit": 1,
+                    "filters": {"is_dm": False, "is_invite": True},
+                },
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["all", "dms", "non-dms", "room-invites"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
 
         # Make sure the lists have the correct rooms
         self.assertListEqual(
-            list(channel.json_body["lists"]["all"]["ops"]),
+            list(response_body["lists"]["all"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1743,10 +1797,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     ],
                 }
             ],
-            list(channel.json_body["lists"]["all"]),
+            list(response_body["lists"]["all"]),
         )
         self.assertListEqual(
-            list(channel.json_body["lists"]["dms"]["ops"]),
+            list(response_body["lists"]["dms"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1754,10 +1808,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [invited_dm_room_id, joined_dm_room_id],
                 }
             ],
-            list(channel.json_body["lists"]["dms"]),
+            list(response_body["lists"]["dms"]),
         )
         self.assertListEqual(
-            list(channel.json_body["lists"]["non-dms"]["ops"]),
+            list(response_body["lists"]["non-dms"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1765,10 +1819,10 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [invite_room_id, room_id],
                 }
             ],
-            list(channel.json_body["lists"]["non-dms"]),
+            list(response_body["lists"]["non-dms"]),
         )
         self.assertListEqual(
-            list(channel.json_body["lists"]["room-invites"]["ops"]),
+            list(response_body["lists"]["room-invites"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1776,14 +1830,14 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [invite_room_id],
                 }
             ],
-            list(channel.json_body["lists"]["room-invites"]),
+            list(response_body["lists"]["room-invites"]),
         )
 
         # Ensure DM's are correctly marked
         self.assertDictEqual(
             {
                 room_id: room.get("is_dm")
-                for room_id, room in channel.json_body["rooms"].items()
+                for room_id, room in response_body["rooms"].items()
             },
             {
                 invite_room_id: None,
@@ -1810,36 +1864,31 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id2, "activity in room2", tok=user1_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 99]],
-                        "required_state": [
-                            ["m.room.join_rules", ""],
-                            ["m.room.history_visibility", ""],
-                            ["m.space.child", "*"],
-                        ],
-                        "timeline_limit": 1,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 99]],
+                    "required_state": [
+                        ["m.room.join_rules", ""],
+                        ["m.room.history_visibility", ""],
+                        ["m.space.child", "*"],
+                    ],
+                    "timeline_limit": 1,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["foo-list"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
 
         # Make sure the list is sorted in the way we expect
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1847,7 +1896,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id2, room_id1, room_id3],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
     def test_sliced_windows(self) -> None:
@@ -1863,35 +1912,26 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
 
         # Make the Sliding Sync request for a single room
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 0]],
-                        "required_state": [
-                            ["m.room.join_rules", ""],
-                            ["m.room.history_visibility", ""],
-                            ["m.space.child", "*"],
-                        ],
-                        "timeline_limit": 1,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": 1,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["foo-list"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
         # Make sure the list is sorted in the way we expect
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1899,39 +1939,30 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id3],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
         # Make the Sliding Sync request for the first two rooms
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            ["m.room.join_rules", ""],
-                            ["m.room.history_visibility", ""],
-                            ["m.space.child", "*"],
-                        ],
-                        "timeline_limit": 1,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 1,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["foo-list"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
         # Make sure the list is sorted in the way we expect
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -1939,7 +1970,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id3, room_id2],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
     def test_rooms_meta_when_joined(self) -> None:
@@ -1970,43 +2001,38 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Reflect the current state of the room
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["name"],
+            response_body["rooms"][room_id1]["name"],
             "my super room",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["avatar"],
+            response_body["rooms"][room_id1]["avatar"],
             "mxc://DUMMY_MEDIA_ID",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             2,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             0,
         )
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("is_dm"),
+            response_body["rooms"][room_id1].get("is_dm"),
         )
 
     def test_rooms_meta_when_invited(self) -> None:
@@ -2053,44 +2079,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # This should still reflect the current state of the room even when the user is
         # invited.
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["name"],
+            response_body["rooms"][room_id1]["name"],
             "my super duper room",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["avatar"],
+            response_body["rooms"][room_id1]["avatar"],
             "mxc://UPDATED_DUMMY_MEDIA_ID",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             1,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             1,
         )
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("is_dm"),
+            response_body["rooms"][room_id1].get("is_dm"),
         )
 
     def test_rooms_meta_when_banned(self) -> None:
@@ -2137,45 +2158,40 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Reflect the state of the room at the time of leaving
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["name"],
+            response_body["rooms"][room_id1]["name"],
             "my super room",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["avatar"],
+            response_body["rooms"][room_id1]["avatar"],
             "mxc://DUMMY_MEDIA_ID",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             # FIXME: The actual number should be "1" (user2) but we currently don't
             # support this for rooms where the user has left/been banned.
             0,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             0,
         )
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("is_dm"),
+            response_body["rooms"][room_id1].get("is_dm"),
         )
 
     def test_rooms_meta_heroes(self) -> None:
@@ -2215,61 +2231,56 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.invite(room_id2, src=user2_id, targ=user3_id, tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Room1 has a name so we shouldn't see any `heroes` which the client would use
         # to calculate the room name themselves.
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["name"],
+            response_body["rooms"][room_id1]["name"],
             "my super room",
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("heroes"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("heroes"))
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             2,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             1,
         )
 
         # Room2 doesn't have a name so we should see `heroes` populated
-        self.assertIsNone(channel.json_body["rooms"][room_id2].get("name"))
+        self.assertIsNone(response_body["rooms"][room_id2].get("name"))
         self.assertCountEqual(
             [
                 hero["user_id"]
-                for hero in channel.json_body["rooms"][room_id2].get("heroes", [])
+                for hero in response_body["rooms"][room_id2].get("heroes", [])
             ],
             # Heroes shouldn't include the user themselves (we shouldn't see user1)
             [user2_id, user3_id],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id2]["joined_count"],
+            response_body["rooms"][room_id2]["joined_count"],
             2,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id2]["invited_count"],
+            response_body["rooms"][room_id2]["invited_count"],
             1,
         )
 
         # We didn't request any state so we shouldn't see any `required_state`
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
-        self.assertIsNone(channel.json_body["rooms"][room_id2].get("required_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+        self.assertIsNone(response_body["rooms"][room_id2].get("required_state"))
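
The `hero["user_id"]` lookups above imply each hero is a small dict. Per MSC3575 (mirroring sync v2's `m.heroes` summary), entries carry the user ID plus any known profile fields, so a shape check along these lines would also hold (the profile field names are an assumption from the MSC, not something this test asserts):

    for hero in response_body["rooms"][room_id2].get("heroes", []):
        # "user_id" is required; "displayname"/"avatar_url" are optional
        self.assertIn("user_id", hero)
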
 
     def test_rooms_meta_heroes_max(self) -> None:
         """
@@ -2308,44 +2319,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id1, user7_id, tok=user7_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Room1 doesn't have a name so we should see `heroes` populated
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("name"))
         self.assertCountEqual(
             [
                 hero["user_id"]
-                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+                for hero in response_body["rooms"][room_id1].get("heroes", [])
             ],
             # Heroes should be the first 5 users in the room (excluding the user
             # themselves, we shouldn't see `user1`)
             [user2_id, user3_id, user4_id, user5_id, user6_id],
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             7,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             0,
         )
 
         # We didn't request any state so we shouldn't see any `required_state`
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
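
The cap of five heroes mirrors the sync v2 room-summary behaviour. Conceptually the server computes something like the following (a sketch under that assumption; `members_in_stream_order` is a hypothetical list used purely for illustration):

    # Other members in stream order, requester excluded, capped at five
    hero_user_ids = [
        user_id for user_id in members_in_stream_order if user_id != user1_id
    ][:5]
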
 
     def test_rooms_meta_heroes_when_banned(self) -> None:
         """
@@ -2386,28 +2392,23 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.invite(room_id1, src=user2_id, targ=user5_id, tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Room1 doesn't have a name so we should see `heroes` populated
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("name"))
         self.assertCountEqual(
             [
                 hero["user_id"]
-                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+                for hero in response_body["rooms"][room_id1].get("heroes", [])
             ],
             # Heroes shouldn't include the user themselves (we shouldn't see user1). We
             # also shouldn't see user4 since they joined after user1 was banned.
@@ -2418,13 +2419,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["joined_count"],
+            response_body["rooms"][room_id1]["joined_count"],
             # FIXME: The actual number should be "1" (user2) but we currently don't
             # support this for rooms where the user has left/been banned.
             0,
         )
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["invited_count"],
+            response_body["rooms"][room_id1]["invited_count"],
             # We shouldn't see user5 since they were invited after user1 was banned.
             #
             # FIXME: The actual number should be "1" (user3) but we currently don't
@@ -2457,46 +2458,41 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 3,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 3,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             True,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         # Check to make sure the latest events are returned
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 event_response4["event_id"],
                 event_response5["event_id"],
                 user1_join_response["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
 
         # Check to make sure the `prev_batch` points at the right place
         prev_batch_token = self.get_success(
             StreamToken.from_string(
-                self.store, channel.json_body["rooms"][room_id1]["prev_batch"]
+                self.store, response_body["rooms"][room_id1]["prev_batch"]
             )
         )
         prev_batch_room_stream_token_serialized = self.get_success(
@@ -2520,9 +2516,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # With no `from_token` (initial sync), it's all historical since there is no
         # "live" range
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             0,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
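
The `prev_batch` validated above is an ordinary pagination token, so it can be handed straight to the Client-Server `/messages` endpoint to fetch the events older than the timeline window. A sketch using the same test helpers:

    prev_batch = response_body["rooms"][room_id1]["prev_batch"]
    channel = self.make_request(
        "GET",
        f"/_matrix/client/v3/rooms/{room_id1}/messages"
        f"?from={prev_batch}&dir=b&limit=10",
        access_token=user1_tok,
    )
    self.assertEqual(channel.code, 200, channel.json_body)
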
 
     def test_rooms_not_limited_initial_sync(self) -> None:
@@ -2543,44 +2539,39 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # Make the Sliding Sync request
         timeline_limit = 100
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": timeline_limit,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": timeline_limit,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # The timeline should be `limited=False` because we have all of the events (no
         # more to paginate to)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             False,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         expected_number_of_events = 9
         # We're just looking to make sure we got all of the events before hitting the `timeline_limit`
         self.assertEqual(
-            len(channel.json_body["rooms"][room_id1]["timeline"]),
+            len(response_body["rooms"][room_id1]["timeline"]),
             expected_number_of_events,
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
         self.assertLessEqual(expected_number_of_events, timeline_limit)
 
         # With no `from_token` (initial sync), it's all historical since there is no
         # "live" token range.
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             0,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_rooms_incremental_sync(self) -> None:
@@ -2598,7 +2589,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # Make an initial Sliding Sync request to grab a token. This is also a sanity
         # check that we can go from initial to incremental sync.
-        sync_params = {
+        sync_body = {
             "lists": {
                 "foo-list": {
                     "ranges": [[0, 1]],
@@ -2607,14 +2598,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                 }
             }
         }
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            sync_params,
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
-        next_pos = channel.json_body["pos"]
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Send some events but don't send enough to saturate the `timeline_limit`.
         # We want to later test that we only get the new events since the `from_token`
@@ -2622,41 +2606,35 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
 
         # Make an incremental Sliding Sync request (what we're trying to test)
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint + f"?pos={next_pos}",
-            sync_params,
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We only expect to see the new events since the last sync, which isn't enough to
         # fill up the `timeline_limit`.
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             False,
-            f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
-            + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
-            + str(channel.json_body["rooms"][room_id1]),
+            f'Our `timeline_limit` was {sync_body["lists"]["foo-list"]["timeline_limit"]} '
+            + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+            + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure the latest events are returned
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 event_response2["event_id"],
                 event_response3["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
 
         # All events are "live"
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             2,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
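
`num_live` is effectively a count of the returned timeline events that were persisted after the sync token, roughly (a sketch; `event_pos_of` is a hypothetical stand-in for the stream-position lookup):

    num_live = sum(
        1
        for event in timeline_events
        # "live" means persisted after the `from_token` the client synced from
        if event_pos_of(event) > from_token.room_key
    )
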
 
     def test_rooms_bump_stamp(self) -> None:
@@ -2701,33 +2679,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request
-        timeline_limit = 100
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": timeline_limit,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 100,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure it has the foo-list we requested
         self.assertListEqual(
-            list(channel.json_body["lists"].keys()),
+            list(response_body["lists"].keys()),
             ["foo-list"],
-            channel.json_body["lists"].keys(),
+            response_body["lists"].keys(),
         )
 
         # Make sure the list includes the rooms in the right order
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -2737,23 +2709,137 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id1, room_id2],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
         # The `bump_stamp` for room1 should point at the latest message (not the
         # reaction since it's not one of the `DEFAULT_BUMP_EVENT_TYPES`)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["bump_stamp"],
+            response_body["rooms"][room_id1]["bump_stamp"],
             event_pos1.stream,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
         # The `bump_stamp` for room2 should point at the latest message
         self.assertEqual(
-            channel.json_body["rooms"][room_id2]["bump_stamp"],
+            response_body["rooms"][room_id2]["bump_stamp"],
             event_pos2.stream,
-            channel.json_body["rooms"][room_id2],
+            response_body["rooms"][room_id2],
+        )
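
For reference, the `DEFAULT_BUMP_EVENT_TYPES` mentioned above is the set of "activity-like" event types allowed to move a room's `bump_stamp`; approximately the following (an indicative list only — the authoritative set lives in `synapse/handlers/sliding_sync.py`):

    DEFAULT_BUMP_EVENT_TYPES = {
        "m.room.create",
        "m.room.message",
        "m.room.encrypted",
        "m.sticker",
        # ...plus call/poll-start types in the real definition
    }
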
+
+    def test_rooms_bump_stamp_backfill(self) -> None:
+        """
+        Test that `bump_stamp` ignores backfilled events, i.e. events with a
+        negative stream ordering.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a remote room
+        creator = "@user:other"
+        room_id = "!foo:other"
+        shared_kwargs = {
+            "room_id": room_id,
+            "room_version": "10",
+        }
+
+        create_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[],
+                type=EventTypes.Create,
+                state_key="",
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        creator_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[create_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=creator,
+                content={"membership": Membership.JOIN},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        # We add a message event as a valid "bump type"
+        msg_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[creator_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id],
+                type=EventTypes.Message,
+                content={"body": "foo", "msgtype": "m.text"},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+        invite_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[msg_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id, creator_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.INVITE},
+                sender=creator,
+                **shared_kwargs,
+            )
+        )
+
+        remote_events_and_contexts = [
+            create_tuple,
+            creator_tuple,
+            msg_tuple,
+            invite_tuple,
+        ]
+
+        # Ensure the local HS knows the room version
+        self.get_success(
+            self.store.store_room(room_id, creator, False, RoomVersions.V10)
+        )
+
+        # Persist these events as backfilled events.
+        persistence = self.hs.get_storage_controllers().persistence
+        assert persistence is not None
+
+        for event, context in remote_events_and_contexts:
+            self.get_success(persistence.persist_event(event, context, backfilled=True))
+
+        # Now we join the local user to the room
+        join_tuple = self.get_success(
+            create_event(
+                self.hs,
+                prev_event_ids=[invite_tuple[0].event_id],
+                auth_event_ids=[create_tuple[0].event_id, invite_tuple[0].event_id],
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": Membership.JOIN},
+                sender=user1_id,
+                **shared_kwargs,
+            )
         )
+        self.get_success(persistence.persist_event(*join_tuple))
+
+        # Doing a sliding sync request should return a positive `bump_stamp`,
+        # even though the only event that matches the bump types has a negative
+        # stream ordering.
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 5,
+                }
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertGreater(response_body["rooms"][room_id]["bump_stamp"], 0)
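
The guard this test exercises boils down to rejecting negative stream orderings (backfilled events are persisted at negative positions) when choosing a `bump_stamp`; roughly (a sketch, not the handler's exact code — the fallback shown is an assumption):

    if bump_event_pos is not None and bump_event_pos.stream > 0:
        bump_stamp = bump_event_pos.stream
    else:
        # Fall back to a position known to be positive, e.g. the user's own
        # (non-backfilled) membership event
        bump_stamp = membership_event_pos.stream
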
 
     def test_rooms_newly_joined_incremental_sync(self) -> None:
         """
@@ -2771,7 +2857,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             room_id1, "activity before token2", tok=user2_tok
         )
 
-        from_token = self.event_sources.get_current_token()
+        # The `timeline_limit` is set to 4 so we can at least see one historical event
+        # before the `from_token`. We should see historical events because this is a
+        # `newly_joined` room.
+        timeline_limit = 4
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": timeline_limit,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Join the room after the `from_token` which will make us consider this room as
         # `newly_joined`.
@@ -2786,42 +2885,23 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             room_id1, "activity after token4", tok=user2_tok
         )
 
-        # The `timeline_limit` is set to 4 so we can at least see one historical event
-        # before the `from_token`. We should see historical events because this is a
-        # `newly_joined` room.
-        timeline_limit = 4
         # Make an incremental Sliding Sync request (what we're trying to test)
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": timeline_limit,
-                    }
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We should see the new events and the rest should be filled with historical
         # events which will make us `limited=True` since there are more to paginate to.
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             True,
             f"Our `timeline_limit` was {timeline_limit} "
-            + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
-            + str(channel.json_body["rooms"][room_id1]),
+            + f'and {len(response_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+            + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure that the "live" and historical events are returned
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 event_response2["event_id"],
@@ -2829,14 +2909,14 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                 event_response3["event_id"],
                 event_response4["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
 
         # Only events after the `from_token` are "live" (join, event3, event4)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             3,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_rooms_invite_shared_history_initial_sync(self) -> None:
@@ -2873,51 +2953,46 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after4", tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 3,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 3,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # `timeline` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("timeline"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("timeline"),
+            response_body["rooms"][room_id1],
         )
         # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("num_live"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("num_live"),
+            response_body["rooms"][room_id1],
         )
         # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("limited"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("limited"),
+            response_body["rooms"][room_id1],
         )
         # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("prev_batch"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("prev_batch"),
+            response_body["rooms"][room_id1],
         )
         # `required_state` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("required_state"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("required_state"),
+            response_body["rooms"][room_id1],
         )
         # We should have some `stripped_state` so the potential joiner can identify the
         # room (we don't care about the order).
         self.assertCountEqual(
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
             [
                 {
                     "content": {"creator": user2_id, "room_version": "10"},
@@ -2944,7 +3019,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "type": "m.room.member",
                 },
             ],
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
         )
 
     def test_rooms_invite_shared_history_incremental_sync(self) -> None:
@@ -2980,58 +3055,54 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
         self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 3,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         self.helper.send(room_id1, "activity after token5", tok=user2_tok)
         self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 3,
-                    }
-                }
-            },
-            access_token=user1_tok,
+        response_body, from_token = self.do_sync(
+            sync_body, since=from_token, tok=user1_tok
         )
-        self.assertEqual(channel.code, 200, channel.json_body)
 
         # `timeline` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("timeline"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("timeline"),
+            response_body["rooms"][room_id1],
         )
         # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("num_live"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("num_live"),
+            response_body["rooms"][room_id1],
         )
         # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("limited"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("limited"),
+            response_body["rooms"][room_id1],
         )
         # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("prev_batch"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("prev_batch"),
+            response_body["rooms"][room_id1],
         )
         # `required_state` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("required_state"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("required_state"),
+            response_body["rooms"][room_id1],
         )
         # We should have some `stripped_state` so the potential joiner can identify the
         # room (we don't care about the order).
         self.assertCountEqual(
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
             [
                 {
                     "content": {"creator": user2_id, "room_version": "10"},
@@ -3058,7 +3129,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "type": "m.room.member",
                 },
             ],
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
         )
 
     def test_rooms_invite_world_readable_history_initial_sync(self) -> None:
@@ -3112,52 +3183,47 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after4", tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        # Large enough to see the latest events and before the invite
-                        "timeline_limit": 4,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    # Large enough to see the latest events and those before the invite
+                    "timeline_limit": 4,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # `timeline` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("timeline"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("timeline"),
+            response_body["rooms"][room_id1],
         )
         # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("num_live"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("num_live"),
+            response_body["rooms"][room_id1],
         )
         # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("limited"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("limited"),
+            response_body["rooms"][room_id1],
         )
         # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("prev_batch"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("prev_batch"),
+            response_body["rooms"][room_id1],
         )
         # `required_state` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("required_state"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("required_state"),
+            response_body["rooms"][room_id1],
         )
         # We should have some `stripped_state` so the potential joiner can identify the
         # room (we don't care about the order).
         self.assertCountEqual(
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
             [
                 {
                     "content": {"creator": user2_id, "room_version": "10"},
@@ -3184,7 +3250,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "type": "m.room.member",
                 },
             ],
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
         )
 
     def test_rooms_invite_world_readable_history_incremental_sync(self) -> None:
@@ -3237,59 +3303,53 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
         self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    # Large enough to see the latest events and those before the invite
+                    "timeline_limit": 4,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         self.helper.send(room_id1, "activity after token5", tok=user2_tok)
         self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
 
-        # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        # Large enough to see the latest events and before the invite
-                        "timeline_limit": 4,
-                    }
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # `timeline` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("timeline"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("timeline"),
+            response_body["rooms"][room_id1],
         )
         # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("num_live"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("num_live"),
+            response_body["rooms"][room_id1],
         )
         # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("limited"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("limited"),
+            response_body["rooms"][room_id1],
         )
         # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("prev_batch"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("prev_batch"),
+            response_body["rooms"][room_id1],
         )
         # `required_state` is omitted for `invite` rooms with `stripped_state`
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("required_state"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("required_state"),
+            response_body["rooms"][room_id1],
         )
         # We should have some `stripped_state` so the potential joiner can identify the
         # room (we don't care about the order).
         self.assertCountEqual(
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
             [
                 {
                     "content": {"creator": user2_id, "room_version": "10"},
@@ -3316,7 +3376,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "type": "m.room.member",
                 },
             ],
-            channel.json_body["rooms"][room_id1]["invite_state"],
+            response_body["rooms"][room_id1]["invite_state"],
         )
 
     def test_rooms_ban_initial_sync(self) -> None:
@@ -3344,47 +3404,42 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after6", tok=user2_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 3,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 3,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should see events before the ban but not after
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             0,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         # There are more events to paginate to
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             True,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_rooms_ban_incremental_sync1(self) -> None:
@@ -3402,7 +3457,16 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity before2", tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 4,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
         event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
@@ -3415,48 +3479,33 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "activity after5", tok=user2_tok)
         self.helper.send(room_id1, "activity after6", tok=user2_tok)
 
-        # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 4,
-                    }
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We should see events before the ban but not after
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
         # All live events in the incremental sync
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             3,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         # There aren't any more events to paginate to in this range
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             False,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_rooms_ban_incremental_sync2(self) -> None:
@@ -3479,42 +3528,24 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         self.helper.send(room_id1, "activity after3", tok=user2_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 4,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         self.helper.send(room_id1, "activity after4", tok=user2_tok)
 
-        # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [],
-                        "timeline_limit": 4,
-                    }
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Nothing to see for this banned user in the room in the token range
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("timeline"))
-        # No events returned in the timeline so nothing is "live"
-        self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
-            0,
-            channel.json_body["rooms"][room_id1],
-        )
-        # There aren't anymore events to paginate to in this range
-        self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
-            False,
-            channel.json_body["rooms"][room_id1],
-        )
+        self.assertIsNone(response_body["rooms"].get(room_id1))
 
     def test_rooms_no_required_state(self) -> None:
         """
@@ -3529,27 +3560,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        # Empty `required_state`
-                        "required_state": [],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    # Empty `required_state`
+                    "required_state": [],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # No `required_state` in response
         self.assertIsNone(
-            channel.json_body["rooms"][room_id1].get("required_state"),
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1].get("required_state"),
+            response_body["rooms"][room_id1],
         )
 
     def test_rooms_required_state_initial_sync(self) -> None:
@@ -3566,40 +3592,35 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.RoomHistoryVisibility, ""],
-                            # This one doesn't exist in the room
-                            [EventTypes.Tombstone, ""],
-                        ],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Tombstone, ""],
+                    ],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.RoomHistoryVisibility, "")],
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_incremental_sync(self) -> None:
         """
@@ -3614,47 +3635,83 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        after_room_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Tombstone, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Make the Sliding Sync request
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(after_room_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.RoomHistoryVisibility, ""],
-                            # This one doesn't exist in the room
-                            [EventTypes.Tombstone, ""],
-                        ],
-                        "timeline_limit": 0,
-                    }
+        # Send a message so the room comes down sync.
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # On an incremental sync we only return state updates, and only for
+        # rooms we've already sent down the connection; no state changed here.
+        self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
+
+    def test_rooms_required_state_incremental_sync_restart(self) -> None:
+        """
+        Test `rooms.required_state` returns requested state events in the room during an
+        incremental sync, after a restart (and so the in memory caches are reset).
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Tombstone, ""],
+                    ],
+                    "timeline_limit": 1,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
+        # Reset the in-memory cache
+        self.hs.get_sliding_sync_handler().connection_store._connections.clear()
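+        # (this state only lives in memory, so clearing it mimics a restart)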
+
+        # Make the Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # If the cache has been cleared then we do expect the state to come down
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
-        # The returned state doesn't change from initial to incremental sync. In the
-        # future, we will only return updates but only if we've sent the room down the
-        # connection before.
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.RoomHistoryVisibility, "")],
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard(self) -> None:
         """
@@ -3684,35 +3741,30 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with wildcards for the `event_type` and `state_key`
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [StateValues.WILDCARD, StateValues.WILDCARD],
-                        ],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [StateValues.WILDCARD, StateValues.WILDCARD],
+                    ],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             # We should see all the state events in the room
             state_map.values(),
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard_event_type(self) -> None:
         """
@@ -3743,23 +3795,18 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with wildcards for the `event_type`
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [StateValues.WILDCARD, user2_id],
-                        ],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [StateValues.WILDCARD, user2_id],
+                    ],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
@@ -3767,7 +3814,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # We expect at-least any state event with the `user2_id` as the `state_key`
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Member, user2_id)],
                 state_map[("org.matrix.foo_state", user2_id)],
@@ -3776,7 +3823,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             # events when the `event_type` is a wildcard.
             exact=False,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard_state_key(self) -> None:
         """
@@ -3792,37 +3839,32 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request with wildcards for the `state_key`
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Member, StateValues.WILDCARD],
-                        ],
-                        "timeline_limit": 0,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Member, StateValues.WILDCARD],
+                    ],
+                    "timeline_limit": 0,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Member, user1_id)],
                 state_map[(EventTypes.Member, user2_id)],
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_lazy_loading_room_members(self) -> None:
         """
@@ -3845,24 +3887,19 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.send(room_id1, "3", tok=user2_tok)
 
         # Make the Sliding Sync request with lazy loading for the room members
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.Member, StateValues.LAZY],
-                        ],
-                        "timeline_limit": 3,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.Member, StateValues.LAZY],
+                    ],
+                    "timeline_limit": 3,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
@@ -3870,7 +3907,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # Only user2 and user3 sent events in the 3 events we see in the `timeline`
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.Member, user2_id)],
@@ -3878,7 +3915,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_me(self) -> None:
         """
@@ -3918,25 +3955,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with a request for '$ME'.
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.Member, StateValues.ME],
-                            ["org.matrix.foo", StateValues.ME],
-                        ],
-                        "timeline_limit": 3,
-                    }
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.Member, StateValues.ME],
+                        ["org.matrix.foo", StateValues.ME],
+                    ],
+                    "timeline_limit": 3,
                 }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
@@ -3944,7 +3976,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # Only user2 and user3 sent events in the 3 events we see in the `timeline`
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.Member, user1_id)],
@@ -3952,7 +3984,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     @parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
     def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None:
@@ -3966,7 +3998,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         user3_id = self.register_user("user3", "pass")
         user3_tok = self.login(user3_id, "pass")
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.Member, "*"],
+                        ["org.matrix.foo_state", ""],
+                    ],
+                    "timeline_limit": 3,
+                }
+            }
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
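+        # Note: the room is created after this initial sync, so the incremental
+        # sync below is the first time the room comes down the connection.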
 
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
@@ -4002,30 +4047,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.helper.leave(room_id1, user3_id, tok=user3_tok)
 
         # Make the Sliding Sync request with lazy loading for the room members
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.Member, "*"],
-                            ["org.matrix.foo_state", ""],
-                        ],
-                        "timeline_limit": 3,
-                    }
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Only user2 and user3 sent events in the 3 events we see in the `timeline`
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.Member, user1_id)],
@@ -4035,7 +4061,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_combine_superset(self) -> None:
         """
@@ -4065,45 +4091,40 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with wildcards for the `state_key`
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            [EventTypes.Member, user1_id],
-                        ],
-                        "timeline_limit": 0,
-                    },
-                    "bar-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Member, StateValues.WILDCARD],
-                            ["org.matrix.foo_state", ""],
-                        ],
-                        "timeline_limit": 0,
-                    },
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.Member, user1_id],
+                    ],
+                    "timeline_limit": 0,
                 },
-                "room_subscriptions": {
-                    room_id1: {
-                        "required_state": [["org.matrix.bar_state", ""]],
-                        "timeline_limit": 0,
-                    }
+                "bar-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Member, StateValues.WILDCARD],
+                        ["org.matrix.foo_state", ""],
+                    ],
+                    "timeline_limit": 0,
                 },
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [["org.matrix.bar_state", ""]],
+                    "timeline_limit": 0,
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
         )
 
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
                 state_map[(EventTypes.Member, user1_id)],
@@ -4113,7 +4134,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_partial_state(self) -> None:
         """
@@ -4136,28 +4157,23 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request (NOT lazy-loading room members)
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                        ],
-                        "timeline_limit": 0,
-                    },
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 0,
+                },
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Make sure the list includes room1 but room2 is excluded because it's still
         # partially-stated
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -4165,33 +4181,28 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id1],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
         # Make the Sliding Sync request (with lazy-loading room members)
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {
-                    "foo-list": {
-                        "ranges": [[0, 1]],
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                            # Lazy-load room members
-                            [EventTypes.Member, StateValues.LAZY],
-                        ],
-                        "timeline_limit": 0,
-                    },
-                }
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        # Lazy-load room members
+                        [EventTypes.Member, StateValues.LAZY],
+                    ],
+                    "timeline_limit": 0,
+                },
+            }
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # The list should include both rooms now because we're lazy-loading room members
         self.assertListEqual(
-            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            list(response_body["lists"]["foo-list"]["ops"]),
             [
                 {
                     "op": "SYNC",
@@ -4199,7 +4210,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                     "room_ids": [room_id2, room_id1],
                 }
             ],
-            channel.json_body["lists"]["foo-list"],
+            response_body["lists"]["foo-list"],
         )
 
     def test_room_subscriptions_with_join_membership(self) -> None:
@@ -4216,22 +4227,17 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Make the Sliding Sync request with just the room subscription
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "room_subscriptions": {
-                    room_id1: {
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                        ],
-                        "timeline_limit": 1,
-                    }
-                },
+        sync_body = {
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         state_map = self.get_success(
             self.storage_controllers.state.get_current_state(room_id1)
@@ -4239,37 +4245,37 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # We should see some state
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[(EventTypes.Create, "")],
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
         # We should see some events
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 join_response["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             0,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         # There are more events to paginate to
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             True,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_room_subscriptions_with_leave_membership(self) -> None:
@@ -4310,57 +4316,52 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with just the room subscription
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "room_subscriptions": {
-                    room_id1: {
-                        "required_state": [
-                            ["org.matrix.foo_state", ""],
-                        ],
-                        "timeline_limit": 2,
-                    }
-                },
+        sync_body = {
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [
+                        ["org.matrix.foo_state", ""],
+                    ],
+                    "timeline_limit": 2,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should see the state at the time of the leave
         self._assertRequiredStateIncludes(
-            channel.json_body["rooms"][room_id1]["required_state"],
+            response_body["rooms"][room_id1]["required_state"],
             {
                 state_map[("org.matrix.foo_state", "")],
             },
             exact=True,
         )
-        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
         # We should see some before we left (nothing after)
         self.assertEqual(
             [
                 event["event_id"]
-                for event in channel.json_body["rooms"][room_id1]["timeline"]
+                for event in response_body["rooms"][room_id1]["timeline"]
             ],
             [
                 join_response["event_id"],
                 leave_response["event_id"],
             ],
-            channel.json_body["rooms"][room_id1]["timeline"],
+            response_body["rooms"][room_id1]["timeline"],
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["num_live"],
+            response_body["rooms"][room_id1]["num_live"],
             0,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
         # There are more events to paginate to
         self.assertEqual(
-            channel.json_body["rooms"][room_id1]["limited"],
+            response_body["rooms"][room_id1]["limited"],
             True,
-            channel.json_body["rooms"][room_id1],
+            response_body["rooms"][room_id1],
         )
 
     def test_room_subscriptions_no_leak_private_room(self) -> None:
@@ -4381,27 +4382,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Make the Sliding Sync request with just the room subscription
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "room_subscriptions": {
-                    room_id1: {
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                        ],
-                        "timeline_limit": 1,
-                    }
-                },
+        sync_body = {
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should not see the room at all (we're not in it)
-        self.assertIsNone(
-            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
-        )
+        self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
 
     def test_room_subscriptions_world_readable(self) -> None:
         """
@@ -4444,111 +4438,506 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # Note: We never join the room
 
         # Make the Sliding Sync request with just the room subscription
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "room_subscriptions": {
-                    room_id1: {
-                        "required_state": [
-                            [EventTypes.Create, ""],
-                        ],
-                        "timeline_limit": 1,
-                    }
-                },
+        sync_body = {
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # FIXME: In the future, we should be able to see the room because it's
         # `world_readable` but currently we don't support this.
-        self.assertIsNone(
-            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+        self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
+
+    def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
+        """Test that we only get state updates in incremental sync for rooms
+        we've already seen (LIVE).
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Name, ""],
+                    ],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
         )
 
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+                state_map[(EventTypes.RoomHistoryVisibility, "")],
+            },
+            exact=True,
+        )
 
-class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
-    """Tests for the to-device sliding sync extension"""
+        # Send a state event
+        self.helper.send_state(
+            room_id1, EventTypes.Name, body={"name": "foo"}, tok=user2_tok
+        )
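+        # Only this state delta should come down in the incremental sync below.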
 
-    servlets = [
-        synapse.rest.admin.register_servlets,
-        login.register_servlets,
-        sync.register_servlets,
-        sendtodevice.register_servlets,
-    ]
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
-    def default_config(self) -> JsonDict:
-        config = super().default_config()
-        # Enable sliding sync
-        config["experimental_features"] = {"msc3575_enabled": True}
-        return config
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
 
-    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
-        self.store = hs.get_datastores().main
-        self.event_sources = hs.get_event_sources()
-        self.account_data_handler = hs.get_account_data_handler()
-        self.notifier = hs.get_notifier()
-        self.sync_endpoint = (
-            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Name, "")],
+            },
+            exact=True,
         )
 
-    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+    @parameterized.expand([(False,), (True,)])
+    def test_rooms_timeline_incremental_sync_PREVIOUSLY(self, limited: bool) -> None:
         """
-        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
-        Sync results.
+        Test getting room data where we have previously sent down the room but
+        missed sending down some timeline events, so its status is considered
+        PREVIOUSLY.
+
+        There are two versions of this test: one where there are more messages
+        than the timeline limit, and one where there aren't.
         """
-        # We're expecting some new activity from this point onwards
-        from_token = self.event_sources.get_current_token()
 
-        triggered_notifier_wait_for_events = False
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
 
-        async def _on_new_acivity(
-            before_token: StreamToken, after_token: StreamToken
-        ) -> bool:
-            nonlocal triggered_notifier_wait_for_events
-            triggered_notifier_wait_for_events = True
-            return True
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
 
-        # Listen for some new activity for the user. We're just trying to confirm that
-        # our bump below actually does what we think it does (triggers new activity for
-        # the user).
-        result_awaitable = self.notifier.wait_for_events(
-            user_id,
-            1000,
-            _on_new_acivity,
-            from_token=from_token,
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
+        timeline_limit = 5
+        conn_id = "conn_id"
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": timeline_limit,
+                }
+            },
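+            # Identifies this connection so the server can track per-connection state.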
+            "conn_id": "conn_id",
+        }
+
+        # The first room gets sent down the initial sync
+        response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
         )
 
-        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
-        # We're bumping account data because it won't show up in the Sliding Sync
-        # response so it won't affect whether we have results.
-        self.get_success(
-            self.account_data_handler.add_account_data_for_user(
-                user_id,
-                "org.matrix.foobarbaz",
-                {"foo": "bar"},
+        # We now send some events in room1 (how many depends on the test param).
+        expected_events = []  # The set of events in the timeline
+        if limited:
+            for _ in range(10):
+                resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
+                expected_events.append(resp["event_id"])
+        else:
+            resp = self.helper.send(room_id1, "msg1", tok=user1_tok)
+            expected_events.append(resp["event_id"])
+
+        # A message then happens in the other room, so room1 won't get sent down.
+        self.helper.send(room_id2, "msg", tok=user1_tok)
+
+        # Only the second room gets sent down sync.
+        response_body, from_token = self.do_sync(
+            sync_body, since=initial_from_token, tok=user1_tok
+        )
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+        )
+
+        # FIXME: This is a hack to record that the first room wasn't sent down
+        # sync, as we don't implement that currently.
+        sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        requester = self.get_success(
+            self.hs.get_auth().get_user_by_access_token(user1_tok)
+        )
+        sync_config = SlidingSyncConfig(
+            user=requester.user,
+            requester=requester,
+            conn_id=conn_id,
+        )
+
+        parsed_initial_from_token = self.get_success(
+            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
+        )
+        connection_position = self.get_success(
+            sliding_sync_handler.connection_store.record_rooms(
+                sync_config,
+                parsed_initial_from_token,
+                sent_room_ids=[],
+                unsent_room_ids=[room_id1],
             )
         )
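+        # room_id1 is now recorded as previously sent down this connection but
+        # with updates we failed to send (i.e. the PREVIOUSLY status).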
 
-        # Wait for our notifier result
-        self.get_success(result_awaitable)
+        # FIXME: Now fix up `from_token` with the new connection position from above.
+        parsed_from_token = self.get_success(
+            SlidingSyncStreamToken.from_string(self.store, from_token)
+        )
+        parsed_from_token = SlidingSyncStreamToken(
+            stream_token=parsed_from_token.stream_token,
+            connection_position=connection_position,
+        )
+        from_token = self.get_success(parsed_from_token.to_string(self.store))
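+        # The token now embeds the connection position recorded above.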
 
-        if not triggered_notifier_wait_for_events:
-            raise AssertionError(
-                "Expected `notifier.wait_for_events(...)` to be triggered"
+        # We now send another event to room1, so we should sync all the missing events.
+        resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
+        expected_events.append(resp["event_id"])
+
+        # This sync should contain the messages from room1 not yet sent down.
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
+
+        self.assertEqual(
+            [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
+            expected_events[-timeline_limit:],
+        )
+        self.assertEqual(response_body["rooms"][room_id1]["limited"], limited)
+        self.assertEqual(response_body["rooms"][room_id1].get("required_state"), None)
+
+    def test_rooms_required_state_incremental_sync_PREVIOUSLY(self) -> None:
+        """
+        Test getting room data where we have previously sent down the room but
+        missed sending down some state, so its status is considered PREVIOUSLY.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
+        conn_id = "conn_id"
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Name, ""],
+                    ],
+                    "timeline_limit": 0,
+                }
+            },
+            "conn_id": "conn_id",
+        }
+
+        # The first room gets sent down the initial sync
+        response_body, initial_from_token = self.do_sync(sync_body, tok=user1_tok)
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+
+        # We now send a state event in room1
+        resp = self.helper.send_state(
+            room_id1, EventTypes.Name, {"name": "foo"}, tok=user1_tok
+        )
+        name_change_id = resp["event_id"]
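+        # This state change must be replayed when room1 next comes down sync.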
+
+        # A message then happens in the other room, so room1 won't get sent down.
+        self.helper.send(room_id2, "msg", tok=user1_tok)
+
+        # Only the second room gets sent down sync.
+        response_body, from_token = self.do_sync(
+            sync_body, since=initial_from_token, tok=user1_tok
+        )
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+        )
+
+        # FIXME: This is a hack to record that the first room wasn't sent down
+        # sync, as we don't implement that currently.
+        sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        requester = self.get_success(
+            self.hs.get_auth().get_user_by_access_token(user1_tok)
+        )
+        sync_config = SlidingSyncConfig(
+            user=requester.user,
+            requester=requester,
+            conn_id=conn_id,
+        )
+
+        parsed_initial_from_token = self.get_success(
+            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
+        )
+        connection_position = self.get_success(
+            sliding_sync_handler.connection_store.record_rooms(
+                sync_config,
+                parsed_initial_from_token,
+                sent_room_ids=[],
+                unsent_room_ids=[room_id1],
             )
+        )
+
+        # FIXME: Now fix up `from_token` with the new connection position from above.
+        parsed_from_token = self.get_success(
+            SlidingSyncStreamToken.from_string(self.store, from_token)
+        )
+        parsed_from_token = SlidingSyncStreamToken(
+            stream_token=parsed_from_token.stream_token,
+            connection_position=connection_position,
+        )
+        from_token = self.get_success(parsed_from_token.to_string(self.store))
+
+        # We now send another event to room1, so we should sync all the missing state.
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
+        # This sync should contain the state changes from room1.
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+        self.assertNotIn("initial", response_body["rooms"][room_id1])
+
+        # We should only see the name change.
+        self.assertEqual(
+            [
+                ev["event_id"]
+                for ev in response_body["rooms"][room_id1]["required_state"]
+            ],
+            [name_change_id],
+        )
+
+    def test_rooms_required_state_incremental_sync_NEVER(self) -> None:
+        """
+        Test getting `required_state` where we have NEVER sent down the room before
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        self.helper.send(room_id1, "msg", tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [
+                        [EventTypes.Create, ""],
+                        [EventTypes.RoomHistoryVisibility, ""],
+                        # This one doesn't exist in the room
+                        [EventTypes.Name, ""],
+                    ],
+                    "timeline_limit": 1,
+                }
+            },
+        }
+
+        # A message happens in the other room, so room1 won't get sent down.
+        self.helper.send(room_id2, "msg", tok=user1_tok)
+
+        # Only the second room gets sent down sync.
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+        )
+
+        # We now send another event to room1, so we should send down the full
+        # room.
+        self.helper.send(room_id1, "msg2", tok=user1_tok)
+
+        # This sync should contain the messages from room1 not yet sent down.
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+
+        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        self._assertRequiredStateIncludes(
+            response_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+                state_map[(EventTypes.RoomHistoryVisibility, "")],
+            },
+            exact=True,
+        )
+
+    def test_rooms_timeline_incremental_sync_NEVER(self) -> None:
+        """
+        Test getting timeline room data where we have NEVER sent down the room
+        before
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 0]],
+                    "required_state": [],
+                    "timeline_limit": 5,
+                }
+            },
+        }
+
+        expected_events = []
+        for _ in range(4):
+            resp = self.helper.send(room_id1, "msg", tok=user1_tok)
+            expected_events.append(resp["event_id"])
+
+        # A message happens in the other room, so room1 won't get sent down.
+        self.helper.send(room_id2, "msg", tok=user1_tok)
+
+        # Only the second room gets sent down sync.
+        response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
+        )
+
+        # We now send another event to room1 so it comes down sync
+        resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
+        expected_events.append(resp["event_id"])
+
+        # This sync should contain the messages from room1 not yet sent down.
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertCountEqual(
+            response_body["rooms"].keys(), {room_id1}, response_body["rooms"]
+        )
+
+        self.assertEqual(
+            [ev["event_id"] for ev in response_body["rooms"][room_id1]["timeline"]],
+            expected_events,
+        )
+        self.assertEqual(response_body["rooms"][room_id1]["limited"], True)
+        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
+    def test_rooms_with_no_updates_do_not_come_down_incremental_sync(self) -> None:
+        """
+        Test that rooms with no updates are not returned in subsequent
+        incremental syncs.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the incremental Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # Nothing has happened in the room, so the room should not come down
+        # /sync.
+        self.assertIsNone(response_body["rooms"].get(room_id1))
+
+    def test_empty_initial_room_comes_down_sync(self) -> None:
+        """
+        Test that rooms come down /sync in an initial sync even with an empty
+        `required_state` and a `timeline_limit` of 0.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            }
+        }
+
+        # Make the Sliding Sync request
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
+
+
+class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
+    """Tests for the to-device sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        sync.register_servlets,
+        sendtodevice.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
 
     def _assert_to_device_response(
-        self, channel: FakeChannel, expected_messages: List[JsonDict]
+        self, response_body: JsonDict, expected_messages: List[JsonDict]
     ) -> str:
         """Assert the sliding sync response was successful and has the expected
         to-device messages.
 
         Returns the next_batch token from the to-device section.
         """
-        self.assertEqual(channel.code, 200, channel.json_body)
-        extensions = channel.json_body["extensions"]
+        extensions = response_body["extensions"]
         to_device = extensions["to_device"]
         self.assertIsInstance(to_device["next_batch"], str)
         self.assertEqual(to_device["events"], expected_messages)
@@ -4562,22 +4951,18 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We expect no to-device messages
-        self._assert_to_device_response(channel, [])
+        self._assert_to_device_response(response_body, [])
 
     def test_data_initial_sync(self) -> None:
         """Test that we get to-device messages when we don't specify a since
@@ -4598,21 +4983,17 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(chan.code, 200, chan.result)
 
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
         self._assert_to_device_response(
-            channel,
+            response_body,
             [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
         )
 
@@ -4624,21 +5005,17 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         user2_id = self.register_user("u2", "pass")
         user2_tok = self.login(user2_id, "pass", "d2")
 
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
+        sync_body: JsonDict = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
         # No to-device messages yet.
-        next_batch = self._assert_to_device_response(channel, [])
+        next_batch = self._assert_to_device_response(response_body, [])
 
         test_msg = {"foo": "bar"}
         chan = self.make_request(
@@ -4649,59 +5026,47 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(chan.code, 200, chan.result)
 
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                        "since": next_batch,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                    "since": next_batch,
+                }
             },
-            access_token=user1_tok,
-        )
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
         next_batch = self._assert_to_device_response(
-            channel,
+            response_body,
             [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
         )
 
         # The next sliding sync request should not include the to-device
         # message.
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                        "since": next_batch,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                    "since": next_batch,
+                }
             },
-            access_token=user1_tok,
-        )
-        self._assert_to_device_response(channel, [])
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        self._assert_to_device_response(response_body, [])
 
         # An initial sliding sync request should not include the to-device
         # message, as it should have been deleted
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
-        self._assert_to_device_response(channel, [])
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        self._assert_to_device_response(response_body, [])
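+        # Lifecycle exercised above (assumed server behaviour, matching the
+        # asserts): a message comes down in the first sync past it, advancing
+        # `since` beyond it acknowledges it, and acknowledged messages are
+        # deleted server-side, so not even a fresh initial sync sees them again.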
 
     def test_wait_for_new_data(self) -> None:
         """
@@ -4714,22 +5079,21 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         user2_id = self.register_user("u2", "pass")
         user2_tok = self.login(user2_id, "pass", "d2")
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
-            },
+            self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -4752,7 +5116,7 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, 200, channel.json_body)
 
         self._assert_to_device_response(
-            channel,
+            channel.json_body,
             [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
         )
 
@@ -4765,22 +5129,21 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "to_device": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "to_device": {
-                        "enabled": True,
-                    }
-                },
-            },
+            self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -4789,7 +5152,9 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
             channel.await_result(timeout_ms=5000)
         # Wake up `notifier.wait_for_events(...)`, which will cause us to test
         # `SlidingSyncResult.__bool__` for new results.
-        self._bump_notifier_wait_for_events(user1_id)
+        self._bump_notifier_wait_for_events(
+            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+        )
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=4000)
@@ -4798,10 +5163,10 @@ class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
         channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        self._assert_to_device_response(channel, [])
+        self._assert_to_device_response(channel.json_body, [])
 
 
-class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
+class SlidingSyncE2eeExtensionTestCase(SlidingSyncBase):
     """Tests for the e2ee sliding sync extension"""
 
     servlets = [
@@ -4812,67 +5177,9 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         devices.register_servlets,
     ]
 
-    def default_config(self) -> JsonDict:
-        config = super().default_config()
-        # Enable sliding sync
-        config["experimental_features"] = {"msc3575_enabled": True}
-        return config
-
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
-        self.event_sources = hs.get_event_sources()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
-        self.account_data_handler = hs.get_account_data_handler()
-        self.notifier = hs.get_notifier()
-        self.sync_endpoint = (
-            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
-        )
-
-    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
-        """
-        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
-        Sync results.
-        """
-        # We're expecting some new activity from this point onwards
-        from_token = self.event_sources.get_current_token()
-
-        triggered_notifier_wait_for_events = False
-
-        async def _on_new_acivity(
-            before_token: StreamToken, after_token: StreamToken
-        ) -> bool:
-            nonlocal triggered_notifier_wait_for_events
-            triggered_notifier_wait_for_events = True
-            return True
-
-        # Listen for some new activity for the user. We're just trying to confirm that
-        # our bump below actually does what we think it does (triggers new activity for
-        # the user).
-        result_awaitable = self.notifier.wait_for_events(
-            user_id,
-            1000,
-            _on_new_acivity,
-            from_token=from_token,
-        )
-
-        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
-        # We're bumping account data because it won't show up in the Sliding Sync
-        # response so it won't affect whether we have results.
-        self.get_success(
-            self.account_data_handler.add_account_data_for_user(
-                user_id,
-                "org.matrix.foobarbaz",
-                {"foo": "bar"},
-            )
-        )
-
-        # Wait for our notifier result
-        self.get_success(result_awaitable)
-
-        if not triggered_notifier_wait_for_events:
-            raise AssertionError(
-                "Expected `notifier.wait_for_events(...)` to be triggered"
-            )
 
     def test_no_data_initial_sync(self) -> None:
         """
@@ -4883,27 +5190,22 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         user1_tok = self.login(user1_id, "pass")
 
         # Make an initial Sliding Sync request with the e2ee extension enabled
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Device list updates are only present for incremental syncs
-        self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists"))
+        self.assertIsNone(response_body["extensions"]["e2ee"].get("device_lists"))
 
         # Both of these should be present even when empty
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+            response_body["extensions"]["e2ee"]["device_one_time_keys_count"],
             {
                 # This is always present because of
                 # https://github.com/element-hq/element-android/issues/3725 and
@@ -4912,7 +5214,7 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
             },
         )
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+            response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
             [],
         )
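+        # For reference, the initial-sync e2ee payload asserted above is
+        # roughly (assumed shape):
+        #
+        #   "extensions": {
+        #       "e2ee": {
+        #           "device_one_time_keys_count": {"signed_curve25519": 0},
+        #           "device_unused_fallback_key_types": [],
+        #       }
+        #   }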
 
@@ -4924,40 +5226,32 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make an incremental Sliding Sync request with the e2ee extension enabled
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Device list shows up for incremental syncs
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]
-            .get("device_lists", {})
-            .get("changed"),
+            response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"),
             [],
         )
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
             [],
         )
 
         # Both of these should be present even when empty
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+            response_body["extensions"]["e2ee"]["device_one_time_keys_count"],
             {
                 # Note that "signed_curve25519" is always returned in key count responses
                 # regardless of whether we uploaded any keys for it. This is necessary until
@@ -4970,7 +5264,7 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
             },
         )
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+            response_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
             [],
         )
 
@@ -4992,22 +5286,21 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id, user1_id, tok=user1_tok)
         self.helper.join(room_id, user3_id, tok=user3_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
-            },
+            self.sync_endpoint + "?timeout=10000" + f"&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -5053,22 +5346,21 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Make the Sliding Sync request
         channel = self.make_request(
             "POST",
-            self.sync_endpoint
-            + "?timeout=10000"
-            + f"&pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
-            },
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
             await_result=False,
         )
@@ -5077,7 +5369,9 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
             channel.await_result(timeout_ms=5000)
         # Wake up `notifier.wait_for_events(...)`, which will cause us to test
         # `SlidingSyncResult.__bool__` for new results.
-        self._bump_notifier_wait_for_events(user1_id)
+        self._bump_notifier_wait_for_events(
+            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
+        )
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=4000)
@@ -5138,7 +5432,15 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         self.helper.join(room_id, user3_id, tok=user3_tok)
         self.helper.join(room_id, user4_id, tok=user4_tok)
 
-        from_token = self.event_sources.get_current_token()
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
         # Have user3 update their device list
         channel = self.make_request(
@@ -5155,31 +5457,15 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         self.helper.leave(room_id, user4_id, tok=user4_tok)
 
         # Make an incremental Sliding Sync request with the e2ee extension enabled
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint
-            + f"?pos={self.get_success(from_token.to_string(self.store))}",
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
-            },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # Device list updates show up
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"]
-            .get("device_lists", {})
-            .get("changed"),
+            response_body["extensions"]["e2ee"].get("device_lists", {}).get("changed"),
             [user3_id],
         )
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            response_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
             [user4_id],
         )
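+        # i.e. `changed` lists users whose device lists changed since the
+        # `from_token` while still sharing a room with us, and `left` lists
+        # users we stopped sharing rooms with (assumed semantics, mirroring
+        # the `device_lists` field of sync v2).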
 
@@ -5221,24 +5507,19 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         )
 
         # Make a Sliding Sync request with the e2ee extension enabled
-        channel = self.make_request(
-            "POST",
-            self.sync_endpoint,
-            {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
             },
-            access_token=user1_tok,
-        )
-        self.assertEqual(channel.code, 200, channel.json_body)
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # Check for those one time key counts
         self.assertEqual(
-            channel.json_body["extensions"]["e2ee"].get("device_one_time_keys_count"),
+            response_body["extensions"]["e2ee"].get("device_one_time_keys_count"),
             {
                 "alg1": 1,
                 "alg2": 2,
@@ -5282,25 +5563,747 @@ class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
         self.assertEqual(fallback_res, ["alg1"], fallback_res)
 
         # Make a Sliding Sync request with the e2ee extension enabled
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "e2ee": {
+                    "enabled": True,
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # Check for the unused fallback key types
+        self.assertListEqual(
+            response_body["extensions"]["e2ee"].get("device_unused_fallback_key_types"),
+            ["alg1"],
+        )
+
+
+class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
+    """Tests for the account_data sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        sendtodevice.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.account_data_handler = hs.get_account_data_handler()
+
+    def test_no_data_initial_sync(self) -> None:
+        """
+        Test that enabling the account_data extension works during an initial sync,
+        even if there is no data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            # Even though we don't have any global account data set, Synapse saves some
+            # default push rules for us.
+            {AccountDataTypes.PUSH_RULES},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
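+        # For reference, the extension payload on a fresh account is roughly
+        # (assumed shape, abridged):
+        #
+        #   "extensions": {
+        #       "account_data": {
+        #           "global": [{"type": "m.push_rules", "content": {...}}],
+        #           "rooms": {},
+        #       }
+        #   }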
+
+    def test_no_data_incremental_sync(self) -> None:
+        """
+        Test that enabling the account_data extension works during an incremental sync,
+        even if there is no data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # There have been no account data changes since the `from_token`, so we
+        # shouldn't see any account data here.
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            set(),
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_global_account_data_initial_sync(self) -> None:
+        """
+        On initial sync, we should return all of the user's global account data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Update the global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.foobarbaz",
+                content={"foo": "bar"},
+            )
+        )
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # It should show us all of the global account data
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_global_account_data_incremental_sync(self) -> None:
+        """
+        On incremental sync, we should only return global account data that has
+        changed since the `from_token`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Add some global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.foobarbaz",
+                content={"foo": "bar"},
+            )
+        )
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Add some other global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.doodardaz",
+                content={"doo": "dar"},
+            )
+        )
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            # We should only see the new global account data that happened after the `from_token`
+            {"org.matrix.doodardaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_room_account_data_initial_sync(self) -> None:
+        """
+        On initial sync, we return all account data for a given room, but only for
+        rooms that we request and that are included in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room and add some room account data
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
+        sync_body = {
+            "lists": {},
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    "rooms": [room_id1, room_id2],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        self.assertIncludes(
+            {
+                event["type"]
+                for event in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .get(room_id1)
+            },
+            {"org.matrix.roorarraz"},
+            exact=True,
+        )
+
+    def test_room_account_data_incremental_sync(self) -> None:
+        """
+        On incremental sync, we return only the room account data that has changed
+        since the `from_token`, and only for rooms that we request and that are
+        included in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room and add some room account data
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        sync_body = {
+            "lists": {},
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    "rooms": [room_id1, room_id2],
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Add some other room account data
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz2",
+                content={"roo": "rar"},
+            )
+        )
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz2",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # We should only see the new room account data that happened after the `from_token`
+        self.assertIncludes(
+            {
+                event["type"]
+                for event in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .get(room_id1)
+            },
+            {"org.matrix.roorarraz2"},
+            exact=True,
+        )
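+        # (Assumed filtering, per the assertion above: for each relevant room,
+        # only account data events set after the `from_token` come down.)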
+
+    def test_room_account_data_relevant_rooms(self) -> None:
+        """
+        Test out different variations of the `lists`/`rooms` that we request account data for.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room and add some room account data
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id3,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id4,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id5,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        room_id_to_human_name_map = {
+            room_id1: "room1",
+            room_id2: "room2",
+            room_id3: "room3",
+            room_id4: "room4",
+            room_id5: "room5",
+        }
+
+        # Mix lists and rooms
+        sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    "lists": ["foo-list", "non-existent-list"],
+                    "rooms": [room_id1, room_id2, "!non-existent-room"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Requested via `rooms` and a room subscription exists
+        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+        # room3: ❌ Not requested
+        # room4: ✅ Shows up because requested via `lists` and list exists in the response
+        # room5: ✅ Shows up because requested via `lists` and list exists in the response
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room4", "room5"},
+            exact=True,
+        )
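+        # A minimal sketch of the resolution rule the assertions above imply
+        # (assumed, simplified; not the actual Synapse implementation):
+        #
+        #   requested = rooms_from_lists(ext["lists"]) | set(ext["rooms"])
+        #   relevant = requested & set(rooms_in_response)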
+
+        # Try wildcards (this is the default)
+        sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    # "lists": ["*"],
+                    # "rooms": ["*"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Empty list will return nothing
+        sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ❌ Not requested
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .keys()
+            },
+            set(),
+            exact=True,
+        )
+
+        # Try wildcard and none
+        sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                    "lists": ["*"],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of the explicit `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"]["account_data"]
+                .get("rooms")
+                .keys()
+            },
+            {"room3", "room4", "room5"},
+            exact=True,
+        )
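+        # Summary of the four scenarios above (assumed semantics):
+        #
+        #   `lists`/`rooms` omitted  -> wildcard: all lists and all subscriptions
+        #   explicit names           -> only those lists/rooms, if in the response
+        #   both empty ([])          -> nothing
+        #   `["*"]` lists + [] rooms -> rooms from lists only, no subscriptions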
+
+    def test_wait_for_new_data(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
         channel = self.make_request(
             "POST",
-            self.sync_endpoint,
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are waiting in `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the global account data to trigger new results
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user1_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
+            )
+        )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see the global account data update
+        self.assertIncludes(
             {
-                "lists": {},
-                "extensions": {
-                    "e2ee": {
-                        "enabled": True,
-                    }
-                },
+                global_event["type"]
+                for global_event in channel.json_body["extensions"]["account_data"].get(
+                    "global"
+                )
             },
+            {"org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive,
+        but no data ever arrives, so we time out. We're also making sure that the
+        default data from the account_data extension doesn't trigger a false positive
+        for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
             access_token=user1_tok,
+            await_result=False,
         )
+        # Block for 5 seconds to make sure we are waiting in `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake up `notifier.wait_for_events(...)`, which will cause us to test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(
+            user1_id,
+            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+            # and don't want to contaminate the account data results using
+            # `StreamKeyType.ACCOUNT_DATA`.
+            wake_stream_key=StreamKeyType.PRESENCE,
+        )
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # Check for the unused fallback key types
-        self.assertListEqual(
-            channel.json_body["extensions"]["e2ee"].get(
-                "device_unused_fallback_key_types"
-            ),
-            ["alg1"],
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("global")
+        )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("rooms")
         )