diff --git a/Cargo.lock b/Cargo.lock
index 79d9cefcf6..61c0f1bd04 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
-version = "1.0.183"
+version = "1.0.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
+checksum = "2c911f4b04d7385c9035407a4eff5903bf4fe270fa046fda448b69e797f4fff0"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.183"
+version = "1.0.184"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
+checksum = "c1df27f5b29406ada06609b2e2f77fb34f6dbb104a457a671cc31dbed237e09e"
dependencies = [
"proc-macro2",
"quote",
diff --git a/changelog.d/15891.feature b/changelog.d/15891.feature
new file mode 100644
index 0000000000..5024b5adc4
--- /dev/null
+++ b/changelog.d/15891.feature
@@ -0,0 +1 @@
+Implements a task scheduler for resumable potentially long running tasks.
diff --git a/changelog.d/16030.feature b/changelog.d/16030.feature
new file mode 100644
index 0000000000..c2f068085f
--- /dev/null
+++ b/changelog.d/16030.feature
@@ -0,0 +1 @@
+Allow specifying `client_secret_path` as alternative to `client_secret` for OIDC providers. This avoids leaking the client secret in the homeserver config. Contributed by @Ma27.
diff --git a/changelog.d/16114.feature b/changelog.d/16114.feature
new file mode 100644
index 0000000000..e937a3b029
--- /dev/null
+++ b/changelog.d/16114.feature
@@ -0,0 +1 @@
+Add an `admins` query parameter to the [List Accounts](https://matrix-org.github.io/synapse/v1.91/admin_api/user_admin_api.html#list-accounts) [admin API](https://matrix-org.github.io/synapse/v1.91/usage/administration/admin_api/index.html), to include only admins or to exclude admins in user queries.
\ No newline at end of file
diff --git a/changelog.d/16116.bugfix b/changelog.d/16116.bugfix
new file mode 100644
index 0000000000..f57a26ae39
--- /dev/null
+++ b/changelog.d/16116.bugfix
@@ -0,0 +1 @@
+Fix performance of state resolutions for large, old rooms that did not have the full auth chain persisted.
diff --git a/changelog.d/16125.misc b/changelog.d/16125.misc
new file mode 100644
index 0000000000..2f1bf23108
--- /dev/null
+++ b/changelog.d/16125.misc
@@ -0,0 +1 @@
+Add an admin endpoint to allow authorizing server to signal token revocations.
diff --git a/changelog.d/16127.bugfix b/changelog.d/16127.bugfix
new file mode 100644
index 0000000000..9ce5f4a705
--- /dev/null
+++ b/changelog.d/16127.bugfix
@@ -0,0 +1 @@
+User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support.
diff --git a/changelog.d/16131.misc b/changelog.d/16131.misc
new file mode 100644
index 0000000000..4f04699512
--- /dev/null
+++ b/changelog.d/16131.misc
@@ -0,0 +1 @@
+Add response time metrics for introspection requests for delegated auth.
diff --git a/changelog.d/16132.misc b/changelog.d/16132.misc
new file mode 100644
index 0000000000..aca26079d8
--- /dev/null
+++ b/changelog.d/16132.misc
@@ -0,0 +1 @@
+MSC3861: allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter.
diff --git a/changelog.d/16134.bugfix b/changelog.d/16134.bugfix
new file mode 100644
index 0000000000..9ce5f4a705
--- /dev/null
+++ b/changelog.d/16134.bugfix
@@ -0,0 +1 @@
+User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support.
diff --git a/changelog.d/16148.bugfix b/changelog.d/16148.bugfix
new file mode 100644
index 0000000000..fea316f856
--- /dev/null
+++ b/changelog.d/16148.bugfix
@@ -0,0 +1 @@
+Fix performance degredation when there are a lot of in-flight replication requests.
diff --git a/changelog.d/16149.misc b/changelog.d/16149.misc
new file mode 100644
index 0000000000..8b6674d2aa
--- /dev/null
+++ b/changelog.d/16149.misc
@@ -0,0 +1 @@
+Increase performance of read/write locks.
diff --git a/changelog.d/16150.misc b/changelog.d/16150.misc
new file mode 100644
index 0000000000..97861282fd
--- /dev/null
+++ b/changelog.d/16150.misc
@@ -0,0 +1 @@
+Clean-up calling `setup_background_tasks` in unit tests.
diff --git a/changelog.d/16152.misc b/changelog.d/16152.misc
new file mode 100644
index 0000000000..f8bf9f2c52
--- /dev/null
+++ b/changelog.d/16152.misc
@@ -0,0 +1 @@
+Raised the poetry-core version cap to 1.7.0.
diff --git a/changelog.d/16156.bugfix b/changelog.d/16156.bugfix
new file mode 100644
index 0000000000..17284297cf
--- /dev/null
+++ b/changelog.d/16156.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico.
diff --git a/changelog.d/16157.misc b/changelog.d/16157.misc
new file mode 100644
index 0000000000..c9d8999cca
--- /dev/null
+++ b/changelog.d/16157.misc
@@ -0,0 +1 @@
+Fix assertion in user directory unit tests.
diff --git a/changelog.d/16158.misc b/changelog.d/16158.misc
new file mode 100644
index 0000000000..41059378c5
--- /dev/null
+++ b/changelog.d/16158.misc
@@ -0,0 +1 @@
+Improve presence tests.
diff --git a/changelog.d/16159.misc b/changelog.d/16159.misc
new file mode 100644
index 0000000000..04cdd1afaf
--- /dev/null
+++ b/changelog.d/16159.misc
@@ -0,0 +1 @@
+Reduce scope of locks when paginating to alleviate DB contention.
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index c269ce6af0..99abfea3a0 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -219,6 +219,8 @@ The following parameters should be set in the URL:
**or** displaynames that contain this value.
- `guests` - string representing a bool - Is optional and if `false` will **exclude** guest users.
Defaults to `true` to include guest users.
+- `admins` - Optional flag to filter admins. If `true`, only admins are queried. If `false`, admins are excluded from
+ the query. When the flag is absent (the default), **both** admins and non-admins are included in the search results.
- `deactivated` - string representing a bool - Is optional and if `true` will **include** deactivated users.
Defaults to `false` to exclude deactivated users.
- `limit` - string representing a positive integer - Is optional but is used for pagination,
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 6601bba9f2..743c51d76a 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3204,6 +3204,14 @@ Options for each entry include:
* `client_secret`: oauth2 client secret to use. May be omitted if
`client_secret_jwt_key` is given, or if `client_auth_method` is 'none'.
+ Must be omitted if `client_secret_path` is specified.
+
+* `client_secret_path`: path to the oauth2 client secret to use. With that
+ it's not necessary to leak secrets into the config file itself.
+ Mutually exclusive with `client_secret`. Can be omitted if
+ `client_secret_jwt_key` is specified.
+
+ *Added in Synapse 1.91.0.*
* `client_secret_jwt_key`: Alternative to client_secret: details of a key used
to create a JSON Web Token to be used as an OAuth2 client secret. If
diff --git a/poetry.lock b/poetry.lock
index db1332a04b..e62c10da9f 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -397,13 +397,13 @@ files = [
[[package]]
name = "click"
-version = "8.1.6"
+version = "8.1.7"
description = "Composable command line interface toolkit"
optional = false
python-versions = ">=3.7"
files = [
- {file = "click-8.1.6-py3-none-any.whl", hash = "sha256:fa244bb30b3b5ee2cae3da8f55c9e5e0c0e86093306301fb418eb9dc40fbded5"},
- {file = "click-8.1.6.tar.gz", hash = "sha256:48ee849951919527a045bfe3bf7baa8a959c423134e1a5b98c05c20ba75a1cbd"},
+ {file = "click-8.1.7-py3-none-any.whl", hash = "sha256:ae74fb96c20a0277a1d615f1e4d73c8414f5a98db8b799a7931d1582f3390c28"},
+ {file = "click-8.1.7.tar.gz", hash = "sha256:ca9853ad459e787e2192211578cc907e7594e294c7ccc834310722b41b9ca6de"},
]
[package.dependencies]
@@ -726,89 +726,89 @@ files = [
[[package]]
name = "ijson"
-version = "3.2.1"
+version = "3.2.3"
description = "Iterative JSON parser with standard Python iterator interfaces"
optional = false
python-versions = "*"
files = [
- {file = "ijson-3.2.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:6f827f6961f093e1055a2be0c3137f0e7d667979da455ac9648f72d4a2bb8970"},
- {file = "ijson-3.2.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b6e51f4497065cd0d09f5e906cd538a8d22609eab716e3c883769acf147ab1b6"},
- {file = "ijson-3.2.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f022686c40bff3e340627a5a0c9212718d529e787ada3b76ba546d47a9ecdbbd"},
- {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4105c15a13fa1dc24ebd3bf2e679fa14dcbfcc48bc39138a0fa3f4ddf6cc09b"},
- {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:404423e666f185dfb753ddc92705c84dffdc4cc872aaf825bbe0607893cb5b02"},
- {file = "ijson-3.2.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:39e71f32830827cf21d0233a814092e5a23668e18f52eca5cac4f670d9df1240"},
- {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:43af7ed5292caa1452747e2b62485b6c0ece4bcbc5bf6f2758abd547e4124a14"},
- {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:e805aa6897a11b0f73f1f6bca078981df8960aeeccf527a214f240409c742bab"},
- {file = "ijson-3.2.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:5b2df0bd84889e9017a670328fe3e82ec509fd6744c7ac2c99c7ee2300d76afa"},
- {file = "ijson-3.2.1-cp310-cp310-win32.whl", hash = "sha256:675259c7ea7f51ffaf8cb9e79bf875e28bb09622892943f4f415588fd7ab7bec"},
- {file = "ijson-3.2.1-cp310-cp310-win_amd64.whl", hash = "sha256:90d4b2eb771a3585c8186820fe50e3282ef62477b865e765a50a8295674abeac"},
- {file = "ijson-3.2.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:fc581a61e210bf6013c1fa6536566e51127be1cfbd69539b63d8b813206d2fe0"},
- {file = "ijson-3.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:75cdf7ad4c00a8f5ac94ff27e3b7c1bf5ac463f125bca2be1744c5bc9600db5c"},
- {file = "ijson-3.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:85a2bf4636ace4d92e7c5d857a1c5694f42407c868953cf2927f18127bcd0d58"},
- {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9fe0cb66e7dd4aa11da5fff60bdf5ee04819a5e6a57acf7ca12c65f7fc009afc"},
- {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c6f7957ad38cb714378944032f2c2ee9c6531b5b0b38c5ccd08cedbb0ceddd02"},
- {file = "ijson-3.2.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:13283d264cca8a63e5bad91e82eec39711e95893e7e8d4a419799a8c5f85203a"},
- {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:12c24cde850fe79bc806be0e9fc38b47dd5ac0a223070ccb12e9b695425e2936"},
- {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:2ce8eed838e5a0791cb5948117b5453f2b3b3c28d93d06ee2bbf2c198c47881c"},
- {file = "ijson-3.2.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:b81c2589f191b0dc741f532be00b4bea617297dd9698431c8053e2d28272d4db"},
- {file = "ijson-3.2.1-cp311-cp311-win32.whl", hash = "sha256:ba2beac56ac96f728d0f2430e4c667c66819a423d321bb9db9ebdebd803e1b5b"},
- {file = "ijson-3.2.1-cp311-cp311-win_amd64.whl", hash = "sha256:c71614ed4bbc6a32ff1e42d7ce92a176fb67d658913343792d2c4567aa130817"},
- {file = "ijson-3.2.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:683fc8b0ea085e75ea34044fdc70649b37367d494f132a2bd1e59d7135054d89"},
- {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:deeaecec2f4e20e8bec20b0a5cdc34daebe7903f2e700f7dcaef68b5925d35ea"},
- {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:11923ac3188877f19dbb7051f7345202701cc39bf8e5ac44f8ae536c9eca8c82"},
- {file = "ijson-3.2.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:400deefcdae21e90fc39c1dcfc6ba2df24537e8c65bd57b763ed5256b73ba64d"},
- {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:56bc4bad53770710a3a91944fe640fdeb269987a14352b74ebbad2aa55801c00"},
- {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:f5a179523e085126844c6161aabcd193dbb5747bd01fadb68e92abf048f32ec9"},
- {file = "ijson-3.2.1-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:ee24655986e4415fbb7a0cf51445fff3072ceac0e219f4bbbd5c53535a3c5680"},
- {file = "ijson-3.2.1-cp36-cp36m-win32.whl", hash = "sha256:4a5c672b0540005c1bb0bba97aa559a87a2e4ee409fc68e2f5ba5b30f009ac99"},
- {file = "ijson-3.2.1-cp36-cp36m-win_amd64.whl", hash = "sha256:cfaf1d89b0e122e69c87a15db6d6f44feb9db96d2af7fe88cdc464177a257b5d"},
- {file = "ijson-3.2.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:1cbd052eb67c1b3611f25974ba967886e89391faaf55afec93808c19f06ca612"},
- {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f13ffc491886e5d7bde7d68712d168bce0141b2a918db1164bc8599c0123e293"},
- {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bc4c4fc6bafc777f8422fe36edb1cbd72a13cb29695893a064c9c95776a4bdf9"},
- {file = "ijson-3.2.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a42fcb2bf9748c26f004690b2feb6e13e4875bb7c9d83535f887c21e0a982a7c"},
- {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:0c92f7bc2f3a947c2ba7f7aa48382c36079f8259c930e81d9164341f9b853c45"},
- {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:fd497042db562379339660e787bc8679ed3abaa740768d39bc3746e769e7c7a5"},
- {file = "ijson-3.2.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:7d61c7cd8ddd75dcef818ff5a111a31b902a6a0e410ee0c2b2ecaa6dac92658a"},
- {file = "ijson-3.2.1-cp37-cp37m-win32.whl", hash = "sha256:36caf624d263fc40e7e805d759d09ea368d8cf497aecb3241ac2f0a286ad8eca"},
- {file = "ijson-3.2.1-cp37-cp37m-win_amd64.whl", hash = "sha256:32f9ed25ff80942e433119600bca13b86a8f9b8b0966edbc1d91a48ccbdd4d54"},
- {file = "ijson-3.2.1-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:e89bbd747140eac3a3c9e7e5835b90d85c4a02763fc5134861bfc1ea03b66ae7"},
- {file = "ijson-3.2.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:d69b4b1d509de36ec42a0e4af30ede39fb754e4039b2928ef7282ebc2125ffdd"},
- {file = "ijson-3.2.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:e7feb0771f50deabe6ce85b210fa9e005843d3d3c60fb3315d69e1f9d0d75e0c"},
- {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5fd8148a363888054ff06eaaa1103f2f98720ab39666084a214e4fedfc13cf64"},
- {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:598638dcc5141e9ae269903901877103f5362e0db4443e34721df8f8d34577b4"},
- {file = "ijson-3.2.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e979190b7d0fabca20d6b7224ab1c1aa461ad1ab72ba94f1bb1e5894cd59f342"},
- {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:bc810eb80b4f486c7957201ba2a53f53ddc9b3233af67e4359e29371bf04883b"},
- {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:26e758584611dfe826dd18ffd94dc0d8a062ce56e41674ad3bfa371c7b78c4b5"},
- {file = "ijson-3.2.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:24e9ae5b35b85ea094b6c36495bc856089254aed6a48bada8d7eec5a04f74439"},
- {file = "ijson-3.2.1-cp38-cp38-win32.whl", hash = "sha256:4b5dc7b5b4b8cb3087d188f37911cd67e26672d33d3571e73440de3f0a86f7e6"},
- {file = "ijson-3.2.1-cp38-cp38-win_amd64.whl", hash = "sha256:1af94ff40609270bbb3eac47e072582bb578f5023fac8408cccd80fe5892d221"},
- {file = "ijson-3.2.1-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:2dda67affceebc52c8bc5fe72c3a4a1e338e4d4b0497dbac5089c2d3862df214"},
- {file = "ijson-3.2.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:bd780303ddfedc8d57cdb9f2d53a8cea2f2f4a6fb857bf8fe5a0c3ab1d4ca901"},
- {file = "ijson-3.2.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:4fbab6af1bab88a8e46beda08cf44610eed0adb8d157a1a60b4bb6c3a121c6de"},
- {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c97a07988a1e0ce2bc8e8a62eb5f25195a3bd58a939ac353cbc6018a548cc08d"},
- {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:a65671a6826ae723837143914c823ad7bcc0d1a3e38d87c71df897a2556fb48f"},
- {file = "ijson-3.2.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a1806372008bbed9ee92db5747e38c047fa1c4ee89cb2dd5daaa57feb46ce50a"},
- {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:91e5a8e96f78a59e2520078c227a4fec5bf91c13adeded9e33fb13981cb823c3"},
- {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:1f820fce8ef093718f2319ff6f1322390664659b783775919dadccb1b470153d"},
- {file = "ijson-3.2.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:bca3e8c91a1076a20620dbaa6a2848772b0e8a4055e86d42d3fa39221b53ed1a"},
- {file = "ijson-3.2.1-cp39-cp39-win32.whl", hash = "sha256:de87f137b7438d43840f4339a37d4e6a58c987f4bb2a70609969f854f8ae20f3"},
- {file = "ijson-3.2.1-cp39-cp39-win_amd64.whl", hash = "sha256:0caebb8350b47266a58b766ec08e1de441d6d160702c428b5cf7504d93c832c4"},
- {file = "ijson-3.2.1-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:37389785c1abd27fcc24800fcfa9a6b1022743413e4056507fd32356b623ff33"},
- {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4b364b82231d51cbeae52468c3b27e8a042e544ab764c8f3975e912cf010603f"},
- {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0a5999d0ec28a8ec47cf20c736fd4f895dc077bf6441bf237b00b074315a295d"},
- {file = "ijson-3.2.1-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8bd481857a39305517fb6f1313d558c2dc4e78c9e9384cc5bc1c3e28f1afbedf"},
- {file = "ijson-3.2.1-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:545f62f12f89350d4d73f2a779cb269198ae578fac080085a1927148b803e602"},
- {file = "ijson-3.2.1-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:4d5622505d01c2f3d7b9638c1eb8c747eb550936b505225893704289ff28576f"},
- {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:20293bb36423b129fad3753858ccf7b2ccb5b2c0d3759efe810d0b9d79633a7e"},
- {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7cd8a4921b852fd2cb5b0c985540c97ff6893139a57fe7121d510ec5d1c0ca44"},
- {file = "ijson-3.2.1-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:dc902ff1ae1efed7d526294d7a9dd3df66d29b2cdc05fb5479838fef1327a534"},
- {file = "ijson-3.2.1-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:2925a7978d8170146a9cb49a15a982b71fbbf21980bf2e16cd90c528545b7c02"},
- {file = "ijson-3.2.1-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:c21c6509f6944939399f3630c5dc424d30d71d375f6cd58f9af56158fdf7251c"},
- {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f5729fc7648bc972d70922d7dad15459cca3a9e5ed0328eb9ae3ffa004066194"},
- {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:805a2d5ed5a15d60327bc9347f2d4125ab621fb18071db98b1c598f1ee99e8f1"},
- {file = "ijson-3.2.1-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3d0220a4b6c63f44589e429157174e3f4b8d1e534d5fb82bdb43a7f8dd77ae4b"},
- {file = "ijson-3.2.1-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:271d9b7c083f65c58ff0afd9dbb5d2f3d445f734632aebfef4a261b0a337abdb"},
- {file = "ijson-3.2.1.tar.gz", hash = "sha256:8574bf19f31fab870488769ad919a80f130825236ac8bde9a733f69c2961d7a7"},
+ {file = "ijson-3.2.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:0a4ae076bf97b0430e4e16c9cb635a6b773904aec45ed8dcbc9b17211b8569ba"},
+ {file = "ijson-3.2.3-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:cfced0a6ec85916eb8c8e22415b7267ae118eaff2a860c42d2cc1261711d0d31"},
+ {file = "ijson-3.2.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:0b9d1141cfd1e6d6643aa0b4876730d0d28371815ce846d2e4e84a2d4f471cf3"},
+ {file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9e0a27db6454edd6013d40a956d008361aac5bff375a9c04ab11fc8c214250b5"},
+ {file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3c0d526ccb335c3c13063c273637d8611f32970603dfb182177b232d01f14c23"},
+ {file = "ijson-3.2.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:545a30b3659df2a3481593d30d60491d1594bc8005f99600e1bba647bb44cbb5"},
+ {file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:9680e37a10fedb3eab24a4a7e749d8a73f26f1a4c901430e7aa81b5da15f7307"},
+ {file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:2a80c0bb1053055d1599e44dc1396f713e8b3407000e6390add72d49633ff3bb"},
+ {file = "ijson-3.2.3-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:f05ed49f434ce396ddcf99e9fd98245328e99f991283850c309f5e3182211a79"},
+ {file = "ijson-3.2.3-cp310-cp310-win32.whl", hash = "sha256:b4eb2304573c9fdf448d3fa4a4fdcb727b93002b5c5c56c14a5ffbbc39f64ae4"},
+ {file = "ijson-3.2.3-cp310-cp310-win_amd64.whl", hash = "sha256:923131f5153c70936e8bd2dd9dcfcff43c67a3d1c789e9c96724747423c173eb"},
+ {file = "ijson-3.2.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:904f77dd3d87736ff668884fe5197a184748eb0c3e302ded61706501d0327465"},
+ {file = "ijson-3.2.3-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:0974444c1f416e19de1e9f567a4560890095e71e81623c509feff642114c1e53"},
+ {file = "ijson-3.2.3-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:c1a4b8eb69b6d7b4e94170aa991efad75ba156b05f0de2a6cd84f991def12ff9"},
+ {file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d052417fd7ce2221114f8d3b58f05a83c1a2b6b99cafe0b86ac9ed5e2fc889df"},
+ {file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7b8064a85ec1b0beda7dd028e887f7112670d574db606f68006c72dd0bb0e0e2"},
+ {file = "ijson-3.2.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eaac293853f1342a8d2a45ac1f723c860f700860e7743fb97f7b76356df883a8"},
+ {file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:6c32c18a934c1dc8917455b0ce478fd7a26c50c364bd52c5a4fb0fc6bb516af7"},
+ {file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:713a919e0220ac44dab12b5fed74f9130f3480e55e90f9d80f58de129ea24f83"},
+ {file = "ijson-3.2.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:4a3a6a2fbbe7550ffe52d151cf76065e6b89cfb3e9d0463e49a7e322a25d0426"},
+ {file = "ijson-3.2.3-cp311-cp311-win32.whl", hash = "sha256:6a4db2f7fb9acfb855c9ae1aae602e4648dd1f88804a0d5cfb78c3639bcf156c"},
+ {file = "ijson-3.2.3-cp311-cp311-win_amd64.whl", hash = "sha256:ccd6be56335cbb845f3d3021b1766299c056c70c4c9165fb2fbe2d62258bae3f"},
+ {file = "ijson-3.2.3-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:eeb286639649fb6bed37997a5e30eefcacddac79476d24128348ec890b2a0ccb"},
+ {file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:396338a655fb9af4ac59dd09c189885b51fa0eefc84d35408662031023c110d1"},
+ {file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:0e0243d166d11a2a47c17c7e885debf3b19ed136be2af1f5d1c34212850236ac"},
+ {file = "ijson-3.2.3-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:85afdb3f3a5d0011584d4fa8e6dccc5936be51c27e84cd2882fe904ca3bd04c5"},
+ {file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_aarch64.whl", hash = "sha256:4fc35d569eff3afa76bfecf533f818ecb9390105be257f3f83c03204661ace70"},
+ {file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_i686.whl", hash = "sha256:455d7d3b7a6aacfb8ab1ebcaf697eedf5be66e044eac32508fccdc633d995f0e"},
+ {file = "ijson-3.2.3-cp36-cp36m-musllinux_1_1_x86_64.whl", hash = "sha256:c63f3d57dbbac56cead05b12b81e8e1e259f14ce7f233a8cbe7fa0996733b628"},
+ {file = "ijson-3.2.3-cp36-cp36m-win32.whl", hash = "sha256:a4d7fe3629de3ecb088bff6dfe25f77be3e8261ed53d5e244717e266f8544305"},
+ {file = "ijson-3.2.3-cp36-cp36m-win_amd64.whl", hash = "sha256:96190d59f015b5a2af388a98446e411f58ecc6a93934e036daa75f75d02386a0"},
+ {file = "ijson-3.2.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:35194e0b8a2bda12b4096e2e792efa5d4801a0abb950c48ade351d479cd22ba5"},
+ {file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d1053fb5f0b010ee76ca515e6af36b50d26c1728ad46be12f1f147a835341083"},
+ {file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:211124cff9d9d139dd0dfced356f1472860352c055d2481459038b8205d7d742"},
+ {file = "ijson-3.2.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:92dc4d48e9f6a271292d6079e9fcdce33c83d1acf11e6e12696fb05c5889fe74"},
+ {file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_aarch64.whl", hash = "sha256:3dcc33ee56f92a77f48776014ddb47af67c33dda361e84371153c4f1ed4434e1"},
+ {file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_i686.whl", hash = "sha256:98c6799925a5d1988da4cd68879b8eeab52c6e029acc45e03abb7921a4715c4b"},
+ {file = "ijson-3.2.3-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:4252e48c95cd8ceefc2caade310559ab61c37d82dfa045928ed05328eb5b5f65"},
+ {file = "ijson-3.2.3-cp37-cp37m-win32.whl", hash = "sha256:644f4f03349ff2731fd515afd1c91b9e439e90c9f8c28292251834154edbffca"},
+ {file = "ijson-3.2.3-cp37-cp37m-win_amd64.whl", hash = "sha256:ba33c764afa9ecef62801ba7ac0319268a7526f50f7601370d9f8f04e77fc02b"},
+ {file = "ijson-3.2.3-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:4b2ec8c2a3f1742cbd5f36b65e192028e541b5fd8c7fd97c1fc0ca6c427c704a"},
+ {file = "ijson-3.2.3-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:7dc357da4b4ebd8903e77dbcc3ce0555ee29ebe0747c3c7f56adda423df8ec89"},
+ {file = "ijson-3.2.3-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:bcc51c84bb220ac330122468fe526a7777faa6464e3b04c15b476761beea424f"},
+ {file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f8d54b624629f9903005c58d9321a036c72f5c212701bbb93d1a520ecd15e370"},
+ {file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d6ea7c7e3ec44742e867c72fd750c6a1e35b112f88a917615332c4476e718d40"},
+ {file = "ijson-3.2.3-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:916acdc5e504f8b66c3e287ada5d4b39a3275fc1f2013c4b05d1ab9933671a6c"},
+ {file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:81815b4184b85ce124bfc4c446d5f5e5e643fc119771c5916f035220ada29974"},
+ {file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_i686.whl", hash = "sha256:b49fd5fe1cd9c1c8caf6c59f82b08117dd6bea2ec45b641594e25948f48f4169"},
+ {file = "ijson-3.2.3-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:86b3c91fdcb8ffb30556c9669930f02b7642de58ca2987845b04f0d7fe46d9a8"},
+ {file = "ijson-3.2.3-cp38-cp38-win32.whl", hash = "sha256:a729b0c8fb935481afe3cf7e0dadd0da3a69cc7f145dbab8502e2f1e01d85a7c"},
+ {file = "ijson-3.2.3-cp38-cp38-win_amd64.whl", hash = "sha256:d34e049992d8a46922f96483e96b32ac4c9cffd01a5c33a928e70a283710cd58"},
+ {file = "ijson-3.2.3-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:9c2a12dcdb6fa28f333bf10b3a0f80ec70bc45280d8435be7e19696fab2bc706"},
+ {file = "ijson-3.2.3-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:1844c5b57da21466f255a0aeddf89049e730d7f3dfc4d750f0e65c36e6a61a7c"},
+ {file = "ijson-3.2.3-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:2ec3e5ff2515f1c40ef6a94983158e172f004cd643b9e4b5302017139b6c96e4"},
+ {file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:46bafb1b9959872a1f946f8dd9c6f1a30a970fc05b7bfae8579da3f1f988e598"},
+ {file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ab4db9fee0138b60e31b3c02fff8a4c28d7b152040553b6a91b60354aebd4b02"},
+ {file = "ijson-3.2.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:f4bc87e69d1997c6a55fff5ee2af878720801ff6ab1fb3b7f94adda050651e37"},
+ {file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:e9fd906f0c38e9f0bfd5365e1bed98d649f506721f76bb1a9baa5d7374f26f19"},
+ {file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:e84d27d1acb60d9102728d06b9650e5b7e5cb0631bd6e3dfadba8fb6a80d6c2f"},
+ {file = "ijson-3.2.3-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:2cc04fc0a22bb945cd179f614845c8b5106c0b3939ee0d84ce67c7a61ac1a936"},
+ {file = "ijson-3.2.3-cp39-cp39-win32.whl", hash = "sha256:e641814793a037175f7ec1b717ebb68f26d89d82cfd66f36e588f32d7e488d5f"},
+ {file = "ijson-3.2.3-cp39-cp39-win_amd64.whl", hash = "sha256:6bd3e7e91d031f1e8cea7ce53f704ab74e61e505e8072467e092172422728b22"},
+ {file = "ijson-3.2.3-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:06f9707da06a19b01013f8c65bf67db523662a9b4a4ff027e946e66c261f17f0"},
+ {file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:be8495f7c13fa1f622a2c6b64e79ac63965b89caf664cc4e701c335c652d15f2"},
+ {file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7596b42f38c3dcf9d434dddd50f46aeb28e96f891444c2b4b1266304a19a2c09"},
+ {file = "ijson-3.2.3-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fbac4e9609a1086bbad075beb2ceec486a3b138604e12d2059a33ce2cba93051"},
+ {file = "ijson-3.2.3-pp37-pypy37_pp73-win_amd64.whl", hash = "sha256:db2d6341f9cb538253e7fe23311d59252f124f47165221d3c06a7ed667ecd595"},
+ {file = "ijson-3.2.3-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:fa8b98be298efbb2588f883f9953113d8a0023ab39abe77fe734b71b46b1220a"},
+ {file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:674e585361c702fad050ab4c153fd168dc30f5980ef42b64400bc84d194e662d"},
+ {file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:fd12e42b9cb9c0166559a3ffa276b4f9fc9d5b4c304e5a13668642d34b48b634"},
+ {file = "ijson-3.2.3-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d31e0d771d82def80cd4663a66de277c3b44ba82cd48f630526b52f74663c639"},
+ {file = "ijson-3.2.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:7ce4c70c23521179d6da842bb9bc2e36bb9fad1e0187e35423ff0f282890c9ca"},
+ {file = "ijson-3.2.3-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:39f551a6fbeed4433c85269c7c8778e2aaea2501d7ebcb65b38f556030642c17"},
+ {file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3b14d322fec0de7af16f3ef920bf282f0dd747200b69e0b9628117f381b7775b"},
+ {file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:7851a341429b12d4527ca507097c959659baf5106c7074d15c17c387719ffbcd"},
+ {file = "ijson-3.2.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:db3bf1b42191b5cc9b6441552fdcb3b583594cb6b19e90d1578b7cbcf80d0fae"},
+ {file = "ijson-3.2.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:6f662dc44362a53af3084d3765bb01cd7b4734d1f484a6095cad4cb0cbfe5374"},
+ {file = "ijson-3.2.3.tar.gz", hash = "sha256:10294e9bf89cb713da05bc4790bdff616610432db561964827074898e174f917"},
]
[[package]]
@@ -1881,13 +1881,13 @@ email = ["email-validator (>=1.0.3)"]
[[package]]
name = "pygithub"
-version = "1.59.0"
+version = "1.59.1"
description = "Use the full Github API v3"
optional = false
python-versions = ">=3.7"
files = [
- {file = "PyGithub-1.59.0-py3-none-any.whl", hash = "sha256:126bdbae72087d8d038b113aab6b059b4553cb59348e3024bb1a1cae406ace9e"},
- {file = "PyGithub-1.59.0.tar.gz", hash = "sha256:6e05ff49bac3caa7d1d6177a10c6e55a3e20c85b92424cc198571fd0cf786690"},
+ {file = "PyGithub-1.59.1-py3-none-any.whl", hash = "sha256:3d87a822e6c868142f0c2c4bf16cce4696b5a7a4d142a7bd160e1bdf75bc54a9"},
+ {file = "PyGithub-1.59.1.tar.gz", hash = "sha256:c44e3a121c15bf9d3a5cc98d94c9a047a5132a9b01d22264627f58ade9ddc217"},
]
[package.dependencies]
@@ -2385,13 +2385,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
[[package]]
name = "sentry-sdk"
-version = "1.28.1"
+version = "1.29.2"
description = "Python client for Sentry (https://sentry.io)"
optional = true
python-versions = "*"
files = [
- {file = "sentry-sdk-1.28.1.tar.gz", hash = "sha256:dcd88c68aa64dae715311b5ede6502fd684f70d00a7cd4858118f0ba3153a3ae"},
- {file = "sentry_sdk-1.28.1-py2.py3-none-any.whl", hash = "sha256:6bdb25bd9092478d3a817cb0d01fa99e296aea34d404eac3ca0037faa5c2aa0a"},
+ {file = "sentry-sdk-1.29.2.tar.gz", hash = "sha256:a99ee105384788c3f228726a88baf515fe7b5f1d2d0f215a03d194369f158df7"},
+ {file = "sentry_sdk-1.29.2-py2.py3-none-any.whl", hash = "sha256:3e17215d8006612e2df02b0e73115eb8376c37e3f586d8436fa41644e605074d"},
]
[package.dependencies]
@@ -3013,13 +3013,13 @@ files = [
[[package]]
name = "types-pyopenssl"
-version = "23.2.0.1"
+version = "23.2.0.2"
description = "Typing stubs for pyOpenSSL"
optional = false
python-versions = "*"
files = [
- {file = "types-pyOpenSSL-23.2.0.1.tar.gz", hash = "sha256:beeb5d22704c625a1e4b6dc756355c5b4af0b980138b702a9d9f932acf020903"},
- {file = "types_pyOpenSSL-23.2.0.1-py3-none-any.whl", hash = "sha256:0568553f104466f1b8e0db3360fbe6770137d02e21a1a45c209bf2b1b03d90d4"},
+ {file = "types-pyOpenSSL-23.2.0.2.tar.gz", hash = "sha256:6a010dac9ecd42b582d7dd2cc3e9e40486b79b3b64bb2fffba1474ff96af906d"},
+ {file = "types_pyOpenSSL-23.2.0.2-py3-none-any.whl", hash = "sha256:19536aa3debfbe25a918cf0d898e9f5fbbe6f3594a429da7914bf331deb1b342"},
]
[package.dependencies]
diff --git a/pyproject.toml b/pyproject.toml
index 86680cb8e5..0585a9b01e 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -367,7 +367,7 @@ furo = ">=2022.12.7,<2024.0.0"
# system changes.
# We are happy to raise these upper bounds upon request,
# provided we check that it's safe to do so (i.e. that CI passes).
-requires = ["poetry-core>=1.1.0,<=1.6.0", "setuptools_rust>=1.3,<=1.6.0"]
+requires = ["poetry-core>=1.1.0,<=1.7.0", "setuptools_rust>=1.3,<=1.6.0"]
build-backend = "poetry.core.masonry.api"
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py
index 3a516093f5..14cba50c90 100644
--- a/synapse/api/auth/msc3861_delegated.py
+++ b/synapse/api/auth/msc3861_delegated.py
@@ -20,6 +20,7 @@ from authlib.oauth2.auth import encode_client_secret_basic, encode_client_secret
from authlib.oauth2.rfc7523 import ClientSecretJWT, PrivateKeyJWT, private_key_jwt_sign
from authlib.oauth2.rfc7662 import IntrospectionToken
from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
+from prometheus_client import Histogram
from twisted.web.client import readBody
from twisted.web.http_headers import Headers
@@ -46,6 +47,13 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+introspection_response_timer = Histogram(
+ "synapse_api_auth_delegated_introspection_response",
+ "Time taken to get a response for an introspection request",
+ ["code"],
+)
+
+
# Scope as defined by MSC2967
# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
@@ -190,14 +198,26 @@ class MSC3861DelegatedAuth(BaseAuth):
# Do the actual request
# We're not using the SimpleHttpClient util methods as we don't want to
# check the HTTP status code, and we do the body encoding ourselves.
- response = await self._http_client.request(
- method="POST",
- uri=uri,
- data=body.encode("utf-8"),
- headers=headers,
- )
- resp_body = await make_deferred_yieldable(readBody(response))
+ start_time = self._clock.time()
+ try:
+ response = await self._http_client.request(
+ method="POST",
+ uri=uri,
+ data=body.encode("utf-8"),
+ headers=headers,
+ )
+
+ resp_body = await make_deferred_yieldable(readBody(response))
+ except Exception:
+ end_time = self._clock.time()
+ introspection_response_timer.labels("ERR").observe(end_time - start_time)
+ raise
+
+ end_time = self._clock.time()
+ introspection_response_timer.labels(response.code).observe(
+ end_time - start_time
+ )
if response.code < 200 or response.code >= 300:
raise HttpResponseException(
@@ -226,7 +246,7 @@ class MSC3861DelegatedAuth(BaseAuth):
return introspection_token
async def is_server_admin(self, requester: Requester) -> bool:
- return "urn:synapse:admin:*" in requester.scope
+ return SCOPE_SYNAPSE_ADMIN in requester.scope
async def get_user_by_req(
self,
@@ -243,6 +263,25 @@ class MSC3861DelegatedAuth(BaseAuth):
# so that we don't provision the user if they don't have enough permission:
requester = await self.get_user_by_access_token(access_token, allow_expired)
+ # Allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter
+ if request.args is not None:
+ user_id_params = request.args.get(b"_oidc_admin_impersonate_user_id")
+ if user_id_params:
+ if await self.is_server_admin(requester):
+ user_id_str = user_id_params[0].decode("ascii")
+ impersonated_user_id = UserID.from_string(user_id_str)
+ logging.info(f"Admin impersonation of user {user_id_str}")
+ requester = create_requester(
+ user_id=impersonated_user_id,
+ scope=[SCOPE_MATRIX_API],
+ authenticated_entity=requester.user.to_string(),
+ )
+ else:
+ raise AuthError(
+ 401,
+ "Impersonation not possible by a non admin user",
+ )
+
# Deny the request if the user account is locked.
if not allow_locked and await self.store.get_user_locked_status(
requester.user.to_string()
@@ -270,14 +309,14 @@ class MSC3861DelegatedAuth(BaseAuth):
# XXX: This is a temporary solution so that the admin API can be called by
# the OIDC provider. This will be removed once we have OIDC client
# credentials grant support in matrix-authentication-service.
- logging.info("Admin toked used")
+ logging.info("Admin token used")
# XXX: that user doesn't exist and won't be provisioned.
# This is mostly fine for admin calls, but we should also think about doing
# requesters without a user_id.
admin_user = UserID("__oidc_admin", self._hostname)
return create_requester(
user_id=admin_user,
- scope=["urn:synapse:admin:*"],
+ scope=[SCOPE_SYNAPSE_ADMIN],
)
try:
@@ -399,3 +438,16 @@ class MSC3861DelegatedAuth(BaseAuth):
scope=scope,
is_guest=(has_guest_scope and not has_user_scope),
)
+
+ def invalidate_cached_tokens(self, keys: List[str]) -> None:
+ """
+ Invalidate the entry(s) in the introspection token cache corresponding to the given key
+ """
+ for key in keys:
+ self._token_cache.invalidate(key)
+
+ def invalidate_token_cache(self) -> None:
+ """
+ Invalidate the entire token cache.
+ """
+ self._token_cache.invalidate_all()
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index dc79efcc14..d25e3548e0 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -91,6 +91,7 @@ from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.storage.databases.main.stats import StatsStore
from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.storage.databases.main.tags import TagsWorkerStore
+from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
from synapse.storage.databases.main.transactions import TransactionWorkerStore
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -144,6 +145,7 @@ class GenericWorkerStore(
TransactionWorkerStore,
LockStore,
SessionStore,
+ TaskSchedulerWorkerStore,
):
# Properties that multiple storage classes define. Tell mypy what the
# expected type is.
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index ac9449b18f..277ea4675b 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -173,6 +173,13 @@ class MSC3861:
("enable_registration",),
)
+ # We only need to test the user consent version, as if it must be set if the user_consent section was present in the config
+ if root.consent.user_consent_version is not None:
+ raise ConfigError(
+ "User consent cannot be enabled when OAuth delegation is enabled",
+ ("user_consent",),
+ )
+
if (
root.oidc.oidc_enabled
or root.saml2.saml2_enabled
@@ -216,6 +223,12 @@ class MSC3861:
("session_lifetime",),
)
+ if root.registration.enable_3pid_changes:
+ raise ConfigError(
+ "enable_3pid_changes cannot be enabled when OAuth delegation is enabled",
+ ("enable_3pid_changes",),
+ )
+
@attr.s(auto_attribs=True, frozen=True, slots=True)
class MSC3866Config:
diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py
index 77c1d1dc8e..574d6afb95 100644
--- a/synapse/config/oidc.py
+++ b/synapse/config/oidc.py
@@ -280,6 +280,20 @@ def _parse_oidc_config_dict(
for x in oidc_config.get("attribute_requirements", [])
]
+ # Read from either `client_secret_path` or `client_secret`. If both exist, error.
+ client_secret = oidc_config.get("client_secret")
+ client_secret_path = oidc_config.get("client_secret_path")
+ if client_secret_path is not None:
+ if client_secret is None:
+ client_secret = read_file(
+ client_secret_path, config_path + ("client_secret_path",)
+ ).rstrip("\n")
+ else:
+ raise ConfigError(
+ "Cannot specify both client_secret and client_secret_path",
+ config_path + ("client_secret",),
+ )
+
return OidcProviderConfig(
idp_id=idp_id,
idp_name=oidc_config.get("idp_name", "OIDC"),
@@ -288,7 +302,7 @@ def _parse_oidc_config_dict(
discover=oidc_config.get("discover", True),
issuer=oidc_config["issuer"],
client_id=oidc_config["client_id"],
- client_secret=oidc_config.get("client_secret"),
+ client_secret=client_secret,
client_secret_jwt_key=client_secret_jwt_key,
client_auth_method=oidc_config.get("client_auth_method", "client_secret_basic"),
pkce_method=oidc_config.get("pkce_method", "auto"),
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index df1d83dfaa..b8ad6fbc06 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -133,7 +133,16 @@ class RegistrationConfig(Config):
self.enable_set_displayname = config.get("enable_set_displayname", True)
self.enable_set_avatar_url = config.get("enable_set_avatar_url", True)
- self.enable_3pid_changes = config.get("enable_3pid_changes", True)
+
+ # The default value of enable_3pid_changes is True, unless msc3861 is enabled.
+ msc3861_enabled = (
+ (config.get("experimental_features") or {})
+ .get("msc3861", {})
+ .get("enabled", False)
+ )
+ self.enable_3pid_changes = config.get(
+ "enable_3pid_changes", not msc3861_enabled
+ )
self.disable_msisdn_registration = config.get(
"disable_msisdn_registration", False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2b93b8c621..29cd45550a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -60,6 +60,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.validator import EventValidator
from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers.pagination import PURGE_PAGINATION_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import nested_logging_context
from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
@@ -152,6 +153,7 @@ class FederationHandler:
self._device_handler = hs.get_device_handler()
self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
self._notifier = hs.get_notifier()
+ self._worker_locks = hs.get_worker_locks_handler()
self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
hs
@@ -200,7 +202,7 @@ class FederationHandler:
@trace
@tag_args
async def maybe_backfill(
- self, room_id: str, current_depth: int, limit: int
+ self, room_id: str, current_depth: int, limit: int, record_time: bool = True
) -> bool:
"""Checks the database to see if we should backfill before paginating,
and if so do.
@@ -213,21 +215,25 @@ class FederationHandler:
limit: The number of events that the pagination request will
return. This is used as part of the heuristic to decide if we
should back paginate.
+ record_time: Whether to record the time it takes to backfill.
Returns:
True if we actually tried to backfill something, otherwise False.
"""
# Starting the processing time here so we can include the room backfill
# linearizer lock queue in the timing
- processing_start_time = self.clock.time_msec()
+ processing_start_time = self.clock.time_msec() if record_time else 0
async with self._room_backfill.queue(room_id):
- return await self._maybe_backfill_inner(
- room_id,
- current_depth,
- limit,
- processing_start_time=processing_start_time,
- )
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_PAGINATION_LOCK_NAME, room_id, write=False
+ ):
+ return await self._maybe_backfill_inner(
+ room_id,
+ current_depth,
+ limit,
+ processing_start_time=processing_start_time,
+ )
@trace
@tag_args
@@ -305,12 +311,21 @@ class FederationHandler:
# of history that extends all the way back to where we are currently paginating
# and it's within the 100 events that are returned from `/backfill`.
if not sorted_backfill_points and current_depth != MAX_DEPTH:
+ # Check that we actually have later backfill points, if not just return.
+ have_later_backfill_points = await self.store.get_backfill_points_in_room(
+ room_id=room_id,
+ current_depth=MAX_DEPTH,
+ limit=1,
+ )
+ if not have_later_backfill_points:
+ return False
+
logger.debug(
"_maybe_backfill_inner: all backfill points are *after* current depth. Trying again with later backfill points."
)
run_as_background_process(
"_maybe_backfill_inner_anyway_with_max_depth",
- self._maybe_backfill_inner,
+ self.maybe_backfill,
room_id=room_id,
# We use `MAX_DEPTH` so that we find all backfill points next
# time (all events are below the `MAX_DEPTH`)
@@ -319,7 +334,7 @@ class FederationHandler:
# We don't want to start another timing observation from this
# nested recursive call. The top-most call can record the time
# overall otherwise the smaller one will throw off the results.
- processing_start_time=None,
+ record_time=False,
)
# We return `False` because we're backfilling in the background and there is
# no new events immediately for the caller to know about yet.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 1be6ebc6d9..e5ac9096cc 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -487,155 +487,150 @@ class PaginationHandler:
room_token = from_token.room_key
- async with self._worker_locks.acquire_read_write_lock(
- PURGE_PAGINATION_LOCK_NAME, room_id, write=False
- ):
- (membership, member_event_id) = (None, None)
- if not use_admin_priviledge:
- (
- membership,
- member_event_id,
- ) = await self.auth.check_user_in_room_or_world_readable(
- room_id, requester, allow_departed_users=True
+ (membership, member_event_id) = (None, None)
+ if not use_admin_priviledge:
+ (
+ membership,
+ member_event_id,
+ ) = await self.auth.check_user_in_room_or_world_readable(
+ room_id, requester, allow_departed_users=True
+ )
+
+ if pagin_config.direction == Direction.BACKWARDS:
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ curr_topo = room_token.topological
+ else:
+ curr_topo = await self.store.get_current_topological_token(
+ room_id, room_token.stream
)
- if pagin_config.direction == Direction.BACKWARDS:
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- curr_topo = room_token.topological
- else:
- curr_topo = await self.store.get_current_topological_token(
- room_id, room_token.stream
- )
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ if (
+ pagin_config.direction == Direction.BACKWARDS
+ and not use_admin_priviledge
+ and membership == Membership.LEAVE
+ ):
+ # This is only None if the room is world_readable, in which case
+ # "Membership.JOIN" would have been returned and we should never hit
+ # this branch.
+ assert member_event_id
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- if (
- pagin_config.direction == Direction.BACKWARDS
- and not use_admin_priviledge
- and membership == Membership.LEAVE
- ):
- # This is only None if the room is world_readable, in which case
- # "Membership.JOIN" would have been returned and we should never hit
- # this branch.
- assert member_event_id
+ leave_token = await self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ assert leave_token.topological is not None
- leave_token = await self.store.get_topological_token_for_event(
- member_event_id
+ if leave_token.topological < curr_topo:
+ from_token = from_token.copy_and_replace(
+ StreamKeyType.ROOM, leave_token
)
- assert leave_token.topological is not None
- if leave_token.topological < curr_topo:
- from_token = from_token.copy_and_replace(
- StreamKeyType.ROOM, leave_token
- )
+ to_room_key = None
+ if pagin_config.to_token:
+ to_room_key = pagin_config.to_token.room_key
+
+ # Initially fetch the events from the database. With any luck, we can return
+ # these without blocking on backfill (handled below).
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
+ limit=pagin_config.limit,
+ event_filter=event_filter,
+ )
- to_room_key = None
- if pagin_config.to_token:
- to_room_key = pagin_config.to_token.room_key
-
- # Initially fetch the events from the database. With any luck, we can return
- # these without blocking on backfill (handled below).
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
+ if pagin_config.direction == Direction.BACKWARDS:
+ # We use a `Set` because there can be multiple events at a given depth
+ # and we only care about looking at the unique continum of depths to
+ # find gaps.
+ event_depths: Set[int] = {event.depth for event in events}
+ sorted_event_depths = sorted(event_depths)
+
+ # Inspect the depths of the returned events to see if there are any gaps
+ found_big_gap = False
+ number_of_gaps = 0
+ previous_event_depth = (
+ sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
)
-
- if pagin_config.direction == Direction.BACKWARDS:
- # We use a `Set` because there can be multiple events at a given depth
- # and we only care about looking at the unique continum of depths to
- # find gaps.
- event_depths: Set[int] = {event.depth for event in events}
- sorted_event_depths = sorted(event_depths)
-
- # Inspect the depths of the returned events to see if there are any gaps
- found_big_gap = False
- number_of_gaps = 0
- previous_event_depth = (
- sorted_event_depths[0] if len(sorted_event_depths) > 0 else 0
- )
- for event_depth in sorted_event_depths:
- # We don't expect a negative depth but we'll just deal with it in
- # any case by taking the absolute value to get the true gap between
- # any two integers.
- depth_gap = abs(event_depth - previous_event_depth)
- # A `depth_gap` of 1 is a normal continuous chain to the next event
- # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
- # also possible there is no event at a given depth but we can't ever
- # know that for sure)
- if depth_gap > 1:
- number_of_gaps += 1
-
- # We only tolerate a small number single-event long gaps in the
- # returned events because those are most likely just events we've
- # failed to pull in the past. Anything longer than that is probably
- # a sign that we're missing a decent chunk of history and we should
- # try to backfill it.
- #
- # XXX: It's possible we could tolerate longer gaps if we checked
- # that a given events `prev_events` is one that has failed pull
- # attempts and we could just treat it like a dead branch of history
- # for now or at least something that we don't need the block the
- # client on to try pulling.
- #
- # XXX: If we had something like MSC3871 to indicate gaps in the
- # timeline to the client, we could also get away with any sized gap
- # and just have the client refetch the holes as they see fit.
- if depth_gap > 2:
- found_big_gap = True
- break
- previous_event_depth = event_depth
-
- # Backfill in the foreground if we found a big gap, have too many holes,
- # or we don't have enough events to fill the limit that the client asked
- # for.
- missing_too_many_events = (
- number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ for event_depth in sorted_event_depths:
+ # We don't expect a negative depth but we'll just deal with it in
+ # any case by taking the absolute value to get the true gap between
+ # any two integers.
+ depth_gap = abs(event_depth - previous_event_depth)
+ # A `depth_gap` of 1 is a normal continuous chain to the next event
+ # (1 <-- 2 <-- 3) so anything larger indicates a missing event (it's
+ # also possible there is no event at a given depth but we can't ever
+ # know that for sure)
+ if depth_gap > 1:
+ number_of_gaps += 1
+
+ # We only tolerate a small number single-event long gaps in the
+ # returned events because those are most likely just events we've
+ # failed to pull in the past. Anything longer than that is probably
+ # a sign that we're missing a decent chunk of history and we should
+ # try to backfill it.
+ #
+ # XXX: It's possible we could tolerate longer gaps if we checked
+ # that a given events `prev_events` is one that has failed pull
+ # attempts and we could just treat it like a dead branch of history
+ # for now or at least something that we don't need the block the
+ # client on to try pulling.
+ #
+ # XXX: If we had something like MSC3871 to indicate gaps in the
+ # timeline to the client, we could also get away with any sized gap
+ # and just have the client refetch the holes as they see fit.
+ if depth_gap > 2:
+ found_big_gap = True
+ break
+ previous_event_depth = event_depth
+
+ # Backfill in the foreground if we found a big gap, have too many holes,
+ # or we don't have enough events to fill the limit that the client asked
+ # for.
+ missing_too_many_events = (
+ number_of_gaps > BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD
+ )
+ not_enough_events_to_fill_response = len(events) < pagin_config.limit
+ if (
+ found_big_gap
+ or missing_too_many_events
+ or not_enough_events_to_fill_response
+ ):
+ did_backfill = await self.hs.get_federation_handler().maybe_backfill(
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
)
- not_enough_events_to_fill_response = len(events) < pagin_config.limit
- if (
- found_big_gap
- or missing_too_many_events
- or not_enough_events_to_fill_response
- ):
- did_backfill = (
- await self.hs.get_federation_handler().maybe_backfill(
- room_id,
- curr_topo,
- limit=pagin_config.limit,
- )
- )
- # If we did backfill something, refetch the events from the database to
- # catch anything new that might have been added since we last fetched.
- if did_backfill:
- events, next_key = await self.store.paginate_room_events(
- room_id=room_id,
- from_key=from_token.room_key,
- to_key=to_room_key,
- direction=pagin_config.direction,
- limit=pagin_config.limit,
- event_filter=event_filter,
- )
- else:
- # Otherwise, we can backfill in the background for eventual
- # consistency's sake but we don't need to block the client waiting
- # for a costly federation call and processing.
- run_as_background_process(
- "maybe_backfill_in_the_background",
- self.hs.get_federation_handler().maybe_backfill,
- room_id,
- curr_topo,
+ # If we did backfill something, refetch the events from the database to
+ # catch anything new that might have been added since we last fetched.
+ if did_backfill:
+ events, next_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_token.room_key,
+ to_key=to_room_key,
+ direction=pagin_config.direction,
limit=pagin_config.limit,
+ event_filter=event_filter,
)
+ else:
+ # Otherwise, we can backfill in the background for eventual
+ # consistency's sake but we don't need to block the client waiting
+ # for a costly federation call and processing.
+ run_as_background_process(
+ "maybe_backfill_in_the_background",
+ self.hs.get_federation_handler().maybe_backfill,
+ room_id,
+ curr_topo,
+ limit=pagin_config.limit,
+ )
- next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
+ next_token = from_token.copy_and_replace(StreamKeyType.ROOM, next_key)
# if no events are returned from pagination, that implies
# we have reached the end of the available events.
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 139f57cf86..3b88dc68ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -14,7 +14,9 @@
"""A replication client for use by synapse workers.
"""
import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Dict, Iterable, Optional, Set, Tuple
+
+from sortedcontainers import SortedList
from twisted.internet import defer
from twisted.internet.defer import Deferred
@@ -26,6 +28,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.streams import (
AccountDataStream,
+ CachesStream,
DeviceListsStream,
PushersStream,
PushRulesStream,
@@ -73,6 +76,7 @@ class ReplicationDataHandler:
self._instance_name = hs.get_instance_name()
self._typing_handler = hs.get_typing_handler()
self._state_storage_controller = hs.get_storage_controllers().state
+ self.auth = hs.get_auth()
self._notify_pushers = hs.config.worker.start_pushers
self._pusher_pool = hs.get_pusherpool()
@@ -84,7 +88,9 @@ class ReplicationDataHandler:
# Map from stream and instance to list of deferreds waiting for the stream to
# arrive at a particular position. The lists are sorted by stream position.
- self._streams_to_waiters: Dict[Tuple[str, str], List[Tuple[int, Deferred]]] = {}
+ self._streams_to_waiters: Dict[
+ Tuple[str, str], SortedList[Tuple[int, Deferred]]
+ ] = {}
async def on_rdata(
self, stream_name: str, instance_name: str, token: int, rows: list
@@ -218,6 +224,16 @@ class ReplicationDataHandler:
self._state_storage_controller.notify_event_un_partial_stated(
row.event_id
)
+ # invalidate the introspection token cache
+ elif stream_name == CachesStream.NAME:
+ for row in rows:
+ if row.cache_func == "introspection_token_invalidation":
+ if row.keys[0] is None:
+ # invalidate the whole cache
+ # mypy ignore - the token cache is defined on MSC3861DelegatedAuth
+ self.auth.invalidate_token_cache() # type: ignore[attr-defined]
+ else:
+ self.auth.invalidate_cached_tokens(row.keys) # type: ignore[attr-defined]
await self._presence_handler.process_replication_rows(
stream_name, instance_name, token, rows
@@ -226,7 +242,9 @@ class ReplicationDataHandler:
# Notify any waiting deferreds. The list is ordered by position so we
# just iterate through the list until we reach a position that is
# greater than the received row position.
- waiting_list = self._streams_to_waiters.get((stream_name, instance_name), [])
+ waiting_list = self._streams_to_waiters.get((stream_name, instance_name))
+ if not waiting_list:
+ return
# Index of first item with a position after the current token, i.e we
# have called all deferreds before this index. If not overwritten by
@@ -250,7 +268,7 @@ class ReplicationDataHandler:
# Drop all entries in the waiting list that were called in the above
# loop. (This maintains the order so no need to resort)
- waiting_list[:] = waiting_list[index_of_first_deferred_not_called:]
+ del waiting_list[:index_of_first_deferred_not_called]
for deferred in deferreds_to_callback:
try:
@@ -310,11 +328,10 @@ class ReplicationDataHandler:
)
waiting_list = self._streams_to_waiters.setdefault(
- (stream_name, instance_name), []
+ (stream_name, instance_name), SortedList(key=lambda t: t[0])
)
- waiting_list.append((position, deferred))
- waiting_list.sort(key=lambda t: t[0])
+ waiting_list.add((position, deferred))
# We measure here to get in flight counts and average waiting time.
with Measure(self._clock, "repl.wait_for_stream_position"):
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fe8177ed4d..55e752fda8 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -47,6 +47,7 @@ from synapse.rest.admin.federation import (
ListDestinationsRestServlet,
)
from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
+from synapse.rest.admin.oidc import OIDCTokenRevocationRestServlet
from synapse.rest.admin.registration_tokens import (
ListRegistrationTokensRestServlet,
NewRegistrationTokenRestServlet,
@@ -297,6 +298,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
BackgroundUpdateRestServlet(hs).register(http_server)
BackgroundUpdateStartJobRestServlet(hs).register(http_server)
ExperimentalFeaturesRestServlet(hs).register(http_server)
+ if hs.config.experimental.msc3861.enabled:
+ OIDCTokenRevocationRestServlet(hs).register(http_server)
def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/oidc.py b/synapse/rest/admin/oidc.py
new file mode 100644
index 0000000000..64d2d40550
--- /dev/null
+++ b/synapse/rest/admin/oidc.py
@@ -0,0 +1,55 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Dict, Tuple
+
+from synapse.http.servlet import RestServlet
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+class OIDCTokenRevocationRestServlet(RestServlet):
+ """
+ Delete a given token introspection response - identified by the `jti` field - from the
+ introspection token cache when a token is revoked at the authorizing server
+ """
+
+ PATTERNS = admin_patterns("/OIDC_token_revocation/(?P<token_id>[^/]*)")
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ auth = hs.get_auth()
+
+ # If this endpoint is loaded then we must have enabled delegated auth.
+ from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
+
+ assert isinstance(auth, MSC3861DelegatedAuth)
+
+ self.auth = auth
+ self.store = hs.get_datastores().main
+
+ async def on_DELETE(
+ self, request: SynapseRequest, token_id: str
+ ) -> Tuple[HTTPStatus, Dict]:
+ await assert_requester_is_admin(self.auth, request)
+
+ self.auth._token_cache.invalidate(token_id)
+
+ # make sure we invalidate the cache on any workers
+ await self.store.stream_introspection_token_invalidation((token_id,))
+
+ return HTTPStatus.OK, {}
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 04d9ef25b7..240e6254b0 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -109,6 +109,8 @@ class UsersRestServletV2(RestServlet):
)
deactivated = parse_boolean(request, "deactivated", default=False)
+ admins = parse_boolean(request, "admins")
+
# If support for MSC3866 is not enabled, apply no filtering based on the
# `approved` column.
if self._msc3866_enabled:
@@ -146,6 +148,7 @@ class UsersRestServletV2(RestServlet):
name,
guests,
deactivated,
+ admins,
order_by,
direction,
approved,
diff --git a/synapse/server.py b/synapse/server.py
index e753ff0377..7cdd3ea3c2 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -142,6 +142,7 @@ from synapse.util.distributor import Distributor
from synapse.util.macaroons import MacaroonGenerator
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import random_string
+from synapse.util.task_scheduler import TaskScheduler
logger = logging.getLogger(__name__)
@@ -360,6 +361,7 @@ class HomeServer(metaclass=abc.ABCMeta):
"""
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
getattr(self, "get_" + i + "_handler")()
+ self.get_task_scheduler()
def get_reactor(self) -> ISynapseReactor:
"""
@@ -912,6 +914,9 @@ class HomeServer(metaclass=abc.ABCMeta):
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)
- @cache_in_self
def get_worker_locks_handler(self) -> WorkerLocksHandler:
return WorkerLocksHandler(self)
+
+ @cache_in_self
+ def get_task_scheduler(self) -> TaskScheduler:
+ return TaskScheduler(self)
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index be67d1ff22..a85633efcd 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -70,6 +70,7 @@ from .state import StateStore
from .stats import StatsStore
from .stream import StreamWorkerStore
from .tags import TagsStore
+from .task_scheduler import TaskSchedulerWorkerStore
from .transactions import TransactionWorkerStore
from .ui_auth import UIAuthStore
from .user_directory import UserDirectoryStore
@@ -127,6 +128,7 @@ class DataStore(
CacheInvalidationWorkerStore,
LockStore,
SessionStore,
+ TaskSchedulerWorkerStore,
):
def __init__(
self,
@@ -168,6 +170,7 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
+ admins: Optional[bool] = None,
order_by: str = UserSortOrder.NAME.value,
direction: Direction = Direction.FORWARDS,
approved: bool = True,
@@ -184,6 +187,9 @@ class DataStore(
name: search for local part of user_id or display name
guests: whether to in include guest users
deactivated: whether to include deactivated users
+ admins: Optional flag to filter admins. If true, only admins are queried.
+ if false, admins are excluded from the query. When it is
+ none (the default), both admins and none-admins are queried.
order_by: the sort order of the returned list
direction: sort ascending or descending
approved: whether to include approved users
@@ -220,6 +226,12 @@ class DataStore(
if not deactivated:
filters.append("deactivated = 0")
+ if admins is not None:
+ if admins:
+ filters.append("admin = 1")
+ else:
+ filters.append("admin = 0")
+
if not approved:
# We ignore NULL values for the approved flag because these should only
# be already existing users that we consider as already approved.
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2fbd389c71..18905e07b6 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -584,6 +584,19 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
else:
return 0
+ async def stream_introspection_token_invalidation(
+ self, key: Tuple[Optional[str]]
+ ) -> None:
+ """
+ Stream an invalidation request for the introspection token cache to workers
+
+ Args:
+ key: token_id of the introspection token to remove from the cache
+ """
+ await self.send_invalidation_to_replication(
+ "introspection_token_invalidation", key
+ )
+
@wrap_as_background_process("clean_up_old_cache_invalidations")
async def _clean_up_cache_invalidation_wrapper(self) -> None:
"""
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e4162f846b..fa69a4a298 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -33,6 +33,7 @@ from typing_extensions import Literal
from synapse.api.constants import EduTypes
from synapse.api.errors import Codes, StoreError
+from synapse.config.homeserver import HomeServerConfig
from synapse.logging.opentracing import (
get_active_span_text_map,
set_tag,
@@ -1663,6 +1664,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.device_id_exists_cache: LruCache[
Tuple[str, str], Literal[True]
] = LruCache(cache_name="device_id_exists", max_size=10000)
+ self.config: HomeServerConfig = hs.config
async def store_device(
self,
@@ -1784,6 +1786,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
for device_id in device_ids:
self.device_id_exists_cache.invalidate((user_id, device_id))
+ # TODO: don't nuke the entire cache once there is a way to associate
+ # device_id -> introspection_token
+ if self.config.experimental.msc3861.enabled:
+ # mypy ignore - the token cache is defined on MSC3861DelegatedAuth
+ self.auth._token_cache.invalidate_all() # type: ignore[attr-defined]
+ await self.stream_introspection_token_invalidation((None,))
+
async def update_device(
self, user_id: str, device_id: str, new_display_name: Optional[str] = None
) -> None:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 534dc32413..fab7008a8f 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -452,33 +452,56 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# sets.
seen_chains: Set[int] = set()
- sql = """
- SELECT event_id, chain_id, sequence_number
- FROM event_auth_chains
- WHERE %s
- """
- for batch in batch_iter(initial_events, 1000):
- clause, args = make_in_list_sql_clause(
- txn.database_engine, "event_id", batch
- )
- txn.execute(sql % (clause,), args)
+ # Fetch the chain cover index for the initial set of events we're
+ # considering.
+ def fetch_chain_info(events_to_fetch: Collection[str]) -> None:
+ sql = """
+ SELECT event_id, chain_id, sequence_number
+ FROM event_auth_chains
+ WHERE %s
+ """
+ for batch in batch_iter(events_to_fetch, 1000):
+ clause, args = make_in_list_sql_clause(
+ txn.database_engine, "event_id", batch
+ )
+ txn.execute(sql % (clause,), args)
- for event_id, chain_id, sequence_number in txn:
- chain_info[event_id] = (chain_id, sequence_number)
- seen_chains.add(chain_id)
- chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
+ for event_id, chain_id, sequence_number in txn:
+ chain_info[event_id] = (chain_id, sequence_number)
+ seen_chains.add(chain_id)
+ chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id
+
+ fetch_chain_info(initial_events)
# Check that we actually have a chain ID for all the events.
events_missing_chain_info = initial_events.difference(chain_info)
+
+ # The result set to return, i.e. the auth chain difference.
+ result: Set[str] = set()
+
if events_missing_chain_info:
- # This can happen due to e.g. downgrade/upgrade of the server. We
- # raise an exception and fall back to the previous algorithm.
- logger.info(
- "Unexpectedly found that events don't have chain IDs in room %s: %s",
+ # For some reason we have events we haven't calculated the chain
+ # index for, so we need to handle those separately. This should only
+ # happen for older rooms where the server doesn't have all the auth
+ # events.
+ result = self._fixup_auth_chain_difference_sets(
+ txn,
room_id,
- events_missing_chain_info,
+ state_sets=state_sets,
+ events_missing_chain_info=events_missing_chain_info,
+ events_that_have_chain_index=chain_info,
)
- raise _NoChainCoverIndex(room_id)
+
+ # We now need to refetch any events that we have added to the state
+ # sets.
+ new_events_to_fetch = {
+ event_id
+ for state_set in state_sets
+ for event_id in state_set
+ if event_id not in initial_events
+ }
+
+ fetch_chain_info(new_events_to_fetch)
# Corresponds to `state_sets`, except as a map from chain ID to max
# sequence number reachable from the state set.
@@ -487,8 +510,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
chains: Dict[int, int] = {}
set_to_chain.append(chains)
- for event_id in state_set:
- chain_id, seq_no = chain_info[event_id]
+ for state_id in state_set:
+ chain_id, seq_no = chain_info[state_id]
chains[chain_id] = max(seq_no, chains.get(chain_id, 0))
@@ -532,7 +555,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# from *any* state set and the minimum sequence number reachable from
# *all* state sets. Events in that range are in the auth chain
# difference.
- result = set()
# Mapping from chain ID to the range of sequence numbers that should be
# pulled from the database.
@@ -588,6 +610,122 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
return result
+ def _fixup_auth_chain_difference_sets(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ state_sets: List[Set[str]],
+ events_missing_chain_info: Set[str],
+ events_that_have_chain_index: Collection[str],
+ ) -> Set[str]:
+ """Helper for `_get_auth_chain_difference_using_cover_index_txn` to
+ handle the case where we haven't calculated the chain cover index for
+ all events.
+
+ This modifies `state_sets` so that they only include events that have a
+ chain cover index, and returns a set of event IDs that are part of the
+ auth difference.
+ """
+
+ # This works similarly to the handling of unpersisted events in
+ # `synapse.state.v2_get_auth_chain_difference`. We uses the observation
+ # that if you can split the set of events into two classes X and Y,
+ # where no events in Y have events in X in their auth chain, then we can
+ # calculate the auth difference by considering X and Y separately.
+ #
+ # We do this in three steps:
+ # 1. Compute the set of events without chain cover index belonging to
+ # the auth difference.
+ # 2. Replacing the un-indexed events in the state_sets with their auth
+ # events, recursively, until the state_sets contain only indexed
+ # events. We can then calculate the auth difference of those state
+ # sets using the chain cover index.
+ # 3. Add the results of 1 and 2 together.
+
+ # By construction we know that all events that we haven't persisted the
+ # chain cover index for are contained in
+ # `event_auth_chain_to_calculate`, so we pull out the events from those
+ # rather than doing recursive queries to walk the auth chain.
+ #
+ # We pull out those events with their auth events, which gives us enough
+ # information to construct the auth chain of an event up to auth events
+ # that have the chain cover index.
+ sql = """
+ SELECT tc.event_id, ea.auth_id, eac.chain_id IS NOT NULL
+ FROM event_auth_chain_to_calculate AS tc
+ LEFT JOIN event_auth AS ea USING (event_id)
+ LEFT JOIN event_auth_chains AS eac ON (ea.auth_id = eac.event_id)
+ WHERE tc.room_id = ?
+ """
+ txn.execute(sql, (room_id,))
+ event_to_auth_ids: Dict[str, Set[str]] = {}
+ events_that_have_chain_index = set(events_that_have_chain_index)
+ for event_id, auth_id, auth_id_has_chain in txn:
+ s = event_to_auth_ids.setdefault(event_id, set())
+ if auth_id is not None:
+ s.add(auth_id)
+ if auth_id_has_chain:
+ events_that_have_chain_index.add(auth_id)
+
+ if events_missing_chain_info - event_to_auth_ids.keys():
+ # Uh oh, we somehow haven't correctly done the chain cover index,
+ # bail and fall back to the old method.
+ logger.info(
+ "Unexpectedly found that events don't have chain IDs in room %s: %s",
+ room_id,
+ events_missing_chain_info - event_to_auth_ids.keys(),
+ )
+ raise _NoChainCoverIndex(room_id)
+
+ # Create a map from event IDs we care about to their partial auth chain.
+ event_id_to_partial_auth_chain: Dict[str, Set[str]] = {}
+ for event_id, auth_ids in event_to_auth_ids.items():
+ if not any(event_id in state_set for state_set in state_sets):
+ continue
+
+ processing = set(auth_ids)
+ to_add = set()
+ while processing:
+ auth_id = processing.pop()
+ to_add.add(auth_id)
+
+ sub_auth_ids = event_to_auth_ids.get(auth_id)
+ if sub_auth_ids is None:
+ continue
+
+ processing.update(sub_auth_ids - to_add)
+
+ event_id_to_partial_auth_chain[event_id] = to_add
+
+ # Now we do two things:
+ # 1. Update the state sets to only include indexed events; and
+ # 2. Create a new list containing the auth chains of the un-indexed
+ # events
+ unindexed_state_sets: List[Set[str]] = []
+ for state_set in state_sets:
+ unindexed_state_set = set()
+ for event_id, auth_chain in event_id_to_partial_auth_chain.items():
+ if event_id not in state_set:
+ continue
+
+ unindexed_state_set.add(event_id)
+
+ state_set.discard(event_id)
+ state_set.difference_update(auth_chain)
+ for auth_id in auth_chain:
+ if auth_id in events_that_have_chain_index:
+ state_set.add(auth_id)
+ else:
+ unindexed_state_set.add(auth_id)
+
+ unindexed_state_sets.append(unindexed_state_set)
+
+ # Calculate and return the auth difference of the un-indexed events.
+ union = unindexed_state_sets[0].union(*unindexed_state_sets[1:])
+ intersection = unindexed_state_sets[0].intersection(*unindexed_state_sets[1:])
+
+ return union - intersection
+
def _get_auth_chain_difference_txn(
self, txn: LoggingTransaction, state_sets: List[Set[str]]
) -> Set[str]:
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
new file mode 100644
index 0000000000..1fb3180c3c
--- /dev/null
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -0,0 +1,202 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+ make_in_list_sql_clause,
+)
+from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
+from synapse.util import json_encoder
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+
+class TaskSchedulerWorkerStore(SQLBaseStore):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ @staticmethod
+ def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
+ row["status"] = TaskStatus(row["status"])
+ if row["params"] is not None:
+ row["params"] = db_to_json(row["params"])
+ if row["result"] is not None:
+ row["result"] = db_to_json(row["result"])
+ return ScheduledTask(**row)
+
+ async def get_scheduled_tasks(
+ self,
+ *,
+ actions: Optional[List[str]] = None,
+ resource_id: Optional[str] = None,
+ statuses: Optional[List[TaskStatus]] = None,
+ max_timestamp: Optional[int] = None,
+ ) -> List[ScheduledTask]:
+ """Get a list of scheduled tasks from the DB.
+
+ Args:
+ actions: Limit the returned tasks to those specific action names
+ resource_id: Limit the returned tasks to the specific resource id, if specified
+ statuses: Limit the returned tasks to the specific statuses
+ max_timestamp: Limit the returned tasks to the ones that have
+ a timestamp inferior to the specified one
+
+ Returns: a list of `ScheduledTask`, ordered by increasing timestamps
+ """
+
+ def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
+ clauses: List[str] = []
+ args: List[Any] = []
+ if resource_id:
+ clauses.append("resource_id = ?")
+ args.append(resource_id)
+ if actions is not None:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "action", actions
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+ if statuses is not None:
+ clause, temp_args = make_in_list_sql_clause(
+ txn.database_engine, "status", statuses
+ )
+ clauses.append(clause)
+ args.extend(temp_args)
+ if max_timestamp is not None:
+ clauses.append("timestamp <= ?")
+ args.append(max_timestamp)
+
+ sql = "SELECT * FROM scheduled_tasks"
+ if clauses:
+ sql = sql + " WHERE " + " AND ".join(clauses)
+
+ sql = sql + "ORDER BY timestamp"
+
+ txn.execute(sql, args)
+ return self.db_pool.cursor_to_dict(txn)
+
+ rows = await self.db_pool.runInteraction(
+ "get_scheduled_tasks", get_scheduled_tasks_txn
+ )
+ return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
+
+ async def insert_scheduled_task(self, task: ScheduledTask) -> None:
+ """Insert a specified `ScheduledTask` in the DB.
+
+ Args:
+ task: the `ScheduledTask` to insert
+ """
+ await self.db_pool.simple_insert(
+ "scheduled_tasks",
+ {
+ "id": task.id,
+ "action": task.action,
+ "status": task.status,
+ "timestamp": task.timestamp,
+ "resource_id": task.resource_id,
+ "params": None
+ if task.params is None
+ else json_encoder.encode(task.params),
+ "result": None
+ if task.result is None
+ else json_encoder.encode(task.result),
+ "error": task.error,
+ },
+ desc="insert_scheduled_task",
+ )
+
+ async def update_scheduled_task(
+ self,
+ id: str,
+ timestamp: int,
+ *,
+ status: Optional[TaskStatus] = None,
+ result: Optional[JsonMapping] = None,
+ error: Optional[str] = None,
+ ) -> bool:
+ """Update a scheduled task in the DB with some new value(s).
+
+ Args:
+ id: id of the `ScheduledTask` to update
+ timestamp: new timestamp of the task
+ status: new status of the task
+ result: new result of the task
+ error: new error of the task
+
+ Returns: `False` if no matching row was found, `True` otherwise
+ """
+ updatevalues: JsonDict = {"timestamp": timestamp}
+ if status is not None:
+ updatevalues["status"] = status
+ if result is not None:
+ updatevalues["result"] = json_encoder.encode(result)
+ if error is not None:
+ updatevalues["error"] = error
+ nb_rows = await self.db_pool.simple_update(
+ "scheduled_tasks",
+ {"id": id},
+ updatevalues,
+ desc="update_scheduled_task",
+ )
+ return nb_rows > 0
+
+ async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
+ """Get a specific `ScheduledTask` from its id.
+
+ Args:
+ id: the id of the task to retrieve
+
+ Returns: the task if available, `None` otherwise
+ """
+ row = await self.db_pool.simple_select_one(
+ table="scheduled_tasks",
+ keyvalues={"id": id},
+ retcols=(
+ "id",
+ "action",
+ "status",
+ "timestamp",
+ "resource_id",
+ "params",
+ "result",
+ "error",
+ ),
+ allow_none=True,
+ desc="get_scheduled_task",
+ )
+
+ return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
+
+ async def delete_scheduled_task(self, id: str) -> None:
+ """Delete a specific task from its id.
+
+ Args:
+ id: the id of the task to delete
+ """
+ await self.db_pool.simple_delete(
+ "scheduled_tasks",
+ keyvalues={"id": id},
+ desc="delete_scheduled_task",
+ )
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c3bd36efc9..48e4b0ba3c 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -242,6 +242,8 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
) -> None:
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
+ # We also upsert when the new retry interval is the same as the existing one,
+ # since it will be the case when `destination_max_retry_interval` is reached.
#
# WARNING: This is executed in autocommit, so we shouldn't add any more
# SQL calls in here (without being very careful).
@@ -257,7 +259,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
WHERE
EXCLUDED.retry_interval = 0
OR destinations.retry_interval IS NULL
- OR destinations.retry_interval < EXCLUDED.retry_interval
+ OR destinations.retry_interval <= EXCLUDED.retry_interval
"""
txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 7de9949a5b..649d3c8e9f 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -113,6 +113,7 @@ Changes in SCHEMA_VERSION = 79
Changes in SCHEMA_VERSION = 80
- The event_txn_id_device_id is always written to for new events.
+ - Add tables for the task scheduler.
"""
diff --git a/synapse/storage/schema/main/delta/80/02_read_write_locks_unlogged.sql.postgres b/synapse/storage/schema/main/delta/80/02_read_write_locks_unlogged.sql.postgres
new file mode 100644
index 0000000000..5b5dbf2687
--- /dev/null
+++ b/synapse/storage/schema/main/delta/80/02_read_write_locks_unlogged.sql.postgres
@@ -0,0 +1,30 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Mark the worker_read_write_locks* tables as UNLOGGED, to increase
+-- performance. This means that we don't replicate the tables, and they get
+-- truncated on a crash. This is acceptable as a) in those cases it's likely
+-- that Synapse needs to be stopped/restarted anyway, and b) the locks are
+-- considered best-effort anyway.
+
+-- We need to remove and recreate the circular foreign key references, as
+-- UNLOGGED tables can't reference normal tables.
+ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign;
+
+ALTER TABLE worker_read_write_locks SET UNLOGGED;
+ALTER TABLE worker_read_write_locks_mode SET UNLOGGED;
+
+ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
+ FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;
diff --git a/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql
new file mode 100644
index 0000000000..286d109ed7
--- /dev/null
+++ b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql
@@ -0,0 +1,28 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- cf ScheduledTask docstring for the meaning of the fields.
+CREATE TABLE IF NOT EXISTS scheduled_tasks(
+ id TEXT PRIMARY KEY,
+ action TEXT NOT NULL,
+ status TEXT NOT NULL,
+ timestamp BIGINT NOT NULL,
+ resource_id TEXT,
+ params TEXT,
+ result TEXT,
+ error TEXT
+);
+
+CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status);
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 073f682aca..e750417189 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -15,6 +15,7 @@
import abc
import re
import string
+from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -969,3 +970,41 @@ class UserProfile(TypedDict):
class RetentionPolicy:
min_lifetime: Optional[int] = None
max_lifetime: Optional[int] = None
+
+
+class TaskStatus(str, Enum):
+ """Status of a scheduled task"""
+
+ # Task is scheduled but not active
+ SCHEDULED = "scheduled"
+ # Task is active and probably running, and if not
+ # will be run on next scheduler loop run
+ ACTIVE = "active"
+ # Task has completed successfully
+ COMPLETE = "complete"
+ # Task is over and either returned a failed status, or had an exception
+ FAILED = "failed"
+
+
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class ScheduledTask:
+ """Description of a scheduled task"""
+
+ # Id used to identify the task
+ id: str
+ # Name of the action to be run by this task
+ action: str
+ # Current status of this task
+ status: TaskStatus
+ # If the status is SCHEDULED then this represents when it should be launched,
+ # otherwise it represents the last time this task got a change of state.
+ # In milliseconds since epoch in system time timezone, usually UTC.
+ timestamp: int
+ # Optionally bind a task to some resource id for easy retrieval
+ resource_id: Optional[str]
+ # Optional parameters that will be passed to the function ran by the task
+ params: Optional[JsonMapping]
+ # Optional result that can be updated by the running task
+ result: Optional[JsonMapping]
+ # Optional error that should be assigned a value when the status is FAILED
+ error: Optional[str]
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 01ad02af67..9a3e10ddee 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -140,6 +140,20 @@ class ExpiringCache(Generic[KT, VT]):
return value.value
+ def invalidate(self, key: KT) -> None:
+ """
+ Remove the given key from the cache.
+ """
+
+ value = self._cache.pop(key, None)
+ if value:
+ if self.iterable:
+ self.metrics.inc_evictions(
+ EvictionReason.invalidation, len(value.value)
+ )
+ else:
+ self.metrics.inc_evictions(EvictionReason.invalidation)
+
def __contains__(self, key: KT) -> bool:
return key in self._cache
@@ -193,6 +207,14 @@ class ExpiringCache(Generic[KT, VT]):
len(self),
)
+ def invalidate_all(self) -> None:
+ """
+ Remove all items from the cache.
+ """
+ keys = set(self._cache.keys())
+ for key in keys:
+ self._cache.pop(key)
+
def __len__(self) -> int:
if self.iterable:
return sum(len(entry.value) for entry in self._cache.values())
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
new file mode 100644
index 0000000000..773a8327f6
--- /dev/null
+++ b/synapse/util/task_scheduler.py
@@ -0,0 +1,364 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple
+
+from prometheus_client import Gauge
+
+from twisted.python.failure import Failure
+
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import JsonMapping, ScheduledTask, TaskStatus
+from synapse.util.stringutils import random_string
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+running_tasks_gauge = Gauge(
+ "synapse_scheduler_running_tasks",
+ "The number of concurrent running tasks handled by the TaskScheduler",
+)
+
+
+class TaskScheduler:
+ """
+ This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
+ to launch a background task, or Twisted `deferLater` if we want to do so later on.
+
+ The problem with that is that the tasks will just stop and never be resumed if synapse
+ is stopped for whatever reason.
+
+ How this works:
+ - A function mapped to a named action should first be registered with `register_action`.
+ This function will be called when trying to resuming tasks after a synapse shutdown,
+ so this registration should happen when synapse is initialised, NOT right before scheduling
+ a task.
+ - A task can then be launched using this named action with `schedule_task`. A `params` dict
+ can be passed, and it will be available to the registered function when launched. This task
+ can be launch either now-ish, or later on by giving a `timestamp` parameter.
+
+ The function may call `update_task` at any time to update the `result` of the task,
+ and this can be used to resume the task at a specific point and/or to convey a result to
+ the code launching the task.
+ You can also specify the `result` (and/or an `error`) when returning from the function.
+
+ The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting
+ to launch now, the launch will still not happen before the next loop run.
+
+ Tasks will be run on the worker specified with `run_background_tasks_on` config,
+ or the main one by default.
+ There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
+ full. In this regard, please take great care that scheduled tasks can actually finished.
+ For now there is no mechanism to stop a running task if it is stuck.
+ """
+
+ # Precision of the scheduler, evaluation of tasks to run will only happen
+ # every `SCHEDULE_INTERVAL_MS` ms
+ SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn
+ # Time before a complete or failed task is deleted from the DB
+ KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week
+ # Maximum number of tasks that can run at the same time
+ MAX_CONCURRENT_RUNNING_TASKS = 10
+ # Time from the last task update after which we will log a warning
+ LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs
+
+ def __init__(self, hs: "HomeServer"):
+ self._store = hs.get_datastores().main
+ self._clock = hs.get_clock()
+ self._running_tasks: Set[str] = set()
+ # A map between action names and their registered function
+ self._actions: Dict[
+ str,
+ Callable[
+ [ScheduledTask, bool],
+ Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
+ ],
+ ] = {}
+ self._run_background_tasks = hs.config.worker.run_background_tasks
+
+ if self._run_background_tasks:
+ self._clock.looping_call(
+ run_as_background_process,
+ TaskScheduler.SCHEDULE_INTERVAL_MS,
+ "handle_scheduled_tasks",
+ self._handle_scheduled_tasks,
+ )
+
+ def register_action(
+ self,
+ function: Callable[
+ [ScheduledTask, bool],
+ Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
+ ],
+ action_name: str,
+ ) -> None:
+ """Register a function to be executed when an action is scheduled with
+ the specified action name.
+
+ Actions need to be registered as early as possible so that a resumed action
+ can find its matching function. It's usually better to NOT do that right before
+ calling `schedule_task` but rather in an `__init__` method.
+
+ Args:
+ function: The function to be executed for this action. The parameters
+ passed to the function when launched are the `ScheduledTask` being run,
+ and a `first_launch` boolean to signal if it's a resumed task or the first
+ launch of it. The function should return a tuple of new `status`, `result`
+ and `error` as specified in `ScheduledTask`.
+ action_name: The name of the action to be associated with the function
+ """
+ self._actions[action_name] = function
+
+ async def schedule_task(
+ self,
+ action: str,
+ *,
+ resource_id: Optional[str] = None,
+ timestamp: Optional[int] = None,
+ params: Optional[JsonMapping] = None,
+ ) -> str:
+ """Schedule a new potentially resumable task. A function matching the specified
+ `action` should have been previously registered with `register_action`.
+
+ Args:
+ action: the name of a previously registered action
+ resource_id: a task can be associated with a resource id to facilitate
+ getting all tasks associated with a specific resource
+ timestamp: if `None`, the task will be launched as soon as possible, otherwise it
+ will be launch as soon as possible after the `timestamp` value.
+ Note that this scheduler is not meant to be precise, and the scheduling
+ could be delayed if too many tasks are already running
+ params: a set of parameters that can be easily accessed from inside the
+ executed function
+
+ Returns:
+ The id of the scheduled task
+ """
+ if action not in self._actions:
+ raise Exception(
+ f"No function associated with action {action} of the scheduled task"
+ )
+
+ if timestamp is None or timestamp < self._clock.time_msec():
+ timestamp = self._clock.time_msec()
+
+ task = ScheduledTask(
+ random_string(16),
+ action,
+ TaskStatus.SCHEDULED,
+ timestamp,
+ resource_id,
+ params,
+ result=None,
+ error=None,
+ )
+ await self._store.insert_scheduled_task(task)
+
+ return task.id
+
+ async def update_task(
+ self,
+ id: str,
+ *,
+ timestamp: Optional[int] = None,
+ status: Optional[TaskStatus] = None,
+ result: Optional[JsonMapping] = None,
+ error: Optional[str] = None,
+ ) -> bool:
+ """Update some task associated values. This is exposed publically so it can
+ be used inside task functions, mainly to update the result and be able to
+ resume a task at a specific step after a restart of synapse.
+
+ It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
+ a new timestamp.
+
+ The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED`
+ are terminal status and can only be set by returning it in the function.
+
+ Args:
+ id: the id of the task to update
+ timestamp: useful to schedule a new stage of the task at a later date
+ status: the new `TaskStatus` of the task
+ result: the new result of the task
+ error: the new error of the task
+ """
+ if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
+ raise Exception(
+ "update_task can't be called with a FAILED or COMPLETE status"
+ )
+
+ if timestamp is None:
+ timestamp = self._clock.time_msec()
+ return await self._store.update_scheduled_task(
+ id,
+ timestamp,
+ status=status,
+ result=result,
+ error=error,
+ )
+
+ async def get_task(self, id: str) -> Optional[ScheduledTask]:
+ """Get a specific task description by id.
+
+ Args:
+ id: the id of the task to retrieve
+
+ Returns:
+ The task information or `None` if it doesn't exist or it has
+ already been removed because it's too old.
+ """
+ return await self._store.get_scheduled_task(id)
+
+ async def get_tasks(
+ self,
+ *,
+ actions: Optional[List[str]] = None,
+ resource_id: Optional[str] = None,
+ statuses: Optional[List[TaskStatus]] = None,
+ max_timestamp: Optional[int] = None,
+ ) -> List[ScheduledTask]:
+ """Get a list of tasks. Returns all the tasks if no args is provided.
+
+ If an arg is `None` all tasks matching the other args will be selected.
+ If an arg is an empty list, the corresponding value of the task needs
+ to be `None` to be selected.
+
+ Args:
+ actions: Limit the returned tasks to those specific action names
+ resource_id: Limit the returned tasks to the specific resource id, if specified
+ statuses: Limit the returned tasks to the specific statuses
+ max_timestamp: Limit the returned tasks to the ones that have
+ a timestamp inferior to the specified one
+
+ Returns
+ A list of `ScheduledTask`, ordered by increasing timestamps
+ """
+ return await self._store.get_scheduled_tasks(
+ actions=actions,
+ resource_id=resource_id,
+ statuses=statuses,
+ max_timestamp=max_timestamp,
+ )
+
+ async def delete_task(self, id: str) -> None:
+ """Delete a task. Running tasks can't be deleted.
+
+ Can only be called from the worker handling the task scheduling.
+
+ Args:
+ id: id of the task to delete
+ """
+ if self.task_is_running(id):
+ raise Exception(f"Task {id} is currently running and can't be deleted")
+ await self._store.delete_scheduled_task(id)
+
+ def task_is_running(self, id: str) -> bool:
+ """Check if a task is currently running.
+
+ Can only be called from the worker handling the task scheduling.
+
+ Args:
+ id: id of the task to check
+ """
+ assert self._run_background_tasks
+ return id in self._running_tasks
+
+ async def _handle_scheduled_tasks(self) -> None:
+ """Main loop taking care of launching tasks and cleaning up old ones."""
+ await self._launch_scheduled_tasks()
+ await self._clean_scheduled_tasks()
+
+ async def _launch_scheduled_tasks(self) -> None:
+ """Retrieve and launch scheduled tasks that should be running at that time."""
+ for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]):
+ if not self.task_is_running(task.id):
+ if (
+ len(self._running_tasks)
+ < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
+ ):
+ await self._launch_task(task, first_launch=False)
+ else:
+ if (
+ self._clock.time_msec()
+ > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
+ ):
+ logger.warn(
+ f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
+ )
+ for task in await self.get_tasks(
+ statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec()
+ ):
+ if (
+ not self.task_is_running(task.id)
+ and len(self._running_tasks)
+ < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
+ ):
+ await self._launch_task(task, first_launch=True)
+
+ running_tasks_gauge.set(len(self._running_tasks))
+
+ async def _clean_scheduled_tasks(self) -> None:
+ """Clean old complete or failed jobs to avoid clutter the DB."""
+ for task in await self._store.get_scheduled_tasks(
+ statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE]
+ ):
+ # FAILED and COMPLETE tasks should never be running
+ assert not self.task_is_running(task.id)
+ if (
+ self._clock.time_msec()
+ > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS
+ ):
+ await self._store.delete_scheduled_task(task.id)
+
+ async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None:
+ """Launch a scheduled task now.
+
+ Args:
+ task: the task to launch
+ first_launch: `True` if it's the first time is launched, `False` otherwise
+ """
+ assert task.action in self._actions
+
+ function = self._actions[task.action]
+
+ async def wrapper() -> None:
+ try:
+ (status, result, error) = await function(task, first_launch)
+ except Exception:
+ f = Failure()
+ logger.error(
+ f"scheduled task {task.id} failed",
+ exc_info=(f.type, f.value, f.getTracebackObject()),
+ )
+ status = TaskStatus.FAILED
+ result = None
+ error = f.getErrorMessage()
+
+ await self._store.update_scheduled_task(
+ task.id,
+ self._clock.time_msec(),
+ status=status,
+ result=result,
+ error=error,
+ )
+ self._running_tasks.remove(task.id)
+
+ self._running_tasks.add(task.id)
+ await self.update_task(task.id, status=TaskStatus.ACTIVE)
+ description = f"{task.id}-{task.action}"
+ run_as_background_process(description, wrapper)
diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py
index f57c813a58..5c91031746 100644
--- a/tests/config/test_oauth_delegation.py
+++ b/tests/config/test_oauth_delegation.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
from unittest.mock import Mock
from synapse.config import ConfigError
@@ -167,6 +168,21 @@ class MSC3861OAuthDelegation(TestCase):
with self.assertRaises(ConfigError):
self.parse_config()
+ def test_user_consent_cannot_be_enabled(self) -> None:
+ tmpdir = self.mktemp()
+ os.mkdir(tmpdir)
+ self.config_dict["user_consent"] = {
+ "require_at_registration": True,
+ "version": "1",
+ "template_dir": tmpdir,
+ "server_notice_content": {
+ "msgtype": "m.text",
+ "body": "foo",
+ },
+ }
+ with self.assertRaises(ConfigError):
+ self.parse_config()
+
def test_password_config_cannot_be_enabled(self) -> None:
self.config_dict["password_config"] = {"enabled": True}
with self.assertRaises(ConfigError):
@@ -255,3 +271,8 @@ class MSC3861OAuthDelegation(TestCase):
self.config_dict["session_lifetime"] = "24h"
with self.assertRaises(ConfigError):
self.parse_config()
+
+ def test_enable_3pid_changes_cannot_be_enabled(self) -> None:
+ self.config_dict["enable_3pid_changes"] = True
+ with self.assertRaises(ConfigError):
+ self.parse_config()
diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py
index 82c26e303f..b891e84690 100644
--- a/tests/handlers/test_oauth_delegation.py
+++ b/tests/handlers/test_oauth_delegation.py
@@ -14,7 +14,7 @@
from http import HTTPStatus
from typing import Any, Dict, Union
-from unittest.mock import ANY, Mock
+from unittest.mock import ANY, AsyncMock, Mock
from urllib.parse import parse_qs
from signedjson.key import (
@@ -340,6 +340,41 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
get_awaitable_result(self.auth.is_server_admin(requester)), False
)
+ def test_active_user_admin_impersonation(self) -> None:
+ """The handler should return a requester with normal user rights
+ and an user ID matching the one specified in query param `user_id`"""
+
+ self.http_client.request = simple_async_mock(
+ return_value=FakeResponse.json(
+ code=200,
+ payload={
+ "active": True,
+ "sub": SUBJECT,
+ "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
+ "username": USERNAME,
+ },
+ )
+ )
+ request = Mock(args={})
+ request.args[b"access_token"] = [b"mockAccessToken"]
+ impersonated_user_id = f"@{USERNAME}:{SERVER_NAME}"
+ request.args[b"_oidc_admin_impersonate_user_id"] = [
+ impersonated_user_id.encode("ascii")
+ ]
+ request.requestHeaders.getRawHeaders = mock_getRawHeaders()
+ requester = self.get_success(self.auth.get_user_by_req(request))
+ self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
+ self.http_client.request.assert_called_once_with(
+ method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY
+ )
+ self._assertParams()
+ self.assertEqual(requester.user.to_string(), impersonated_user_id)
+ self.assertEqual(requester.is_guest, False)
+ self.assertEqual(requester.device_id, None)
+ self.assertEqual(
+ get_awaitable_result(self.auth.is_server_admin(requester)), False
+ )
+
def test_active_user_with_device(self) -> None:
"""The handler should return a requester with normal user rights and a device ID."""
@@ -553,6 +588,38 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
)
self.assertEqual(self.http_client.request.call_count, 2)
+ def test_revocation_endpoint(self) -> None:
+ # mock introspection response and then admin verification response
+ self.http_client.request = AsyncMock(
+ side_effect=[
+ FakeResponse.json(
+ code=200, payload={"active": True, "jti": "open_sesame"}
+ ),
+ FakeResponse.json(
+ code=200,
+ payload={
+ "active": True,
+ "sub": SUBJECT,
+ "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]),
+ "username": USERNAME,
+ },
+ ),
+ ]
+ )
+
+ # cache a token to delete
+ introspection_token = self.get_success(
+ self.auth._introspect_token("open_sesame") # type: ignore[attr-defined]
+ )
+ self.assertEqual(self.auth._token_cache.get("open_sesame"), introspection_token) # type: ignore[attr-defined]
+
+ # delete the revoked token
+ introspection_token_id = "open_sesame"
+ url = f"/_synapse/admin/v1/OIDC_token_revocation/{introspection_token_id}"
+ channel = self.make_request("DELETE", url, access_token="mockAccessToken")
+ self.assertEqual(channel.code, 200)
+ self.assertEqual(self.auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined]
+
def make_device_keys(self, user_id: str, device_id: str) -> JsonDict:
# We only generate a master key to simplify the test.
master_signing_key = generate_signing_key(device_id)
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index fd66d573d2..1f483eb75a 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -514,6 +514,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
+ user_id = "@test:server"
+ user_id_obj = UserID.from_string(user_id)
+
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
@@ -523,12 +526,11 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
we time out their syncing users presence.
"""
process_id = "1"
- user_id = "@test:server"
# Notify handler that a user is now syncing.
self.get_success(
self.presence_handler.update_external_syncs_row(
- process_id, user_id, True, self.clock.time_msec()
+ process_id, self.user_id, True, self.clock.time_msec()
)
)
@@ -536,48 +538,37 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)
# Check that if the external process timeout fires, then the syncing
# user gets timed out
self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.OFFLINE)
def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
"""Test that if a user doesn't update the records for a while
users presence goes `OFFLINE` because of timeout and `status_msg` remains.
"""
- user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.ONLINE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Check that if we wait a while without telling the handler the user has
# stopped syncing that their presence state doesn't get timed out.
self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)
self.assertEqual(state.status_msg, status_msg)
# Check that if the timeout fires, then the syncing user gets timed out
self.reactor.advance(SYNC_ONLINE_TIMEOUT)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# status_msg should remain even after going offline
self.assertEqual(state.state, PresenceState.OFFLINE)
self.assertEqual(state.status_msg, status_msg)
@@ -586,24 +577,19 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user change presence manually to `OFFLINE`
and no status is set, that `status_msg` is `None`.
"""
- user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.ONLINE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as offline
self.get_success(
self.presence_handler.set_state(
- UserID.from_string(user_id), {"presence": PresenceState.OFFLINE}
+ self.user_id_obj, {"presence": PresenceState.OFFLINE}
)
)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.OFFLINE)
self.assertEqual(state.status_msg, None)
@@ -611,41 +597,31 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user change presence manually to `OFFLINE`
and a status is set, that `status_msg` appears.
"""
- user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.ONLINE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as offline
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.OFFLINE, "And now here."
- )
+ self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.")
def test_user_reset_online_with_no_status(self) -> None:
"""Test that if a user set again the presence manually
and no status is set, that `status_msg` is `None`.
"""
- user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.ONLINE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as online again
self.get_success(
self.presence_handler.set_state(
- UserID.from_string(user_id), {"presence": PresenceState.ONLINE}
+ self.user_id_obj, {"presence": PresenceState.ONLINE}
)
)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# status_msg should remain even after going offline
self.assertEqual(state.state, PresenceState.ONLINE)
self.assertEqual(state.status_msg, None)
@@ -654,33 +630,27 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
"""Test that if a user set again the presence manually
and status is `None`, that `status_msg` is `None`.
"""
- user_id = "@test:server"
status_msg = "I'm here!"
# Mark user as online
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.ONLINE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
# Mark user as online and `status_msg = None`
- self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None)
+ self._set_presencestate_with_status_msg(PresenceState.ONLINE, None)
def test_set_presence_from_syncing_not_set(self) -> None:
"""Test that presence is not set by syncing if affect_presence is false"""
- user_id = "@test:server"
status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.UNAVAILABLE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
- self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE)
+ self.presence_handler.user_syncing(
+ self.user_id, False, PresenceState.ONLINE
+ )
)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be unavailable
self.assertEqual(state.state, PresenceState.UNAVAILABLE)
# and status message should still be the same
@@ -688,50 +658,34 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
def test_set_presence_from_syncing_is_set(self) -> None:
"""Test that presence is set by syncing if affect_presence is true"""
- user_id = "@test:server"
status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.UNAVAILABLE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
- self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+ self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should now be online
self.assertEqual(state.state, PresenceState.ONLINE)
def test_set_presence_from_syncing_keeps_status(self) -> None:
"""Test that presence set by syncing retains status message"""
- user_id = "@test:server"
status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(
- user_id, PresenceState.UNAVAILABLE, status_msg
- )
+ self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
self.get_success(
- self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE)
+ self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
)
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# our status message should be the same as it was before
self.assertEqual(state.status_msg, status_msg)
@parameterized.expand([(False,), (True,)])
- @unittest.override_config(
- {
- "experimental_features": {
- "msc3026_enabled": True,
- },
- }
- )
+ @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
def test_set_presence_from_syncing_keeps_busy(
self, test_with_workers: bool
) -> None:
@@ -741,7 +695,6 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
test_with_workers: If True, check the presence state of the user by calling
/sync against a worker, rather than the main process.
"""
- user_id = "@test:server"
status_msg = "I'm busy!"
# By default, we call /sync against the main process.
@@ -755,44 +708,39 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
)
# Set presence to BUSY
- self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg)
+ self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg)
# Perform a sync with a presence state other than busy. This should NOT change
# our presence status; we only change from busy if we explicitly set it via
# /presence/*.
self.get_success(
worker_to_sync_against.get_presence_handler().user_syncing(
- user_id, True, PresenceState.ONLINE
+ self.user_id, True, PresenceState.ONLINE
)
)
# Check against the main process that the user's presence did not change.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be busy
self.assertEqual(state.state, PresenceState.BUSY)
def _set_presencestate_with_status_msg(
- self, user_id: str, state: str, status_msg: Optional[str]
+ self, state: str, status_msg: Optional[str]
) -> None:
"""Set a PresenceState and status_msg and check the result.
Args:
- user_id: User for that the status is to be set.
state: The new PresenceState.
status_msg: Status message that is to be set.
"""
self.get_success(
self.presence_handler.set_state(
- UserID.from_string(user_id),
+ self.user_id_obj,
{"presence": state, "status_msg": status_msg},
)
)
- new_state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
+ new_state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(new_state.state, state)
self.assertEqual(new_state.status_msg, status_msg)
@@ -952,9 +900,6 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
self.assertEqual(upto_token, now_token)
self.assertFalse(limited)
- expected_rows = [
- (2, ("dest3", "@user3:test")),
- ]
self.assertCountEqual(rows, [])
prev_token = self.queue.get_current_token(self.instance_name)
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 9785dd698b..430209705e 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -446,6 +446,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
self.assertIsNone(profile)
def test_handle_user_deactivated_support_user(self) -> None:
+ """Ensure a support user doesn't get added to the user directory after deactivation."""
s_user_id = "@support:test"
self.get_success(
self.store.register_user(
@@ -453,14 +454,16 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
)
)
- mock_remove_from_user_dir = Mock(return_value=make_awaitable(None))
- with patch.object(
- self.store, "remove_from_user_dir", mock_remove_from_user_dir
- ):
- self.get_success(self.handler.handle_local_user_deactivated(s_user_id))
- # BUG: the correct spelling is assert_not_called, but that makes the test fail
- # and it's not clear that this is actually the behaviour we want.
- mock_remove_from_user_dir.not_called()
+ # The profile should not be in the directory.
+ profile = self.get_success(self.store._get_user_in_directory(s_user_id))
+ self.assertIsNone(profile)
+
+ # Remove the user from the directory.
+ self.get_success(self.handler.handle_local_user_deactivated(s_user_id))
+
+ # The profile should still not be in the user directory.
+ profile = self.get_success(self.store._get_user_in_directory(s_user_id))
+ self.assertIsNone(profile)
def test_handle_user_deactivated_regular_user(self) -> None:
r_user_id = "@regular:test"
diff --git a/tests/replication/test_intro_token_invalidation.py b/tests/replication/test_intro_token_invalidation.py
new file mode 100644
index 0000000000..f90678b6b1
--- /dev/null
+++ b/tests/replication/test_intro_token_invalidation.py
@@ -0,0 +1,62 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Any, Dict
+
+import synapse.rest.admin._base
+
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+
+
+class IntrospectionTokenCacheInvalidationTestCase(BaseMultiWorkerStreamTestCase):
+ servlets = [synapse.rest.admin.register_servlets]
+
+ def default_config(self) -> Dict[str, Any]:
+ config = super().default_config()
+ config["disable_registration"] = True
+ config["experimental_features"] = {
+ "msc3861": {
+ "enabled": True,
+ "issuer": "some_dude",
+ "client_id": "ID",
+ "client_auth_method": "client_secret_post",
+ "client_secret": "secret",
+ }
+ }
+ return config
+
+ def test_stream_introspection_token_invalidation(self) -> None:
+ worker_hs = self.make_worker_hs("synapse.app.generic_worker")
+ auth = worker_hs.get_auth()
+ store = self.hs.get_datastores().main
+
+ # add a token to the cache on the worker
+ auth._token_cache["open_sesame"] = "intro_token" # type: ignore[attr-defined]
+
+ # stream the invalidation from the master
+ self.get_success(
+ store.stream_introspection_token_invalidation(("open_sesame",))
+ )
+
+ # check that the cache on the worker was invalidated
+ self.assertEqual(auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined]
+
+ # test invalidating whole cache
+ for i in range(0, 5):
+ auth._token_cache[f"open_sesame_{i}"] = f"intro_token_{i}" # type: ignore[attr-defined]
+ self.assertEqual(len(auth._token_cache), 5) # type: ignore[attr-defined]
+
+ self.get_success(store.stream_introspection_token_invalidation((None,)))
+
+ self.assertEqual(len(auth._token_cache), 0) # type: ignore[attr-defined]
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 41a959b4d6..feb81844ae 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -879,6 +879,44 @@ class UsersListTestCase(unittest.HomeserverTestCase):
self._order_test([self.admin_user, user1, user2], "creation_ts", "f")
self._order_test([user2, user1, self.admin_user], "creation_ts", "b")
+ def test_filter_admins(self) -> None:
+ """
+ Tests whether the various values of the query parameter `admins` lead to the
+ expected result set.
+ """
+
+ # Register an additional non admin user
+ self.register_user("user", "pass", admin=False)
+
+ # Query all users
+ channel = self.make_request(
+ "GET",
+ f"{self.url}",
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, channel.result)
+ self.assertEqual(2, channel.json_body["total"])
+
+ # Query only admin users
+ channel = self.make_request(
+ "GET",
+ f"{self.url}?admins=true",
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, channel.result)
+ self.assertEqual(1, channel.json_body["total"])
+ self.assertEqual(1, channel.json_body["users"][0]["admin"])
+
+ # Query only non admin users
+ channel = self.make_request(
+ "GET",
+ f"{self.url}?admins=false",
+ access_token=self.admin_user_tok,
+ )
+ self.assertEqual(200, channel.code, channel.result)
+ self.assertEqual(1, channel.json_body["total"])
+ self.assertFalse(channel.json_body["users"][0]["admin"])
+
@override_config(
{
"experimental_features": {
diff --git a/tests/server.py b/tests/server.py
index 481fe34c5c..ff03d28864 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -1000,8 +1000,6 @@ def setup_test_homeserver(
hs.tls_server_context_factory = Mock()
hs.setup()
- if homeserver_to_use == TestHomeServer:
- hs.setup_background_tasks()
if isinstance(db_engine, PostgresEngine):
database_pool = hs.get_datastores().databases[0]
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 9c151a5e62..7a4ecab2d5 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -13,7 +13,19 @@
# limitations under the License.
import datetime
-from typing import Dict, List, Tuple, Union, cast
+from typing import (
+ Collection,
+ Dict,
+ FrozenSet,
+ Iterable,
+ List,
+ Mapping,
+ Set,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+)
import attr
from parameterized import parameterized
@@ -38,6 +50,138 @@ from synapse.util import Clock, json_encoder
import tests.unittest
import tests.utils
+# The silly auth graph we use to test the auth difference algorithm,
+# where the top are the most recent events.
+#
+# A B
+# \ /
+# D E
+# \ |
+# ` F C
+# | /|
+# G ´ |
+# | \ |
+# H I
+# | |
+# K J
+
+AUTH_GRAPH: Dict[str, List[str]] = {
+ "a": ["e"],
+ "b": ["e"],
+ "c": ["g", "i"],
+ "d": ["f"],
+ "e": ["f"],
+ "f": ["g"],
+ "g": ["h", "i"],
+ "h": ["k"],
+ "i": ["j"],
+ "k": [],
+ "j": [],
+}
+
+DEPTH_GRAPH = {
+ "a": 7,
+ "b": 7,
+ "c": 4,
+ "d": 6,
+ "e": 6,
+ "f": 5,
+ "g": 3,
+ "h": 2,
+ "i": 2,
+ "k": 1,
+ "j": 1,
+}
+
+T = TypeVar("T")
+
+
+def get_all_topologically_sorted_orders(
+ nodes: Iterable[T],
+ graph: Mapping[T, Collection[T]],
+) -> List[List[T]]:
+ """Given a set of nodes and a graph, return all possible topological
+ orderings.
+ """
+
+ # This is implemented by Kahn's algorithm, and forking execution each time
+ # we have a choice over which node to consider next.
+
+ degree_map = {node: 0 for node in nodes}
+ reverse_graph: Dict[T, Set[T]] = {}
+
+ for node, edges in graph.items():
+ if node not in degree_map:
+ continue
+
+ for edge in set(edges):
+ if edge in degree_map:
+ degree_map[node] += 1
+
+ reverse_graph.setdefault(edge, set()).add(node)
+ reverse_graph.setdefault(node, set())
+
+ zero_degree = [node for node, degree in degree_map.items() if degree == 0]
+
+ return _get_all_topologically_sorted_orders_inner(
+ reverse_graph, zero_degree, degree_map
+ )
+
+
+def _get_all_topologically_sorted_orders_inner(
+ reverse_graph: Dict[T, Set[T]],
+ zero_degree: List[T],
+ degree_map: Dict[T, int],
+) -> List[List[T]]:
+ new_paths = []
+
+ # Rather than only choosing *one* item from the list of nodes with zero
+ # degree, we "fork" execution and run the algorithm for each node in the
+ # zero degree.
+ for node in zero_degree:
+ new_degree_map = degree_map.copy()
+ new_zero_degree = zero_degree.copy()
+ new_zero_degree.remove(node)
+
+ for edge in reverse_graph.get(node, []):
+ if edge in new_degree_map:
+ new_degree_map[edge] -= 1
+ if new_degree_map[edge] == 0:
+ new_zero_degree.append(edge)
+
+ paths = _get_all_topologically_sorted_orders_inner(
+ reverse_graph, new_zero_degree, new_degree_map
+ )
+ for path in paths:
+ path.insert(0, node)
+
+ new_paths.extend(paths)
+
+ if not new_paths:
+ return [[]]
+
+ return new_paths
+
+
+def get_all_topologically_consistent_subsets(
+ nodes: Iterable[T],
+ graph: Mapping[T, Collection[T]],
+) -> Set[FrozenSet[T]]:
+ """Get all subsets of the graph where if node N is in the subgraph, then all
+ nodes that can reach that node (i.e. for all X there exists a path X -> N)
+ are in the subgraph.
+ """
+ all_topological_orderings = get_all_topologically_sorted_orders(nodes, graph)
+
+ graph_subsets = set()
+ for ordering in all_topological_orderings:
+ ordering.reverse()
+
+ for idx in range(len(ordering)):
+ graph_subsets.add(frozenset(ordering[:idx]))
+
+ return graph_subsets
+
@attr.s(auto_attribs=True, frozen=True, slots=True)
class _BackfillSetupInfo:
@@ -172,49 +316,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
room_id = "@ROOM:local"
- # The silly auth graph we use to test the auth difference algorithm,
- # where the top are the most recent events.
- #
- # A B
- # \ /
- # D E
- # \ |
- # ` F C
- # | /|
- # G ´ |
- # | \ |
- # H I
- # | |
- # K J
-
- auth_graph: Dict[str, List[str]] = {
- "a": ["e"],
- "b": ["e"],
- "c": ["g", "i"],
- "d": ["f"],
- "e": ["f"],
- "f": ["g"],
- "g": ["h", "i"],
- "h": ["k"],
- "i": ["j"],
- "k": [],
- "j": [],
- }
-
- depth_map = {
- "a": 7,
- "b": 7,
- "c": 4,
- "d": 6,
- "e": 6,
- "f": 5,
- "g": 3,
- "h": 2,
- "i": 2,
- "k": 1,
- "j": 1,
- }
-
# Mark the room as maybe having a cover index.
def store_room(txn: LoggingTransaction) -> None:
@@ -238,9 +339,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
def insert_event(txn: LoggingTransaction) -> None:
stream_ordering = 0
- for event_id in auth_graph:
+ for event_id in AUTH_GRAPH:
stream_ordering += 1
- depth = depth_map[event_id]
+ depth = DEPTH_GRAPH[event_id]
self.store.db_pool.simple_insert_txn(
txn,
@@ -260,8 +361,8 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
self.persist_events._persist_event_auth_chain_txn(
txn,
[
- cast(EventBase, FakeEvent(event_id, room_id, auth_graph[event_id]))
- for event_id in auth_graph
+ cast(EventBase, FakeEvent(event_id, room_id, AUTH_GRAPH[event_id]))
+ for event_id in AUTH_GRAPH
],
)
@@ -344,7 +445,51 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
room_id = self._setup_auth_chain(use_chain_cover_index)
# Now actually test that various combinations give the right result:
+ self.assert_auth_diff_is_expected(room_id)
+
+ @parameterized.expand(
+ [
+ [graph_subset]
+ for graph_subset in get_all_topologically_consistent_subsets(
+ AUTH_GRAPH, AUTH_GRAPH
+ )
+ ]
+ )
+ def test_auth_difference_partial(self, graph_subset: Collection[str]) -> None:
+ """Test that if we only have a chain cover index on a partial subset of
+ the room we still get the correct auth chain difference.
+
+ We do this by removing the chain cover index for every valid subset of the
+ graph.
+ """
+ room_id = self._setup_auth_chain(True)
+
+ for event_id in graph_subset:
+ # Remove chain cover from that event.
+ self.get_success(
+ self.store.db_pool.simple_delete(
+ table="event_auth_chains",
+ keyvalues={"event_id": event_id},
+ desc="test_auth_difference_partial_remove",
+ )
+ )
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ table="event_auth_chain_to_calculate",
+ values={
+ "event_id": event_id,
+ "room_id": room_id,
+ "type": "",
+ "state_key": "",
+ },
+ desc="test_auth_difference_partial_remove",
+ )
+ )
+
+ self.assert_auth_diff_is_expected(room_id)
+ def assert_auth_diff_is_expected(self, room_id: str) -> None:
+ """Assert the auth chain difference returns the correct answers."""
difference = self.get_success(
self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
)
diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py
index 1277e1a865..4bcd17a6fc 100644
--- a/tests/util/test_retryutils.py
+++ b/tests/util/test_retryutils.py
@@ -108,3 +108,54 @@ class RetryLimiterTestCase(HomeserverTestCase):
new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
self.assertIsNone(new_timings)
+
+ def test_max_retry_interval(self) -> None:
+ """Test that `destination_max_retry_interval` setting works as expected"""
+ store = self.hs.get_datastores().main
+
+ destination_max_retry_interval_ms = (
+ self.hs.config.federation.destination_max_retry_interval_ms
+ )
+
+ self.get_success(get_retry_limiter("test_dest", self.clock, store))
+ self.pump(1)
+
+ failure_ts = self.clock.time_msec()
+
+ # Simulate reaching destination_max_retry_interval
+ self.get_success(
+ store.set_destination_retry_timings(
+ "test_dest",
+ failure_ts=failure_ts,
+ retry_last_ts=failure_ts,
+ retry_interval=destination_max_retry_interval_ms,
+ )
+ )
+
+ # Check it fails
+ self.get_failure(
+ get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
+ )
+
+ # Get past retry_interval and we can try again, and still throw an error to continue the backoff
+ self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1)
+ limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
+ self.pump(1)
+ try:
+ with limiter:
+ self.pump(1)
+ raise AssertionError("argh")
+ except AssertionError:
+ pass
+
+ self.pump()
+
+ # retry_interval does not increase and stays at destination_max_retry_interval_ms
+ new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
+ assert new_timings is not None
+ self.assertEqual(new_timings.retry_interval, destination_max_retry_interval_ms)
+
+ # Check it fails
+ self.get_failure(
+ get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
+ )
diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py
new file mode 100644
index 0000000000..3a97559bf0
--- /dev/null
+++ b/tests/util/test_task_scheduler.py
@@ -0,0 +1,186 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Optional, Tuple
+
+from twisted.internet.task import deferLater
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.types import JsonMapping, ScheduledTask, TaskStatus
+from synapse.util import Clock
+from synapse.util.task_scheduler import TaskScheduler
+
+from tests import unittest
+
+
+class TestTaskScheduler(unittest.HomeserverTestCase):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.task_scheduler = hs.get_task_scheduler()
+ self.task_scheduler.register_action(self._test_task, "_test_task")
+ self.task_scheduler.register_action(self._sleeping_task, "_sleeping_task")
+ self.task_scheduler.register_action(self._raising_task, "_raising_task")
+ self.task_scheduler.register_action(self._resumable_task, "_resumable_task")
+
+ async def _test_task(
+ self, task: ScheduledTask, first_launch: bool
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ # This test task will copy the parameters to the result
+ result = None
+ if task.params:
+ result = task.params
+ return (TaskStatus.COMPLETE, result, None)
+
+ def test_schedule_task(self) -> None:
+ """Schedule a task in the future with some parameters to be copied as a result and check it executed correctly.
+ Also check that it get removed after `KEEP_TASKS_FOR_MS`."""
+ timestamp = self.clock.time_msec() + 30 * 1000
+ task_id = self.get_success(
+ self.task_scheduler.schedule_task(
+ "_test_task",
+ timestamp=timestamp,
+ params={"val": 1},
+ )
+ )
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ assert task is not None
+ self.assertEqual(task.status, TaskStatus.SCHEDULED)
+ self.assertIsNone(task.result)
+
+ # The timestamp being 30s after now the task should been executed
+ # after the first scheduling loop is run
+ self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ assert task is not None
+ self.assertEqual(task.status, TaskStatus.COMPLETE)
+ assert task.result is not None
+ # The passed parameter should have been copied to the result
+ self.assertTrue(task.result.get("val") == 1)
+
+ # Let's wait for the complete task to be deleted and hence unavailable
+ self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1)
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ self.assertIsNone(task)
+
+ async def _sleeping_task(
+ self, task: ScheduledTask, first_launch: bool
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ # Sleep for a second
+ await deferLater(self.reactor, 1, lambda: None)
+ return TaskStatus.COMPLETE, None, None
+
+ def test_schedule_lot_of_tasks(self) -> None:
+ """Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior."""
+ timestamp = self.clock.time_msec() + 30 * 1000
+ task_ids = []
+ for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1):
+ task_ids.append(
+ self.get_success(
+ self.task_scheduler.schedule_task(
+ "_sleeping_task",
+ timestamp=timestamp,
+ params={"val": i},
+ )
+ )
+ )
+
+ # The timestamp being 30s after now the task should been executed
+ # after the first scheduling loop is run
+ self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
+
+ # This is to give the time to the sleeping tasks to finish
+ self.reactor.advance(1)
+
+ # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
+ # is still scheduled.
+ tasks = [
+ self.get_success(self.task_scheduler.get_task(task_id))
+ for task_id in task_ids
+ ]
+
+ self.assertEquals(
+ len(
+ [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
+ ),
+ TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
+ )
+
+ scheduled_tasks = [
+ t for t in tasks if t is not None and t.status == TaskStatus.SCHEDULED
+ ]
+ self.assertEquals(len(scheduled_tasks), 1)
+
+ self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
+ self.reactor.advance(1)
+
+ # Check that the last task has been properly executed after the next scheduler loop run
+ prev_scheduled_task = self.get_success(
+ self.task_scheduler.get_task(scheduled_tasks[0].id)
+ )
+ assert prev_scheduled_task is not None
+ self.assertEquals(
+ prev_scheduled_task.status,
+ TaskStatus.COMPLETE,
+ )
+
+ async def _raising_task(
+ self, task: ScheduledTask, first_launch: bool
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ raise Exception("raising")
+
+ def test_schedule_raising_task(self) -> None:
+ """Schedule a task raising an exception and check it runs to failure and report exception content."""
+ task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task"))
+
+ self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ assert task is not None
+ self.assertEqual(task.status, TaskStatus.FAILED)
+ self.assertEqual(task.error, "raising")
+
+ async def _resumable_task(
+ self, task: ScheduledTask, first_launch: bool
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ if task.result and "in_progress" in task.result:
+ return TaskStatus.COMPLETE, {"success": True}, None
+ else:
+ await self.task_scheduler.update_task(task.id, result={"in_progress": True})
+ # Await forever to simulate an aborted task because of a restart
+ await deferLater(self.reactor, 2**16, lambda: None)
+ # This should never been called
+ return TaskStatus.ACTIVE, None, None
+
+ def test_schedule_resumable_task(self) -> None:
+ """Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart."""
+ task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task"))
+
+ self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ assert task is not None
+ self.assertEqual(task.status, TaskStatus.ACTIVE)
+
+ # Simulate a synapse restart by emptying the list of running tasks
+ self.task_scheduler._running_tasks = set()
+ self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
+
+ task = self.get_success(self.task_scheduler.get_task(task_id))
+ assert task is not None
+ self.assertEqual(task.status, TaskStatus.COMPLETE)
+ assert task.result is not None
+ self.assertTrue(task.result.get("success"))
|