diff options
52 files changed, 5640 insertions, 401 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cdd881fbe1..2cc5a525a6 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -21,6 +21,7 @@ jobs: trial: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.trial }} integration: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.integration }} linting: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.linting }} + linting_readme: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.linting_readme }} steps: - uses: dorny/paths-filter@v3 id: filter @@ -72,6 +73,9 @@ jobs: - 'pyproject.toml' - 'poetry.lock' - '.github/workflows/tests.yml' + + linting_readme: + - 'README.rst' check-sampleconfig: runs-on: ubuntu-latest @@ -269,6 +273,20 @@ jobs: - run: cargo fmt --check + # This is to detect issues with the rst file, which can otherwise cause issues + # when uploading packages to PyPi. + lint-readme: + runs-on: ubuntu-latest + needs: changes + if: ${{ needs.changes.outputs.linting_readme == 'true' }} + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: "3.x" + - run: "pip install rstcheck" + - run: "rstcheck --report-level=WARNING README.rst" + # Dummy step to gate other tests on without repeating the whole list linting-done: if: ${{ !cancelled() }} # Run this even if prior jobs were skipped @@ -284,6 +302,7 @@ jobs: - lint-clippy - lint-clippy-nightly - lint-rustfmt + - lint-readme runs-on: ubuntu-latest steps: - uses: matrix-org/done-action@v2 @@ -301,6 +320,7 @@ jobs: lint-clippy lint-clippy-nightly lint-rustfmt + lint-readme calculate-test-jobs: diff --git a/Cargo.lock b/Cargo.lock index 1955c1a4e7..4353e55977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,9 +234,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.21" +version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "memchr" @@ -505,9 +505,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.117" +version = "1.0.119" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +checksum = "e8eddb61f0697cc3989c5d64b452f5488e2b8a60fd7d5076a3045076ffef8cb0" dependencies = [ "itoa", "ryu", diff --git a/changelog.d/17320.feature b/changelog.d/17320.feature new file mode 100644 index 0000000000..1e524f3eca --- /dev/null +++ b/changelog.d/17320.feature @@ -0,0 +1 @@ +Add `rooms` data to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17337.feature b/changelog.d/17337.feature new file mode 100644 index 0000000000..bc8f437dbe --- /dev/null +++ b/changelog.d/17337.feature @@ -0,0 +1 @@ +Add `room_types`/`not_room_types` filtering to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17356.doc b/changelog.d/17356.doc new file mode 100644 index 0000000000..b393d8d147 --- /dev/null +++ b/changelog.d/17356.doc @@ -0,0 +1 @@ +Clarify `url_preview_url_blacklist` is a usability feature. diff --git a/changelog.d/17362.bugfix b/changelog.d/17362.bugfix new file mode 100644 index 0000000000..a91ce9fc06 --- /dev/null +++ b/changelog.d/17362.bugfix @@ -0,0 +1 @@ +Fix rare race which causes no new to-device messages to be received from remote server. diff --git a/changelog.d/17363.misc b/changelog.d/17363.misc new file mode 100644 index 0000000000..555e2225ba --- /dev/null +++ b/changelog.d/17363.misc @@ -0,0 +1 @@ +Fix uploading packages to PyPi. \ No newline at end of file diff --git a/changelog.d/17365.feature b/changelog.d/17365.feature new file mode 100644 index 0000000000..f90dc84e38 --- /dev/null +++ b/changelog.d/17365.feature @@ -0,0 +1 @@ +Support [MSC3916](https://github.com/matrix-org/matrix-spec-proposals/blob/rav/authentication-for-media/proposals/3916-authentication-for-media.md) by adding _matrix/client/v1/media/download endpoint. \ No newline at end of file diff --git a/changelog.d/17367.misc b/changelog.d/17367.misc new file mode 100644 index 0000000000..361731b8ae --- /dev/null +++ b/changelog.d/17367.misc @@ -0,0 +1 @@ +Add CI check for the README. \ No newline at end of file diff --git a/changelog.d/17390.misc b/changelog.d/17390.misc new file mode 100644 index 0000000000..6a4e344c5c --- /dev/null +++ b/changelog.d/17390.misc @@ -0,0 +1 @@ +Fix building debian packages on non-clean checkouts. diff --git a/docker/build_debian.sh b/docker/build_debian.sh index 9eae38af91..00e0856c7d 100644 --- a/docker/build_debian.sh +++ b/docker/build_debian.sh @@ -11,6 +11,9 @@ DIST=$(cut -d ':' -f2 <<< "${distro:?}") cp -aT /synapse/source /synapse/build cd /synapse/build +# Delete any existing `.so` files to ensure a clean build. +rm -f /synapse/build/synapse/*.so + # if this is a prerelease, set the Section accordingly. # # When the package is later added to the package repo, reprepro will use the diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 063f3727f9..b6690f3404 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -117,7 +117,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { }, "media_repository": { "app": "synapse.app.generic_worker", - "listener_resources": ["media"], + "listener_resources": ["media", "client"], "endpoint_patterns": [ "^/_matrix/media/", "^/_synapse/admin/v1/purge_media_cache$", @@ -125,6 +125,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_synapse/admin/v1/user/.*/media.*$", "^/_synapse/admin/v1/media/.*$", "^/_synapse/admin/v1/quarantine_media/.*$", + "^/_matrix/client/v1/media/.*$", ], # The first configured media worker will run the media background jobs "shared_extra_conf": { diff --git a/docs/upgrade.md b/docs/upgrade.md index 99be4122bb..cf53f56b06 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -117,6 +117,19 @@ each upgrade are complete before moving on to the next upgrade, to avoid stacking them up. You can monitor the currently running background updates with [the Admin API](usage/administration/admin_api/background_updates.html#status). +# Upgrading to v1.111.0 + +## New worker endpoints for authenticated client media + +[Media repository workers](./workers.md#synapseappmedia_repository) handling +Media APIs can now handle the following endpoint pattern: + +``` +^/_matrix/client/v1/media/.*$ +``` + +Please update your reverse proxy configuration. + # Upgrading to v1.106.0 ## Minimum supported Rust version diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 80a7bf9d21..65b03ad0f8 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -1976,9 +1976,10 @@ This will not prevent the listed domains from accessing media themselves. It simply prevents users on this server from downloading media originating from the listed servers. -This will have no effect on media originating from the local server. -This only affects media downloaded from other Matrix servers, to -block domains from URL previews see [`url_preview_url_blacklist`](#url_preview_url_blacklist). +This will have no effect on media originating from the local server. This only +affects media downloaded from other Matrix servers, to control URL previews see +[`url_preview_ip_range_blacklist`](#url_preview_ip_range_blacklist) or +[`url_preview_url_blacklist`](#url_preview_url_blacklist). Defaults to an empty list (nothing blocked). @@ -2130,12 +2131,14 @@ url_preview_ip_range_whitelist: --- ### `url_preview_url_blacklist` -Optional list of URL matches that the URL preview spider is -denied from accessing. You should use `url_preview_ip_range_blacklist` -in preference to this, otherwise someone could define a public DNS -entry that points to a private IP address and circumvent the blacklist. -This is more useful if you know there is an entire shape of URL that -you know that will never want synapse to try to spider. +Optional list of URL matches that the URL preview spider is denied from +accessing. This is a usability feature, not a security one. You should use +`url_preview_ip_range_blacklist` in preference to this, otherwise someone could +define a public DNS entry that points to a private IP address and circumvent +the blacklist. Applications that perform redirects or serve different content +when detecting that Synapse is accessing them can also bypass the blacklist. +This is more useful if you know there is an entire shape of URL that you know +that you do not want Synapse to preview. Each list entry is a dictionary of url component attributes as returned by urlparse.urlsplit as applied to the absolute form of the URL. See diff --git a/docs/workers.md b/docs/workers.md index 1f6bfd9e7f..22fde488a9 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -739,6 +739,7 @@ An example for a federation sender instance: Handles the media repository. It can handle all endpoints starting with: /_matrix/media/ + /_matrix/client/v1/media/ ... and the following regular expressions matching media-specific administration APIs: diff --git a/mypy.ini b/mypy.ini index 1a2b9ea410..3fca15c01b 100644 --- a/mypy.ini +++ b/mypy.ini @@ -96,3 +96,6 @@ ignore_missing_imports = True # https://github.com/twisted/treq/pull/366 [mypy-treq.*] ignore_missing_imports = True + +[mypy-multipart.*] +ignore_missing_imports = True diff --git a/poetry.lock b/poetry.lock index 1bae0ea388..8142406e3f 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -403,43 +403,43 @@ files = [ [[package]] name = "cryptography" -version = "42.0.7" +version = "42.0.8" description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers." optional = false python-versions = ">=3.7" files = [ - {file = "cryptography-42.0.7-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:a987f840718078212fdf4504d0fd4c6effe34a7e4740378e59d47696e8dfb477"}, - {file = "cryptography-42.0.7-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:bd13b5e9b543532453de08bcdc3cc7cebec6f9883e886fd20a92f26940fd3e7a"}, - {file = "cryptography-42.0.7-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a79165431551042cc9d1d90e6145d5d0d3ab0f2d66326c201d9b0e7f5bf43604"}, - {file = "cryptography-42.0.7-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a47787a5e3649008a1102d3df55424e86606c9bae6fb77ac59afe06d234605f8"}, - {file = "cryptography-42.0.7-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:02c0eee2d7133bdbbc5e24441258d5d2244beb31da5ed19fbb80315f4bbbff55"}, - {file = "cryptography-42.0.7-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5e44507bf8d14b36b8389b226665d597bc0f18ea035d75b4e53c7b1ea84583cc"}, - {file = "cryptography-42.0.7-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:7f8b25fa616d8b846aef64b15c606bb0828dbc35faf90566eb139aa9cff67af2"}, - {file = "cryptography-42.0.7-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:93a3209f6bb2b33e725ed08ee0991b92976dfdcf4e8b38646540674fc7508e13"}, - {file = "cryptography-42.0.7-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:e6b8f1881dac458c34778d0a424ae5769de30544fc678eac51c1c8bb2183e9da"}, - {file = "cryptography-42.0.7-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:3de9a45d3b2b7d8088c3fbf1ed4395dfeff79d07842217b38df14ef09ce1d8d7"}, - {file = "cryptography-42.0.7-cp37-abi3-win32.whl", hash = "sha256:789caea816c6704f63f6241a519bfa347f72fbd67ba28d04636b7c6b7da94b0b"}, - {file = "cryptography-42.0.7-cp37-abi3-win_amd64.whl", hash = "sha256:8cb8ce7c3347fcf9446f201dc30e2d5a3c898d009126010cbd1f443f28b52678"}, - {file = "cryptography-42.0.7-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:a3a5ac8b56fe37f3125e5b72b61dcde43283e5370827f5233893d461b7360cd4"}, - {file = "cryptography-42.0.7-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:779245e13b9a6638df14641d029add5dc17edbef6ec915688f3acb9e720a5858"}, - {file = "cryptography-42.0.7-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0d563795db98b4cd57742a78a288cdbdc9daedac29f2239793071fe114f13785"}, - {file = "cryptography-42.0.7-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:31adb7d06fe4383226c3e963471f6837742889b3c4caa55aac20ad951bc8ffda"}, - {file = "cryptography-42.0.7-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:efd0bf5205240182e0f13bcaea41be4fdf5c22c5129fc7ced4a0282ac86998c9"}, - {file = "cryptography-42.0.7-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:a9bc127cdc4ecf87a5ea22a2556cab6c7eda2923f84e4f3cc588e8470ce4e42e"}, - {file = "cryptography-42.0.7-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:3577d029bc3f4827dd5bf8bf7710cac13527b470bbf1820a3f394adb38ed7d5f"}, - {file = "cryptography-42.0.7-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:2e47577f9b18723fa294b0ea9a17d5e53a227867a0a4904a1a076d1646d45ca1"}, - {file = "cryptography-42.0.7-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:1a58839984d9cb34c855197043eaae2c187d930ca6d644612843b4fe8513c886"}, - {file = "cryptography-42.0.7-cp39-abi3-win32.whl", hash = "sha256:e6b79d0adb01aae87e8a44c2b64bc3f3fe59515280e00fb6d57a7267a2583cda"}, - {file = "cryptography-42.0.7-cp39-abi3-win_amd64.whl", hash = "sha256:16268d46086bb8ad5bf0a2b5544d8a9ed87a0e33f5e77dd3c3301e63d941a83b"}, - {file = "cryptography-42.0.7-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:2954fccea107026512b15afb4aa664a5640cd0af630e2ee3962f2602693f0c82"}, - {file = "cryptography-42.0.7-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:362e7197754c231797ec45ee081f3088a27a47c6c01eff2ac83f60f85a50fe60"}, - {file = "cryptography-42.0.7-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:4f698edacf9c9e0371112792558d2f705b5645076cc0aaae02f816a0171770fd"}, - {file = "cryptography-42.0.7-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:5482e789294854c28237bba77c4c83be698be740e31a3ae5e879ee5444166582"}, - {file = "cryptography-42.0.7-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:e9b2a6309f14c0497f348d08a065d52f3020656f675819fc405fb63bbcd26562"}, - {file = "cryptography-42.0.7-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:d8e3098721b84392ee45af2dd554c947c32cc52f862b6a3ae982dbb90f577f14"}, - {file = "cryptography-42.0.7-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:c65f96dad14f8528a447414125e1fc8feb2ad5a272b8f68477abbcc1ea7d94b9"}, - {file = "cryptography-42.0.7-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:36017400817987670037fbb0324d71489b6ead6231c9604f8fc1f7d008087c68"}, - {file = "cryptography-42.0.7.tar.gz", hash = "sha256:ecbfbc00bf55888edda9868a4cf927205de8499e7fabe6c050322298382953f2"}, + {file = "cryptography-42.0.8-cp37-abi3-macosx_10_12_universal2.whl", hash = "sha256:81d8a521705787afe7a18d5bfb47ea9d9cc068206270aad0b96a725022e18d2e"}, + {file = "cryptography-42.0.8-cp37-abi3-macosx_10_12_x86_64.whl", hash = "sha256:961e61cefdcb06e0c6d7e3a1b22ebe8b996eb2bf50614e89384be54c48c6b63d"}, + {file = "cryptography-42.0.8-cp37-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e3ec3672626e1b9e55afd0df6d774ff0e953452886e06e0f1eb7eb0c832e8902"}, + {file = "cryptography-42.0.8-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e599b53fd95357d92304510fb7bda8523ed1f79ca98dce2f43c115950aa78801"}, + {file = "cryptography-42.0.8-cp37-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:5226d5d21ab681f432a9c1cf8b658c0cb02533eece706b155e5fbd8a0cdd3949"}, + {file = "cryptography-42.0.8-cp37-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:6b7c4f03ce01afd3b76cf69a5455caa9cfa3de8c8f493e0d3ab7d20611c8dae9"}, + {file = "cryptography-42.0.8-cp37-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:2346b911eb349ab547076f47f2e035fc8ff2c02380a7cbbf8d87114fa0f1c583"}, + {file = "cryptography-42.0.8-cp37-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:ad803773e9df0b92e0a817d22fd8a3675493f690b96130a5e24f1b8fabbea9c7"}, + {file = "cryptography-42.0.8-cp37-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:2f66d9cd9147ee495a8374a45ca445819f8929a3efcd2e3df6428e46c3cbb10b"}, + {file = "cryptography-42.0.8-cp37-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:d45b940883a03e19e944456a558b67a41160e367a719833c53de6911cabba2b7"}, + {file = "cryptography-42.0.8-cp37-abi3-win32.whl", hash = "sha256:a0c5b2b0585b6af82d7e385f55a8bc568abff8923af147ee3c07bd8b42cda8b2"}, + {file = "cryptography-42.0.8-cp37-abi3-win_amd64.whl", hash = "sha256:57080dee41209e556a9a4ce60d229244f7a66ef52750f813bfbe18959770cfba"}, + {file = "cryptography-42.0.8-cp39-abi3-macosx_10_12_universal2.whl", hash = "sha256:dea567d1b0e8bc5764b9443858b673b734100c2871dc93163f58c46a97a83d28"}, + {file = "cryptography-42.0.8-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c4783183f7cb757b73b2ae9aed6599b96338eb957233c58ca8f49a49cc32fd5e"}, + {file = "cryptography-42.0.8-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a0608251135d0e03111152e41f0cc2392d1e74e35703960d4190b2e0f4ca9c70"}, + {file = "cryptography-42.0.8-cp39-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dc0fdf6787f37b1c6b08e6dfc892d9d068b5bdb671198c72072828b80bd5fe4c"}, + {file = "cryptography-42.0.8-cp39-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:9c0c1716c8447ee7dbf08d6db2e5c41c688544c61074b54fc4564196f55c25a7"}, + {file = "cryptography-42.0.8-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:fff12c88a672ab9c9c1cf7b0c80e3ad9e2ebd9d828d955c126be4fd3e5578c9e"}, + {file = "cryptography-42.0.8-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:cafb92b2bc622cd1aa6a1dce4b93307792633f4c5fe1f46c6b97cf67073ec961"}, + {file = "cryptography-42.0.8-cp39-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:31f721658a29331f895a5a54e7e82075554ccfb8b163a18719d342f5ffe5ecb1"}, + {file = "cryptography-42.0.8-cp39-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:b297f90c5723d04bcc8265fc2a0f86d4ea2e0f7ab4b6994459548d3a6b992a14"}, + {file = "cryptography-42.0.8-cp39-abi3-win32.whl", hash = "sha256:2f88d197e66c65be5e42cd72e5c18afbfae3f741742070e3019ac8f4ac57262c"}, + {file = "cryptography-42.0.8-cp39-abi3-win_amd64.whl", hash = "sha256:fa76fbb7596cc5839320000cdd5d0955313696d9511debab7ee7278fc8b5c84a"}, + {file = "cryptography-42.0.8-pp310-pypy310_pp73-macosx_10_12_x86_64.whl", hash = "sha256:ba4f0a211697362e89ad822e667d8d340b4d8d55fae72cdd619389fb5912eefe"}, + {file = "cryptography-42.0.8-pp310-pypy310_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:81884c4d096c272f00aeb1f11cf62ccd39763581645b0812e99a91505fa48e0c"}, + {file = "cryptography-42.0.8-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:c9bb2ae11bfbab395bdd072985abde58ea9860ed84e59dbc0463a5d0159f5b71"}, + {file = "cryptography-42.0.8-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:7016f837e15b0a1c119d27ecd89b3515f01f90a8615ed5e9427e30d9cdbfed3d"}, + {file = "cryptography-42.0.8-pp39-pypy39_pp73-macosx_10_12_x86_64.whl", hash = "sha256:5a94eccb2a81a309806027e1670a358b99b8fe8bfe9f8d329f27d72c094dde8c"}, + {file = "cryptography-42.0.8-pp39-pypy39_pp73-manylinux_2_28_aarch64.whl", hash = "sha256:dec9b018df185f08483f294cae6ccac29e7a6e0678996587363dc352dc65c842"}, + {file = "cryptography-42.0.8-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:343728aac38decfdeecf55ecab3264b015be68fc2816ca800db649607aeee648"}, + {file = "cryptography-42.0.8-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:013629ae70b40af70c9a7a5db40abe5d9054e6f4380e50ce769947b73bf3caad"}, + {file = "cryptography-42.0.8.tar.gz", hash = "sha256:8d09d05439ce7baa8e9e95b07ec5b6c886f548deb7e0f69ef25f64b3bce842f2"}, ] [package.dependencies] @@ -2040,6 +2040,20 @@ files = [ six = ">=1.5" [[package]] +name = "python-multipart" +version = "0.0.9" +description = "A streaming multipart parser for Python" +optional = false +python-versions = ">=3.8" +files = [ + {file = "python_multipart-0.0.9-py3-none-any.whl", hash = "sha256:97ca7b8ea7b05f977dc3849c3ba99d51689822fab725c3703af7c866a0c2b215"}, + {file = "python_multipart-0.0.9.tar.gz", hash = "sha256:03f54688c663f1b7977105f021043b0793151e4cb1c1a9d4a11fc13d622c4026"}, +] + +[package.extras] +dev = ["atomicwrites (==1.4.1)", "attrs (==23.2.0)", "coverage (==7.4.1)", "hatch", "invoke (==2.2.0)", "more-itertools (==10.2.0)", "pbr (==6.0.0)", "pluggy (==1.4.0)", "py (==1.11.0)", "pytest (==8.0.0)", "pytest-cov (==4.1.0)", "pytest-timeout (==2.2.0)", "pyyaml (==6.0.1)", "ruff (==0.2.1)"] + +[[package]] name = "pytz" version = "2022.7.1" description = "World timezone definitions, modern and historical" @@ -2906,13 +2920,13 @@ urllib3 = ">=2" [[package]] name = "types-setuptools" -version = "69.5.0.20240423" +version = "70.1.0.20240627" description = "Typing stubs for setuptools" optional = false python-versions = ">=3.8" files = [ - {file = "types-setuptools-69.5.0.20240423.tar.gz", hash = "sha256:a7ba908f1746c4337d13f027fa0f4a5bcad6d1d92048219ba792b3295c58586d"}, - {file = "types_setuptools-69.5.0.20240423-py3-none-any.whl", hash = "sha256:a4381e041510755a6c9210e26ad55b1629bc10237aeb9cb8b6bd24996b73db48"}, + {file = "types-setuptools-70.1.0.20240627.tar.gz", hash = "sha256:385907a47b5cf302b928ce07953cd91147d5de6f3da604c31905fdf0ec309e83"}, + {file = "types_setuptools-70.1.0.20240627-py3-none-any.whl", hash = "sha256:c7bdf05cd0a8b66868b4774c7b3c079d01ae025d8c9562bfc8bf2ff44d263c9c"}, ] [[package]] @@ -3187,4 +3201,4 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "107c8fb5c67360340854fbdba3c085fc5f9c7be24bcb592596a914eea621faea" +content-hash = "e8d5806e10eb69bc06900fde18ea3df38f38490ab6baa73fe4a563dfb6abacba" diff --git a/pyproject.toml b/pyproject.toml index 616cc0d710..88595a3079 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -224,6 +224,8 @@ pydantic = ">=1.7.4, <3" # needed. setuptools_rust = ">=1.3" +# This is used for parsing multipart responses +python-multipart = ">=0.0.9" # Optional Dependencies # --------------------- diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index a99a9e09fc..26b8711851 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -130,7 +130,8 @@ class Ratelimiter: Overrides the value set during instantiation if set. burst_count: How many actions that can be performed before being limited. Overrides the value set during instantiation if set. - update: Whether to count this check as performing the action + update: Whether to count this check as performing the action. If the action + cannot be performed, the user's action count is not incremented at all. n_actions: The number of times the user wants to do this action. If the user cannot do all of the actions, the user's action count is not incremented at all. diff --git a/synapse/events/utils.py b/synapse/events/utils.py index b997d82d71..f937fd4698 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -836,3 +836,21 @@ def maybe_upsert_event_field( del container[key] return upsert_okay + + +def strip_event(event: EventBase) -> JsonDict: + """ + Used for "stripped state" events which provide a simplified view of the state of a + room intended to help a potential joiner identify the room (relevant when the user + is invited or knocked). + + Stripped state events can only have the `sender`, `type`, `state_key` and `content` + properties present. + """ + + return { + "type": event.type, + "state_key": event.state_key, + "content": event.content, + "sender": event.sender, + } diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f0f5a37a57..7d80ff6998 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1871,6 +1871,52 @@ class FederationClient(FederationBase): return filtered_statuses, filtered_failures + async def federation_download_media( + self, + destination: str, + media_id: str, + output_stream: BinaryIO, + max_size: int, + max_timeout_ms: int, + download_ratelimiter: Ratelimiter, + ip_address: str, + ) -> Union[ + Tuple[int, Dict[bytes, List[bytes]], bytes], + Tuple[int, Dict[bytes, List[bytes]]], + ]: + try: + return await self.transport_layer.federation_download_media( + destination, + media_id, + output_stream=output_stream, + max_size=max_size, + max_timeout_ms=max_timeout_ms, + download_ratelimiter=download_ratelimiter, + ip_address=ip_address, + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the _matrix/media/v3/download endpoint. Otherwise, consider it a legitimate error + # and raise. + if not is_unknown_endpoint(e): + raise + + logger.debug( + "Couldn't download media %s/%s over _matrix/federation/v1/media/download, falling back to _matrix/media/v3/download path", + destination, + media_id, + ) + + return await self.transport_layer.download_media_v3( + destination, + media_id, + output_stream=output_stream, + max_size=max_size, + max_timeout_ms=max_timeout_ms, + download_ratelimiter=download_ratelimiter, + ip_address=ip_address, + ) + async def download_media( self, destination: str, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index af1336fe5f..206e91ed14 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -824,7 +824,6 @@ class TransportLayerClient: ip_address: str, ) -> Tuple[int, Dict[bytes, List[bytes]]]: path = f"/_matrix/media/r0/download/{destination}/{media_id}" - return await self.client.get_file( destination, path, @@ -852,7 +851,6 @@ class TransportLayerClient: ip_address: str, ) -> Tuple[int, Dict[bytes, List[bytes]]]: path = f"/_matrix/media/v3/download/{destination}/{media_id}" - return await self.client.get_file( destination, path, @@ -873,6 +871,29 @@ class TransportLayerClient: ip_address=ip_address, ) + async def federation_download_media( + self, + destination: str, + media_id: str, + output_stream: BinaryIO, + max_size: int, + max_timeout_ms: int, + download_ratelimiter: Ratelimiter, + ip_address: str, + ) -> Tuple[int, Dict[bytes, List[bytes]], bytes]: + path = f"/_matrix/federation/v1/media/download/{media_id}" + return await self.client.federation_get_file( + destination, + path, + output_stream=output_stream, + max_size=max_size, + args={ + "timeout_ms": str(max_timeout_ms), + }, + download_ratelimiter=download_ratelimiter, + ip_address=ip_address, + ) + def _create_path(federation_prefix: str, path: str, *args: str) -> str: """ diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py index edaf0196d6..c44e5daa47 100644 --- a/synapse/federation/transport/server/__init__.py +++ b/synapse/federation/transport/server/__init__.py @@ -32,8 +32,8 @@ from synapse.federation.transport.server._base import ( from synapse.federation.transport.server.federation import ( FEDERATION_SERVLET_CLASSES, FederationAccountStatusServlet, + FederationMediaDownloadServlet, FederationUnstableClientKeysClaimServlet, - FederationUnstableMediaDownloadServlet, ) from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import ( @@ -316,11 +316,8 @@ def register_servlets( ): continue - if servletclass == FederationUnstableMediaDownloadServlet: - if ( - not hs.config.server.enable_media_repo - or not hs.config.experimental.msc3916_authenticated_media_enabled - ): + if servletclass == FederationMediaDownloadServlet: + if not hs.config.server.enable_media_repo: continue servletclass( diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py index 4e2717b565..e124481474 100644 --- a/synapse/federation/transport/server/_base.py +++ b/synapse/federation/transport/server/_base.py @@ -362,7 +362,7 @@ class BaseFederationServlet: return None if ( func.__self__.__class__.__name__ # type: ignore - == "FederationUnstableMediaDownloadServlet" + == "FederationMediaDownloadServlet" ): response = await func( origin, content, request, *args, **kwargs @@ -374,7 +374,7 @@ class BaseFederationServlet: else: if ( func.__self__.__class__.__name__ # type: ignore - == "FederationUnstableMediaDownloadServlet" + == "FederationMediaDownloadServlet" ): response = await func( origin, content, request, *args, **kwargs diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index 67bb907050..ec957768d4 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -790,7 +790,7 @@ class FederationAccountStatusServlet(BaseFederationServerServlet): return 200, {"account_statuses": statuses, "failures": failures} -class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet): +class FederationMediaDownloadServlet(BaseFederationServerServlet): """ Implementation of new federation media `/download` endpoint outlined in MSC3916. Returns a multipart/mixed response consisting of a JSON object and the requested media @@ -798,7 +798,6 @@ class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet): """ PATH = "/media/download/(?P<media_id>[^/]*)" - PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916" RATELIMIT = True def __init__( @@ -858,5 +857,5 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationV1SendKnockServlet, FederationMakeKnockServlet, FederationAccountStatusServlet, - FederationUnstableMediaDownloadServlet, + FederationMediaDownloadServlet, ) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 847a638bba..0cebeea592 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,22 +18,34 @@ # # import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +import attr from immutabledict import immutabledict -from synapse.api.constants import AccountDataTypes, EventTypes, Membership +from synapse.api.constants import ( + AccountDataTypes, + Direction, + EventContentFields, + EventTypes, + Membership, +) from synapse.events import EventBase -from synapse.storage.roommember import RoomsForUser +from synapse.events.utils import strip_event +from synapse.handlers.relations import BundledAggregations +from synapse.storage.databases.main.stream import CurrentStateDeltaMembership from synapse.types import ( + JsonDict, PersistedEventPosition, Requester, RoomStreamToken, + StreamKeyType, StreamToken, UserID, ) from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult from synapse.types.state import StateFilter +from synapse.visibility import filter_events_for_client if TYPE_CHECKING: from synapse.server import HomeServer @@ -41,28 +53,9 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser: - """ - Quick helper to convert an event to a `RoomsForUser` object. - """ - # These fields should be present for all persisted events - assert event.internal_metadata.stream_ordering is not None - assert event.internal_metadata.instance_name is not None - - return RoomsForUser( - room_id=event.room_id, - sender=event.sender, - membership=event.membership, - event_id=event.event_id, - event_pos=PersistedEventPosition( - event.internal_metadata.instance_name, - event.internal_metadata.stream_ordering, - ), - room_version_id=event.room_version.identifier, - ) - - -def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool: +def filter_membership_for_sync( + *, membership: str, user_id: str, sender: Optional[str] +) -> bool: """ Returns True if the membership event should be included in the sync response, otherwise False. @@ -79,7 +72,54 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> # # This logic includes kicks (leave events where the sender is not the same user) and # can be read as "anything that isn't a leave or a leave with a different sender". - return membership != Membership.LEAVE or sender != user_id + # + # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset + # happened that removed the user from the room, or the user was the last person + # locally to leave the room which caused the server to leave the room. In both + # cases, we can just remove the rooms since they are no longer relevant to the user. + # They could still be added back later if they are `newly_left`. + return membership != Membership.LEAVE or sender not in (user_id, None) + + +# We can't freeze this class because we want to update it in place with the +# de-duplicated data. +@attr.s(slots=True, auto_attribs=True) +class RoomSyncConfig: + """ + Holds the config for what data we should fetch for a room in the sync response. + + Attributes: + timeline_limit: The maximum number of events to return in the timeline. + required_state: The set of state events requested for the room. The + values are close to `StateKey` but actually use a syntax where you can + provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part + of the tuple (type, state_key). + """ + + timeline_limit: int + required_state: Set[Tuple[str, str]] + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _RoomMembershipForUser: + """ + Attributes: + event_id: The event ID of the membership event + event_pos: The stream position of the membership event + membership: The membership state of the user in the room + sender: The person who sent the membership event + newly_joined: Whether the user newly joined the room during the given token + range + """ + + event_id: Optional[str] + event_pos: PersistedEventPosition + membership: str + sender: Optional[str] + newly_joined: bool + + def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": + return attr.evolve(self, **kwds) class SlidingSyncHandler: @@ -90,6 +130,7 @@ class SlidingSyncHandler: self.auth_blocking = hs.get_auth_blocking() self.notifier = hs.get_notifier() self.event_sources = hs.get_event_sources() + self.relations_handler = hs.get_relations_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync async def wait_for_sync_for_user( @@ -201,6 +242,7 @@ class SlidingSyncHandler: # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + relevant_room_map: Dict[str, RoomSyncConfig] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync # response @@ -225,29 +267,67 @@ class SlidingSyncHandler: ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: + sliced_room_ids = [ + room_id + # Both sides of range are inclusive + for room_id, _ in sorted_room_info[range[0] : range[1] + 1] + ] + ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=[ - room_id - for room_id, _ in sorted_room_info[ - range[0] : range[1] - ] - ], + room_ids=sliced_room_ids, ) ) + # Take the superset of the `RoomSyncConfig` for each room + for room_id in sliced_room_ids: + if relevant_room_map.get(room_id) is not None: + # Take the highest timeline limit + if ( + relevant_room_map[room_id].timeline_limit + < list_config.timeline_limit + ): + relevant_room_map[room_id].timeline_limit = ( + list_config.timeline_limit + ) + + # Union the required state + relevant_room_map[room_id].required_state.update( + list_config.required_state + ) + else: + relevant_room_map[room_id] = RoomSyncConfig( + timeline_limit=list_config.timeline_limit, + required_state=set(list_config.required_state), + ) + lists[list_key] = SlidingSyncResult.SlidingWindowList( count=len(sorted_room_info), ops=ops, ) + # TODO: if (sync_config.room_subscriptions): + + # Fetch room data + rooms: Dict[str, SlidingSyncResult.RoomResult] = {} + for room_id, room_sync_config in relevant_room_map.items(): + room_sync_result = await self.get_room_sync_data( + user=sync_config.user, + room_id=room_id, + room_sync_config=room_sync_config, + rooms_membership_for_user_at_to_token=sync_room_map[room_id], + from_token=from_token, + to_token=to_token, + ) + + rooms[room_id] = room_sync_result + return SlidingSyncResult( next_pos=to_token, lists=lists, - # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions` - rooms={}, + rooms=rooms, extensions={}, ) @@ -256,7 +336,7 @@ class SlidingSyncHandler: user: UserID, to_token: StreamToken, from_token: Optional[StreamToken] = None, - ) -> Dict[str, RoomsForUser]: + ) -> Dict[str, _RoomMembershipForUser]: """ Fetch room IDs that should be listed for this user in the sync response (the full room list that will be filtered, sorted, and sliced). @@ -305,13 +385,17 @@ class SlidingSyncHandler: # Our working list of rooms that can show up in the sync response sync_room_id_set = { - room_for_user.room_id: room_for_user - for room_for_user in room_for_user_list - if filter_membership_for_sync( + # Note: The `room_for_user` we're assigning here will need to be fixed up + # (below) because they are potentially from the current snapshot time + # instead from the time of the `to_token`. + room_for_user.room_id: _RoomMembershipForUser( + event_id=room_for_user.event_id, + event_pos=room_for_user.event_pos, membership=room_for_user.membership, - user_id=user_id, sender=room_for_user.sender, + newly_joined=False, ) + for room_for_user in room_for_user_list } # Get the `RoomStreamToken` that represents the spot we queried up to when we got @@ -346,14 +430,9 @@ class SlidingSyncHandler: # # - 1a) Remove rooms that the user joined after the `to_token` # - 1b) Add back rooms that the user left after the `to_token` + # - 1c) Update room membership events to the point in time of the `to_token` # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`) - # - # Below, we're doing two separate lookups for membership changes. We could - # request everything for both fixups in one range, [`from_token.room_key`, - # `membership_snapshot_token`), but we want to avoid raw `stream_ordering` - # comparison without `instance_name` (which is flawed). We could refactor - # `event.internal_metadata` to include `instance_name` but it might turn out a - # little difficult and a bigger, broader Synapse change than we want to make. + # - 3) Figure out which rooms are `newly_joined` # 1) ----------------------------------------------------- @@ -363,159 +442,198 @@ class SlidingSyncHandler: # If our `to_token` is already the same or ahead of the latest room membership # for the user, we don't need to do any "2)" fix-ups and can just straight-up # use the room list from the snapshot as a base (nothing has changed) - membership_change_events_after_to_token = [] + current_state_delta_membership_changes_after_to_token = [] if not membership_snapshot_token.is_before_or_eq(to_token.room_key): - membership_change_events_after_to_token = ( - await self.store.get_membership_changes_for_user( + current_state_delta_membership_changes_after_to_token = ( + await self.store.get_current_state_delta_membership_changes_for_user( user_id, from_key=to_token.room_key, to_key=membership_snapshot_token, - excluded_rooms=self.rooms_to_exclude_globally, + excluded_room_ids=self.rooms_to_exclude_globally, ) ) - # 1) Assemble a list of the last membership events in some given ranges. Someone - # could have left and joined multiple times during the given range but we only - # care about end-result so we grab the last one. - last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} - # We also need the first membership event after the `to_token` so we can step - # backward to the previous membership that would apply to the from/to range. - first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {} - for event in membership_change_events_after_to_token: - last_membership_change_by_room_id_after_to_token[event.room_id] = event + # 1) Assemble a list of the first membership event after the `to_token` so we can + # step backward to the previous membership that would apply to the from/to + # range. + first_membership_change_by_room_id_after_to_token: Dict[ + str, CurrentStateDeltaMembership + ] = {} + for membership_change in current_state_delta_membership_changes_after_to_token: # Only set if we haven't already set it first_membership_change_by_room_id_after_to_token.setdefault( - event.room_id, event + membership_change.room_id, membership_change ) # 1) Fixup + # + # Since we fetched a snapshot of the users room list at some point in time after + # the from/to tokens, we need to revert/rewind some membership changes to match + # the point in time of the `to_token`. for ( - last_membership_change_after_to_token - ) in last_membership_change_by_room_id_after_to_token.values(): - room_id = last_membership_change_after_to_token.room_id - - # We want to find the first membership change after the `to_token` then step - # backward to know the membership in the from/to range. - first_membership_change_after_to_token = ( - first_membership_change_by_room_id_after_to_token.get(room_id) - ) - assert first_membership_change_after_to_token is not None, ( - "If there was a `last_membership_change_after_to_token` that we're iterating over, " - + "then there should be corresponding a first change. For example, even if there " - + "is only one event after the `to_token`, the first and last event will be same event. " - + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`" - + "/`first_membership_change_by_room_id_after_to_token` dicts above." - ) - # TODO: Instead of reading from `unsigned`, refactor this to use the - # `current_state_delta_stream` table in the future. Probably a new - # `get_membership_changes_for_user()` function that uses - # `current_state_delta_stream` with a join to `room_memberships`. This would - # help in state reset scenarios since `prev_content` is looking at the - # current branch vs the current room state. This is all just data given to - # the client so no real harm to data integrity, but we'd like to be nice to - # the client. Since the `current_state_delta_stream` table is new, it - # doesn't have all events in it. Since this is Sliding Sync, if we ever need - # to, we can signal the client to throw all of their state away by sending - # "operation: RESET". - prev_content = first_membership_change_after_to_token.unsigned.get( - "prev_content", {} - ) - prev_membership = prev_content.get("membership", None) - prev_sender = first_membership_change_after_to_token.unsigned.get( - "prev_sender", None - ) - - # Check if the previous membership (membership that applies to the from/to - # range) should be included in our `sync_room_id_set` - should_prev_membership_be_included = ( - prev_membership is not None - and prev_sender is not None - and filter_membership_for_sync( - membership=prev_membership, - user_id=user_id, - sender=prev_sender, - ) - ) - - # Check if the last membership (membership that applies to our snapshot) was - # already included in our `sync_room_id_set` - was_last_membership_already_included = filter_membership_for_sync( - membership=last_membership_change_after_to_token.membership, + room_id, + first_membership_change_after_to_token, + ) in first_membership_change_by_room_id_after_to_token.items(): + # 1a) Remove rooms that the user joined after the `to_token` + if first_membership_change_after_to_token.prev_event_id is None: + sync_room_id_set.pop(room_id, None) + # 1b) 1c) From the first membership event after the `to_token`, step backward to the + # previous membership that would apply to the from/to range. + else: + # We don't expect these fields to be `None` if we have a `prev_event_id` + # but we're being defensive since it's possible that the prev event was + # culled from the database. + if ( + first_membership_change_after_to_token.prev_event_pos is not None + and first_membership_change_after_to_token.prev_membership + is not None + ): + sync_room_id_set[room_id] = _RoomMembershipForUser( + event_id=first_membership_change_after_to_token.prev_event_id, + event_pos=first_membership_change_after_to_token.prev_event_pos, + membership=first_membership_change_after_to_token.prev_membership, + sender=first_membership_change_after_to_token.prev_sender, + newly_joined=False, + ) + else: + # If we can't find the previous membership event, we shouldn't + # include the room in the sync response since we can't determine the + # exact membership state and shouldn't rely on the current snapshot. + sync_room_id_set.pop(room_id, None) + + # Filter the rooms that that we have updated room membership events to the point + # in time of the `to_token` (from the "1)" fixups) + filtered_sync_room_id_set = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in sync_room_id_set.items() + if filter_membership_for_sync( + membership=room_membership_for_user.membership, user_id=user_id, - sender=last_membership_change_after_to_token.sender, + sender=room_membership_for_user.sender, ) - - # 1a) Add back rooms that the user left after the `to_token` - # - # For example, if the last membership event after the `to_token` is a leave - # event, then the room was excluded from `sync_room_id_set` when we first - # crafted it above. We should add these rooms back as long as the user also - # was part of the room before the `to_token`. - if ( - not was_last_membership_already_included - and should_prev_membership_be_included - ): - sync_room_id_set[room_id] = convert_event_to_rooms_for_user( - last_membership_change_after_to_token - ) - # 1b) Remove rooms that the user joined (hasn't left) after the `to_token` - # - # For example, if the last membership event after the `to_token` is a "join" - # event, then the room was included `sync_room_id_set` when we first crafted - # it above. We should remove these rooms as long as the user also wasn't - # part of the room before the `to_token`. - elif ( - was_last_membership_already_included - and not should_prev_membership_be_included - ): - del sync_room_id_set[room_id] + } # 2) ----------------------------------------------------- # We fix-up newly_left rooms after the first fixup because it may have removed - # some left rooms that we can figure out our newly_left in the following code + # some left rooms that we can figure out are newly_left in the following code # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` - membership_change_events_in_from_to_range = [] + current_state_delta_membership_changes_in_from_to_range = [] if from_token: - membership_change_events_in_from_to_range = ( - await self.store.get_membership_changes_for_user( + current_state_delta_membership_changes_in_from_to_range = ( + await self.store.get_current_state_delta_membership_changes_for_user( user_id, from_key=from_token.room_key, to_key=to_token.room_key, - excluded_rooms=self.rooms_to_exclude_globally, + excluded_room_ids=self.rooms_to_exclude_globally, ) ) # 2) Assemble a list of the last membership events in some given ranges. Someone # could have left and joined multiple times during the given range but we only # care about end-result so we grab the last one. - last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {} - for event in membership_change_events_in_from_to_range: - last_membership_change_by_room_id_in_from_to_range[event.room_id] = event + last_membership_change_by_room_id_in_from_to_range: Dict[ + str, CurrentStateDeltaMembership + ] = {} + # We also want to assemble a list of the first membership events during the token + # range so we can step backward to the previous membership that would apply to + # before the token range to see if we have `newly_joined` the room. + first_membership_change_by_room_id_in_from_to_range: Dict[ + str, CurrentStateDeltaMembership + ] = {} + # Keep track if the room has a non-join event in the token range so we can later + # tell if it was a `newly_joined` room. If the last membership event in the + # token range is a join and there is also some non-join in the range, we know + # they `newly_joined`. + has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {} + for ( + membership_change + ) in current_state_delta_membership_changes_in_from_to_range: + room_id = membership_change.room_id + + last_membership_change_by_room_id_in_from_to_range[room_id] = ( + membership_change + ) + # Only set if we haven't already set it + first_membership_change_by_room_id_in_from_to_range.setdefault( + room_id, membership_change + ) + + if membership_change.membership != Membership.JOIN: + has_non_join_event_by_room_id_in_from_to_range[room_id] = True # 2) Fixup + # + # 3) We also want to assemble a list of possibly newly joined rooms. Someone + # could have left and joined multiple times during the given range but we only + # care about whether they are joined at the end of the token range so we are + # working with the last membership even in the token range. + possibly_newly_joined_room_ids = set() for ( last_membership_change_in_from_to_range ) in last_membership_change_by_room_id_in_from_to_range.values(): room_id = last_membership_change_in_from_to_range.room_id + # 3) + if last_membership_change_in_from_to_range.membership == Membership.JOIN: + possibly_newly_joined_room_ids.add(room_id) + # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We # include newly_left rooms because the last event that the user should see # is their own leave event if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - sync_room_id_set[room_id] = convert_event_to_rooms_for_user( - last_membership_change_in_from_to_range + filtered_sync_room_id_set[room_id] = _RoomMembershipForUser( + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + membership=last_membership_change_in_from_to_range.membership, + sender=last_membership_change_in_from_to_range.sender, + newly_joined=False, ) - return sync_room_id_set + # 3) Figure out `newly_joined` + for room_id in possibly_newly_joined_room_ids: + has_non_join_in_from_to_range = ( + has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) + ) + # If the last membership event in the token range is a join and there is + # also some non-join in the range, we know they `newly_joined`. + if has_non_join_in_from_to_range: + # We found a `newly_joined` room (we left and joined within the token range) + filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ + room_id + ].copy_and_replace(newly_joined=True) + else: + prev_event_id = first_membership_change_by_room_id_in_from_to_range[ + room_id + ].prev_event_id + prev_membership = first_membership_change_by_room_id_in_from_to_range[ + room_id + ].prev_membership + + if prev_event_id is None: + # We found a `newly_joined` room (we are joining the room for the + # first time within the token range) + filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ + room_id + ].copy_and_replace(newly_joined=True) + # Last resort, we need to step back to the previous membership event + # just before the token range to see if we're joined then or not. + elif prev_membership != Membership.JOIN: + # We found a `newly_joined` room (we left before the token range + # and joined within the token range) + filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ + room_id + ].copy_and_replace(newly_joined=True) + + return filtered_sync_room_id_set async def filter_rooms( self, user: UserID, - sync_room_map: Dict[str, RoomsForUser], + sync_room_map: Dict[str, _RoomMembershipForUser], filters: SlidingSyncConfig.SlidingSyncList.Filters, to_token: StreamToken, - ) -> Dict[str, RoomsForUser]: + ) -> Dict[str, _RoomMembershipForUser]: """ Filter rooms based on the sync request. @@ -583,6 +701,10 @@ class SlidingSyncHandler: state_filter=StateFilter.from_types( [(EventTypes.RoomEncryption, "")] ), + # Partially stated rooms should have all state events except for the + # membership events so we don't need to wait. Plus we don't want to + # block the whole sync waiting for this one room. + await_full_state=False, ) is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) @@ -609,11 +731,26 @@ class SlidingSyncHandler: ): filtered_room_id_set.remove(room_id) - if filters.room_types: - raise NotImplementedError() + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type. + if filters.room_types is not None or filters.not_room_types is not None: + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in list(filtered_room_id_set): + create_event = await self.store.get_create_event_for_room(room_id) + room_type = create_event.content.get(EventContentFields.ROOM_TYPE) + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) - if filters.not_room_types: - raise NotImplementedError() + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) if filters.room_name_like: raise NotImplementedError() @@ -629,9 +766,9 @@ class SlidingSyncHandler: async def sort_rooms( self, - sync_room_map: Dict[str, RoomsForUser], + sync_room_map: Dict[str, _RoomMembershipForUser], to_token: StreamToken, - ) -> List[Tuple[str, RoomsForUser]]: + ) -> List[Tuple[str, _RoomMembershipForUser]]: """ Sort by `stream_ordering` of the last event that the user should see in the room. `stream_ordering` is unique so we get a stable sort. @@ -678,3 +815,229 @@ class SlidingSyncHandler: # We want descending order reverse=True, ) + + async def get_room_sync_data( + self, + user: UserID, + room_id: str, + room_sync_config: RoomSyncConfig, + rooms_membership_for_user_at_to_token: _RoomMembershipForUser, + from_token: Optional[StreamToken], + to_token: StreamToken, + ) -> SlidingSyncResult.RoomResult: + """ + Fetch room data for the sync response. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + user: User to fetch data for + room_id: The room ID to fetch data for + room_sync_config: Config for what data we should fetch for a room in the + sync response. + rooms_membership_for_user_at_to_token: Membership information for the user + in the room at the time of `to_token`. + from_token: The point in the stream to sync from. + to_token: The point in the stream to sync up to. + """ + + # Assemble the list of timeline events + # + # It would be nice to make the `rooms` response more uniform regardless of + # membership. Currently, we have to make all of these optional because + # `invite`/`knock` rooms only have `stripped_state`. See + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + timeline_events: Optional[List[EventBase]] = None + bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None + limited: Optional[bool] = None + prev_batch_token: Optional[StreamToken] = None + num_live: Optional[int] = None + if ( + room_sync_config.timeline_limit > 0 + # No timeline for invite/knock rooms (just `stripped_state`) + and rooms_membership_for_user_at_to_token.membership + not in (Membership.INVITE, Membership.KNOCK) + ): + limited = False + # We want to start off using the `to_token` (vs `from_token`) because we look + # backwards from the `to_token` up to the `timeline_limit` and we might not + # reach the `from_token` before we hit the limit. We will update the room stream + # position once we've fetched the events to point to the earliest event fetched. + prev_batch_token = to_token + + # We're going to paginate backwards from the `to_token` + from_bound = to_token.room_key + # People shouldn't see past their leave/ban event + if rooms_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + from_bound = ( + rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token() + ) + + # Determine whether we should limit the timeline to the token range. + # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - TODO: For an incremental sync where we haven't sent it down this + # connection before + to_bound = ( + from_token.room_key + if from_token is not None + and not rooms_membership_for_user_at_to_token.newly_joined + else None + ) + + timeline_events, new_room_key = await self.store.paginate_room_events( + room_id=room_id, + from_key=from_bound, + to_key=to_bound, + direction=Direction.BACKWARDS, + # We add one so we can determine if there are enough events to saturate + # the limit or not (see `limited`) + limit=room_sync_config.timeline_limit + 1, + event_filter=None, + ) + + # We want to return the events in ascending order (the last event is the + # most recent). + timeline_events.reverse() + + # Determine our `limited` status based on the timeline. We do this before + # filtering the events so we can accurately determine if there is more to + # paginate even if we filter out some/all events. + if len(timeline_events) > room_sync_config.timeline_limit: + limited = True + # Get rid of that extra "+ 1" event because we only used it to determine + # if we hit the limit or not + timeline_events = timeline_events[-room_sync_config.timeline_limit :] + assert timeline_events[0].internal_metadata.stream_ordering + new_room_key = RoomStreamToken( + stream=timeline_events[0].internal_metadata.stream_ordering - 1 + ) + + # Make sure we don't expose any events that the client shouldn't see + timeline_events = await filter_events_for_client( + self.storage_controllers, + user.to_string(), + timeline_events, + is_peeking=rooms_membership_for_user_at_to_token.membership + != Membership.JOIN, + filter_send_to_client=True, + ) + # TODO: Filter out `EventTypes.CallInvite` in public rooms, + # see https://github.com/element-hq/synapse/issues/17359 + + # TODO: Handle timeline gaps (`get_timeline_gaps()`) + + # Determine how many "live" events we have (events within the given token range). + # + # This is mostly useful to determine whether a given @mention event should + # make a noise or not. Clients cannot rely solely on the absence of + # `initial: true` to determine live events because if a room not in the + # sliding window bumps into the window because of an @mention it will have + # `initial: true` yet contain a single live event (with potentially other + # old events in the timeline) + num_live = 0 + if from_token is not None: + for timeline_event in reversed(timeline_events): + # This fields should be present for all persisted events + assert timeline_event.internal_metadata.stream_ordering is not None + assert timeline_event.internal_metadata.instance_name is not None + + persisted_position = PersistedEventPosition( + instance_name=timeline_event.internal_metadata.instance_name, + stream=timeline_event.internal_metadata.stream_ordering, + ) + if persisted_position.persisted_after(from_token.room_key): + num_live += 1 + else: + # Since we're iterating over the timeline events in + # reverse-chronological order, we can break once we hit an event + # that's not live. In the future, we could potentially optimize + # this more with a binary search (bisect). + break + + # If the timeline is `limited=True`, the client does not have all events + # necessary to calculate aggregations themselves. + if limited: + bundled_aggregations = ( + await self.relations_handler.get_bundled_aggregations( + timeline_events, user.to_string() + ) + ) + + # Update the `prev_batch_token` to point to the position that allows us to + # keep paginating backwards from the oldest event we return in the timeline. + prev_batch_token = prev_batch_token.copy_and_replace( + StreamKeyType.ROOM, new_room_key + ) + + # Figure out any stripped state events for invite/knocks. This allows the + # potential joiner to identify the room. + stripped_state: List[JsonDict] = [] + if rooms_membership_for_user_at_to_token.membership in ( + Membership.INVITE, + Membership.KNOCK, + ): + # This should never happen. If someone is invited/knocked on room, then + # there should be an event for it. + assert rooms_membership_for_user_at_to_token.event_id is not None + + invite_or_knock_event = await self.store.get_event( + rooms_membership_for_user_at_to_token.event_id + ) + + stripped_state = [] + if invite_or_knock_event.membership == Membership.INVITE: + stripped_state.extend( + invite_or_knock_event.unsigned.get("invite_room_state", []) + ) + elif invite_or_knock_event.membership == Membership.KNOCK: + stripped_state.extend( + invite_or_knock_event.unsigned.get("knock_room_state", []) + ) + + stripped_state.append(strip_event(invite_or_knock_event)) + + # TODO: Handle state resets. For example, if we see + # `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but + # `required_state` doesn't include it, we should indicate to the client that a + # state reset happened. Perhaps we should indicate this by setting `initial: + # True` and empty `required_state`. + + return SlidingSyncResult.RoomResult( + # TODO: Dummy value + name=None, + # TODO: Dummy value + avatar=None, + # TODO: Dummy value + heroes=None, + # TODO: Since we can't determine whether we've already sent a room down this + # Sliding Sync connection before (we plan to add this optimization in the + # future), we're always returning the requested room state instead of + # updates. + initial=True, + # TODO: Dummy value + required_state=[], + timeline_events=timeline_events, + bundled_aggregations=bundled_aggregations, + # TODO: Dummy value + is_dm=False, + stripped_state=stripped_state, + prev_batch=prev_batch_token, + limited=limited, + # TODO: Dummy values + joined_count=0, + invited_count=0, + # TODO: These are just dummy values. We could potentially just remove these + # since notifications can only really be done correctly on the client anyway + # (encrypted rooms). + notification_count=0, + highlight_count=0, + num_live=num_live, + ) diff --git a/synapse/http/client.py b/synapse/http/client.py index 4718517c97..56ad28eabf 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -35,6 +35,8 @@ from typing import ( Union, ) +import attr +import multipart import treq from canonicaljson import encode_canonical_json from netaddr import AddrFormatError, IPAddress, IPSet @@ -1006,6 +1008,130 @@ class _DiscardBodyWithMaxSizeProtocol(protocol.Protocol): self._maybe_fail() +@attr.s(auto_attribs=True, slots=True) +class MultipartResponse: + """ + A small class to hold parsed values of a multipart response. + """ + + json: bytes = b"{}" + length: Optional[int] = None + content_type: Optional[bytes] = None + disposition: Optional[bytes] = None + url: Optional[bytes] = None + + +class _MultipartParserProtocol(protocol.Protocol): + """ + Protocol to read and parse a MSC3916 multipart/mixed response + """ + + transport: Optional[ITCPTransport] = None + + def __init__( + self, + stream: ByteWriteable, + deferred: defer.Deferred, + boundary: str, + max_length: Optional[int], + ) -> None: + self.stream = stream + self.deferred = deferred + self.boundary = boundary + self.max_length = max_length + self.parser = None + self.multipart_response = MultipartResponse() + self.has_redirect = False + self.in_json = False + self.json_done = False + self.file_length = 0 + self.total_length = 0 + self.in_disposition = False + self.in_content_type = False + + def dataReceived(self, incoming_data: bytes) -> None: + if self.deferred.called: + return + + # we don't have a parser yet, instantiate it + if not self.parser: + + def on_header_field(data: bytes, start: int, end: int) -> None: + if data[start:end] == b"Location": + self.has_redirect = True + if data[start:end] == b"Content-Disposition": + self.in_disposition = True + if data[start:end] == b"Content-Type": + self.in_content_type = True + + def on_header_value(data: bytes, start: int, end: int) -> None: + # the first header should be content-type for application/json + if not self.in_json and not self.json_done: + assert data[start:end] == b"application/json" + self.in_json = True + elif self.has_redirect: + self.multipart_response.url = data[start:end] + elif self.in_content_type: + self.multipart_response.content_type = data[start:end] + self.in_content_type = False + elif self.in_disposition: + self.multipart_response.disposition = data[start:end] + self.in_disposition = False + + def on_part_data(data: bytes, start: int, end: int) -> None: + # we've seen json header but haven't written the json data + if self.in_json and not self.json_done: + self.multipart_response.json = data[start:end] + self.json_done = True + # we have a redirect header rather than a file, and have already captured it + elif self.has_redirect: + return + # otherwise we are in the file part + else: + logger.info("Writing multipart file data to stream") + try: + self.stream.write(data[start:end]) + except Exception as e: + logger.warning( + f"Exception encountered writing file data to stream: {e}" + ) + self.deferred.errback() + self.file_length += end - start + + callbacks = { + "on_header_field": on_header_field, + "on_header_value": on_header_value, + "on_part_data": on_part_data, + } + self.parser = multipart.MultipartParser(self.boundary, callbacks) + + self.total_length += len(incoming_data) + if self.max_length is not None and self.total_length >= self.max_length: + self.deferred.errback(BodyExceededMaxSize()) + # Close the connection (forcefully) since all the data will get + # discarded anyway. + assert self.transport is not None + self.transport.abortConnection() + + try: + self.parser.write(incoming_data) # type: ignore[attr-defined] + except Exception as e: + logger.warning(f"Exception writing to multipart parser: {e}") + self.deferred.errback() + return + + def connectionLost(self, reason: Failure = connectionDone) -> None: + # If the maximum size was already exceeded, there's nothing to do. + if self.deferred.called: + return + + if reason.check(ResponseDone): + self.multipart_response.length = self.file_length + self.deferred.callback(self.multipart_response) + else: + self.deferred.errback(reason) + + class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): """A protocol which reads body to a stream, erroring if the body exceeds a maximum size.""" @@ -1091,6 +1217,32 @@ def read_body_with_max_size( return d +def read_multipart_response( + response: IResponse, stream: ByteWriteable, boundary: str, max_length: Optional[int] +) -> "defer.Deferred[MultipartResponse]": + """ + Reads a MSC3916 multipart/mixed response and parses it, reading the file part (if it contains one) into + the stream passed in and returning a deferred resolving to a MultipartResponse + + Args: + response: The HTTP response to read from. + stream: The file-object to write to. + boundary: the multipart/mixed boundary string + max_length: maximum allowable length of the response + """ + d: defer.Deferred[MultipartResponse] = defer.Deferred() + + # If the Content-Length header gives a size larger than the maximum allowed + # size, do not bother downloading the body. + if max_length is not None and response.length != UNKNOWN_LENGTH: + if response.length > max_length: + response.deliverBody(_DiscardBodyWithMaxSizeProtocol(d)) + return d + + response.deliverBody(_MultipartParserProtocol(stream, d, boundary, max_length)) + return d + + def encode_query_args(args: Optional[QueryParams]) -> bytes: """ Encodes a map of query arguments to bytes which can be appended to a URL. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 104b803b0f..749b01dd0e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -75,9 +75,11 @@ from synapse.http.client import ( BlocklistingAgentWrapper, BodyExceededMaxSize, ByteWriteable, + SimpleHttpClient, _make_scheduler, encode_query_args, read_body_with_max_size, + read_multipart_response, ) from synapse.http.connectproxyclient import BearerProxyCredentials from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent @@ -466,6 +468,13 @@ class MatrixFederationHttpClient: self._sleeper = AwakenableSleeper(self.reactor) + self._simple_http_client = SimpleHttpClient( + hs, + ip_blocklist=hs.config.server.federation_ip_range_blocklist, + ip_allowlist=hs.config.server.federation_ip_range_allowlist, + use_proxy=True, + ) + def wake_destination(self, destination: str) -> None: """Called when the remote server may have come back online.""" @@ -1553,6 +1562,189 @@ class MatrixFederationHttpClient: ) return length, headers + async def federation_get_file( + self, + destination: str, + path: str, + output_stream: BinaryIO, + download_ratelimiter: Ratelimiter, + ip_address: str, + max_size: int, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + ignore_backoff: bool = False, + ) -> Tuple[int, Dict[bytes, List[bytes]], bytes]: + """GETs a file from a given homeserver over the federation /download endpoint + Args: + destination: The remote server to send the HTTP request to. + path: The HTTP path to GET. + output_stream: File to write the response body to. + download_ratelimiter: a ratelimiter to limit remote media downloads, keyed to + requester IP + ip_address: IP address of the requester + max_size: maximum allowable size in bytes of the file + args: Optional dictionary used to create the query string. + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + Returns: + Resolves to an (int, dict, bytes) tuple of + the file length, a dict of the response headers, and the file json + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + SynapseError: If the requested file exceeds ratelimits or the response from the + remote server is not a multipart response + AssertionError: if the resolved multipart response's length is None + """ + request = MatrixFederationRequest( + method="GET", destination=destination, path=path, query=args + ) + + # check for a minimum balance of 1MiB in ratelimiter before initiating request + send_req, _ = await download_ratelimiter.can_do_action( + requester=None, key=ip_address, n_actions=1048576, update=False + ) + + if not send_req: + msg = "Requested file size exceeds ratelimits" + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED) + + response = await self._send_request( + request, + retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, + ) + + headers = dict(response.headers.getAllRawHeaders()) + + expected_size = response.length + # if we don't get an expected length then use the max length + if expected_size == UNKNOWN_LENGTH: + expected_size = max_size + logger.debug( + f"File size unknown, assuming file is max allowable size: {max_size}" + ) + + read_body, _ = await download_ratelimiter.can_do_action( + requester=None, + key=ip_address, + n_actions=expected_size, + ) + if not read_body: + msg = "Requested file size exceeds ratelimits" + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED) + + # this should be a multipart/mixed response with the boundary string in the header + try: + raw_content_type = headers.get(b"Content-Type") + assert raw_content_type is not None + content_type = raw_content_type[0].decode("UTF-8") + content_type_parts = content_type.split("boundary=") + boundary = content_type_parts[1] + except Exception: + msg = "Remote response is malformed: expected Content-Type of multipart/mixed with a boundary present." + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.BAD_GATEWAY, msg) + + try: + # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >= + deferred = read_multipart_response( + response, output_stream, boundary, expected_size + 1 + ) + deferred.addTimeout(self.default_timeout_seconds, self.reactor) + except BodyExceededMaxSize: + msg = "Requested file is too large > %r bytes" % (expected_size,) + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE) + except defer.TimeoutError as e: + logger.warning( + "{%s} [%s] Timed out reading response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), + ) + raise RequestSendFailed(e, can_retry=True) from e + except ResponseFailed as e: + logger.warning( + "{%s} [%s] Failed to read response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), + ) + raise RequestSendFailed(e, can_retry=True) from e + except Exception as e: + logger.warning( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) + raise + + multipart_response = await make_deferred_yieldable(deferred) + if not multipart_response.url: + assert multipart_response.length is not None + length = multipart_response.length + headers[b"Content-Type"] = [multipart_response.content_type] + headers[b"Content-Disposition"] = [multipart_response.disposition] + + # the response contained a redirect url to download the file from + else: + str_url = multipart_response.url.decode("utf-8") + logger.info( + "{%s} [%s] File download redirected, now downloading from: %s", + request.txn_id, + request.destination, + str_url, + ) + length, headers, _, _ = await self._simple_http_client.get_file( + str_url, output_stream, expected_size + ) + + logger.info( + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode("ascii", errors="replace"), + length, + request.method, + request.uri.decode("ascii"), + ) + return length, headers, multipart_response.json + def _flatten_response_never_received(e: BaseException) -> str: if hasattr(e, "reasons"): diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 7ad0b7c3cf..1b268ce4d4 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -221,6 +221,7 @@ def add_file_headers( # select private. don't bother setting Expires as all our # clients are smart enough to be happy with Cache-Control request.setHeader(b"Cache-Control", b"public,max-age=86400,s-maxage=86400") + if file_size is not None: request.setHeader(b"Content-Length", b"%d" % (file_size,)) @@ -302,12 +303,37 @@ async def respond_with_multipart_responder( ) return + if media_info.media_type.lower().split(";", 1)[0] in INLINE_CONTENT_TYPES: + disposition = "inline" + else: + disposition = "attachment" + + def _quote(x: str) -> str: + return urllib.parse.quote(x.encode("utf-8")) + + if media_info.upload_name: + if _can_encode_filename_as_token(media_info.upload_name): + disposition = "%s; filename=%s" % ( + disposition, + media_info.upload_name, + ) + else: + disposition = "%s; filename*=utf-8''%s" % ( + disposition, + _quote(media_info.upload_name), + ) + from synapse.media.media_storage import MultipartFileConsumer # note that currently the json_object is just {}, this will change when linked media # is implemented multipart_consumer = MultipartFileConsumer( - clock, request, media_info.media_type, {}, media_info.media_length + clock, + request, + media_info.media_type, + {}, + disposition, + media_info.media_length, ) logger.debug("Responding to media request with responder %s", responder) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1436329fad..542642b900 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -480,6 +480,7 @@ class MediaRepository: name: Optional[str], max_timeout_ms: int, ip_address: str, + use_federation_endpoint: bool, ) -> None: """Respond to requests for remote media. @@ -492,6 +493,8 @@ class MediaRepository: max_timeout_ms: the maximum number of milliseconds to wait for the media to be uploaded. ip_address: the IP address of the requester + use_federation_endpoint: whether to request the remote media over the new + federation `/download` endpoint Returns: Resolves once a response has successfully been written to request @@ -522,6 +525,7 @@ class MediaRepository: max_timeout_ms, self.download_ratelimiter, ip_address, + use_federation_endpoint, ) # We deliberately stream the file outside the lock @@ -569,6 +573,7 @@ class MediaRepository: max_timeout_ms, self.download_ratelimiter, ip_address, + False, ) # Ensure we actually use the responder so that it releases resources @@ -585,6 +590,7 @@ class MediaRepository: max_timeout_ms: int, download_ratelimiter: Ratelimiter, ip_address: str, + use_federation_endpoint: bool, ) -> Tuple[Optional[Responder], RemoteMedia]: """Looks for media in local cache, if not there then attempt to download from remote server. @@ -598,6 +604,8 @@ class MediaRepository: download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to requester IP. ip_address: the IP address of the requester + use_federation_endpoint: whether to request the remote media over the new federation + /download endpoint Returns: A tuple of responder and the media info of the file. @@ -629,9 +637,23 @@ class MediaRepository: # Failed to find the file anywhere, lets download it. try: - media_info = await self._download_remote_file( - server_name, media_id, max_timeout_ms, download_ratelimiter, ip_address - ) + if not use_federation_endpoint: + media_info = await self._download_remote_file( + server_name, + media_id, + max_timeout_ms, + download_ratelimiter, + ip_address, + ) + else: + media_info = await self._federation_download_remote_file( + server_name, + media_id, + max_timeout_ms, + download_ratelimiter, + ip_address, + ) + except SynapseError: raise except Exception as e: @@ -775,6 +797,129 @@ class MediaRepository: quarantined_by=None, ) + async def _federation_download_remote_file( + self, + server_name: str, + media_id: str, + max_timeout_ms: int, + download_ratelimiter: Ratelimiter, + ip_address: str, + ) -> RemoteMedia: + """Attempt to download the remote file from the given server name. + Uses the given file_id as the local id and downloads the file over the federation + v1 download endpoint + + Args: + server_name: Originating server + media_id: The media ID of the content (as defined by the + remote server). This is different than the file_id, which is + locally generated. + max_timeout_ms: the maximum number of milliseconds to wait for the + media to be uploaded. + download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to + requester IP + ip_address: the IP address of the requester + + Returns: + The media info of the file. + """ + + file_id = random_string(24) + + file_info = FileInfo(server_name=server_name, file_id=file_id) + + async with self.media_storage.store_into_file(file_info) as (f, fname): + try: + res = await self.client.federation_download_media( + server_name, + media_id, + output_stream=f, + max_size=self.max_upload_size, + max_timeout_ms=max_timeout_ms, + download_ratelimiter=download_ratelimiter, + ip_address=ip_address, + ) + # if we had to fall back to the _matrix/media endpoint it will only return + # the headers and length, check the length of the tuple before unpacking + if len(res) == 3: + length, headers, json = res + else: + length, headers = res + except RequestSendFailed as e: + logger.warning( + "Request failed fetching remote media %s/%s: %r", + server_name, + media_id, + e, + ) + raise SynapseError(502, "Failed to fetch remote media") + + except HttpResponseException as e: + logger.warning( + "HTTP error fetching remote media %s/%s: %s", + server_name, + media_id, + e.response, + ) + if e.code == twisted.web.http.NOT_FOUND: + raise e.to_synapse_error() + raise SynapseError(502, "Failed to fetch remote media") + + except SynapseError: + logger.warning( + "Failed to fetch remote media %s/%s", server_name, media_id + ) + raise + except NotRetryingDestination: + logger.warning("Not retrying destination %r", server_name) + raise SynapseError(502, "Failed to fetch remote media") + except Exception: + logger.exception( + "Failed to fetch remote media %s/%s", server_name, media_id + ) + raise SynapseError(502, "Failed to fetch remote media") + + if b"Content-Type" in headers: + media_type = headers[b"Content-Type"][0].decode("ascii") + else: + media_type = "application/octet-stream" + upload_name = get_filename_from_headers(headers) + time_now_ms = self.clock.time_msec() + + # Multiple remote media download requests can race (when using + # multiple media repos), so this may throw a violation constraint + # exception. If it does we'll delete the newly downloaded file from + # disk (as we're in the ctx manager). + # + # However: we've already called `finish()` so we may have also + # written to the storage providers. This is preferable to the + # alternative where we call `finish()` *after* this, where we could + # end up having an entry in the DB but fail to write the files to + # the storage providers. + await self.store.store_cached_remote_media( + origin=server_name, + media_id=media_id, + media_type=media_type, + time_now_ms=time_now_ms, + upload_name=upload_name, + media_length=length, + filesystem_id=file_id, + ) + + logger.debug("Stored remote media in file %r", fname) + + return RemoteMedia( + media_origin=server_name, + media_id=media_id, + media_type=media_type, + media_length=length, + upload_name=upload_name, + created_ts=time_now_ms, + filesystem_id=file_id, + last_access_ts=time_now_ms, + quarantined_by=None, + ) + def _get_thumbnail_requirements( self, media_type: str ) -> Tuple[ThumbnailRequirement, ...]: diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py index 1be2c9b5f5..2a106bb0eb 100644 --- a/synapse/media/media_storage.py +++ b/synapse/media/media_storage.py @@ -401,13 +401,14 @@ class MultipartFileConsumer: wrapped_consumer: interfaces.IConsumer, file_content_type: str, json_object: JsonDict, - content_length: Optional[int] = None, + disposition: str, + content_length: Optional[int], ) -> None: self.clock = clock self.wrapped_consumer = wrapped_consumer self.json_field = json_object self.json_field_written = False - self.content_type_written = False + self.file_headers_written = False self.file_content_type = file_content_type self.boundary = uuid4().hex.encode("ascii") @@ -420,6 +421,7 @@ class MultipartFileConsumer: self.paused = False self.length = content_length + self.disposition = disposition ### IConsumer APIs ### @@ -488,11 +490,13 @@ class MultipartFileConsumer: self.json_field_written = True # if we haven't written the content type yet, do so - if not self.content_type_written: + if not self.file_headers_written: type = self.file_content_type.encode("utf-8") content_type = Header(b"Content-Type", type) - self.wrapped_consumer.write(bytes(content_type) + CRLF + CRLF) - self.content_type_written = True + self.wrapped_consumer.write(bytes(content_type) + CRLF) + disp_header = Header(b"Content-Disposition", self.disposition) + self.wrapped_consumer.write(bytes(disp_header) + CRLF + CRLF) + self.file_headers_written = True self.wrapped_consumer.write(data) @@ -506,7 +510,6 @@ class MultipartFileConsumer: producing data for good. """ assert self.producer is not None - self.paused = True self.producer.stopProducing() @@ -518,7 +521,6 @@ class MultipartFileConsumer: the time being, and to stop until C{resumeProducing()} is called. """ assert self.producer is not None - self.paused = True if self.streaming: @@ -549,7 +551,7 @@ class MultipartFileConsumer: """ if not self.length: return None - # calculate length of json field and content-type header + # calculate length of json field and content-type, disposition headers json_field = json.dumps(self.json_field) json_bytes = json_field.encode("utf-8") json_length = len(json_bytes) @@ -558,9 +560,13 @@ class MultipartFileConsumer: content_type = Header(b"Content-Type", type) type_length = len(bytes(content_type)) - # 154 is the length of the elements that aren't variable, ie + disp = self.disposition.encode("utf-8") + disp_header = Header(b"Content-Disposition", disp) + disp_length = len(bytes(disp_header)) + + # 156 is the length of the elements that aren't variable, ie # CRLFs and boundary strings, etc - self.length += json_length + type_length + 154 + self.length += json_length + type_length + disp_length + 156 return self.length @@ -569,7 +575,6 @@ class MultipartFileConsumer: async def _resumeProducingRepeatedly(self) -> None: assert self.producer is not None assert not self.streaming - producer = cast("interfaces.IPullProducer", self.producer) self.paused = False diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index 0024ccf708..c94d454a28 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -145,6 +145,10 @@ class ClientRestResource(JsonResource): password_policy.register_servlets(hs, client_resource) knock.register_servlets(hs, client_resource) appservice_ping.register_servlets(hs, client_resource) + if hs.config.server.enable_media_repo: + from synapse.rest.client import media + + media.register_servlets(hs, client_resource) # moving to /_synapse/admin if is_main_process: diff --git a/synapse/rest/client/media.py b/synapse/rest/client/media.py index 0c089163c1..c0ae5dd66f 100644 --- a/synapse/rest/client/media.py +++ b/synapse/rest/client/media.py @@ -22,6 +22,7 @@ import logging import re +from typing import Optional from synapse.http.server import ( HttpServer, @@ -194,14 +195,76 @@ class UnstableThumbnailResource(RestServlet): self.media_repo.mark_recently_accessed(server_name, media_id) +class DownloadResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/client/v1/media/download/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)(/(?P<file_name>[^/]*))?$" + ) + ] + + def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): + super().__init__() + self.media_repo = media_repo + self._is_mine_server_name = hs.is_mine_server_name + self.auth = hs.get_auth() + + async def on_GET( + self, + request: SynapseRequest, + server_name: str, + media_id: str, + file_name: Optional[str] = None, + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + + await self.auth.get_user_by_req(request) + + set_cors_headers(request) + set_corp_headers(request) + request.setHeader( + b"Content-Security-Policy", + b"sandbox;" + b" default-src 'none';" + b" script-src 'none';" + b" plugin-types application/pdf;" + b" style-src 'unsafe-inline';" + b" media-src 'self';" + b" object-src 'self';", + ) + # Limited non-standard form of CSP for IE11 + request.setHeader(b"X-Content-Security-Policy", b"sandbox;") + request.setHeader(b"Referrer-Policy", b"no-referrer") + max_timeout_ms = parse_integer( + request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS + ) + max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS) + + if self._is_mine_server_name(server_name): + await self.media_repo.get_local_media( + request, media_id, file_name, max_timeout_ms + ) + else: + ip_address = request.getClientAddress().host + await self.media_repo.get_remote_media( + request, + server_name, + media_id, + file_name, + max_timeout_ms, + ip_address, + True, + ) + + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - if hs.config.experimental.msc3916_authenticated_media_enabled: - media_repo = hs.get_media_repository() - if hs.config.media.url_preview_enabled: - UnstablePreviewURLServlet( - hs, media_repo, media_repo.media_storage - ).register(http_server) - UnstableMediaConfigResource(hs).register(http_server) - UnstableThumbnailResource(hs, media_repo, media_repo.media_storage).register( + media_repo = hs.get_media_repository() + if hs.config.media.url_preview_enabled: + UnstablePreviewURLServlet(hs, media_repo, media_repo.media_storage).register( http_server ) + UnstableMediaConfigResource(hs).register(http_server) + UnstableThumbnailResource(hs, media_repo, media_repo.media_storage).register( + http_server + ) + DownloadResource(hs, media_repo).register(http_server) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index b5ab0d8534..1d955a2e89 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet): "lists": { "foo-list": { "ranges": [ [0, 99] ], - "sort": [ "by_notification_level", "by_recency", "by_name" ], "required_state": [ ["m.room.join_rules", ""], ["m.room.history_visibility", ""], @@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet): "filters": { "is_dm": true }, - "bump_event_types": [ "m.room.message", "m.room.encrypted" ], } }, // Room Subscriptions API @@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet): "!sub1:bar": { "required_state": [ ["*","*"] ], "timeline_limit": 10, - "include_old_rooms": { - "timeline_limit": 1, - "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ], - } } }, // Extensions API @@ -791,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet): Response JSON:: { - "next_pos": "s58_224_0_13_10_1_1_16_0_1", + "pos": "s58_224_0_13_10_1_1_16_0_1", "lists": { "foo-list": { "count": 1337, @@ -830,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 41, "invited_count": 1, "notification_count": 1, - "highlight_count": 0 + "highlight_count": 0, + "num_live": 2" }, // rooms from list "!foo:bar": { @@ -855,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 4, "invited_count": 0, "notification_count": 54, - "highlight_count": 3 + "highlight_count": 3, + "num_live": 1, }, // ... 99 more items }, @@ -871,10 +867,11 @@ class SlidingSyncRestServlet(RestServlet): super().__init__() self.auth = hs.get_auth() self.store = hs.get_datastores().main + self.clock = hs.get_clock() self.filtering = hs.get_filtering() self.sliding_sync_handler = hs.get_sliding_sync_handler() + self.event_serializer = hs.get_event_client_serializer() - # TODO: Update this to `on_GET` once we figure out how we want to handle params async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) user = requester.user @@ -920,22 +917,25 @@ class SlidingSyncRestServlet(RestServlet): logger.info("Client has disconnected; not serializing response.") return 200, {} - response_content = await self.encode_response(sliding_sync_results) + response_content = await self.encode_response(requester, sliding_sync_results) return 200, response_content # TODO: Is there a better way to encode things? async def encode_response( self, + requester: Requester, sliding_sync_result: SlidingSyncResult, ) -> JsonDict: response: JsonDict = defaultdict(dict) - response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store) + response["pos"] = await sliding_sync_result.next_pos.to_string(self.store) serialized_lists = self.encode_lists(sliding_sync_result.lists) if serialized_lists: response["lists"] = serialized_lists - response["rooms"] = {} # TODO: sliding_sync_result.rooms + response["rooms"] = await self.encode_rooms( + requester, sliding_sync_result.rooms + ) response["extensions"] = {} # TODO: sliding_sync_result.extensions return response @@ -961,6 +961,92 @@ class SlidingSyncRestServlet(RestServlet): return serialized_lists + async def encode_rooms( + self, + requester: Requester, + rooms: Dict[str, SlidingSyncResult.RoomResult], + ) -> JsonDict: + time_now = self.clock.time_msec() + + serialize_options = SerializeEventConfig( + event_format=format_event_for_client_v2_without_room_id, + requester=requester, + ) + + serialized_rooms: Dict[str, JsonDict] = {} + for room_id, room_result in rooms.items(): + serialized_rooms[room_id] = { + "joined_count": room_result.joined_count, + "invited_count": room_result.invited_count, + "notification_count": room_result.notification_count, + "highlight_count": room_result.highlight_count, + } + + if room_result.name: + serialized_rooms[room_id]["name"] = room_result.name + + if room_result.avatar: + serialized_rooms[room_id]["avatar"] = room_result.avatar + + if room_result.heroes: + serialized_rooms[room_id]["heroes"] = room_result.heroes + + # We should only include the `initial` key if it's `True` to save bandwidth. + # The absense of this flag means `False`. + if room_result.initial: + serialized_rooms[room_id]["initial"] = room_result.initial + + # This will omitted for invite/knock rooms with `stripped_state` + if room_result.required_state is not None: + serialized_required_state = ( + await self.event_serializer.serialize_events( + room_result.required_state, + time_now, + config=serialize_options, + ) + ) + serialized_rooms[room_id]["required_state"] = serialized_required_state + + # This will omitted for invite/knock rooms with `stripped_state` + if room_result.timeline_events is not None: + serialized_timeline = await self.event_serializer.serialize_events( + room_result.timeline_events, + time_now, + config=serialize_options, + bundle_aggregations=room_result.bundled_aggregations, + ) + serialized_rooms[room_id]["timeline"] = serialized_timeline + + # This will omitted for invite/knock rooms with `stripped_state` + if room_result.limited is not None: + serialized_rooms[room_id]["limited"] = room_result.limited + + # This will omitted for invite/knock rooms with `stripped_state` + if room_result.prev_batch is not None: + serialized_rooms[room_id]["prev_batch"] = ( + await room_result.prev_batch.to_string(self.store) + ) + + # This will omitted for invite/knock rooms with `stripped_state` + if room_result.num_live is not None: + serialized_rooms[room_id]["num_live"] = room_result.num_live + + # Field should be absent on non-DM rooms + if room_result.is_dm: + serialized_rooms[room_id]["is_dm"] = room_result.is_dm + + # Stripped state only applies to invite/knock rooms + if room_result.stripped_state is not None: + # TODO: `knocked_state` but that isn't specced yet. + # + # TODO: Instead of adding `knocked_state`, it would be good to rename + # this to `stripped_state` so it can be shared between invite and knock + # rooms, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919 + serialized_rooms[room_id]["invite_state"] = room_result.stripped_state + + return serialized_rooms + def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: SyncRestServlet(hs).register(http_server) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 1628d58926..c32c626905 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -105,4 +105,5 @@ class DownloadResource(RestServlet): file_name, max_timeout_ms, ip_address, + False, ) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index cc9b162ae4..f3630fbbf1 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -436,6 +436,9 @@ class StateStorageController: ) ) + # FIXME: This will return incorrect results when there are timeline gaps. For + # example, when you try to get a point in the room we haven't backfilled before. + if last_event_id: state = await self.get_state_after_event( last_event_id, diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 304ac42411..042d595ea0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -825,14 +825,13 @@ class DeviceInboxWorkerStore(SQLBaseStore): # Check if we've already inserted a matching message_id for that # origin. This can happen if the origin doesn't receive our # acknowledgement from the first time we received the message. - already_inserted = self.db_pool.simple_select_one_txn( + already_inserted = self.db_pool.simple_select_list_txn( txn, table="device_federation_inbox", keyvalues={"origin": origin, "message_id": message_id}, retcols=("message_id",), - allow_none=True, ) - if already_inserted is not None: + if already_inserted: return # Add an entry for this message_id so that we know we've processed diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 198e65cfa5..a5acea8c3b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -55,7 +55,7 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, make_event_from_dict from synapse.events.snapshot import EventContext -from synapse.events.utils import prune_event +from synapse.events.utils import prune_event, strip_event from synapse.logging.context import ( PreserveLoggingContext, current_context, @@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore): state_to_include = await self.get_events(selected_state_ids.values()) - return [ - { - "type": e.type, - "state_key": e.state_key, - "content": e.content, - "sender": e.sender, - } - for e in state_to_include.values() - ] + return [strip_event(e) for e in state_to_include.values()] def _maybe_start_fetch_thread(self) -> None: """Starts an event fetch thread if we are not yet at the maximum number.""" diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index b7eb3116ae..d34376b8df 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -44,6 +44,7 @@ what sort order was used: import logging from typing import ( TYPE_CHECKING, + AbstractSet, Any, Collection, Dict, @@ -62,7 +63,7 @@ from typing_extensions import Literal from twisted.internet import defer -from synapse.api.constants import Direction +from synapse.api.constants import Direction, EventTypes, Membership from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -111,6 +112,32 @@ class _EventsAround: end: RoomStreamToken +@attr.s(slots=True, frozen=True, auto_attribs=True) +class CurrentStateDeltaMembership: + """ + Attributes: + event_id: The "current" membership event ID in this room. + event_pos: The position of the "current" membership event in the event stream. + prev_event_id: The previous membership event in this room that was replaced by + the "current" one. May be `None` if there was no previous membership event. + room_id: The room ID of the membership event. + membership: The membership state of the user in the room + sender: The person who sent the membership event + """ + + room_id: str + # Event + event_id: Optional[str] + event_pos: PersistedEventPosition + membership: str + sender: Optional[str] + # Prev event + prev_event_id: Optional[str] + prev_event_pos: Optional[PersistedEventPosition] + prev_membership: Optional[str] + prev_sender: Optional[str] + + def generate_pagination_where_clause( direction: Direction, column_names: Tuple[str, str], @@ -390,6 +417,43 @@ def _filter_results( return True +def _filter_results_by_stream( + lower_token: Optional[RoomStreamToken], + upper_token: Optional[RoomStreamToken], + instance_name: str, + stream_ordering: int, +) -> bool: + """ + This function only works with "live" tokens with `stream_ordering` only. See + `_filter_results(...)` if you want to work with all tokens. + + Returns True if the event persisted by the given instance at the given + stream_ordering falls between the two tokens (taking a None + token to mean unbounded). + + Used to filter results from fetching events in the DB against the given + tokens. This is necessary to handle the case where the tokens include + position maps, which we handle by fetching more than necessary from the DB + and then filtering (rather than attempting to construct a complicated SQL + query). + """ + if lower_token: + assert lower_token.topological is None + + # If these are live tokens we compare the stream ordering against the + # writers stream position. + if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name): + return False + + if upper_token: + assert upper_token.topological is None + + if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering: + return False + + return True + + def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: # NB: This may create SQL clauses that don't optimise well (and we don't # have indices on all possible clauses). E.g. it may create @@ -734,6 +798,191 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return ret, key + async def get_current_state_delta_membership_changes_for_user( + self, + user_id: str, + from_key: RoomStreamToken, + to_key: RoomStreamToken, + excluded_room_ids: Optional[List[str]] = None, + ) -> List[CurrentStateDeltaMembership]: + """ + Fetch membership events (and the previous event that was replaced by that one) + for a given user. + + Note: This function only works with "live" tokens with `stream_ordering` only. + + We're looking for membership changes in the token range (> `from_key` and <= + `to_key`). + + Please be mindful to only use this with `from_key` and `to_key` tokens that are + recent enough to be after when the first local user joined the room. Otherwise, + the results may be incomplete or too greedy. For example, if you use a token + range before the first local user joined the room, you will see 0 events since + `current_state_delta_stream` tracks what the server thinks is the current state + of the room as time goes. It does not track how state progresses from the + beginning of the room. So for example, when you remotely join a room, the first + rows will just be the state when you joined and progress from there. + + You can probably reasonably use this with `/sync` because the `to_key` passed in + will be the "current" now token and the range will cover when the user joined + the room. + + Args: + user_id: The user ID to fetch membership events for. + from_key: The point in the stream to sync from (fetching events > this point). + to_key: The token to fetch rooms up to (fetching events <= this point). + excluded_room_ids: Optional list of room IDs to exclude from the results. + + Returns: + All membership changes to the current state in the token range. Events are + sorted by `stream_ordering` ascending. + """ + # Start by ruling out cases where a DB query is not necessary. + if from_key == to_key: + return [] + + if from_key: + has_changed = self._membership_stream_cache.has_entity_changed( + user_id, int(from_key.stream) + ) + if not has_changed: + return [] + + def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]: + # To handle tokens with a non-empty instance_map we fetch more + # results than necessary and then filter down + min_from_id = from_key.stream + max_to_id = to_key.get_max_stream_pos() + + args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id] + + # TODO: It would be good to assert that the `from_token`/`to_token` is >= + # the first row in `current_state_delta_stream` for the rooms we're + # interested in. Otherwise, we will end up with empty results and not know + # it. + + # We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate + # stream positioning when available but given our usages, we can avoid the + # complexity. Between two (valid) stream tokens, we will still get all of + # the state changes. Since those events are persisted in a batch, valid + # tokens will either be before or after the batch of events. + # + # `stream_ordering` from the `events` table is more accurate when available + # since the `current_state_delta_stream` table only tracks that the current + # state is at this stream position (not what stream position the state event + # was added) and uses the *minimum* stream position for batches of events. + sql = """ + SELECT + s.room_id, + e.event_id, + s.instance_name, + s.stream_id, + m.membership, + e.sender, + s.prev_event_id, + e_prev.instance_name AS prev_instance_name, + e_prev.stream_ordering AS prev_stream_ordering, + m_prev.membership AS prev_membership, + e_prev.sender AS prev_sender + FROM current_state_delta_stream AS s + LEFT JOIN events AS e ON e.event_id = s.event_id + LEFT JOIN room_memberships AS m ON m.event_id = s.event_id + LEFT JOIN events AS e_prev ON e_prev.event_id = s.prev_event_id + LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id + WHERE s.stream_id > ? AND s.stream_id <= ? + AND s.type = ? + AND s.state_key = ? + ORDER BY s.stream_id ASC + """ + + txn.execute(sql, args) + + membership_changes: List[CurrentStateDeltaMembership] = [] + for ( + room_id, + event_id, + instance_name, + stream_ordering, + membership, + sender, + prev_event_id, + prev_instance_name, + prev_stream_ordering, + prev_membership, + prev_sender, + ) in txn: + assert room_id is not None + assert instance_name is not None + assert stream_ordering is not None + + if _filter_results_by_stream( + from_key, + to_key, + instance_name, + stream_ordering, + ): + # When the server leaves a room, it will insert new rows into the + # `current_state_delta_stream` table with `event_id = null` for all + # current state. This means we might already have a row for the + # leave event and then another for the same leave where the + # `event_id=null` but the `prev_event_id` is pointing back at the + # earlier leave event. We don't want to report the leave, if we + # already have a leave event. + if event_id is None and prev_membership == Membership.LEAVE: + continue + + membership_change = CurrentStateDeltaMembership( + room_id=room_id, + # Event + event_id=event_id, + event_pos=PersistedEventPosition( + instance_name=instance_name, + stream=stream_ordering, + ), + # When `s.event_id = null`, we won't be able to get respective + # `room_membership` but can assume the user has left the room + # because this only happens when the server leaves a room + # (meaning everyone locally left) or a state reset which removed + # the person from the room. + membership=( + membership if membership is not None else Membership.LEAVE + ), + sender=sender, + # Prev event + prev_event_id=prev_event_id, + prev_event_pos=( + PersistedEventPosition( + instance_name=prev_instance_name, + stream=prev_stream_ordering, + ) + if ( + prev_instance_name is not None + and prev_stream_ordering is not None + ) + else None + ), + prev_membership=prev_membership, + prev_sender=prev_sender, + ) + + membership_changes.append(membership_change) + + return membership_changes + + membership_changes = await self.db_pool.runInteraction( + "get_current_state_delta_membership_changes_for_user", f + ) + + room_ids_to_exclude: AbstractSet[str] = set() + if excluded_room_ids is not None: + room_ids_to_exclude = set(excluded_room_ids) + + return [ + membership_change + for membership_change in membership_changes + if membership_change.room_id not in room_ids_to_exclude + ] + @cancellable async def get_membership_changes_for_user( self, @@ -769,10 +1018,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ignore_room_clause = "" if excluded_rooms is not None and len(excluded_rooms) > 0: - ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join( - "?" for _ in excluded_rooms + ignore_room_clause, ignore_room_args = make_in_list_sql_clause( + txn.database_engine, "e.room_id", excluded_rooms, negative=True ) - args = args + excluded_rooms + ignore_room_clause = f"AND {ignore_room_clause}" + args += ignore_room_args sql = """ SELECT m.event_id, instance_name, topological_ordering, stream_ordering @@ -1554,6 +1804,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) -> Tuple[List[EventBase], RoomStreamToken]: """Returns list of events before or after a given token. + When Direction.FORWARDS: from_key < x <= to_key + When Direction.BACKWARDS: from_key >= x > to_key + Args: room_id from_key: The token used to stream from @@ -1570,6 +1823,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): and `to_key`). """ + # We can bail early if we're looking forwards, and our `to_key` is already + # before our `from_key`. + if ( + direction == Direction.FORWARDS + and to_key is not None + and to_key.is_before_or_eq(from_key) + ): + # Token selection matches what we do in `_paginate_room_events_txn` if there + # are no rows + return [], to_key if to_key else from_key + # Or vice-versa, if we're looking backwards and our `from_key` is already before + # our `to_key`. + elif ( + direction == Direction.BACKWARDS + and to_key is not None + and from_key.is_before_or_eq(to_key) + ): + # Token selection matches what we do in `_paginate_room_events_txn` if there + # are no rows + return [], to_key if to_key else from_key + rows, token = await self.db_pool.runInteraction( "paginate_room_events", self._paginate_room_events_txn, diff --git a/synapse/storage/schema/main/delta/42/current_state_delta.sql b/synapse/storage/schema/main/delta/42/current_state_delta.sql index 876b61e6a5..3d2fd69480 100644 --- a/synapse/storage/schema/main/delta/42/current_state_delta.sql +++ b/synapse/storage/schema/main/delta/42/current_state_delta.sql @@ -32,7 +32,10 @@ * limitations under the License. */ - +-- Tracks what the server thinks is the current state of the room as time goes. It does +-- not track how state progresses from the beginning of the room. So for example, when +-- you remotely join a room, the first rows will just be the state when you joined and +-- progress from there. CREATE TABLE current_state_delta_stream ( stream_id BIGINT NOT NULL, room_id TEXT NOT NULL, diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 8ab9f90238..b22a13ef01 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1096,6 +1096,9 @@ class PersistedPosition: stream: int def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: + """ + Checks whether this position happened after the token + """ return token.get_stream_pos_for_instance(self.instance_name) < self.stream diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 1d65551d5b..3cd3c8fb0f 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -31,9 +31,12 @@ else: from pydantic import Extra from synapse.events import EventBase -from synapse.types import JsonMapping, StreamToken, UserID +from synapse.types import JsonDict, JsonMapping, StreamToken, UserID from synapse.types.rest.client import SlidingSyncBody +if TYPE_CHECKING: + from synapse.handlers.relations import BundledAggregations + class ShutdownRoomParams(TypedDict): """ @@ -159,11 +162,16 @@ class SlidingSyncResult: entirely and NOT send "initial":false as this is wasteful on bandwidth. The absence of this flag means 'false'. required_state: The current state of the room - timeline: Latest events in the room. The last event is the most recent + timeline: Latest events in the room. The last event is the most recent. + bundled_aggregations: A mapping of event ID to the bundled aggregations for + the timeline events above. This allows clients to show accurate reaction + counts (or edits, threads), even if some of the reaction events were skipped + over in a gappy sync. is_dm: Flag to specify whether the room is a direct-message room (most likely between two people). - invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state` - in sync v2, absent on joined/left rooms + stripped_state: Stripped state events (for rooms where the usre is + invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2, + absent on joined/left rooms prev_batch: A token that can be passed as a start parameter to the `/rooms/<room_id>/messages` API to retrieve earlier messages. limited: True if their are more events than fit between the given position and now. @@ -185,21 +193,28 @@ class SlidingSyncResult: (with potentially other old events in the timeline). """ - name: str + name: Optional[str] avatar: Optional[str] heroes: Optional[List[EventBase]] initial: bool - required_state: List[EventBase] - timeline: List[EventBase] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + required_state: Optional[List[EventBase]] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + timeline_events: Optional[List[EventBase]] + bundled_aggregations: Optional[Dict[str, "BundledAggregations"]] is_dm: bool - invite_state: List[EventBase] - prev_batch: StreamToken - limited: bool + # Optional because it's only relevant to invite/knock rooms + stripped_state: Optional[List[JsonDict]] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + prev_batch: Optional[StreamToken] + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + limited: Optional[bool] joined_count: int invited_count: int notification_count: int highlight_count: int - num_live: int + # Only optional because it won't be included for invite/knock rooms with `stripped_state` + num_live: Optional[int] @attr.s(slots=True, frozen=True, auto_attribs=True) class SlidingWindowList: diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index e2c79c4106..55f6b44053 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -152,22 +152,14 @@ class SlidingSyncBody(RequestBodyModel): anyway. timeline_limit: The maximum number of timeline events to return per response. (Max 1000 messages) - include_old_rooms: Determines if `predecessor` rooms are included in the - `rooms` response. The user MUST be joined to old rooms for them to show up - in the response. """ - class IncludeOldRooms(RequestBodyModel): - timeline_limit: StrictInt - required_state: List[Tuple[StrictStr, StrictStr]] - required_state: List[Tuple[StrictStr, StrictStr]] # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: timeline_limit: int else: timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type] - include_old_rooms: Optional[IncludeOldRooms] = None class SlidingSyncList(CommonRoomParameters): """ @@ -208,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel): } timeline_limit: The maximum number of timeline events to return per response. - include_old_rooms: Determines if `predecessor` rooms are included in the - `rooms` response. The user MUST be joined to old rooms for them to show up - in the response. include_heroes: Return a stripped variant of membership events (containing `user_id` and optionally `avatar_url` and `displayname`) for the users used to calculate the room name. @@ -270,7 +259,7 @@ class SlidingSyncBody(RequestBodyModel): is_encrypted: Optional[StrictBool] = None is_invite: Optional[StrictBool] = None room_types: Optional[List[Union[StrictStr, None]]] = None - not_room_types: Optional[List[StrictStr]] = None + not_room_types: Optional[List[Union[StrictStr, None]]] = None room_name_like: Optional[StrictStr] = None tags: Optional[List[StrictStr]] = None not_tags: Optional[List[StrictStr]] = None diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py index 2c396adbe3..142f73cfdb 100644 --- a/tests/federation/test_federation_media.py +++ b/tests/federation/test_federation_media.py @@ -36,10 +36,9 @@ from synapse.util import Clock from tests import unittest from tests.test_utils import SMALL_PNG -from tests.unittest import override_config -class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase): +class FederationMediaDownloadsTest(unittest.FederatingHomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: super().prepare(reactor, clock, hs) @@ -65,9 +64,6 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase ) self.media_repo = hs.get_media_repository() - @override_config( - {"experimental_features": {"msc3916_authenticated_media_enabled": True}} - ) def test_file_download(self) -> None: content = io.BytesIO(b"file_to_stream") content_uri = self.get_success( @@ -82,7 +78,7 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase # test with a text file channel = self.make_signed_federation_request( "GET", - f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + f"/_matrix/federation/v1/media/download/{content_uri.media_id}", ) self.pump() self.assertEqual(200, channel.code) @@ -106,7 +102,8 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase # check that the text file and expected value exist found_file = any( - "\r\nContent-Type: text/plain\r\n\r\nfile_to_stream" in field + "\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_to_stream" + in field for field in stripped ) self.assertTrue(found_file) @@ -124,7 +121,7 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase # test with an image file channel = self.make_signed_federation_request( "GET", - f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + f"/_matrix/federation/v1/media/download/{content_uri.media_id}", ) self.pump() self.assertEqual(200, channel.code) @@ -149,25 +146,3 @@ class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase # check that the png file exists and matches what was uploaded found_file = any(SMALL_PNG in field for field in stripped_bytes) self.assertTrue(found_file) - - @override_config( - {"experimental_features": {"msc3916_authenticated_media_enabled": False}} - ) - def test_disable_config(self) -> None: - content = io.BytesIO(b"file_to_stream") - content_uri = self.get_success( - self.media_repo.create_content( - "text/plain", - "test_upload", - content, - 46, - UserID.from_string("@user_id:whatever.org"), - ) - ) - channel = self.make_signed_federation_request( - "GET", - f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", - ) - self.pump() - self.assertEqual(404, channel.code) - self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py index 8dd4521b18..713a798703 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py @@ -24,7 +24,14 @@ from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership +from synapse.api.constants import ( + AccountDataTypes, + EventContentFields, + EventTypes, + JoinRules, + Membership, + RoomTypes, +) from synapse.api.room_versions import RoomVersions from synapse.handlers.sliding_sync import SlidingSyncConfig from synapse.rest import admin @@ -63,6 +70,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): self.sliding_sync_handler = self.hs.get_sliding_sync_handler() self.store = self.hs.get_datastores().main self.event_sources = hs.get_event_sources() + self.storage_controllers = hs.get_storage_controllers() def test_no_rooms(self) -> None: """ @@ -90,10 +98,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") before_room_token = self.event_sources.get_current_token() - room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response = self.helper.join(room_id, user1_id, tok=user1_tok) after_room_token = self.event_sources.get_current_token() @@ -106,6 +117,15 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): ) self.assertEqual(room_id_results.keys(), {room_id}) + # It should be pointing to the join event (latest membership event in the + # from/to range) + self.assertEqual( + room_id_results[room_id].event_id, + join_response["event_id"], + ) + # We should be considered `newly_joined` because we joined during the token + # range + self.assertEqual(room_id_results[room_id].newly_joined, True) def test_get_already_joined_room(self) -> None: """ @@ -113,8 +133,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") - room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response = self.helper.join(room_id, user1_id, tok=user1_tok) after_room_token = self.event_sources.get_current_token() @@ -127,6 +150,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): ) self.assertEqual(room_id_results.keys(), {room_id}) + # It should be pointing to the join event (latest membership event in the + # from/to range) + self.assertEqual( + room_id_results[room_id].event_id, + join_response["event_id"], + ) + # We should *NOT* be `newly_joined` because we joined before the token range + self.assertEqual(room_id_results[room_id].newly_joined, False) def test_get_invited_banned_knocked_room(self) -> None: """ @@ -142,14 +173,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Setup the invited room (user2 invites user1 to the room) invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok) - self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok) + invite_response = self.helper.invite( + invited_room_id, targ=user1_id, tok=user2_tok + ) # Setup the ban room (user2 bans user1 from the room) ban_room_id = self.helper.create_room_as( user2_id, tok=user2_tok, is_public=True ) self.helper.join(ban_room_id, user1_id, tok=user1_tok) - self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok) + ban_response = self.helper.ban( + ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok + ) # Setup the knock room (user1 knocks on the room) knock_room_id = self.helper.create_room_as( @@ -162,13 +197,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): tok=user2_tok, ) # User1 knocks on the room - channel = self.make_request( + knock_channel = self.make_request( "POST", "/_matrix/client/r0/knock/%s" % (knock_room_id,), b"{}", user1_tok, ) - self.assertEqual(channel.code, 200, channel.result) + self.assertEqual(knock_channel.code, 200, knock_channel.result) + knock_room_membership_state_event = self.get_success( + self.storage_controllers.state.get_current_state_event( + knock_room_id, EventTypes.Member, user1_id + ) + ) + assert knock_room_membership_state_event is not None after_room_token = self.event_sources.get_current_token() @@ -189,6 +230,25 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): knock_room_id, }, ) + # It should be pointing to the the respective membership event (latest + # membership event in the from/to range) + self.assertEqual( + room_id_results[invited_room_id].event_id, + invite_response["event_id"], + ) + self.assertEqual( + room_id_results[ban_room_id].event_id, + ban_response["event_id"], + ) + self.assertEqual( + room_id_results[knock_room_id].event_id, + knock_room_membership_state_event.event_id, + ) + # We should *NOT* be `newly_joined` because we were not joined at the the time + # of the `to_token`. + self.assertEqual(room_id_results[invited_room_id].newly_joined, False) + self.assertEqual(room_id_results[ban_room_id].newly_joined, False) + self.assertEqual(room_id_results[knock_room_id].newly_joined, False) def test_get_kicked_room(self) -> None: """ @@ -206,7 +266,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): ) self.helper.join(kick_room_id, user1_id, tok=user1_tok) # Kick user1 from the room - self.helper.change_membership( + kick_response = self.helper.change_membership( room=kick_room_id, src=user2_id, targ=user1_id, @@ -229,6 +289,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # The kicked room should show up self.assertEqual(room_id_results.keys(), {kick_room_id}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[kick_room_id].event_id, + kick_response["event_id"], + ) + # We should *NOT* be `newly_joined` because we were not joined at the the time + # of the `to_token`. + self.assertEqual(room_id_results[kick_room_id].newly_joined, False) def test_forgotten_rooms(self) -> None: """ @@ -329,7 +397,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Leave during the from_token/to_token range (newly_left) room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - self.helper.leave(room_id2, user1_id, tok=user1_tok) + _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok) after_room2_token = self.event_sources.get_current_token() @@ -343,6 +411,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Only the newly_left room should show up self.assertEqual(room_id_results.keys(), {room_id2}) + # It should be pointing to the latest membership event in the from/to range but + # the `event_id` is `None` because we left the room causing the server to leave + # the room because no other local users are in it (quirk of the + # `current_state_delta_stream` table that we source things from) + self.assertEqual( + room_id_results[room_id2].event_id, + None, # _leave_response2["event_id"], + ) + # We should *NOT* be `newly_joined` because we are instead `newly_left` + self.assertEqual(room_id_results[room_id2].newly_joined, False) def test_no_joins_after_to_token(self) -> None: """ @@ -351,16 +429,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") before_room1_token = self.event_sources.get_current_token() - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() - # Room join after after our `to_token` shouldn't show up - room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) - _ = room_id2 + # Room join after our `to_token` shouldn't show up + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -371,6 +452,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): ) self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response1["event_id"], + ) + # We should be `newly_joined` because we joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) def test_join_during_range_and_left_room_after_to_token(self) -> None: """ @@ -380,15 +468,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") before_room1_token = self.event_sources.get_current_token() - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Leave the room after we already have our tokens - self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -401,6 +492,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # We should still see the room because we were joined during the # from_token/to_token time period. self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "leave_response": leave_response["event_id"], + } + ), + ) + # We should be `newly_joined` because we joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) def test_join_before_range_and_left_room_after_to_token(self) -> None: """ @@ -410,13 +515,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") - room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Leave the room after we already have our tokens - self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -428,6 +536,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # We should still see the room because we were joined before the `from_token` self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "leave_response": leave_response["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we joined before the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_kicked_before_range_and_left_after_to_token(self) -> None: """ @@ -444,9 +566,9 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): kick_room_id = self.helper.create_room_as( user2_id, tok=user2_tok, is_public=True ) - self.helper.join(kick_room_id, user1_id, tok=user1_tok) + join_response1 = self.helper.join(kick_room_id, user1_id, tok=user1_tok) # Kick user1 from the room - self.helper.change_membership( + kick_response = self.helper.change_membership( room=kick_room_id, src=user2_id, targ=user1_id, @@ -463,8 +585,8 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # # We have to join before we can leave (leave -> leave isn't a valid transition # or at least it doesn't work in Synapse, 403 forbidden) - self.helper.join(kick_room_id, user1_id, tok=user1_tok) - self.helper.leave(kick_room_id, user1_id, tok=user1_tok) + join_response2 = self.helper.join(kick_room_id, user1_id, tok=user1_tok) + leave_response = self.helper.leave(kick_room_id, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -476,6 +598,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # We shouldn't see the room because it was forgotten self.assertEqual(room_id_results.keys(), {kick_room_id}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[kick_room_id].event_id, + kick_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "kick_response": kick_response["event_id"], + "join_response2": join_response2["event_id"], + "leave_response": leave_response["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we were kicked + self.assertEqual(room_id_results[kick_room_id].newly_joined, False) def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None: """ @@ -494,14 +632,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # leave and can still re-join. room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join and leave the room during the from/to range - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Join and leave the room after we already have our tokens - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -513,6 +651,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Room should still show up because it's newly_left during the from/to range self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + leave_response1["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "leave_response1": leave_response1["event_id"], + "join_response2": join_response2["event_id"], + "leave_response2": leave_response2["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we left during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_newly_left_during_range_and_join_after_to_token(self) -> None: """ @@ -531,13 +685,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # leave and can still re-join. room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join and leave the room during the from/to range - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Join the room after we already have our tokens - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -549,11 +703,26 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Room should still show up because it's newly_left during the from/to range self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + leave_response1["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "leave_response1": leave_response1["event_id"], + "join_response2": join_response2["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we left during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_no_from_token(self) -> None: """ Test that if we don't provide a `from_token`, we get all the rooms that we we're - joined to up to the `to_token`. + joined up to the `to_token`. Providing `from_token` only really has the effect that it adds `newly_left` rooms to the response. @@ -569,7 +738,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join room1 - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) # Join and leave the room2 before the `to_token` self.helper.join(room_id2, user1_id, tok=user1_tok) @@ -590,6 +759,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Only rooms we were joined to before the `to_token` should show up self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response1["event_id"], + ) + # We should *NOT* be `newly_joined` because there is no `from_token` to + # define a "live" range to compare against + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_from_token_ahead_of_to_token(self) -> None: """ @@ -609,7 +786,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join room1 before `before_room_token` - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) # Join and leave the room2 before `before_room_token` self.helper.join(room_id2, user1_id, tok=user1_tok) @@ -651,6 +828,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # There won't be any newly_left rooms because the `from_token` is ahead of the # `to_token` and that range will give no membership changes to check. self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response1["event_id"], + ) + # We should *NOT* be `newly_joined` because we joined `room1` before either of the tokens + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_leave_before_range_and_join_leave_after_to_token(self) -> None: """ @@ -741,16 +925,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # leave and can still re-join. room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join, leave, join back to the room before the from/to range - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Leave and Join the room multiple times after we already have our tokens - self.helper.leave(room_id1, user1_id, tok=user1_tok) - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -762,6 +946,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Room should show up because it was newly_left and joined during the from/to range self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response2["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "leave_response1": leave_response1["event_id"], + "join_response2": join_response2["event_id"], + "leave_response2": leave_response2["event_id"], + "join_response3": join_response3["event_id"], + "leave_response3": leave_response3["event_id"], + } + ), + ) + # We should be `newly_joined` because we joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) def test_join_leave_multiple_times_before_range_and_after_to_token( self, @@ -781,16 +983,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # leave and can still re-join. room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Join, leave, join back to the room before the from/to range - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) - self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok) after_room1_token = self.event_sources.get_current_token() # Leave and Join the room multiple times after we already have our tokens - self.helper.leave(room_id1, user1_id, tok=user1_tok) - self.helper.join(room_id1, user1_id, tok=user1_tok) - self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok) room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( @@ -802,6 +1004,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): # Room should show up because we were joined before the from/to range self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response2["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "leave_response1": leave_response1["event_id"], + "join_response2": join_response2["event_id"], + "leave_response2": leave_response2["event_id"], + "join_response3": join_response3["event_id"], + "leave_response3": leave_response3["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we joined before the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) def test_invite_before_range_and_join_leave_after_to_token( self, @@ -821,24 +1041,495 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) # Invited to the room before the token - self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + invite_response = self.helper.invite( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) after_room1_token = self.event_sources.get_current_token() # Join and leave the room after we already have our tokens + join_respsonse = self.helper.join(room_id1, user1_id, tok=user1_tok) + leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were invited before the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + invite_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "invite_response": invite_response["event_id"], + "join_respsonse": join_respsonse["event_id"], + "leave_response": leave_response["event_id"], + } + ), + ) + # We should *NOT* be `newly_joined` because we were only invited before the + # token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) + + def test_join_and_display_name_changes_in_token_range( + self, + ) -> None: + """ + Test that we point to the correct membership event within the from/to range even + if there are multiple `join` membership events in a row indicating + `displayname`/`avatar_url` updates. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + # Update the displayname during the token range + displayname_change_during_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + + after_room1_token = self.event_sources.get_current_token() + + # Update the displayname after the token range + displayname_change_after_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname after token range", + }, + tok=user1_tok, + ) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were joined during the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + displayname_change_during_token_range_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "displayname_change_during_token_range_response": displayname_change_during_token_range_response[ + "event_id" + ], + "displayname_change_after_token_range_response": displayname_change_after_token_range_response[ + "event_id" + ], + } + ), + ) + # We should be `newly_joined` because we joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) + + def test_display_name_changes_in_token_range( + self, + ) -> None: + """ + Test that we point to the correct membership event within the from/to range even + if there is `displayname`/`avatar_url` updates. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Update the displayname during the token range + displayname_change_during_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + + after_change1_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_change1_token, + ) + ) + + # Room should show up because we were joined during the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + displayname_change_during_token_range_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "displayname_change_during_token_range_response": displayname_change_during_token_range_response[ + "event_id" + ], + } + ), + ) + # We should *NOT* be `newly_joined` because we joined before the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) + + def test_display_name_changes_before_and_after_token_range( + self, + ) -> None: + """ + Test that we point to the correct membership event even though there are no + membership events in the from/range but there are `displayname`/`avatar_url` + changes before/after the token range. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + # Update the displayname before the token range + displayname_change_before_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + + after_room1_token = self.event_sources.get_current_token() + + # Update the displayname after the token range + displayname_change_after_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname after token range", + }, + tok=user1_tok, + ) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were joined before the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + displayname_change_before_token_range_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "displayname_change_before_token_range_response": displayname_change_before_token_range_response[ + "event_id" + ], + "displayname_change_after_token_range_response": displayname_change_after_token_range_response[ + "event_id" + ], + } + ), + ) + # We should *NOT* be `newly_joined` because we joined before the token range + self.assertEqual(room_id_results[room_id1].newly_joined, False) + + def test_display_name_changes_leave_after_token_range( + self, + ) -> None: + """ + Test that we point to the correct membership event within the from/to range even + if there are multiple `join` membership events in a row indicating + `displayname`/`avatar_url` updates and we leave after the `to_token`. + + See condition "1a)" comments in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + # Update the displayname during the token range + displayname_change_during_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + + after_room1_token = self.event_sources.get_current_token() + + # Update the displayname after the token range + displayname_change_after_token_range_response = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname after token range", + }, + tok=user1_tok, + ) + + # Leave after the token + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were joined during the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + displayname_change_during_token_range_response["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response": join_response["event_id"], + "displayname_change_during_token_range_response": displayname_change_during_token_range_response[ + "event_id" + ], + "displayname_change_after_token_range_response": displayname_change_after_token_range_response[ + "event_id" + ], + } + ), + ) + # We should be `newly_joined` because we joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) + + def test_display_name_changes_join_after_token_range( + self, + ) -> None: + """ + Test that multiple `join` membership events (after the `to_token`) in a row + indicating `displayname`/`avatar_url` updates doesn't affect the results (we + joined after the token range so it shouldn't show up) + + See condition "1b)" comments in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + after_room1_token = self.event_sources.get_current_token() + + self.helper.join(room_id1, user1_id, tok=user1_tok) + # Update the displayname after the token range + self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname after token range", + }, + tok=user1_tok, + ) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room shouldn't show up because we joined after the from/to range + self.assertEqual(room_id_results.keys(), set()) + + def test_newly_joined_with_leave_join_in_token_range( + self, + ) -> None: + """ + Test that even though we're joined before the token range, if we leave and join + within the token range, it's still counted as `newly_joined`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave and join back during the token range self.helper.leave(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_more_changes_token = self.event_sources.get_current_token() room_id_results = self.get_success( self.sliding_sync_handler.get_sync_room_ids_for_user( UserID.from_string(user1_id), from_token=after_room1_token, + to_token=after_more_changes_token, + ) + ) + + # Room should show up because we were joined during the from/to range + self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + join_response2["event_id"], + ) + # We should be considered `newly_joined` because there is some non-join event in + # between our latest join event. + self.assertEqual(room_id_results[room_id1].newly_joined, True) + + def test_newly_joined_only_joins_during_token_range( + self, + ) -> None: + """ + Test that a join and more joins caused by display name changes, all during the + token range, still count as `newly_joined`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join, leave, join back to the room before the from/to range + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + # Update the displayname during the token range (looks like another join) + displayname_change_during_token_range_response1 = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + # Update the displayname during the token range (looks like another join) + displayname_change_during_token_range_response2 = self.helper.send_state( + room_id1, + event_type=EventTypes.Member, + state_key=user1_id, + body={ + "membership": Membership.JOIN, + "displayname": "displayname during token range", + }, + tok=user1_tok, + ) + + after_room1_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, to_token=after_room1_token, ) ) - # Room should show up because we were invited before the from/to range + # Room should show up because it was newly_left and joined during the from/to range self.assertEqual(room_id_results.keys(), {room_id1}) + # It should be pointing to the latest membership event in the from/to range + self.assertEqual( + room_id_results[room_id1].event_id, + displayname_change_during_token_range_response2["event_id"], + "Corresponding map to disambiguate the opaque event IDs: " + + str( + { + "join_response1": join_response1["event_id"], + "displayname_change_during_token_range_response1": displayname_change_during_token_range_response1[ + "event_id" + ], + "displayname_change_during_token_range_response2": displayname_change_during_token_range_response2[ + "event_id" + ], + } + ), + ) + # We should be `newly_joined` because we first joined during the token range + self.assertEqual(room_id_results[room_id1].newly_joined, True) def test_multiple_rooms_are_not_confused( self, @@ -1363,6 +2054,211 @@ class FilterRoomsTestCase(HomeserverTestCase): self.assertEqual(falsy_filtered_room_map.keys(), {room_id}) + def test_filter_room_types(self) -> None: + """ + Test `filter.room_types` for different room types + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a normal room (no room type) + room_id = self.helper.create_room_as(user1_id, tok=user1_tok) + + # Create a space room + space_room_id = self.helper.create_room_as( + user1_id, + tok=user1_tok, + extra_content={ + "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE} + }, + ) + + # Create an arbitrarily typed room + foo_room_id = self.helper.create_room_as( + user1_id, + tok=user1_tok, + extra_content={ + "creation_content": { + EventContentFields.ROOM_TYPE: "org.matrix.foobarbaz" + } + }, + ) + + after_rooms_token = self.event_sources.get_current_token() + + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + + # Try finding only normal rooms + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters(room_types=[None]), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {room_id}) + + # Try finding only spaces + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters(room_types=[RoomTypes.SPACE]), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {space_room_id}) + + # Try finding normal rooms and spaces + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + room_types=[None, RoomTypes.SPACE] + ), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {room_id, space_room_id}) + + # Try finding an arbitrary room type + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + room_types=["org.matrix.foobarbaz"] + ), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {foo_room_id}) + + def test_filter_not_room_types(self) -> None: + """ + Test `filter.not_room_types` for different room types + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Create a normal room (no room type) + room_id = self.helper.create_room_as(user1_id, tok=user1_tok) + + # Create a space room + space_room_id = self.helper.create_room_as( + user1_id, + tok=user1_tok, + extra_content={ + "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE} + }, + ) + + # Create an arbitrarily typed room + foo_room_id = self.helper.create_room_as( + user1_id, + tok=user1_tok, + extra_content={ + "creation_content": { + EventContentFields.ROOM_TYPE: "org.matrix.foobarbaz" + } + }, + ) + + after_rooms_token = self.event_sources.get_current_token() + + # Get the rooms the user should be syncing with + sync_room_map = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_rooms_token, + ) + ) + + # Try finding *NOT* normal rooms + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters(not_room_types=[None]), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {space_room_id, foo_room_id}) + + # Try finding *NOT* spaces + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + not_room_types=[RoomTypes.SPACE] + ), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {room_id, foo_room_id}) + + # Try finding *NOT* normal rooms or spaces + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + not_room_types=[None, RoomTypes.SPACE] + ), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {foo_room_id}) + + # Test how it behaves when we have both `room_types` and `not_room_types`. + # `not_room_types` should win. + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + room_types=[None], not_room_types=[None] + ), + after_rooms_token, + ) + ) + + # Nothing matches because nothing is both a normal room and not a normal room + self.assertEqual(filtered_room_map.keys(), set()) + + # Test how it behaves when we have both `room_types` and `not_room_types`. + # `not_room_types` should win. + filtered_room_map = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + sync_room_map, + SlidingSyncConfig.SlidingSyncList.Filters( + room_types=[None, RoomTypes.SPACE], not_room_types=[None] + ), + after_rooms_token, + ) + ) + + self.assertEqual(filtered_room_map.keys(), {space_room_id}) + class SortRoomsTestCase(HomeserverTestCase): """ diff --git a/tests/http/test_client.py b/tests/http/test_client.py index a98091d711..721917f957 100644 --- a/tests/http/test_client.py +++ b/tests/http/test_client.py @@ -37,18 +37,155 @@ from synapse.http.client import ( BlocklistingAgentWrapper, BlocklistingReactorWrapper, BodyExceededMaxSize, + MultipartResponse, _DiscardBodyWithMaxSizeProtocol, + _MultipartParserProtocol, read_body_with_max_size, + read_multipart_response, ) from tests.server import FakeTransport, get_clock from tests.unittest import TestCase +class ReadMultipartResponseTests(TestCase): + data1 = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_" + data2 = b"to_stream\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n" + + redirect_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nLocation: https://cdn.example.org/ab/c1/2345.txt\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n" + + def _build_multipart_response( + self, response_length: Union[int, str], max_length: int + ) -> Tuple[ + BytesIO, + "Deferred[MultipartResponse]", + _MultipartParserProtocol, + ]: + """Start reading the body, returns the response, result and proto""" + response = Mock(length=response_length) + result = BytesIO() + boundary = "6067d4698f8d40a0a794ea7d7379d53a" + deferred = read_multipart_response(response, result, boundary, max_length) + + # Fish the protocol out of the response. + protocol = response.deliverBody.call_args[0][0] + protocol.transport = Mock() + + return result, deferred, protocol + + def _assert_error( + self, + deferred: "Deferred[MultipartResponse]", + protocol: _MultipartParserProtocol, + ) -> None: + """Ensure that the expected error is received.""" + assert isinstance(deferred.result, Failure) + self.assertIsInstance(deferred.result.value, BodyExceededMaxSize) + assert protocol.transport is not None + # type-ignore: presumably abortConnection has been replaced with a Mock. + protocol.transport.abortConnection.assert_called_once() # type: ignore[attr-defined] + + def _cleanup_error(self, deferred: "Deferred[MultipartResponse]") -> None: + """Ensure that the error in the Deferred is handled gracefully.""" + called = [False] + + def errback(f: Failure) -> None: + called[0] = True + + deferred.addErrback(errback) + self.assertTrue(called[0]) + + def test_parse_file(self) -> None: + """ + Check that a multipart response containing a file is properly parsed + into the json/file parts, and the json and file are properly captured + """ + result, deferred, protocol = self._build_multipart_response(249, 250) + + # Start sending data. + protocol.dataReceived(self.data1) + protocol.dataReceived(self.data2) + # Close the connection. + protocol.connectionLost(Failure(ResponseDone())) + + multipart_response: MultipartResponse = deferred.result # type: ignore[assignment] + + self.assertEqual(multipart_response.json, b"{}") + self.assertEqual(result.getvalue(), b"file_to_stream") + self.assertEqual(multipart_response.length, len(b"file_to_stream")) + self.assertEqual(multipart_response.content_type, b"text/plain") + self.assertEqual( + multipart_response.disposition, b"inline; filename=test_upload" + ) + + def test_parse_redirect(self) -> None: + """ + check that a multipart response containing a redirect is properly parsed and redirect url is + returned + """ + result, deferred, protocol = self._build_multipart_response(249, 250) + + # Start sending data. + protocol.dataReceived(self.redirect_data) + # Close the connection. + protocol.connectionLost(Failure(ResponseDone())) + + multipart_response: MultipartResponse = deferred.result # type: ignore[assignment] + + self.assertEqual(multipart_response.json, b"{}") + self.assertEqual(result.getvalue(), b"") + self.assertEqual( + multipart_response.url, b"https://cdn.example.org/ab/c1/2345.txt" + ) + + def test_too_large(self) -> None: + """A response which is too large raises an exception.""" + result, deferred, protocol = self._build_multipart_response(UNKNOWN_LENGTH, 180) + + # Start sending data. + protocol.dataReceived(self.data1) + + self.assertEqual(result.getvalue(), b"file_") + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) + + def test_additional_data(self) -> None: + """A connection can receive data after being closed.""" + result, deferred, protocol = self._build_multipart_response(UNKNOWN_LENGTH, 180) + + # Start sending data. + protocol.dataReceived(self.data1) + self._assert_error(deferred, protocol) + + # More data might have come in. + protocol.dataReceived(self.data2) + + self.assertEqual(result.getvalue(), b"file_") + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) + + def test_content_length(self) -> None: + """The body shouldn't be read (at all) if the Content-Length header is too large.""" + result, deferred, protocol = self._build_multipart_response(250, 1) + + # Deferred shouldn't be called yet. + self.assertFalse(deferred.called) + + # Start sending data. + protocol.dataReceived(self.data1) + self._assert_error(deferred, protocol) + self._cleanup_error(deferred) + + # The data is never consumed. + self.assertEqual(result.getvalue(), b"") + + class ReadBodyWithMaxSizeTests(TestCase): - def _build_response( - self, length: Union[int, str] = UNKNOWN_LENGTH - ) -> Tuple[BytesIO, "Deferred[int]", _DiscardBodyWithMaxSizeProtocol]: + def _build_response(self, length: Union[int, str] = UNKNOWN_LENGTH) -> Tuple[ + BytesIO, + "Deferred[int]", + _DiscardBodyWithMaxSizeProtocol, + ]: """Start reading the body, returns the response, result and proto""" response = Mock(length=length) result = BytesIO() diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 46d20ce775..024086b775 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -129,7 +129,7 @@ class MediaStorageTests(unittest.HomeserverTestCase): @attr.s(auto_attribs=True, slots=True, frozen=True) -class _TestImage: +class TestImage: """An image for testing thumbnailing with the expected results Attributes: @@ -158,7 +158,7 @@ class _TestImage: is_inline: bool = True -small_png = _TestImage( +small_png = TestImage( SMALL_PNG, b"image/png", b".png", @@ -175,7 +175,7 @@ small_png = _TestImage( ), ) -small_png_with_transparency = _TestImage( +small_png_with_transparency = TestImage( unhexlify( b"89504e470d0a1a0a0000000d49484452000000010000000101000" b"00000376ef9240000000274524e5300010194fdae0000000a4944" @@ -188,7 +188,7 @@ small_png_with_transparency = _TestImage( # different versions of Pillow. ) -small_lossless_webp = _TestImage( +small_lossless_webp = TestImage( unhexlify( b"524946461a000000574542505650384c0d0000002f0000001007" b"1011118888fe0700" ), @@ -196,7 +196,7 @@ small_lossless_webp = _TestImage( b".webp", ) -empty_file = _TestImage( +empty_file = TestImage( b"", b"image/gif", b".gif", @@ -204,7 +204,7 @@ empty_file = _TestImage( unable_to_thumbnail=True, ) -SVG = _TestImage( +SVG = TestImage( b"""<?xml version="1.0"?> <!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd"> @@ -236,7 +236,7 @@ urls = [ @parameterized_class(("test_image", "url"), itertools.product(test_images, urls)) class MediaRepoTests(unittest.HomeserverTestCase): servlets = [media.register_servlets] - test_image: ClassVar[_TestImage] + test_image: ClassVar[TestImage] hijack_auth = True user_id = "@test:user" url: ClassVar[str] diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 4927e45446..6fc4600c41 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -28,7 +28,7 @@ from twisted.web.http import HTTPChannel from twisted.web.server import Request from synapse.rest import admin -from synapse.rest.client import login +from synapse.rest.client import login, media from synapse.server import HomeServer from synapse.util import Clock @@ -255,6 +255,238 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): return sum(len(files) for _, _, files in os.walk(path)) +class AuthenticatedMediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): + """Checks running multiple media repos work correctly using autheticated media paths""" + + servlets = [ + admin.register_servlets_for_client_rest_resource, + login.register_servlets, + media.register_servlets, + ] + + file_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: text/plain\r\nContent-Disposition: inline; filename=test_upload\r\n\r\nfile_to_stream\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n" + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.user_id = self.register_user("user", "pass") + self.access_token = self.login("user", "pass") + + self.reactor.lookups["example.com"] = "1.2.3.4" + + def default_config(self) -> dict: + conf = super().default_config() + conf["federation_custom_ca_list"] = [get_test_ca_cert_file()] + return conf + + def make_worker_hs( + self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any + ) -> HomeServer: + worker_hs = super().make_worker_hs(worker_app, extra_config, **kwargs) + # Force the media paths onto the replication resource. + worker_hs.get_media_repository_resource().register_servlets( + self._hs_to_site[worker_hs].resource, worker_hs + ) + return worker_hs + + def _get_media_req( + self, hs: HomeServer, target: str, media_id: str + ) -> Tuple[FakeChannel, Request]: + """Request some remote media from the given HS by calling the download + API. + + This then triggers an outbound request from the HS to the target. + + Returns: + The channel for the *client* request and the *outbound* request for + the media which the caller should respond to. + """ + channel = make_request( + self.reactor, + self._hs_to_site[hs], + "GET", + f"/_matrix/client/v1/media/download/{target}/{media_id}", + shorthand=False, + access_token=self.access_token, + await_result=False, + ) + self.pump() + + clients = self.reactor.tcpClients + self.assertGreaterEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop() + + # build the test server + server_factory = Factory.forProtocol(HTTPChannel) + # Request.finish expects the factory to have a 'log' method. + server_factory.log = _log_request + + server_tls_protocol = wrap_server_factory_for_tls( + server_factory, self.reactor, sanlist=[b"DNS:example.com"] + ).buildProtocol(None) + + # now, tell the client protocol factory to build the client protocol (it will be a + # _WrappingProtocol, around a TLSMemoryBIOProtocol, around an + # HTTP11ClientProtocol) and wire the output of said protocol up to the server via + # a FakeTransport. + # + # Normally this would be done by the TCP socket code in Twisted, but we are + # stubbing that out here. + client_protocol = client_factory.buildProtocol(None) + client_protocol.makeConnection( + FakeTransport(server_tls_protocol, self.reactor, client_protocol) + ) + + # tell the server tls protocol to send its stuff back to the client, too + server_tls_protocol.makeConnection( + FakeTransport(client_protocol, self.reactor, server_tls_protocol) + ) + + # fish the test server back out of the server-side TLS protocol. + http_server: HTTPChannel = server_tls_protocol.wrappedProtocol + + # give the reactor a pump to get the TLS juices flowing. + self.reactor.pump((0.1,)) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + + self.assertEqual(request.method, b"GET") + self.assertEqual( + request.path, + f"/_matrix/federation/v1/media/download/{media_id}".encode(), + ) + self.assertEqual( + request.requestHeaders.getRawHeaders(b"host"), [target.encode("utf-8")] + ) + + return channel, request + + def test_basic(self) -> None: + """Test basic fetching of remote media from a single worker.""" + hs1 = self.make_worker_hs("synapse.app.generic_worker") + + channel, request = self._get_media_req(hs1, "example.com:443", "ABC123") + + request.setResponseCode(200) + request.responseHeaders.setRawHeaders( + b"Content-Type", + ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"], + ) + request.write(self.file_data) + request.finish() + + self.pump(0.1) + + self.assertEqual(channel.code, 200) + self.assertEqual(channel.result["body"], b"file_to_stream") + + def test_download_simple_file_race(self) -> None: + """Test that fetching remote media from two different processes at the + same time works. + """ + hs1 = self.make_worker_hs("synapse.app.generic_worker") + hs2 = self.make_worker_hs("synapse.app.generic_worker") + + start_count = self._count_remote_media() + + # Make two requests without responding to the outbound media requests. + channel1, request1 = self._get_media_req(hs1, "example.com:443", "ABC123") + channel2, request2 = self._get_media_req(hs2, "example.com:443", "ABC123") + + # Respond to the first outbound media request and check that the client + # request is successful + request1.setResponseCode(200) + request1.responseHeaders.setRawHeaders( + b"Content-Type", + ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"], + ) + request1.write(self.file_data) + request1.finish() + + self.pump(0.1) + + self.assertEqual(channel1.code, 200, channel1.result["body"]) + self.assertEqual(channel1.result["body"], b"file_to_stream") + + # Now respond to the second with the same content. + request2.setResponseCode(200) + request2.responseHeaders.setRawHeaders( + b"Content-Type", + ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"], + ) + request2.write(self.file_data) + request2.finish() + + self.pump(0.1) + + self.assertEqual(channel2.code, 200, channel2.result["body"]) + self.assertEqual(channel2.result["body"], b"file_to_stream") + + # We expect only one new file to have been persisted. + self.assertEqual(start_count + 1, self._count_remote_media()) + + def test_download_image_race(self) -> None: + """Test that fetching remote *images* from two different processes at + the same time works. + + This checks that races generating thumbnails are handled correctly. + """ + hs1 = self.make_worker_hs("synapse.app.generic_worker") + hs2 = self.make_worker_hs("synapse.app.generic_worker") + + start_count = self._count_remote_thumbnails() + + channel1, request1 = self._get_media_req(hs1, "example.com:443", "PIC1") + channel2, request2 = self._get_media_req(hs2, "example.com:443", "PIC1") + + request1.setResponseCode(200) + request1.responseHeaders.setRawHeaders( + b"Content-Type", + ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"], + ) + img_data = b"\r\n\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: application/json\r\n\r\n{}\r\n--6067d4698f8d40a0a794ea7d7379d53a\r\nContent-Type: image/png\r\nContent-Disposition: inline; filename=test_img\r\n\r\n" + request1.write(img_data) + request1.write(SMALL_PNG) + request1.write(b"\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n") + request1.finish() + + self.pump(0.1) + + self.assertEqual(channel1.code, 200, channel1.result["body"]) + self.assertEqual(channel1.result["body"], SMALL_PNG) + + request2.setResponseCode(200) + request2.responseHeaders.setRawHeaders( + b"Content-Type", + ["multipart/mixed; boundary=6067d4698f8d40a0a794ea7d7379d53a"], + ) + request2.write(img_data) + request2.write(SMALL_PNG) + request2.write(b"\r\n--6067d4698f8d40a0a794ea7d7379d53a--\r\n\r\n") + request2.finish() + + self.pump(0.1) + + self.assertEqual(channel2.code, 200, channel2.result["body"]) + self.assertEqual(channel2.result["body"], SMALL_PNG) + + # We expect only three new thumbnails to have been persisted. + self.assertEqual(start_count + 3, self._count_remote_thumbnails()) + + def _count_remote_media(self) -> int: + """Count the number of files in our remote media directory.""" + path = os.path.join( + self.hs.get_media_repository().primary_base_path, "remote_content" + ) + return sum(len(files) for _, _, files in os.walk(path)) + + def _count_remote_thumbnails(self) -> int: + """Count the number of files in our remote thumbnails directory.""" + path = os.path.join( + self.hs.get_media_repository().primary_base_path, "remote_thumbnail" + ) + return sum(len(files) for _, _, files in os.walk(path)) + + def _log_request(request: Request) -> None: """Implements Factory.log, which is expected by Request.finish""" logger.info("Completed request %s", request) diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py index be4a289ec1..6b5af2dbb6 100644 --- a/tests/rest/client/test_media.py +++ b/tests/rest/client/test_media.py @@ -19,31 +19,54 @@ # # import base64 +import io import json import os import re -from typing import Any, Dict, Optional, Sequence, Tuple, Type +from typing import Any, BinaryIO, ClassVar, Dict, List, Optional, Sequence, Tuple, Type +from unittest.mock import MagicMock, Mock, patch +from urllib import parse from urllib.parse import quote, urlencode +from parameterized import parameterized_class + +from twisted.internet import defer from twisted.internet._resolver import HostResolution from twisted.internet.address import IPv4Address, IPv6Address +from twisted.internet.defer import Deferred from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import IAddress, IResolutionReceiver +from twisted.python.failure import Failure from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor +from twisted.web.http_headers import Headers +from twisted.web.iweb import UNKNOWN_LENGTH, IResponse from twisted.web.resource import Resource +from synapse.api.errors import HttpResponseException +from synapse.api.ratelimiting import Ratelimiter from synapse.config.oembed import OEmbedEndpointConfig +from synapse.http.client import MultipartResponse +from synapse.http.types import QueryParams +from synapse.logging.context import make_deferred_yieldable from synapse.media._base import FileInfo from synapse.media.url_previewer import IMAGE_CACHE_EXPIRY_MS from synapse.rest import admin from synapse.rest.client import login, media from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, UserID from synapse.util import Clock from synapse.util.stringutils import parse_and_validate_mxc_uri from tests import unittest -from tests.server import FakeTransport, ThreadedMemoryReactorClock +from tests.media.test_media_storage import ( + SVG, + TestImage, + empty_file, + small_lossless_webp, + small_png, + small_png_with_transparency, +) +from tests.server import FakeChannel, FakeTransport, ThreadedMemoryReactorClock from tests.test_utils import SMALL_PNG from tests.unittest import override_config @@ -1607,3 +1630,583 @@ class UnstableMediaConfigTest(unittest.HomeserverTestCase): self.assertEqual( channel.json_body["m.upload.size"], self.hs.config.media.max_upload_size ) + + +class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): + servlets = [ + media.register_servlets, + login.register_servlets, + admin.register_servlets, + ] + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + config = self.default_config() + + self.storage_path = self.mktemp() + self.media_store_path = self.mktemp() + os.mkdir(self.storage_path) + os.mkdir(self.media_store_path) + config["media_store_path"] = self.media_store_path + + provider_config = { + "module": "synapse.media.storage_provider.FileStorageProviderBackend", + "store_local": True, + "store_synchronous": False, + "store_remote": True, + "config": {"directory": self.storage_path}, + } + + config["media_storage_providers"] = [provider_config] + + return self.setup_test_homeserver(config=config) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.repo = hs.get_media_repository() + self.client = hs.get_federation_http_client() + self.store = hs.get_datastores().main + self.user = self.register_user("user", "pass") + self.tok = self.login("user", "pass") + + # mock actually reading file body + def read_multipart_response_30MiB(*args: Any, **kwargs: Any) -> Deferred: + d: Deferred = defer.Deferred() + d.callback(MultipartResponse(b"{}", 31457280, b"img/png", None)) + return d + + def read_multipart_response_50MiB(*args: Any, **kwargs: Any) -> Deferred: + d: Deferred = defer.Deferred() + d.callback(MultipartResponse(b"{}", 31457280, b"img/png", None)) + return d + + @patch( + "synapse.http.matrixfederationclient.read_multipart_response", + read_multipart_response_30MiB, + ) + def test_download_ratelimit_default(self) -> None: + """ + Test remote media download ratelimiting against default configuration - 500MB bucket + and 87kb/second drain rate + """ + + # mock out actually sending the request, returns a 30MiB response + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 31457280 + resp.headers = Headers( + {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]} + ) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # first request should go through + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abc", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 200 + + # next 15 should go through + for i in range(15): + channel2 = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/remote.org/abc{i}", + shorthand=False, + access_token=self.tok, + ) + assert channel2.code == 200 + + # 17th will hit ratelimit + channel3 = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcd", + shorthand=False, + access_token=self.tok, + ) + assert channel3.code == 429 + + # however, a request from a different IP will go through + channel4 = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcde", + shorthand=False, + client_ip="187.233.230.159", + access_token=self.tok, + ) + assert channel4.code == 200 + + # at 87Kib/s it should take about 2 minutes for enough to drain from bucket that another + # 30MiB download is authorized - The last download was blocked at 503,316,480. + # The next download will be authorized when bucket hits 492,830,720 + # (524,288,000 total capacity - 31,457,280 download size) so 503,316,480 - 492,830,720 ~= 10,485,760 + # needs to drain before another download will be authorized, that will take ~= + # 2 minutes (10,485,760/89,088/60) + self.reactor.pump([2.0 * 60.0]) + + # enough has drained and next request goes through + channel5 = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcdef", + shorthand=False, + access_token=self.tok, + ) + assert channel5.code == 200 + + @override_config( + { + "remote_media_download_per_second": "50M", + "remote_media_download_burst_count": "50M", + } + ) + @patch( + "synapse.http.matrixfederationclient.read_multipart_response", + read_multipart_response_50MiB, + ) + def test_download_rate_limit_config(self) -> None: + """ + Test that download rate limit config options are correctly picked up and applied + """ + + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 52428800 + resp.headers = Headers( + {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]} + ) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # first request should go through + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abc", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 200 + + # immediate second request should fail + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcd", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 429 + + # advance half a second + self.reactor.pump([0.5]) + + # request still fails + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcde", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 429 + + # advance another half second + self.reactor.pump([0.5]) + + # enough has drained from bucket and request is successful + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcdef", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 200 + + @patch( + "synapse.http.matrixfederationclient.read_multipart_response", + read_multipart_response_30MiB, + ) + def test_download_ratelimit_max_size_sub(self) -> None: + """ + Test that if no content-length is provided, the default max size is applied instead + """ + + # mock out actually sending the request + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = UNKNOWN_LENGTH + resp.headers = Headers( + {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]} + ) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # ten requests should go through using the max size (500MB/50MB) + for i in range(10): + channel2 = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/remote.org/abc{i}", + shorthand=False, + access_token=self.tok, + ) + assert channel2.code == 200 + + # eleventh will hit ratelimit + channel3 = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcd", + shorthand=False, + access_token=self.tok, + ) + assert channel3.code == 429 + + def test_file_download(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with a text file + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/test/{content_uri.media_id}", + shorthand=False, + access_token=self.tok, + ) + self.pump() + self.assertEqual(200, channel.code) + + +test_images = [ + small_png, + small_png_with_transparency, + small_lossless_webp, + empty_file, + SVG, +] +input_values = [(x,) for x in test_images] + + +@parameterized_class(("test_image",), input_values) +class DownloadTestCase(unittest.HomeserverTestCase): + test_image: ClassVar[TestImage] + servlets = [ + media.register_servlets, + login.register_servlets, + admin.register_servlets, + ] + + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + self.fetches: List[ + Tuple[ + "Deferred[Any]", + str, + str, + Optional[QueryParams], + ] + ] = [] + + def federation_get_file( + destination: str, + path: str, + output_stream: BinaryIO, + download_ratelimiter: Ratelimiter, + ip_address: Any, + max_size: int, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + ignore_backoff: bool = False, + follow_redirects: bool = False, + ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]], bytes]]": + """A mock for MatrixFederationHttpClient.federation_get_file.""" + + def write_to( + r: Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]], bytes]] + ) -> Tuple[int, Dict[bytes, List[bytes]], bytes]: + data, response = r + output_stream.write(data) + return response + + def write_err(f: Failure) -> Failure: + f.trap(HttpResponseException) + output_stream.write(f.value.response) + return f + + d: Deferred[Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]], bytes]]] = ( + Deferred() + ) + self.fetches.append((d, destination, path, args)) + # Note that this callback changes the value held by d. + d_after_callback = d.addCallbacks(write_to, write_err) + return make_deferred_yieldable(d_after_callback) + + def get_file( + destination: str, + path: str, + output_stream: BinaryIO, + download_ratelimiter: Ratelimiter, + ip_address: Any, + max_size: int, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + ignore_backoff: bool = False, + follow_redirects: bool = False, + ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]]]]": + """A mock for MatrixFederationHttpClient.get_file.""" + + def write_to( + r: Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]]]] + ) -> Tuple[int, Dict[bytes, List[bytes]]]: + data, response = r + output_stream.write(data) + return response + + def write_err(f: Failure) -> Failure: + f.trap(HttpResponseException) + output_stream.write(f.value.response) + return f + + d: Deferred[Tuple[bytes, Tuple[int, Dict[bytes, List[bytes]]]]] = Deferred() + self.fetches.append((d, destination, path, args)) + # Note that this callback changes the value held by d. + d_after_callback = d.addCallbacks(write_to, write_err) + return make_deferred_yieldable(d_after_callback) + + # Mock out the homeserver's MatrixFederationHttpClient + client = Mock() + client.federation_get_file = federation_get_file + client.get_file = get_file + + self.storage_path = self.mktemp() + self.media_store_path = self.mktemp() + os.mkdir(self.storage_path) + os.mkdir(self.media_store_path) + + config = self.default_config() + config["media_store_path"] = self.media_store_path + config["max_image_pixels"] = 2000000 + + provider_config = { + "module": "synapse.media.storage_provider.FileStorageProviderBackend", + "store_local": True, + "store_synchronous": False, + "store_remote": True, + "config": {"directory": self.storage_path}, + } + config["media_storage_providers"] = [provider_config] + config["experimental_features"] = {"msc3916_authenticated_media_enabled": True} + + hs = self.setup_test_homeserver(config=config, federation_http_client=client) + + return hs + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.media_repo = hs.get_media_repository() + + self.remote = "example.com" + self.media_id = "12345" + + self.user = self.register_user("user", "pass") + self.tok = self.login("user", "pass") + + def _req( + self, content_disposition: Optional[bytes], include_content_type: bool = True + ) -> FakeChannel: + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.remote}/{self.media_id}", + shorthand=False, + await_result=False, + access_token=self.tok, + ) + self.pump() + + # We've made one fetch, to example.com, using the federation media URL + self.assertEqual(len(self.fetches), 1) + self.assertEqual(self.fetches[0][1], "example.com") + self.assertEqual( + self.fetches[0][2], "/_matrix/federation/v1/media/download/" + self.media_id + ) + self.assertEqual( + self.fetches[0][3], + {"timeout_ms": "20000"}, + ) + + headers = { + b"Content-Length": [b"%d" % (len(self.test_image.data))], + } + + if include_content_type: + headers[b"Content-Type"] = [self.test_image.content_type] + + if content_disposition: + headers[b"Content-Disposition"] = [content_disposition] + + self.fetches[0][0].callback( + (self.test_image.data, (len(self.test_image.data), headers, b"{}")) + ) + + self.pump() + self.assertEqual(channel.code, 200) + + return channel + + def test_handle_missing_content_type(self) -> None: + channel = self._req( + b"attachment; filename=out" + self.test_image.extension, + include_content_type=False, + ) + headers = channel.headers + self.assertEqual(channel.code, 200) + self.assertEqual( + headers.getRawHeaders(b"Content-Type"), [b"application/octet-stream"] + ) + + def test_disposition_filename_ascii(self) -> None: + """ + If the filename is filename=<ascii> then Synapse will decode it as an + ASCII string, and use filename= in the response. + """ + channel = self._req(b"attachment; filename=out" + self.test_image.extension) + + headers = channel.headers + self.assertEqual( + headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type] + ) + self.assertEqual( + headers.getRawHeaders(b"Content-Disposition"), + [ + (b"inline" if self.test_image.is_inline else b"attachment") + + b"; filename=out" + + self.test_image.extension + ], + ) + + def test_disposition_filenamestar_utf8escaped(self) -> None: + """ + If the filename is filename=*utf8''<utf8 escaped> then Synapse will + correctly decode it as the UTF-8 string, and use filename* in the + response. + """ + filename = parse.quote("\u2603".encode()).encode("ascii") + channel = self._req( + b"attachment; filename*=utf-8''" + filename + self.test_image.extension + ) + + headers = channel.headers + self.assertEqual( + headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type] + ) + self.assertEqual( + headers.getRawHeaders(b"Content-Disposition"), + [ + (b"inline" if self.test_image.is_inline else b"attachment") + + b"; filename*=utf-8''" + + filename + + self.test_image.extension + ], + ) + + def test_disposition_none(self) -> None: + """ + If there is no filename, Content-Disposition should only + be a disposition type. + """ + channel = self._req(None) + + headers = channel.headers + self.assertEqual( + headers.getRawHeaders(b"Content-Type"), [self.test_image.content_type] + ) + self.assertEqual( + headers.getRawHeaders(b"Content-Disposition"), + [b"inline" if self.test_image.is_inline else b"attachment"], + ) + + def test_x_robots_tag_header(self) -> None: + """ + Tests that the `X-Robots-Tag` header is present, which informs web crawlers + to not index, archive, or follow links in media. + """ + channel = self._req(b"attachment; filename=out" + self.test_image.extension) + + headers = channel.headers + self.assertEqual( + headers.getRawHeaders(b"X-Robots-Tag"), + [b"noindex, nofollow, noarchive, noimageindex"], + ) + + def test_cross_origin_resource_policy_header(self) -> None: + """ + Test that the Cross-Origin-Resource-Policy header is set to "cross-origin" + allowing web clients to embed media from the downloads API. + """ + channel = self._req(b"attachment; filename=out" + self.test_image.extension) + + headers = channel.headers + + self.assertEqual( + headers.getRawHeaders(b"Cross-Origin-Resource-Policy"), + [b"cross-origin"], + ) + + def test_unknown_federation_endpoint(self) -> None: + """ + Test that if the downloadd request to remote federation endpoint returns a 404 + we fall back to the _matrix/media endpoint + """ + channel = self.make_request( + "GET", + f"/_matrix/client/v1/media/download/{self.remote}/{self.media_id}", + shorthand=False, + await_result=False, + access_token=self.tok, + ) + self.pump() + + # We've made one fetch, to example.com, using the media URL, and asking + # the other server not to do a remote fetch + self.assertEqual(len(self.fetches), 1) + self.assertEqual(self.fetches[0][1], "example.com") + self.assertEqual( + self.fetches[0][2], f"/_matrix/federation/v1/media/download/{self.media_id}" + ) + + # The result which says the endpoint is unknown. + unknown_endpoint = b'{"errcode":"M_UNRECOGNIZED","error":"Unknown request"}' + self.fetches[0][0].errback( + HttpResponseException(404, "NOT FOUND", unknown_endpoint) + ) + + self.pump() + + # There should now be another request to the _matrix/media/v3/download URL. + self.assertEqual(len(self.fetches), 2) + self.assertEqual(self.fetches[1][1], "example.com") + self.assertEqual( + self.fetches[1][2], + f"/_matrix/media/v3/download/example.com/{self.media_id}", + ) + + headers = { + b"Content-Length": [b"%d" % (len(self.test_image.data))], + } + + self.fetches[1][0].callback( + (self.test_image.data, (len(self.test_image.data), headers)) + ) + + self.pump() + self.assertEqual(channel.code, 200) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 12c11f342c..966c622e14 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -31,12 +31,13 @@ from synapse.api.constants import ( AccountDataTypes, EventContentFields, EventTypes, + HistoryVisibility, ReceiptTypes, RelationTypes, ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID from synapse.util import Clock from tests import unittest @@ -1326,7 +1327,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sync_list(self) -> None: """ - Test that room IDs show up in the Sliding Sync lists + Test that room IDs show up in the Sliding Sync `lists` """ alice_user_id = self.register_user("alice", "correcthorse") alice_access_token = self.login(alice_user_id, "correcthorse") @@ -1425,15 +1426,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): channel.await_result(timeout_ms=200) self.assertEqual(channel.code, 200, channel.json_body) - # We expect the `next_pos` in the result to be the same as what we requested + # We expect the next `pos` in the result to be the same as what we requested # with because we weren't able to find anything new yet. - self.assertEqual( - channel.json_body["next_pos"], future_position_token_serialized - ) + self.assertEqual(channel.json_body["pos"], future_position_token_serialized) def test_filter_list(self) -> None: """ - Test that filters apply to lists + Test that filters apply to `lists` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1564,7 +1563,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): def test_sort_list(self) -> None: """ - Test that the lists are sorted by `stream_ordering` + Test that the `lists` are sorted by `stream_ordering` """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -1618,3 +1617,1067 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ], channel.json_body["lists"]["foo-list"], ) + + def test_sliced_windows(self) -> None: + """ + Test that the `lists` `ranges` are sliced correctly. Both sides of each range + are inclusive. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + _room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + # Make the Sliding Sync request for a single room + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + # Make sure the list is sorted in the way we expect + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 0], + "room_ids": [room_id3], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + # Make the Sliding Sync request for the first two rooms + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + # Make sure the list is sorted in the way we expect + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 1], + "room_ids": [room_id3, room_id2], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + def test_rooms_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit` + on initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok) + event_pos3 = self.get_success( + self.store.get_position_for_event(event_response3["event_id"]) + ) + event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok) + event_pos4 = self.get_success( + self.store.get_position_for_event(event_response4["event_id"]) + ) + event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response4["event_id"], + event_response5["event_id"], + user1_join_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Check to make sure the `prev_batch` points at the right place + prev_batch_token = self.get_success( + StreamToken.from_string( + self.store, channel.json_body["rooms"][room_id1]["prev_batch"] + ) + ) + prev_batch_room_stream_token_serialized = self.get_success( + prev_batch_token.room_key.to_string(self.store) + ) + # If we use the `prev_batch` token to look backwards, we should see `event3` + # next so make sure the token encompasses it + self.assertEqual( + event_pos3.persisted_after(prev_batch_token.room_key), + False, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}", + ) + # If we use the `prev_batch` token to look backwards, we shouldn't see `event4` + # anymore since it was just returned in this response. + self.assertEqual( + event_pos4.persisted_after(prev_batch_token.room_key), + True, + f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", + ) + + # With no `from_token` (initial sync), it's all historical since there is no + # "live" range + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_not_limited_initial_sync(self) -> None: + """ + Test that we mark `rooms` as `limited=False` when there are no more events to + paginate to. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity1", tok=user2_tok) + self.helper.send(room_id1, "activity2", tok=user2_tok) + self.helper.send(room_id1, "activity3", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Make the Sliding Sync request + timeline_limit = 100 + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # The timeline should be `limited=False` because we have all of the events (no + # more to paginate to) + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) + expected_number_of_events = 9 + # We're just looking to make sure we got all of the events before hitting the `timeline_limit` + self.assertEqual( + len(channel.json_body["rooms"][room_id1]["timeline"]), + expected_number_of_events, + channel.json_body["rooms"][room_id1]["timeline"], + ) + self.assertLessEqual(expected_number_of_events, timeline_limit) + + # With no `from_token` (initial sync), it's all historical since there is no + # "live" token range. + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_incremental_sync(self) -> None: + """ + Test `rooms` data during an incremental sync after an initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok) + + # Make an initial Sliding Sync request to grab a token. This is also a sanity + # check that we can go from initial to incremental sync. + sync_params = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + } + channel = self.make_request( + "POST", + self.sync_endpoint, + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + next_pos = channel.json_body["pos"] + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + f"?pos={next_pos}", + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We only expect to see the new events since the last sync which isn't enough to + # fill up the `timeline_limit`. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} ' + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + event_response3["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # All events are "live" + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 2, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_newly_joined_incremental_sync(self) -> None: + """ + Test that when we make an incremental sync with a `newly_joined` `rooms`, we are + able to see some historical events before the `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before token1", tok=user2_tok) + event_response2 = self.helper.send( + room_id1, "activity before token2", tok=user2_tok + ) + + from_token = self.event_sources.get_current_token() + + # Join the room after the `from_token` which will make us consider this room as + # `newly_joined`. + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response3 = self.helper.send( + room_id1, "activity after token3", tok=user2_tok + ) + event_response4 = self.helper.send( + room_id1, "activity after token4", tok=user2_tok + ) + + # The `timeline_limit` is set to 4 so we can at least see one historical event + # before the `from_token`. We should see historical events because this is a + # `newly_joined` room. + timeline_limit = 4 + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see the new events and the rest should be filled with historical + # events which will make us `limited=True` since there are more to paginate to. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + f"Our `timeline_limit` was {timeline_limit} " + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure that the "live" and historical events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + user1_join_response["event_id"], + event_response3["event_id"], + event_response4["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Only events after the `from_token` are "live" (join, event3, event4) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 3, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_invite_shared_history_initial_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` during an + initial sync. + + This is an `invite` room so we should only have `stripped_state` (no `timeline`) + but we also shouldn't see any timeline events because the history visiblity is + `shared` and we haven't joined the room yet. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + # Ensure we're testing with a room with `shared` history visibility which means + # history visible until you actually join the room. + history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.SHARED, + ) + + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after3", tok=user2_tok) + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # `timeline` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("timeline"), + channel.json_body["rooms"][room_id1], + ) + # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("num_live"), + channel.json_body["rooms"][room_id1], + ) + # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("limited"), + channel.json_body["rooms"][room_id1], + ) + # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("prev_batch"), + channel.json_body["rooms"][room_id1], + ) + # We should have some `stripped_state` so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) + + def test_rooms_invite_shared_history_incremental_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` during an + incremental sync. + + This is an `invite` room so we should only have `stripped_state` (no `timeline`) + but we also shouldn't see any timeline events because the history visiblity is + `shared` and we haven't joined the room yet. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + # Ensure we're testing with a room with `shared` history visibility which means + # history visible until you actually join the room. + history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.SHARED, + ) + + self.helper.send(room_id1, "activity before invite1", tok=user2_tok) + self.helper.send(room_id1, "activity before invite2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after invite3", tok=user2_tok) + self.helper.send(room_id1, "activity after invite4", tok=user2_tok) + + from_token = self.event_sources.get_current_token() + + self.helper.send(room_id1, "activity after token5", tok=user2_tok) + self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # `timeline` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("timeline"), + channel.json_body["rooms"][room_id1], + ) + # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("num_live"), + channel.json_body["rooms"][room_id1], + ) + # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("limited"), + channel.json_body["rooms"][room_id1], + ) + # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("prev_batch"), + channel.json_body["rooms"][room_id1], + ) + # We should have some `stripped_state` so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) + + def test_rooms_invite_world_readable_history_initial_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` during an + initial sync. + + This is an `invite` room so we should only have `stripped_state` (no `timeline`) + but depending on the semantics we decide, we could potentially see some + historical events before/after the `from_token` because the history is + `world_readable`. Same situation for events after the `from_token` if the + history visibility was set to `invited`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "preset": "public_chat", + "initial_state": [ + { + "content": { + "history_visibility": HistoryVisibility.WORLD_READABLE + }, + "state_key": "", + "type": EventTypes.RoomHistoryVisibility, + } + ], + }, + ) + # Ensure we're testing with a room with `world_readable` history visibility + # which means events are visible to anyone even without membership. + history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.WORLD_READABLE, + ) + + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after3", tok=user2_tok) + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # Large enough to see the latest events and before the invite + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # `timeline` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("timeline"), + channel.json_body["rooms"][room_id1], + ) + # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("num_live"), + channel.json_body["rooms"][room_id1], + ) + # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("limited"), + channel.json_body["rooms"][room_id1], + ) + # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("prev_batch"), + channel.json_body["rooms"][room_id1], + ) + # We should have some `stripped_state` so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) + + def test_rooms_invite_world_readable_history_incremental_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` during an + incremental sync. + + This is an `invite` room so we should only have `stripped_state` (no `timeline`) + but depending on the semantics we decide, we could potentially see some + historical events before/after the `from_token` because the history is + `world_readable`. Same situation for events after the `from_token` if the + history visibility was set to `invited`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "preset": "public_chat", + "initial_state": [ + { + "content": { + "history_visibility": HistoryVisibility.WORLD_READABLE + }, + "state_key": "", + "type": EventTypes.RoomHistoryVisibility, + } + ], + }, + ) + # Ensure we're testing with a room with `world_readable` history visibility + # which means events are visible to anyone even without membership. + history_visibility_response = self.helper.get_state( + room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok + ) + self.assertEqual( + history_visibility_response.get("history_visibility"), + HistoryVisibility.WORLD_READABLE, + ) + + self.helper.send(room_id1, "activity before invite1", tok=user2_tok) + self.helper.send(room_id1, "activity before invite2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after invite3", tok=user2_tok) + self.helper.send(room_id1, "activity after invite4", tok=user2_tok) + + from_token = self.event_sources.get_current_token() + + self.helper.send(room_id1, "activity after token5", tok=user2_tok) + self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + # Large enough to see the latest events and before the invite + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # `timeline` is omitted for `invite` rooms with `stripped_state` + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("timeline"), + channel.json_body["rooms"][room_id1], + ) + # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("num_live"), + channel.json_body["rooms"][room_id1], + ) + # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("limited"), + channel.json_body["rooms"][room_id1], + ) + # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway) + self.assertIsNone( + channel.json_body["rooms"][room_id1].get("prev_batch"), + channel.json_body["rooms"][room_id1], + ) + # We should have some `stripped_state` so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + ) + + def test_rooms_ban_initial_sync(self) -> None: + """ + Test that `rooms` we are banned from in an intial sync only allows us to see + timeline events up to the ban event. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) + user1_ban_response = self.helper.ban( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) + + self.helper.send(room_id1, "activity after5", tok=user2_tok) + self.helper.send(room_id1, "activity after6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see events before the ban but not after + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response3["event_id"], + event_response4["event_id"], + user1_ban_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # No "live" events in an initial sync (no `from_token` to define the "live" + # range) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + # There are more events to paginate to + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_ban_incremental_sync1(self) -> None: + """ + Test that `rooms` we are banned from during the next incremental sync only + allows us to see timeline events up to the ban event. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + from_token = self.event_sources.get_current_token() + + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) + # The ban is within the token range (between the `from_token` and the sliding + # sync request) + user1_ban_response = self.helper.ban( + room_id1, src=user2_id, targ=user1_id, tok=user2_tok + ) + + self.helper.send(room_id1, "activity after5", tok=user2_tok) + self.helper.send(room_id1, "activity after6", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see events before the ban but not after + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response3["event_id"], + event_response4["event_id"], + user1_ban_response["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # All live events in the incremental sync + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 3, + channel.json_body["rooms"][room_id1], + ) + # There aren't anymore events to paginate to in this range + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_ban_incremental_sync2(self) -> None: + """ + Test that `rooms` we are banned from before the incremental sync don't return + any events in the timeline. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + self.helper.send(room_id1, "activity after2", tok=user2_tok) + # The ban is before we get our `from_token` + self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + + self.helper.send(room_id1, "activity after3", tok=user2_tok) + + from_token = self.event_sources.get_current_token() + + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success(from_token.to_string(self.store))}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Nothing to see for this banned user in the room in the token range + self.assertEqual( + channel.json_body["rooms"][room_id1]["timeline"], + [], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # No events returned in the timeline so nothing is "live" + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + # There aren't anymore events to paginate to in this range + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + channel.json_body["rooms"][room_id1], + ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index f0ba40a1f1..e43140720d 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -261,9 +261,9 @@ class RestHelper: targ: str, expect_code: int = HTTPStatus.OK, tok: Optional[str] = None, - ) -> None: + ) -> JsonDict: """A convenience helper: `change_membership` with `membership` preset to "ban".""" - self.change_membership( + return self.change_membership( room=room, src=src, targ=targ, diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index fe1e873e15..aad46b1b44 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -21,20 +21,32 @@ import logging from typing import List, Tuple +from unittest.mock import AsyncMock, patch from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor -from synapse.api.constants import Direction, EventTypes, RelationTypes +from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes from synapse.api.filtering import Filter +from synapse.crypto.event_signing import add_hashes_and_signatures +from synapse.events import FrozenEventV3 +from synapse.federation.federation_client import SendJoinResult from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken +from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.types import ( + JsonDict, + PersistedEventPosition, + RoomStreamToken, + UserID, + create_requester, +) from synapse.util import Clock -from tests.unittest import HomeserverTestCase +from tests.test_utils.event_injection import create_event +from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase logger = logging.getLogger(__name__) @@ -543,3 +555,859 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): } ), ) + + +class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase): + """ + Test `get_current_state_delta_membership_changes_for_user(...)` + """ + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.state_handler = self.hs.get_state_handler() + persistence = hs.get_storage_controllers().persistence + assert persistence is not None + self.persistence = persistence + + def test_returns_membership_events(self) -> None: + """ + A basic test that a membership event in the token range is returned for the user. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_pos = self.get_success( + self.store.get_position_for_event(join_response["event_id"]) + ) + + after_room1_token = self.event_sources.get_current_token() + + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_response["event_id"], + event_pos=join_pos, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ) + ], + ) + + def test_server_left_room_after_us(self) -> None: + """ + Test that when probing over part of the DAG where the server left the room *after + us*, we still see the join and leave changes. + + This is to make sure we play nicely with this behavior: When the server leaves a + room, it will insert new rows with `event_id = null` into the + `current_state_delta_stream` table for all current state. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "power_level_content_override": { + "users": { + user2_id: 100, + # Allow user1 to send state in the room + user1_id: 100, + } + } + }, + ) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + # Make sure that random other non-member state that happens to have a `state_key` + # matching the user ID doesn't mess with things. + self.helper.send_state( + room_id1, + event_type="foobarbazdummy", + state_key=user1_id, + body={"foo": "bar"}, + tok=user1_tok, + ) + # User1 should leave the room first + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_pos1 = self.get_success( + self.store.get_position_for_event(leave_response1["event_id"]) + ) + + # User2 should also leave the room (everyone has left the room which means the + # server is no longer in the room). + self.helper.leave(room_id1, user2_id, tok=user2_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like the + # following. When the server leaves a room, it will insert new rows with + # `event_id = null` for all current state. + # + # | stream_id | room_id | type | state_key | event_id | prev_event_id | + # |-----------|----------|-----------------------------|----------------|----------|---------------| + # | 2 | !x:test | 'm.room.create' | '' | $xxx | None | + # | 3 | !x:test | 'm.room.member' | '@user2:test' | $aaa | None | + # | 4 | !x:test | 'm.room.history_visibility' | '' | $xxx | None | + # | 4 | !x:test | 'm.room.join_rules' | '' | $xxx | None | + # | 4 | !x:test | 'm.room.power_levels' | '' | $xxx | None | + # | 7 | !x:test | 'm.room.member' | '@user1:test' | $ooo | None | + # | 8 | !x:test | 'foobarbazdummy' | '@user1:test' | $xxx | None | + # | 9 | !x:test | 'm.room.member' | '@user1:test' | $ppp | $ooo | + # | 10 | !x:test | 'foobarbazdummy' | '@user1:test' | None | $xxx | + # | 10 | !x:test | 'm.room.create' | '' | None | $xxx | + # | 10 | !x:test | 'm.room.history_visibility' | '' | None | $xxx | + # | 10 | !x:test | 'm.room.join_rules' | '' | None | $xxx | + # | 10 | !x:test | 'm.room.member' | '@user1:test' | None | $ppp | + # | 10 | !x:test | 'm.room.member' | '@user2:test' | None | $aaa | + # | 10 | !x:test | 'm.room.power_levels' | | None | $xxx | + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_response1["event_id"], + event_pos=join_pos1, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=leave_response1["event_id"], + event_pos=leave_pos1, + membership="leave", + sender=user1_id, + prev_event_id=join_response1["event_id"], + prev_event_pos=join_pos1, + prev_membership="join", + prev_sender=user1_id, + ), + ], + ) + + def test_server_left_room_after_us_later(self) -> None: + """ + Test when the user leaves the room, then sometime later, everyone else leaves + the room, causing the server to leave the room, we shouldn't see any membership + changes. + + This is to make sure we play nicely with this behavior: When the server leaves a + room, it will insert new rows with `event_id = null` into the + `current_state_delta_stream` table for all current state. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + # User1 should leave the room first + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_user1_leave_token = self.event_sources.get_current_token() + + # User2 should also leave the room (everyone has left the room which means the + # server is no longer in the room). + self.helper.leave(room_id1, user2_id, tok=user2_tok) + + after_server_leave_token = self.event_sources.get_current_token() + + # Join another room as user1 just to advance the stream_ordering and bust + # `_membership_stream_cache` + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like the + # following. When the server leaves a room, it will insert new rows with + # `event_id = null` for all current state. + # + # TODO: Add DB rows to better see what's going on. + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=after_user1_leave_token.room_key, + to_key=after_server_leave_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [], + ) + + def test_we_cause_server_left_room(self) -> None: + """ + Test that when probing over part of the DAG where the user leaves the room + causing the server to leave the room (because we were the last local user in the + room), we still see the join and leave changes. + + This is to make sure we play nicely with this behavior: When the server leaves a + room, it will insert new rows with `event_id = null` into the + `current_state_delta_stream` table for all current state. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "power_level_content_override": { + "users": { + user2_id: 100, + # Allow user1 to send state in the room + user1_id: 100, + } + } + }, + ) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + # Make sure that random other non-member state that happens to have a `state_key` + # matching the user ID doesn't mess with things. + self.helper.send_state( + room_id1, + event_type="foobarbazdummy", + state_key=user1_id, + body={"foo": "bar"}, + tok=user1_tok, + ) + + # User2 should leave the room first. + self.helper.leave(room_id1, user2_id, tok=user2_tok) + + # User1 (the person we're testing with) should also leave the room (everyone has + # left the room which means the server is no longer in the room). + leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok) + leave_pos1 = self.get_success( + self.store.get_position_for_event(leave_response1["event_id"]) + ) + + after_room1_token = self.event_sources.get_current_token() + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like the + # following. When the server leaves a room, it will insert new rows with + # `event_id = null` for all current state. + # + # | stream_id | room_id | type | state_key | event_id | prev_event_id | + # |-----------|-----------|-----------------------------|---------------|----------|---------------| + # | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None | + # | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$aaa' | None | + # | 4 | '!x:test' | 'm.room.history_visibility' | '' | '$xxx' | None | + # | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None | + # | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None | + # | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$ooo' | None | + # | 8 | '!x:test' | 'foobarbazdummy' | '@user1:test' | '$xxx' | None | + # | 9 | '!x:test' | 'm.room.member' | '@user2:test' | '$bbb' | '$aaa' | + # | 10 | '!x:test' | 'foobarbazdummy' | '@user1:test' | None | '$xxx' | + # | 10 | '!x:test' | 'm.room.create' | '' | None | '$xxx' | + # | 10 | '!x:test' | 'm.room.history_visibility' | '' | None | '$xxx' | + # | 10 | '!x:test' | 'm.room.join_rules' | '' | None | '$xxx' | + # | 10 | '!x:test' | 'm.room.member' | '@user1:test' | None | '$ooo' | + # | 10 | '!x:test' | 'm.room.member' | '@user2:test' | None | '$bbb' | + # | 10 | '!x:test' | 'm.room.power_levels' | '' | None | '$xxx' | + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_response1["event_id"], + event_pos=join_pos1, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=None, # leave_response1["event_id"], + event_pos=leave_pos1, + membership="leave", + sender=None, # user1_id, + prev_event_id=join_response1["event_id"], + prev_event_pos=join_pos1, + prev_membership="join", + prev_sender=user1_id, + ), + ], + ) + + def test_different_user_membership_persisted_in_same_batch(self) -> None: + """ + Test batch of membership events from different users being processed at once. + This will result in all of the memberships being stored in the + `current_state_delta_stream` table with the same `stream_ordering` even though + the individual events have different `stream_ordering`s. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user3_id = self.register_user("user3", "pass") + _user3_tok = self.login(user3_id, "pass") + user4_id = self.register_user("user4", "pass") + _user4_tok = self.login(user4_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # User2 is just the designated person to create the room (we do this across the + # tests to be consistent) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + + # Persist the user1, user3, and user4 join events in the same batch so they all + # end up in the `current_state_delta_stream` table with the same + # stream_ordering. + join_event3, join_event_context3 = self.get_success( + create_event( + self.hs, + sender=user3_id, + type=EventTypes.Member, + state_key=user3_id, + content={"membership": "join"}, + room_id=room_id1, + ) + ) + # We want to put user1 in the middle of the batch. This way, regardless of the + # implementation that inserts rows into current_state_delta_stream` (whether it + # be minimum/maximum of stream position of the batch), we will still catch bugs. + join_event1, join_event_context1 = self.get_success( + create_event( + self.hs, + sender=user1_id, + type=EventTypes.Member, + state_key=user1_id, + content={"membership": "join"}, + room_id=room_id1, + ) + ) + join_event4, join_event_context4 = self.get_success( + create_event( + self.hs, + sender=user4_id, + type=EventTypes.Member, + state_key=user4_id, + content={"membership": "join"}, + room_id=room_id1, + ) + ) + self.get_success( + self.persistence.persist_events( + [ + (join_event3, join_event_context3), + (join_event1, join_event_context1), + (join_event4, join_event_context4), + ] + ) + ) + + after_room1_token = self.event_sources.get_current_token() + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like (notice + # those three memberships at the end with `stream_id=7` because we persisted + # them in the same batch): + # + # | stream_id | room_id | type | state_key | event_id | prev_event_id | + # |-----------|-----------|----------------------------|------------------|----------|---------------| + # | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None | + # | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$xxx' | None | + # | 4 | '!x:test' | 'm.room.history_visibility'| '' | '$xxx' | None | + # | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None | + # | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None | + # | 7 | '!x:test' | 'm.room.member' | '@user3:test' | '$xxx' | None | + # | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$xxx' | None | + # | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None | + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + join_pos3 = self.get_success( + self.store.get_position_for_event(join_event3.event_id) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_event1.event_id, + # Ideally, this would be `join_pos1` (to match the `event_id`) but + # when events are persisted in a batch, they are all stored in the + # `current_state_delta_stream` table with the minimum + # `stream_ordering` from the batch. + event_pos=join_pos3, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + ], + ) + + def test_state_reset(self) -> None: + """ + Test a state reset scenario where the user gets removed from the room (when + there is no corresponding leave event) + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + + before_reset_token = self.event_sources.get_current_token() + + # Send another state event to make a position for the state reset to happen at + dummy_state_response = self.helper.send_state( + room_id1, + event_type="foobarbaz", + state_key="", + body={"foo": "bar"}, + tok=user2_tok, + ) + dummy_state_pos = self.get_success( + self.store.get_position_for_event(dummy_state_response["event_id"]) + ) + + # Mock a state reset removing the membership for user1 in the current state + self.get_success( + self.store.db_pool.simple_delete( + table="current_state_events", + keyvalues={ + "room_id": room_id1, + "type": EventTypes.Member, + "state_key": user1_id, + }, + desc="state reset user in current_state_delta_stream", + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + table="current_state_delta_stream", + values={ + "stream_id": dummy_state_pos.stream, + "room_id": room_id1, + "type": EventTypes.Member, + "state_key": user1_id, + "event_id": None, + "prev_event_id": join_response1["event_id"], + "instance_name": dummy_state_pos.instance_name, + }, + desc="state reset user in current_state_delta_stream", + ) + ) + + # Manually bust the cache since we we're just manually messing with the database + # and not causing an actual state reset. + self.store._membership_stream_cache.entity_has_changed( + user1_id, dummy_state_pos.stream + ) + + after_reset_token = self.event_sources.get_current_token() + + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_reset_token.room_key, + to_key=after_reset_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=None, + event_pos=dummy_state_pos, + membership="leave", + sender=None, # user1_id, + prev_event_id=join_response1["event_id"], + prev_event_pos=join_pos1, + prev_membership="join", + prev_sender=user1_id, + ), + ], + ) + + def test_excluded_room_ids(self) -> None: + """ + Test that the `excluded_room_ids` option excludes changes from the specified rooms. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok) + join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok) + join_pos2 = self.get_success( + self.store.get_position_for_event(join_response2["event_id"]) + ) + + after_room1_token = self.event_sources.get_current_token() + + # First test the the room is returned without the `excluded_room_ids` option + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_response1["event_id"], + event_pos=join_pos1, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + CurrentStateDeltaMembership( + room_id=room_id2, + event_id=join_response2["event_id"], + event_pos=join_pos2, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + ], + ) + + # The test that `excluded_room_ids` excludes room2 as expected + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_room1_token.room_key, + to_key=after_room1_token.room_key, + excluded_room_ids=[room_id2], + ) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=room_id1, + event_id=join_response1["event_id"], + event_pos=join_pos1, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ) + ], + ) + + +class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase( + FederatingHomeserverTestCase +): + """ + Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms. + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.room_member_handler = hs.get_room_member_handler() + + def test_remote_join(self) -> None: + """ + Test remote join where the first rows in `current_state_delta_stream` will just + be the state when you joined the remote room. + """ + user1_id = self.register_user("user1", "pass") + _user1_tok = self.login(user1_id, "pass") + + before_join_token = self.event_sources.get_current_token() + + intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}" + + # Remotely join a room on another homeserver. + # + # To do this we have to mock the responses from the remote homeserver. We also + # patch out a bunch of event checks on our end. + create_event_source = { + "auth_events": [], + "content": { + "creator": f"@creator:{self.OTHER_SERVER_NAME}", + "room_version": self.hs.config.server.default_room_version.identifier, + }, + "depth": 0, + "origin_server_ts": 0, + "prev_events": [], + "room_id": intially_unjoined_room_id, + "sender": f"@creator:{self.OTHER_SERVER_NAME}", + "state_key": "", + "type": EventTypes.Create, + } + self.add_hashes_and_signatures_from_other_server( + create_event_source, + self.hs.config.server.default_room_version, + ) + create_event = FrozenEventV3( + create_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) + creator_join_event_source = { + "auth_events": [create_event.event_id], + "content": { + "membership": "join", + }, + "depth": 1, + "origin_server_ts": 1, + "prev_events": [], + "room_id": intially_unjoined_room_id, + "sender": f"@creator:{self.OTHER_SERVER_NAME}", + "state_key": f"@creator:{self.OTHER_SERVER_NAME}", + "type": EventTypes.Member, + } + self.add_hashes_and_signatures_from_other_server( + creator_join_event_source, + self.hs.config.server.default_room_version, + ) + creator_join_event = FrozenEventV3( + creator_join_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) + + # Our local user is going to remote join the room + join_event_source = { + "auth_events": [create_event.event_id], + "content": {"membership": "join"}, + "depth": 1, + "origin_server_ts": 100, + "prev_events": [creator_join_event.event_id], + "sender": user1_id, + "state_key": user1_id, + "room_id": intially_unjoined_room_id, + "type": EventTypes.Member, + } + add_hashes_and_signatures( + self.hs.config.server.default_room_version, + join_event_source, + self.hs.hostname, + self.hs.signing_key, + ) + join_event = FrozenEventV3( + join_event_source, + self.hs.config.server.default_room_version, + {}, + None, + ) + + mock_make_membership_event = AsyncMock( + return_value=( + self.OTHER_SERVER_NAME, + join_event, + self.hs.config.server.default_room_version, + ) + ) + mock_send_join = AsyncMock( + return_value=SendJoinResult( + join_event, + self.OTHER_SERVER_NAME, + state=[create_event, creator_join_event], + auth_chain=[create_event, creator_join_event], + partial_state=False, + servers_in_room=frozenset(), + ) + ) + + with patch.object( + self.room_member_handler.federation_handler.federation_client, + "make_membership_event", + mock_make_membership_event, + ), patch.object( + self.room_member_handler.federation_handler.federation_client, + "send_join", + mock_send_join, + ), patch( + "synapse.event_auth._is_membership_change_allowed", + return_value=None, + ), patch( + "synapse.handlers.federation_event.check_state_dependent_auth_rules", + return_value=None, + ): + self.get_success( + self.room_member_handler.update_membership( + requester=create_requester(user1_id), + target=UserID.from_string(user1_id), + room_id=intially_unjoined_room_id, + action=Membership.JOIN, + remote_room_hosts=[self.OTHER_SERVER_NAME], + ) + ) + + after_join_token = self.event_sources.get_current_token() + + # Get the membership changes for the user. + # + # At this point, the `current_state_delta_stream` table should look like the + # following. Notice that all of the events are at the same `stream_id` because + # the current state starts out where we remotely joined: + # + # | stream_id | room_id | type | state_key | event_id | prev_event_id | + # |-----------|------------------------------|-----------------|------------------------------|----------|---------------| + # | 2 | '!example:other.example.com' | 'm.room.member' | '@user1:test' | '$xxx' | None | + # | 2 | '!example:other.example.com' | 'm.room.create' | '' | '$xxx' | None | + # | 2 | '!example:other.example.com' | 'm.room.member' | '@creator:other.example.com' | '$xxx' | None | + membership_changes = self.get_success( + self.store.get_current_state_delta_membership_changes_for_user( + user1_id, + from_key=before_join_token.room_key, + to_key=after_join_token.room_key, + ) + ) + + join_pos = self.get_success( + self.store.get_position_for_event(join_event.event_id) + ) + + # Let the whole diff show on failure + self.maxDiff = None + self.assertEqual( + membership_changes, + [ + CurrentStateDeltaMembership( + room_id=intially_unjoined_room_id, + event_id=join_event.event_id, + event_pos=join_pos, + membership="join", + sender=user1_id, + prev_event_id=None, + prev_event_pos=None, + prev_membership=None, + prev_sender=None, + ), + ], + ) |