diff options
author | Erik Johnston <erik@matrix.org> | 2024-07-17 14:03:59 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-07-17 14:03:59 +0100 |
commit | ff1b13d94c7ccad7298044e313b157d2f8f2f83a (patch) | |
tree | b4e5d0fa194bf7aa56079f5471b0b964384d40bb | |
parent | Merge remote-tracking branch 'origin/develop' into erikj/ss_hacks (diff) | |
parent | Newsfile (diff) | |
download | synapse-ff1b13d94c7ccad7298044e313b157d2f8f2f83a.tar.xz |
Merge branch 'erikj/ss_room_store' into erikj/ss_hacks
-rw-r--r-- | .github/workflows/tests.yml | 4 | ||||
-rw-r--r-- | Cargo.lock | 8 | ||||
-rw-r--r-- | changelog.d/17424.misc | 1 | ||||
-rw-r--r-- | changelog.d/17438.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/17439.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/17447.feature | 1 | ||||
-rw-r--r-- | changelog.d/17449.bugfix | 1 | ||||
-rw-r--r-- | changelog.d/17452.misc | 1 | ||||
-rw-r--r-- | poetry.lock | 91 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 2 | ||||
-rw-r--r-- | synapse/config/repository.py | 2 | ||||
-rw-r--r-- | synapse/config/server.py | 6 | ||||
-rw-r--r-- | synapse/handlers/sliding_sync.py | 323 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 126 | ||||
-rw-r--r-- | synapse/rest/client/sync.py | 12 | ||||
-rw-r--r-- | synapse/storage/databases/main/state_deltas.py | 38 | ||||
-rw-r--r-- | synapse/types/__init__.py | 102 | ||||
-rw-r--r-- | synapse/types/handlers/__init__.py | 40 | ||||
-rw-r--r-- | synapse/types/rest/client/__init__.py | 5 | ||||
-rw-r--r-- | tests/media/test_media_storage.py | 49 | ||||
-rw-r--r-- | tests/rest/client/test_media.py | 50 | ||||
-rw-r--r-- | tests/rest/client/test_sync.py | 167 | ||||
-rw-r--r-- | tests/server.py | 4 | ||||
-rw-r--r-- | tests/test_types.py | 71 | ||||
-rw-r--r-- | tests/util/test_check_dependencies.py | 3 |
25 files changed, 892 insertions, 217 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 767495101b..730f8552d9 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -305,7 +305,7 @@ jobs: - lint-readme runs-on: ubuntu-latest steps: - - uses: matrix-org/done-action@v2 + - uses: matrix-org/done-action@v3 with: needs: ${{ toJSON(needs) }} @@ -737,7 +737,7 @@ jobs: - linting-done runs-on: ubuntu-latest steps: - - uses: matrix-org/done-action@v2 + - uses: matrix-org/done-action@v3 with: needs: ${{ toJSON(needs) }} diff --git a/Cargo.lock b/Cargo.lock index 3a8bf7a49c..e9adfcbdc3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,9 +67,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" [[package]] name = "cfg-if" @@ -597,9 +597,9 @@ checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" [[package]] name = "ulid" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34778c17965aa2a08913b57e1f34db9b4a63f5de31768b55bf20d2795f921259" +checksum = "04f903f293d11f31c0c29e4148f6dc0d033a7f80cebc0282bea147611667d289" dependencies = [ "getrandom", "rand", diff --git a/changelog.d/17424.misc b/changelog.d/17424.misc new file mode 100644 index 0000000000..d4a81c137f --- /dev/null +++ b/changelog.d/17424.misc @@ -0,0 +1 @@ +Make sure we always use the right logic for enabling the media repo. diff --git a/changelog.d/17438.bugfix b/changelog.d/17438.bugfix new file mode 100644 index 0000000000..cff6eecd48 --- /dev/null +++ b/changelog.d/17438.bugfix @@ -0,0 +1 @@ +Fix rare bug where `/sync` would break for a user when using workers with multiple stream writers. diff --git a/changelog.d/17439.bugfix b/changelog.d/17439.bugfix new file mode 100644 index 0000000000..f36c3ec255 --- /dev/null +++ b/changelog.d/17439.bugfix @@ -0,0 +1 @@ +Limit concurrent remote downloads to 6 per IP address, and decrement remote downloads without a content-length from the ratelimiter after the download is complete. \ No newline at end of file diff --git a/changelog.d/17447.feature b/changelog.d/17447.feature new file mode 100644 index 0000000000..6f80e298ae --- /dev/null +++ b/changelog.d/17447.feature @@ -0,0 +1 @@ +Track which rooms have been sent to clients in the experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint. diff --git a/changelog.d/17449.bugfix b/changelog.d/17449.bugfix new file mode 100644 index 0000000000..cd847a3d1c --- /dev/null +++ b/changelog.d/17449.bugfix @@ -0,0 +1 @@ +Remove unnecessary call to resume producing in fake channel. \ No newline at end of file diff --git a/changelog.d/17452.misc b/changelog.d/17452.misc new file mode 100644 index 0000000000..4fd07f617b --- /dev/null +++ b/changelog.d/17452.misc @@ -0,0 +1 @@ +Change sliding sync to use their own token format in preparation for storing per-connection state. diff --git a/poetry.lock b/poetry.lock index f578a33ed5..4092a41884 100644 --- a/poetry.lock +++ b/poetry.lock @@ -934,13 +934,13 @@ i18n = ["Babel (>=2.7)"] [[package]] name = "jsonschema" -version = "4.22.0" +version = "4.23.0" description = "An implementation of JSON Schema validation for Python" optional = false python-versions = ">=3.8" files = [ - {file = "jsonschema-4.22.0-py3-none-any.whl", hash = "sha256:ff4cfd6b1367a40e7bc6411caec72effadd3db0bbe5017de188f2d6108335802"}, - {file = "jsonschema-4.22.0.tar.gz", hash = "sha256:5b22d434a45935119af990552c862e5d6d564e8f6601206b305a61fdf661a2b7"}, + {file = "jsonschema-4.23.0-py3-none-any.whl", hash = "sha256:fbadb6f8b144a8f8cf9f0b89ba94501d143e50411a1278633f56a7acf7fd5566"}, + {file = "jsonschema-4.23.0.tar.gz", hash = "sha256:d71497fef26351a33265337fa77ffeb82423f3ea21283cd9467bb03999266bc4"}, ] [package.dependencies] @@ -953,7 +953,7 @@ rpds-py = ">=0.7.1" [package.extras] format = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3987", "uri-template", "webcolors (>=1.11)"] -format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=1.11)"] +format-nongpl = ["fqdn", "idna", "isoduration", "jsonpointer (>1.13)", "rfc3339-validator", "rfc3986-validator (>0.1.0)", "uri-template", "webcolors (>=24.6.0)"] [[package]] name = "jsonschema-specifications" @@ -1389,38 +1389,38 @@ files = [ [[package]] name = "mypy" -version = "1.9.0" +version = "1.10.1" description = "Optional static typing for Python" optional = false python-versions = ">=3.8" files = [ - {file = "mypy-1.9.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f8a67616990062232ee4c3952f41c779afac41405806042a8126fe96e098419f"}, - {file = "mypy-1.9.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d357423fa57a489e8c47b7c85dfb96698caba13d66e086b412298a1a0ea3b0ed"}, - {file = "mypy-1.9.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:49c87c15aed320de9b438ae7b00c1ac91cd393c1b854c2ce538e2a72d55df150"}, - {file = "mypy-1.9.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:48533cdd345c3c2e5ef48ba3b0d3880b257b423e7995dada04248725c6f77374"}, - {file = "mypy-1.9.0-cp310-cp310-win_amd64.whl", hash = "sha256:4d3dbd346cfec7cb98e6cbb6e0f3c23618af826316188d587d1c1bc34f0ede03"}, - {file = "mypy-1.9.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:653265f9a2784db65bfca694d1edd23093ce49740b2244cde583aeb134c008f3"}, - {file = "mypy-1.9.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:3a3c007ff3ee90f69cf0a15cbcdf0995749569b86b6d2f327af01fd1b8aee9dc"}, - {file = "mypy-1.9.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2418488264eb41f69cc64a69a745fad4a8f86649af4b1041a4c64ee61fc61129"}, - {file = "mypy-1.9.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:68edad3dc7d70f2f17ae4c6c1b9471a56138ca22722487eebacfd1eb5321d612"}, - {file = "mypy-1.9.0-cp311-cp311-win_amd64.whl", hash = "sha256:85ca5fcc24f0b4aeedc1d02f93707bccc04733f21d41c88334c5482219b1ccb3"}, - {file = "mypy-1.9.0-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aceb1db093b04db5cd390821464504111b8ec3e351eb85afd1433490163d60cd"}, - {file = "mypy-1.9.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:0235391f1c6f6ce487b23b9dbd1327b4ec33bb93934aa986efe8a9563d9349e6"}, - {file = "mypy-1.9.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d4d5ddc13421ba3e2e082a6c2d74c2ddb3979c39b582dacd53dd5d9431237185"}, - {file = "mypy-1.9.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:190da1ee69b427d7efa8aa0d5e5ccd67a4fb04038c380237a0d96829cb157913"}, - {file = "mypy-1.9.0-cp312-cp312-win_amd64.whl", hash = "sha256:fe28657de3bfec596bbeef01cb219833ad9d38dd5393fc649f4b366840baefe6"}, - {file = "mypy-1.9.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:e54396d70be04b34f31d2edf3362c1edd023246c82f1730bbf8768c28db5361b"}, - {file = "mypy-1.9.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:5e6061f44f2313b94f920e91b204ec600982961e07a17e0f6cd83371cb23f5c2"}, - {file = "mypy-1.9.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:81a10926e5473c5fc3da8abb04119a1f5811a236dc3a38d92015cb1e6ba4cb9e"}, - {file = "mypy-1.9.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:b685154e22e4e9199fc95f298661deea28aaede5ae16ccc8cbb1045e716b3e04"}, - {file = "mypy-1.9.0-cp38-cp38-win_amd64.whl", hash = "sha256:5d741d3fc7c4da608764073089e5f58ef6352bedc223ff58f2f038c2c4698a89"}, - {file = "mypy-1.9.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:587ce887f75dd9700252a3abbc9c97bbe165a4a630597845c61279cf32dfbf02"}, - {file = "mypy-1.9.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:f88566144752999351725ac623471661c9d1cd8caa0134ff98cceeea181789f4"}, - {file = "mypy-1.9.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:61758fabd58ce4b0720ae1e2fea5cfd4431591d6d590b197775329264f86311d"}, - {file = "mypy-1.9.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e49499be624dead83927e70c756970a0bc8240e9f769389cdf5714b0784ca6bf"}, - {file = "mypy-1.9.0-cp39-cp39-win_amd64.whl", hash = "sha256:571741dc4194b4f82d344b15e8837e8c5fcc462d66d076748142327626a1b6e9"}, - {file = "mypy-1.9.0-py3-none-any.whl", hash = "sha256:a260627a570559181a9ea5de61ac6297aa5af202f06fd7ab093ce74e7181e43e"}, - {file = "mypy-1.9.0.tar.gz", hash = "sha256:3cc5da0127e6a478cddd906068496a97a7618a21ce9b54bde5bf7e539c7af974"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:e36f229acfe250dc660790840916eb49726c928e8ce10fbdf90715090fe4ae02"}, + {file = "mypy-1.10.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:51a46974340baaa4145363b9e051812a2446cf583dfaeba124af966fa44593f7"}, + {file = "mypy-1.10.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:901c89c2d67bba57aaaca91ccdb659aa3a312de67f23b9dfb059727cce2e2e0a"}, + {file = "mypy-1.10.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:0cd62192a4a32b77ceb31272d9e74d23cd88c8060c34d1d3622db3267679a5d9"}, + {file = "mypy-1.10.1-cp310-cp310-win_amd64.whl", hash = "sha256:a2cbc68cb9e943ac0814c13e2452d2046c2f2b23ff0278e26599224cf164e78d"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:bd6f629b67bb43dc0d9211ee98b96d8dabc97b1ad38b9b25f5e4c4d7569a0c6a"}, + {file = "mypy-1.10.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:a1bbb3a6f5ff319d2b9d40b4080d46cd639abe3516d5a62c070cf0114a457d84"}, + {file = "mypy-1.10.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b8edd4e9bbbc9d7b79502eb9592cab808585516ae1bcc1446eb9122656c6066f"}, + {file = "mypy-1.10.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:6166a88b15f1759f94a46fa474c7b1b05d134b1b61fca627dd7335454cc9aa6b"}, + {file = "mypy-1.10.1-cp311-cp311-win_amd64.whl", hash = "sha256:5bb9cd11c01c8606a9d0b83ffa91d0b236a0e91bc4126d9ba9ce62906ada868e"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:d8681909f7b44d0b7b86e653ca152d6dff0eb5eb41694e163c6092124f8246d7"}, + {file = "mypy-1.10.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:378c03f53f10bbdd55ca94e46ec3ba255279706a6aacaecac52ad248f98205d3"}, + {file = "mypy-1.10.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bacf8f3a3d7d849f40ca6caea5c055122efe70e81480c8328ad29c55c69e93e"}, + {file = "mypy-1.10.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:701b5f71413f1e9855566a34d6e9d12624e9e0a8818a5704d74d6b0402e66c04"}, + {file = "mypy-1.10.1-cp312-cp312-win_amd64.whl", hash = "sha256:3c4c2992f6ea46ff7fce0072642cfb62af7a2484efe69017ed8b095f7b39ef31"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:604282c886497645ffb87b8f35a57ec773a4a2721161e709a4422c1636ddde5c"}, + {file = "mypy-1.10.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:37fd87cab83f09842653f08de066ee68f1182b9b5282e4634cdb4b407266bade"}, + {file = "mypy-1.10.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8addf6313777dbb92e9564c5d32ec122bf2c6c39d683ea64de6a1fd98b90fe37"}, + {file = "mypy-1.10.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:5cc3ca0a244eb9a5249c7c583ad9a7e881aa5d7b73c35652296ddcdb33b2b9c7"}, + {file = "mypy-1.10.1-cp38-cp38-win_amd64.whl", hash = "sha256:1b3a2ffce52cc4dbaeee4df762f20a2905aa171ef157b82192f2e2f368eec05d"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:fe85ed6836165d52ae8b88f99527d3d1b2362e0cb90b005409b8bed90e9059b3"}, + {file = "mypy-1.10.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c2ae450d60d7d020d67ab440c6e3fae375809988119817214440033f26ddf7bf"}, + {file = "mypy-1.10.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6be84c06e6abd72f960ba9a71561c14137a583093ffcf9bbfaf5e613d63fa531"}, + {file = "mypy-1.10.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2189ff1e39db399f08205e22a797383613ce1cb0cb3b13d8bcf0170e45b96cc3"}, + {file = "mypy-1.10.1-cp39-cp39-win_amd64.whl", hash = "sha256:97a131ee36ac37ce9581f4220311247ab6cba896b4395b9c87af0675a13a755f"}, + {file = "mypy-1.10.1-py3-none-any.whl", hash = "sha256:71d8ac0b906354ebda8ef1673e5fde785936ac1f29ff6987c7483cfbd5a4235a"}, + {file = "mypy-1.10.1.tar.gz", hash = "sha256:1f8f492d7db9e3593ef42d4f115f04e556130f2819ad33ab84551403e97dd4c0"}, ] [package.dependencies] @@ -2504,19 +2504,18 @@ tests = ["coverage[toml] (>=5.0.2)", "pytest"] [[package]] name = "setuptools" -version = "67.6.0" +version = "70.0.0" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "setuptools-67.6.0-py3-none-any.whl", hash = "sha256:b78aaa36f6b90a074c1fa651168723acbf45d14cb1196b6f02c0fd07f17623b2"}, - {file = "setuptools-67.6.0.tar.gz", hash = "sha256:2ee892cd5f29f3373097f5a814697e397cf3ce313616df0af11231e2ad118077"}, + {file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"}, + {file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"}, ] [package.extras] -docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"] -testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] -testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"] +testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"] [[package]] name = "setuptools-rust" @@ -2704,19 +2703,19 @@ docs = ["sphinx (<7.0.0)"] [[package]] name = "twine" -version = "5.1.0" +version = "5.1.1" description = "Collection of utilities for publishing packages on PyPI" optional = false python-versions = ">=3.8" files = [ - {file = "twine-5.1.0-py3-none-any.whl", hash = "sha256:fe1d814395bfe50cfbe27783cb74efe93abeac3f66deaeb6c8390e4e92bacb43"}, - {file = "twine-5.1.0.tar.gz", hash = "sha256:4d74770c88c4fcaf8134d2a6a9d863e40f08255ff7d8e2acb3cbbd57d25f6e9d"}, + {file = "twine-5.1.1-py3-none-any.whl", hash = "sha256:215dbe7b4b94c2c50a7315c0275d2258399280fbb7d04182c7e55e24b5f93997"}, + {file = "twine-5.1.1.tar.gz", hash = "sha256:9aa0825139c02b3434d913545c7b847a21c835e11597f5255842d457da2322db"}, ] [package.dependencies] importlib-metadata = ">=3.6" keyring = ">=15.1" -pkginfo = ">=1.8.1" +pkginfo = ">=1.8.1,<1.11" readme-renderer = ">=35.0" requests = ">=2.20" requests-toolbelt = ">=0.8.0,<0.9.0 || >0.9.0" @@ -2851,13 +2850,13 @@ files = [ [[package]] name = "types-jsonschema" -version = "4.22.0.20240610" +version = "4.23.0.20240712" description = "Typing stubs for jsonschema" optional = false python-versions = ">=3.8" files = [ - {file = "types-jsonschema-4.22.0.20240610.tar.gz", hash = "sha256:f82ab9fe756e3a2642ea9712c46b403ce61eb380b939b696cff3252af42f65b0"}, - {file = "types_jsonschema-4.22.0.20240610-py3-none-any.whl", hash = "sha256:89996b9bd1928f820a0e252b2844be21cd2e55d062b6fa1048d88453006ad89e"}, + {file = "types-jsonschema-4.23.0.20240712.tar.gz", hash = "sha256:b20db728dcf7ea3e80e9bdeb55e8b8420c6c040cda14e8cf284465adee71d217"}, + {file = "types_jsonschema-4.23.0.20240712-py3-none-any.whl", hash = "sha256:8c33177ce95336241c1d61ccb56a9964d4361b99d5f1cd81a1ab4909b0dd7cf4"}, ] [package.dependencies] diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2b111847b7..e114ab7ec4 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -217,7 +217,7 @@ class SynapseHomeServer(HomeServer): ) if name in ["media", "federation", "client"]: - if self.config.server.enable_media_repo: + if self.config.media.can_load_media_repo: media_repo = self.get_media_repository_resource() resources.update( { diff --git a/synapse/config/repository.py b/synapse/config/repository.py index 1645470499..dc0e93ffa1 100644 --- a/synapse/config/repository.py +++ b/synapse/config/repository.py @@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config): # Only enable the media repo if either the media repo is enabled or the # current worker app is the media repo. if ( - self.root.server.enable_media_repo is False + config.get("enable_media_repo", True) is False and config.get("worker_app") != "synapse.app.media_repository" ): self.can_load_media_repo = False diff --git a/synapse/config/server.py b/synapse/config/server.py index a2b2305776..8bb97df175 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -395,12 +395,6 @@ class ServerConfig(Config): self.presence_router_config, ) = load_module(presence_router_config, ("presence", "presence_router")) - # whether to enable the media repository endpoints. This should be set - # to false if the media repository is running as a separate endpoint; - # doing so ensures that we will not run cache cleanup jobs on the - # master, potentially causing inconsistency. - self.enable_media_repo = config.get("enable_media_repo", True) - # Whether to require authentication to retrieve profile data (avatars, # display names) of other users through the client API. self.require_auth_for_profile_requests = config.get( diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 239967fa73..6f2988e64c 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,11 +18,13 @@ # # import logging +from enum import Enum from itertools import chain from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple import attr from immutabledict import immutabledict +from typing_extensions import assert_never from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership from synapse.events import EventBase @@ -37,7 +39,9 @@ from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + SlidingSyncStreamToken, StateMap, + StrCollection, StreamKeyType, StreamToken, UserID, @@ -343,11 +347,13 @@ class SlidingSyncHandler: self.relations_handler = hs.get_relations_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + self.connection_store = SlidingSyncConnectionStore() + async def wait_for_sync_for_user( self, requester: Requester, sync_config: SlidingSyncConfig, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, timeout_ms: int = 0, ) -> SlidingSyncResult: """ @@ -382,7 +388,7 @@ class SlidingSyncHandler: # this returns false, it means we timed out waiting, and we should # just return an empty response. before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token): + if not await self.notifier.wait_for_stream_token(from_token.stream_token): logger.warning( "Timed out waiting for worker to catch up. Returning empty response" ) @@ -420,7 +426,7 @@ class SlidingSyncHandler: sync_config.user.to_string(), timeout_ms, current_sync_callback, - from_token=from_token, + from_token=from_token.stream_token, ) return result @@ -430,7 +436,7 @@ class SlidingSyncHandler: self, sync_config: SlidingSyncConfig, to_token: StreamToken, - from_token: Optional[StreamToken] = None, + from_token: Optional[SlidingSyncStreamToken] = None, ) -> SlidingSyncResult: """ Generates the response body of a Sliding Sync result, represented as a @@ -451,6 +457,12 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + await self.connection_store.mark_token_seen( + user_id, + conn_id=sync_config.connection_id(), + from_token=from_token, + ) + # Get all of the room IDs that the user should be able to see in the sync # response has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 @@ -463,7 +475,7 @@ class SlidingSyncHandler: await self.get_room_membership_for_user_at_to_token( user=sync_config.user, to_token=to_token, - from_token=from_token, + from_token=from_token.stream_token if from_token else None, ) ) @@ -605,7 +617,7 @@ class SlidingSyncHandler: @tag_args async def handle_room(room_id: str) -> None: room_sync_result = await self.get_room_sync_data( - user=sync_config.user, + sync_config=sync_config, room_id=room_id, room_sync_config=room_sync_config, room_membership_for_user_at_to_token=room_membership_for_user_map[ @@ -624,8 +636,21 @@ class SlidingSyncHandler: sync_config=sync_config, to_token=to_token ) + if has_lists or has_room_subscriptions: + connection_token = await self.connection_store.record_rooms( + user_id, + conn_id=sync_config.connection_id(), + from_token=from_token, + sent_room_ids=relevant_room_map.keys(), + unsent_room_ids=[], # TODO: We currently ssume that we have sent down all updates. + ) + elif from_token: + connection_token = from_token.connection_token + else: + connection_token = 0 + return SlidingSyncResult( - next_pos=to_token, + next_pos=SlidingSyncStreamToken(to_token, connection_token), lists=lists, rooms=rooms, extensions=extensions, @@ -713,10 +738,17 @@ class SlidingSyncHandler: instance_to_max_stream_ordering_map[instance_name] = stream_ordering # Then assemble the `RoomStreamToken` + min_stream_pos = min(instance_to_max_stream_ordering_map.values()) membership_snapshot_token = RoomStreamToken( # Minimum position in the `instance_map` - stream=min(instance_to_max_stream_ordering_map.values()), - instance_map=immutabledict(instance_to_max_stream_ordering_map), + stream=min_stream_pos, + instance_map=immutabledict( + { + instance_name: stream_pos + for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() + if stream_pos > min_stream_pos + } + ), ) # Since we fetched the users room list at some point in time after the from/to @@ -1359,11 +1391,11 @@ class SlidingSyncHandler: async def get_room_sync_data( self, - user: UserID, + sync_config: SlidingSyncConfig, room_id: str, room_sync_config: RoomSyncConfig, room_membership_for_user_at_to_token: _RoomMembershipForUser, - from_token: Optional[StreamToken], + from_token: Optional[SlidingSyncStreamToken], to_token: StreamToken, ) -> SlidingSyncResult.RoomResult: """ @@ -1381,6 +1413,38 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. to_token: The point in the stream to sync up to. """ + user = sync_config.user + + # Determine whether we should limit the timeline to the token range. + # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - For an incremental sync where we haven't sent it down this + # connection before + to_bound = None + initial = True + if from_token and not room_membership_for_user_at_to_token.newly_joined: + room_status = await self.connection_store.have_sent_room( + user_id=user.to_string(), + conn_id=sync_config.connection_id(), + connection_token=from_token.connection_token, + room_id=room_id, + ) + if room_status.status == HaveSentRoomFlag.LIVE: + to_bound = from_token.stream_token.room_key + initial = False + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + to_bound = room_status.last_token + initial = False + elif room_status.status == HaveSentRoomFlag.NEVER: + to_bound = None + initial = True + else: + assert_never(room_status.status) # Assemble the list of timeline events # @@ -1417,22 +1481,6 @@ class SlidingSyncHandler: room_membership_for_user_at_to_token.event_pos.to_room_stream_token() ) - # Determine whether we should limit the timeline to the token range. - # - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before - to_bound = ( - from_token.room_key - if from_token is not None - and not room_membership_for_user_at_to_token.newly_joined - else None - ) - fiddled_timeline_limit = room_sync_config.timeline_limit if to_bound: fiddled_timeline_limit = max(fiddled_timeline_limit, 10) @@ -1498,7 +1546,9 @@ class SlidingSyncHandler: instance_name=timeline_event.internal_metadata.instance_name, stream=timeline_event.internal_metadata.stream_ordering, ) - if persisted_position.persisted_after(from_token.room_key): + if persisted_position.persisted_after( + from_token.stream_token.room_key + ): num_live += 1 else: # Since we're iterating over the timeline events in @@ -1555,12 +1605,6 @@ class SlidingSyncHandler: # indicate to the client that a state reset happened. Perhaps we should indicate # this by setting `initial: True` and empty `required_state`. - # TODO: Since we can't determine whether we've already sent a room down this - # Sliding Sync connection before (we plan to add this optimization in the - # future), we're always returning the requested room state instead of - # updates. - initial = True - # Check whether the room has a name set name_state_ids = await self.get_current_state_ids_at( room_id=room_id, @@ -1711,9 +1755,17 @@ class SlidingSyncHandler: to_token=to_token, ) else: - # TODO: Once we can figure out if we've sent a room down this connection before, - # we can return updates instead of the full required state. - raise NotImplementedError() + assert to_bound is not None + + deltas = await self.store.get_current_state_deltas_for_room( + room_id, to_bound, to_token.room_key + ) + # TODO: Filter room state before fetching events + # TODO: Handle state resets where event_id is None + events = await self.store.get_events( + [d.event_id for d in deltas if d.event_id] + ) + room_state = {(s.type, s.state_key): s for s in events.values()} required_room_state: StateMap[EventBase] = {} if required_state_filter != StateFilter.none(): @@ -1841,7 +1893,7 @@ class SlidingSyncHandler: """ user_id = sync_config.user.to_string() - device_id = sync_config.device_id + device_id = sync_config.requester.device_id # Check that this request has a valid device ID (not all requests have # to belong to a device, and so device_id is None), and that the @@ -1898,3 +1950,198 @@ class SlidingSyncHandler: next_batch=f"{stream_id}", events=messages, ) + + +class HaveSentRoomFlag(Enum): + """Flag for whether we have sent the room down a sliding sync connection. + + The valid state changes here are: + NEVER -> LIVE + LIVE -> PREVIOUSLY + PREVIOUSLY -> LIVE + """ + + # The room has never been sent down (or we have forgotten we have sent it + # down). + NEVER = 1 + + # We have previously sent the room down, but there are updates that we + # haven't sent down. + PREVIOUSLY = 2 + + # We have sent the room down and the client has received all updates. + LIVE = 3 + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class HaveSentRoom: + """Whether we have sent the room down a sliding sync connection. + + Attributes: + status: Flag of if we have or haven't sent down the room + last_token: If the flag is `PREVIOUSLY` then this is non-null and + contains the last stream token of the last updates we sent down + the room, i.e. we still need to send everything since then to the + client. + """ + + status: HaveSentRoomFlag + last_token: Optional[RoomStreamToken] + + @staticmethod + def previously(last_token: RoomStreamToken) -> "HaveSentRoom": + """Constructor for `PREVIOUSLY` flag.""" + return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) + + +HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) +HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) + + +@attr.s(auto_attribs=True) +class SlidingSyncConnectionStore: + """In-memory store of per-connection state, including what rooms we have + previously sent down a sliding sync connection. + + Note: This is NOT safe to run in a worker setup. + + The complication here is that we need to handle requests being resent, i.e. + if we sent down a room in a response that the client received, we must + consider the room *not* sent when we get the request again. + + This is handled by using an integer "token", which is returned to the client + as part of the sync token. For each connection we store a mapping from + tokens to the room states, and create a new entry when we send down new + rooms. + + Note that for any given sliding sync connection we will only store a maximum + of two different tokens: the previous token from the request and a new token + sent in the response. When we receive a request with a given token, we then + clear out all other entries with a different token. + + Attributes: + _connections: Mapping from `(user_id, conn_id)` to mapping of `token` + to mapping of room ID to `HaveSentRoom`. + """ + + # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` + _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( + attr.Factory(dict) + ) + + async def have_sent_room( + self, user_id: str, conn_id: str, connection_token: int, room_id: str + ) -> HaveSentRoom: + """Whether for the given user_id/conn_id/token, return whether we have + previously sent the room down + """ + + sync_statuses = self._connections.setdefault((user_id, conn_id), {}) + room_status = sync_statuses.get(connection_token, {}).get( + room_id, HAVE_SENT_ROOM_NEVER + ) + + return room_status + + async def record_rooms( + self, + user_id: str, + conn_id: str, + from_token: Optional[SlidingSyncStreamToken], + *, + sent_room_ids: StrCollection, + unsent_room_ids: StrCollection, + ) -> int: + """Record which rooms we have/haven't sent down in a new response + + Attributes: + user_id + conn_id + from_token: The since token from the request, if any + sent_room_ids: The set of room IDs that we have sent down as + part of this request (only needs to be ones we didn't + previously sent down). + unsent_room_ids: The set of room IDs that have had updates + since the `last_room_token`, but which were not included in + this request + """ + prev_connection_token = 0 + if from_token is not None: + prev_connection_token = from_token.connection_token + + # If there are no changes then this is a noop. + if not sent_room_ids and not unsent_room_ids: + return prev_connection_token + + sync_statuses = self._connections.setdefault((user_id, conn_id), {}) + + # Generate a new token, removing any existing entries in that token + # (which can happen if requests get resent). + new_store_token = prev_connection_token + 1 + sync_statuses.pop(new_store_token, None) + + # Copy over and update the room mappings. + new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) + + # Whether we have updated the `new_room_statuses`, if we don't by the + # end we can treat this as a noop. + have_updated = False + for room_id in sent_room_ids: + new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE + have_updated = True + + # Whether we add/update the entries for unsent rooms depends on the + # existing entry: + # - LIVE: We have previously sent down everything up to + # `last_room_token, so we update the entry to be `PREVIOUSLY` with + # `last_room_token`. + # - PREVIOUSLY: We have previously sent down everything up to *a* + # given token, so we don't need to update the entry. + # - NEVER: We have never previously sent down the room, and we haven't + # sent anything down this time either so we leave it as NEVER. + + # Work out the new state for unsent rooms that were `LIVE`. + if from_token: + new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) + else: + new_unsent_state = HAVE_SENT_ROOM_NEVER + + for room_id in unsent_room_ids: + prev_state = new_room_statuses.get(room_id) + if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: + new_room_statuses[room_id] = new_unsent_state + have_updated = True + + if not have_updated: + return prev_connection_token + + sync_statuses[new_store_token] = new_room_statuses + + return new_store_token + + async def mark_token_seen( + self, + user_id: str, + conn_id: str, + from_token: Optional[SlidingSyncStreamToken], + ) -> None: + """We have received a request with the given token, so we can clear out + any other tokens associated with the connection. + + If there is no from token then we have started afresh, and so we delete + all tokens associated with the device. + """ + # Clear out any tokens for the connection that doesn't match the one + # from the request. + + sync_statuses = self._connections.pop((user_id, conn_id), {}) + if from_token is None: + return + + sync_statuses = { + i: room_statuses + for i, room_statuses in sync_statuses.items() + if i == from_token.connection_token + } + if sync_statuses: + self._connections[(user_id, conn_id)] = sync_statuses diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 749b01dd0e..6fd75fd381 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.types import JsonDict from synapse.util import json_decoder -from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred +from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred from synapse.util.metrics import Measure from synapse.util.stringutils import parse_and_validate_server_name @@ -475,6 +475,8 @@ class MatrixFederationHttpClient: use_proxy=True, ) + self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6) + def wake_destination(self, destination: str) -> None: """Called when the remote server may have come back online.""" @@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient: ) headers = dict(response.headers.getAllRawHeaders()) - expected_size = response.length - # if we don't get an expected length then use the max length + if expected_size == UNKNOWN_LENGTH: expected_size = max_size - logger.debug( - f"File size unknown, assuming file is max allowable size: {max_size}" - ) + else: + if int(expected_size) > max_size: + msg = "Requested file is too large > %r bytes" % (max_size,) + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE) - read_body, _ = await download_ratelimiter.can_do_action( - requester=None, - key=ip_address, - n_actions=expected_size, - ) - if not read_body: - msg = "Requested file size exceeds ratelimits" - logger.warning( - "{%s} [%s] %s", - request.txn_id, - request.destination, - msg, + read_body, _ = await download_ratelimiter.can_do_action( + requester=None, + key=ip_address, + n_actions=expected_size, ) - raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED) + if not read_body: + msg = "Requested file size exceeds ratelimits" + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError( + HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED + ) try: - # add a byte of headroom to max size as function errs at >= - d = read_body_with_max_size(response, output_stream, expected_size + 1) - d.addTimeout(self.default_timeout_seconds, self.reactor) - length = await make_deferred_yieldable(d) + async with self.remote_download_linearizer.queue(ip_address): + # add a byte of headroom to max size as function errs at >= + d = read_body_with_max_size(response, output_stream, expected_size + 1) + d.addTimeout(self.default_timeout_seconds, self.reactor) + length = await make_deferred_yieldable(d) except BodyExceededMaxSize: msg = "Requested file is too large > %r bytes" % (expected_size,) logger.warning( @@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient: request.method, request.uri.decode("ascii"), ) + + # if we didn't know the length upfront, decrement the actual size from ratelimiter + if response.length == UNKNOWN_LENGTH: + download_ratelimiter.record_action( + requester=None, key=ip_address, n_actions=length + ) + return length, headers async def federation_get_file( @@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient: ) headers = dict(response.headers.getAllRawHeaders()) - expected_size = response.length - # if we don't get an expected length then use the max length + if expected_size == UNKNOWN_LENGTH: expected_size = max_size - logger.debug( - f"File size unknown, assuming file is max allowable size: {max_size}" - ) + else: + if int(expected_size) > max_size: + msg = "Requested file is too large > %r bytes" % (max_size,) + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE) - read_body, _ = await download_ratelimiter.can_do_action( - requester=None, - key=ip_address, - n_actions=expected_size, - ) - if not read_body: - msg = "Requested file size exceeds ratelimits" - logger.warning( - "{%s} [%s] %s", - request.txn_id, - request.destination, - msg, + read_body, _ = await download_ratelimiter.can_do_action( + requester=None, + key=ip_address, + n_actions=expected_size, ) - raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED) + if not read_body: + msg = "Requested file size exceeds ratelimits" + logger.warning( + "{%s} [%s] %s", + request.txn_id, + request.destination, + msg, + ) + raise SynapseError( + HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED + ) # this should be a multipart/mixed response with the boundary string in the header try: @@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient: raise SynapseError(HTTPStatus.BAD_GATEWAY, msg) try: - # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >= - deferred = read_multipart_response( - response, output_stream, boundary, expected_size + 1 - ) - deferred.addTimeout(self.default_timeout_seconds, self.reactor) + async with self.remote_download_linearizer.queue(ip_address): + # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >= + deferred = read_multipart_response( + response, output_stream, boundary, expected_size + 1 + ) + deferred.addTimeout(self.default_timeout_seconds, self.reactor) except BodyExceededMaxSize: msg = "Requested file is too large > %r bytes" % (expected_size,) logger.warning( @@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient: request.method, request.uri.decode("ascii"), ) + + # if we didn't know the length upfront, decrement the actual size from ratelimiter + if response.length == UNKNOWN_LENGTH: + download_ratelimiter.record_action( + requester=None, key=ip_address, n_actions=length + ) + return length, headers, multipart_response.json diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index b7e2c99455..7c91b15cef 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -54,7 +54,7 @@ from synapse.http.servlet import ( from synapse.http.site import SynapseRequest from synapse.logging.opentracing import log_kv, set_tag, trace_with_opname from synapse.rest.admin.experimental_features import ExperimentalFeature -from synapse.types import JsonDict, Requester, StreamToken +from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken from synapse.types.rest.client import SlidingSyncBody from synapse.util import json_decoder from synapse.util.caches.lrucache import LruCache @@ -881,7 +881,6 @@ class SlidingSyncRestServlet(RestServlet): ) user = requester.user - device_id = requester.device_id timeout = parse_integer(request, "timeout", default=0) # Position in the stream @@ -889,7 +888,9 @@ class SlidingSyncRestServlet(RestServlet): from_token = None if from_token_string is not None: - from_token = await StreamToken.from_string(self.store, from_token_string) + from_token = await SlidingSyncStreamToken.from_string( + self.store, from_token_string + ) # TODO: We currently don't know whether we're going to use sticky params or # maybe some filters like sync v2 where they are built up once and referenced @@ -904,11 +905,12 @@ class SlidingSyncRestServlet(RestServlet): sync_config = SlidingSyncConfig( user=user, - device_id=device_id, + requester=requester, # FIXME: Currently, we're just manually copying the fields from the - # `SlidingSyncBody` into the config. How can we gurantee into the future + # `SlidingSyncBody` into the config. How can we guarantee into the future # that we don't forget any? I would like something more structured like # `copy_attributes(from=body, to=config)` + conn_id=body.conn_id, lists=body.lists, room_subscriptions=body.room_subscriptions, extensions=body.extensions, diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 036972ac25..cd6cb2c7a9 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -26,6 +26,8 @@ import attr from synapse.storage._base import SQLBaseStore from synapse.storage.database import LoggingTransaction +from synapse.storage.databases.main.stream import _filter_results_by_stream +from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -156,3 +158,39 @@ class StateDeltasStore(SQLBaseStore): "get_max_stream_id_in_current_state_deltas", self._get_max_stream_id_in_current_state_deltas_txn, ) + + async def get_current_state_deltas_for_room( + self, room_id: str, from_token: RoomStreamToken, to_token: RoomStreamToken + ) -> List[StateDelta]: + """Get the state deltas between that have happened between two + tokens.""" + + def get_current_state_deltas_for_room_txn( + txn: LoggingTransaction, + ) -> List[StateDelta]: + sql = """ + SELECT instance_name, stream_id, type, state_key, event_id, prev_event_id + FROM current_state_delta_stream + WHERE room_id = ? AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC + """ + txn.execute( + sql, (room_id, from_token.stream, to_token.get_max_stream_pos()) + ) + + return [ + StateDelta( + stream_id=row[1], + room_id=room_id, + event_type=row[2], + state_key=row[3], + event_id=row[4], + prev_event_id=row[5], + ) + for row in txn + if _filter_results_by_stream(from_token, to_token, row[0], row[1]) + ] + + return await self.db_pool.runInteraction( + "get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn + ) diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index b22a13ef01..23ac1842f8 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -20,6 +20,7 @@ # # import abc +import logging import re import string from enum import Enum @@ -74,6 +75,9 @@ if TYPE_CHECKING: from synapse.storage.databases.main import DataStore, PurgeEventsStore from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore + +logger = logging.getLogger(__name__) + # Define a state map type from type/state_key to T (usually an event ID or # event) T = TypeVar("T") @@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): represented by a default `stream` attribute and a map of instance name to stream position of any writers that are ahead of the default stream position. + + The values in `instance_map` must be greater than the `stream` attribute. """ stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) @@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): kw_only=True, ) + def __attrs_post_init__(self) -> None: + # Enforce that all instances have a value greater than the min stream + # position. + for i, v in self.instance_map.items(): + if v <= self.stream: + raise ValueError( + f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}" + ) + @classmethod @abc.abstractmethod async def parse(cls, store: "DataStore", string: str) -> "Self": @@ -494,6 +509,9 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): for instance in set(self.instance_map).union(other.instance_map) } + # Filter out any redundant entries. + instance_map = {i: s for i, s in instance_map.items() if s > max_stream} + return attr.evolve( self, stream=max_stream, instance_map=immutabledict(instance_map) ) @@ -539,10 +557,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): def bound_stream_token(self, max_stream: int) -> "Self": """Bound the stream positions to a maximum value""" + min_pos = min(self.stream, max_stream) return type(self)( - stream=min(self.stream, max_stream), + stream=min_pos, instance_map=immutabledict( - {k: min(s, max_stream) for k, s in self.instance_map.items()} + { + k: min(s, max_stream) + for k, s in self.instance_map.items() + if min(s, max_stream) > min_pos + } ), ) @@ -637,6 +660,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'." ) + super().__attrs_post_init__() + @classmethod async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken": try: @@ -651,6 +676,11 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): instance_map = {} for part in parts[1:]: + if not part: + # Handle tokens of the form `m5~`, which were created by + # a bug + continue + key, value = part.split(".") instance_id = int(key) pos = int(value) @@ -666,7 +696,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): except CancelledError: raise except Exception: - pass + # We log an exception here as even though this *might* be a client + # handing a bad token, its more likely that Synapse returned a bad + # token (and we really want to catch those!). + logger.exception("Failed to parse stream token: %r", string) raise SynapseError(400, "Invalid room stream token %r" % (string,)) @classmethod @@ -713,6 +746,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): return self.instance_map.get(instance_name, self.stream) async def to_string(self, store: "DataStore") -> str: + """See class level docstring for information about the format.""" + if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) elif self.instance_map: @@ -727,8 +762,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken): instance_id = await store.get_id_for_instance(name) entries.append(f"{instance_id}.{pos}") - encoded_map = "~".join(entries) - return f"m{self.stream}~{encoded_map}" + if entries: + encoded_map = "~".join(entries) + return f"m{self.stream}~{encoded_map}" + return f"s{self.stream}" else: return "s%d" % (self.stream,) @@ -756,6 +793,11 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken): instance_map = {} for part in parts[1:]: + if not part: + # Handle tokens of the form `m5~`, which were created by + # a bug + continue + key, value = part.split(".") instance_id = int(key) pos = int(value) @@ -770,10 +812,15 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken): except CancelledError: raise except Exception: - pass + # We log an exception here as even though this *might* be a client + # handing a bad token, its more likely that Synapse returned a bad + # token (and we really want to catch those!). + logger.exception("Failed to parse stream token: %r", string) raise SynapseError(400, "Invalid stream token %r" % (string,)) async def to_string(self, store: "DataStore") -> str: + """See class level docstring for information about the format.""" + if self.instance_map: entries = [] for name, pos in self.instance_map.items(): @@ -786,8 +833,10 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken): instance_id = await store.get_id_for_instance(name) entries.append(f"{instance_id}.{pos}") - encoded_map = "~".join(entries) - return f"m{self.stream}~{encoded_map}" + if entries: + encoded_map = "~".join(entries) + return f"m{self.stream}~{encoded_map}" + return str(self.stream) else: return str(self.stream) @@ -1089,6 +1138,43 @@ StreamToken.START = StreamToken( @attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncStreamToken: + """The same as a `StreamToken`, but includes an extra field at the start for + the sliding sync connection token (separated by a '/'). This is used to + store per-connection state. + + This then looks something like: + 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379 + """ + + stream_token: StreamToken + connection_token: int + + @staticmethod + @cancellable + async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken": + """Creates a SlidingSyncStreamToken from its textual representation.""" + try: + connection_token_str, stream_token_str = string.split("/", 1) + connection_token = int(connection_token_str) + stream_token = await StreamToken.from_string(store, stream_token_str) + + return SlidingSyncStreamToken( + stream_token=stream_token, + connection_token=connection_token, + ) + except CancelledError: + raise + except Exception: + raise SynapseError(400, "Invalid stream token") + + async def to_string(self, store: "DataStore") -> str: + """Serializes the token to a string""" + stream_token_str = await self.stream_token.to_string(store) + return f"{self.connection_token}/{stream_token_str}" + + +@attr.s(slots=True, frozen=True, auto_attribs=True) class PersistedPosition: """Position of a newly persisted row with instance that persisted it.""" diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py index 409120470a..0c2ab13c93 100644 --- a/synapse/types/handlers/__init__.py +++ b/synapse/types/handlers/__init__.py @@ -31,7 +31,14 @@ else: from pydantic import Extra from synapse.events import EventBase -from synapse.types import JsonDict, JsonMapping, StreamToken, UserID +from synapse.types import ( + JsonDict, + JsonMapping, + Requester, + SlidingSyncStreamToken, + StreamToken, + UserID, +) from synapse.types.rest.client import SlidingSyncBody if TYPE_CHECKING: @@ -102,7 +109,7 @@ class SlidingSyncConfig(SlidingSyncBody): """ user: UserID - device_id: Optional[str] + requester: Requester # Pydantic config class Config: @@ -113,6 +120,31 @@ class SlidingSyncConfig(SlidingSyncBody): # Allow custom types like `UserID` to be used in the model arbitrary_types_allowed = True + def connection_id(self) -> str: + """Return a string identifier for this connection. May clash with + connection IDs from different users. + + This is generally a combination of device ID and conn_id. However, both + these two are optional (e.g. puppet access tokens don't have device + IDs), so this handles those edge cases. + """ + + # `conn_id` can be null, in which case we default to the empty string + # (if conn ID is empty then the client can't have multiple sync loops) + conn_id = self.conn_id or "" + + if self.requester.device_id: + return f"D/{self.requester.device_id}/{conn_id}" + + if self.requester.access_token_id: + # If we don't have a device, then the access token ID should be a + # stable ID. + return f"A/{self.requester.access_token_id}/{conn_id}" + + # If we have neither then its likely an AS or some weird token. Either + # way we can just fail here. + raise Exception("Cannot use sliding sync with access token type") + class OperationType(Enum): """ @@ -287,7 +319,7 @@ class SlidingSyncResult: def __bool__(self) -> bool: return bool(self.to_device) - next_pos: StreamToken + next_pos: SlidingSyncStreamToken lists: Dict[str, SlidingWindowList] rooms: Dict[str, RoomResult] extensions: Extensions @@ -300,7 +332,7 @@ class SlidingSyncResult: return bool(self.lists or self.rooms or self.extensions) @staticmethod - def empty(next_pos: StreamToken) -> "SlidingSyncResult": + def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult": "Return a new empty result" return SlidingSyncResult( next_pos=next_pos, diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py index dbe37bc712..5be8cf5389 100644 --- a/synapse/types/rest/client/__init__.py +++ b/synapse/types/rest/client/__init__.py @@ -120,6 +120,9 @@ class SlidingSyncBody(RequestBodyModel): Sliding Sync API request body. Attributes: + conn_id: An optional string to identify this connection to the server. If this + is missing, only 1 sliding sync connection can be made to the server at + any one time. lists: Sliding window API. A map of list key to list information (:class:`SlidingSyncList`). Max lists: 100. The list keys should be arbitrary strings which the client is using to refer to the list. Keep this @@ -315,6 +318,8 @@ class SlidingSyncBody(RequestBodyModel): to_device: Optional[ToDeviceExtension] = None + conn_id: Optional[str] + # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884 if TYPE_CHECKING: lists: Optional[Dict[str, SlidingSyncList]] = None diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 70912e22f8..e55001fb40 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -1057,13 +1057,15 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): ) assert channel.code == 200 + @override_config({"remote_media_download_burst_count": "87M"}) @patch( "synapse.http.matrixfederationclient.read_body_with_max_size", read_body_with_max_size_30MiB, ) - def test_download_ratelimit_max_size_sub(self) -> None: + def test_download_ratelimit_unknown_length(self) -> None: """ - Test that if no content-length is provided, the default max size is applied instead + Test that if no content-length is provided, ratelimit will still be applied after + download once length is known """ # mock out actually sending the request @@ -1077,19 +1079,48 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): self.client._send_request = _send_request # type: ignore - # ten requests should go through using the max size (500MB/50MB) - for i in range(10): - channel2 = self.make_request( + # 3 requests should go through (note 3rd one would technically violate ratelimit but + # is applied *after* download - the next one will be ratelimited) + for i in range(3): + channel = self.make_request( "GET", f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}", shorthand=False, ) - assert channel2.code == 200 + assert channel.code == 200 - # eleventh will hit ratelimit - channel3 = self.make_request( + # 4th will hit ratelimit + channel2 = self.make_request( "GET", "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx", shorthand=False, ) - assert channel3.code == 429 + assert channel2.code == 429 + + @override_config({"max_upload_size": "29M"}) + @patch( + "synapse.http.matrixfederationclient.read_body_with_max_size", + read_body_with_max_size_30MiB, + ) + def test_max_download_respected(self) -> None: + """ + Test that the max download size is enforced - note that max download size is determined + by the max_upload_size + """ + + # mock out actually sending the request + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 31457280 + resp.headers = Headers({"Content-Type": ["application/octet-stream"]}) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + channel = self.make_request( + "GET", "/_matrix/media/v3/download/remote.org/abcd", shorthand=False + ) + assert channel.code == 502 + assert channel.json_body["errcode"] == "M_TOO_LARGE" diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py index 7f2caed7d5..466c5a0b70 100644 --- a/tests/rest/client/test_media.py +++ b/tests/rest/client/test_media.py @@ -1809,13 +1809,19 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): ) assert channel.code == 200 + @override_config( + { + "remote_media_download_burst_count": "87M", + } + ) @patch( "synapse.http.matrixfederationclient.read_multipart_response", read_multipart_response_30MiB, ) - def test_download_ratelimit_max_size_sub(self) -> None: + def test_download_ratelimit_unknown_length(self) -> None: """ - Test that if no content-length is provided, the default max size is applied instead + Test that if no content-length is provided, ratelimiting is still applied after + media is downloaded and length is known """ # mock out actually sending the request @@ -1831,8 +1837,9 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): self.client._send_request = _send_request # type: ignore - # ten requests should go through using the max size (500MB/50MB) - for i in range(10): + # first 3 will go through (note that 3rd request technically violates rate limit but + # that since the ratelimiting is applied *after* download it goes through, but next one fails) + for i in range(3): channel2 = self.make_request( "GET", f"/_matrix/client/v1/media/download/remote.org/abc{i}", @@ -1841,7 +1848,7 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): ) assert channel2.code == 200 - # eleventh will hit ratelimit + # 4th will hit ratelimit channel3 = self.make_request( "GET", "/_matrix/client/v1/media/download/remote.org/abcd", @@ -1850,6 +1857,39 @@ class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): ) assert channel3.code == 429 + @override_config({"max_upload_size": "29M"}) + @patch( + "synapse.http.matrixfederationclient.read_multipart_response", + read_multipart_response_30MiB, + ) + def test_max_download_respected(self) -> None: + """ + Test that the max download size is enforced - note that max download size is determined + by the max_upload_size + """ + + # mock out actually sending the request, returns a 30MiB response + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 31457280 + resp.headers = Headers( + {"Content-Type": ["multipart/mixed; boundary=gc0p4Jq0M2Yt08jU534c0p"]} + ) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + channel = self.make_request( + "GET", + "/_matrix/client/v1/media/download/remote.org/abcd", + shorthand=False, + access_token=self.tok, + ) + assert channel.code == 502 + assert channel.json_body["errcode"] == "M_TOO_LARGE" + def test_file_download(self) -> None: content = io.BytesIO(b"file_to_stream") content_uri = self.get_success( diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index f5d57e689c..b8d2a0a2d0 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -50,7 +50,14 @@ from synapse.rest.client import ( sync, ) from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID +from synapse.types import ( + JsonDict, + RoomStreamToken, + SlidingSyncStreamToken, + StreamKeyType, + StreamToken, + UserID, +) from synapse.util import Clock from tests import unittest @@ -1448,7 +1455,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) future_position_token_serialized = self.get_success( - future_position_token.to_string(self.store) + SlidingSyncStreamToken(future_position_token, 0).to_string(self.store) ) # Make the Sliding Sync request @@ -2608,7 +2615,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id1, "activity before token2", tok=user2_tok ) - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] # Join the room after the `from_token` which will make us consider this room as # `newly_joined`. @@ -2630,8 +2652,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make an incremental Sliding Sync request (what we're trying to test) channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { @@ -2817,7 +2838,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after invite3", tok=user2_tok) self.helper.send(room_id1, "activity after invite4", tok=user2_tok) - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] self.helper.send(room_id1, "activity after token5", tok=user2_tok) self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) @@ -2825,8 +2861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { @@ -3074,7 +3109,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after invite3", tok=user2_tok) self.helper.send(room_id1, "activity after invite4", tok=user2_tok) - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] self.helper.send(room_id1, "activity after token5", tok=user2_tok) self.helper.send(room_id1, "activity after toekn6", tok=user2_tok) @@ -3082,8 +3132,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { @@ -3239,7 +3288,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity before2", tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok) @@ -3255,8 +3319,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { @@ -3316,15 +3379,29 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): self.helper.send(room_id1, "activity after3", tok=user2_tok) - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] self.helper.send(room_id1, "activity after4", tok=user2_tok) # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { @@ -3451,13 +3528,27 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) - after_room_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + after_room_token = channel.json_body["pos"] # Make the Sliding Sync request channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(after_room_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={after_room_token}", { "lists": { "foo-list": { @@ -3476,21 +3567,9 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, 200, channel.json_body) - state_map = self.get_success( - self.storage_controllers.state.get_current_state(room_id1) - ) - - # The returned state doesn't change from initial to incremental sync. In the - # future, we will only return updates but only if we've sent the room down the + # We only return updates but only if we've sent the room down the # connection before. - self._assertRequiredStateIncludes( - channel.json_body["rooms"][room_id1]["required_state"], - { - state_map[(EventTypes.Create, "")], - state_map[(EventTypes.RoomHistoryVisibility, "")], - }, - exact=True, - ) + self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state")) self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state")) def test_rooms_required_state_wildcard(self) -> None: @@ -3729,7 +3808,22 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): user3_id = self.register_user("user3", "pass") user3_tok = self.login(user3_id, "pass") - from_token = self.event_sources.get_current_token() + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 4, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + from_token = channel.json_body["pos"] room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) self.helper.join(room_id1, user1_id, tok=user1_tok) @@ -3767,8 +3861,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase): # Make the Sliding Sync request with lazy loading for the room members channel = self.make_request( "POST", - self.sync_endpoint - + f"?pos={self.get_success(from_token.to_string(self.store))}", + self.sync_endpoint + f"?pos={from_token}", { "lists": { "foo-list": { diff --git a/tests/server.py b/tests/server.py index f1cd0f76be..85602e6953 100644 --- a/tests/server.py +++ b/tests/server.py @@ -289,10 +289,6 @@ class FakeChannel: self._reactor.run() while not self.is_finished(): - # If there's a producer, tell it to resume producing so we get content - if self._producer: - self._producer.resumeProducing() - if self._reactor.seconds() > end_time: raise TimedOutException("Timed out waiting for request to finish.") diff --git a/tests/test_types.py b/tests/test_types.py index 944aa784fc..00adc65a5a 100644 --- a/tests/test_types.py +++ b/tests/test_types.py @@ -19,9 +19,18 @@ # # +from typing import Type +from unittest import skipUnless + +from immutabledict import immutabledict +from parameterized import parameterized_class + from synapse.api.errors import SynapseError from synapse.types import ( + AbstractMultiWriterStreamToken, + MultiWriterStreamToken, RoomAlias, + RoomStreamToken, UserID, get_domain_from_id, get_localpart_from_id, @@ -29,6 +38,7 @@ from synapse.types import ( ) from tests import unittest +from tests.utils import USE_POSTGRES_FOR_TESTS class IsMineIDTests(unittest.HomeserverTestCase): @@ -127,3 +137,64 @@ class MapUsernameTestCase(unittest.TestCase): # this should work with either a unicode or a bytes self.assertEqual(map_username_to_mxid_localpart("têst"), "t=c3=aast") self.assertEqual(map_username_to_mxid_localpart("têst".encode()), "t=c3=aast") + + +@parameterized_class( + ("token_type",), + [ + (MultiWriterStreamToken,), + (RoomStreamToken,), + ], + class_name_func=lambda cls, num, params_dict: f"{cls.__name__}_{params_dict['token_type'].__name__}", +) +class MultiWriterTokenTestCase(unittest.HomeserverTestCase): + """Tests for the different types of multi writer tokens.""" + + token_type: Type[AbstractMultiWriterStreamToken] + + def test_basic_token(self) -> None: + """Test that a simple stream token can be serialized and unserialized""" + store = self.hs.get_datastores().main + + token = self.token_type(stream=5) + + string_token = self.get_success(token.to_string(store)) + + if isinstance(token, RoomStreamToken): + self.assertEqual(string_token, "s5") + else: + self.assertEqual(string_token, "5") + + parsed_token = self.get_success(self.token_type.parse(store, string_token)) + self.assertEqual(parsed_token, token) + + @skipUnless(USE_POSTGRES_FOR_TESTS, "Requires Postgres") + def test_instance_map(self) -> None: + """Test for stream token with instance map""" + store = self.hs.get_datastores().main + + token = self.token_type(stream=5, instance_map=immutabledict({"foo": 6})) + + string_token = self.get_success(token.to_string(store)) + self.assertEqual(string_token, "m5~1.6") + + parsed_token = self.get_success(self.token_type.parse(store, string_token)) + self.assertEqual(parsed_token, token) + + def test_instance_map_assertion(self) -> None: + """Test that we assert values in the instance map are greater than the + min stream position""" + + with self.assertRaises(ValueError): + self.token_type(stream=5, instance_map=immutabledict({"foo": 4})) + + with self.assertRaises(ValueError): + self.token_type(stream=5, instance_map=immutabledict({"foo": 5})) + + def test_parse_bad_token(self) -> None: + """Test that we can parse tokens produced by a bug in Synapse of the + form `m5~`""" + store = self.hs.get_datastores().main + + parsed_token = self.get_success(self.token_type.parse(store, "m5~")) + self.assertEqual(parsed_token, self.token_type(stream=5)) diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py index fb67146c69..13a4e6ddaa 100644 --- a/tests/util/test_check_dependencies.py +++ b/tests/util/test_check_dependencies.py @@ -21,6 +21,7 @@ from contextlib import contextmanager from os import PathLike +from pathlib import Path from typing import Generator, Optional, Union from unittest.mock import patch @@ -41,7 +42,7 @@ class DummyDistribution(metadata.Distribution): def version(self) -> str: return self._version - def locate_file(self, path: Union[str, PathLike]) -> PathLike: + def locate_file(self, path: Union[str, PathLike]) -> Path: raise NotImplementedError() def read_text(self, filename: str) -> None: |