diff options
author | Erik Johnston <erik@matrix.org> | 2024-08-08 11:03:20 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-08-08 11:03:20 +0100 |
commit | 4aeace7e0dd4e38041d5cb601ec01407ac96883b (patch) | |
tree | c7ce70e802aa94a94ef395c6990831bc277ef992 | |
parent | Update log line (diff) | |
parent | SSS: Implement PREVIOUSLY room tracking (#17535) (diff) | |
download | synapse-4aeace7e0dd4e38041d5cb601ec01407ac96883b.tar.xz |
Merge remote-tracking branch 'origin/develop' into erikj/ss_hacks
41 files changed, 955 insertions, 603 deletions
diff --git a/CHANGES.md b/CHANGES.md index b4fddc3e5c..c40c11f98c 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,47 @@ +# Synapse 1.113.0rc1 (2024-08-06) + +### Features + +- Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17447](https://github.com/element-hq/synapse/issues/17447)) +- Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17477](https://github.com/element-hq/synapse/issues/17477)) +- Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17489](https://github.com/element-hq/synapse/issues/17489)) +- Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. ([\#17505](https://github.com/element-hq/synapse/issues/17505)) + +### Bugfixes + +- Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering. ([\#17450](https://github.com/element-hq/synapse/issues/17450)) +- Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients. ([\#17499](https://github.com/element-hq/synapse/issues/17499)) + +### Improved Documentation + +- Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example. ([\#17476](https://github.com/element-hq/synapse/issues/17476)) + +### Internal Changes + +- Change sliding sync to use their own token format in preparation for storing per-connection state. ([\#17452](https://github.com/element-hq/synapse/issues/17452)) +- Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint. ([\#17478](https://github.com/element-hq/synapse/issues/17478)) +- Do not send down empty room entries down experimental sliding sync endpoint. ([\#17479](https://github.com/element-hq/synapse/issues/17479)) +- Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`. ([\#17481](https://github.com/element-hq/synapse/issues/17481), [\#17482](https://github.com/element-hq/synapse/issues/17482)) +- Add some opentracing tags and logging to the experimental sliding sync implementation. ([\#17501](https://github.com/element-hq/synapse/issues/17501)) +- Split and move Sliding Sync tests so we have some more sane test file sizes. ([\#17504](https://github.com/element-hq/synapse/issues/17504)) +- Update the `limited` field description in the Sliding Sync response to accurately describe what it actually represents. ([\#17507](https://github.com/element-hq/synapse/issues/17507)) +- Easier to understand `timeline` assertions in Sliding Sync tests. ([\#17511](https://github.com/element-hq/synapse/issues/17511)) +- Reset the sliding sync connection if we don't recognize the per-connection state position. ([\#17529](https://github.com/element-hq/synapse/issues/17529)) + + + +### Updates to locked dependencies + +* Bump bcrypt from 4.1.3 to 4.2.0. ([\#17495](https://github.com/element-hq/synapse/issues/17495)) +* Bump black from 24.4.2 to 24.8.0. ([\#17522](https://github.com/element-hq/synapse/issues/17522)) +* Bump phonenumbers from 8.13.39 to 8.13.42. ([\#17521](https://github.com/element-hq/synapse/issues/17521)) +* Bump ruff from 0.5.4 to 0.5.5. ([\#17494](https://github.com/element-hq/synapse/issues/17494)) +* Bump serde_json from 1.0.120 to 1.0.121. ([\#17493](https://github.com/element-hq/synapse/issues/17493)) +* Bump serde_json from 1.0.121 to 1.0.122. ([\#17525](https://github.com/element-hq/synapse/issues/17525)) +* Bump towncrier from 23.11.0 to 24.7.1. ([\#17523](https://github.com/element-hq/synapse/issues/17523)) +* Bump types-pyopenssl from 24.1.0.20240425 to 24.1.0.20240722. ([\#17496](https://github.com/element-hq/synapse/issues/17496)) +* Bump types-setuptools from 70.1.0.20240627 to 71.1.0.20240726. ([\#17497](https://github.com/element-hq/synapse/issues/17497)) + # Synapse 1.112.0 (2024-07-30) This security release is to update our locked dependency on Twisted to 24.7.0rc1, which includes a security fix for [CVE-2024-41671 / GHSA-c8m8-j448-xjx7: Disordered HTTP pipeline response in twisted.web, again](https://github.com/twisted/twisted/security/advisories/GHSA-c8m8-j448-xjx7). diff --git a/Cargo.lock b/Cargo.lock index 333499e197..ce5520436d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cfg-if" @@ -444,9 +444,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.5" +version = "1.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b91213439dad192326a0d7c6ee3955910425f441d7038e0d6933b0aec5c4517f" +checksum = "4219d74c6b67a3654a9fbebc4b419e22126d13d2f3c4a07ee0cb61ff79a79619" dependencies = [ "aho-corasick", "memchr", @@ -505,9 +505,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.121" +version = "1.0.122" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" +checksum = "784b6203951c57ff748476b126ccb5e8e2959a5c19e5c617ab1956be3dbc68da" dependencies = [ "itoa", "memchr", diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature deleted file mode 100644 index 6f80e298ae..0000000000 --- a/changelog.d/17447.feature +++ /dev/null @@ -1 +0,0 @@ -Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17450.bugfix b/changelog.d/17450.bugfix deleted file mode 100644 index 01a521da38..0000000000 --- a/changelog.d/17450.bugfix +++ /dev/null @@ -1 +0,0 @@ -Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to handle invite/knock rooms when filtering. diff --git a/changelog.d/17452.misc b/changelog.d/17452.misc deleted file mode 100644 index 4fd07f617b..0000000000 --- a/changelog.d/17452.misc +++ /dev/null @@ -1 +0,0 @@ -Change sliding sync to use their own token format in preparation for storing per-connection state. diff --git a/changelog.d/17476.doc b/changelog.d/17476.doc deleted file mode 100644 index 89d8d490bb..0000000000 --- a/changelog.d/17476.doc +++ /dev/null @@ -1 +0,0 @@ -Update the [`allowed_local_3pids`](https://element-hq.github.io/synapse/v1.112/usage/configuration/config_documentation.html#allowed_local_3pids) config option's msisdn address to a working example. diff --git a/changelog.d/17477.feature b/changelog.d/17477.feature deleted file mode 100644 index 9785a2ef7b..0000000000 --- a/changelog.d/17477.feature +++ /dev/null @@ -1 +0,0 @@ -Add Account Data extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17478.misc b/changelog.d/17478.misc deleted file mode 100644 index 5406c82742..0000000000 --- a/changelog.d/17478.misc +++ /dev/null @@ -1 +0,0 @@ -Ensure we don't send down negative `bump_stamp` in experimental sliding sync endpoint. diff --git a/changelog.d/17479.misc b/changelog.d/17479.misc deleted file mode 100644 index 4502f71662..0000000000 --- a/changelog.d/17479.misc +++ /dev/null @@ -1 +0,0 @@ -Do not send down empty room entries down experimental sliding sync endpoint. diff --git a/changelog.d/17481.misc b/changelog.d/17481.misc deleted file mode 100644 index ac55538424..0000000000 --- a/changelog.d/17481.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`. diff --git a/changelog.d/17482.misc b/changelog.d/17482.misc deleted file mode 100644 index ac55538424..0000000000 --- a/changelog.d/17482.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor Sliding Sync tests to better utilize the `SlidingSyncBase`. diff --git a/changelog.d/17489.feature b/changelog.d/17489.feature deleted file mode 100644 index 5ace1e675e..0000000000 --- a/changelog.d/17489.feature +++ /dev/null @@ -1 +0,0 @@ -Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17499.bugfix b/changelog.d/17499.bugfix deleted file mode 100644 index 5cb7b3c30e..0000000000 --- a/changelog.d/17499.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix a bug introduced in v1.110.0 which caused `/keys/query` to return incomplete results, leading to high network activity and CPU usage on Matrix clients. diff --git a/changelog.d/17501.misc b/changelog.d/17501.misc deleted file mode 100644 index ba96472acb..0000000000 --- a/changelog.d/17501.misc +++ /dev/null @@ -1 +0,0 @@ -Add some opentracing tags and logging to the experimental sliding sync implementation. diff --git a/changelog.d/17504.misc b/changelog.d/17504.misc deleted file mode 100644 index 4ab892843d..0000000000 --- a/changelog.d/17504.misc +++ /dev/null @@ -1 +0,0 @@ -Split and move Sliding Sync tests so we have some more sane test file sizes. diff --git a/changelog.d/17505.feature b/changelog.d/17505.feature deleted file mode 100644 index ca0c2bd70f..0000000000 --- a/changelog.d/17505.feature +++ /dev/null @@ -1 +0,0 @@ -Add typing notification extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17507.misc b/changelog.d/17507.misc deleted file mode 100644 index 82c4d263be..0000000000 --- a/changelog.d/17507.misc +++ /dev/null @@ -1 +0,0 @@ -Update the `limited` field description in the Sliding Sync response to accurately describe what it actually represents. diff --git a/changelog.d/17510.bugfix b/changelog.d/17510.bugfix new file mode 100644 index 0000000000..3170c284bd --- /dev/null +++ b/changelog.d/17510.bugfix @@ -0,0 +1 @@ +Fix timeline ordering (using `stream_ordering` instead of topological ordering) in experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17514.misc b/changelog.d/17514.misc new file mode 100644 index 0000000000..fc3cc37915 --- /dev/null +++ b/changelog.d/17514.misc @@ -0,0 +1 @@ +Add more tracing to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17515.doc b/changelog.d/17515.doc new file mode 100644 index 0000000000..c2dbe24e9d --- /dev/null +++ b/changelog.d/17515.doc @@ -0,0 +1,3 @@ +Clarify default behaviour of the +[`auto_accept_invites.worker_to_run_on`](https://element-hq.github.io/synapse/develop/usage/configuration/config_documentation.html#auto-accept-invites) +option. \ No newline at end of file diff --git a/changelog.d/17531.misc b/changelog.d/17531.misc new file mode 100644 index 0000000000..25b7b36a72 --- /dev/null +++ b/changelog.d/17531.misc @@ -0,0 +1 @@ +Fixup comment in sliding sync implementation. diff --git a/changelog.d/17535.bugfix b/changelog.d/17535.bugfix new file mode 100644 index 0000000000..c5b5da0485 --- /dev/null +++ b/changelog.d/17535.bugfix @@ -0,0 +1 @@ +Fix experimental sliding sync implementation to remember any updates in rooms that were not sent down immediately. diff --git a/debian/changelog b/debian/changelog index e35750a35f..e89fdb98dc 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,9 @@ +matrix-synapse-py3 (1.113.0~rc1) stable; urgency=medium + + * New Synapse release 1.113.0rc1. + + -- Synapse Packaging team <packages@matrix.org> Tue, 06 Aug 2024 12:23:23 +0100 + matrix-synapse-py3 (1.112.0) stable; urgency=medium * New Synapse release 1.112.0. diff --git a/docs/development/room-dag-concepts.md b/docs/development/room-dag-concepts.md index 76709487f8..35b667831c 100644 --- a/docs/development/room-dag-concepts.md +++ b/docs/development/room-dag-concepts.md @@ -21,8 +21,10 @@ incrementing integer, but backfilled events start with `stream_ordering=-1` and --- - - `/sync` returns things in the order they arrive at the server (`stream_ordering`). - - `/messages` (and `/backfill` in the federation API) return them in the order determined by the event graph `(topological_ordering, stream_ordering)`. + - Incremental `/sync?since=xxx` returns things in the order they arrive at the server + (`stream_ordering`). + - Initial `/sync`, `/messages` (and `/backfill` in the federation API) return them in + the order determined by the event graph `(topological_ordering, stream_ordering)`. The general idea is that, if you're following a room in real-time (i.e. `/sync`), you probably want to see the messages as they arrive at your server, diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 40f64be856..567bbf88d2 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -4685,7 +4685,9 @@ This setting has the following sub-options: * `only_for_direct_messages`: Whether invites should be automatically accepted for all room types, or only for direct messages. Defaults to false. * `only_from_local_users`: Whether to only automatically accept invites from users on this homeserver. Defaults to false. -* `worker_to_run_on`: Which worker to run this module on. This must match the "worker_name". +* `worker_to_run_on`: Which worker to run this module on. This must match + the "worker_name". If not set or `null`, invites will be accepted on the + main process. NOTE: Care should be taken not to enable this setting if the `synapse_auto_accept_invite` module is enabled and installed. The two modules will compete to perform the same task and may result in undesired behaviour. For example, multiple join diff --git a/poetry.lock b/poetry.lock index 7d8334515a..278bd6cb6e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "annotated-types" @@ -107,33 +107,33 @@ typecheck = ["mypy"] [[package]] name = "black" -version = "24.4.2" +version = "24.8.0" description = "The uncompromising code formatter." optional = false python-versions = ">=3.8" files = [ - {file = "black-24.4.2-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:dd1b5a14e417189db4c7b64a6540f31730713d173f0b63e55fabd52d61d8fdce"}, - {file = "black-24.4.2-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e537d281831ad0e71007dcdcbe50a71470b978c453fa41ce77186bbe0ed6021"}, - {file = "black-24.4.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaea3008c281f1038edb473c1aa8ed8143a5535ff18f978a318f10302b254063"}, - {file = "black-24.4.2-cp310-cp310-win_amd64.whl", hash = "sha256:7768a0dbf16a39aa5e9a3ded568bb545c8c2727396d063bbaf847df05b08cd96"}, - {file = "black-24.4.2-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:257d724c2c9b1660f353b36c802ccece186a30accc7742c176d29c146df6e474"}, - {file = "black-24.4.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:bdde6f877a18f24844e381d45e9947a49e97933573ac9d4345399be37621e26c"}, - {file = "black-24.4.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e151054aa00bad1f4e1f04919542885f89f5f7d086b8a59e5000e6c616896ffb"}, - {file = "black-24.4.2-cp311-cp311-win_amd64.whl", hash = "sha256:7e122b1c4fb252fd85df3ca93578732b4749d9be076593076ef4d07a0233c3e1"}, - {file = "black-24.4.2-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:accf49e151c8ed2c0cdc528691838afd217c50412534e876a19270fea1e28e2d"}, - {file = "black-24.4.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:88c57dc656038f1ab9f92b3eb5335ee9b021412feaa46330d5eba4e51fe49b04"}, - {file = "black-24.4.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:be8bef99eb46d5021bf053114442914baeb3649a89dc5f3a555c88737e5e98fc"}, - {file = "black-24.4.2-cp312-cp312-win_amd64.whl", hash = "sha256:415e686e87dbbe6f4cd5ef0fbf764af7b89f9057b97c908742b6008cc554b9c0"}, - {file = "black-24.4.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:bf10f7310db693bb62692609b397e8d67257c55f949abde4c67f9cc574492cc7"}, - {file = "black-24.4.2-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:98e123f1d5cfd42f886624d84464f7756f60ff6eab89ae845210631714f6db94"}, - {file = "black-24.4.2-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:48a85f2cb5e6799a9ef05347b476cce6c182d6c71ee36925a6c194d074336ef8"}, - {file = "black-24.4.2-cp38-cp38-win_amd64.whl", hash = "sha256:b1530ae42e9d6d5b670a34db49a94115a64596bc77710b1d05e9801e62ca0a7c"}, - {file = "black-24.4.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:37aae07b029fa0174d39daf02748b379399b909652a806e5708199bd93899da1"}, - {file = "black-24.4.2-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:da33a1a5e49c4122ccdfd56cd021ff1ebc4a1ec4e2d01594fef9b6f267a9e741"}, - {file = "black-24.4.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ef703f83fc32e131e9bcc0a5094cfe85599e7109f896fe8bc96cc402f3eb4b6e"}, - {file = "black-24.4.2-cp39-cp39-win_amd64.whl", hash = "sha256:b9176b9832e84308818a99a561e90aa479e73c523b3f77afd07913380ae2eab7"}, - {file = "black-24.4.2-py3-none-any.whl", hash = "sha256:d36ed1124bb81b32f8614555b34cc4259c3fbc7eec17870e8ff8ded335b58d8c"}, - {file = "black-24.4.2.tar.gz", hash = "sha256:c872b53057f000085da66a19c55d68f6f8ddcac2642392ad3a355878406fbd4d"}, + {file = "black-24.8.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:09cdeb74d494ec023ded657f7092ba518e8cf78fa8386155e4a03fdcc44679e6"}, + {file = "black-24.8.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:81c6742da39f33b08e791da38410f32e27d632260e599df7245cccee2064afeb"}, + {file = "black-24.8.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:707a1ca89221bc8a1a64fb5e15ef39cd755633daa672a9db7498d1c19de66a42"}, + {file = "black-24.8.0-cp310-cp310-win_amd64.whl", hash = "sha256:d6417535d99c37cee4091a2f24eb2b6d5ec42b144d50f1f2e436d9fe1916fe1a"}, + {file = "black-24.8.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:fb6e2c0b86bbd43dee042e48059c9ad7830abd5c94b0bc518c0eeec57c3eddc1"}, + {file = "black-24.8.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:837fd281f1908d0076844bc2b801ad2d369c78c45cf800cad7b61686051041af"}, + {file = "black-24.8.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:62e8730977f0b77998029da7971fa896ceefa2c4c4933fcd593fa599ecbf97a4"}, + {file = "black-24.8.0-cp311-cp311-win_amd64.whl", hash = "sha256:72901b4913cbac8972ad911dc4098d5753704d1f3c56e44ae8dce99eecb0e3af"}, + {file = "black-24.8.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:7c046c1d1eeb7aea9335da62472481d3bbf3fd986e093cffd35f4385c94ae368"}, + {file = "black-24.8.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:649f6d84ccbae73ab767e206772cc2d7a393a001070a4c814a546afd0d423aed"}, + {file = "black-24.8.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:2b59b250fdba5f9a9cd9d0ece6e6d993d91ce877d121d161e4698af3eb9c1018"}, + {file = "black-24.8.0-cp312-cp312-win_amd64.whl", hash = "sha256:6e55d30d44bed36593c3163b9bc63bf58b3b30e4611e4d88a0c3c239930ed5b2"}, + {file = "black-24.8.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:505289f17ceda596658ae81b61ebbe2d9b25aa78067035184ed0a9d855d18afd"}, + {file = "black-24.8.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:b19c9ad992c7883ad84c9b22aaa73562a16b819c1d8db7a1a1a49fb7ec13c7d2"}, + {file = "black-24.8.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1f13f7f386f86f8121d76599114bb8c17b69d962137fc70efe56137727c7047e"}, + {file = "black-24.8.0-cp38-cp38-win_amd64.whl", hash = "sha256:f490dbd59680d809ca31efdae20e634f3fae27fba3ce0ba3208333b713bc3920"}, + {file = "black-24.8.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:eab4dd44ce80dea27dc69db40dab62d4ca96112f87996bca68cd75639aeb2e4c"}, + {file = "black-24.8.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:3c4285573d4897a7610054af5a890bde7c65cb466040c5f0c8b732812d7f0e5e"}, + {file = "black-24.8.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9e84e33b37be070ba135176c123ae52a51f82306def9f7d063ee302ecab2cf47"}, + {file = "black-24.8.0-cp39-cp39-win_amd64.whl", hash = "sha256:73bbf84ed136e45d451a260c6b73ed674652f90a2b3211d6a35e78054563a9bb"}, + {file = "black-24.8.0-py3-none-any.whl", hash = "sha256:972085c618ee94f402da1af548a4f218c754ea7e5dc70acb168bfaca4c2542ed"}, + {file = "black-24.8.0.tar.gz", hash = "sha256:2500945420b6784c38b9ee885af039f5e7471ef284ab03fa35ecdde4688cd83f"}, ] [package.dependencies] @@ -1516,13 +1516,13 @@ files = [ [[package]] name = "phonenumbers" -version = "8.13.39" +version = "8.13.42" description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers." optional = false python-versions = "*" files = [ - {file = "phonenumbers-8.13.39-py2.py3-none-any.whl", hash = "sha256:3ad2d086fa71e7eef409001b9195ac54bebb0c6e3e752209b558ca192c9229a0"}, - {file = "phonenumbers-8.13.39.tar.gz", hash = "sha256:db7ca4970d206b2056231105300753b1a5b229f43416f8c2b3010e63fbb68d77"}, + {file = "phonenumbers-8.13.42-py2.py3-none-any.whl", hash = "sha256:18acc22ee03116d27b26e990f53806a1770a3e05f05e1620bc09ad187f889456"}, + {file = "phonenumbers-8.13.42.tar.gz", hash = "sha256:7137904f2db3b991701e853174ce8e1cb8f540b8bfdf27617540de04c0b7bed5"}, ] [[package]] @@ -2649,24 +2649,24 @@ files = [ [[package]] name = "towncrier" -version = "23.11.0" +version = "24.7.1" description = "Building newsfiles for your project." optional = false python-versions = ">=3.8" files = [ - {file = "towncrier-23.11.0-py3-none-any.whl", hash = "sha256:2e519ca619426d189e3c98c99558fe8be50c9ced13ea1fc20a4a353a95d2ded7"}, - {file = "towncrier-23.11.0.tar.gz", hash = "sha256:13937c247e3f8ae20ac44d895cf5f96a60ad46cfdcc1671759530d7837d9ee5d"}, + {file = "towncrier-24.7.1-py3-none-any.whl", hash = "sha256:685e2a94335b5dc47537b4d3b449a25b18571ea85b07dcf6e8df31ba40f692dd"}, + {file = "towncrier-24.7.1.tar.gz", hash = "sha256:57a057faedabcadf1a62f6f9bad726ae566c1f31a411338ddb8316993f583b3d"}, ] [package.dependencies] click = "*" +importlib-metadata = {version = ">=4.6", markers = "python_version < \"3.10\""} importlib-resources = {version = ">=5", markers = "python_version < \"3.10\""} -incremental = "*" jinja2 = "*" tomli = {version = "*", markers = "python_version < \"3.11\""} [package.extras] -dev = ["furo", "packaging", "sphinx (>=5)", "twisted"] +dev = ["furo (>=2024.05.06)", "nox", "packaging", "sphinx (>=5)", "twisted"] [[package]] name = "treq" @@ -3196,4 +3196,4 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "5f458ce53b7469844af2e0c5a9c5ef720736de5f080c4eb8d3a0e60286424f44" +content-hash = "c165cdc1f6612c9f1b5bfd8063c23e2d595d717dd8ac1a468519e902be2cdf93" diff --git a/pyproject.toml b/pyproject.toml index c8373c6dbc..c29d1534fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust" [tool.poetry] name = "matrix-synapse" -version = "1.112.0" +version = "1.113.0rc1" description = "Homeserver for the Matrix decentralised comms protocol" authors = ["Matrix.org Team and Contributors <packages@matrix.org>"] license = "AGPL-3.0-or-later" diff --git a/synapse/api/errors.py b/synapse/api/errors.py index dd4a1ae706..e6efa7a424 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -128,6 +128,10 @@ class Codes(str, Enum): # MSC2677 DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION" + # MSC3575 we are telling the client they need to expire their sliding sync + # connection. + UNKNOWN_POS = "M_UNKNOWN_POS" + class CodeMessageException(RuntimeError): """An exception with integer code, a message string attributes and optional headers. @@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError): msg=PartialStateConflictError.message(), errcode=Codes.UNKNOWN, ) + + +class SlidingSyncUnknownPosition(SynapseError): + """An error that Synapse can return to signal to the client to expire their + sliding sync connection (i.e. send a new request without a `?since=` + param). + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.BAD_REQUEST, + msg="Unknown position", + errcode=Codes.UNKNOWN_POS, + ) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index ec35784c5f..b44e862493 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -197,8 +197,14 @@ class AdminHandler: # events that we have and then filtering, this isn't the most # efficient method perhaps but it does guarantee we get everything. while True: - events, _ = await self._store.paginate_room_events( - room_id, from_key, to_key, limit=100, direction=Direction.FORWARDS + events, _ = ( + await self._store.paginate_room_events_by_topological_ordering( + room_id=room_id, + from_key=from_key, + to_key=to_key, + limit=100, + direction=Direction.FORWARDS, + ) ) if not events: break diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 872c85fbad..6fd7afa280 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -507,13 +507,15 @@ class PaginationHandler: # Initially fetch the events from the database. With any luck, we can return # these without blocking on backfill (handled below). - events, next_key = await self.store.paginate_room_events( - room_id=room_id, - from_key=from_token.room_key, - to_key=to_room_key, - direction=pagin_config.direction, - limit=pagin_config.limit, - event_filter=event_filter, + events, next_key = ( + await self.store.paginate_room_events_by_topological_ordering( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) ) if pagin_config.direction == Direction.BACKWARDS: @@ -582,13 +584,15 @@ class PaginationHandler: # If we did backfill something, refetch the events from the database to # catch anything new that might have been added since we last fetched. if did_backfill: - events, next_key = await self.store.paginate_room_events( - room_id=room_id, - from_key=from_token.room_key, - to_key=to_room_key, - direction=pagin_config.direction, - limit=pagin_config.limit, - event_filter=event_filter, + events, next_key = ( + await self.store.paginate_room_events_by_topological_ordering( + room_id=room_id, + from_key=from_token.room_key, + to_key=to_room_key, + direction=pagin_config.direction, + limit=pagin_config.limit, + event_filter=event_filter, + ) ) else: # Otherwise, we can backfill in the background for eventual diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 262d9f4044..2c6e672ede 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1750,7 +1750,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): from_key=from_key, to_key=to_key, limit=limit or 10, - order="ASC", + direction=Direction.FORWARDS, ) events = list(room_events) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 85e1364c97..a702922288 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -47,16 +47,27 @@ from synapse.api.constants import ( EventTypes, Membership, ) +from synapse.api.errors import SlidingSyncUnknownPosition from synapse.events import EventBase, StrippedStateEvent from synapse.events.utils import parse_stripped_state_event, strip_event from synapse.handlers.relations import BundledAggregations -from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + tag_args, + trace, +) from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary from synapse.storage.databases.main.state import ( ROOM_UNKNOWN_SENTINEL, Sentinel as StateSentinel, ) -from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.storage.databases.main.stream import ( + CurrentStateDeltaMembership, + PaginateFunction, +) from synapse.storage.roommember import MemberSummary from synapse.types import ( DeviceListUpdates, @@ -491,6 +502,22 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + if from_token: + # Check that we recognize the connection position, if not tell the + # clients that they need to start again. + # + # If we don't do this and the client asks for the full range of + # rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we + # allow the client a chance to do an initial request with a smaller + # range of rooms to get them some results sooner but will end up + # taking the same amount of time (more with round-trips and + # re-processing) in the end to get everything again. + if not await self.connection_store.is_valid_token( + sync_config, from_token.connection_position + ): + raise SlidingSyncUnknownPosition() + await self.connection_store.mark_token_seen( sync_config=sync_config, from_token=from_token, @@ -516,131 +543,153 @@ class SlidingSyncHandler: lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} # Keep track of the rooms that we can display and need to fetch more info about relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() if has_lists and sync_config.lists is not None: - sync_room_map = await self.filter_rooms_relevant_for_sync( - user=sync_config.user, - room_membership_for_user_map=room_membership_for_user_map, - ) + with start_active_span("assemble_sliding_window_lists"): + sync_room_map = await self.filter_rooms_relevant_for_sync( + user=sync_config.user, + room_membership_for_user_map=room_membership_for_user_map, + ) - for list_key, list_config in sync_config.lists.items(): - # Apply filters - filtered_sync_room_map = sync_room_map + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms( + sync_config.user, + sync_room_map, + list_config.filters, + to_token, + ) - if list_config.filters: + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = ( + await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + ) - filtered_sync_room_map = await self.filter_rooms( - sync_config.user, - filtered_sync_room_map, - list_config.filters, - to_token, + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once and make a copy whenever we need it. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + membership_state_keys = room_sync_config.required_state_map.get( + EventTypes.Member + ) + # Also see `StateFilter.must_await_full_state(...)` for comparison + lazy_loading = ( + membership_state_keys is not None + and StateValues.LAZY in membership_state_keys ) - # Sort the list - sorted_room_info = await self.sort_rooms( - filtered_sync_room_map, to_token - ) + if not lazy_loading: + # Exclude partially-stated rooms unless the `required_state` + # only has `["m.room.member", "$LAZY"]` for membership + # (lazy-loading room members). + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if not partial_state_room_map.get(room_id) + } - # Find which rooms are partially stated and may need to be filtered out - # depending on the `required_state` requested (see below). - partial_state_room_map = await self.store.is_partial_state_room_batched( - filtered_sync_room_map.keys() - ) + all_rooms.update(filtered_sync_room_map) - # Since creating the `RoomSyncConfig` takes some work, let's just do it - # once and make a copy whenever we need it. - room_sync_config = RoomSyncConfig.from_room_config(list_config) - membership_state_keys = room_sync_config.required_state_map.get( - EventTypes.Member - ) - # Also see `StateFilter.must_await_full_state(...)` for comparison - lazy_loading = ( - membership_state_keys is not None - and StateValues.LAZY in membership_state_keys - ) + # Sort the list + sorted_room_info = await self.sort_rooms( + filtered_sync_room_map, to_token + ) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + room_ids_in_list: List[str] = [] - ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] - if list_config.ranges: - for range in list_config.ranges: - room_ids_in_list: List[str] = [] - - # We're going to loop through the sorted list of rooms starting - # at the range start index and keep adding rooms until we fill - # up the range or run out of rooms. - # - # Both sides of range are inclusive so we `+ 1` - max_num_rooms = range[1] - range[0] + 1 - for room_membership in sorted_room_info[range[0] :]: - room_id = room_membership.room_id - - if len(room_ids_in_list) >= max_num_rooms: - break - - # Exclude partially-stated rooms unless the `required_state` - # only has `["m.room.member", "$LAZY"]` for membership - # (lazy-loading room members). - if partial_state_room_map.get(room_id) and not lazy_loading: - continue - - # Take the superset of the `RoomSyncConfig` for each room. + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. # - # Update our `relevant_room_map` with the room we're going - # to display and need to fetch more info about. - existing_room_sync_config = relevant_room_map.get(room_id) - if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get( + room_id ) - else: - # Make a copy so if we modify it later, it doesn't - # affect all references. - relevant_room_map[room_id] = ( - room_sync_config.deep_copy() + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + # Make a copy so if we modify it later, it doesn't + # affect all references. + relevant_room_map[room_id] = ( + room_sync_config.deep_copy() + ) + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, ) - - room_ids_in_list.append(room_id) - - ops.append( - SlidingSyncResult.SlidingWindowList.Operation( - op=OperationType.SYNC, - range=range, - room_ids=room_ids_in_list, ) - ) - lists[list_key] = SlidingSyncResult.SlidingWindowList( - count=len(sorted_room_info), - ops=ops, - ) + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_info), + ops=ops, + ) # Handle room subscriptions if has_room_subscriptions and sync_config.room_subscriptions is not None: - for room_id, room_subscription in sync_config.room_subscriptions.items(): - room_membership_for_user_at_to_token = ( - await self.check_room_subscription_allowed_for_user( - room_id=room_id, - room_membership_for_user_map=room_membership_for_user_map, - to_token=to_token, + with start_active_span("assemble_room_subscriptions"): + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + room_membership_for_user_at_to_token = ( + await self.check_room_subscription_allowed_for_user( + room_id=room_id, + room_membership_for_user_map=room_membership_for_user_map, + to_token=to_token, + ) ) - ) - # Skip this room if the user isn't allowed to see it - if not room_membership_for_user_at_to_token: - continue + # Skip this room if the user isn't allowed to see it + if not room_membership_for_user_at_to_token: + continue - room_membership_for_user_map[room_id] = ( - room_membership_for_user_at_to_token - ) + all_rooms.add(room_id) - # Take the superset of the `RoomSyncConfig` for each room. - # - # Update our `relevant_room_map` with the room we're going to display - # and need to fetch more info about. - room_sync_config = RoomSyncConfig.from_room_config(room_subscription) - existing_room_sync_config = relevant_room_map.get(room_id) - if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config(room_sync_config) - else: - relevant_room_map[room_id] = room_sync_config + room_membership_for_user_map[room_id] = ( + room_membership_for_user_at_to_token + ) + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + relevant_room_map[room_id] = room_sync_config # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} @@ -649,55 +698,49 @@ class SlidingSyncHandler: # previously. # Keep track of the rooms that we're going to display and need to fetch more info about relevant_rooms_to_send_map = relevant_room_map - if from_token: - rooms_should_send = set() - - # First we check if there are rooms that match a list/room - # subscription and have updates we need to send (i.e. either because - # we haven't sent the room down, or we have but there are missing - # updates). - for room_id in relevant_room_map: - status = await self.connection_store.have_sent_room( - sync_config, - from_token.connection_position, - room_id, - ) - if ( - # The room was never sent down before so the client needs to know - # about it regardless of any updates. - status.status == HaveSentRoomFlag.NEVER - # `PREVIOUSLY` literally means the "room was sent down before *AND* - # there are updates we haven't sent down" so we already know this - # room has updates. - or status.status == HaveSentRoomFlag.PREVIOUSLY - ): - rooms_should_send.add(room_id) - elif status.status == HaveSentRoomFlag.LIVE: - # We know that we've sent all updates up until `from_token`, - # so we just need to check if there have been updates since - # then. - pass - else: - assert_never(status.status) + with start_active_span("filter_relevant_rooms_to_send"): + if from_token: + rooms_should_send = set() + + # First we check if there are rooms that match a list/room + # subscription and have updates we need to send (i.e. either because + # we haven't sent the room down, or we have but there are missing + # updates). + for room_id in relevant_room_map: + status = await self.connection_store.have_sent_room( + sync_config, + from_token.connection_position, + room_id, + ) + if ( + # The room was never sent down before so the client needs to know + # about it regardless of any updates. + status.status == HaveSentRoomFlag.NEVER + # `PREVIOUSLY` literally means the "room was sent down before *AND* + # there are updates we haven't sent down" so we already know this + # room has updates. + or status.status == HaveSentRoomFlag.PREVIOUSLY + ): + rooms_should_send.add(room_id) + elif status.status == HaveSentRoomFlag.LIVE: + # We know that we've sent all updates up until `from_token`, + # so we just need to check if there have been updates since + # then. + pass + else: + assert_never(status.status) - if status.timeline_limit is not None and ( - status.timeline_limit < relevant_room_map[room_id].timeline_limit - ): - # If the timeline limit has increased we want to send down - # more historic events (even if nothing has since changed). - rooms_should_send.add(room_id) - - # We only need to check for new events since any state changes - # will also come down as new events. - rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( - relevant_room_map.keys(), from_token.stream_token.room_key - ) - rooms_should_send.update(rooms_that_have_updates) - relevant_rooms_to_send_map = { - room_id: room_sync_config - for room_id, room_sync_config in relevant_room_map.items() - if room_id in rooms_should_send - } + # We only need to check for new events since any state changes + # will also come down as new events. + rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( + relevant_room_map.keys(), from_token.stream_token.room_key + ) + rooms_should_send.update(rooms_that_have_updates) + relevant_rooms_to_send_map = { + room_id: room_sync_config + for room_id, room_sync_config in relevant_room_map.items() + if room_id in rooms_should_send + } @trace @tag_args @@ -736,13 +779,41 @@ class SlidingSyncHandler: ) if has_lists or has_room_subscriptions: + # We now calculate if any rooms outside the range have had updates, + # which we are not sending down. + # + # We *must* record rooms that have had updates, but it is also fine + # to record rooms as having updates even if there might not actually + # be anything new for the user (e.g. due to event filters, events + # having happened after the user left, etc). + unsent_room_ids = [] + if from_token: + # The set of rooms that the client (may) care about, but aren't + # in any list range (or subscribed to). + missing_rooms = all_rooms - relevant_room_map.keys() + + # We now just go and try fetching any events in the above rooms + # to see if anything has happened since the `from_token`. + # + # TODO: Replace this with something faster. When we land the + # sliding sync tables that record the most recent event + # positions we can use that. + missing_event_map_by_room = ( + await self.store.get_room_events_stream_for_rooms( + room_ids=missing_rooms, + from_key=to_token.room_key, + to_key=from_token.stream_token.room_key, + limit=1, + ) + ) + unsent_room_ids = list(missing_event_map_by_room) + connection_position = await self.connection_store.record_rooms( sync_config=sync_config, room_configs=relevant_room_map, from_token=from_token, sent_room_ids=relevant_rooms_to_send_map.keys(), - # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids` - unsent_room_ids=[], + unsent_room_ids=unsent_room_ids, ) elif from_token: connection_position = from_token.connection_position @@ -750,13 +821,20 @@ class SlidingSyncHandler: # Initial sync without a `from_token` starts at `0` connection_position = 0 - return SlidingSyncResult( + sliding_sync_result = SlidingSyncResult( next_pos=SlidingSyncStreamToken(to_token, connection_position), lists=lists, rooms=rooms, extensions=extensions, ) + # Make it easy to find traces for syncs that aren't empty + set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result)) + set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id) + + return sliding_sync_result + + @trace async def get_room_membership_for_user_at_to_token( self, user: UserID, @@ -1095,6 +1173,7 @@ class SlidingSyncHandler: return sync_room_id_set + @trace async def filter_rooms_relevant_for_sync( self, user: UserID, @@ -1205,6 +1284,7 @@ class SlidingSyncHandler: # return None + @trace async def _bulk_get_stripped_state_for_rooms_from_sync_room_map( self, room_ids: StrCollection, @@ -1295,6 +1375,7 @@ class SlidingSyncHandler: return room_id_to_stripped_state_map + @trace async def _bulk_get_partial_current_state_content_for_rooms( self, content_type: Literal[ @@ -1494,125 +1575,132 @@ class SlidingSyncHandler: # Filter for Direct-Message (DM) rooms if filters.is_dm is not None: - if filters.is_dm: - # Only DM rooms please - filtered_room_id_set = { - room_id - for room_id in filtered_room_id_set - if sync_room_map[room_id].is_dm - } - else: - # Only non-DM rooms please - filtered_room_id_set = { - room_id - for room_id in filtered_room_id_set - if not sync_room_map[room_id].is_dm - } + with start_active_span("filters.is_dm"): + if filters.is_dm: + # Only DM rooms please + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if sync_room_map[room_id].is_dm + } + else: + # Only non-DM rooms please + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if not sync_room_map[room_id].is_dm + } if filters.spaces is not None: - raise NotImplementedError() + with start_active_span("filters.spaces"): + raise NotImplementedError() # Filter for encrypted rooms if filters.is_encrypted is not None: - room_id_to_encryption = ( - await self._bulk_get_partial_current_state_content_for_rooms( - content_type="room_encryption", - room_ids=filtered_room_id_set, - to_token=to_token, - sync_room_map=sync_room_map, - room_id_to_stripped_state_map=room_id_to_stripped_state_map, + with start_active_span("filters.is_encrypted"): + room_id_to_encryption = ( + await self._bulk_get_partial_current_state_content_for_rooms( + content_type="room_encryption", + room_ids=filtered_room_id_set, + to_token=to_token, + sync_room_map=sync_room_map, + room_id_to_stripped_state_map=room_id_to_stripped_state_map, + ) ) - ) - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL) + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + encryption = room_id_to_encryption.get( + room_id, ROOM_UNKNOWN_SENTINEL + ) - # Just remove rooms if we can't determine their encryption status - if encryption is ROOM_UNKNOWN_SENTINEL: - filtered_room_id_set.remove(room_id) - continue + # Just remove rooms if we can't determine their encryption status + if encryption is ROOM_UNKNOWN_SENTINEL: + filtered_room_id_set.remove(room_id) + continue - # If we're looking for encrypted rooms, filter out rooms that are not - # encrypted and vice versa - is_encrypted = encryption is not None - if (filters.is_encrypted and not is_encrypted) or ( - not filters.is_encrypted and is_encrypted - ): - filtered_room_id_set.remove(room_id) + # If we're looking for encrypted rooms, filter out rooms that are not + # encrypted and vice versa + is_encrypted = encryption is not None + if (filters.is_encrypted and not is_encrypted) or ( + not filters.is_encrypted and is_encrypted + ): + filtered_room_id_set.remove(room_id) # Filter for rooms that the user has been invited to if filters.is_invite is not None: - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - room_for_user = sync_room_map[room_id] - # If we're looking for invite rooms, filter out rooms that the user is - # not invited to and vice versa - if ( - filters.is_invite and room_for_user.membership != Membership.INVITE - ) or ( - not filters.is_invite - and room_for_user.membership == Membership.INVITE - ): - filtered_room_id_set.remove(room_id) + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) # Filter by room type (space vs room, etc). A room must match one of the types # provided in the list. `None` is a valid type for rooms which do not have a # room type. if filters.room_types is not None or filters.not_room_types is not None: - room_id_to_type = ( - await self._bulk_get_partial_current_state_content_for_rooms( - content_type="room_type", - room_ids=filtered_room_id_set, - to_token=to_token, - sync_room_map=sync_room_map, - room_id_to_stripped_state_map=room_id_to_stripped_state_map, + with start_active_span("filters.room_types"): + room_id_to_type = ( + await self._bulk_get_partial_current_state_content_for_rooms( + content_type="room_type", + room_ids=filtered_room_id_set, + to_token=to_token, + sync_room_map=sync_room_map, + room_id_to_stripped_state_map=room_id_to_stripped_state_map, + ) ) - ) - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL) + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL) - # Just remove rooms if we can't determine their type - if room_type is ROOM_UNKNOWN_SENTINEL: - filtered_room_id_set.remove(room_id) - continue + # Just remove rooms if we can't determine their type + if room_type is ROOM_UNKNOWN_SENTINEL: + filtered_room_id_set.remove(room_id) + continue - if ( - filters.room_types is not None - and room_type not in filters.room_types - ): - filtered_room_id_set.remove(room_id) + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) - if ( - filters.not_room_types is not None - and room_type in filters.not_room_types - ): - filtered_room_id_set.remove(room_id) + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) if filters.room_name_like is not None: - # TODO: The room name is a bit more sensitive to leak than the - # create/encryption event. Maybe we should consider a better way to fetch - # historical state before implementing this. - # - # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( - # content_type="room_name", - # room_ids=filtered_room_id_set, - # to_token=to_token, - # sync_room_map=sync_room_map, - # room_id_to_stripped_state_map=room_id_to_stripped_state_map, - # ) - raise NotImplementedError() - - if filters.tags is not None: - raise NotImplementedError() - - if filters.not_tags is not None: - raise NotImplementedError() + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + raise NotImplementedError() # Assemble a new sync room map but only with the `filtered_room_id_set` return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} @@ -1674,6 +1762,7 @@ class SlidingSyncHandler: reverse=True, ) + @trace async def get_current_state_ids_at( self, room_id: str, @@ -1738,6 +1827,7 @@ class SlidingSyncHandler: return state_ids + @trace async def get_current_state_at( self, room_id: str, @@ -1799,19 +1889,27 @@ class SlidingSyncHandler: """ user = sync_config.user + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "membership", + room_membership_for_user_at_to_token.membership, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "timeline_limit", + room_sync_config.timeline_limit, + ) + # Determine whether we should limit the timeline to the token range. # # We should return historical messages (before token range) in the # following cases because we want clients to be able to show a basic # screen of information: + # # - Initial sync (because no `from_token` to limit us anyway) # - When users `newly_joined` # - For an incremental sync where we haven't sent it down this # connection before # - # We also decide if we should ignore the timeline bound or not. This is - # to handle the case where the client has requested more historical - # messages in the room by increasing the timeline limit. + # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 from_bound = None ignore_timeline_bound = False initial = True @@ -1889,11 +1987,36 @@ class SlidingSyncHandler: room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) - fiddled_timeline_limit = room_sync_config.timeline_limit - # if to_bound: - # fiddled_timeline_limit = max(fiddled_timeline_limit, 10) - - timeline_events, new_room_key = await self.store.paginate_room_events( + # For initial `/sync` (and other historical scenarios mentioned above), we + # want to view a historical section of the timeline; to fetch events by + # `topological_ordering` (best representation of the room DAG as others were + # seeing it at the time). This also aligns with the order that `/messages` + # returns events in. + # + # For incremental `/sync`, we want to get all updates for rooms since + # the last `/sync` (regardless if those updates arrived late or happened + # a while ago in the past); to fetch events by `stream_ordering` (in the + # order they were received by the server). + # + # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 + # + # FIXME: Using workaround for mypy, + # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and + # https://github.com/python/mypy/issues/17479 + paginate_room_events_by_topological_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_topological_ordering + ) + paginate_room_events_by_stream_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_stream_ordering + ) + pagination_method: PaginateFunction = ( + # Use `topographical_ordering` for historical events + paginate_room_events_by_topological_ordering + if from_bound is None + # Use `stream_ordering` for updates + else paginate_room_events_by_stream_ordering + ) + timeline_events, new_room_key = await pagination_method( room_id=room_id, # The bounds are reversed so we can paginate backwards # (from newer to older events) starting at to_bound. @@ -1903,8 +2026,7 @@ class SlidingSyncHandler: direction=Direction.BACKWARDS, # We add one so we can determine if there are enough events to saturate # the limit or not (see `limited`) - limit=fiddled_timeline_limit + 1, - event_filter=None, + limit=room_sync_config.timeline_limit + 1, ) # We want to return the events in ascending order (the last event is the @@ -1914,11 +2036,11 @@ class SlidingSyncHandler: # Determine our `limited` status based on the timeline. We do this before # filtering the events so we can accurately determine if there is more to # paginate even if we filter out some/all events. - if len(timeline_events) > fiddled_timeline_limit: + if len(timeline_events) > room_sync_config.timeline_limit: limited = True # Get rid of that extra "+ 1" event because we only used it to determine # if we hit the limit or not - timeline_events = timeline_events[-fiddled_timeline_limit:] + timeline_events = timeline_events[-room_sync_config.timeline_limit :] assert timeline_events[0].internal_metadata.stream_ordering new_room_key = RoomStreamToken( stream=timeline_events[0].internal_metadata.stream_ordering - 1 @@ -2091,6 +2213,10 @@ class SlidingSyncHandler: if StateValues.WILDCARD in room_sync_config.required_state_map.get( StateValues.WILDCARD, set() ): + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard", + True, + ) required_state_filter = StateFilter.all() # TODO: `StateFilter` currently doesn't support wildcard event types. We're # currently working around this by returning all state to the client but it @@ -2100,6 +2226,10 @@ class SlidingSyncHandler: room_sync_config.required_state_map.get(StateValues.WILDCARD) is not None ): + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type", + True, + ) required_state_filter = StateFilter.all() else: required_state_types: List[Tuple[str, Optional[str]]] = [] @@ -2107,8 +2237,12 @@ class SlidingSyncHandler: state_type, state_key_set, ) in room_sync_config.required_state_map.items(): + num_wild_state_keys = 0 + lazy_load_room_members = False + num_others = 0 for state_key in state_key_set: if state_key == StateValues.WILDCARD: + num_wild_state_keys += 1 # `None` is a wildcard in the `StateFilter` required_state_types.append((state_type, None)) # We need to fetch all relevant people when we're lazy-loading membership @@ -2116,6 +2250,7 @@ class SlidingSyncHandler: state_type == EventTypes.Member and state_key == StateValues.LAZY ): + lazy_load_room_members = True # Everyone in the timeline is relevant timeline_membership: Set[str] = set() if timeline_events is not None: @@ -2130,10 +2265,26 @@ class SlidingSyncHandler: # FIXME: We probably also care about invite, ban, kick, targets, etc # but the spec only mentions "senders". elif state_key == StateValues.ME: + num_others += 1 required_state_types.append((state_type, user.to_string())) else: + num_others += 1 required_state_types.append((state_type, state_key)) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + + "required_state_wildcard_state_key_count", + num_wild_state_keys, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy", + lazy_load_room_members, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count", + num_others, + ) + required_state_filter = StateFilter.from_types(required_state_types) # We need this base set of info for the response so let's just fetch it along @@ -2242,6 +2393,8 @@ class SlidingSyncHandler: if new_bump_event_pos.stream > 0: bump_stamp = new_bump_event_pos.stream + set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) + return SlidingSyncResult.RoomResult( name=room_name, avatar=room_avatar, @@ -2880,6 +3033,16 @@ class SlidingSyncConnectionStore: attr.Factory(dict) ) + async def is_valid_token( + self, sync_config: SlidingSyncConfig, connection_token: int + ) -> bool: + """Return whether the connection token is valid/recognized""" + if connection_token == 0: + return True + + conn_key = self._get_connection_key(sync_config) + return connection_token in self._connections.get(conn_key, {}) + async def have_sent_room( self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str ) -> HaveSentRoom: @@ -2895,6 +3058,7 @@ class SlidingSyncConnectionStore: return room_status + @trace async def record_rooms( self, sync_config: SlidingSyncConfig, @@ -2966,9 +3130,10 @@ class SlidingSyncConnectionStore: prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE ): + assert prev_state.timeline_limit is not None new_room_statuses[room_id] = HaveSentRoom.previously( from_token.stream_token.room_key, - room_configs[room_id].timeline_limit, + prev_state.timeline_limit, ) have_updated = True @@ -2979,6 +3144,7 @@ class SlidingSyncConnectionStore: return new_store_token + @trace async def mark_token_seen( self, sync_config: SlidingSyncConfig, diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ede014180c..6af2eeb75f 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -43,6 +43,7 @@ from prometheus_client import Counter from synapse.api.constants import ( AccountDataTypes, + Direction, EventContentFields, EventTypes, JoinRules, @@ -64,6 +65,7 @@ from synapse.logging.opentracing import ( ) from synapse.storage.databases.main.event_push_actions import RoomNotifCounts from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary +from synapse.storage.databases.main.stream import PaginateFunction from synapse.storage.roommember import MemberSummary from synapse.types import ( DeviceListUpdates, @@ -879,22 +881,49 @@ class SyncHandler: since_key = since_token.room_key while limited and len(recents) < timeline_limit and max_repeat: - # If we have a since_key then we are trying to get any events - # that have happened since `since_key` up to `end_key`, so we - # can just use `get_room_events_stream_for_room`. - # Otherwise, we want to return the last N events in the room - # in topological ordering. - if since_key: - events, end_key = await self.store.get_room_events_stream_for_room( - room_id, - limit=load_limit + 1, - from_key=since_key, - to_key=end_key, - ) - else: - events, end_key = await self.store.get_recent_events_for_room( - room_id, limit=load_limit + 1, end_token=end_key - ) + # For initial `/sync`, we want to view a historical section of the + # timeline; to fetch events by `topological_ordering` (best + # representation of the room DAG as others were seeing it at the time). + # This also aligns with the order that `/messages` returns events in. + # + # For incremental `/sync`, we want to get all updates for rooms since + # the last `/sync` (regardless if those updates arrived late or happened + # a while ago in the past); to fetch events by `stream_ordering` (in the + # order they were received by the server). + # + # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 + # + # FIXME: Using workaround for mypy, + # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and + # https://github.com/python/mypy/issues/17479 + paginate_room_events_by_topological_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_topological_ordering + ) + paginate_room_events_by_stream_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_stream_ordering + ) + pagination_method: PaginateFunction = ( + # Use `topographical_ordering` for historical events + paginate_room_events_by_topological_ordering + if since_key is None + # Use `stream_ordering` for updates + else paginate_room_events_by_stream_ordering + ) + events, end_key = await pagination_method( + room_id=room_id, + # The bounds are reversed so we can paginate backwards + # (from newer to older events) starting at to_bound. + # This ensures we fill the `limit` with the newest events first, + from_key=end_key, + to_key=since_key, + direction=Direction.BACKWARDS, + # We add one so we can determine if there are enough events to saturate + # the limit or not (see `limited`) + limit=load_limit + 1, + ) + # We want to return the events in ascending order (the last event is the + # most recent). + events.reverse() log_kv({"loaded_recents": len(events)}) @@ -2641,9 +2670,10 @@ class SyncHandler: # a "gap" in the timeline, as described by the spec for /sync. room_to_events = await self.store.get_room_events_stream_for_rooms( room_ids=sync_result_builder.joined_room_ids, - from_key=since_token.room_key, - to_key=now_token.room_key, + from_key=now_token.room_key, + to_key=since_token.room_key, limit=timeline_limit + 1, + direction=Direction.BACKWARDS, ) # We loop through all room ids, even if there are no new events, in case @@ -2654,6 +2684,9 @@ class SyncHandler: newly_joined = room_id in newly_joined_rooms if room_entry: events, start_key = room_entry + # We want to return the events in ascending order (the last event is the + # most recent). + events.reverse() prev_batch_token = now_token.copy_and_replace( StreamKeyType.ROOM, start_key diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index c29f365f27..18142d1c65 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -899,6 +899,9 @@ class SlidingSyncRestServlet(RestServlet): body = parse_and_validate_json_object_from_request(request, SlidingSyncBody) # Tag and log useful data to differentiate requests. + set_tag( + "sliding_sync.sync_type", "initial" if from_token is None else "incremental" + ) set_tag("sliding_sync.conn_id", body.conn_id or "") log_kv( { @@ -912,6 +915,12 @@ class SlidingSyncRestServlet(RestServlet): "sliding_sync.room_subscriptions": list( (body.room_subscriptions or {}).keys() ), + # We also include the number of room subscriptions because logs are + # limited to 1024 characters and the large room ID list above can be cut + # off. + "sliding_sync.num_room_subscriptions": len( + (body.room_subscriptions or {}).keys() + ), } ) diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 640ab123f0..1d9f0f52e1 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -39,6 +39,7 @@ from typing import ( import attr from synapse.api.constants import EventTypes, Membership +from synapse.logging.opentracing import trace from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -422,6 +423,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): return invite return None + @trace async def get_rooms_for_local_user_where_membership_is( self, user_id: str, diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index da3ebe66b8..9ed39e688a 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -24,6 +24,7 @@ from typing import List, Optional, Tuple import attr +from synapse.logging.opentracing import trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.stream import _filter_results_by_stream @@ -159,6 +160,7 @@ class StateDeltasStore(SQLBaseStore): self._get_max_stream_id_in_current_state_deltas_txn, ) + @trace async def get_current_state_deltas_for_room( self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken ) -> List[StateDelta]: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 430c837828..baa23b1bfc 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -50,8 +50,8 @@ from typing import ( Dict, Iterable, List, - Mapping, Optional, + Protocol, Set, Tuple, cast, @@ -60,7 +60,7 @@ from typing import ( import attr from immutabledict import immutabledict -from typing_extensions import Literal +from typing_extensions import Literal, assert_never from twisted.internet import defer @@ -68,7 +68,7 @@ from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background -from synapse.logging.opentracing import trace +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -85,7 +85,7 @@ from synapse.types import ( RoomStreamToken, StrCollection, ) -from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter @@ -103,6 +103,18 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" +class PaginateFunction(Protocol): + async def __call__( + self, + *, + room_id: str, + from_key: RoomStreamToken, + to_key: Optional[RoomStreamToken] = None, + direction: Direction = Direction.BACKWARDS, + limit: int = 0, + ) -> Tuple[List[EventBase], RoomStreamToken]: ... + + # Used as return values for pagination APIs @attr.s(slots=True, frozen=True, auto_attribs=True) class _EventDictReturn: @@ -286,7 +298,7 @@ def generate_pagination_bounds( def generate_next_token( - direction: Direction, last_topo_ordering: int, last_stream_ordering: int + direction: Direction, last_topo_ordering: Optional[int], last_stream_ordering: int ) -> RoomStreamToken: """ Generate the next room stream token based on the currently returned data. @@ -453,7 +465,6 @@ def _filter_results_by_stream( The `instance_name` arg is optional to handle historic rows, and is interpreted as if it was "master". """ - if instance_name is None: instance_name = "master" @@ -670,33 +681,43 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): async def get_room_events_stream_for_rooms( self, + *, room_ids: Collection[str], from_key: RoomStreamToken, - to_key: RoomStreamToken, + to_key: Optional[RoomStreamToken] = None, + direction: Direction = Direction.BACKWARDS, limit: int = 0, - order: str = "DESC", ) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]: """Get new room events in stream ordering since `from_key`. Args: room_ids - from_key: Token from which no events are returned before - to_key: Token from which no events are returned after. (This - is typically the current stream token) + from_key: The token to stream from (starting point and heading in the given + direction) + to_key: The token representing the end stream position (end point) limit: Maximum number of events to return - order: Either "DESC" or "ASC". Determines which events are - returned when the result is limited. If "DESC" then the most - recent `limit` events are returned, otherwise returns the - oldest `limit` events. + direction: Indicates whether we are paginating forwards or backwards + from `from_key`. Returns: A map from room id to a tuple containing: - list of recent events in the room - stream ordering key for the start of the chunk of events returned. + + When Direction.FORWARDS: from_key < x <= to_key, (ascending order) + When Direction.BACKWARDS: from_key >= x > to_key, (descending order) """ - room_ids = self._events_stream_cache.get_entities_changed( - room_ids, from_key.stream - ) + if direction == Direction.FORWARDS: + room_ids = self._events_stream_cache.get_entities_changed( + room_ids, from_key.stream + ) + elif direction == Direction.BACKWARDS: + if to_key is not None: + room_ids = self._events_stream_cache.get_entities_changed( + room_ids, to_key.stream + ) + else: + assert_never(direction) if not room_ids: return {} @@ -708,12 +729,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): defer.gatherResults( [ run_in_background( - self.get_room_events_stream_for_room, - room_id, - from_key, - to_key, - limit, - order=order, + self.paginate_room_events_by_stream_ordering, + room_id=room_id, + from_key=from_key, + to_key=to_key, + direction=direction, + limit=limit, ) for room_id in rm_ids ], @@ -737,69 +758,122 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if self._events_stream_cache.has_entity_changed(room_id, from_id) } - async def get_room_events_stream_for_room( + async def paginate_room_events_by_stream_ordering( self, + *, room_id: str, from_key: RoomStreamToken, - to_key: RoomStreamToken, + to_key: Optional[RoomStreamToken] = None, + direction: Direction = Direction.BACKWARDS, limit: int = 0, - order: str = "DESC", ) -> Tuple[List[EventBase], RoomStreamToken]: - """Get new room events in stream ordering since `from_key`. + """ + Paginate events by `stream_ordering` in the room from the `from_key` in the + given `direction` to the `to_key` or `limit`. Args: room_id - from_key: Token from which no events are returned before - to_key: Token from which no events are returned after. (This - is typically the current stream token) + from_key: The token to stream from (starting point and heading in the given + direction) + to_key: The token representing the end stream position (end point) + direction: Indicates whether we are paginating forwards or backwards + from `from_key`. limit: Maximum number of events to return - order: Either "DESC" or "ASC". Determines which events are - returned when the result is limited. If "DESC" then the most - recent `limit` events are returned, otherwise returns the - oldest `limit` events. Returns: - The list of events (in ascending stream order) and the token from the start - of the chunk of events returned. + The results as a list of events and a token that points to the end + of the result set. If no events are returned then the end of the + stream has been reached (i.e. there are no events between `from_key` + and `to_key`). + + When Direction.FORWARDS: from_key < x <= to_key, (ascending order) + When Direction.BACKWARDS: from_key >= x > to_key, (descending order) """ - if from_key == to_key: - return [], from_key - has_changed = self._events_stream_cache.has_entity_changed( - room_id, from_key.stream - ) + # FIXME: When going forwards, we should enforce that the `to_key` is not `None` + # because we always need an upper bound when querying the events stream (as + # otherwise we'll potentially pick up events that are not fully persisted). + + # We should only be working with `stream_ordering` tokens here + assert from_key is None or from_key.topological is None + assert to_key is None or to_key.topological is None + + # We can bail early if we're looking forwards, and our `to_key` is already + # before our `from_key`. + if ( + direction == Direction.FORWARDS + and to_key is not None + and to_key.is_before_or_eq(from_key) + ): + # Token selection matches what we do below if there are no rows + return [], to_key if to_key else from_key + # Or vice-versa, if we're looking backwards and our `from_key` is already before + # our `to_key`. + elif ( + direction == Direction.BACKWARDS + and to_key is not None + and from_key.is_before_or_eq(to_key) + ): + # Token selection matches what we do below if there are no rows + return [], to_key if to_key else from_key + + # We can do a quick sanity check to see if any events have been sent in the room + # since the earlier token. + has_changed = True + if direction == Direction.FORWARDS: + has_changed = self._events_stream_cache.has_entity_changed( + room_id, from_key.stream + ) + elif direction == Direction.BACKWARDS: + if to_key is not None: + has_changed = self._events_stream_cache.has_entity_changed( + room_id, to_key.stream + ) + else: + assert_never(direction) if not has_changed: - return [], from_key + # Token selection matches what we do below if there are no rows + return [], to_key if to_key else from_key - def f(txn: LoggingTransaction) -> List[_EventDictReturn]: - # To handle tokens with a non-empty instance_map we fetch more - # results than necessary and then filter down - min_from_id = from_key.stream - max_to_id = to_key.get_max_stream_pos() + order, from_bound, to_bound = generate_pagination_bounds( + direction, from_key, to_key + ) - sql = """ - SELECT event_id, instance_name, topological_ordering, stream_ordering + bounds = generate_pagination_where_clause( + direction=direction, + # The empty string will shortcut downstream code to only use the + # `stream_ordering` column + column_names=("", "stream_ordering"), + from_token=from_bound, + to_token=to_bound, + engine=self.database_engine, + ) + + def f(txn: LoggingTransaction) -> List[_EventDictReturn]: + sql = f""" + SELECT event_id, instance_name, stream_ordering FROM events WHERE room_id = ? AND not outlier - AND stream_ordering > ? AND stream_ordering <= ? - ORDER BY stream_ordering %s LIMIT ? - """ % ( - order, - ) - txn.execute(sql, (room_id, min_from_id, max_to_id, 2 * limit)) + AND {bounds} + ORDER BY stream_ordering {order} LIMIT ? + """ + txn.execute(sql, (room_id, 2 * limit)) rows = [ _EventDictReturn(event_id, None, stream_ordering) - for event_id, instance_name, topological_ordering, stream_ordering in txn - if _filter_results( - from_key, - to_key, - instance_name, - topological_ordering, - stream_ordering, + for event_id, instance_name, stream_ordering in txn + if _filter_results_by_stream( + lower_token=( + to_key if direction == Direction.BACKWARDS else from_key + ), + upper_token=( + from_key if direction == Direction.BACKWARDS else to_key + ), + instance_name=instance_name, + stream_ordering=stream_ordering, ) ][:limit] return rows @@ -810,18 +884,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): [r.event_id for r in rows], get_prev_content=True ) - if order.lower() == "desc": - ret.reverse() - if rows: - key = RoomStreamToken(stream=min(r.stream_ordering for r in rows)) + next_key = generate_next_token( + direction=direction, + last_topo_ordering=None, + last_stream_ordering=rows[-1].stream_ordering, + ) else: - # Assume we didn't get anything because there was nothing to - # get. - key = from_key + # TODO (erikj): We should work out what to do here instead. (same as + # `_paginate_room_events_by_topological_ordering_txn(...)`) + next_key = to_key if to_key else from_key - return ret, key + return ret, next_key + @trace async def get_current_state_delta_membership_changes_for_user( self, user_id: str, @@ -1127,7 +1203,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): rows, token = await self.db_pool.runInteraction( "get_recent_event_ids_for_room", - self._paginate_room_events_txn, + self._paginate_room_events_by_topological_ordering_txn, room_id, from_token=end_token, limit=limit, @@ -1196,52 +1272,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None - @cachedList( - cached_method_name="get_max_stream_ordering_in_room", - list_name="room_ids", - ) - async def get_max_stream_ordering_in_rooms( - self, room_ids: StrCollection - ) -> Mapping[str, Optional[PersistedEventPosition]]: - """Get the positions for the latest event in a room. - - A batched version of `get_max_stream_ordering_in_room`. - """ - rows = await self.db_pool.simple_select_many_batch( - table="sliding_sync_room_metadata", - column="room_id", - iterable=room_ids, - retcols=("room_id", "instance_name", "last_stream_ordering"), - desc="get_max_stream_ordering_in_rooms", - ) - - return { - room_id: PersistedEventPosition(instance_name, stream) - for room_id, instance_name, stream in rows - } - - @cached(max_entries=10000) - async def get_max_stream_ordering_in_room( - self, - room_id: str, - ) -> Optional[PersistedEventPosition]: - """Get the position for the latest event in a room. - - Note: this may be after the current token for the room stream on this - process (e.g. due to replication lag) - """ - row = await self.db_pool.simple_select_one( - table="sliding_sync_room_metadata", - retcols=("instance_name", "last_stream_ordering"), - keyvalues={"room_id": room_id}, - allow_none=True, - desc="get_max_stream_ordering_in_room", - ) - if not row: - return None - - return PersistedEventPosition(instance_name=row[0], stream=row[1]) - + @trace async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, @@ -1678,7 +1709,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topological=topological_ordering, stream=stream_ordering ) - rows, start_token = self._paginate_room_events_txn( + rows, start_token = self._paginate_room_events_by_topological_ordering_txn( txn, room_id, before_token, @@ -1688,7 +1719,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) events_before = [r.event_id for r in rows] - rows, end_token = self._paginate_room_events_txn( + rows, end_token = self._paginate_room_events_by_topological_ordering_txn( txn, room_id, after_token, @@ -1851,14 +1882,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): def has_room_changed_since(self, room_id: str, stream_id: int) -> bool: return self._events_stream_cache.has_entity_changed(room_id, stream_id) - def _paginate_room_events_txn( + def _paginate_room_events_by_topological_ordering_txn( self, txn: LoggingTransaction, room_id: str, from_token: RoomStreamToken, to_token: Optional[RoomStreamToken] = None, direction: Direction = Direction.BACKWARDS, - limit: int = -1, + limit: int = 0, event_filter: Optional[Filter] = None, ) -> Tuple[List[_EventDictReturn], RoomStreamToken]: """Returns list of events before or after a given token. @@ -1880,6 +1911,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): been reached (i.e. there are no events between `from_token` and `to_token`), or `limit` is zero. """ + # We can bail early if we're looking forwards, and our `to_key` is already + # before our `from_token`. + if ( + direction == Direction.FORWARDS + and to_token is not None + and to_token.is_before_or_eq(from_token) + ): + # Token selection matches what we do below if there are no rows + return [], to_token if to_token else from_token + # Or vice-versa, if we're looking backwards and our `from_token` is already before + # our `to_token`. + elif ( + direction == Direction.BACKWARDS + and to_token is not None + and from_token.is_before_or_eq(to_token) + ): + # Token selection matches what we do below if there are no rows + return [], to_token if to_token else from_token args: List[Any] = [room_id] @@ -1964,7 +2013,6 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): "bounds": bounds, "order": order, } - txn.execute(sql, args) # Filter the result set. @@ -1996,27 +2044,30 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows, next_token @trace - async def paginate_room_events( + @tag_args + async def paginate_room_events_by_topological_ordering( self, + *, room_id: str, from_key: RoomStreamToken, to_key: Optional[RoomStreamToken] = None, direction: Direction = Direction.BACKWARDS, - limit: int = -1, + limit: int = 0, event_filter: Optional[Filter] = None, ) -> Tuple[List[EventBase], RoomStreamToken]: - """Returns list of events before or after a given token. - - When Direction.FORWARDS: from_key < x <= to_key - When Direction.BACKWARDS: from_key >= x > to_key + """ + Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in + the room from the `from_key` in the given `direction` to the `to_key` or + `limit`. Args: room_id - from_key: The token used to stream from - to_key: A token which if given limits the results to only those before + from_key: The token to stream from (starting point and heading in the given + direction) + to_key: The token representing the end stream position (end point) direction: Indicates whether we are paginating forwards or backwards from `from_key`. - limit: The maximum number of events to return. + limit: Maximum number of events to return event_filter: If provided filters the events to those that match the filter. Returns: @@ -2024,8 +2075,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): of the result set. If no events are returned then the end of the stream has been reached (i.e. there are no events between `from_key` and `to_key`). + + When Direction.FORWARDS: from_key < x <= to_key, (ascending order) + When Direction.BACKWARDS: from_key >= x > to_key, (descending order) """ + # FIXME: When going forwards, we should enforce that the `to_key` is not `None` + # because we always need an upper bound when querying the events stream (as + # otherwise we'll potentially pick up events that are not fully persisted). + + # We have these checks outside of the transaction function (txn) to save getting + # a DB connection and switching threads if we don't need to. + # # We can bail early if we're looking forwards, and our `to_key` is already # before our `from_key`. if ( @@ -2048,8 +2109,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return [], to_key if to_key else from_key rows, token = await self.db_pool.runInteraction( - "paginate_room_events", - self._paginate_room_events_txn, + "paginate_room_events_by_topological_ordering", + self._paginate_room_events_by_topological_ordering_txn, room_id, from_key, to_key, @@ -2246,6 +2307,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return rows + @trace def get_rooms_that_might_have_updates( self, room_ids: StrCollection, from_token: RoomStreamToken ) -> StrCollection: diff --git a/tests/rest/client/sliding_sync/test_connection_tracking.py b/tests/rest/client/sliding_sync/test_connection_tracking.py index 4d8866b30a..6863c32f7c 100644 --- a/tests/rest/client/sliding_sync/test_connection_tracking.py +++ b/tests/rest/client/sliding_sync/test_connection_tracking.py @@ -21,8 +21,6 @@ import synapse.rest.admin from synapse.api.constants import EventTypes from synapse.rest.client import login, room, sync from synapse.server import HomeServer -from synapse.types import SlidingSyncStreamToken -from synapse.types.handlers import SlidingSyncConfig from synapse.util import Clock from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase @@ -130,7 +128,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): self.helper.send(room_id1, "msg", tok=user1_tok) timeline_limit = 5 - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -170,40 +167,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): response_body["rooms"].keys(), {room_id2}, response_body["rooms"] ) - # FIXME: This is a hack to record that the first room wasn't sent down - # sync, as we don't implement that currently. - sliding_sync_handler = self.hs.get_sliding_sync_handler() - requester = self.get_success( - self.hs.get_auth().get_user_by_access_token(user1_tok) - ) - sync_config = SlidingSyncConfig( - user=requester.user, - requester=requester, - conn_id=conn_id, - ) - - parsed_initial_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, initial_from_token) - ) - connection_position = self.get_success( - sliding_sync_handler.connection_store.record_rooms( - sync_config, - parsed_initial_from_token, - sent_room_ids=[], - unsent_room_ids=[room_id1], - ) - ) - - # FIXME: Now fix up `from_token` with new connect position above. - parsed_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, from_token) - ) - parsed_from_token = SlidingSyncStreamToken( - stream_token=parsed_from_token.stream_token, - connection_position=connection_position, - ) - from_token = self.get_success(parsed_from_token.to_string(self.store)) - # We now send another event to room1, so we should sync all the missing events. resp = self.helper.send(room_id1, "msg2", tok=user1_tok) expected_events.append(resp["event_id"]) @@ -238,7 +201,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): self.helper.send(room_id1, "msg", tok=user1_tok) - conn_id = "conn_id" sync_body = { "lists": { "foo-list": { @@ -279,40 +241,6 @@ class SlidingSyncConnectionTrackingTestCase(SlidingSyncBase): response_body["rooms"].keys(), {room_id2}, response_body["rooms"] ) - # FIXME: This is a hack to record that the first room wasn't sent down - # sync, as we don't implement that currently. - sliding_sync_handler = self.hs.get_sliding_sync_handler() - requester = self.get_success( - self.hs.get_auth().get_user_by_access_token(user1_tok) - ) - sync_config = SlidingSyncConfig( - user=requester.user, - requester=requester, - conn_id=conn_id, - ) - - parsed_initial_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, initial_from_token) - ) - connection_position = self.get_success( - sliding_sync_handler.connection_store.record_rooms( - sync_config, - parsed_initial_from_token, - sent_room_ids=[], - unsent_room_ids=[room_id1], - ) - ) - - # FIXME: Now fix up `from_token` with new connect position above. - parsed_from_token = self.get_success( - SlidingSyncStreamToken.from_string(self.store, from_token) - ) - parsed_from_token = SlidingSyncStreamToken( - stream_token=parsed_from_token.stream_token, - connection_position=connection_position, - ) - from_token = self.get_success(parsed_from_token.to_string(self.store)) - # We now send another event to room1, so we should sync all the missing state. self.helper.send(room_id1, "msg", tok=user1_tok) diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py index 03e36914ae..a13cad223f 100644 --- a/tests/rest/client/sliding_sync/test_rooms_required_state.py +++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py @@ -161,10 +161,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): self.assertIsNone(response_body["rooms"][room_id1].get("required_state")) self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) - def test_rooms_required_state_incremental_sync_restart(self) -> None: + def test_rooms_incremental_sync_restart(self) -> None: """ - Test `rooms.required_state` returns requested state events in the room during an - incremental sync, after a restart (and so the in memory caches are reset). + Test that after a restart (and so the in memory caches are reset) that + we correctly return an `M_UNKNOWN_POS` """ user1_id = self.register_user("user1", "pass") @@ -195,22 +195,16 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase): self.hs.get_sliding_sync_handler().connection_store._connections.clear() # Make the Sliding Sync request - response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) - - # If the cache has been cleared then we do expect the state to come down - state_map = self.get_success( - self.storage_controllers.state.get_current_state(room_id1) + channel = self.make_request( + method="POST", + path=self.sync_endpoint + f"?pos={from_token}", + content=sync_body, + access_token=user1_tok, ) - - self._assertRequiredStateIncludes( - response_body["rooms"][room_id1]["required_state"], - { - state_map[(EventTypes.Create, "")], - state_map[(EventTypes.RoomHistoryVisibility, "")], - }, - exact=True, + self.assertEqual(channel.code, 400, channel.json_body) + self.assertEqual( + channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body ) - self.assertIsNone(response_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard(self) -> None: """ diff --git a/tests/rest/client/sliding_sync/test_rooms_timeline.py b/tests/rest/client/sliding_sync/test_rooms_timeline.py index 84a1e0d223..2e9586ca73 100644 --- a/tests/rest/client/sliding_sync/test_rooms_timeline.py +++ b/tests/rest/client/sliding_sync/test_rooms_timeline.py @@ -12,13 +12,14 @@ # <https://www.gnu.org/licenses/agpl-3.0.html>. # import logging +from typing import List, Optional from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.rest.client import login, room, sync from synapse.server import HomeServer -from synapse.types import StreamToken +from synapse.types import StreamToken, StrSequence from synapse.util import Clock from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase @@ -42,6 +43,82 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): self.store = hs.get_datastores().main self.storage_controllers = hs.get_storage_controllers() + def _assertListEqual( + self, + actual_items: StrSequence, + expected_items: StrSequence, + message: Optional[str] = None, + ) -> None: + """ + Like `self.assertListEqual(...)` but with an actually understandable diff message. + """ + + if actual_items == expected_items: + return + + expected_lines: List[str] = [] + for expected_item in expected_items: + is_expected_in_actual = expected_item in actual_items + expected_lines.append( + "{} {}".format(" " if is_expected_in_actual else "?", expected_item) + ) + + actual_lines: List[str] = [] + for actual_item in actual_items: + is_actual_in_expected = actual_item in expected_items + actual_lines.append( + "{} {}".format("+" if is_actual_in_expected else " ", actual_item) + ) + + newline = "\n" + expected_string = f"Expected items to be in actual ('?' = missing expected items):\n [\n{newline.join(expected_lines)}\n ]" + actual_string = f"Actual ('+' = found expected items):\n [\n{newline.join(actual_lines)}\n ]" + first_message = "Items must" + diff_message = f"{first_message}\n{expected_string}\n{actual_string}" + + self.fail(f"{diff_message}\n{message}") + + def _assertTimelineEqual( + self, + *, + room_id: str, + actual_event_ids: List[str], + expected_event_ids: List[str], + message: Optional[str] = None, + ) -> None: + """ + Like `self.assertListEqual(...)` for event IDs in a room but will give a nicer + output with context for what each event_id is (type, stream_ordering, content, + etc). + """ + if actual_event_ids == expected_event_ids: + return + + event_id_set = set(actual_event_ids + expected_event_ids) + events = self.get_success(self.store.get_events(event_id_set)) + + def event_id_to_string(event_id: str) -> str: + event = events.get(event_id) + if event: + state_key = event.get_state_key() + state_key_piece = f", {state_key}" if state_key is not None else "" + return ( + f"({event.internal_metadata.stream_ordering: >2}, {event.internal_metadata.instance_name}) " + + f"{event.event_id} ({event.type}{state_key_piece}) {event.content.get('membership', '')}{event.content.get('body', '')}" + ) + + return f"{event_id} <event not found in room_id={room_id}>" + + self._assertListEqual( + actual_items=[ + event_id_to_string(event_id) for event_id in actual_event_ids + ], + expected_items=[ + event_id_to_string(event_id) for event_id in expected_event_ids + ], + message=message, + ) + def test_rooms_limited_initial_sync(self) -> None: """ Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` @@ -85,17 +162,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): response_body["rooms"][room_id1], ) # Check to make sure the latest events are returned - self.assertEqual( - [ + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[ event["event_id"] for event in response_body["rooms"][room_id1]["timeline"] ], - [ + expected_event_ids=[ event_response4["event_id"], event_response5["event_id"], user1_join_response["event_id"], ], - response_body["rooms"][room_id1]["timeline"], + message=str(response_body["rooms"][room_id1]["timeline"]), ) # Check to make sure the `prev_batch` points at the right place @@ -227,16 +305,17 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): + str(response_body["rooms"][room_id1]), ) # Check to make sure the latest events are returned - self.assertEqual( - [ + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[ event["event_id"] for event in response_body["rooms"][room_id1]["timeline"] ], - [ + expected_event_ids=[ event_response2["event_id"], event_response3["event_id"], ], - response_body["rooms"][room_id1]["timeline"], + message=str(response_body["rooms"][room_id1]["timeline"]), ) # All events are "live" @@ -303,18 +382,19 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): + str(response_body["rooms"][room_id1]), ) # Check to make sure that the "live" and historical events are returned - self.assertEqual( - [ + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[ event["event_id"] for event in response_body["rooms"][room_id1]["timeline"] ], - [ + expected_event_ids=[ event_response2["event_id"], user1_join_response["event_id"], event_response3["event_id"], event_response4["event_id"], ], - response_body["rooms"][room_id1]["timeline"], + message=str(response_body["rooms"][room_id1]["timeline"]), ) # Only events after the `from_token` are "live" (join, event3, event4) @@ -361,17 +441,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, tok=user1_tok) # We should see events before the ban but not after - self.assertEqual( - [ + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[ event["event_id"] for event in response_body["rooms"][room_id1]["timeline"] ], - [ + expected_event_ids=[ event_response3["event_id"], event_response4["event_id"], user1_ban_response["event_id"], ], - response_body["rooms"][room_id1]["timeline"], + message=str(response_body["rooms"][room_id1]["timeline"]), ) # No "live" events in an initial sync (no `from_token` to define the "live" # range) @@ -428,17 +509,18 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase): response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok) # We should see events before the ban but not after - self.assertEqual( - [ + self._assertTimelineEqual( + room_id=room_id1, + actual_event_ids=[ event["event_id"] for event in response_body["rooms"][room_id1]["timeline"] ], - [ + expected_event_ids=[ event_response3["event_id"], event_response4["event_id"], user1_ban_response["event_id"], ], - response_body["rooms"][room_id1]["timeline"], + message=str(response_body["rooms"][room_id1]["timeline"]), ) # All live events in the incremental sync self.assertEqual( diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 9dea1af8ea..7b7590da76 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -148,7 +148,7 @@ class PaginationTestCase(HomeserverTestCase): """Make a request to /messages with a filter, returns the chunk of events.""" events, next_key = self.get_success( - self.hs.get_datastores().main.paginate_room_events( + self.hs.get_datastores().main.paginate_room_events_by_topological_ordering( room_id=self.room_id, from_key=self.from_token.room_key, to_key=None, |