author     Erik Johnston <erik@matrix.org>  2024-08-08 11:03:20 +0100
committer  Erik Johnston <erik@matrix.org>  2024-08-08 11:03:20 +0100
commit     4aeace7e0dd4e38041d5cb601ec01407ac96883b (patch)
tree       c7ce70e802aa94a94ef395c6990831bc277ef992
parent     Update log line (diff)
parent     SSS: Implement PREVIOUSLY room tracking (#17535) (diff)
download   synapse-4aeace7e0dd4e38041d5cb601ec01407ac96883b.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/ss_hacks
-rw-r--r--  CHANGES.md | 44
-rw-r--r--  Cargo.lock | 12
-rw-r--r--  changelog.d/17447.feature | 1
-rw-r--r--  changelog.d/17450.bugfix | 1
-rw-r--r--  changelog.d/17452.misc | 1
-rw-r--r--  changelog.d/17476.doc | 1
-rw-r--r--  changelog.d/17477.feature | 1
-rw-r--r--  changelog.d/17478.misc | 1
-rw-r--r--  changelog.d/17479.misc | 1
-rw-r--r--  changelog.d/17481.misc | 1
-rw-r--r--  changelog.d/17482.misc | 1
-rw-r--r--  changelog.d/17489.feature | 1
-rw-r--r--  changelog.d/17499.bugfix | 1
-rw-r--r--  changelog.d/17501.misc | 1
-rw-r--r--  changelog.d/17504.misc | 1
-rw-r--r--  changelog.d/17505.feature | 1
-rw-r--r--  changelog.d/17507.misc | 1
-rw-r--r--  changelog.d/17510.bugfix | 1
-rw-r--r--  changelog.d/17514.misc | 1
-rw-r--r--  changelog.d/17515.doc | 3
-rw-r--r--  changelog.d/17531.misc | 1
-rw-r--r--  changelog.d/17535.bugfix | 1
-rw-r--r--  debian/changelog | 6
-rw-r--r--  docs/development/room-dag-concepts.md | 6
-rw-r--r--  docs/usage/configuration/config_documentation.md | 4
-rw-r--r--  poetry.lock | 66
-rw-r--r--  pyproject.toml | 2
-rw-r--r--  synapse/api/errors.py | 18
-rw-r--r--  synapse/handlers/admin.py | 10
-rw-r--r--  synapse/handlers/pagination.py | 32
-rw-r--r--  synapse/handlers/room.py | 2
-rw-r--r--  synapse/handlers/sliding_sync.py | 696
-rw-r--r--  synapse/handlers/sync.py | 69
-rw-r--r--  synapse/rest/client/sync.py | 9
-rw-r--r--  synapse/storage/databases/main/roommember.py | 2
-rw-r--r--  synapse/storage/databases/main/state_deltas.py | 2
-rw-r--r--  synapse/storage/databases/main/stream.py | 330
-rw-r--r--  tests/rest/client/sliding_sync/test_connection_tracking.py | 72
-rw-r--r--  tests/rest/client/sliding_sync/test_rooms_required_state.py | 28
-rw-r--r--  tests/rest/client/sliding_sync/test_rooms_timeline.py | 124
-rw-r--r--  tests/storage/test_stream.py | 2
41 files changed, 955 insertions, 603 deletions
diff --git a/CHANGES.md b/CHANGES.md
index b4fddc3e5c..c40c11f98c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,47 @@
+# Synapse 1.113.0rc1 (2024-08-06)
+
+### Features
+
+- Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17447](https://github.com/element-hq/synapse/issues/17447))
+- Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17477](https://github.com/element-hq/synapse/issues/17477))
+- Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17489](https://github.com/element-hq/synapse/issues/17489))
+- Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17505](https://github.com/element-hq/synapse/issues/17505))
+
+### Bugfixes
+
+- Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering. ([\#17450](https://github.com/element-hq/synapse/issues/17450))
+- Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients. ([\#17499](https://github.com/element-hq/synapse/issues/17499))
+
+### Improved Documentation
+
+- Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example. ([\#17476](https://github.com/element-hq/synapse/issues/17476))
+
+### Internal Changes
+
+- Change sliding sync to use its own token format in preparation for storing per-connection state. ([\#17452](https://github.com/element-hq/synapse/issues/17452))
+- Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint. ([\#17478](https://github.com/element-hq/synapse/issues/17478))
+- Do not send empty room entries down the experimental sliding sync endpoint. ([\#17479](https://github.com/element-hq/synapse/issues/17479))
+- Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`. ([\#17481](https://github.com/element-hq/synapse/issues/17481), [\#17482](https://github.com/element-hq/synapse/issues/17482))
+- Add some opentracing tags and logging to the experimental sliding sync implementation. ([\#17501](https://github.com/element-hq/synapse/issues/17501))
+- Split and move Sliding Sync tests so we have more sane test file sizes. ([\#17504](https://github.com/element-hq/synapse/issues/17504))
+- Update the `limited` field description in the Sliding Sync response to accurately describe what it represents. ([\#17507](https://github.com/element-hq/synapse/issues/17507))
+- Make `timeline` assertions in Sliding Sync tests easier to understand. ([\#17511](https://github.com/element-hq/synapse/issues/17511))
+- Reset the sliding sync connection if we don't recognize the per-connection state position. ([\#17529](https://github.com/element-hq/synapse/issues/17529))
+
+
+
+### Updates to locked dependencies
+
+* Bump bcrypt from 4.1.3 to 4.2.0. ([\#17495](https://github.com/element-hq/synapse/issues/17495))
+* Bump black from 24.4.2 to 24.8.0. ([\#17522](https://github.com/element-hq/synapse/issues/17522))
+* Bump phonenumbers from 8.13.39 to 8.13.42. ([\#17521](https://github.com/element-hq/synapse/issues/17521))
+* Bump ruff from 0.5.4 to 0.5.5. ([\#17494](https://github.com/element-hq/synapse/issues/17494))
+* Bump serde_json from 1.0.120 to 1.0.121. ([\#17493](https://github.com/element-hq/synapse/issues/17493))
+* Bump serde_json from 1.0.121 to 1.0.122. ([\#17525](https://github.com/element-hq/synapse/issues/17525))
+* Bump towncrier from 23.11.0 to 24.7.1. ([\#17523](https://github.com/element-hq/synapse/issues/17523))
+* Bump types-pyopenssl from 24.1.0.20240425 to 24.1.0.20240722. ([\#17496](https://github.com/element-hq/synapse/issues/17496))
+* Bump types-setuptools from 70.1.0.20240627 to 71.1.0.20240726. ([\#17497](https://github.com/element-hq/synapse/issues/17497))
+
 # Synapse 1.112.0 (2024-07-30)
 
 This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7).
diff --git a/Cargo.lock b/Cargo.lock
index 333499e197..ce5520436d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
 
 [[package]]
 name = "bytes"
-version = "1.6.1"
+version = "1.7.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952"
+checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50"
 
 [[package]]
 name = "cfg-if"
@@ -444,9 +444,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.10.5"
+version = "1.10.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f"
+checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -505,9 +505,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.121"
+version = "1.0.122"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609"
+checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da"
 dependencies = [
  "itoa",
  "memchr",
diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature
deleted file mode 100644
index 6f80e298ae..0000000000
--- a/changelog.d/17447.feature
+++ /dev/null
@@ -1 +0,0 @@
-Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17450.bugfix b/changelog.d/17450.bugfix
deleted file mode 100644
index 01a521da38..0000000000
--- a/changelog.d/17450.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering.
diff --git a/changelog.d/17452.misc b/changelog.d/17452.misc
deleted file mode 100644
index 4fd07f617b..0000000000
--- a/changelog.d/17452.misc
+++ /dev/null
@@ -1 +0,0 @@
-Change sliding sync to use its own token format in preparation for storing per-connection state.
diff --git a/changelog.d/17476.doc b/changelog.d/17476.doc
deleted file mode 100644
index 89d8d490bb..0000000000
--- a/changelog.d/17476.doc
+++ /dev/null
@@ -1 +0,0 @@
-Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example.
diff --git a/changelog.d/17477.feature b/changelog.d/17477.feature
deleted file mode 100644
index 9785a2ef7b..0000000000
--- a/changelog.d/17477.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17478.misc b/changelog.d/17478.misc
deleted file mode 100644
index 5406c82742..0000000000
--- a/changelog.d/17478.misc
+++ /dev/null
@@ -1 +0,0 @@
-Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint.
diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc
deleted file mode 100644
index 4502f71662..0000000000
--- a/changelog.d/17479.misc
+++ /dev/null
@@ -1 +0,0 @@
-Do not send empty room entries down the experimental sliding sync endpoint.
diff --git a/changelog.d/17481.misc b/changelog.d/17481.misc
deleted file mode 100644
index ac55538424..0000000000
--- a/changelog.d/17481.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/changelog.d/17482.misc b/changelog.d/17482.misc
deleted file mode 100644
index ac55538424..0000000000
--- a/changelog.d/17482.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`.
diff --git a/changelog.d/17489.feature b/changelog.d/17489.feature
deleted file mode 100644
index 5ace1e675e..0000000000
--- a/changelog.d/17489.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17499.bugfix b/changelog.d/17499.bugfix
deleted file mode 100644
index 5cb7b3c30e..0000000000
--- a/changelog.d/17499.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients.
diff --git a/changelog.d/17501.misc b/changelog.d/17501.misc
deleted file mode 100644
index ba96472acb..0000000000
--- a/changelog.d/17501.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some opentracing tags and logging to the experimental sliding sync implementation.
diff --git a/changelog.d/17504.misc b/changelog.d/17504.misc
deleted file mode 100644
index 4ab892843d..0000000000
--- a/changelog.d/17504.misc
+++ /dev/null
@@ -1 +0,0 @@
-Split and move Sliding Sync tests so we have more sane test file sizes.
diff --git a/changelog.d/17505.feature b/changelog.d/17505.feature
deleted file mode 100644
index ca0c2bd70f..0000000000
--- a/changelog.d/17505.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17507.misc b/changelog.d/17507.misc
deleted file mode 100644
index 82c4d263be..0000000000
--- a/changelog.d/17507.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update the `limited` field description in the Sliding Sync response to accurately describe what it represents.
diff --git a/changelog.d/17510.bugfix b/changelog.d/17510.bugfix
new file mode 100644
index 0000000000..3170c284bd
--- /dev/null
+++ b/changelog.d/17510.bugfix
@@ -0,0 +1 @@
+Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17514.misc b/changelog.d/17514.misc
new file mode 100644
index 0000000000..fc3cc37915
--- /dev/null
+++ b/changelog.d/17514.misc
@@ -0,0 +1 @@
+Add more tracing to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/changelog.d/17515.doc b/changelog.d/17515.doc
new file mode 100644
index 0000000000..c2dbe24e9d
--- /dev/null
+++ b/changelog.d/17515.doc
@@ -0,0 +1,3 @@
+Clarify default behaviour of the
+[`auto_accept_invites.worker_to_run_on`](https://element-hq.github.io/synapse/develop/usage/configuration/config_documentation.html#auto-accept-invites)
+option.
\ No newline at end of file
diff --git a/changelog.d/17531.misc b/changelog.d/17531.misc
new file mode 100644
index 0000000000..25b7b36a72
--- /dev/null
+++ b/changelog.d/17531.misc
@@ -0,0 +1 @@
+Fix up a comment in the sliding sync implementation.
diff --git a/changelog.d/17535.bugfix b/changelog.d/17535.bugfix
new file mode 100644
index 0000000000..c5b5da0485
--- /dev/null
+++ b/changelog.d/17535.bugfix
@@ -0,0 +1 @@
+Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately.
diff --git a/debian/changelog b/debian/changelog
index e35750a35f..e89fdb98dc 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.113.0~rc1) stable; urgency=medium
+
+  * New Synapse release 1.113.0rc1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 06 Aug 2024 12:23:23 +0100
+
 matrix-synapse-py3 (1.112.0) stable; urgency=medium
 
   * New Synapse release 1.112.0.
diff --git a/docs/development/room-dag-concepts.md b/docs/development/room-dag-concepts.md
index 76709487f8..35b667831c 100644
--- a/docs/development/room-dag-concepts.md
+++ b/docs/development/room-dag-concepts.md
@@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and
 
 ---
 
- - `/sync` returns things in the order they arrive at the server (`stream_ordering`).
- - `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`.
+ - Incremental `/sync?since=xxx` returns things in the order they arrive at the server
+   (`stream_ordering`).
+ - Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in
+   the order determined by the event graph `(topological_ordering, stream_ordering)`.
 
 The general idea is that, if you're following a room in real-time (i.e.
 `/sync`), you probably want to see the messages as they arrive at your server,
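
To make the two orderings concrete, here is a minimal Python sketch (invented events and values, not Synapse's actual code) of the sort keys described above. Note how an event that arrives late over federation sorts differently under the two orderings:

from dataclasses import dataclass
from typing import List

@dataclass
class Event:
    event_id: str
    topological_ordering: int
    stream_ordering: int

def sync_order(events: List[Event]) -> List[Event]:
    # Incremental /sync: events in the order they arrived at this server.
    return sorted(events, key=lambda e: e.stream_ordering)

def messages_order(events: List[Event]) -> List[Event]:
    # /messages and federation /backfill: the order implied by the event graph.
    return sorted(events, key=lambda e: (e.topological_ordering, e.stream_ordering))

events = [
    Event("$live", topological_ordering=10, stream_ordering=5),
    Event("$backfilled", topological_ordering=3, stream_ordering=-2),
    Event("$late_over_federation", topological_ordering=8, stream_ordering=6),
]
assert [e.event_id for e in sync_order(events)] == [
    "$backfilled", "$live", "$late_over_federation"
]
assert [e.event_id for e in messages_order(events)] == [
    "$backfilled", "$late_over_federation", "$live"
]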
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 40f64be856..567bbf88d2 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -4685,7 +4685,9 @@ This setting has the following sub-options:
 * `only_for_direct_messages`: Whether invites should be automatically accepted for all room types, or only
    for direct messages. Defaults to false.
 * `only_from_local_users`: Whether to only automatically accept invites from users on this homeserver. Defaults to false.
-* `worker_to_run_on`: Which worker to run this module on. This must match the "worker_name".
+* `worker_to_run_on`: Which worker to run this module on. This must match 
+  the "worker_name". If not set or `null`, invites will be accepted on the
+  main process.
 
 NOTE: Care should be taken not to enable this setting if the `synapse_auto_accept_invite` module is enabled and installed.
 The two modules will compete to perform the same task and may result in undesired behaviour. For example, multiple join
diff --git a/poetry.lock b/poetry.lock
index 7d8334515a..278bd6cb6e 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "annotated-types"
@@ -107,33 +107,33 @@ typecheck = ["mypy"]
 
 [[package]]
 name = "black"
-version = "24.4.2"
+version = "24.8.0"
 description = "The uncompromising code formatter."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "black-24.4.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:dd1b5a14e417189db4c7b64a6540f31730713d173f0b63e55fabd52d61d8fdce"},
-    {file = "black-24.4.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e537d281831ad0e71007dcdcbe50a71470b978c453fa41ce77186bbe0ed6021"},
-    {file = "black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaea3008c281f1038edb473c1aa8ed8143a5535ff18f978a318f10302b254063"},
-    {file = "black-24.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:7768a0dbf16a39aa5e9a3ded568bb545c8c2727396d063bbaf847df05b08cd96"},
-    {file = "black-24.4.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:257d724c2c9b1660f353b36c802ccece186a30accc7742c176d29c146df6e474"},
-    {file = "black-24.4.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:bdde6f877a18f24844e381d45e9947a49e97933573ac9d4345399be37621e26c"},
-    {file = "black-24.4.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e151054aa00bad1f4e1f04919542885f89f5f7d086b8a59e5000e6c616896ffb"},
-    {file = "black-24.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:7e122b1c4fb252fd85df3ca93578732b4749d9be076593076ef4d07a0233c3e1"},
-    {file = "black-24.4.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:accf49e151c8ed2c0cdc528691838afd217c50412534e876a19270fea1e28e2d"},
-    {file = "black-24.4.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:88c57dc656038f1ab9f92b3eb5335ee9b021412feaa46330d5eba4e51fe49b04"},
-    {file = "black-24.4.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be8bef99eb46d5021bf053114442914baeb3649a89dc5f3a555c88737e5e98fc"},
-    {file = "black-24.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:415e686e87dbbe6f4cd5ef0fbf764af7b89f9057b97c908742b6008cc554b9c0"},
-    {file = "black-24.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bf10f7310db693bb62692609b397e8d67257c55f949abde4c67f9cc574492cc7"},
-    {file = "black-24.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:98e123f1d5cfd42f886624d84464f7756f60ff6eab89ae845210631714f6db94"},
-    {file = "black-24.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48a85f2cb5e6799a9ef05347b476cce6c182d6c71ee36925a6c194d074336ef8"},
-    {file = "black-24.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b1530ae42e9d6d5b670a34db49a94115a64596bc77710b1d05e9801e62ca0a7c"},
-    {file = "black-24.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37aae07b029fa0174d39daf02748b379399b909652a806e5708199bd93899da1"},
-    {file = "black-24.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da33a1a5e49c4122ccdfd56cd021ff1ebc4a1ec4e2d01594fef9b6f267a9e741"},
-    {file = "black-24.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ef703f83fc32e131e9bcc0a5094cfe85599e7109f896fe8bc96cc402f3eb4b6e"},
-    {file = "black-24.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:b9176b9832e84308818a99a561e90aa479e73c523b3f77afd07913380ae2eab7"},
-    {file = "black-24.4.2-py3-none-any.whl", hash = "sha256:d36ed1124bb81b32f8614555b34cc4259c3fbc7eec17870e8ff8ded335b58d8c"},
-    {file = "black-24.4.2.tar.gz", hash = "sha256:c872b53057f000085da66a19c55d68f6f8ddcac2642392ad3a355878406fbd4d"},
+    {file = "black-24.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:09cdeb74d494ec023ded657f7092ba518e8cf78fa8386155e4a03fdcc44679e6"},
+    {file = "black-24.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:81c6742da39f33b08e791da38410f32e27d632260e599df7245cccee2064afeb"},
+    {file = "black-24.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:707a1ca89221bc8a1a64fb5e15ef39cd755633daa672a9db7498d1c19de66a42"},
+    {file = "black-24.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:d6417535d99c37cee4091a2f24eb2b6d5ec42b144d50f1f2e436d9fe1916fe1a"},
+    {file = "black-24.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:fb6e2c0b86bbd43dee042e48059c9ad7830abd5c94b0bc518c0eeec57c3eddc1"},
+    {file = "black-24.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:837fd281f1908d0076844bc2b801ad2d369c78c45cf800cad7b61686051041af"},
+    {file = "black-24.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:62e8730977f0b77998029da7971fa896ceefa2c4c4933fcd593fa599ecbf97a4"},
+    {file = "black-24.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:72901b4913cbac8972ad911dc4098d5753704d1f3c56e44ae8dce99eecb0e3af"},
+    {file = "black-24.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:7c046c1d1eeb7aea9335da62472481d3bbf3fd986e093cffd35f4385c94ae368"},
+    {file = "black-24.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:649f6d84ccbae73ab767e206772cc2d7a393a001070a4c814a546afd0d423aed"},
+    {file = "black-24.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2b59b250fdba5f9a9cd9d0ece6e6d993d91ce877d121d161e4698af3eb9c1018"},
+    {file = "black-24.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:6e55d30d44bed36593c3163b9bc63bf58b3b30e4611e4d88a0c3c239930ed5b2"},
+    {file = "black-24.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:505289f17ceda596658ae81b61ebbe2d9b25aa78067035184ed0a9d855d18afd"},
+    {file = "black-24.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:b19c9ad992c7883ad84c9b22aaa73562a16b819c1d8db7a1a1a49fb7ec13c7d2"},
+    {file = "black-24.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f13f7f386f86f8121d76599114bb8c17b69d962137fc70efe56137727c7047e"},
+    {file = "black-24.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:f490dbd59680d809ca31efdae20e634f3fae27fba3ce0ba3208333b713bc3920"},
+    {file = "black-24.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:eab4dd44ce80dea27dc69db40dab62d4ca96112f87996bca68cd75639aeb2e4c"},
+    {file = "black-24.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3c4285573d4897a7610054af5a890bde7c65cb466040c5f0c8b732812d7f0e5e"},
+    {file = "black-24.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9e84e33b37be070ba135176c123ae52a51f82306def9f7d063ee302ecab2cf47"},
+    {file = "black-24.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:73bbf84ed136e45d451a260c6b73ed674652f90a2b3211d6a35e78054563a9bb"},
+    {file = "black-24.8.0-py3-none-any.whl", hash = "sha256:972085c618ee94f402da1af548a4f218c754ea7e5dc70acb168bfaca4c2542ed"},
+    {file = "black-24.8.0.tar.gz", hash = "sha256:2500945420b6784c38b9ee885af039f5e7471ef284ab03fa35ecdde4688cd83f"},
 ]
 
 [package.dependencies]
@@ -1516,13 +1516,13 @@ files = [
 
 [[package]]
 name = "phonenumbers"
-version = "8.13.39"
+version = "8.13.42"
 description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
 optional = false
 python-versions = "*"
 files = [
-    {file = "phonenumbers-8.13.39-py2.py3-none-any.whl", hash = "sha256:3ad2d086fa71e7eef409001b9195ac54bebb0c6e3e752209b558ca192c9229a0"},
-    {file = "phonenumbers-8.13.39.tar.gz", hash = "sha256:db7ca4970d206b2056231105300753b1a5b229f43416f8c2b3010e63fbb68d77"},
+    {file = "phonenumbers-8.13.42-py2.py3-none-any.whl", hash = "sha256:18acc22ee03116d27b26e990f53806a1770a3e05f05e1620bc09ad187f889456"},
+    {file = "phonenumbers-8.13.42.tar.gz", hash = "sha256:7137904f2db3b991701e853174ce8e1cb8f540b8bfdf27617540de04c0b7bed5"},
 ]
 
 [[package]]
@@ -2649,24 +2649,24 @@ files = [
 
 [[package]]
 name = "towncrier"
-version = "23.11.0"
+version = "24.7.1"
 description = "Building newsfiles for your project."
 optional = false
 python-versions = ">=3.8"
 files = [
-    {file = "towncrier-23.11.0-py3-none-any.whl", hash = "sha256:2e519ca619426d189e3c98c99558fe8be50c9ced13ea1fc20a4a353a95d2ded7"},
-    {file = "towncrier-23.11.0.tar.gz", hash = "sha256:13937c247e3f8ae20ac44d895cf5f96a60ad46cfdcc1671759530d7837d9ee5d"},
+    {file = "towncrier-24.7.1-py3-none-any.whl", hash = "sha256:685e2a94335b5dc47537b4d3b449a25b18571ea85b07dcf6e8df31ba40f692dd"},
+    {file = "towncrier-24.7.1.tar.gz", hash = "sha256:57a057faedabcadf1a62f6f9bad726ae566c1f31a411338ddb8316993f583b3d"},
 ]
 
 [package.dependencies]
 click = "*"
+importlib-metadata = {version = ">=4.6", markers = "python_version < \"3.10\""}
 importlib-resources = {version = ">=5", markers = "python_version < \"3.10\""}
-incremental = "*"
 jinja2 = "*"
 tomli = {version = "*", markers = "python_version < \"3.11\""}
 
 [package.extras]
-dev = ["furo", "packaging", "sphinx (>=5)", "twisted"]
+dev = ["furo (>=2024.05.06)", "nox", "packaging", "sphinx (>=5)", "twisted"]
 
 [[package]]
 name = "treq"
@@ -3196,4 +3196,4 @@ user-search = ["pyicu"]
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8.0"
-content-hash = "5f458ce53b7469844af2e0c5a9c5ef720736de5f080c4eb8d3a0e60286424f44"
+content-hash = "c165cdc1f6612c9f1b5bfd8063c23e2d595d717dd8ac1a468519e902be2cdf93"
diff --git a/pyproject.toml b/pyproject.toml
index c8373c6dbc..c29d1534fb 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.112.0"
+version = "1.113.0rc1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "AGPL-3.0-or-later"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index dd4a1ae706..e6efa7a424 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -128,6 +128,10 @@ class Codes(str, Enum):
     # MSC2677
     DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
 
+    # MSC3575: we are telling the client that they need to expire their
+    # sliding sync connection.
+    UNKNOWN_POS = "M_UNKNOWN_POS"
+
 
 class CodeMessageException(RuntimeError):
     """An exception with integer code, a message string attributes and optional headers.
@@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError):
             msg=PartialStateConflictError.message(),
             errcode=Codes.UNKNOWN,
         )
+
+
+class SlidingSyncUnknownPosition(SynapseError):
+    """An error that Synapse can return to signal to the client to expire their
+    sliding sync connection (i.e. send a new request without a `?since=`
+    param).
+    """
+
+    def __init__(self) -> None:
+        super().__init__(
+            HTTPStatus.BAD_REQUEST,
+            msg="Unknown position",
+            errcode=Codes.UNKNOWN_POS,
+        )
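
Clients that receive `M_UNKNOWN_POS` are expected to drop their per-connection state and make a fresh initial request. A hypothetical client-side handler might look like the sketch below (the wrapper and its name are invented for illustration, and the endpoint path assumes MSC3575's unstable prefix):

from typing import Any, Dict, Optional

import requests

MSC3575_SYNC = "/_matrix/client/unstable/org.matrix.msc3575/sync"

def sliding_sync(
    base_url: str, token: str, pos: Optional[str], body: Dict[str, Any]
) -> Dict[str, Any]:
    # An initial request omits `pos`; incremental requests echo back the
    # `pos` returned by the previous response.
    params = {"pos": pos} if pos is not None else {}
    resp = requests.post(
        base_url + MSC3575_SYNC,
        params=params,
        json=body,
        headers={"Authorization": f"Bearer {token}"},
    )
    if resp.status_code == 400 and resp.json().get("errcode") == "M_UNKNOWN_POS":
        # The server no longer recognizes our connection position: drop it
        # and restart with a fresh initial (no-`pos`) request.
        return sliding_sync(base_url, token, None, body)
    resp.raise_for_status()
    return resp.json()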
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index ec35784c5f..b44e862493 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -197,8 +197,14 @@ class AdminHandler:
             # events that we have and then filtering, this isn't the most
             # efficient method perhaps but it does guarantee we get everything.
             while True:
-                events, _ = await self._store.paginate_room_events(
-                    room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS
+                events, _ = (
+                    await self._store.paginate_room_events_by_topological_ordering(
+                        room_id=room_id,
+                        from_key=from_key,
+                        to_key=to_key,
+                        limit=100,
+                        direction=Direction.FORWARDS,
+                    )
                 )
                 if not events:
                     break
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 872c85fbad..6fd7afa280 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -507,13 +507,15 @@ class PaginationHandler:
 
         # Initially fetch the events from the database. With any luck, we can return
         # these without blocking on backfill (handled below).
-        events, next_key = await self.store.paginate_room_events(
-            room_id=room_id,
-            from_key=from_token.room_key,
-            to_key=to_room_key,
-            direction=pagin_config.direction,
-            limit=pagin_config.limit,
-            event_filter=event_filter,
+        events, next_key = (
+            await self.store.paginate_room_events_by_topological_ordering(
+                room_id=room_id,
+                from_key=from_token.room_key,
+                to_key=to_room_key,
+                direction=pagin_config.direction,
+                limit=pagin_config.limit,
+                event_filter=event_filter,
+            )
         )
 
         if pagin_config.direction == Direction.BACKWARDS:
@@ -582,13 +584,15 @@ class PaginationHandler:
                 # If we did backfill something, refetch the events from the database to
                 # catch anything new that might have been added since we last fetched.
                 if did_backfill:
-                    events, next_key = await self.store.paginate_room_events(
-                        room_id=room_id,
-                        from_key=from_token.room_key,
-                        to_key=to_room_key,
-                        direction=pagin_config.direction,
-                        limit=pagin_config.limit,
-                        event_filter=event_filter,
+                    events, next_key = (
+                        await self.store.paginate_room_events_by_topological_ordering(
+                            room_id=room_id,
+                            from_key=from_token.room_key,
+                            to_key=to_room_key,
+                            direction=pagin_config.direction,
+                            limit=pagin_config.limit,
+                            event_filter=event_filter,
+                        )
                     )
             else:
                 # Otherwise, we can backfill in the background for eventual
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 262d9f4044..2c6e672ede 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1750,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
                 from_key=from_key,
                 to_key=to_key,
                 limit=limit or 10,
-                order="ASC",
+                direction=Direction.FORWARDS,
             )
 
             events = list(room_events)
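
This hunk replaces the stringly-typed `order="ASC"` argument with the `Direction` enum. A sketch of the pattern (the enum values are an assumption mirroring the `dir` query parameter of `/messages`, and `order_clause` is an invented helper):

from enum import Enum

class Direction(Enum):
    # Assumed values; Matrix pagination APIs use "b"/"f" for backwards/forwards.
    BACKWARDS = "b"
    FORWARDS = "f"

def order_clause(direction: Direction) -> str:
    # An enum makes call sites self-documenting, and a mistyped member fails
    # loudly at attribute lookup instead of silently building the wrong SQL.
    return "ORDER BY stream_ordering " + (
        "ASC" if direction is Direction.FORWARDS else "DESC"
    )

assert order_clause(Direction.FORWARDS) == "ORDER BY stream_ordering ASC"
assert order_clause(Direction.BACKWARDS) == "ORDER BY stream_ordering DESC"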
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 85e1364c97..a702922288 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -47,16 +47,27 @@ from synapse.api.constants import (
     EventTypes,
     Membership,
 )
+from synapse.api.errors import SlidingSyncUnknownPosition
 from synapse.events import EventBase, StrippedStateEvent
 from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.state import (
     ROOM_UNKNOWN_SENTINEL,
     Sentinel as StateSentinel,
 )
-from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.databases.main.stream import (
+    CurrentStateDeltaMembership,
+    PaginateFunction,
+)
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -491,6 +502,22 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        if from_token:
+            # Check that we recognize the connection position; if not, tell the
+            # client that it needs to start again.
+            #
+            # If we don't do this and the client asks for the full range of
+            # rooms, we end up sending down all rooms and their state from
+            # scratch (which can be very slow). By expiring the connection we
+            # give the client a chance to make an initial request with a
+            # smaller range of rooms and get some results sooner, though
+            # fetching everything again will take at least as long overall
+            # (longer, with the extra round-trips and re-processing).
+            if not await self.connection_store.is_valid_token(
+                sync_config, from_token.connection_position
+            ):
+                raise SlidingSyncUnknownPosition()
+
         await self.connection_store.mark_token_seen(
             sync_config=sync_config,
             from_token=from_token,
@@ -516,131 +543,153 @@ class SlidingSyncHandler:
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
+        # The set of room IDs of all rooms that could appear in any list. These
+        # include rooms that are outside the list ranges.
+        all_rooms: Set[str] = set()
         if has_lists and sync_config.lists is not None:
-            sync_room_map = await self.filter_rooms_relevant_for_sync(
-                user=sync_config.user,
-                room_membership_for_user_map=room_membership_for_user_map,
-            )
+            with start_active_span("assemble_sliding_window_lists"):
+                sync_room_map = await self.filter_rooms_relevant_for_sync(
+                    user=sync_config.user,
+                    room_membership_for_user_map=room_membership_for_user_map,
+                )
 
-            for list_key, list_config in sync_config.lists.items():
-                # Apply filters
-                filtered_sync_room_map = sync_room_map
+                for list_key, list_config in sync_config.lists.items():
+                    # Apply filters
+                    filtered_sync_room_map = sync_room_map
+                    if list_config.filters is not None:
+                        filtered_sync_room_map = await self.filter_rooms(
+                            sync_config.user,
+                            sync_room_map,
+                            list_config.filters,
+                            to_token,
+                        )
 
-                if list_config.filters:
+                    # Find which rooms are partially stated and may need to be filtered out
+                    # depending on the `required_state` requested (see below).
+                    partial_state_room_map = (
+                        await self.store.is_partial_state_room_batched(
+                            filtered_sync_room_map.keys()
+                        )
+                    )
 
-                    filtered_sync_room_map = await self.filter_rooms(
-                        sync_config.user,
-                        filtered_sync_room_map,
-                        list_config.filters,
-                        to_token,
+                    # Since creating the `RoomSyncConfig` takes some work, let's just do it
+                    # once and make a copy whenever we need it.
+                    room_sync_config = RoomSyncConfig.from_room_config(list_config)
+                    membership_state_keys = room_sync_config.required_state_map.get(
+                        EventTypes.Member
+                    )
+                    # Also see `StateFilter.must_await_full_state(...)` for comparison
+                    lazy_loading = (
+                        membership_state_keys is not None
+                        and StateValues.LAZY in membership_state_keys
                     )
 
-                # Sort the list
-                sorted_room_info = await self.sort_rooms(
-                    filtered_sync_room_map, to_token
-                )
+                    if not lazy_loading:
+                        # Exclude partially-stated rooms unless the `required_state`
+                        # only has `["m.room.member", "$LAZY"]` for membership
+                        # (lazy-loading room members).
+                        filtered_sync_room_map = {
+                            room_id: room
+                            for room_id, room in filtered_sync_room_map.items()
+                            if not partial_state_room_map.get(room_id)
+                        }
 
-                # Find which rooms are partially stated and may need to be filtered out
-                # depending on the `required_state` requested (see below).
-                partial_state_room_map = await self.store.is_partial_state_room_batched(
-                    filtered_sync_room_map.keys()
-                )
+                    all_rooms.update(filtered_sync_room_map)
 
-                # Since creating the `RoomSyncConfig` takes some work, let's just do it
-                # once and make a copy whenever we need it.
-                room_sync_config = RoomSyncConfig.from_room_config(list_config)
-                membership_state_keys = room_sync_config.required_state_map.get(
-                    EventTypes.Member
-                )
-                # Also see `StateFilter.must_await_full_state(...)` for comparison
-                lazy_loading = (
-                    membership_state_keys is not None
-                    and StateValues.LAZY in membership_state_keys
-                )
+                    # Sort the list
+                    sorted_room_info = await self.sort_rooms(
+                        filtered_sync_room_map, to_token
+                    )
+
+                    ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
+                    if list_config.ranges:
+                        for range in list_config.ranges:
+                            room_ids_in_list: List[str] = []
 
-                ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
-                if list_config.ranges:
-                    for range in list_config.ranges:
-                        room_ids_in_list: List[str] = []
-
-                        # We're going to loop through the sorted list of rooms starting
-                        # at the range start index and keep adding rooms until we fill
-                        # up the range or run out of rooms.
-                        #
-                        # Both sides of range are inclusive so we `+ 1`
-                        max_num_rooms = range[1] - range[0] + 1
-                        for room_membership in sorted_room_info[range[0] :]:
-                            room_id = room_membership.room_id
-
-                            if len(room_ids_in_list) >= max_num_rooms:
-                                break
-
-                            # Exclude partially-stated rooms unless the `required_state`
-                            # only has `["m.room.member", "$LAZY"]` for membership
-                            # (lazy-loading room members).
-                            if partial_state_room_map.get(room_id) and not lazy_loading:
-                                continue
-
-                            # Take the superset of the `RoomSyncConfig` for each room.
+                            # We're going to loop through the sorted list of rooms starting
+                            # at the range start index and keep adding rooms until we fill
+                            # up the range or run out of rooms.
                             #
-                            # Update our `relevant_room_map` with the room we're going
-                            # to display and need to fetch more info about.
-                            existing_room_sync_config = relevant_room_map.get(room_id)
-                            if existing_room_sync_config is not None:
-                                existing_room_sync_config.combine_room_sync_config(
-                                    room_sync_config
+                            # Both sides of range are inclusive so we `+ 1`
+                            max_num_rooms = range[1] - range[0] + 1
+                            for room_membership in sorted_room_info[range[0] :]:
+                                room_id = room_membership.room_id
+
+                                if len(room_ids_in_list) >= max_num_rooms:
+                                    break
+
+                                # Take the superset of the `RoomSyncConfig` for each room.
+                                #
+                                # Update our `relevant_room_map` with the room we're going
+                                # to display and need to fetch more info about.
+                                existing_room_sync_config = relevant_room_map.get(
+                                    room_id
                                 )
-                            else:
-                                # Make a copy so if we modify it later, it doesn't
-                                # affect all references.
-                                relevant_room_map[room_id] = (
-                                    room_sync_config.deep_copy()
+                                if existing_room_sync_config is not None:
+                                    existing_room_sync_config.combine_room_sync_config(
+                                        room_sync_config
+                                    )
+                                else:
+                                    # Make a copy so if we modify it later, it doesn't
+                                    # affect all references.
+                                    relevant_room_map[room_id] = (
+                                        room_sync_config.deep_copy()
+                                    )
+
+                                room_ids_in_list.append(room_id)
+
+                            ops.append(
+                                SlidingSyncResult.SlidingWindowList.Operation(
+                                    op=OperationType.SYNC,
+                                    range=range,
+                                    room_ids=room_ids_in_list,
                                 )
-
-                            room_ids_in_list.append(room_id)
-
-                        ops.append(
-                            SlidingSyncResult.SlidingWindowList.Operation(
-                                op=OperationType.SYNC,
-                                range=range,
-                                room_ids=room_ids_in_list,
                             )
-                        )
 
-                lists[list_key] = SlidingSyncResult.SlidingWindowList(
-                    count=len(sorted_room_info),
-                    ops=ops,
-                )
+                    lists[list_key] = SlidingSyncResult.SlidingWindowList(
+                        count=len(sorted_room_info),
+                        ops=ops,
+                    )
 
         # Handle room subscriptions
         if has_room_subscriptions and sync_config.room_subscriptions is not None:
-            for room_id, room_subscription in sync_config.room_subscriptions.items():
-                room_membership_for_user_at_to_token = (
-                    await self.check_room_subscription_allowed_for_user(
-                        room_id=room_id,
-                        room_membership_for_user_map=room_membership_for_user_map,
-                        to_token=to_token,
+            with start_active_span("assemble_room_subscriptions"):
+                for (
+                    room_id,
+                    room_subscription,
+                ) in sync_config.room_subscriptions.items():
+                    room_membership_for_user_at_to_token = (
+                        await self.check_room_subscription_allowed_for_user(
+                            room_id=room_id,
+                            room_membership_for_user_map=room_membership_for_user_map,
+                            to_token=to_token,
+                        )
                     )
-                )
 
-                # Skip this room if the user isn't allowed to see it
-                if not room_membership_for_user_at_to_token:
-                    continue
+                    # Skip this room if the user isn't allowed to see it
+                    if not room_membership_for_user_at_to_token:
+                        continue
 
-                room_membership_for_user_map[room_id] = (
-                    room_membership_for_user_at_to_token
-                )
+                    all_rooms.add(room_id)
 
-                # Take the superset of the `RoomSyncConfig` for each room.
-                #
-                # Update our `relevant_room_map` with the room we're going to display
-                # and need to fetch more info about.
-                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
-                existing_room_sync_config = relevant_room_map.get(room_id)
-                if existing_room_sync_config is not None:
-                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
-                else:
-                    relevant_room_map[room_id] = room_sync_config
+                    room_membership_for_user_map[room_id] = (
+                        room_membership_for_user_at_to_token
+                    )
+
+                    # Take the superset of the `RoomSyncConfig` for each room.
+                    #
+                    # Update our `relevant_room_map` with the room we're going to display
+                    # and need to fetch more info about.
+                    room_sync_config = RoomSyncConfig.from_room_config(
+                        room_subscription
+                    )
+                    existing_room_sync_config = relevant_room_map.get(room_id)
+                    if existing_room_sync_config is not None:
+                        existing_room_sync_config.combine_room_sync_config(
+                            room_sync_config
+                        )
+                    else:
+                        relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
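
The range arithmetic in the hunk above is easy to get wrong because MSC3575 list ranges are inclusive on both ends, hence the `+ 1`. A self-contained sketch of just the slicing step (`rooms_for_range` is an invented helper; the partial-state filtering and `RoomSyncConfig` bookkeeping are omitted):

from typing import List, Sequence, Tuple

def rooms_for_range(sorted_room_ids: Sequence[str], rng: Tuple[int, int]) -> List[str]:
    # Both ends of an MSC3575 range are inclusive, so (0, 1) means two rooms.
    max_num_rooms = rng[1] - rng[0] + 1
    room_ids: List[str] = []
    for room_id in sorted_room_ids[rng[0]:]:
        if len(room_ids) >= max_num_rooms:
            break
        room_ids.append(room_id)
    return room_ids

assert rooms_for_range(["!a", "!b", "!c", "!d"], (0, 1)) == ["!a", "!b"]
assert rooms_for_range(["!a", "!b", "!c", "!d"], (1, 3)) == ["!b", "!c", "!d"]
# A range extending past the end of the list simply returns fewer rooms.
assert rooms_for_range(["!a", "!b"], (1, 5)) == ["!b"]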
@@ -649,55 +698,49 @@ class SlidingSyncHandler:
         # previously.
         # Keep track of the rooms that we're going to display and need to fetch more info about
         relevant_rooms_to_send_map = relevant_room_map
-        if from_token:
-            rooms_should_send = set()
-
-            # First we check if there are rooms that match a list/room
-            # subscription and have updates we need to send (i.e. either because
-            # we haven't sent the room down, or we have but there are missing
-            # updates).
-            for room_id in relevant_room_map:
-                status = await self.connection_store.have_sent_room(
-                    sync_config,
-                    from_token.connection_position,
-                    room_id,
-                )
-                if (
-                    # The room was never sent down before so the client needs to know
-                    # about it regardless of any updates.
-                    status.status == HaveSentRoomFlag.NEVER
-                    # `PREVIOUSLY` literally means the "room was sent down before *AND*
-                    # there are updates we haven't sent down" so we already know this
-                    # room has updates.
-                    or status.status == HaveSentRoomFlag.PREVIOUSLY
-                ):
-                    rooms_should_send.add(room_id)
-                elif status.status == HaveSentRoomFlag.LIVE:
-                    # We know that we've sent all updates up until `from_token`,
-                    # so we just need to check if there have been updates since
-                    # then.
-                    pass
-                else:
-                    assert_never(status.status)
+        with start_active_span("filter_relevant_rooms_to_send"):
+            if from_token:
+                rooms_should_send = set()
+
+                # First we check if there are rooms that match a list/room
+                # subscription and have updates we need to send (i.e. either because
+                # we haven't sent the room down, or we have but there are missing
+                # updates).
+                for room_id in relevant_room_map:
+                    status = await self.connection_store.have_sent_room(
+                        sync_config,
+                        from_token.connection_position,
+                        room_id,
+                    )
+                    if (
+                        # The room was never sent down before so the client needs to know
+                        # about it regardless of any updates.
+                        status.status == HaveSentRoomFlag.NEVER
+                        # `PREVIOUSLY` literally means the "room was sent down before *AND*
+                        # there are updates we haven't sent down" so we already know this
+                        # room has updates.
+                        or status.status == HaveSentRoomFlag.PREVIOUSLY
+                    ):
+                        rooms_should_send.add(room_id)
+                    elif status.status == HaveSentRoomFlag.LIVE:
+                        # We know that we've sent all updates up until `from_token`,
+                        # so we just need to check if there have been updates since
+                        # then.
+                        pass
+                    else:
+                        assert_never(status.status)
 
-                if status.timeline_limit is not None and (
-                    status.timeline_limit < relevant_room_map[room_id].timeline_limit
-                ):
-                    # If the timeline limit has increased we want to send down
-                    # more historic events (even if nothing has since changed).
-                    rooms_should_send.add(room_id)
-
-            # We only need to check for new events since any state changes
-            # will also come down as new events.
-            rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
-                relevant_room_map.keys(), from_token.stream_token.room_key
-            )
-            rooms_should_send.update(rooms_that_have_updates)
-            relevant_rooms_to_send_map = {
-                room_id: room_sync_config
-                for room_id, room_sync_config in relevant_room_map.items()
-                if room_id in rooms_should_send
-            }
+                # We only need to check for new events since any state changes
+                # will also come down as new events.
+                rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+                    relevant_room_map.keys(), from_token.stream_token.room_key
+                )
+                rooms_should_send.update(rooms_that_have_updates)
+                relevant_rooms_to_send_map = {
+                    room_id: room_sync_config
+                    for room_id, room_sync_config in relevant_room_map.items()
+                    if room_id in rooms_should_send
+                }
 
         @trace
         @tag_args
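
The three-state flag above decides whether a room must be (re)sent down the connection. A condensed sketch of just that decision (the flag names mirror the handler; the storage machinery is omitted and `must_resend` is an invented helper):

from enum import Enum, auto

class HaveSentRoomFlag(Enum):
    NEVER = auto()       # never sent down this connection: always send
    PREVIOUSLY = auto()  # sent before *and* there are updates we haven't sent
    LIVE = auto()        # fully up to date as of the connection position

def must_resend(flag: HaveSentRoomFlag) -> bool:
    # NEVER and PREVIOUSLY unconditionally force a send; LIVE rooms are only
    # sent if the event streams show activity since `from_token`, which the
    # handler checks separately via get_rooms_that_might_have_updates.
    return flag in (HaveSentRoomFlag.NEVER, HaveSentRoomFlag.PREVIOUSLY)

assert must_resend(HaveSentRoomFlag.NEVER)
assert must_resend(HaveSentRoomFlag.PREVIOUSLY)
assert not must_resend(HaveSentRoomFlag.LIVE)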
@@ -736,13 +779,41 @@ class SlidingSyncHandler:
         )
 
         if has_lists or has_room_subscriptions:
+            # We now calculate if any rooms outside the range have had updates,
+            # which we are not sending down.
+            #
+            # We *must* record rooms that have had updates, but it is also fine
+            # to record rooms as having updates even if there might not actually
+            # be anything new for the user (e.g. due to event filters, events
+            # having happened after the user left, etc).
+            unsent_room_ids = []
+            if from_token:
+                # The set of rooms that the client (may) care about, but aren't
+                # in any list range (or subscribed to).
+                missing_rooms = all_rooms - relevant_room_map.keys()
+
+                # We now just go and try fetching any events in the above rooms
+                # to see if anything has happened since the `from_token`.
+                #
+                # TODO: Replace this with something faster. When we land the
+                # sliding sync tables that record the most recent event
+                # positions we can use that.
+                missing_event_map_by_room = (
+                    await self.store.get_room_events_stream_for_rooms(
+                        room_ids=missing_rooms,
+                        from_key=to_token.room_key,
+                        to_key=from_token.stream_token.room_key,
+                        limit=1,
+                    )
+                )
+                unsent_room_ids = list(missing_event_map_by_room)
+
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
                 room_configs=relevant_room_map,
                 from_token=from_token,
                 sent_room_ids=relevant_rooms_to_send_map.keys(),
-                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
-                unsent_room_ids=[],
+                unsent_room_ids=unsent_room_ids,
             )
         elif from_token:
             connection_position = from_token.connection_position
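
The `unsent_room_ids` computation above is set arithmetic: any room that could appear in a list but is not being sent this time gets probed for activity since `from_token`. A toy sketch of that bookkeeping (the real code issues a limit-1 event fetch per room; here the probe result is passed in, and the helper is invented):

from typing import Dict, List, Set

def compute_unsent_room_ids(
    all_rooms: Set[str],
    relevant_room_map: Dict[str, object],
    rooms_with_new_events: Set[str],  # stand-in for the limit-1 event probe
) -> List[str]:
    # Rooms the client may care about but which aren't being sent down now.
    missing_rooms = all_rooms - relevant_room_map.keys()
    # Only those with activity need recording as "has unsent updates".
    return sorted(missing_rooms & rooms_with_new_events)

assert compute_unsent_room_ids(
    all_rooms={"!a", "!b", "!c"},
    relevant_room_map={"!a": object()},
    rooms_with_new_events={"!a", "!b", "!c"},
) == ["!b", "!c"]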
@@ -750,13 +821,20 @@ class SlidingSyncHandler:
             # Initial sync without a `from_token` starts at `0`
             connection_position = 0
 
-        return SlidingSyncResult(
+        sliding_sync_result = SlidingSyncResult(
             next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
         )
 
+        # Make it easy to find traces for syncs that aren't empty
+        set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result))
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id)
+
+        return sliding_sync_result
+
+    @trace
     async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
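
The `bool(sliding_sync_result)` tag above assumes the result type defines truthiness as "carries any data". A simplified sketch of that convention (an assumed shape; the real result class has more fields):

from dataclasses import dataclass, field
from typing import Any, Dict

@dataclass
class SlidingSyncResult:
    lists: Dict[str, Any] = field(default_factory=dict)
    rooms: Dict[str, Any] = field(default_factory=dict)
    extensions: Dict[str, Any] = field(default_factory=dict)

    def __bool__(self) -> bool:
        # Truthy only when the response would carry data; this is what makes
        # the set_tag(..., bool(result)) call useful for filtering traces.
        return bool(self.lists or self.rooms or self.extensions)

assert not SlidingSyncResult()
assert SlidingSyncResult(rooms={"!room:example.org": {}})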
@@ -1095,6 +1173,7 @@ class SlidingSyncHandler:
 
         return sync_room_id_set
 
+    @trace
     async def filter_rooms_relevant_for_sync(
         self,
         user: UserID,
@@ -1205,6 +1284,7 @@ class SlidingSyncHandler:
 
         # return None
 
+    @trace
     async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
         self,
         room_ids: StrCollection,
@@ -1295,6 +1375,7 @@ class SlidingSyncHandler:
 
         return room_id_to_stripped_state_map
 
+    @trace
     async def _bulk_get_partial_current_state_content_for_rooms(
         self,
         content_type: Literal[
@@ -1494,125 +1575,132 @@ class SlidingSyncHandler:
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            if filters.is_dm:
-                # Only DM rooms please
-                filtered_room_id_set = {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    if sync_room_map[room_id].is_dm
-                }
-            else:
-                # Only non-DM rooms please
-                filtered_room_id_set = {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    if not sync_room_map[room_id].is_dm
-                }
+            with start_active_span("filters.is_dm"):
+                if filters.is_dm:
+                    # Only DM rooms please
+                    filtered_room_id_set = {
+                        room_id
+                        for room_id in filtered_room_id_set
+                        if sync_room_map[room_id].is_dm
+                    }
+                else:
+                    # Only non-DM rooms please
+                    filtered_room_id_set = {
+                        room_id
+                        for room_id in filtered_room_id_set
+                        if not sync_room_map[room_id].is_dm
+                    }
 
         if filters.spaces is not None:
-            raise NotImplementedError()
+            with start_active_span("filters.spaces"):
+                raise NotImplementedError()
 
         # Filter for encrypted rooms
         if filters.is_encrypted is not None:
-            room_id_to_encryption = (
-                await self._bulk_get_partial_current_state_content_for_rooms(
-                    content_type="room_encryption",
-                    room_ids=filtered_room_id_set,
-                    to_token=to_token,
-                    sync_room_map=sync_room_map,
-                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            with start_active_span("filters.is_encrypted"):
+                room_id_to_encryption = (
+                    await self._bulk_get_partial_current_state_content_for_rooms(
+                        content_type="room_encryption",
+                        room_ids=filtered_room_id_set,
+                        to_token=to_token,
+                        sync_room_map=sync_room_map,
+                        room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                    )
                 )
-            )
 
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    encryption = room_id_to_encryption.get(
+                        room_id, ROOM_UNKNOWN_SENTINEL
+                    )
 
-                # Just remove rooms if we can't determine their encryption status
-                if encryption is ROOM_UNKNOWN_SENTINEL:
-                    filtered_room_id_set.remove(room_id)
-                    continue
+                    # Just remove rooms if we can't determine their encryption status
+                    if encryption is ROOM_UNKNOWN_SENTINEL:
+                        filtered_room_id_set.remove(room_id)
+                        continue
 
-                # If we're looking for encrypted rooms, filter out rooms that are not
-                # encrypted and vice versa
-                is_encrypted = encryption is not None
-                if (filters.is_encrypted and not is_encrypted) or (
-                    not filters.is_encrypted and is_encrypted
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    # If we're looking for encrypted rooms, filter out rooms that are not
+                    # encrypted and vice versa
+                    is_encrypted = encryption is not None
+                    if (filters.is_encrypted and not is_encrypted) or (
+                        not filters.is_encrypted and is_encrypted
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         # Filter for rooms that the user has been invited to
         if filters.is_invite is not None:
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                room_for_user = sync_room_map[room_id]
-                # If we're looking for invite rooms, filter out rooms that the user is
-                # not invited to and vice versa
-                if (
-                    filters.is_invite and room_for_user.membership != Membership.INVITE
-                ) or (
-                    not filters.is_invite
-                    and room_for_user.membership == Membership.INVITE
-                ):
-                    filtered_room_id_set.remove(room_id)
+            with start_active_span("filters.is_invite"):
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    room_for_user = sync_room_map[room_id]
+                    # If we're looking for invite rooms, filter out rooms that the user is
+                    # not invited to and vice versa
+                    if (
+                        filters.is_invite
+                        and room_for_user.membership != Membership.INVITE
+                    ) or (
+                        not filters.is_invite
+                        and room_for_user.membership == Membership.INVITE
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         # Filter by room type (space vs room, etc). A room must match one of the types
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            room_id_to_type = (
-                await self._bulk_get_partial_current_state_content_for_rooms(
-                    content_type="room_type",
-                    room_ids=filtered_room_id_set,
-                    to_token=to_token,
-                    sync_room_map=sync_room_map,
-                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            with start_active_span("filters.room_types"):
+                room_id_to_type = (
+                    await self._bulk_get_partial_current_state_content_for_rooms(
+                        content_type="room_type",
+                        room_ids=filtered_room_id_set,
+                        to_token=to_token,
+                        sync_room_map=sync_room_map,
+                        room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                    )
                 )
-            )
 
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
+                # Make a copy so we don't run into an error: `Set changed size during
+                # iteration`, when we filter out and remove items
+                for room_id in filtered_room_id_set.copy():
+                    room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
 
-                # Just remove rooms if we can't determine their type
-                if room_type is ROOM_UNKNOWN_SENTINEL:
-                    filtered_room_id_set.remove(room_id)
-                    continue
+                    # Just remove rooms if we can't determine their type
+                    if room_type is ROOM_UNKNOWN_SENTINEL:
+                        filtered_room_id_set.remove(room_id)
+                        continue
 
-                if (
-                    filters.room_types is not None
-                    and room_type not in filters.room_types
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    if (
+                        filters.room_types is not None
+                        and room_type not in filters.room_types
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
-                if (
-                    filters.not_room_types is not None
-                    and room_type in filters.not_room_types
-                ):
-                    filtered_room_id_set.remove(room_id)
+                    if (
+                        filters.not_room_types is not None
+                        and room_type in filters.not_room_types
+                    ):
+                        filtered_room_id_set.remove(room_id)
 
         if filters.room_name_like is not None:
-            # TODO: The room name is a bit more sensitive to leak than the
-            # create/encryption event. Maybe we should consider a better way to fetch
-            # historical state before implementing this.
-            #
-            # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
-            #     content_type="room_name",
-            #     room_ids=filtered_room_id_set,
-            #     to_token=to_token,
-            #     sync_room_map=sync_room_map,
-            #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
-            # )
-            raise NotImplementedError()
-
-        if filters.tags is not None:
-            raise NotImplementedError()
-
-        if filters.not_tags is not None:
-            raise NotImplementedError()
+            with start_active_span("filters.room_name_like"):
+                # TODO: The room name is a bit more sensitive to leak than the
+                # create/encryption event. Maybe we should consider a better way to fetch
+                # historical state before implementing this.
+                #
+                # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
+                #     content_type="room_name",
+                #     room_ids=filtered_room_id_set,
+                #     to_token=to_token,
+                #     sync_room_map=sync_room_map,
+                #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                # )
+                raise NotImplementedError()
+
+        if filters.tags is not None or filters.not_tags is not None:
+            with start_active_span("filters.tags"):
+                raise NotImplementedError()
 
         # Assemble a new sync room map but only with the `filtered_room_id_set`
         return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
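Every filter above uses the same guard: iterate a copy of the set while removing from the original. A minimal, runnable illustration of why:

    rooms = {"!a:x", "!b:x", "!c:x"}

    # Removing from a set while iterating it directly raises
    # `RuntimeError: Set changed size during iteration`, hence the .copy().
    for room_id in rooms.copy():
        if room_id == "!b:x":
            rooms.remove(room_id)

    print(sorted(rooms))  # ['!a:x', '!c:x']
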
@@ -1674,6 +1762,7 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    @trace
     async def get_current_state_ids_at(
         self,
         room_id: str,
@@ -1738,6 +1827,7 @@ class SlidingSyncHandler:
 
         return state_ids
 
+    @trace
     async def get_current_state_at(
         self,
         room_id: str,
@@ -1799,19 +1889,27 @@ class SlidingSyncHandler:
         """
         user = sync_config.user
 
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "membership",
+            room_membership_for_user_at_to_token.membership,
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "timeline_limit",
+            room_sync_config.timeline_limit,
+        )
+
         # Determine whether we should limit the timeline to the token range.
         #
         # We should return historical messages (before token range) in the
         # following cases because we want clients to be able to show a basic
         # screen of information:
+        #
         #  - Initial sync (because no `from_token` to limit us anyway)
         #  - When users `newly_joined`
         #  - For an incremental sync where we haven't sent it down this
         #    connection before
         #
-        # We also decide if we should ignore the timeline bound or not. This is
-        # to handle the case where the client has requested more historical
-        # messages in the room by increasing the timeline limit.
+        # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
         from_bound = None
         ignore_timeline_bound = False
         initial = True
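A rough sketch (hypothetical helper, not the actual function) of how the cases listed above flip these defaults; the real branches, including the `ignore_timeline_bound` handling, follow below this hunk:

    def choose_from_bound(from_token, newly_joined, sent_down_before):
        # No lower bound: serve a historical chunk of the timeline.
        if from_token is None or newly_joined or not sent_down_before:
            return None
        # Otherwise only return events since the last sync position.
        return from_token

    print(choose_from_bound(None, False, False))  # None: initial sync
    print(choose_from_bound("s42", False, True))  # 's42': incremental update
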
@@ -1889,11 +1987,36 @@ class SlidingSyncHandler:
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
-            fiddled_timeline_limit = room_sync_config.timeline_limit
-            # if to_bound:
-            #     fiddled_timeline_limit = max(fiddled_timeline_limit, 10)
-
-            timeline_events, new_room_key = await self.store.paginate_room_events(
+            # For initial `/sync` (and other historical scenarios mentioned above), we
+            # want to view a historical section of the timeline, so we fetch events by
+            # `topological_ordering` (the best representation of the room DAG as others
+            # were seeing it at the time). This also aligns with the order that
+            # `/messages` returns events in.
+            #
+            # For incremental `/sync`, we want all updates for rooms since the last
+            # `/sync` (regardless of whether those updates arrived late or happened a
+            # while ago), so we fetch events by `stream_ordering` (the order in which
+            # the server received them).
+            #
+            # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+            #
+            # FIXME: Using workaround for mypy,
+            # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+            # https://github.com/python/mypy/issues/17479
+            paginate_room_events_by_topological_ordering: PaginateFunction = (
+                self.store.paginate_room_events_by_topological_ordering
+            )
+            paginate_room_events_by_stream_ordering: PaginateFunction = (
+                self.store.paginate_room_events_by_stream_ordering
+            )
+            pagination_method: PaginateFunction = (
+                # Use `topological_ordering` for historical events
+                paginate_room_events_by_topological_ordering
+                if from_bound is None
+                # Use `stream_ordering` for updates
+                else paginate_room_events_by_stream_ordering
+            )
+            timeline_events, new_room_key = await pagination_method(
                 room_id=room_id,
                 # The bounds are reversed so we can paginate backwards
                 # (from newer to older events) starting at to_bound.
@@ -1903,8 +2026,7 @@ class SlidingSyncHandler:
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
-                limit=fiddled_timeline_limit + 1,
-                event_filter=None,
+                limit=room_sync_config.timeline_limit + 1,
             )
 
             # We want to return the events in ascending order (the last event is the
@@ -1914,11 +2036,11 @@ class SlidingSyncHandler:
             # Determine our `limited` status based on the timeline. We do this before
             # filtering the events so we can accurately determine if there is more to
             # paginate even if we filter out some/all events.
-            if len(timeline_events) > fiddled_timeline_limit:
+            if len(timeline_events) > room_sync_config.timeline_limit:
                 limited = True
                 # Get rid of that extra "+ 1" event because we only used it to determine
                 # if we hit the limit or not
-                timeline_events = timeline_events[-fiddled_timeline_limit:]
+                timeline_events = timeline_events[-room_sync_config.timeline_limit :]
                 assert timeline_events[0].internal_metadata.stream_ordering
                 new_room_key = RoomStreamToken(
                     stream=timeline_events[0].internal_metadata.stream_ordering - 1
@@ -2091,6 +2213,10 @@ class SlidingSyncHandler:
             if StateValues.WILDCARD in room_sync_config.required_state_map.get(
                 StateValues.WILDCARD, set()
             ):
+                set_tag(
+                    SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard",
+                    True,
+                )
                 required_state_filter = StateFilter.all()
             # TODO: `StateFilter` currently doesn't support wildcard event types. We're
             # currently working around this by returning all state to the client but it
@@ -2100,6 +2226,10 @@ class SlidingSyncHandler:
                 room_sync_config.required_state_map.get(StateValues.WILDCARD)
                 is not None
             ):
+                set_tag(
+                    SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type",
+                    True,
+                )
                 required_state_filter = StateFilter.all()
             else:
                 required_state_types: List[Tuple[str, Optional[str]]] = []
@@ -2107,8 +2237,12 @@ class SlidingSyncHandler:
                     state_type,
                     state_key_set,
                 ) in room_sync_config.required_state_map.items():
+                    num_wild_state_keys = 0
+                    lazy_load_room_members = False
+                    num_others = 0
                     for state_key in state_key_set:
                         if state_key == StateValues.WILDCARD:
+                            num_wild_state_keys += 1
                             # `None` is a wildcard in the `StateFilter`
                             required_state_types.append((state_type, None))
                         # We need to fetch all relevant people when we're lazy-loading membership
@@ -2116,6 +2250,7 @@ class SlidingSyncHandler:
                             state_type == EventTypes.Member
                             and state_key == StateValues.LAZY
                         ):
+                            lazy_load_room_members = True
                             # Everyone in the timeline is relevant
                             timeline_membership: Set[str] = set()
                             if timeline_events is not None:
@@ -2130,10 +2265,26 @@ class SlidingSyncHandler:
                             # FIXME: We probably also care about invite, ban, kick, targets, etc
                             # but the spec only mentions "senders".
                         elif state_key == StateValues.ME:
+                            num_others += 1
                             required_state_types.append((state_type, user.to_string()))
                         else:
+                            num_others += 1
                             required_state_types.append((state_type, state_key))
 
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX
+                        + "required_state_wildcard_state_key_count",
+                        num_wild_state_keys,
+                    )
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy",
+                        lazy_load_room_members,
+                    )
+                    set_tag(
+                        SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count",
+                        num_others,
+                    )
+
                 required_state_filter = StateFilter.from_types(required_state_types)
 
         # We need this base set of info for the response so let's just fetch it along
@@ -2242,6 +2393,8 @@ class SlidingSyncHandler:
                 if new_bump_event_pos.stream > 0:
                     bump_stamp = new_bump_event_pos.stream
 
+        set_tag(SynapseTags.RESULT_PREFIX + "initial", initial)
+
         return SlidingSyncResult.RoomResult(
             name=room_name,
             avatar=room_avatar,
@@ -2880,6 +3033,16 @@ class SlidingSyncConnectionStore:
         attr.Factory(dict)
     )
 
+    async def is_valid_token(
+        self, sync_config: SlidingSyncConfig, connection_token: int
+    ) -> bool:
+        """Return whether the connection token is valid/recognized"""
+        if connection_token == 0:
+            return True
+
+        conn_key = self._get_connection_key(sync_config)
+        return connection_token in self._connections.get(conn_key, {})
+
     async def have_sent_room(
         self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
     ) -> HaveSentRoom:
@@ -2895,6 +3058,7 @@ class SlidingSyncConnectionStore:
 
         return room_status
 
+    @trace
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
@@ -2966,9 +3130,10 @@ class SlidingSyncConnectionStore:
                     prev_state is not None
                     and prev_state.status == HaveSentRoomFlag.LIVE
                 ):
+                    assert prev_state.timeline_limit is not None
                     new_room_statuses[room_id] = HaveSentRoom.previously(
                         from_token.stream_token.room_key,
-                        room_configs[room_id].timeline_limit,
+                        prev_state.timeline_limit,
                     )
                     have_updated = True
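A self-contained toy model of the LIVE -> PREVIOUSLY transition being fixed here (names hypothetical): the point of the change is to persist the timeline limit the room was last *sent* with, not the limit from the current request.

    from dataclasses import dataclass
    from typing import Dict, Optional

    @dataclass
    class RoomStatus:
        status: str                        # "live" or "previously"
        last_token: Optional[str] = None
        timeline_limit: Optional[int] = None

    def mark_unsent(statuses: Dict[str, RoomStatus], room_id: str, token: str) -> None:
        prev = statuses.get(room_id)
        if prev is not None and prev.status == "live":
            # Keep the limit the room was last sent with so a later request
            # with a larger limit can be detected and trigger a backfill.
            assert prev.timeline_limit is not None
            statuses[room_id] = RoomStatus("previously", token, prev.timeline_limit)

    statuses = {"!a:x": RoomStatus("live", timeline_limit=5)}
    mark_unsent(statuses, "!a:x", "s42")
    print(statuses["!a:x"].timeline_limit)  # 5, not whatever the new request asked for
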
 
@@ -2979,6 +3144,7 @@ class SlidingSyncConnectionStore:
 
         return new_store_token
 
+    @trace
     async def mark_token_seen(
         self,
         sync_config: SlidingSyncConfig,
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index ede014180c..6af2eeb75f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -43,6 +43,7 @@ from prometheus_client import Counter
 
 from synapse.api.constants import (
     AccountDataTypes,
+    Direction,
     EventContentFields,
     EventTypes,
     JoinRules,
@@ -64,6 +65,7 @@ from synapse.logging.opentracing import (
 )
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.stream import PaginateFunction
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
@@ -879,22 +881,49 @@ class SyncHandler:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
-                # If we have a since_key then we are trying to get any events
-                # that have happened since `since_key` up to `end_key`, so we
-                # can just use `get_room_events_stream_for_room`.
-                # Otherwise, we want to return the last N events in the room
-                # in topological ordering.
-                if since_key:
-                    events, end_key = await self.store.get_room_events_stream_for_room(
-                        room_id,
-                        limit=load_limit + 1,
-                        from_key=since_key,
-                        to_key=end_key,
-                    )
-                else:
-                    events, end_key = await self.store.get_recent_events_for_room(
-                        room_id, limit=load_limit + 1, end_token=end_key
-                    )
+                # For initial `/sync`, we want to view a historical section of the
+                # timeline, so we fetch events by `topological_ordering` (the best
+                # representation of the room DAG as others were seeing it at the time).
+                # This also aligns with the order that `/messages` returns events in.
+                #
+                # For incremental `/sync`, we want all updates for rooms since the last
+                # `/sync` (regardless of whether those updates arrived late or happened
+                # a while ago), so we fetch events by `stream_ordering` (the order in
+                # which the server received them).
+                #
+                # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917
+                #
+                # FIXME: Using workaround for mypy,
+                # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and
+                # https://github.com/python/mypy/issues/17479
+                paginate_room_events_by_topological_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_topological_ordering
+                )
+                paginate_room_events_by_stream_ordering: PaginateFunction = (
+                    self.store.paginate_room_events_by_stream_ordering
+                )
+                pagination_method: PaginateFunction = (
+                    # Use `topological_ordering` for historical events
+                    paginate_room_events_by_topological_ordering
+                    if since_key is None
+                    # Use `stream_ordering` for updates
+                    else paginate_room_events_by_stream_ordering
+                )
+                events, end_key = await pagination_method(
+                    room_id=room_id,
+                    # The bounds are reversed so we can paginate backwards
+                    # (from newer to older events) starting at `end_key`.
+                    # This ensures we fill the `limit` with the newest events first.
+                    from_key=end_key,
+                    to_key=since_key,
+                    direction=Direction.BACKWARDS,
+                    # We add one so we can determine if there are enough events to saturate
+                    # the limit or not (see `limited`)
+                    limit=load_limit + 1,
+                )
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 log_kv({"loaded_recents": len(events)})
 
@@ -2641,9 +2670,10 @@ class SyncHandler:
         # a "gap" in the timeline, as described by the spec for /sync.
         room_to_events = await self.store.get_room_events_stream_for_rooms(
             room_ids=sync_result_builder.joined_room_ids,
-            from_key=since_token.room_key,
-            to_key=now_token.room_key,
+            from_key=now_token.room_key,
+            to_key=since_token.room_key,
             limit=timeline_limit + 1,
+            direction=Direction.BACKWARDS,
         )
 
         # We loop through all room ids, even if there are no new events, in case
@@ -2654,6 +2684,9 @@ class SyncHandler:
             newly_joined = room_id in newly_joined_rooms
             if room_entry:
                 events, start_key = room_entry
+                # We want to return the events in ascending order (the last event is the
+                # most recent).
+                events.reverse()
 
                 prev_batch_token = now_token.copy_and_replace(
                     StreamKeyType.ROOM, start_key
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index c29f365f27..18142d1c65 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -899,6 +899,9 @@ class SlidingSyncRestServlet(RestServlet):
         body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
 
         # Tag and log useful data to differentiate requests.
+        set_tag(
+            "sliding_sync.sync_type", "initial" if from_token is None else "incremental"
+        )
         set_tag("sliding_sync.conn_id", body.conn_id or "")
         log_kv(
             {
@@ -912,6 +915,12 @@ class SlidingSyncRestServlet(RestServlet):
                 "sliding_sync.room_subscriptions": list(
                     (body.room_subscriptions or {}).keys()
                 ),
+                # We also include the number of room subscriptions because logs are
+                # limited to 1024 characters and the large room ID list above can be cut
+                # off.
+                "sliding_sync.num_room_subscriptions": len(
+                    (body.room_subscriptions or {}).keys()
+                ),
             }
         )
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 640ab123f0..1d9f0f52e1 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -39,6 +39,7 @@ from typing import (
 import attr
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.logging.opentracing import trace
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -422,6 +423,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
                 return invite
         return None
 
+    @trace
     async def get_rooms_for_local_user_where_membership_is(
         self,
         user_id: str,
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index da3ebe66b8..9ed39e688a 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -24,6 +24,7 @@ from typing import List, Optional, Tuple
 
 import attr
 
+from synapse.logging.opentracing import trace
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases.main.stream import _filter_results_by_stream
@@ -159,6 +160,7 @@ class StateDeltasStore(SQLBaseStore):
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
 
+    @trace
     async def get_current_state_deltas_for_room(
         self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken
     ) -> List[StateDelta]:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 430c837828..baa23b1bfc 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,8 +50,8 @@ from typing import (
     Dict,
     Iterable,
     List,
-    Mapping,
     Optional,
+    Protocol,
     Set,
     Tuple,
     cast,
@@ -60,7 +60,7 @@ from typing import (
 
 import attr
 from immutabledict import immutabledict
-from typing_extensions import Literal
+from typing_extensions import Literal, assert_never
 
 from twisted.internet import defer
 
@@ -68,7 +68,7 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.filtering import Filter
 from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import tag_args, trace
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -85,7 +85,7 @@ from synapse.types import (
     RoomStreamToken,
     StrCollection,
 )
-from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
@@ -103,6 +103,18 @@ _STREAM_TOKEN = "stream"
 _TOPOLOGICAL_TOKEN = "topological"
 
 
+class PaginateFunction(Protocol):
+    async def __call__(
+        self,
+        *,
+        room_id: str,
+        from_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
+        direction: Direction = Direction.BACKWARDS,
+        limit: int = 0,
+    ) -> Tuple[List[EventBase], RoomStreamToken]: ...
+
+
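The `Protocol` is what lets the sync handlers bind either pagination method to a single variable without a mypy error. A self-contained version of the same trick (toy names):

    from typing import List, Protocol

    class PaginateFn(Protocol):
        def __call__(self, *, items: List[int], limit: int = 0) -> List[int]: ...

    def oldest_first(*, items: List[int], limit: int = 0) -> List[int]:
        return items[:limit]

    def newest_first(*, items: List[int], limit: int = 0) -> List[int]:
        return list(reversed(items))[:limit]

    # Both implementations satisfy the Protocol, so the choice can be made at
    # runtime while the variable keeps one static type.
    pagination_method: PaginateFn = oldest_first
    print(pagination_method(items=[1, 2, 3], limit=2))  # [1, 2]
    pagination_method = newest_first
    print(pagination_method(items=[1, 2, 3], limit=2))  # [3, 2]
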
 # Used as return values for pagination APIs
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _EventDictReturn:
@@ -286,7 +298,7 @@ def generate_pagination_bounds(
 
 
 def generate_next_token(
-    direction: Direction, last_topo_ordering: int, last_stream_ordering: int
+    direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int
 ) -> RoomStreamToken:
     """
     Generate the next room stream token based on the currently returned data.
@@ -453,7 +465,6 @@ def _filter_results_by_stream(
     The `instance_name` arg is optional to handle historic rows, and is
     interpreted as if it was "master".
     """
-
     if instance_name is None:
         instance_name = "master"
 
@@ -670,33 +681,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
     async def get_room_events_stream_for_rooms(
         self,
+        *,
         room_ids: Collection[str],
         from_key: RoomStreamToken,
-        to_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
+        direction: Direction = Direction.BACKWARDS,
         limit: int = 0,
-        order: str = "DESC",
     ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
         """Get new room events in stream ordering since `from_key`.
 
         Args:
             room_ids
-            from_key: Token from which no events are returned before
-            to_key: Token from which no events are returned after. (This
-                is typically the current stream token)
+            from_key: The token to stream from (the starting point, heading in the
+                given direction)
+            to_key: The token representing the end stream position (end point)
             limit: Maximum number of events to return
-            order: Either "DESC" or "ASC". Determines which events are
-                returned when the result is limited. If "DESC" then the most
-                recent `limit` events are returned, otherwise returns the
-                oldest `limit` events.
+            direction: Indicates whether we are paginating forwards or backwards
+                from `from_key`.
 
         Returns:
             A map from room id to a tuple containing:
                 - list of recent events in the room
                 - stream ordering key for the start of the chunk of events returned.
+
+            When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+            When Direction.BACKWARDS: from_key >= x > to_key (descending order)
         """
-        room_ids = self._events_stream_cache.get_entities_changed(
-            room_ids, from_key.stream
-        )
+        if direction == Direction.FORWARDS:
+            room_ids = self._events_stream_cache.get_entities_changed(
+                room_ids, from_key.stream
+            )
+        elif direction == Direction.BACKWARDS:
+            if to_key is not None:
+                room_ids = self._events_stream_cache.get_entities_changed(
+                    room_ids, to_key.stream
+                )
+        else:
+            assert_never(direction)
 
         if not room_ids:
             return {}
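A toy model of this change-cache pre-filter: rooms whose last change is at or below the window's lower bound (`from_key` going forwards, `to_key` going backwards) cannot contribute events, so they are dropped before any database work. The real cache is conservative and treats rooms it has no data for as changed.

    last_change = {"!a:x": 10, "!b:x": 25, "!c:x": 3}  # room -> last changed position

    def rooms_changed_since(room_ids, lower_bound):
        # Unknown rooms default to "changed", as the real cache must assume.
        return {r for r in room_ids if last_change.get(r, float("inf")) > lower_bound}

    print(sorted(rooms_changed_since({"!a:x", "!b:x", "!c:x"}, 9)))
    # ['!a:x', '!b:x']: '!c:x' last changed at position 3, already synced
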
@@ -708,12 +729,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 defer.gatherResults(
                     [
                         run_in_background(
-                            self.get_room_events_stream_for_room,
-                            room_id,
-                            from_key,
-                            to_key,
-                            limit,
-                            order=order,
+                            self.paginate_room_events_by_stream_ordering,
+                            room_id=room_id,
+                            from_key=from_key,
+                            to_key=to_key,
+                            direction=direction,
+                            limit=limit,
                         )
                         for room_id in rm_ids
                     ],
@@ -737,69 +758,122 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             if self._events_stream_cache.has_entity_changed(room_id, from_id)
         }
 
-    async def get_room_events_stream_for_room(
+    async def paginate_room_events_by_stream_ordering(
         self,
+        *,
         room_id: str,
         from_key: RoomStreamToken,
-        to_key: RoomStreamToken,
+        to_key: Optional[RoomStreamToken] = None,
+        direction: Direction = Direction.BACKWARDS,
         limit: int = 0,
-        order: str = "DESC",
     ) -> Tuple[List[EventBase], RoomStreamToken]:
-        """Get new room events in stream ordering since `from_key`.
+        """
+        Paginate events in the room by `stream_ordering`, starting from `from_key`
+        and walking in the given `direction` until `to_key` is reached or `limit`
+        events have been collected.
 
         Args:
             room_id
-            from_key: Token from which no events are returned before
-            to_key: Token from which no events are returned after. (This
-                is typically the current stream token)
+            from_key: The token to stream from (the starting point, heading in the
+                given direction)
+            to_key: The token representing the end stream position (end point)
+            direction: Indicates whether we are paginating forwards or backwards
+                from `from_key`.
             limit: Maximum number of events to return
-            order: Either "DESC" or "ASC". Determines which events are
-                returned when the result is limited. If "DESC" then the most
-                recent `limit` events are returned, otherwise returns the
-                oldest `limit` events.
 
         Returns:
-            The list of events (in ascending stream order) and the token from the start
-            of the chunk of events returned.
+            The results as a list of events and a token that points to the end
+            of the result set. If no events are returned then the end of the
+            stream has been reached (i.e. there are no events between `from_key`
+            and `to_key`).
+
+            When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+            When Direction.BACKWARDS: from_key >= x > to_key (descending order)
         """
-        if from_key == to_key:
-            return [], from_key
 
-        has_changed = self._events_stream_cache.has_entity_changed(
-            room_id, from_key.stream
-        )
+        # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+        # because we always need an upper bound when querying the events stream (as
+        # otherwise we'll potentially pick up events that are not fully persisted).
+
+        # We should only be working with `stream_ordering` tokens here
+        assert from_key is None or from_key.topological is None
+        assert to_key is None or to_key.topological is None
+
+        # We can bail early if we're looking forwards and our `to_key` is already
+        # at or before our `from_key`.
+        if (
+            direction == Direction.FORWARDS
+            and to_key is not None
+            and to_key.is_before_or_eq(from_key)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
+        # Or vice-versa: if we're looking backwards and our `from_key` is already
+        # at or before our `to_key`.
+        elif (
+            direction == Direction.BACKWARDS
+            and to_key is not None
+            and from_key.is_before_or_eq(to_key)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
+
+        # We can do a quick sanity check to see if any events have been sent in the room
+        # since the earlier token.
+        has_changed = True
+        if direction == Direction.FORWARDS:
+            has_changed = self._events_stream_cache.has_entity_changed(
+                room_id, from_key.stream
+            )
+        elif direction == Direction.BACKWARDS:
+            if to_key is not None:
+                has_changed = self._events_stream_cache.has_entity_changed(
+                    room_id, to_key.stream
+                )
+        else:
+            assert_never(direction)
 
         if not has_changed:
-            return [], from_key
+            # Token selection matches what we do below if there are no rows
+            return [], to_key if to_key else from_key
 
-        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
-            # To handle tokens with a non-empty instance_map we fetch more
-            # results than necessary and then filter down
-            min_from_id = from_key.stream
-            max_to_id = to_key.get_max_stream_pos()
+        order, from_bound, to_bound = generate_pagination_bounds(
+            direction, from_key, to_key
+        )
 
-            sql = """
-                SELECT event_id, instance_name, topological_ordering, stream_ordering
+        bounds = generate_pagination_where_clause(
+            direction=direction,
+            # The empty string will shortcut downstream code to only use the
+            # `stream_ordering` column
+            column_names=("", "stream_ordering"),
+            from_token=from_bound,
+            to_token=to_bound,
+            engine=self.database_engine,
+        )
+
+        def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
+            sql = f"""
+                SELECT event_id, instance_name, stream_ordering
                 FROM events
                 WHERE
                     room_id = ?
                     AND not outlier
-                    AND stream_ordering > ? AND stream_ordering <= ?
-                ORDER BY stream_ordering %s LIMIT ?
-            """ % (
-                order,
-            )
-            txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit))
+                    AND {bounds}
+                ORDER BY stream_ordering {order} LIMIT ?
+            """
+            txn.execute(sql, (room_id, 2 * limit))
 
             rows = [
                 _EventDictReturn(event_id, None, stream_ordering)
-                for event_id, instance_name, topological_ordering, stream_ordering in txn
-                if _filter_results(
-                    from_key,
-                    to_key,
-                    instance_name,
-                    topological_ordering,
-                    stream_ordering,
+                for event_id, instance_name, stream_ordering in txn
+                if _filter_results_by_stream(
+                    lower_token=(
+                        to_key if direction == Direction.BACKWARDS else from_key
+                    ),
+                    upper_token=(
+                        from_key if direction == Direction.BACKWARDS else to_key
+                    ),
+                    instance_name=instance_name,
+                    stream_ordering=stream_ordering,
                 )
             ][:limit]
             return rows
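Whichever direction we walk, the events kept by the filter satisfy `lower < stream_ordering <= upper`; only which token plays which role flips. Compactly:

    from enum import Enum

    class Direction(Enum):
        FORWARDS = "f"
        BACKWARDS = "b"

    def window(direction, from_key, to_key):
        # Backwards walks from `from_key` down to `to_key`, so `to_key` is the
        # exclusive lower bound and `from_key` the inclusive upper bound.
        if direction is Direction.BACKWARDS:
            return to_key, from_key
        return from_key, to_key

    print(window(Direction.FORWARDS, 5, 20))   # (5, 20)
    print(window(Direction.BACKWARDS, 20, 5))  # (5, 20)
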
@@ -810,18 +884,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             [r.event_id for r in rows], get_prev_content=True
         )
 
-        if order.lower() == "desc":
-            ret.reverse()
-
         if rows:
-            key = RoomStreamToken(stream=min(r.stream_ordering for r in rows))
+            next_key = generate_next_token(
+                direction=direction,
+                last_topo_ordering=None,
+                last_stream_ordering=rows[-1].stream_ordering,
+            )
         else:
-            # Assume we didn't get anything because there was nothing to
-            # get.
-            key = from_key
+            # TODO (erikj): We should work out what to do here instead. (same as
+            # `_paginate_room_events_by_topological_ordering_txn(...)`)
+            next_key = to_key if to_key else from_key
 
-        return ret, key
+        return ret, next_key
 
+    @trace
     async def get_current_state_delta_membership_changes_for_user(
         self,
         user_id: str,
@@ -1127,7 +1203,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         rows, token = await self.db_pool.runInteraction(
             "get_recent_event_ids_for_room",
-            self._paginate_room_events_txn,
+            self._paginate_room_events_by_topological_ordering_txn,
             room_id,
             from_token=end_token,
             limit=limit,
@@ -1196,52 +1272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
-    @cachedList(
-        cached_method_name="get_max_stream_ordering_in_room",
-        list_name="room_ids",
-    )
-    async def get_max_stream_ordering_in_rooms(
-        self, room_ids: StrCollection
-    ) -> Mapping[str, Optional[PersistedEventPosition]]:
-        """Get the positions for the latest event in a room.
-
-        A batched version of `get_max_stream_ordering_in_room`.
-        """
-        rows = await self.db_pool.simple_select_many_batch(
-            table="sliding_sync_room_metadata",
-            column="room_id",
-            iterable=room_ids,
-            retcols=("room_id", "instance_name", "last_stream_ordering"),
-            desc="get_max_stream_ordering_in_rooms",
-        )
-
-        return {
-            room_id: PersistedEventPosition(instance_name, stream)
-            for room_id, instance_name, stream in rows
-        }
-
-    @cached(max_entries=10000)
-    async def get_max_stream_ordering_in_room(
-        self,
-        room_id: str,
-    ) -> Optional[PersistedEventPosition]:
-        """Get the position for the latest event in a room.
-
-        Note: this may be after the current token for the room stream on this
-        process (e.g. due to replication lag)
-        """
-        row = await self.db_pool.simple_select_one(
-            table="sliding_sync_room_metadata",
-            retcols=("instance_name", "last_stream_ordering"),
-            keyvalues={"room_id": room_id},
-            allow_none=True,
-            desc="get_max_stream_ordering_in_room",
-        )
-        if not row:
-            return None
-
-        return PersistedEventPosition(instance_name=row[0], stream=row[1])
-
+    @trace
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
@@ -1678,7 +1709,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             topological=topological_ordering, stream=stream_ordering
         )
 
-        rows, start_token = self._paginate_room_events_txn(
+        rows, start_token = self._paginate_room_events_by_topological_ordering_txn(
             txn,
             room_id,
             before_token,
@@ -1688,7 +1719,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
         events_before = [r.event_id for r in rows]
 
-        rows, end_token = self._paginate_room_events_txn(
+        rows, end_token = self._paginate_room_events_by_topological_ordering_txn(
             txn,
             room_id,
             after_token,
@@ -1851,14 +1882,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     def has_room_changed_since(self, room_id: str, stream_id: int) -> bool:
         return self._events_stream_cache.has_entity_changed(room_id, stream_id)
 
-    def _paginate_room_events_txn(
+    def _paginate_room_events_by_topological_ordering_txn(
         self,
         txn: LoggingTransaction,
         room_id: str,
         from_token: RoomStreamToken,
         to_token: Optional[RoomStreamToken] = None,
         direction: Direction = Direction.BACKWARDS,
-        limit: int = -1,
+        limit: int = 0,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
         """Returns list of events before or after a given token.
@@ -1880,6 +1911,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             been reached (i.e. there are no events between `from_token` and
             `to_token`), or `limit` is zero.
         """
+        # We can bail early if we're looking forwards and our `to_token` is already
+        # at or before our `from_token`.
+        if (
+            direction == Direction.FORWARDS
+            and to_token is not None
+            and to_token.is_before_or_eq(from_token)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_token if to_token else from_token
+        # Or vice-versa: if we're looking backwards and our `from_token` is already
+        # at or before our `to_token`.
+        elif (
+            direction == Direction.BACKWARDS
+            and to_token is not None
+            and from_token.is_before_or_eq(to_token)
+        ):
+            # Token selection matches what we do below if there are no rows
+            return [], to_token if to_token else from_token
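The two early returns above skip a database round-trip when the window is provably empty; the same checks on a bare number line:

    def window_is_empty(forwards, from_key, to_key):
        if to_key is None:
            return False  # no bound to compare against
        if forwards:
            return to_key <= from_key  # nothing strictly after from_key
        return from_key <= to_key      # backwards, but already at/below to_key

    print(window_is_empty(True, 10, 10))  # True
    print(window_is_empty(False, 5, 9))   # True
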
 
         args: List[Any] = [room_id]
 
@@ -1964,7 +2013,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             "bounds": bounds,
             "order": order,
         }
-
         txn.execute(sql, args)
 
         # Filter the result set.
@@ -1996,27 +2044,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         return rows, next_token
 
     @trace
-    async def paginate_room_events(
+    @tag_args
+    async def paginate_room_events_by_topological_ordering(
         self,
+        *,
         room_id: str,
         from_key: RoomStreamToken,
         to_key: Optional[RoomStreamToken] = None,
         direction: Direction = Direction.BACKWARDS,
-        limit: int = -1,
+        limit: int = 0,
         event_filter: Optional[Filter] = None,
     ) -> Tuple[List[EventBase], RoomStreamToken]:
-        """Returns list of events before or after a given token.
-
-        When Direction.FORWARDS: from_key < x <= to_key
-        When Direction.BACKWARDS: from_key >= x > to_key
+        """
+        Paginate events in the room by `topological_ordering` (tie-breaking with
+        `stream_ordering`), starting from `from_key` and walking in the given
+        `direction` until `to_key` is reached or `limit` events have been collected.
 
         Args:
             room_id
-            from_key: The token used to stream from
-            to_key: A token which if given limits the results to only those before
+            from_key: The token to stream from (the starting point, heading in the
+                given direction)
+            to_key: The token representing the end stream position (end point)
             direction: Indicates whether we are paginating forwards or backwards
                 from `from_key`.
-            limit: The maximum number of events to return.
+            limit: Maximum number of events to return
             event_filter: If provided filters the events to those that match the filter.
 
         Returns:
@@ -2024,8 +2075,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             of the result set. If no events are returned then the end of the
             stream has been reached (i.e. there are no events between `from_key`
             and `to_key`).
+
+            When Direction.FORWARDS: from_key < x <= to_key (ascending order)
+            When Direction.BACKWARDS: from_key >= x > to_key (descending order)
         """
 
+        # FIXME: When going forwards, we should enforce that the `to_key` is not `None`
+        # because we always need an upper bound when querying the events stream (as
+        # otherwise we'll potentially pick up events that are not fully persisted).
+
+        # We have these checks outside of the transaction function (txn) to save getting
+        # a DB connection and switching threads if we don't need to.
+        #
         # We can bail early if we're looking forwards, and our `to_key` is already
         # before our `from_key`.
         if (
@@ -2048,8 +2109,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             return [], to_key if to_key else from_key
 
         rows, token = await self.db_pool.runInteraction(
-            "paginate_room_events",
-            self._paginate_room_events_txn,
+            "paginate_room_events_by_topological_ordering",
+            self._paginate_room_events_by_topological_ordering_txn,
             room_id,
             from_key,
             to_key,
@@ -2246,6 +2307,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return rows
 
+    @trace
     def get_rooms_that_might_have_updates(
         self, room_ids: StrCollection, from_token: RoomStreamToken
     ) -> StrCollection:
diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py
index 4d8866b30a..6863c32f7c 100644
--- a/tests/rest/client/sliding_sync/test_connection_tracking.py
+++ b/tests/rest/client/sliding_sync/test_connection_tracking.py
@@ -21,8 +21,6 @@ import synapse.rest.admin
 from synapse.api.constants import EventTypes
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
-from synapse.types import SlidingSyncStreamToken
-from synapse.types.handlers import SlidingSyncConfig
 from synapse.util import Clock
 
 from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -130,7 +128,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
         timeline_limit = 5
-        conn_id = "conn_id"
         sync_body = {
             "lists": {
                 "foo-list": {
@@ -170,40 +167,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
         )
 
-        # FIXME: This is a hack to record that the first room wasn't sent down
-        # sync, as we don't implement that currently.
-        sliding_sync_handler = self.hs.get_sliding_sync_handler()
-        requester = self.get_success(
-            self.hs.get_auth().get_user_by_access_token(user1_tok)
-        )
-        sync_config = SlidingSyncConfig(
-            user=requester.user,
-            requester=requester,
-            conn_id=conn_id,
-        )
-
-        parsed_initial_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
-        )
-        connection_position = self.get_success(
-            sliding_sync_handler.connection_store.record_rooms(
-                sync_config,
-                parsed_initial_from_token,
-                sent_room_ids=[],
-                unsent_room_ids=[room_id1],
-            )
-        )
-
-        # FIXME: Now fix up `from_token` with new connect position above.
-        parsed_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, from_token)
-        )
-        parsed_from_token = SlidingSyncStreamToken(
-            stream_token=parsed_from_token.stream_token,
-            connection_position=connection_position,
-        )
-        from_token = self.get_success(parsed_from_token.to_string(self.store))
-
         # We now send another event to room1, so we should sync all the missing events.
         resp = self.helper.send(room_id1, "msg2", tok=user1_tok)
         expected_events.append(resp["event_id"])
@@ -238,7 +201,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
 
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
-        conn_id = "conn_id"
         sync_body = {
             "lists": {
                 "foo-list": {
@@ -279,40 +241,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase):
             response_body["rooms"].keys(), {room_id2}, response_body["rooms"]
         )
 
-        # FIXME: This is a hack to record that the first room wasn't sent down
-        # sync, as we don't implement that currently.
-        sliding_sync_handler = self.hs.get_sliding_sync_handler()
-        requester = self.get_success(
-            self.hs.get_auth().get_user_by_access_token(user1_tok)
-        )
-        sync_config = SlidingSyncConfig(
-            user=requester.user,
-            requester=requester,
-            conn_id=conn_id,
-        )
-
-        parsed_initial_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, initial_from_token)
-        )
-        connection_position = self.get_success(
-            sliding_sync_handler.connection_store.record_rooms(
-                sync_config,
-                parsed_initial_from_token,
-                sent_room_ids=[],
-                unsent_room_ids=[room_id1],
-            )
-        )
-
-        # FIXME: Now fix up `from_token` with new connect position above.
-        parsed_from_token = self.get_success(
-            SlidingSyncStreamToken.from_string(self.store, from_token)
-        )
-        parsed_from_token = SlidingSyncStreamToken(
-            stream_token=parsed_from_token.stream_token,
-            connection_position=connection_position,
-        )
-        from_token = self.get_success(parsed_from_token.to_string(self.store))
-
         # We now send another event to room1, so we should sync all the missing state.
         self.helper.send(room_id1, "msg", tok=user1_tok)
 
diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py
index 03e36914ae..a13cad223f 100644
--- a/tests/rest/client/sliding_sync/test_rooms_required_state.py
+++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py
@@ -161,10 +161,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
         self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
-    def test_rooms_required_state_incremental_sync_restart(self) -> None:
+    def test_rooms_incremental_sync_restart(self) -> None:
         """
-        Test `rooms.required_state` returns requested state events in the room during an
-        incremental sync, after a restart (and so the in memory caches are reset).
+        Test that, after a restart (and so the in-memory caches are reset), we
+        correctly return an `M_UNKNOWN_POS` error.
         """
 
         user1_id = self.register_user("user1", "pass")
@@ -195,22 +195,16 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         self.hs.get_sliding_sync_handler().connection_store._connections.clear()
 
         # Make the Sliding Sync request
-        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
-
-        # If the cache has been cleared then we do expect the state to come down
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id1)
+        channel = self.make_request(
+            method="POST",
+            path=self.sync_endpoint + f"?pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
         )
-
-        self._assertRequiredStateIncludes(
-            response_body["rooms"][room_id1]["required_state"],
-            {
-                state_map[(EventTypes.Create, "")],
-                state_map[(EventTypes.RoomHistoryVisibility, "")],
-            },
-            exact=True,
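+        # Note: the comment below is an editorial addition describing the test's
+        # intent. Clearing the caches above simulates a restart, so the server no
+        # longer knows the connection position encoded in `pos` and must reject it.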
+        self.assertEqual(channel.code, 400, channel.json_body)
+        self.assertEqual(
+            channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body
         )
-        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard(self) -> None:
         """
diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py
index 84a1e0d223..2e9586ca73 100644
--- a/tests/rest/client/sliding_sync/test_rooms_timeline.py
+++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py
@@ -12,13 +12,14 @@
 # <https://www.gnu.org/licenses/agpl-3.0.html>.
 #
 import logging
+from typing import List, Optional
 
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
-from synapse.types import StreamToken
+from synapse.types import StreamToken, StrSequence
 from synapse.util import Clock
 
 from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
@@ -42,6 +43,82 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
         self.store = hs.get_datastores().main
         self.storage_controllers = hs.get_storage_controllers()
 
+    def _assertListEqual(
+        self,
+        actual_items: StrSequence,
+        expected_items: StrSequence,
+        message: Optional[str] = None,
+    ) -> None:
+        """
+        Like `self.assertListEqual(...)` but with an actually understandable diff message.
+        """
+
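+        # Nothing to diff if the two lists already match.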
+        if actual_items == expected_items:
+            return
+
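+        # Mark each expected item with '?' if it is missing from the actual list.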
+        expected_lines: List[str] = []
+        for expected_item in expected_items:
+            is_expected_in_actual = expected_item in actual_items
+            expected_lines.append(
+                "{}  {}".format(" " if is_expected_in_actual else "?", expected_item)
+            )
+
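+        # Mark each actual item with '+' if it was one of the expected items.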
+        actual_lines: List[str] = []
+        for actual_item in actual_items:
+            is_actual_in_expected = actual_item in expected_items
+            actual_lines.append(
+                "{}  {}".format("+" if is_actual_in_expected else " ", actual_item)
+            )
+
+        newline = "\n"
+        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n [\n{newline.join(expected_lines)}\n ]"
+        actual_string = f"Actual ('+' = found expected items):\n [\n{newline.join(actual_lines)}\n ]"
+        first_message = "Items must match exactly"
+        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
+
+        self.fail(f"{diff_message}\n{message or ''}")
+
+    def _assertTimelineEqual(
+        self,
+        *,
+        room_id: str,
+        actual_event_ids: List[str],
+        expected_event_ids: List[str],
+        message: Optional[str] = None,
+    ) -> None:
+        """
+        Like `self.assertListEqual(...)` for event IDs in a room, but gives nicer
+        output with context about what each event_id is (type, stream_ordering,
+        content, etc).
+        """
+        if actual_event_ids == expected_event_ids:
+            return
+
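+        # Fetch every event referenced on either side so the diff can show context
+        # for each event ID.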
+        event_id_set = set(actual_event_ids + expected_event_ids)
+        events = self.get_success(self.store.get_events(event_id_set))
+
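+        # Render an event ID with its stream ordering, type, and content so the
+        # assertion output is human-readable.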
+        def event_id_to_string(event_id: str) -> str:
+            event = events.get(event_id)
+            if event:
+                state_key = event.get_state_key()
+                state_key_piece = f", {state_key}" if state_key is not None else ""
+                return (
+                    f"({event.internal_metadata.stream_ordering: >2}, {event.internal_metadata.instance_name}) "
+                    + f"{event.event_id} ({event.type}{state_key_piece}) {event.content.get('membership', '')}{event.content.get('body', '')}"
+                )
+
+            return f"{event_id} <event not found in room_id={room_id}>"
+
+        self._assertListEqual(
+            actual_items=[
+                event_id_to_string(event_id) for event_id in actual_event_ids
+            ],
+            expected_items=[
+                event_id_to_string(event_id) for event_id in expected_event_ids
+            ],
+            message=message,
+        )
+
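+    # NOTE (editorial): the tests below use `_assertTimelineEqual` so that
+    # failures show each event's type and stream ordering rather than opaque
+    # event IDs.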
     def test_rooms_limited_initial_sync(self) -> None:
         """
         Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
@@ -85,17 +162,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
             response_body["rooms"][room_id1],
         )
         # Check to make sure the latest events are returned
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response4["event_id"],
                 event_response5["event_id"],
                 user1_join_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
 
         # Check to make sure the `prev_batch` points at the right place
@@ -227,16 +305,17 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure the latest events are returned
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response2["event_id"],
                 event_response3["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
 
         # All events are "live"
@@ -303,18 +382,19 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
             + str(response_body["rooms"][room_id1]),
         )
         # Check to make sure that the "live" and historical events are returned
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response2["event_id"],
                 user1_join_response["event_id"],
                 event_response3["event_id"],
                 event_response4["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
 
         # Only events after the `from_token` are "live" (join, event3, event4)
@@ -361,17 +441,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
         # No "live" events in an initial sync (no `from_token` to define the "live"
         # range)
@@ -428,17 +509,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
         response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
         # We should see events before the ban but not after
-        self.assertEqual(
-            [
+        self._assertTimelineEqual(
+            room_id=room_id1,
+            actual_event_ids=[
                 event["event_id"]
                 for event in response_body["rooms"][room_id1]["timeline"]
             ],
-            [
+            expected_event_ids=[
                 event_response3["event_id"],
                 event_response4["event_id"],
                 user1_ban_response["event_id"],
             ],
-            response_body["rooms"][room_id1]["timeline"],
+            message=str(response_body["rooms"][room_id1]["timeline"]),
         )
         # All live events in the incremental sync
         self.assertEqual(
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 9dea1af8ea..7b7590da76 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -148,7 +148,7 @@ class PaginationTestCase(HomeserverTestCase):
         """Make a request to /messages with a filter, returns the chunk of events."""
 
         events, next_key = self.get_success(
-            self.hs.get_datastores().main.paginate_room_events(
+            self.hs.get_datastores().main.paginate_room_events_by_topological_ordering(
                 room_id=self.room_id,
                 from_key=self.from_token.room_key,
                 to_key=None,