diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 602f5e1759..8a69dc4986 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -29,6 +29,16 @@ jobs:
- name: Inspect builder
run: docker buildx inspect
+ - name: Checkout repository
+ uses: actions/checkout@v3
+
+ - name: Extract version from pyproject.toml
+ # Note: explicitly requesting bash will mean bash is invoked with `-eo pipefail`, see
+ # https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsshell
+ shell: bash
+ run: |
+ echo "SYNAPSE_VERSION=$(grep "^version" pyproject.toml | sed -E 's/version\s*=\s*["]([^"]*)["]/\1/')" >> $GITHUB_ENV
+
- name: Log in to DockerHub
uses: docker/login-action@v2
with:
@@ -61,7 +71,9 @@ jobs:
uses: docker/build-push-action@v4
with:
push: true
- labels: "gitsha1=${{ github.sha }}"
+ labels: |
+ gitsha1=${{ github.sha }}
+ org.opencontainers.image.version=${{ env.SYNAPSE_VERSION }}
tags: "${{ steps.set-tag.outputs.tags }}"
file: "docker/Dockerfile"
platforms: linux/amd64,linux/arm64
diff --git a/Cargo.lock b/Cargo.lock
index 2264e67245..d28fe3c228 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
[[package]]
name = "serde"
-version = "1.0.171"
+version = "1.0.179"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
+checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0"
dependencies = [
"serde_derive",
]
[[package]]
name = "serde_derive"
-version = "1.0.171"
+version = "1.0.179"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
+checksum = "741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c"
dependencies = [
"proc-macro2",
"quote",
@@ -352,9 +352,9 @@ dependencies = [
[[package]]
name = "serde_json"
-version = "1.0.103"
+version = "1.0.104"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b"
+checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c"
dependencies = [
"itoa",
"ryu",
diff --git a/changelog.d/15525.misc b/changelog.d/15525.misc
new file mode 100644
index 0000000000..67ab0cf62f
--- /dev/null
+++ b/changelog.d/15525.misc
@@ -0,0 +1 @@
+Update SQL queries to inline boolean parameters as supported in SQLite 3.27.
diff --git a/changelog.d/15791.bugfix b/changelog.d/15791.bugfix
new file mode 100644
index 0000000000..182634b62f
--- /dev/null
+++ b/changelog.d/15791.bugfix
@@ -0,0 +1 @@
+Fix bug where purging history and paginating simultaneously could lead to database corruption when using workers.
diff --git a/changelog.d/15964.removal b/changelog.d/15964.removal
new file mode 100644
index 0000000000..7613afe505
--- /dev/null
+++ b/changelog.d/15964.removal
@@ -0,0 +1 @@
+Remove support for legacy application service paths.
diff --git a/changelog.d/15972.docker b/changelog.d/15972.docker
new file mode 100644
index 0000000000..7fd9707deb
--- /dev/null
+++ b/changelog.d/15972.docker
@@ -0,0 +1 @@
+Add `org.opencontainers.image.version` labels to Docker containers [published by Matrix.org](https://hub.docker.com/r/matrixdotorg/synapse). Contributed by Mo Balaa.
diff --git a/changelog.d/16009.docker b/changelog.d/16009.docker
new file mode 100644
index 0000000000..7fd9707deb
--- /dev/null
+++ b/changelog.d/16009.docker
@@ -0,0 +1 @@
+Add `org.opencontainers.image.version` labels to Docker containers [published by Matrix.org](https://hub.docker.com/r/matrixdotorg/synapse). Contributed by Mo Balaa.
diff --git a/changelog.d/16011.misc b/changelog.d/16011.misc
new file mode 100644
index 0000000000..8a8d9822c6
--- /dev/null
+++ b/changelog.d/16011.misc
@@ -0,0 +1 @@
+Update PyYAML to 6.0.1.
diff --git a/changelog.d/16012.bugfix b/changelog.d/16012.bugfix
new file mode 100644
index 0000000000..44ca9377ff
--- /dev/null
+++ b/changelog.d/16012.bugfix
@@ -0,0 +1 @@
+Fix 404 not found code returned on profile endpoint when the display name is empty but not the avatar URL.
diff --git a/changelog.d/16016.doc b/changelog.d/16016.doc
new file mode 100644
index 0000000000..e677058c2d
--- /dev/null
+++ b/changelog.d/16016.doc
@@ -0,0 +1,2 @@
+Clarify comment on the keys/upload over replication enpoint.
+
diff --git a/changelog.d/16023.misc b/changelog.d/16023.misc
new file mode 100644
index 0000000000..ee732318e4
--- /dev/null
+++ b/changelog.d/16023.misc
@@ -0,0 +1 @@
+Combine duplicated code.
diff --git a/changelog.d/16027.doc b/changelog.d/16027.doc
new file mode 100644
index 0000000000..201e88d6b6
--- /dev/null
+++ b/changelog.d/16027.doc
@@ -0,0 +1 @@
+Do not expose Admin API in caddy reverse proxy example. Contributed by @NilsIrl.
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index 06337e7c00..fe9519b4b6 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -95,7 +95,7 @@ matrix.example.com {
}
example.com:8448 {
- reverse_proxy localhost:8008
+ reverse_proxy /_matrix/* localhost:8008
}
```
diff --git a/poetry.lock b/poetry.lock
index d5b30a11c4..94baffe496 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -824,13 +824,13 @@ files = [
[[package]]
name = "immutabledict"
-version = "2.2.4"
+version = "3.0.0"
description = "Immutable wrapper around dictionaries (a fork of frozendict)"
optional = false
-python-versions = ">=3.7,<4.0"
+python-versions = ">=3.8,<4.0"
files = [
- {file = "immutabledict-2.2.4-py3-none-any.whl", hash = "sha256:c827715c147d2364522f9a7709cc424c7001015274a3c705250e673605bde64b"},
- {file = "immutabledict-2.2.4.tar.gz", hash = "sha256:3bedc0741faaa2846f6edf5c29183f993da3abaff6a5961bb70a5659bb9e68ab"},
+ {file = "immutabledict-3.0.0-py3-none-any.whl", hash = "sha256:034bacc6c6872707c4ec0ea9515de6bbe0dcf0fcabd97ae19fd4e4c338f05798"},
+ {file = "immutabledict-3.0.0.tar.gz", hash = "sha256:5a23cd369a6187f76a8c29d7d687980b092538eb9800e58964603f1b973c56fe"},
]
[[package]]
@@ -2072,51 +2072,51 @@ files = [
[[package]]
name = "pyyaml"
-version = "6.0"
+version = "6.0.1"
description = "YAML parser and emitter for Python"
optional = false
python-versions = ">=3.6"
files = [
- {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},
- {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"},
- {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc"},
- {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a80a78046a72361de73f8f395f1f1e49f956c6be882eed58505a15f3e430962b"},
- {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"},
- {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"},
- {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"},
- {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"},
- {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"},
- {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"},
- {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"},
- {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"},
- {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"},
- {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"},
- {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"},
- {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"},
- {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"},
- {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:98c4d36e99714e55cfbaaee6dd5badbc9a1ec339ebfc3b1f52e293aee6bb71a4"},
- {file = "PyYAML-6.0-cp36-cp36m-win32.whl", hash = "sha256:0283c35a6a9fbf047493e3a0ce8d79ef5030852c51e9d911a27badfde0605293"},
- {file = "PyYAML-6.0-cp36-cp36m-win_amd64.whl", hash = "sha256:07751360502caac1c067a8132d150cf3d61339af5691fe9e87803040dbc5db57"},
- {file = "PyYAML-6.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:819b3830a1543db06c4d4b865e70ded25be52a2e0631ccd2f6a47a2822f2fd7c"},
- {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:473f9edb243cb1935ab5a084eb238d842fb8f404ed2193a915d1784b5a6b5fc0"},
- {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0ce82d761c532fe4ec3f87fc45688bdd3a4c1dc5e0b4a19814b9009a29baefd4"},
- {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:231710d57adfd809ef5d34183b8ed1eeae3f76459c18fb4a0b373ad56bedcdd9"},
- {file = "PyYAML-6.0-cp37-cp37m-win32.whl", hash = "sha256:c5687b8d43cf58545ade1fe3e055f70eac7a5a1a0bf42824308d868289a95737"},
- {file = "PyYAML-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:d15a181d1ecd0d4270dc32edb46f7cb7733c7c508857278d3d378d14d606db2d"},
- {file = "PyYAML-6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0b4624f379dab24d3725ffde76559cff63d9ec94e1736b556dacdfebe5ab6d4b"},
- {file = "PyYAML-6.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:213c60cd50106436cc818accf5baa1aba61c0189ff610f64f4a3e8c6726218ba"},
- {file = "PyYAML-6.0-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9fa600030013c4de8165339db93d182b9431076eb98eb40ee068700c9c813e34"},
- {file = "PyYAML-6.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:277a0ef2981ca40581a47093e9e2d13b3f1fbbeffae064c1d21bfceba2030287"},
- {file = "PyYAML-6.0-cp38-cp38-win32.whl", hash = "sha256:d4eccecf9adf6fbcc6861a38015c2a64f38b9d94838ac1810a9023a0609e1b78"},
- {file = "PyYAML-6.0-cp38-cp38-win_amd64.whl", hash = "sha256:1e4747bc279b4f613a09eb64bba2ba602d8a6664c6ce6396a4d0cd413a50ce07"},
- {file = "PyYAML-6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:055d937d65826939cb044fc8c9b08889e8c743fdc6a32b33e2390f66013e449b"},
- {file = "PyYAML-6.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174"},
- {file = "PyYAML-6.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d67d839ede4ed1b28a4e8909735fc992a923cdb84e618544973d7dfc71540803"},
- {file = "PyYAML-6.0-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:cba8c411ef271aa037d7357a2bc8f9ee8b58b9965831d9e51baf703280dc73d3"},
- {file = "PyYAML-6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:40527857252b61eacd1d9af500c3337ba8deb8fc298940291486c465c8b46ec0"},
- {file = "PyYAML-6.0-cp39-cp39-win32.whl", hash = "sha256:b5b9eccad747aabaaffbc6064800670f0c297e52c12754eb1d976c57e4f74dcb"},
- {file = "PyYAML-6.0-cp39-cp39-win_amd64.whl", hash = "sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c"},
- {file = "PyYAML-6.0.tar.gz", hash = "sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2"},
+ {file = "PyYAML-6.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a"},
+ {file = "PyYAML-6.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"},
+ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
+ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
+ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
+ {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
+ {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
+ {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
+ {file = "PyYAML-6.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab"},
+ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
+ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
+ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
+ {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
+ {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-win32.whl", hash = "sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585"},
+ {file = "PyYAML-6.0.1-cp36-cp36m-win_amd64.whl", hash = "sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-win32.whl", hash = "sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba"},
+ {file = "PyYAML-6.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867"},
+ {file = "PyYAML-6.0.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595"},
+ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
+ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
+ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
+ {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
+ {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
+ {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
+ {file = "PyYAML-6.0.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859"},
+ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
+ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
+ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
+ {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
+ {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
+ {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
]
[[package]]
@@ -2427,13 +2427,13 @@ tornado = ["tornado (>=5)"]
[[package]]
name = "service-identity"
-version = "21.1.0"
+version = "23.1.0"
description = "Service identity verification for pyOpenSSL & cryptography."
optional = false
-python-versions = "*"
+python-versions = ">=3.8"
files = [
- {file = "service-identity-21.1.0.tar.gz", hash = "sha256:6e6c6086ca271dc11b033d17c3a8bea9f24ebff920c587da090afc9519419d34"},
- {file = "service_identity-21.1.0-py2.py3-none-any.whl", hash = "sha256:f0b0caac3d40627c3c04d7a51b6e06721857a0e10a8775f2d1d7e72901b3a7db"},
+ {file = "service_identity-23.1.0-py3-none-any.whl", hash = "sha256:87415a691d52fcad954a500cb81f424d0273f8e7e3ee7d766128f4575080f383"},
+ {file = "service_identity-23.1.0.tar.gz", hash = "sha256:ecb33cd96307755041e978ab14f8b14e13b40f1fbd525a4dc78f46d2b986431d"},
]
[package.dependencies]
@@ -2441,12 +2441,12 @@ attrs = ">=19.1.0"
cryptography = "*"
pyasn1 = "*"
pyasn1-modules = "*"
-six = "*"
[package.extras]
-dev = ["coverage[toml] (>=5.0.2)", "furo", "idna", "pyOpenSSL", "pytest", "sphinx"]
-docs = ["furo", "sphinx"]
+dev = ["pyopenssl", "service-identity[docs,idna,mypy,tests]"]
+docs = ["furo", "myst-parser", "pyopenssl", "sphinx", "sphinx-notfound-page"]
idna = ["idna"]
+mypy = ["idna", "mypy", "types-pyopenssl"]
tests = ["coverage[toml] (>=5.0.2)", "pytest"]
[[package]]
@@ -2947,35 +2947,35 @@ files = [
[[package]]
name = "types-commonmark"
-version = "0.9.2.3"
+version = "0.9.2.4"
description = "Typing stubs for commonmark"
optional = false
python-versions = "*"
files = [
- {file = "types-commonmark-0.9.2.3.tar.gz", hash = "sha256:42769a2c194fd5b49fd9eedfd4a83cd1d2514c6d0a36f00f5c5ffe0b6a2d2fcf"},
- {file = "types_commonmark-0.9.2.3-py3-none-any.whl", hash = "sha256:b575156e1b8a292d43acb36f861110b85c4bc7aa53bbfb5ac64addec15d18cfa"},
+ {file = "types-commonmark-0.9.2.4.tar.gz", hash = "sha256:2c6486f65735cf18215cca3e962b17787fa545be279306f79b801f64a5319959"},
+ {file = "types_commonmark-0.9.2.4-py3-none-any.whl", hash = "sha256:d5090fa685c3e3c0ec3a5973ff842000baef6d86f762d52209b3c5e9fbd0b555"},
]
[[package]]
name = "types-jsonschema"
-version = "4.17.0.8"
+version = "4.17.0.10"
description = "Typing stubs for jsonschema"
optional = false
python-versions = "*"
files = [
- {file = "types-jsonschema-4.17.0.8.tar.gz", hash = "sha256:96a56990910f405e62de58862c0bbb3ac29ee6dba6d3d99aa0ba7f874cc547de"},
- {file = "types_jsonschema-4.17.0.8-py3-none-any.whl", hash = "sha256:f5958eb7b53217dfb5125f0412aeaef226a8a9013eac95816c95b5b523f6796b"},
+ {file = "types-jsonschema-4.17.0.10.tar.gz", hash = "sha256:8e979db34d69bc9f9b3d6e8b89bdbc60b3a41cfce4e1fb87bf191d205c7f5098"},
+ {file = "types_jsonschema-4.17.0.10-py3-none-any.whl", hash = "sha256:3aa2a89afbd9eaa6ce0c15618b36f02692a621433889ce73014656f7d8caf971"},
]
[[package]]
name = "types-netaddr"
-version = "0.8.0.8"
+version = "0.8.0.9"
description = "Typing stubs for netaddr"
optional = false
python-versions = "*"
files = [
- {file = "types-netaddr-0.8.0.8.tar.gz", hash = "sha256:db7e8cd16b1244e7c4541edd0df99d1039fc05fd5387c21840f0b958fc52aabc"},
- {file = "types_netaddr-0.8.0.8-py3-none-any.whl", hash = "sha256:6741b3824e2ec3f7a74842b394439b71107c7675f8ae42bb2b5e7a8ebfe8cf18"},
+ {file = "types-netaddr-0.8.0.9.tar.gz", hash = "sha256:68900c267fd31627c1721c5c52b32a257657ac2777457dca49b6b096ba2faf74"},
+ {file = "types_netaddr-0.8.0.9-py3-none-any.whl", hash = "sha256:63e871f064cd59473cec1177f372526f0fa3d565050247d5305bdc325be5c3f6"},
]
[[package]]
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 5fb3d5083d..359999f680 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -17,8 +17,6 @@ import urllib.parse
from typing import (
TYPE_CHECKING,
Any,
- Awaitable,
- Callable,
Dict,
Iterable,
List,
@@ -30,7 +28,7 @@ from typing import (
)
from prometheus_client import Counter
-from typing_extensions import Concatenate, ParamSpec, TypeGuard
+from typing_extensions import ParamSpec, TypeGuard
from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
from synapse.api.errors import CodeMessageException, HttpResponseException
@@ -80,9 +78,7 @@ sent_todevice_counter = Counter(
HOUR_IN_MS = 60 * 60 * 1000
-
APP_SERVICE_PREFIX = "/_matrix/app/v1"
-APP_SERVICE_UNSTABLE_PREFIX = "/_matrix/app/unstable"
P = ParamSpec("P")
R = TypeVar("R")
@@ -128,47 +124,6 @@ class ApplicationServiceApi(SimpleHttpClient):
hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
)
- async def _send_with_fallbacks(
- self,
- service: "ApplicationService",
- prefixes: List[str],
- path: str,
- func: Callable[Concatenate[str, P], Awaitable[R]],
- *args: P.args,
- **kwargs: P.kwargs,
- ) -> R:
- """
- Attempt to call an application service with multiple paths, falling back
- until one succeeds.
-
- Args:
- service: The appliacation service, this provides the base URL.
- prefixes: A last of paths to try in order for the requests.
- path: A suffix to append to each prefix.
- func: The function to call, the first argument will be the full
- endpoint to fetch. Other arguments are provided by args/kwargs.
-
- Returns:
- The return value of func.
- """
- for i, prefix in enumerate(prefixes, start=1):
- uri = f"{service.url}{prefix}{path}"
- try:
- return await func(uri, *args, **kwargs)
- except HttpResponseException as e:
- # If an error is received that is due to an unrecognised path,
- # fallback to next path (if one exists). Otherwise, consider it
- # a legitimate error and raise.
- if i < len(prefixes) and is_unknown_endpoint(e):
- continue
- raise
- except Exception:
- # Unexpected exceptions get sent to the caller.
- raise
-
- # The function should always exit via the return or raise above this.
- raise RuntimeError("Unexpected fallback behaviour. This should never be seen.")
-
async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
if service.url is None:
return False
@@ -177,11 +132,8 @@ class ApplicationServiceApi(SimpleHttpClient):
assert service.hs_token is not None
try:
- response = await self._send_with_fallbacks(
- service,
- [APP_SERVICE_PREFIX, ""],
- f"/users/{urllib.parse.quote(user_id)}",
- self.get_json,
+ response = await self.get_json(
+ f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}",
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@@ -203,11 +155,8 @@ class ApplicationServiceApi(SimpleHttpClient):
assert service.hs_token is not None
try:
- response = await self._send_with_fallbacks(
- service,
- [APP_SERVICE_PREFIX, ""],
- f"/rooms/{urllib.parse.quote(alias)}",
- self.get_json,
+ response = await self.get_json(
+ f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}",
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@@ -245,11 +194,8 @@ class ApplicationServiceApi(SimpleHttpClient):
**fields,
b"access_token": service.hs_token,
}
- response = await self._send_with_fallbacks(
- service,
- [APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
- f"/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
- self.get_json,
+ response = await self.get_json(
+ f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
args=args,
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@@ -285,11 +231,8 @@ class ApplicationServiceApi(SimpleHttpClient):
# This is required by the configuration.
assert service.hs_token is not None
try:
- info = await self._send_with_fallbacks(
- service,
- [APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
- f"/thirdparty/protocol/{urllib.parse.quote(protocol)}",
- self.get_json,
+ info = await self.get_json(
+ f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}",
{"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
)
@@ -401,11 +344,8 @@ class ApplicationServiceApi(SimpleHttpClient):
}
try:
- await self._send_with_fallbacks(
- service,
- [APP_SERVICE_PREFIX, ""],
- f"/transactions/{urllib.parse.quote(str(txn_id))}",
- self.put_json,
+ await self.put_json(
+ f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}",
json_body=body,
args={"access_token": service.hs_token},
headers={"Authorization": [f"Bearer {service.hs_token}"]},
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index fa61dd8c10..a90d99c4d6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -63,6 +63,7 @@ from synapse.federation.federation_base import (
)
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Edu, Transaction
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.http.servlet import assert_params_in_dict
from synapse.logging.context import (
make_deferred_yieldable,
@@ -137,6 +138,7 @@ class FederationServer(FederationBase):
self._event_auth_handler = hs.get_event_auth_handler()
self._room_member_handler = hs.get_room_member_handler()
self._e2e_keys_handler = hs.get_e2e_keys_handler()
+ self._worker_lock_handler = hs.get_worker_locks_handler()
self._state_storage_controller = hs.get_storage_controllers().state
@@ -1236,9 +1238,18 @@ class FederationServer(FederationBase):
logger.info("handling received PDU in room %s: %s", room_id, event)
try:
with nested_logging_context(event.event_id):
- await self._federation_event_handler.on_receive_pdu(
- origin, event
- )
+ # We're taking out a lock within a lock, which could
+ # lead to deadlocks if we're not careful. However, it is
+ # safe on this occasion as we only ever take a write
+ # lock when deleting a room, which we would never do
+ # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
+ # lock.
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ await self._federation_event_handler.on_receive_pdu(
+ origin, event
+ )
except FederationError as e:
# XXX: Ideally we'd inform the remote we failed to process
# the event, but we can't return an error in the transaction
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fff0b5fa12..c656e07d37 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
from synapse.events.validator import EventValidator
from synapse.handlers.directory import DirectoryHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -485,6 +486,7 @@ class EventCreationHandler:
self._events_shard_config = self.config.worker.events_shard_config
self._instance_name = hs.get_instance_name()
self._notifier = hs.get_notifier()
+ self._worker_lock_handler = hs.get_worker_locks_handler()
self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
@@ -876,14 +878,13 @@ class EventCreationHandler:
return prev_event
return None
- async def get_event_from_transaction(
+ async def get_event_id_from_transaction(
self,
requester: Requester,
txn_id: str,
room_id: str,
- ) -> Optional[EventBase]:
- """For the given transaction ID and room ID, check if there is a matching event.
- If so, fetch it and return it.
+ ) -> Optional[str]:
+ """For the given transaction ID and room ID, check if there is a matching event ID.
Args:
requester: The requester making the request in the context of which we want
@@ -892,8 +893,9 @@ class EventCreationHandler:
room_id: The room ID.
Returns:
- An event if one could be found, None otherwise.
+ An event ID if one could be found, None otherwise.
"""
+ existing_event_id = None
if self._msc3970_enabled and requester.device_id:
# When MSC3970 is enabled, we lookup for events sent by the same device first,
@@ -907,7 +909,7 @@ class EventCreationHandler:
)
)
if existing_event_id:
- return await self.store.get_event(existing_event_id)
+ return existing_event_id
# Pre-MSC3970, we looked up for events that were sent by the same session by
# using the access token ID.
@@ -920,9 +922,32 @@ class EventCreationHandler:
txn_id,
)
)
- if existing_event_id:
- return await self.store.get_event(existing_event_id)
+ return existing_event_id
+
+ async def get_event_from_transaction(
+ self,
+ requester: Requester,
+ txn_id: str,
+ room_id: str,
+ ) -> Optional[EventBase]:
+ """For the given transaction ID and room ID, check if there is a matching event.
+ If so, fetch it and return it.
+
+ Args:
+ requester: The requester making the request in the context of which we want
+ to fetch the event.
+ txn_id: The transaction ID.
+ room_id: The room ID.
+
+ Returns:
+ An event if one could be found, None otherwise.
+ """
+ existing_event_id = await self.get_event_id_from_transaction(
+ requester, txn_id, room_id
+ )
+ if existing_event_id:
+ return await self.store.get_event(existing_event_id)
return None
async def create_and_send_nonmember_event(
@@ -1010,6 +1035,37 @@ class EventCreationHandler:
event.internal_metadata.stream_ordering,
)
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ return await self._create_and_send_nonmember_event_locked(
+ requester=requester,
+ event_dict=event_dict,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ ratelimit=ratelimit,
+ txn_id=txn_id,
+ ignore_shadow_ban=ignore_shadow_ban,
+ outlier=outlier,
+ depth=depth,
+ )
+
+ async def _create_and_send_nonmember_event_locked(
+ self,
+ requester: Requester,
+ event_dict: dict,
+ allow_no_prev_events: bool = False,
+ prev_event_ids: Optional[List[str]] = None,
+ state_event_ids: Optional[List[str]] = None,
+ ratelimit: bool = True,
+ txn_id: Optional[str] = None,
+ ignore_shadow_ban: bool = False,
+ outlier: bool = False,
+ depth: Optional[int] = None,
+ ) -> Tuple[EventBase, int]:
+ room_id = event_dict["room_id"]
+
# If we don't have any prev event IDs specified then we need to
# check that the host is in the room (as otherwise populating the
# prev events will fail), at which point we may as well check the
@@ -1923,7 +1979,10 @@ class EventCreationHandler:
)
for room_id in room_ids:
- dummy_event_sent = await self._send_dummy_event_for_room(room_id)
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ dummy_event_sent = await self._send_dummy_event_for_room(room_id)
if not dummy_event_sent:
# Did not find a valid user in the room, so remove from future attempts
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19b8728db9..da34658470 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -46,6 +46,11 @@ logger = logging.getLogger(__name__)
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
"""Object tracking the status of a purge request
@@ -142,6 +147,7 @@ class PaginationHandler:
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
+ self._worker_locks = hs.get_worker_locks_handler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
@@ -356,7 +362,9 @@ class PaginationHandler:
"""
self._purges_in_progress_by_room.add(room_id)
try:
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=True
+ ):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
@@ -412,7 +420,10 @@ class PaginationHandler:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_multi_read_write_lock(
+ [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
+ write=True,
+ ):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -471,7 +482,9 @@ class PaginationHandler:
room_token = from_token.room_key
- async with self.pagination_lock.read(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=False
+ ):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
@@ -747,7 +760,9 @@ class PaginationHandler:
self._purges_in_progress_by_room.add(room_id)
try:
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=True
+ ):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a7f8c5e636..c7fe101cd9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -68,7 +68,7 @@ class ProfileHandler:
if self.hs.is_mine(target_user):
profileinfo = await self.store.get_profileinfo(target_user)
- if profileinfo.display_name is None:
+ if profileinfo.display_name is None and profileinfo.avatar_url is None:
raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
return {
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 496e701f13..e3cdf2bc61 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -39,6 +39,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
+ self._worker_lock_handler = hs.get_worker_locks_handler()
self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -174,8 +176,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
- self._msc3970_enabled = hs.config.experimental.msc3970_enabled
-
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -416,29 +416,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
if txn_id:
- existing_event_id = None
- if self._msc3970_enabled and requester.device_id:
- # When MSC3970 is enabled, we lookup for events sent by the same device
- # first, and fallback to the old behaviour if none were found.
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_device_id(
- room_id,
- requester.user.to_string(),
- requester.device_id,
- txn_id,
- )
+ existing_event_id = (
+ await self.event_creation_handler.get_event_id_from_transaction(
+ requester, txn_id, room_id
)
-
- if requester.access_token_id and not existing_event_id:
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_token_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
- )
- )
-
+ )
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
@@ -638,26 +620,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# by application services), and then by room ID.
async with self.member_as_limiter.queue(as_id):
async with self.member_linearizer.queue(key):
- with opentracing.start_active_span("update_membership_locked"):
- result = await self.update_membership_locked(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- new_room=new_room,
- require_consent=require_consent,
- outlier=outlier,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- origin_server_ts=origin_server_ts,
- )
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ with opentracing.start_active_span("update_membership_locked"):
+ result = await self.update_membership_locked(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ new_room=new_room,
+ require_consent=require_consent,
+ outlier=outlier,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ origin_server_ts=origin_server_ts,
+ )
return result
diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py
new file mode 100644
index 0000000000..72df773a86
--- /dev/null
+++ b/synapse/handlers/worker_lock.py
@@ -0,0 +1,333 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from types import TracebackType
+from typing import (
+ TYPE_CHECKING,
+ AsyncContextManager,
+ Collection,
+ Dict,
+ Optional,
+ Tuple,
+ Type,
+ Union,
+)
+from weakref import WeakSet
+
+import attr
+
+from twisted.internet import defer
+from twisted.internet.interfaces import IReactorTime
+
+from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import start_active_span
+from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.databases.main.lock import Lock, LockStore
+from synapse.util.async_helpers import timeout_deferred
+
+if TYPE_CHECKING:
+ from synapse.logging.opentracing import opentracing
+ from synapse.server import HomeServer
+
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
+class WorkerLocksHandler:
+ """A class for waiting on taking out locks, rather than using the storage
+ functions directly (which don't support awaiting).
+ """
+
+ def __init__(self, hs: "HomeServer") -> None:
+ self._reactor = hs.get_reactor()
+ self._store = hs.get_datastores().main
+ self._clock = hs.get_clock()
+ self._notifier = hs.get_notifier()
+ self._instance_name = hs.get_instance_name()
+
+ # Map from lock name/key to set of `WaitingLock` that are active for
+ # that lock.
+ self._locks: Dict[
+ Tuple[str, str], WeakSet[Union[WaitingLock, WaitingMultiLock]]
+ ] = {}
+
+ self._clock.looping_call(self._cleanup_locks, 30_000)
+
+ self._notifier.add_lock_released_callback(self._on_lock_released)
+
+ def acquire_lock(self, lock_name: str, lock_key: str) -> "WaitingLock":
+ """Acquire a standard lock, returns a context manager that will block
+ until the lock is acquired.
+
+ Note: Care must be taken to avoid deadlocks. In particular, this
+ function does *not* timeout.
+
+ Usage:
+ async with handler.acquire_lock(name, key):
+ # Do work while holding the lock...
+ """
+
+ lock = WaitingLock(
+ reactor=self._reactor,
+ store=self._store,
+ handler=self,
+ lock_name=lock_name,
+ lock_key=lock_key,
+ write=None,
+ )
+
+ self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+ return lock
+
+ def acquire_read_write_lock(
+ self,
+ lock_name: str,
+ lock_key: str,
+ *,
+ write: bool,
+ ) -> "WaitingLock":
+ """Acquire a read/write lock, returns a context manager that will block
+ until the lock is acquired.
+
+ Note: Care must be taken to avoid deadlocks. In particular, this
+ function does *not* timeout.
+
+ Usage:
+ async with handler.acquire_read_write_lock(name, key, write=True):
+ # Do work while holding the lock...
+ """
+
+ lock = WaitingLock(
+ reactor=self._reactor,
+ store=self._store,
+ handler=self,
+ lock_name=lock_name,
+ lock_key=lock_key,
+ write=write,
+ )
+
+ self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+ return lock
+
+ def acquire_multi_read_write_lock(
+ self,
+ lock_names: Collection[Tuple[str, str]],
+ *,
+ write: bool,
+ ) -> "WaitingMultiLock":
+ """Acquires multi read/write locks at once, returns a context manager
+ that will block until all the locks are acquired.
+
+ This will try and acquire all locks at once, and will never hold on to a
+ subset of the locks. (This avoids accidentally creating deadlocks).
+
+ Note: Care must be taken to avoid deadlocks. In particular, this
+ function does *not* timeout.
+ """
+
+ lock = WaitingMultiLock(
+ lock_names=lock_names,
+ write=write,
+ reactor=self._reactor,
+ store=self._store,
+ handler=self,
+ )
+
+ for lock_name, lock_key in lock_names:
+ self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+ return lock
+
+ def notify_lock_released(self, lock_name: str, lock_key: str) -> None:
+ """Notify that a lock has been released.
+
+ Pokes both the notifier and replication.
+ """
+
+ self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key)
+
+ def _on_lock_released(
+ self, instance_name: str, lock_name: str, lock_key: str
+ ) -> None:
+ """Called when a lock has been released.
+
+ Wakes up any locks that might be waiting on this.
+ """
+ locks = self._locks.get((lock_name, lock_key))
+ if not locks:
+ return
+
+ def _wake_deferred(deferred: defer.Deferred) -> None:
+ if not deferred.called:
+ deferred.callback(None)
+
+ for lock in locks:
+ self._clock.call_later(0, _wake_deferred, lock.deferred)
+
+ @wrap_as_background_process("_cleanup_locks")
+ async def _cleanup_locks(self) -> None:
+ """Periodically cleans out stale entries in the locks map"""
+ self._locks = {key: value for key, value in self._locks.items() if value}
+
+
+@attr.s(auto_attribs=True, eq=False)
+class WaitingLock:
+ reactor: IReactorTime
+ store: LockStore
+ handler: WorkerLocksHandler
+ lock_name: str
+ lock_key: str
+ write: Optional[bool]
+ deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
+ _inner_lock: Optional[Lock] = None
+ _retry_interval: float = 0.1
+ _lock_span: "opentracing.Scope" = attr.Factory(
+ lambda: start_active_span("WaitingLock.lock")
+ )
+
+ async def __aenter__(self) -> None:
+ self._lock_span.__enter__()
+
+ with start_active_span("WaitingLock.waiting_for_lock"):
+ while self._inner_lock is None:
+ self.deferred = defer.Deferred()
+
+ if self.write is not None:
+ lock = await self.store.try_acquire_read_write_lock(
+ self.lock_name, self.lock_key, write=self.write
+ )
+ else:
+ lock = await self.store.try_acquire_lock(
+ self.lock_name, self.lock_key
+ )
+
+ if lock:
+ self._inner_lock = lock
+ break
+
+ try:
+ # Wait until the we get notified the lock might have been
+ # released (by the deferred being resolved). We also
+ # periodically wake up in case the lock was released but we
+ # weren't notified.
+ with PreserveLoggingContext():
+ await timeout_deferred(
+ deferred=self.deferred,
+ timeout=self._get_next_retry_interval(),
+ reactor=self.reactor,
+ )
+ except Exception:
+ pass
+
+ return await self._inner_lock.__aenter__()
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc: Optional[BaseException],
+ tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ assert self._inner_lock
+
+ self.handler.notify_lock_released(self.lock_name, self.lock_key)
+
+ try:
+ r = await self._inner_lock.__aexit__(exc_type, exc, tb)
+ finally:
+ self._lock_span.__exit__(exc_type, exc, tb)
+
+ return r
+
+ def _get_next_retry_interval(self) -> float:
+ next = self._retry_interval
+ self._retry_interval = max(5, next * 2)
+ return next * random.uniform(0.9, 1.1)
+
+
+@attr.s(auto_attribs=True, eq=False)
+class WaitingMultiLock:
+ lock_names: Collection[Tuple[str, str]]
+
+ write: bool
+
+ reactor: IReactorTime
+ store: LockStore
+ handler: WorkerLocksHandler
+
+ deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
+
+ _inner_lock_cm: Optional[AsyncContextManager] = None
+ _retry_interval: float = 0.1
+ _lock_span: "opentracing.Scope" = attr.Factory(
+ lambda: start_active_span("WaitingLock.lock")
+ )
+
+ async def __aenter__(self) -> None:
+ self._lock_span.__enter__()
+
+ with start_active_span("WaitingLock.waiting_for_lock"):
+ while self._inner_lock_cm is None:
+ self.deferred = defer.Deferred()
+
+ lock_cm = await self.store.try_acquire_multi_read_write_lock(
+ self.lock_names, write=self.write
+ )
+
+ if lock_cm:
+ self._inner_lock_cm = lock_cm
+ break
+
+ try:
+ # Wait until the we get notified the lock might have been
+ # released (by the deferred being resolved). We also
+ # periodically wake up in case the lock was released but we
+ # weren't notified.
+ with PreserveLoggingContext():
+ await timeout_deferred(
+ deferred=self.deferred,
+ timeout=self._get_next_retry_interval(),
+ reactor=self.reactor,
+ )
+ except Exception:
+ pass
+
+ assert self._inner_lock_cm
+ await self._inner_lock_cm.__aenter__()
+ return
+
+ async def __aexit__(
+ self,
+ exc_type: Optional[Type[BaseException]],
+ exc: Optional[BaseException],
+ tb: Optional[TracebackType],
+ ) -> Optional[bool]:
+ assert self._inner_lock_cm
+
+ for lock_name, lock_key in self.lock_names:
+ self.handler.notify_lock_released(lock_name, lock_key)
+
+ try:
+ r = await self._inner_lock_cm.__aexit__(exc_type, exc, tb)
+ finally:
+ self._lock_span.__exit__(exc_type, exc, tb)
+
+ return r
+
+ def _get_next_retry_interval(self) -> float:
+ next = self._retry_interval
+ self._retry_interval = max(5, next * 2)
+ return next * random.uniform(0.9, 1.1)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 897272ad5b..68115bca70 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -234,6 +234,9 @@ class Notifier:
self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
+ # List of callbacks to be notified when a lock is released
+ self._lock_released_callback: List[Callable[[str, str, str], None]] = []
+
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
self._pusher_pool = hs.get_pusherpool()
@@ -785,6 +788,19 @@ class Notifier:
# that any in flight requests can be immediately retried.
self._federation_client.wake_destination(server)
+ def add_lock_released_callback(
+ self, callback: Callable[[str, str, str], None]
+ ) -> None:
+ """Add a function to be called whenever we are notified about a released lock."""
+ self._lock_released_callback.append(callback)
+
+ def notify_lock_released(
+ self, instance_name: str, lock_name: str, lock_key: str
+ ) -> None:
+ """Notify the callbacks that a lock has been released."""
+ for cb in self._lock_released_callback:
+ cb(instance_name, lock_name, lock_key)
+
@attr.s(auto_attribs=True)
class ReplicationNotifier:
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index f874f072f9..73f3de3642 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -107,8 +107,7 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
the main process to accomplish this.
- Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
- Request format(borrowed and expanded from KeyUploadServlet):
+ Request format for this endpoint (borrowed and expanded from KeyUploadServlet):
POST /_synapse/replication/upload_keys_for_user
@@ -117,6 +116,7 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
"device_id": "<device_id>",
"keys": {
....this part can be found in KeyUploadServlet in rest/client/keys.py....
+ or as defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
}
}
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 32f52e54d8..10f5c98ff8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -422,6 +422,36 @@ class RemoteServerUpCommand(_SimpleCommand):
NAME = "REMOTE_SERVER_UP"
+class LockReleasedCommand(Command):
+ """Sent to inform other instances that a given lock has been dropped.
+
+ Format::
+
+ LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"]
+ """
+
+ NAME = "LOCK_RELEASED"
+
+ def __init__(
+ self,
+ instance_name: str,
+ lock_name: str,
+ lock_key: str,
+ ):
+ self.instance_name = instance_name
+ self.lock_name = lock_name
+ self.lock_key = lock_key
+
+ @classmethod
+ def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand":
+ instance_name, lock_name, lock_key = json_decoder.decode(line)
+
+ return cls(instance_name, lock_name, lock_key)
+
+ def to_line(self) -> str:
+ return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])
+
+
_COMMANDS: Tuple[Type[Command], ...] = (
ServerCommand,
RdataCommand,
@@ -435,6 +465,7 @@ _COMMANDS: Tuple[Type[Command], ...] = (
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
+ LockReleasedCommand,
)
# Map of command name to command type.
@@ -448,6 +479,7 @@ VALID_SERVER_COMMANDS = (
ErrorCommand.NAME,
PingCommand.NAME,
RemoteServerUpCommand.NAME,
+ LockReleasedCommand.NAME,
)
# The commands the client is allowed to send
@@ -461,6 +493,7 @@ VALID_CLIENT_COMMANDS = (
UserIpCommand.NAME,
ErrorCommand.NAME,
RemoteServerUpCommand.NAME,
+ LockReleasedCommand.NAME,
)
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 5d108fe11b..a2cabba7b1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
FederationAckCommand,
+ LockReleasedCommand,
PositionCommand,
RdataCommand,
RemoteServerUpCommand,
@@ -248,6 +249,9 @@ class ReplicationCommandHandler:
if self._is_master or self._should_insert_client_ips:
self.subscribe_to_channel("USER_IP")
+ if hs.config.redis.redis_enabled:
+ self._notifier.add_lock_released_callback(self.on_lock_released)
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -648,6 +652,17 @@ class ReplicationCommandHandler:
self._notifier.notify_remote_server_up(cmd.data)
+ def on_LOCK_RELEASED(
+ self, conn: IReplicationConnection, cmd: LockReleasedCommand
+ ) -> None:
+ """Called when we get a new LOCK_RELEASED command."""
+ if cmd.instance_name == self._instance_name:
+ return
+
+ self._notifier.notify_lock_released(
+ cmd.instance_name, cmd.lock_name, cmd.lock_key
+ )
+
def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
self._connections.append(connection)
@@ -754,6 +769,13 @@ class ReplicationCommandHandler:
"""
self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
+ def on_lock_released(
+ self, instance_name: str, lock_name: str, lock_key: str
+ ) -> None:
+ """Called when we released a lock and should notify other instances."""
+ if instance_name == self._instance_name:
+ self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))
+
UpdateToken = TypeVar("UpdateToken")
UpdateRow = TypeVar("UpdateRow")
diff --git a/synapse/rest/client/room_upgrade_rest_servlet.py b/synapse/rest/client/room_upgrade_rest_servlet.py
index 6a7792e18b..4a5d9e13e7 100644
--- a/synapse/rest/client/room_upgrade_rest_servlet.py
+++ b/synapse/rest/client/room_upgrade_rest_servlet.py
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Tuple
from synapse.api.errors import Codes, ShadowBanError, SynapseError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.http.server import HttpServer
from synapse.http.servlet import (
RestServlet,
@@ -60,6 +61,7 @@ class RoomUpgradeRestServlet(RestServlet):
self._hs = hs
self._room_creation_handler = hs.get_room_creation_handler()
self._auth = hs.get_auth()
+ self._worker_lock_handler = hs.get_worker_locks_handler()
async def on_POST(
self, request: SynapseRequest, room_id: str
@@ -78,9 +80,12 @@ class RoomUpgradeRestServlet(RestServlet):
)
try:
- new_room_id = await self._room_creation_handler.upgrade_room(
- requester, room_id, new_version
- )
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ new_room_id = await self._room_creation_handler.upgrade_room(
+ requester, room_id, new_version
+ )
except ShadowBanError:
# Generate a random room ID.
new_room_id = stringutils.random_string(18)
diff --git a/synapse/server.py b/synapse/server.py
index b72b76a38b..8430f99ef2 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,6 +107,7 @@ from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
from synapse.handlers.user_directory import UserDirectoryHandler
+from synapse.handlers.worker_lock import WorkerLocksHandler
from synapse.http.client import (
InsecureInterceptableContextFactory,
ReplicationClient,
@@ -912,3 +913,7 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
"""Usage metrics shared between phone home stats and the prometheus exporter."""
return CommonUsageMetricsManager(self)
+
+ @cache_in_self
+ def get_worker_locks_handler(self) -> WorkerLocksHandler:
+ return WorkerLocksHandler(self)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 35c0680365..35cd1089d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
SynapseTags,
@@ -338,6 +339,7 @@ class EventsPersistenceStorageController:
)
self._state_resolution_handler = hs.get_state_resolution_handler()
self._state_controller = state_controller
+ self.hs = hs
async def _process_event_persist_queue_task(
self,
@@ -350,15 +352,22 @@ class EventsPersistenceStorageController:
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.
"""
- if isinstance(task, _PersistEventsTask):
- return await self._persist_event_batch(room_id, task)
- elif isinstance(task, _UpdateCurrentStateTask):
- await self._update_current_state(room_id, task)
- return {}
- else:
- raise AssertionError(
- f"Found an unexpected task type in event persistence queue: {task}"
- )
+
+ # Ensure that the room can't be deleted while we're persisting events to
+ # it. We might already have taken out the lock, but since this is just a
+ # "read" lock its inherently reentrant.
+ async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ if isinstance(task, _PersistEventsTask):
+ return await self._persist_event_batch(room_id, task)
+ elif isinstance(task, _UpdateCurrentStateTask):
+ await self._update_current_state(room_id, task)
+ return {}
+ else:
+ raise AssertionError(
+ f"Found an unexpected task type in event persistence queue: {task}"
+ )
@trace
async def persist_events(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index b2cda52ce5..534dc32413 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -843,7 +843,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
* because the schema change is in a background update, it's not
* necessarily safe to assume that it will have been completed.
*/
- AND edge.is_state is ? /* False */
+ AND edge.is_state is FALSE
/**
* We only want backwards extremities that are older than or at
* the same position of the given `current_depth` (where older
@@ -886,7 +886,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
sql,
(
room_id,
- False,
current_depth,
self._clock.time_msec(),
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2b83a69426..bd3f14fb71 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1455,8 +1455,8 @@ class PersistEventsStore:
},
)
- sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
- txn.execute(sql, (False, event.event_id))
+ sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
+ txn.execute(sql, (event.event_id,))
# Update the event_backward_extremities table now that this
# event isn't an outlier any more.
@@ -1549,13 +1549,13 @@ class PersistEventsStore:
for event, _ in events_and_contexts
if not event.internal_metadata.is_redacted()
]
- sql = "UPDATE redactions SET have_censored = ? WHERE "
+ sql = "UPDATE redactions SET have_censored = FALSE WHERE "
clause, args = make_in_list_sql_clause(
self.database_engine,
"redacts",
unredacted_events,
)
- txn.execute(sql + clause, [False] + args)
+ txn.execute(sql + clause, args)
self.db_pool.simple_insert_many_txn(
txn,
@@ -2318,14 +2318,14 @@ class PersistEventsStore:
" SELECT 1 FROM events"
" LEFT JOIN event_edges edge"
" ON edge.event_id = events.event_id"
- " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
+ " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
" )"
)
txn.execute_batch(
query,
[
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+ (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
for ev in events
for e_id in ev.prev_event_ids()
if not ev.internal_metadata.is_outlier()
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index c89b4f7919..1680bf6168 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -12,8 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from contextlib import AsyncExitStack
from types import TracebackType
-from typing import TYPE_CHECKING, Optional, Set, Tuple, Type
+from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
from weakref import WeakValueDictionary
from twisted.internet.interfaces import IReactorCore
@@ -208,76 +209,85 @@ class LockStore(SQLBaseStore):
used (otherwise the lock will leak).
"""
+ try:
+ lock = await self.db_pool.runInteraction(
+ "try_acquire_read_write_lock",
+ self._try_acquire_read_write_lock_txn,
+ lock_name,
+ lock_key,
+ write,
+ )
+ except self.database_engine.module.IntegrityError:
+ return None
+
+ return lock
+
+ def _try_acquire_read_write_lock_txn(
+ self,
+ txn: LoggingTransaction,
+ lock_name: str,
+ lock_key: str,
+ write: bool,
+ ) -> "Lock":
+ # We attempt to acquire the lock by inserting into
+ # `worker_read_write_locks` and seeing if that fails any
+ # constraints. If it doesn't then we have acquired the lock,
+ # otherwise we haven't.
+ #
+ # Before that though we clear the table of any stale locks.
+
now = self._clock.time_msec()
token = random_string(6)
- def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
- # We attempt to acquire the lock by inserting into
- # `worker_read_write_locks` and seeing if that fails any
- # constraints. If it doesn't then we have acquired the lock,
- # otherwise we haven't.
- #
- # Before that though we clear the table of any stale locks.
-
- delete_sql = """
- DELETE FROM worker_read_write_locks
- WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
- """
-
- insert_sql = """
- INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?, ?)
- """
-
- if isinstance(self.database_engine, PostgresEngine):
- # For Postgres we can send these queries at the same time.
- txn.execute(
- delete_sql + ";" + insert_sql,
- (
- # DELETE args
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- # UPSERT args
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
- else:
- # For SQLite these need to be two queries.
- txn.execute(
- delete_sql,
- (
- now - _LOCK_TIMEOUT_MS,
- lock_name,
- lock_key,
- ),
- )
- txn.execute(
- insert_sql,
- (
- lock_name,
- lock_key,
- write,
- self._instance_name,
- token,
- now,
- ),
- )
+ delete_sql = """
+ DELETE FROM worker_read_write_locks
+ WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
+ """
- return
+ insert_sql = """
+ INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?, ?)
+ """
- try:
- await self.db_pool.runInteraction(
- "try_acquire_read_write_lock",
- _try_acquire_read_write_lock_txn,
+ if isinstance(self.database_engine, PostgresEngine):
+ # For Postgres we can send these queries at the same time.
+ txn.execute(
+ delete_sql + ";" + insert_sql,
+ (
+ # DELETE args
+ now - _LOCK_TIMEOUT_MS,
+ lock_name,
+ lock_key,
+ # UPSERT args
+ lock_name,
+ lock_key,
+ write,
+ self._instance_name,
+ token,
+ now,
+ ),
+ )
+ else:
+ # For SQLite these need to be two queries.
+ txn.execute(
+ delete_sql,
+ (
+ now - _LOCK_TIMEOUT_MS,
+ lock_name,
+ lock_key,
+ ),
+ )
+ txn.execute(
+ insert_sql,
+ (
+ lock_name,
+ lock_key,
+ write,
+ self._instance_name,
+ token,
+ now,
+ ),
)
- except self.database_engine.module.IntegrityError:
- return None
lock = Lock(
self._reactor,
@@ -289,10 +299,58 @@ class LockStore(SQLBaseStore):
token=token,
)
- self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
+ def set_lock() -> None:
+ self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
+
+ txn.call_after(set_lock)
return lock
+ async def try_acquire_multi_read_write_lock(
+ self,
+ lock_names: Collection[Tuple[str, str]],
+ write: bool,
+ ) -> Optional[AsyncExitStack]:
+ """Try to acquire multiple locks for the given names/keys. Will return
+ an async context manager if the locks are successfully acquired, which
+ *must* be used (otherwise the lock will leak).
+
+ If only a subset of the locks can be acquired then it will immediately
+ drop them and return `None`.
+ """
+ try:
+ locks = await self.db_pool.runInteraction(
+ "try_acquire_multi_read_write_lock",
+ self._try_acquire_multi_read_write_lock_txn,
+ lock_names,
+ write,
+ )
+ except self.database_engine.module.IntegrityError:
+ return None
+
+ stack = AsyncExitStack()
+
+ for lock in locks:
+ await stack.enter_async_context(lock)
+
+ return stack
+
+ def _try_acquire_multi_read_write_lock_txn(
+ self,
+ txn: LoggingTransaction,
+ lock_names: Collection[Tuple[str, str]],
+ write: bool,
+ ) -> Collection["Lock"]:
+ locks = []
+
+ for lock_name, lock_key in lock_names:
+ lock = self._try_acquire_read_write_lock_txn(
+ txn, lock_name, lock_key, write
+ )
+ locks.append(lock)
+
+ return locks
+
class Lock:
"""An async context manager that manages an acquired lock, ensuring it is
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 9773c1fcd2..b52f48cf04 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -249,12 +249,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
txn.execute(
- "UPDATE events SET outlier = ?"
+ "UPDATE events SET outlier = TRUE"
" WHERE event_id IN ("
- " SELECT event_id FROM events_to_purge "
- " WHERE NOT should_delete"
- ")",
- (True,),
+ " SELECT event_id FROM events_to_purge "
+ " WHERE NOT should_delete"
+ ")"
)
# synapse tries to take out an exclusive lock on room_depth whenever it
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index e098ceea3c..c13c0bc7d7 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -560,19 +560,19 @@ class PushRuleStore(PushRulesWorkerStore):
if isinstance(self.database_engine, PostgresEngine):
sql = """
INSERT INTO push_rules_enable (id, user_name, rule_id, enabled)
- VALUES (?, ?, ?, ?)
+ VALUES (?, ?, ?, 1)
ON CONFLICT DO NOTHING
"""
elif isinstance(self.database_engine, Sqlite3Engine):
sql = """
INSERT OR IGNORE INTO push_rules_enable (id, user_name, rule_id, enabled)
- VALUES (?, ?, ?, ?)
+ VALUES (?, ?, ?, 1)
"""
else:
raise RuntimeError("Unknown database engine")
new_enable_id = self._push_rules_enable_id_gen.get_next()
- txn.execute(sql, (new_enable_id, user_id, rule_id, 1))
+ txn.execute(sql, (new_enable_id, user_id, rule_id))
async def delete_push_rule(self, user_id: str, rule_id: str) -> None:
"""
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 676d03bb7e..c582cf0573 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -454,9 +454,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
) -> List[Tuple[str, int]]:
sql = (
"SELECT user_id, expiration_ts_ms FROM account_validity"
- " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?"
+ " WHERE email_sent = FALSE AND (expiration_ts_ms - ?) <= ?"
)
- values = [False, now_ms, renew_at]
+ values = [now_ms, renew_at]
txn.execute(sql, values)
return cast(List[Tuple[str, int]], txn.fetchall())
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 830658f328..719e11aea6 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -936,11 +936,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
%(where_clause)s
- AND contains_url = ? AND outlier = ?
+ AND contains_url = TRUE AND outlier = FALSE
ORDER BY stream_ordering DESC
LIMIT ?
"""
- txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100))
+ txn.execute(sql % {"where_clause": ""}, (room_id, 100))
local_media_mxcs = []
remote_media_mxcs = []
@@ -976,7 +976,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
txn.execute(
sql % {"where_clause": "AND stream_ordering < ?"},
- (room_id, next_token, True, False, 100),
+ (room_id, next_token, 100),
)
return local_media_mxcs, remote_media_mxcs
@@ -1086,9 +1086,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
# set quarantine
if quarantined_by is not None:
- sql += "AND safe_from_quarantine = ?"
+ sql += "AND safe_from_quarantine = FALSE"
txn.executemany(
- sql, [(quarantined_by, media_id, False) for media_id in local_mxcs]
+ sql, [(quarantined_by, media_id) for media_id in local_mxcs]
)
# remove from quarantine
else:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92cbe262a6..5a3611c415 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1401,7 +1401,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
`to_token`), or `limit` is zero.
"""
- args = [False, room_id]
+ args: List[Any] = [room_id]
order, from_bound, to_bound = generate_pagination_bounds(
direction, from_token, to_token
@@ -1475,7 +1475,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event.topological_ordering, event.stream_ordering
FROM events AS event
%(join_clause)s
- WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+ WHERE event.outlier = FALSE AND event.room_id = ? AND %(bounds)s
ORDER BY event.topological_ordering %(order)s,
event.stream_ordering %(order)s LIMIT ?
""" % {
diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py
index 15fce165b6..807dc2f21c 100644
--- a/tests/appservice/test_api.py
+++ b/tests/appservice/test_api.py
@@ -16,7 +16,6 @@ from unittest.mock import Mock
from twisted.test.proto_helpers import MemoryReactor
-from synapse.api.errors import HttpResponseException
from synapse.appservice import ApplicationService
from synapse.server import HomeServer
from synapse.types import JsonDict
@@ -107,58 +106,6 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
self.assertEqual(self.request_url, URL_LOCATION)
self.assertEqual(result, SUCCESS_RESULT_LOCATION)
- def test_fallback(self) -> None:
- """
- Tests that the fallback to legacy URLs works.
- """
- SUCCESS_RESULT_USER = [
- {
- "protocol": PROTOCOL,
- "userid": "@a:user",
- "fields": {
- "more": "fields",
- },
- }
- ]
-
- URL_USER = f"{URL}/_matrix/app/v1/thirdparty/user/{PROTOCOL}"
- FALLBACK_URL_USER = f"{URL}/_matrix/app/unstable/thirdparty/user/{PROTOCOL}"
-
- self.request_url = None
- self.v1_seen = False
-
- async def get_json(
- url: str,
- args: Mapping[Any, Any],
- headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]],
- ) -> List[JsonDict]:
- # Ensure the access token is passed as both a header and query arg.
- if not headers.get("Authorization") or not args.get(b"access_token"):
- raise RuntimeError("Access token not provided")
-
- self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"])
- self.assertEqual(args.get(b"access_token"), TOKEN)
- self.request_url = url
- if url == URL_USER:
- self.v1_seen = True
- raise HttpResponseException(404, "NOT_FOUND", b"NOT_FOUND")
- elif url == FALLBACK_URL_USER:
- return SUCCESS_RESULT_USER
- else:
- raise RuntimeError(
- "URL provided was invalid. This should never be seen."
- )
-
- # We assign to a method, which mypy doesn't like.
- self.api.get_json = Mock(side_effect=get_json) # type: ignore[assignment]
-
- result = self.get_success(
- self.api.query_3pe(self.service, "user", PROTOCOL, {b"some": [b"field"]})
- )
- self.assertTrue(self.v1_seen)
- self.assertEqual(self.request_url, FALLBACK_URL_USER)
- self.assertEqual(result, SUCCESS_RESULT_USER)
-
def test_claim_keys(self) -> None:
"""
Tests that the /keys/claim response is properly parsed for missing
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 196ceb0b82..ec2f5d30be 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -179,6 +179,16 @@ class ProfileTestCase(unittest.HomeserverTestCase):
self.assertEqual("http://my.server/me.png", avatar_url)
+ def test_get_profile_empty_displayname(self) -> None:
+ self.get_success(self.store.set_profile_displayname(self.frank, None))
+ self.get_success(
+ self.store.set_profile_avatar_url(self.frank, "http://my.server/me.png")
+ )
+
+ profile = self.get_success(self.handler.get_profile(self.frank.to_string()))
+
+ self.assertEqual("http://my.server/me.png", profile["avatar_url"])
+
def test_set_my_avatar(self) -> None:
self.get_success(
self.handler.set_avatar_url(
diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py
new file mode 100644
index 0000000000..73e548726c
--- /dev/null
+++ b/tests/handlers/test_worker_lock.py
@@ -0,0 +1,74 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+
+
+class WorkerLockTestCase(unittest.HomeserverTestCase):
+ def prepare(
+ self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+ ) -> None:
+ self.worker_lock_handler = self.hs.get_worker_locks_handler()
+
+ def test_wait_for_lock_locally(self) -> None:
+ """Test waiting for a lock on a single worker"""
+
+ lock1 = self.worker_lock_handler.acquire_lock("name", "key")
+ self.get_success(lock1.__aenter__())
+
+ lock2 = self.worker_lock_handler.acquire_lock("name", "key")
+ d2 = defer.ensureDeferred(lock2.__aenter__())
+ self.assertNoResult(d2)
+
+ self.get_success(lock1.__aexit__(None, None, None))
+
+ self.get_success(d2)
+ self.get_success(lock2.__aexit__(None, None, None))
+
+
+class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
+ def prepare(
+ self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+ ) -> None:
+ self.main_worker_lock_handler = self.hs.get_worker_locks_handler()
+
+ def test_wait_for_lock_worker(self) -> None:
+ """Test waiting for a lock on another worker"""
+
+ worker = self.make_worker_hs(
+ "synapse.app.generic_worker",
+ extra_config={
+ "redis": {"enabled": True},
+ },
+ )
+ worker_lock_handler = worker.get_worker_locks_handler()
+
+ lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
+ self.get_success(lock1.__aenter__())
+
+ lock2 = worker_lock_handler.acquire_lock("name", "key")
+ d2 = defer.ensureDeferred(lock2.__aenter__())
+ self.assertNoResult(d2)
+
+ self.get_success(lock1.__aexit__(None, None, None))
+
+ self.get_success(d2)
+ self.get_success(lock2.__aexit__(None, None, None))
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index d013e75d55..4f6347be15 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -711,7 +711,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
- self.assertEqual(30, channel.resource_usage.db_txn_count)
+ self.assertEqual(32, channel.resource_usage.db_txn_count)
def test_post_room_initial_state(self) -> None:
# POST with initial_state config key, expect new room id
@@ -724,7 +724,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
- self.assertEqual(32, channel.resource_usage.db_txn_count)
+ self.assertEqual(34, channel.resource_usage.db_txn_count)
def test_post_room_visibility_key(self) -> None:
# POST with visibility config key, expect new room id
diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py
index ad454f6dd8..383da83dfb 100644
--- a/tests/storage/databases/main/test_lock.py
+++ b/tests/storage/databases/main/test_lock.py
@@ -448,3 +448,55 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
self.get_success(self.store._on_shutdown())
self.assertEqual(self.store._live_read_write_lock_tokens, {})
+
+ def test_acquire_multiple_locks(self) -> None:
+ """Tests that acquiring multiple locks at once works."""
+
+ # Take out multiple locks and ensure that we can't get those locks out
+ # again.
+ lock = self.get_success(
+ self.store.try_acquire_multi_read_write_lock(
+ [("name1", "key1"), ("name2", "key2")], write=True
+ )
+ )
+ self.assertIsNotNone(lock)
+
+ assert lock is not None
+ self.get_success(lock.__aenter__())
+
+ lock2 = self.get_success(
+ self.store.try_acquire_read_write_lock("name1", "key1", write=True)
+ )
+ self.assertIsNone(lock2)
+
+ lock3 = self.get_success(
+ self.store.try_acquire_read_write_lock("name2", "key2", write=False)
+ )
+ self.assertIsNone(lock3)
+
+ # Overlapping locks attempts will fail, and won't lock any locks.
+ lock4 = self.get_success(
+ self.store.try_acquire_multi_read_write_lock(
+ [("name1", "key1"), ("name3", "key3")], write=True
+ )
+ )
+ self.assertIsNone(lock4)
+
+ lock5 = self.get_success(
+ self.store.try_acquire_read_write_lock("name3", "key3", write=True)
+ )
+ self.assertIsNotNone(lock5)
+ assert lock5 is not None
+ self.get_success(lock5.__aenter__())
+ self.get_success(lock5.__aexit__(None, None, None))
+
+ # Once we release the lock we can take out the locks again.
+ self.get_success(lock.__aexit__(None, None, None))
+
+ lock6 = self.get_success(
+ self.store.try_acquire_read_write_lock("name1", "key1", write=True)
+ )
+ self.assertIsNotNone(lock6)
+ assert lock6 is not None
+ self.get_success(lock6.__aenter__())
+ self.get_success(lock6.__aexit__(None, None, None))
|