-rw-r--r--  .github/workflows/docker.yml                         |  14
-rw-r--r--  Cargo.lock                                           |  12
-rw-r--r--  changelog.d/15525.misc                               |   1
-rw-r--r--  changelog.d/15791.bugfix                             |   1
-rw-r--r--  changelog.d/15964.removal                            |   1
-rw-r--r--  changelog.d/15972.docker                             |   1
-rw-r--r--  changelog.d/16009.docker                             |   1
-rw-r--r--  changelog.d/16011.misc                               |   1
-rw-r--r--  changelog.d/16012.bugfix                             |   1
-rw-r--r--  changelog.d/16016.doc                                |   2
-rw-r--r--  changelog.d/16023.misc                               |   1
-rw-r--r--  changelog.d/16027.doc                                |   1
-rw-r--r--  docs/reverse_proxy.md                                |   2
-rw-r--r--  poetry.lock                                          | 122
-rw-r--r--  synapse/appservice/api.py                            |  82
-rw-r--r--  synapse/federation/federation_server.py              |  17
-rw-r--r--  synapse/handlers/message.py                          |  77
-rw-r--r--  synapse/handlers/pagination.py                       |  23
-rw-r--r--  synapse/handlers/profile.py                          |   2
-rw-r--r--  synapse/handlers/room_member.py                      |  73
-rw-r--r--  synapse/handlers/worker_lock.py                      | 333
-rw-r--r--  synapse/notifier.py                                  |  16
-rw-r--r--  synapse/replication/http/devices.py                  |   4
-rw-r--r--  synapse/replication/tcp/commands.py                  |  33
-rw-r--r--  synapse/replication/tcp/handler.py                   |  22
-rw-r--r--  synapse/rest/client/room_upgrade_rest_servlet.py     |  11
-rw-r--r--  synapse/server.py                                    |   5
-rw-r--r--  synapse/storage/controllers/persist_events.py        |  27
-rw-r--r--  synapse/storage/databases/main/event_federation.py   |   3
-rw-r--r--  synapse/storage/databases/main/events.py             |  12
-rw-r--r--  synapse/storage/databases/main/lock.py               | 190
-rw-r--r--  synapse/storage/databases/main/purge_events.py       |   9
-rw-r--r--  synapse/storage/databases/main/push_rule.py          |   6
-rw-r--r--  synapse/storage/databases/main/registration.py       |   4
-rw-r--r--  synapse/storage/databases/main/room.py               |  10
-rw-r--r--  synapse/storage/databases/main/stream.py             |   4
-rw-r--r--  tests/appservice/test_api.py                         |  53
-rw-r--r--  tests/handlers/test_profile.py                       |  10
-rw-r--r--  tests/handlers/test_worker_lock.py                   |  74
-rw-r--r--  tests/rest/client/test_rooms.py                      |   4
-rw-r--r--  tests/storage/databases/main/test_lock.py            |  52
41 files changed, 956 insertions(+), 361 deletions(-)
diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml
index 602f5e1759..8a69dc4986 100644
--- a/.github/workflows/docker.yml
+++ b/.github/workflows/docker.yml
@@ -29,6 +29,16 @@ jobs:
       - name: Inspect builder
         run: docker buildx inspect
 
+      - name: Checkout repository
+        uses: actions/checkout@v3
+
+      - name: Extract version from pyproject.toml
+        # Note: explicitly requesting bash will mean bash is invoked with `-eo pipefail`, see
+        # https://docs.github.com/en/actions/using-workflows/workflow-syntax-for-github-actions#jobsjob_idstepsshell
+        shell: bash
+        run: |
+          echo "SYNAPSE_VERSION=$(grep "^version" pyproject.toml | sed -E 's/version\s*=\s*["]([^"]*)["]/\1/')" >> $GITHUB_ENV
+
       - name: Log in to DockerHub
         uses: docker/login-action@v2
         with:
@@ -61,7 +71,9 @@ jobs:
         uses: docker/build-push-action@v4
         with:
           push: true
-          labels: "gitsha1=${{ github.sha }}"
+          labels: |
+            gitsha1=${{ github.sha }}
+            org.opencontainers.image.version=${{ env.SYNAPSE_VERSION }}
           tags: "${{ steps.set-tag.outputs.tags }}"
           file: "docker/Dockerfile"
           platforms: linux/amd64,linux/arm64
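
The version label above is produced by a grep/sed pipeline over pyproject.toml. As an illustrative aside (not part of the commit), the same extraction expressed in Python, assuming a line of the form `version = "1.89.0"`:

```
# Illustrative sketch only: mirrors the `grep "^version" | sed` pipeline above.
# The sample version string is an assumption, not taken from the repository.
import re
from typing import Optional

def extract_version(pyproject_text: str) -> Optional[str]:
    for line in pyproject_text.splitlines():
        # Match lines like: version = "1.89.0"
        m = re.match(r'^version\s*=\s*"([^"]*)"', line)
        if m:
            return m.group(1)
    return None

assert extract_version('name = "synapse"\nversion = "1.89.0"') == "1.89.0"
```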
diff --git a/Cargo.lock b/Cargo.lock
index 2264e67245..d28fe3c228 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
 name = "serde"
-version = "1.0.171"
+version = "1.0.179"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
+checksum = "0a5bf42b8d227d4abf38a1ddb08602e229108a517cd4e5bb28f9c7eaafdce5c0"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.171"
+version = "1.0.179"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
+checksum = "741e124f5485c7e60c03b043f79f320bff3527f4bbf12cf3831750dc46a0ec2c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -352,9 +352,9 @@ dependencies = [
 
 [[package]]
 name = "serde_json"
-version = "1.0.103"
+version = "1.0.104"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d03b412469450d4404fe8499a268edd7f8b79fecb074b0d812ad64ca21f4031b"
+checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c"
 dependencies = [
  "itoa",
  "ryu",
diff --git a/changelog.d/15525.misc b/changelog.d/15525.misc
new file mode 100644
index 0000000000..67ab0cf62f
--- /dev/null
+++ b/changelog.d/15525.misc
@@ -0,0 +1 @@
+Update SQL queries to inline boolean parameters as supported in SQLite 3.27.
diff --git a/changelog.d/15791.bugfix b/changelog.d/15791.bugfix
new file mode 100644
index 0000000000..182634b62f
--- /dev/null
+++ b/changelog.d/15791.bugfix
@@ -0,0 +1 @@
+Fix bug where purging history and paginating simultaneously could lead to database corruption when using workers.
diff --git a/changelog.d/15964.removal b/changelog.d/15964.removal
new file mode 100644
index 0000000000..7613afe505
--- /dev/null
+++ b/changelog.d/15964.removal
@@ -0,0 +1 @@
+Remove support for legacy application service paths.
diff --git a/changelog.d/15972.docker b/changelog.d/15972.docker
new file mode 100644
index 0000000000..7fd9707deb
--- /dev/null
+++ b/changelog.d/15972.docker
@@ -0,0 +1 @@
+Add `org.opencontainers.image.version` labels to Docker containers [published by Matrix.org](https://hub.docker.com/r/matrixdotorg/synapse). Contributed by Mo Balaa.
diff --git a/changelog.d/16009.docker b/changelog.d/16009.docker
new file mode 100644
index 0000000000..7fd9707deb
--- /dev/null
+++ b/changelog.d/16009.docker
@@ -0,0 +1 @@
+Add `org.opencontainers.image.version` labels to Docker containers [published by Matrix.org](https://hub.docker.com/r/matrixdotorg/synapse). Contributed by Mo Balaa.
diff --git a/changelog.d/16011.misc b/changelog.d/16011.misc
new file mode 100644
index 0000000000..8a8d9822c6
--- /dev/null
+++ b/changelog.d/16011.misc
@@ -0,0 +1 @@
+Update PyYAML to 6.0.1.
diff --git a/changelog.d/16012.bugfix b/changelog.d/16012.bugfix
new file mode 100644
index 0000000000..44ca9377ff
--- /dev/null
+++ b/changelog.d/16012.bugfix
@@ -0,0 +1 @@
+Fix 404 not found code returned on profile endpoint when the display name is empty but not the avatar URL.
diff --git a/changelog.d/16016.doc b/changelog.d/16016.doc
new file mode 100644
index 0000000000..e677058c2d
--- /dev/null
+++ b/changelog.d/16016.doc
@@ -0,0 +1,2 @@
+Clarify comment on the keys/upload over replication endpoint.
+
diff --git a/changelog.d/16023.misc b/changelog.d/16023.misc
new file mode 100644
index 0000000000..ee732318e4
--- /dev/null
+++ b/changelog.d/16023.misc
@@ -0,0 +1 @@
+Combine duplicated code.
diff --git a/changelog.d/16027.doc b/changelog.d/16027.doc
new file mode 100644
index 0000000000..201e88d6b6
--- /dev/null
+++ b/changelog.d/16027.doc
@@ -0,0 +1 @@
+Do not expose Admin API in caddy reverse proxy example. Contributed by @NilsIrl.
diff --git a/docs/reverse_proxy.md b/docs/reverse_proxy.md
index 06337e7c00..fe9519b4b6 100644
--- a/docs/reverse_proxy.md
+++ b/docs/reverse_proxy.md
@@ -95,7 +95,7 @@ matrix.example.com {
 }
 
 example.com:8448 {
-  reverse_proxy localhost:8008
+  reverse_proxy /_matrix/* localhost:8008
 }
 ```
 
diff --git a/poetry.lock b/poetry.lock
index d5b30a11c4..94baffe496 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -824,13 +824,13 @@ files = [
 
 [[package]]
 name = "immutabledict"
-version = "2.2.4"
+version = "3.0.0"
 description = "Immutable wrapper around dictionaries (a fork of frozendict)"
 optional = false
-python-versions = ">=3.7,<4.0"
+python-versions = ">=3.8,<4.0"
 files = [
-    {file = "immutabledict-2.2.4-py3-none-any.whl", hash = "sha256:c827715c147d2364522f9a7709cc424c7001015274a3c705250e673605bde64b"},
-    {file = "immutabledict-2.2.4.tar.gz", hash = "sha256:3bedc0741faaa2846f6edf5c29183f993da3abaff6a5961bb70a5659bb9e68ab"},
+    {file = "immutabledict-3.0.0-py3-none-any.whl", hash = "sha256:034bacc6c6872707c4ec0ea9515de6bbe0dcf0fcabd97ae19fd4e4c338f05798"},
+    {file = "immutabledict-3.0.0.tar.gz", hash = "sha256:5a23cd369a6187f76a8c29d7d687980b092538eb9800e58964603f1b973c56fe"},
 ]
 
 [[package]]
@@ -2072,51 +2072,51 @@ files = [
 
 [[package]]
 name = "pyyaml"
-version = "6.0"
+version = "6.0.1"
 description = "YAML parser and emitter for Python"
 optional = false
 python-versions = ">=3.6"
 files = [
-    {file = "PyYAML-6.0-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d4db7c7aef085872ef65a8fd7d6d09a14ae91f691dec3e87ee5ee0539d516f53"},
-    {file = "PyYAML-6.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:9df7ed3b3d2e0ecfe09e14741b857df43adb5a3ddadc919a2d94fbdf78fea53c"},
-    {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:77f396e6ef4c73fdc33a9157446466f1cff553d979bd00ecb64385760c6babdc"},
-    {file = "PyYAML-6.0-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:a80a78046a72361de73f8f395f1f1e49f956c6be882eed58505a15f3e430962b"},
-    {file = "PyYAML-6.0-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f84fbc98b019fef2ee9a1cb3ce93e3187a6df0b2538a651bfb890254ba9f90b5"},
-    {file = "PyYAML-6.0-cp310-cp310-win32.whl", hash = "sha256:2cd5df3de48857ed0544b34e2d40e9fac445930039f3cfe4bcc592a1f836d513"},
-    {file = "PyYAML-6.0-cp310-cp310-win_amd64.whl", hash = "sha256:daf496c58a8c52083df09b80c860005194014c3698698d1a57cbcfa182142a3a"},
-    {file = "PyYAML-6.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:d4b0ba9512519522b118090257be113b9468d804b19d63c71dbcf4a48fa32358"},
-    {file = "PyYAML-6.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:81957921f441d50af23654aa6c5e5eaf9b06aba7f0a19c18a538dc7ef291c5a1"},
-    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:afa17f5bc4d1b10afd4466fd3a44dc0e245382deca5b3c353d8b757f9e3ecb8d"},
-    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dbad0e9d368bb989f4515da330b88a057617d16b6a8245084f1b05400f24609f"},
-    {file = "PyYAML-6.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:432557aa2c09802be39460360ddffd48156e30721f5e8d917f01d31694216782"},
-    {file = "PyYAML-6.0-cp311-cp311-win32.whl", hash = "sha256:bfaef573a63ba8923503d27530362590ff4f576c626d86a9fed95822a8255fd7"},
-    {file = "PyYAML-6.0-cp311-cp311-win_amd64.whl", hash = "sha256:01b45c0191e6d66c470b6cf1b9531a771a83c1c4208272ead47a3ae4f2f603bf"},
-    {file = "PyYAML-6.0-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:897b80890765f037df3403d22bab41627ca8811ae55e9a722fd0392850ec4d86"},
-    {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50602afada6d6cbfad699b0c7bb50d5ccffa7e46a3d738092afddc1f9758427f"},
-    {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:48c346915c114f5fdb3ead70312bd042a953a8ce5c7106d5bfb1a5254e47da92"},
-    {file = "PyYAML-6.0-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:98c4d36e99714e55cfbaaee6dd5badbc9a1ec339ebfc3b1f52e293aee6bb71a4"},
-    {file = "PyYAML-6.0-cp36-cp36m-win32.whl", hash = "sha256:0283c35a6a9fbf047493e3a0ce8d79ef5030852c51e9d911a27badfde0605293"},
-    {file = "PyYAML-6.0-cp36-cp36m-win_amd64.whl", hash = "sha256:07751360502caac1c067a8132d150cf3d61339af5691fe9e87803040dbc5db57"},
-    {file = "PyYAML-6.0-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:819b3830a1543db06c4d4b865e70ded25be52a2e0631ccd2f6a47a2822f2fd7c"},
-    {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:473f9edb243cb1935ab5a084eb238d842fb8f404ed2193a915d1784b5a6b5fc0"},
-    {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:0ce82d761c532fe4ec3f87fc45688bdd3a4c1dc5e0b4a19814b9009a29baefd4"},
-    {file = "PyYAML-6.0-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:231710d57adfd809ef5d34183b8ed1eeae3f76459c18fb4a0b373ad56bedcdd9"},
-    {file = "PyYAML-6.0-cp37-cp37m-win32.whl", hash = "sha256:c5687b8d43cf58545ade1fe3e055f70eac7a5a1a0bf42824308d868289a95737"},
-    {file = "PyYAML-6.0-cp37-cp37m-win_amd64.whl", hash = "sha256:d15a181d1ecd0d4270dc32edb46f7cb7733c7c508857278d3d378d14d606db2d"},
-    {file = "PyYAML-6.0-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:0b4624f379dab24d3725ffde76559cff63d9ec94e1736b556dacdfebe5ab6d4b"},
-    {file = "PyYAML-6.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:213c60cd50106436cc818accf5baa1aba61c0189ff610f64f4a3e8c6726218ba"},
-    {file = "PyYAML-6.0-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:9fa600030013c4de8165339db93d182b9431076eb98eb40ee068700c9c813e34"},
-    {file = "PyYAML-6.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:277a0ef2981ca40581a47093e9e2d13b3f1fbbeffae064c1d21bfceba2030287"},
-    {file = "PyYAML-6.0-cp38-cp38-win32.whl", hash = "sha256:d4eccecf9adf6fbcc6861a38015c2a64f38b9d94838ac1810a9023a0609e1b78"},
-    {file = "PyYAML-6.0-cp38-cp38-win_amd64.whl", hash = "sha256:1e4747bc279b4f613a09eb64bba2ba602d8a6664c6ce6396a4d0cd413a50ce07"},
-    {file = "PyYAML-6.0-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:055d937d65826939cb044fc8c9b08889e8c743fdc6a32b33e2390f66013e449b"},
-    {file = "PyYAML-6.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:e61ceaab6f49fb8bdfaa0f92c4b57bcfbea54c09277b1b4f7ac376bfb7a7c174"},
-    {file = "PyYAML-6.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d67d839ede4ed1b28a4e8909735fc992a923cdb84e618544973d7dfc71540803"},
-    {file = "PyYAML-6.0-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:cba8c411ef271aa037d7357a2bc8f9ee8b58b9965831d9e51baf703280dc73d3"},
-    {file = "PyYAML-6.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:40527857252b61eacd1d9af500c3337ba8deb8fc298940291486c465c8b46ec0"},
-    {file = "PyYAML-6.0-cp39-cp39-win32.whl", hash = "sha256:b5b9eccad747aabaaffbc6064800670f0c297e52c12754eb1d976c57e4f74dcb"},
-    {file = "PyYAML-6.0-cp39-cp39-win_amd64.whl", hash = "sha256:b3d267842bf12586ba6c734f89d1f5b871df0273157918b0ccefa29deb05c21c"},
-    {file = "PyYAML-6.0.tar.gz", hash = "sha256:68fb519c14306fec9720a2a5b45bc9f0c8d1b9c72adf45c37baedfcd949c35a2"},
+    {file = "PyYAML-6.0.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:d858aa552c999bc8a8d57426ed01e40bef403cd8ccdd0fc5f6f04a00414cac2a"},
+    {file = "PyYAML-6.0.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:fd66fc5d0da6d9815ba2cebeb4205f95818ff4b79c3ebe268e75d961704af52f"},
+    {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"},
+    {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"},
+    {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"},
+    {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"},
+    {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"},
+    {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"},
+    {file = "PyYAML-6.0.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:f003ed9ad21d6a4713f0a9b5a7a0a79e08dd0f221aff4525a2be4c346ee60aab"},
+    {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"},
+    {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"},
+    {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"},
+    {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"},
+    {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:afd7e57eddb1a54f0f1a974bc4391af8bcce0b444685d936840f125cf046d5bd"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-win32.whl", hash = "sha256:fca0e3a251908a499833aa292323f32437106001d436eca0e6e7833256674585"},
+    {file = "PyYAML-6.0.1-cp36-cp36m-win_amd64.whl", hash = "sha256:f22ac1c3cac4dbc50079e965eba2c1058622631e526bd9afd45fedd49ba781fa"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:b1275ad35a5d18c62a7220633c913e1b42d44b46ee12554e5fd39c70a243d6a3"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:18aeb1bf9a78867dc38b259769503436b7c72f7a1f1f4c93ff9a17de54319b27"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:596106435fa6ad000c2991a98fa58eeb8656ef2325d7e158344fb33864ed87e3"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:baa90d3f661d43131ca170712d903e6295d1f7a0f595074f151c0aed377c9b9c"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-win32.whl", hash = "sha256:9046c58c4395dff28dd494285c82ba00b546adfc7ef001486fbf0324bc174fba"},
+    {file = "PyYAML-6.0.1-cp37-cp37m-win_amd64.whl", hash = "sha256:4fb147e7a67ef577a588a0e2c17b6db51dda102c71de36f8549b6816a96e1867"},
+    {file = "PyYAML-6.0.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:1d4c7e777c441b20e32f52bd377e0c409713e8bb1386e1099c2415f26e479595"},
+    {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"},
+    {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"},
+    {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"},
+    {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"},
+    {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"},
+    {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"},
+    {file = "PyYAML-6.0.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:c8098ddcc2a85b61647b2590f825f3db38891662cfc2fc776415143f599bb859"},
+    {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"},
+    {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"},
+    {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"},
+    {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"},
+    {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"},
+    {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"},
 ]
 
 [[package]]
@@ -2427,13 +2427,13 @@ tornado = ["tornado (>=5)"]
 
 [[package]]
 name = "service-identity"
-version = "21.1.0"
+version = "23.1.0"
 description = "Service identity verification for pyOpenSSL & cryptography."
 optional = false
-python-versions = "*"
+python-versions = ">=3.8"
 files = [
-    {file = "service-identity-21.1.0.tar.gz", hash = "sha256:6e6c6086ca271dc11b033d17c3a8bea9f24ebff920c587da090afc9519419d34"},
-    {file = "service_identity-21.1.0-py2.py3-none-any.whl", hash = "sha256:f0b0caac3d40627c3c04d7a51b6e06721857a0e10a8775f2d1d7e72901b3a7db"},
+    {file = "service_identity-23.1.0-py3-none-any.whl", hash = "sha256:87415a691d52fcad954a500cb81f424d0273f8e7e3ee7d766128f4575080f383"},
+    {file = "service_identity-23.1.0.tar.gz", hash = "sha256:ecb33cd96307755041e978ab14f8b14e13b40f1fbd525a4dc78f46d2b986431d"},
 ]
 
 [package.dependencies]
@@ -2441,12 +2441,12 @@ attrs = ">=19.1.0"
 cryptography = "*"
 pyasn1 = "*"
 pyasn1-modules = "*"
-six = "*"
 
 [package.extras]
-dev = ["coverage[toml] (>=5.0.2)", "furo", "idna", "pyOpenSSL", "pytest", "sphinx"]
-docs = ["furo", "sphinx"]
+dev = ["pyopenssl", "service-identity[docs,idna,mypy,tests]"]
+docs = ["furo", "myst-parser", "pyopenssl", "sphinx", "sphinx-notfound-page"]
 idna = ["idna"]
+mypy = ["idna", "mypy", "types-pyopenssl"]
 tests = ["coverage[toml] (>=5.0.2)", "pytest"]
 
 [[package]]
@@ -2947,35 +2947,35 @@ files = [
 
 [[package]]
 name = "types-commonmark"
-version = "0.9.2.3"
+version = "0.9.2.4"
 description = "Typing stubs for commonmark"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-commonmark-0.9.2.3.tar.gz", hash = "sha256:42769a2c194fd5b49fd9eedfd4a83cd1d2514c6d0a36f00f5c5ffe0b6a2d2fcf"},
-    {file = "types_commonmark-0.9.2.3-py3-none-any.whl", hash = "sha256:b575156e1b8a292d43acb36f861110b85c4bc7aa53bbfb5ac64addec15d18cfa"},
+    {file = "types-commonmark-0.9.2.4.tar.gz", hash = "sha256:2c6486f65735cf18215cca3e962b17787fa545be279306f79b801f64a5319959"},
+    {file = "types_commonmark-0.9.2.4-py3-none-any.whl", hash = "sha256:d5090fa685c3e3c0ec3a5973ff842000baef6d86f762d52209b3c5e9fbd0b555"},
 ]
 
 [[package]]
 name = "types-jsonschema"
-version = "4.17.0.8"
+version = "4.17.0.10"
 description = "Typing stubs for jsonschema"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-jsonschema-4.17.0.8.tar.gz", hash = "sha256:96a56990910f405e62de58862c0bbb3ac29ee6dba6d3d99aa0ba7f874cc547de"},
-    {file = "types_jsonschema-4.17.0.8-py3-none-any.whl", hash = "sha256:f5958eb7b53217dfb5125f0412aeaef226a8a9013eac95816c95b5b523f6796b"},
+    {file = "types-jsonschema-4.17.0.10.tar.gz", hash = "sha256:8e979db34d69bc9f9b3d6e8b89bdbc60b3a41cfce4e1fb87bf191d205c7f5098"},
+    {file = "types_jsonschema-4.17.0.10-py3-none-any.whl", hash = "sha256:3aa2a89afbd9eaa6ce0c15618b36f02692a621433889ce73014656f7d8caf971"},
 ]
 
 [[package]]
 name = "types-netaddr"
-version = "0.8.0.8"
+version = "0.8.0.9"
 description = "Typing stubs for netaddr"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-netaddr-0.8.0.8.tar.gz", hash = "sha256:db7e8cd16b1244e7c4541edd0df99d1039fc05fd5387c21840f0b958fc52aabc"},
-    {file = "types_netaddr-0.8.0.8-py3-none-any.whl", hash = "sha256:6741b3824e2ec3f7a74842b394439b71107c7675f8ae42bb2b5e7a8ebfe8cf18"},
+    {file = "types-netaddr-0.8.0.9.tar.gz", hash = "sha256:68900c267fd31627c1721c5c52b32a257657ac2777457dca49b6b096ba2faf74"},
+    {file = "types_netaddr-0.8.0.9-py3-none-any.whl", hash = "sha256:63e871f064cd59473cec1177f372526f0fa3d565050247d5305bdc325be5c3f6"},
 ]
 
 [[package]]
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 5fb3d5083d..359999f680 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -17,8 +17,6 @@ import urllib.parse
 from typing import (
     TYPE_CHECKING,
     Any,
-    Awaitable,
-    Callable,
     Dict,
     Iterable,
     List,
@@ -30,7 +28,7 @@ from typing import (
 )
 
 from prometheus_client import Counter
-from typing_extensions import Concatenate, ParamSpec, TypeGuard
+from typing_extensions import ParamSpec, TypeGuard
 
 from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
 from synapse.api.errors import CodeMessageException, HttpResponseException
@@ -80,9 +78,7 @@ sent_todevice_counter = Counter(
 
 HOUR_IN_MS = 60 * 60 * 1000
 
-
 APP_SERVICE_PREFIX = "/_matrix/app/v1"
-APP_SERVICE_UNSTABLE_PREFIX = "/_matrix/app/unstable"
 
 P = ParamSpec("P")
 R = TypeVar("R")
@@ -128,47 +124,6 @@ class ApplicationServiceApi(SimpleHttpClient):
             hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
         )
 
-    async def _send_with_fallbacks(
-        self,
-        service: "ApplicationService",
-        prefixes: List[str],
-        path: str,
-        func: Callable[Concatenate[str, P], Awaitable[R]],
-        *args: P.args,
-        **kwargs: P.kwargs,
-    ) -> R:
-        """
-        Attempt to call an application service with multiple paths, falling back
-        until one succeeds.
-
-        Args:
-            service: The appliacation service, this provides the base URL.
-            prefixes: A last of paths to try in order for the requests.
-            path: A suffix to append to each prefix.
-            func: The function to call, the first argument will be the full
-                endpoint to fetch. Other arguments are provided by args/kwargs.
-
-        Returns:
-            The return value of func.
-        """
-        for i, prefix in enumerate(prefixes, start=1):
-            uri = f"{service.url}{prefix}{path}"
-            try:
-                return await func(uri, *args, **kwargs)
-            except HttpResponseException as e:
-                # If an error is received that is due to an unrecognised path,
-                # fallback to next path (if one exists). Otherwise, consider it
-                # a legitimate error and raise.
-                if i < len(prefixes) and is_unknown_endpoint(e):
-                    continue
-                raise
-            except Exception:
-                # Unexpected exceptions get sent to the caller.
-                raise
-
-        # The function should always exit via the return or raise above this.
-        raise RuntimeError("Unexpected fallback behaviour. This should never be seen.")
-
     async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
         if service.url is None:
             return False
@@ -177,11 +132,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         assert service.hs_token is not None
 
         try:
-            response = await self._send_with_fallbacks(
-                service,
-                [APP_SERVICE_PREFIX, ""],
-                f"/users/{urllib.parse.quote(user_id)}",
-                self.get_json,
+            response = await self.get_json(
+                f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}",
                 {"access_token": service.hs_token},
                 headers={"Authorization": [f"Bearer {service.hs_token}"]},
             )
@@ -203,11 +155,8 @@ class ApplicationServiceApi(SimpleHttpClient):
         assert service.hs_token is not None
 
         try:
-            response = await self._send_with_fallbacks(
-                service,
-                [APP_SERVICE_PREFIX, ""],
-                f"/rooms/{urllib.parse.quote(alias)}",
-                self.get_json,
+            response = await self.get_json(
+                f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}",
                 {"access_token": service.hs_token},
                 headers={"Authorization": [f"Bearer {service.hs_token}"]},
             )
@@ -245,11 +194,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 **fields,
                 b"access_token": service.hs_token,
             }
-            response = await self._send_with_fallbacks(
-                service,
-                [APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
-                f"/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
-                self.get_json,
+            response = await self.get_json(
+                f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
                 args=args,
                 headers={"Authorization": [f"Bearer {service.hs_token}"]},
             )
@@ -285,11 +231,8 @@ class ApplicationServiceApi(SimpleHttpClient):
             # This is required by the configuration.
             assert service.hs_token is not None
             try:
-                info = await self._send_with_fallbacks(
-                    service,
-                    [APP_SERVICE_PREFIX, APP_SERVICE_UNSTABLE_PREFIX],
-                    f"/thirdparty/protocol/{urllib.parse.quote(protocol)}",
-                    self.get_json,
+                info = await self.get_json(
+                    f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}",
                     {"access_token": service.hs_token},
                     headers={"Authorization": [f"Bearer {service.hs_token}"]},
                 )
@@ -401,11 +344,8 @@ class ApplicationServiceApi(SimpleHttpClient):
                 }
 
         try:
-            await self._send_with_fallbacks(
-                service,
-                [APP_SERVICE_PREFIX, ""],
-                f"/transactions/{urllib.parse.quote(str(txn_id))}",
-                self.put_json,
+            await self.put_json(
+                f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}",
                 json_body=body,
                 args={"access_token": service.hs_token},
                 headers={"Authorization": [f"Bearer {service.hs_token}"]},
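
With the legacy fallback removed, every application service request is a single call against the stable `/_matrix/app/v1` prefix rather than a loop over candidate prefixes. A minimal sketch of the resulting URL construction (the service URL and user ID below are illustrative assumptions):

```
# Minimal sketch of the stable-prefix URL construction used above; the
# service URL and user ID are example values, not taken from the diff.
import urllib.parse

APP_SERVICE_PREFIX = "/_matrix/app/v1"

def query_user_uri(service_url: str, user_id: str) -> str:
    return f"{service_url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}"

# Prints: https://as.example.com/_matrix/app/v1/users/%40alice%3Aexample.com
print(query_user_uri("https://as.example.com", "@alice:example.com"))
```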
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index fa61dd8c10..a90d99c4d6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -63,6 +63,7 @@ from synapse.federation.federation_base import (
 )
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import (
     make_deferred_yieldable,
@@ -137,6 +138,7 @@ class FederationServer(FederationBase):
         self._event_auth_handler = hs.get_event_auth_handler()
         self._room_member_handler = hs.get_room_member_handler()
         self._e2e_keys_handler = hs.get_e2e_keys_handler()
+        self._worker_lock_handler = hs.get_worker_locks_handler()
 
         self._state_storage_controller = hs.get_storage_controllers().state
 
@@ -1236,9 +1238,18 @@ class FederationServer(FederationBase):
                 logger.info("handling received PDU in room %s: %s", room_id, event)
                 try:
                     with nested_logging_context(event.event_id):
-                        await self._federation_event_handler.on_receive_pdu(
-                            origin, event
-                        )
+                        # We're taking out a lock within a lock, which could
+                        # lead to deadlocks if we're not careful. However, it is
+                        # safe on this occasion as we only ever take a write
+                        # lock when deleting a room, which we would never do
+                        # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME`
+                        # lock.
+                        async with self._worker_lock_handler.acquire_read_write_lock(
+                            DELETE_ROOM_LOCK_NAME, room_id, write=False
+                        ):
+                            await self._federation_event_handler.on_receive_pdu(
+                                origin, event
+                            )
                 except FederationError as e:
                     # XXX: Ideally we'd inform the remote we failed to process
                     # the event, but we can't return an error in the transaction
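
The read side of the lock is cheap: any number of inbound-PDU handlers can hold the delete-room lock for a room concurrently, while a room deletion takes the same lock with `write=True` and excludes them all. A sketch of the pattern, with the handler wiring assumed to be set up as in the diff:

```
# Sketch of the read-lock pattern introduced above. `worker_lock_handler` and
# `federation_event_handler` are assumed to be wired up as in the diff; the
# lock name matches DELETE_ROOM_LOCK_NAME from synapse.handlers.worker_lock.
async def handle_received_pdu(
    worker_lock_handler, federation_event_handler, origin: str, event, room_id: str
) -> None:
    # write=False: many PDU handlers may hold the lock at once; a concurrent
    # room deletion (write=True) blocks until all readers release it.
    async with worker_lock_handler.acquire_read_write_lock(
        "delete_room_lock", room_id, write=False
    ):
        await federation_event_handler.on_receive_pdu(origin, event)
```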
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fff0b5fa12..c656e07d37 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase
 from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field
 from synapse.events.validator import EventValidator
 from synapse.handlers.directory import DirectoryHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -485,6 +486,7 @@ class EventCreationHandler:
         self._events_shard_config = self.config.worker.events_shard_config
         self._instance_name = hs.get_instance_name()
         self._notifier = hs.get_notifier()
+        self._worker_lock_handler = hs.get_worker_locks_handler()
 
         self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state
 
@@ -876,14 +878,13 @@ class EventCreationHandler:
                 return prev_event
         return None
 
-    async def get_event_from_transaction(
+    async def get_event_id_from_transaction(
         self,
         requester: Requester,
         txn_id: str,
         room_id: str,
-    ) -> Optional[EventBase]:
-        """For the given transaction ID and room ID, check if there is a matching event.
-        If so, fetch it and return it.
+    ) -> Optional[str]:
+        """For the given transaction ID and room ID, check if there is a matching event ID.
 
         Args:
             requester: The requester making the request in the context of which we want
@@ -892,8 +893,9 @@ class EventCreationHandler:
             room_id: The room ID.
 
         Returns:
-            An event if one could be found, None otherwise.
+            An event ID if one could be found, None otherwise.
         """
+        existing_event_id = None
 
         if self._msc3970_enabled and requester.device_id:
             # When MSC3970 is enabled, we lookup for events sent by the same device first,
@@ -907,7 +909,7 @@ class EventCreationHandler:
                 )
             )
             if existing_event_id:
-                return await self.store.get_event(existing_event_id)
+                return existing_event_id
 
         # Pre-MSC3970, we looked up for events that were sent by the same session by
         # using the access token ID.
@@ -920,9 +922,32 @@ class EventCreationHandler:
                     txn_id,
                 )
             )
-            if existing_event_id:
-                return await self.store.get_event(existing_event_id)
 
+        return existing_event_id
+
+    async def get_event_from_transaction(
+        self,
+        requester: Requester,
+        txn_id: str,
+        room_id: str,
+    ) -> Optional[EventBase]:
+        """For the given transaction ID and room ID, check if there is a matching event.
+        If so, fetch it and return it.
+
+        Args:
+            requester: The requester making the request in the context of which we want
+                to fetch the event.
+            txn_id: The transaction ID.
+            room_id: The room ID.
+
+        Returns:
+            An event if one could be found, None otherwise.
+        """
+        existing_event_id = await self.get_event_id_from_transaction(
+            requester, txn_id, room_id
+        )
+        if existing_event_id:
+            return await self.store.get_event(existing_event_id)
         return None
 
     async def create_and_send_nonmember_event(
@@ -1010,6 +1035,37 @@ class EventCreationHandler:
                         event.internal_metadata.stream_ordering,
                     )
 
+        async with self._worker_lock_handler.acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            return await self._create_and_send_nonmember_event_locked(
+                requester=requester,
+                event_dict=event_dict,
+                allow_no_prev_events=allow_no_prev_events,
+                prev_event_ids=prev_event_ids,
+                state_event_ids=state_event_ids,
+                ratelimit=ratelimit,
+                txn_id=txn_id,
+                ignore_shadow_ban=ignore_shadow_ban,
+                outlier=outlier,
+                depth=depth,
+            )
+
+    async def _create_and_send_nonmember_event_locked(
+        self,
+        requester: Requester,
+        event_dict: dict,
+        allow_no_prev_events: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
+        ratelimit: bool = True,
+        txn_id: Optional[str] = None,
+        ignore_shadow_ban: bool = False,
+        outlier: bool = False,
+        depth: Optional[int] = None,
+    ) -> Tuple[EventBase, int]:
+        room_id = event_dict["room_id"]
+
         # If we don't have any prev event IDs specified then we need to
         # check that the host is in the room (as otherwise populating the
         # prev events will fail), at which point we may as well check the
@@ -1923,7 +1979,10 @@ class EventCreationHandler:
         )
 
         for room_id in room_ids:
-            dummy_event_sent = await self._send_dummy_event_for_room(room_id)
+            async with self._worker_lock_handler.acquire_read_write_lock(
+                DELETE_ROOM_LOCK_NAME, room_id, write=False
+            ):
+                dummy_event_sent = await self._send_dummy_event_for_room(room_id)
 
             if not dummy_event_sent:
                 # Did not find a valid user in the room, so remove from future attempts
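
`get_event_id_from_transaction` now holds the whole deduplication lookup in one place: device-scoped first when MSC3970 is enabled, then the older access-token scope. A condensed sketch of that order (store methods named as in the diff; the `requester` fields are assumed):

```
# Condensed sketch of the lookup order implemented above.
from typing import Optional

async def lookup_txn_event_id(store, requester, room_id: str, txn_id: str,
                              msc3970_enabled: bool) -> Optional[str]:
    event_id = None
    if msc3970_enabled and requester.device_id:
        # MSC3970: scope transaction IDs to the (user, device) pair first.
        event_id = await store.get_event_id_from_transaction_id_and_device_id(
            room_id, requester.user.to_string(), requester.device_id, txn_id
        )
    if event_id is None and requester.access_token_id:
        # Pre-MSC3970 behaviour: scope to the access token that sent it.
        event_id = await store.get_event_id_from_transaction_id_and_token_id(
            room_id, requester.user.to_string(), requester.access_token_id, txn_id
        )
    return event_id
```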
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19b8728db9..da34658470 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -46,6 +46,11 @@ logger = logging.getLogger(__name__)
 BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
 
 
+PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
     """Object tracking the status of a purge request
@@ -142,6 +147,7 @@ class PaginationHandler:
         self._server_name = hs.hostname
         self._room_shutdown_handler = hs.get_room_shutdown_handler()
         self._relations_handler = hs.get_relations_handler()
+        self._worker_locks = hs.get_worker_locks_handler()
 
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
@@ -356,7 +362,9 @@ class PaginationHandler:
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            async with self.pagination_lock.write(room_id):
+            async with self._worker_locks.acquire_read_write_lock(
+                PURGE_HISTORY_LOCK_NAME, room_id, write=True
+            ):
                 await self._storage_controllers.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
@@ -412,7 +420,10 @@ class PaginationHandler:
             room_id: room to be purged
             force: set true to skip checking for joined users.
         """
-        async with self.pagination_lock.write(room_id):
+        async with self._worker_locks.acquire_multi_read_write_lock(
+            [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
+            write=True,
+        ):
             # first check that we have no users in this room
             if not force:
                 joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -471,7 +482,9 @@ class PaginationHandler:
 
         room_token = from_token.room_key
 
-        async with self.pagination_lock.read(room_id):
+        async with self._worker_locks.acquire_read_write_lock(
+            PURGE_HISTORY_LOCK_NAME, room_id, write=False
+        ):
             (membership, member_event_id) = (None, None)
             if not use_admin_priviledge:
                 (
@@ -747,7 +760,9 @@ class PaginationHandler:
 
         self._purges_in_progress_by_room.add(room_id)
         try:
-            async with self.pagination_lock.write(room_id):
+            async with self._worker_locks.acquire_read_write_lock(
+                PURGE_HISTORY_LOCK_NAME, room_id, write=True
+            ):
                 self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
                 self._delete_by_id[
                     delete_id
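
Purging a whole room takes both locks in a single `acquire_multi_read_write_lock` call, so the handler never holds one lock while blocking on the other. A usage sketch (lock names as defined at the top of the file; the body is elided):

```
# Usage sketch for the multi-lock acquisition above. Acquiring both write
# locks in one call means a purge can never deadlock against a paginator
# holding one lock while waiting for the other.
async def purge_room(worker_locks, room_id: str) -> None:
    async with worker_locks.acquire_multi_read_write_lock(
        [("purge_history_lock", room_id), ("delete_room_lock", room_id)],
        write=True,
    ):
        ...  # delete the room's events while holding both locks
```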
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a7f8c5e636..c7fe101cd9 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -68,7 +68,7 @@ class ProfileHandler:
 
         if self.hs.is_mine(target_user):
             profileinfo = await self.store.get_profileinfo(target_user)
-            if profileinfo.display_name is None:
+            if profileinfo.display_name is None and profileinfo.avatar_url is None:
                 raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
 
             return {
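
The changed condition means the endpoint now returns a 404 only when the profile has neither a display name nor an avatar; a profile with only an avatar URL set is returned with a 200. A small truth-table sketch of the new behaviour (the values are assumptions):

```
# Behavioural sketch of the fix above (hypothetical profile values): a 404 is
# now raised only when *both* fields are missing.
cases = [
    (None, None, 404),                     # no profile data at all
    ("Alice", None, 200),                  # display name only
    (None, "mxc://example.com/abc", 200),  # avatar only: previously 404, now 200
]
for display_name, avatar_url, expected in cases:
    status = 404 if display_name is None and avatar_url is None else 200
    assert status == expected
```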
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 496e701f13..e3cdf2bc61 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -39,6 +39,7 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging import opentracing
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.event_creation_handler = hs.get_event_creation_handler()
         self.account_data_handler = hs.get_account_data_handler()
         self.event_auth_handler = hs.get_event_auth_handler()
+        self._worker_lock_handler = hs.get_worker_locks_handler()
 
         self.member_linearizer: Linearizer = Linearizer(name="member")
         self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -174,8 +176,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.request_ratelimiter = hs.get_request_ratelimiter()
         hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
 
-        self._msc3970_enabled = hs.config.experimental.msc3970_enabled
-
     def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
         """Notify the rate limiter that a room join has occurred.
 
@@ -416,29 +416,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # do this check just before we persist an event as well, but may as well
         # do it up front for efficiency.)
         if txn_id:
-            existing_event_id = None
-            if self._msc3970_enabled and requester.device_id:
-                # When MSC3970 is enabled, we lookup for events sent by the same device
-                # first, and fallback to the old behaviour if none were found.
-                existing_event_id = (
-                    await self.store.get_event_id_from_transaction_id_and_device_id(
-                        room_id,
-                        requester.user.to_string(),
-                        requester.device_id,
-                        txn_id,
-                    )
+            existing_event_id = (
+                await self.event_creation_handler.get_event_id_from_transaction(
+                    requester, txn_id, room_id
                 )
-
-            if requester.access_token_id and not existing_event_id:
-                existing_event_id = (
-                    await self.store.get_event_id_from_transaction_id_and_token_id(
-                        room_id,
-                        requester.user.to_string(),
-                        requester.access_token_id,
-                        txn_id,
-                    )
-                )
-
+            )
             if existing_event_id:
                 event_pos = await self.store.get_position_for_event(existing_event_id)
                 return existing_event_id, event_pos.stream
@@ -638,26 +620,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         # by application services), and then by room ID.
         async with self.member_as_limiter.queue(as_id):
             async with self.member_linearizer.queue(key):
-                with opentracing.start_active_span("update_membership_locked"):
-                    result = await self.update_membership_locked(
-                        requester,
-                        target,
-                        room_id,
-                        action,
-                        txn_id=txn_id,
-                        remote_room_hosts=remote_room_hosts,
-                        third_party_signed=third_party_signed,
-                        ratelimit=ratelimit,
-                        content=content,
-                        new_room=new_room,
-                        require_consent=require_consent,
-                        outlier=outlier,
-                        allow_no_prev_events=allow_no_prev_events,
-                        prev_event_ids=prev_event_ids,
-                        state_event_ids=state_event_ids,
-                        depth=depth,
-                        origin_server_ts=origin_server_ts,
-                    )
+                async with self._worker_lock_handler.acquire_read_write_lock(
+                    DELETE_ROOM_LOCK_NAME, room_id, write=False
+                ):
+                    with opentracing.start_active_span("update_membership_locked"):
+                        result = await self.update_membership_locked(
+                            requester,
+                            target,
+                            room_id,
+                            action,
+                            txn_id=txn_id,
+                            remote_room_hosts=remote_room_hosts,
+                            third_party_signed=third_party_signed,
+                            ratelimit=ratelimit,
+                            content=content,
+                            new_room=new_room,
+                            require_consent=require_consent,
+                            outlier=outlier,
+                            allow_no_prev_events=allow_no_prev_events,
+                            prev_event_ids=prev_event_ids,
+                            state_event_ids=state_event_ids,
+                            depth=depth,
+                            origin_server_ts=origin_server_ts,
+                        )
 
         return result
 
diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py
new file mode 100644
index 0000000000..72df773a86
--- /dev/null
+++ b/synapse/handlers/worker_lock.py
@@ -0,0 +1,333 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import random
+from types import TracebackType
+from typing import (
+    TYPE_CHECKING,
+    AsyncContextManager,
+    Collection,
+    Dict,
+    Optional,
+    Tuple,
+    Type,
+    Union,
+)
+from weakref import WeakSet
+
+import attr
+
+from twisted.internet import defer
+from twisted.internet.interfaces import IReactorTime
+
+from synapse.logging.context import PreserveLoggingContext
+from synapse.logging.opentracing import start_active_span
+from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.databases.main.lock import Lock, LockStore
+from synapse.util.async_helpers import timeout_deferred
+
+if TYPE_CHECKING:
+    from synapse.logging.opentracing import opentracing
+    from synapse.server import HomeServer
+
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
+class WorkerLocksHandler:
+    """A class for waiting on taking out locks, rather than using the storage
+    functions directly (which don't support awaiting).
+    """
+
+    def __init__(self, hs: "HomeServer") -> None:
+        self._reactor = hs.get_reactor()
+        self._store = hs.get_datastores().main
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._instance_name = hs.get_instance_name()
+
+        # Map from lock name/key to set of `WaitingLock` that are active for
+        # that lock.
+        self._locks: Dict[
+            Tuple[str, str], WeakSet[Union[WaitingLock, WaitingMultiLock]]
+        ] = {}
+
+        self._clock.looping_call(self._cleanup_locks, 30_000)
+
+        self._notifier.add_lock_released_callback(self._on_lock_released)
+
+    def acquire_lock(self, lock_name: str, lock_key: str) -> "WaitingLock":
+        """Acquire a standard lock, returns a context manager that will block
+        until the lock is acquired.
+
+        Note: Care must be taken to avoid deadlocks. In particular, this
+        function does *not* timeout.
+
+        Usage:
+            async with handler.acquire_lock(name, key):
+                # Do work while holding the lock...
+        """
+
+        lock = WaitingLock(
+            reactor=self._reactor,
+            store=self._store,
+            handler=self,
+            lock_name=lock_name,
+            lock_key=lock_key,
+            write=None,
+        )
+
+        self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+        return lock
+
+    def acquire_read_write_lock(
+        self,
+        lock_name: str,
+        lock_key: str,
+        *,
+        write: bool,
+    ) -> "WaitingLock":
+        """Acquire a read/write lock, returns a context manager that will block
+        until the lock is acquired.
+
+        Note: Care must be taken to avoid deadlocks. In particular, this
+        function does *not* timeout.
+
+        Usage:
+            async with handler.acquire_read_write_lock(name, key, write=True):
+                # Do work while holding the lock...
+        """
+
+        lock = WaitingLock(
+            reactor=self._reactor,
+            store=self._store,
+            handler=self,
+            lock_name=lock_name,
+            lock_key=lock_key,
+            write=write,
+        )
+
+        self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+        return lock
+
+    def acquire_multi_read_write_lock(
+        self,
+        lock_names: Collection[Tuple[str, str]],
+        *,
+        write: bool,
+    ) -> "WaitingMultiLock":
+        """Acquires multi read/write locks at once, returns a context manager
+        that will block until all the locks are acquired.
+
+        This will try and acquire all locks at once, and will never hold on to a
+        subset of the locks. (This avoids accidentally creating deadlocks).
+
+        Note: Care must be taken to avoid deadlocks. In particular, this
+        function does *not* timeout.
+        """
+
+        lock = WaitingMultiLock(
+            lock_names=lock_names,
+            write=write,
+            reactor=self._reactor,
+            store=self._store,
+            handler=self,
+        )
+
+        for lock_name, lock_key in lock_names:
+            self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock)
+
+        return lock
+
+    def notify_lock_released(self, lock_name: str, lock_key: str) -> None:
+        """Notify that a lock has been released.
+
+        Pokes both the notifier and replication.
+        """
+
+        self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key)
+
+    def _on_lock_released(
+        self, instance_name: str, lock_name: str, lock_key: str
+    ) -> None:
+        """Called when a lock has been released.
+
+        Wakes up any locks that might be waiting on this.
+        """
+        locks = self._locks.get((lock_name, lock_key))
+        if not locks:
+            return
+
+        def _wake_deferred(deferred: defer.Deferred) -> None:
+            if not deferred.called:
+                deferred.callback(None)
+
+        for lock in locks:
+            self._clock.call_later(0, _wake_deferred, lock.deferred)
+
+    @wrap_as_background_process("_cleanup_locks")
+    async def _cleanup_locks(self) -> None:
+        """Periodically cleans out stale entries in the locks map"""
+        self._locks = {key: value for key, value in self._locks.items() if value}
+
+
+@attr.s(auto_attribs=True, eq=False)
+class WaitingLock:
+    reactor: IReactorTime
+    store: LockStore
+    handler: WorkerLocksHandler
+    lock_name: str
+    lock_key: str
+    write: Optional[bool]
+    deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
+    _inner_lock: Optional[Lock] = None
+    _retry_interval: float = 0.1
+    _lock_span: "opentracing.Scope" = attr.Factory(
+        lambda: start_active_span("WaitingLock.lock")
+    )
+
+    async def __aenter__(self) -> None:
+        self._lock_span.__enter__()
+
+        with start_active_span("WaitingLock.waiting_for_lock"):
+            while self._inner_lock is None:
+                self.deferred = defer.Deferred()
+
+                if self.write is not None:
+                    lock = await self.store.try_acquire_read_write_lock(
+                        self.lock_name, self.lock_key, write=self.write
+                    )
+                else:
+                    lock = await self.store.try_acquire_lock(
+                        self.lock_name, self.lock_key
+                    )
+
+                if lock:
+                    self._inner_lock = lock
+                    break
+
+                try:
+                    # Wait until we get notified that the lock might have been
+                    # released (by the deferred being resolved). We also
+                    # periodically wake up in case the lock was released but we
+                    # weren't notified.
+                    with PreserveLoggingContext():
+                        await timeout_deferred(
+                            deferred=self.deferred,
+                            timeout=self._get_next_retry_interval(),
+                            reactor=self.reactor,
+                        )
+                except Exception:
+                    pass
+
+        return await self._inner_lock.__aenter__()
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        assert self._inner_lock
+
+        self.handler.notify_lock_released(self.lock_name, self.lock_key)
+
+        try:
+            r = await self._inner_lock.__aexit__(exc_type, exc, tb)
+        finally:
+            self._lock_span.__exit__(exc_type, exc, tb)
+
+        return r
+
+    def _get_next_retry_interval(self) -> float:
+        next = self._retry_interval
+        self._retry_interval = max(5, next * 2)
+        return next * random.uniform(0.9, 1.1)
+
+
+@attr.s(auto_attribs=True, eq=False)
+class WaitingMultiLock:
+    lock_names: Collection[Tuple[str, str]]
+
+    write: bool
+
+    reactor: IReactorTime
+    store: LockStore
+    handler: WorkerLocksHandler
+
+    deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred)
+
+    _inner_lock_cm: Optional[AsyncContextManager] = None
+    _retry_interval: float = 0.1
+    _lock_span: "opentracing.Scope" = attr.Factory(
+        lambda: start_active_span("WaitingMultiLock.lock")
+    )
+
+    async def __aenter__(self) -> None:
+        self._lock_span.__enter__()
+
+        with start_active_span("WaitingMultiLock.waiting_for_lock"):
+            while self._inner_lock_cm is None:
+                self.deferred = defer.Deferred()
+
+                lock_cm = await self.store.try_acquire_multi_read_write_lock(
+                    self.lock_names, write=self.write
+                )
+
+                if lock_cm:
+                    self._inner_lock_cm = lock_cm
+                    break
+
+                try:
+                    # Wait until we get notified that the lock might have been
+                    # released (by the deferred being resolved). We also
+                    # periodically wake up in case the lock was released but we
+                    # weren't notified.
+                    with PreserveLoggingContext():
+                        await timeout_deferred(
+                            deferred=self.deferred,
+                            timeout=self._get_next_retry_interval(),
+                            reactor=self.reactor,
+                        )
+                except Exception:
+                    pass
+
+        assert self._inner_lock_cm
+        await self._inner_lock_cm.__aenter__()
+        return
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        assert self._inner_lock_cm
+
+        for lock_name, lock_key in self.lock_names:
+            self.handler.notify_lock_released(lock_name, lock_key)
+
+        try:
+            r = await self._inner_lock_cm.__aexit__(exc_type, exc, tb)
+        finally:
+            self._lock_span.__exit__(exc_type, exc, tb)
+
+        return r
+
+    def _get_next_retry_interval(self) -> float:
+        next = self._retry_interval
+        # Exponential backoff with jitter: double the interval each
+        # attempt, capped at 5 seconds.
+        self._retry_interval = min(5, next * 2)
+        return next * random.uniform(0.9, 1.1)
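
As a usage sketch (illustrative only; the wrapper function and its names are not
part of this patch), a caller takes one of these locks with `async with` and
blocks until it is granted, waking early on lock-released notifications and
otherwise polling with the jittered backoff above:

    from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME

    async def purge_with_lock(hs: "HomeServer", room_id: str) -> None:
        # Blocks until no other worker holds a conflicting lock on this room.
        async with hs.get_worker_locks_handler().acquire_read_write_lock(
            DELETE_ROOM_LOCK_NAME, room_id, write=True
        ):
            ...  # exclusive section: readers and writers elsewhere must wait
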
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 897272ad5b..68115bca70 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -234,6 +234,9 @@ class Notifier:
 
         self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules
 
+        # List of callbacks to be notified when a lock is released
+        self._lock_released_callback: List[Callable[[str, str, str], None]] = []
+
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
         self._pusher_pool = hs.get_pusherpool()
@@ -785,6 +788,19 @@ class Notifier:
         # that any in flight requests can be immediately retried.
         self._federation_client.wake_destination(server)
 
+    def add_lock_released_callback(
+        self, callback: Callable[[str, str, str], None]
+    ) -> None:
+        """Add a function to be called whenever we are notified about a released lock."""
+        self._lock_released_callback.append(callback)
+
+    def notify_lock_released(
+        self, instance_name: str, lock_name: str, lock_key: str
+    ) -> None:
+        """Notify the callbacks that a lock has been released."""
+        for cb in self._lock_released_callback:
+            cb(instance_name, lock_name, lock_key)
+
 
 @attr.s(auto_attribs=True)
 class ReplicationNotifier:
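
Other components can subscribe to these notifications too; a minimal sketch (the
callback body is illustrative, and a module-level `logger` is assumed):

    def on_lock_released(instance_name: str, lock_name: str, lock_key: str) -> None:
        # Invoked for every lock release we hear about, local or remote.
        logger.info("lock %s/%s released by %s", lock_name, lock_key, instance_name)

    hs.get_notifier().add_lock_released_callback(on_lock_released)
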
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index f874f072f9..73f3de3642 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -107,8 +107,7 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
     Calls to e2e_keys_handler.upload_keys_for_user(user_id, device_id, keys) on
     the main process to accomplish this.
 
-    Defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
-    Request format(borrowed and expanded from KeyUploadServlet):
+    Request format for this endpoint (borrowed and expanded from KeyUploadServlet):
 
         POST /_synapse/replication/upload_keys_for_user
 
@@ -117,6 +116,7 @@ class ReplicationUploadKeysForUserRestServlet(ReplicationEndpoint):
         "device_id": "<device_id>",
         "keys": {
             ....this part can be found in KeyUploadServlet in rest/client/keys.py....
+            or as defined in https://spec.matrix.org/v1.4/client-server-api/#post_matrixclientv3keysupload
         }
     }
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 32f52e54d8..10f5c98ff8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -422,6 +422,36 @@ class RemoteServerUpCommand(_SimpleCommand):
     NAME = "REMOTE_SERVER_UP"
 
 
+class LockReleasedCommand(Command):
+    """Sent to inform other instances that a given lock has been dropped.
+
+    Format::
+
+        LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"]
+    """
+
+    NAME = "LOCK_RELEASED"
+
+    def __init__(
+        self,
+        instance_name: str,
+        lock_name: str,
+        lock_key: str,
+    ):
+        self.instance_name = instance_name
+        self.lock_name = lock_name
+        self.lock_key = lock_key
+
+    @classmethod
+    def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand":
+        instance_name, lock_name, lock_key = json_decoder.decode(line)
+
+        return cls(instance_name, lock_name, lock_key)
+
+    def to_line(self) -> str:
+        return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])
+
+
 _COMMANDS: Tuple[Type[Command], ...] = (
     ServerCommand,
     RdataCommand,
@@ -435,6 +465,7 @@ _COMMANDS: Tuple[Type[Command], ...] = (
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
+    LockReleasedCommand,
 )
 
 # Map of command name to command type.
@@ -448,6 +479,7 @@ VALID_SERVER_COMMANDS = (
     ErrorCommand.NAME,
     PingCommand.NAME,
     RemoteServerUpCommand.NAME,
+    LockReleasedCommand.NAME,
 )
 
 # The commands the client is allowed to send
@@ -461,6 +493,7 @@ VALID_CLIENT_COMMANDS = (
     UserIpCommand.NAME,
     ErrorCommand.NAME,
     RemoteServerUpCommand.NAME,
+    LockReleasedCommand.NAME,
 )
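
A round-trip sketch of the new command's serialisation (values illustrative):
`to_line` emits the JSON array shown in the docstring above, and the transport
prefixes it with the command name on the wire:

    cmd = LockReleasedCommand("worker1", "delete_room", "!room:example.com")
    line = cmd.to_line()  # e.g. '["worker1","delete_room","!room:example.com"]'
    parsed = LockReleasedCommand.from_line(line)
    assert parsed.instance_name == "worker1"
    assert (parsed.lock_name, parsed.lock_key) == ("delete_room", "!room:example.com")
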
 
 
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 5d108fe11b..a2cabba7b1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
     FederationAckCommand,
+    LockReleasedCommand,
     PositionCommand,
     RdataCommand,
     RemoteServerUpCommand,
@@ -248,6 +249,9 @@ class ReplicationCommandHandler:
         if self._is_master or self._should_insert_client_ips:
             self.subscribe_to_channel("USER_IP")
 
+        if hs.config.redis.redis_enabled:
+            self._notifier.add_lock_released_callback(self.on_lock_released)
+
     def subscribe_to_channel(self, channel_name: str) -> None:
         """
         Indicates that we wish to subscribe to a Redis channel by name.
@@ -648,6 +652,17 @@ class ReplicationCommandHandler:
 
         self._notifier.notify_remote_server_up(cmd.data)
 
+    def on_LOCK_RELEASED(
+        self, conn: IReplicationConnection, cmd: LockReleasedCommand
+    ) -> None:
+        """Called when we get a new LOCK_RELEASED command."""
+        if cmd.instance_name == self._instance_name:
+            return
+
+        self._notifier.notify_lock_released(
+            cmd.instance_name, cmd.lock_name, cmd.lock_key
+        )
+
     def new_connection(self, connection: IReplicationConnection) -> None:
         """Called when we have a new connection."""
         self._connections.append(connection)
@@ -754,6 +769,13 @@ class ReplicationCommandHandler:
         """
         self.send_command(RdataCommand(stream_name, self._instance_name, token, data))
 
+    def on_lock_released(
+        self, instance_name: str, lock_name: str, lock_key: str
+    ) -> None:
+        """Called when we released a lock and should notify other instances."""
+        if instance_name == self._instance_name:
+            self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key))
+
 
 UpdateToken = TypeVar("UpdateToken")
 UpdateRow = TypeVar("UpdateRow")
diff --git a/synapse/rest/client/room_upgrade_rest_servlet.py b/synapse/rest/client/room_upgrade_rest_servlet.py
index 6a7792e18b..4a5d9e13e7 100644
--- a/synapse/rest/client/room_upgrade_rest_servlet.py
+++ b/synapse/rest/client/room_upgrade_rest_servlet.py
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.errors import Codes, ShadowBanError, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
@@ -60,6 +61,7 @@ class RoomUpgradeRestServlet(RestServlet):
         self._hs = hs
         self._room_creation_handler = hs.get_room_creation_handler()
         self._auth = hs.get_auth()
+        self._worker_lock_handler = hs.get_worker_locks_handler()
 
     async def on_POST(
         self, request: SynapseRequest, room_id: str
@@ -78,9 +80,12 @@ class RoomUpgradeRestServlet(RestServlet):
             )
 
         try:
-            new_room_id = await self._room_creation_handler.upgrade_room(
-                requester, room_id, new_version
-            )
+            async with self._worker_lock_handler.acquire_read_write_lock(
+                DELETE_ROOM_LOCK_NAME, room_id, write=False
+            ):
+                new_room_id = await self._room_creation_handler.upgrade_room(
+                    requester, room_id, new_version
+                )
         except ShadowBanError:
             # Generate a random room ID.
             new_room_id = stringutils.random_string(18)
diff --git a/synapse/server.py b/synapse/server.py
index b72b76a38b..8430f99ef2 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,6 +107,7 @@ from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
+from synapse.handlers.worker_lock import WorkerLocksHandler
 from synapse.http.client import (
     InsecureInterceptableContextFactory,
     ReplicationClient,
@@ -912,3 +913,7 @@ class HomeServer(metaclass=abc.ABCMeta):
     def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
         """Usage metrics shared between phone home stats and the prometheus exporter."""
         return CommonUsageMetricsManager(self)
+
+    @cache_in_self
+    def get_worker_locks_handler(self) -> WorkerLocksHandler:
+        return WorkerLocksHandler(self)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 35c0680365..35cd1089d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
     SynapseTags,
@@ -338,6 +339,7 @@ class EventsPersistenceStorageController:
         )
         self._state_resolution_handler = hs.get_state_resolution_handler()
         self._state_controller = state_controller
+        self.hs = hs
 
     async def _process_event_persist_queue_task(
         self,
@@ -350,15 +352,22 @@ class EventsPersistenceStorageController:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
         """
-        if isinstance(task, _PersistEventsTask):
-            return await self._persist_event_batch(room_id, task)
-        elif isinstance(task, _UpdateCurrentStateTask):
-            await self._update_current_state(room_id, task)
-            return {}
-        else:
-            raise AssertionError(
-                f"Found an unexpected task type in event persistence queue: {task}"
-            )
+
+        # Ensure that the room can't be deleted while we're persisting events to
+        # it. We might already have taken out the lock, but since this is just a
+        # "read" lock its inherently reentrant.
+        async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            if isinstance(task, _PersistEventsTask):
+                return await self._persist_event_batch(room_id, task)
+            elif isinstance(task, _UpdateCurrentStateTask):
+                await self._update_current_state(room_id, task)
+                return {}
+            else:
+                raise AssertionError(
+                    f"Found an unexpected task type in event persistence queue: {task}"
+                )
 
     @trace
     async def persist_events(
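
Because several readers may hold the lock at once, code that already holds the
read lock can safely reach this path and have it taken again; a sketch (the
wrapper function is illustrative):

    async def persist_with_lock(hs: "HomeServer", room_id: str) -> None:
        handler = hs.get_worker_locks_handler()
        async with handler.acquire_read_write_lock(
            DELETE_ROOM_LOCK_NAME, room_id, write=False
        ):
            # The persistence queue takes the same read lock again while we
            # hold it; read locks only conflict with writers, so this cannot
            # deadlock.
            ...
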
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index b2cda52ce5..534dc32413 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -843,7 +843,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                      * because the schema change is in a background update, it's not
                      * necessarily safe to assume that it will have been completed.
                      */
-                    AND edge.is_state is ? /* False */
+                    AND edge.is_state is FALSE
                     /**
                      * We only want backwards extremities that are older than or at
                      * the same position of the given `current_depth` (where older
@@ -886,7 +886,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 sql,
                 (
                     room_id,
-                    False,
                     current_depth,
                     self._clock.time_msec(),
                     BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 2b83a69426..bd3f14fb71 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1455,8 +1455,8 @@ class PersistEventsStore:
                     },
                 )
 
-                sql = "UPDATE events SET outlier = ? WHERE event_id = ?"
-                txn.execute(sql, (False, event.event_id))
+                sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?"
+                txn.execute(sql, (event.event_id,))
 
                 # Update the event_backward_extremities table now that this
                 # event isn't an outlier any more.
@@ -1549,13 +1549,13 @@ class PersistEventsStore:
             for event, _ in events_and_contexts
             if not event.internal_metadata.is_redacted()
         ]
-        sql = "UPDATE redactions SET have_censored = ? WHERE "
+        sql = "UPDATE redactions SET have_censored = FALSE WHERE "
         clause, args = make_in_list_sql_clause(
             self.database_engine,
             "redacts",
             unredacted_events,
         )
-        txn.execute(sql + clause, [False] + args)
+        txn.execute(sql + clause, args)
 
         self.db_pool.simple_insert_many_txn(
             txn,
@@ -2318,14 +2318,14 @@ class PersistEventsStore:
             "   SELECT 1 FROM events"
             "   LEFT JOIN event_edges edge"
             "   ON edge.event_id = events.event_id"
-            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
+            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
             " )"
         )
 
         txn.execute_batch(
             query,
             [
-                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False)
+                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
                 for ev in events
                 for e_id in ev.prev_event_ids()
                 if not ev.internal_metadata.is_outlier()
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index c89b4f7919..1680bf6168 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from contextlib import AsyncExitStack
 from types import TracebackType
-from typing import TYPE_CHECKING, Optional, Set, Tuple, Type
+from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type
 from weakref import WeakValueDictionary
 
 from twisted.internet.interfaces import IReactorCore
@@ -208,76 +209,85 @@ class LockStore(SQLBaseStore):
         used (otherwise the lock will leak).
         """
 
+        try:
+            lock = await self.db_pool.runInteraction(
+                "try_acquire_read_write_lock",
+                self._try_acquire_read_write_lock_txn,
+                lock_name,
+                lock_key,
+                write,
+            )
+        except self.database_engine.module.IntegrityError:
+            return None
+
+        return lock
+
+    def _try_acquire_read_write_lock_txn(
+        self,
+        txn: LoggingTransaction,
+        lock_name: str,
+        lock_key: str,
+        write: bool,
+    ) -> "Lock":
+        # We attempt to acquire the lock by inserting into
+        # `worker_read_write_locks` and seeing if that fails any
+        # constraints. If it doesn't then we have acquired the lock,
+        # otherwise we haven't.
+        #
+        # Before that, though, we clear the table of any stale locks.
+
         now = self._clock.time_msec()
         token = random_string(6)
 
-        def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None:
-            # We attempt to acquire the lock by inserting into
-            # `worker_read_write_locks` and seeing if that fails any
-            # constraints. If it doesn't then we have acquired the lock,
-            # otherwise we haven't.
-            #
-            # Before that though we clear the table of any stale locks.
-
-            delete_sql = """
-                DELETE FROM worker_read_write_locks
-                    WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
-            """
-
-            insert_sql = """
-                INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
-                VALUES (?, ?, ?, ?, ?, ?)
-            """
-
-            if isinstance(self.database_engine, PostgresEngine):
-                # For Postgres we can send these queries at the same time.
-                txn.execute(
-                    delete_sql + ";" + insert_sql,
-                    (
-                        # DELETE args
-                        now - _LOCK_TIMEOUT_MS,
-                        lock_name,
-                        lock_key,
-                        # UPSERT args
-                        lock_name,
-                        lock_key,
-                        write,
-                        self._instance_name,
-                        token,
-                        now,
-                    ),
-                )
-            else:
-                # For SQLite these need to be two queries.
-                txn.execute(
-                    delete_sql,
-                    (
-                        now - _LOCK_TIMEOUT_MS,
-                        lock_name,
-                        lock_key,
-                    ),
-                )
-                txn.execute(
-                    insert_sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        write,
-                        self._instance_name,
-                        token,
-                        now,
-                    ),
-                )
+        delete_sql = """
+            DELETE FROM worker_read_write_locks
+                WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?;
+        """
 
-            return
+        insert_sql = """
+            INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts)
+            VALUES (?, ?, ?, ?, ?, ?)
+        """
 
-        try:
-            await self.db_pool.runInteraction(
-                "try_acquire_read_write_lock",
-                _try_acquire_read_write_lock_txn,
+        if isinstance(self.database_engine, PostgresEngine):
+            # For Postgres we can send these queries at the same time.
+            txn.execute(
+                delete_sql + ";" + insert_sql,
+                (
+                    # DELETE args
+                    now - _LOCK_TIMEOUT_MS,
+                    lock_name,
+                    lock_key,
+                    # UPSERT args
+                    lock_name,
+                    lock_key,
+                    write,
+                    self._instance_name,
+                    token,
+                    now,
+                ),
+            )
+        else:
+            # For SQLite these need to be two queries.
+            txn.execute(
+                delete_sql,
+                (
+                    now - _LOCK_TIMEOUT_MS,
+                    lock_name,
+                    lock_key,
+                ),
+            )
+            txn.execute(
+                insert_sql,
+                (
+                    lock_name,
+                    lock_key,
+                    write,
+                    self._instance_name,
+                    token,
+                    now,
+                ),
             )
-        except self.database_engine.module.IntegrityError:
-            return None
 
         lock = Lock(
             self._reactor,
@@ -289,10 +299,58 @@ class LockStore(SQLBaseStore):
             token=token,
         )
 
-        self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
+        def set_lock() -> None:
+            self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock
+
+        txn.call_after(set_lock)
 
         return lock
 
+    async def try_acquire_multi_read_write_lock(
+        self,
+        lock_names: Collection[Tuple[str, str]],
+        write: bool,
+    ) -> Optional[AsyncExitStack]:
+        """Try to acquire multiple locks for the given names/keys. Will return
+        an async context manager if the locks are successfully acquired, which
+        *must* be used (otherwise the locks will leak).
+
+        If only a subset of the locks can be acquired then it will immediately
+        drop them and return `None`.
+        """
+        try:
+            locks = await self.db_pool.runInteraction(
+                "try_acquire_multi_read_write_lock",
+                self._try_acquire_multi_read_write_lock_txn,
+                lock_names,
+                write,
+            )
+        except self.database_engine.module.IntegrityError:
+            return None
+
+        stack = AsyncExitStack()
+
+        for lock in locks:
+            await stack.enter_async_context(lock)
+
+        return stack
+
+    def _try_acquire_multi_read_write_lock_txn(
+        self,
+        txn: LoggingTransaction,
+        lock_names: Collection[Tuple[str, str]],
+        write: bool,
+    ) -> Collection["Lock"]:
+        locks = []
+
+        for lock_name, lock_key in lock_names:
+            lock = self._try_acquire_read_write_lock_txn(
+                txn, lock_name, lock_key, write
+            )
+            locks.append(lock)
+
+        return locks
+
 
 class Lock:
     """An async context manager that manages an acquired lock, ensuring it is
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 9773c1fcd2..b52f48cf04 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -249,12 +249,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         # Mark all state and own events as outliers
         logger.info("[purge] marking remaining events as outliers")
         txn.execute(
-            "UPDATE events SET outlier = ?"
+            "UPDATE events SET outlier = TRUE"
             " WHERE event_id IN ("
-            "    SELECT event_id FROM events_to_purge "
-            "    WHERE NOT should_delete"
-            ")",
-            (True,),
+            "   SELECT event_id FROM events_to_purge "
+            "   WHERE NOT should_delete"
+            ")"
         )
 
         # synapse tries to take out an exclusive lock on room_depth whenever it
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index e098ceea3c..c13c0bc7d7 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -560,19 +560,19 @@ class PushRuleStore(PushRulesWorkerStore):
         if isinstance(self.database_engine, PostgresEngine):
             sql = """
                 INSERT INTO push_rules_enable (id, user_name, rule_id, enabled)
-                VALUES (?, ?, ?, ?)
+                VALUES (?, ?, ?, 1)
                 ON CONFLICT DO NOTHING
             """
         elif isinstance(self.database_engine, Sqlite3Engine):
             sql = """
                 INSERT OR IGNORE INTO push_rules_enable (id, user_name, rule_id, enabled)
-                VALUES (?, ?, ?, ?)
+                VALUES (?, ?, ?, 1)
             """
         else:
             raise RuntimeError("Unknown database engine")
 
         new_enable_id = self._push_rules_enable_id_gen.get_next()
-        txn.execute(sql, (new_enable_id, user_id, rule_id, 1))
+        txn.execute(sql, (new_enable_id, user_id, rule_id))
 
     async def delete_push_rule(self, user_id: str, rule_id: str) -> None:
         """
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 676d03bb7e..c582cf0573 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -454,9 +454,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         ) -> List[Tuple[str, int]]:
             sql = (
                 "SELECT user_id, expiration_ts_ms FROM account_validity"
-                " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?"
+                " WHERE email_sent = FALSE AND (expiration_ts_ms - ?) <= ?"
             )
-            values = [False, now_ms, renew_at]
+            values = [now_ms, renew_at]
             txn.execute(sql, values)
             return cast(List[Tuple[str, int]], txn.fetchall())
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 830658f328..719e11aea6 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -936,11 +936,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
             JOIN event_json USING (room_id, event_id)
             WHERE room_id = ?
                 %(where_clause)s
-                AND contains_url = ? AND outlier = ?
+                AND contains_url = TRUE AND outlier = FALSE
             ORDER BY stream_ordering DESC
             LIMIT ?
         """
-        txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100))
+        txn.execute(sql % {"where_clause": ""}, (room_id, 100))
 
         local_media_mxcs = []
         remote_media_mxcs = []
@@ -976,7 +976,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
             txn.execute(
                 sql % {"where_clause": "AND stream_ordering < ?"},
-                (room_id, next_token, True, False, 100),
+                (room_id, next_token, 100),
             )
 
         return local_media_mxcs, remote_media_mxcs
@@ -1086,9 +1086,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         # set quarantine
         if quarantined_by is not None:
-            sql += "AND safe_from_quarantine = ?"
+            sql += "AND safe_from_quarantine = FALSE"
             txn.executemany(
-                sql, [(quarantined_by, media_id, False) for media_id in local_mxcs]
+                sql, [(quarantined_by, media_id) for media_id in local_mxcs]
             )
         # remove from quarantine
         else:
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92cbe262a6..5a3611c415 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1401,7 +1401,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             `to_token`), or `limit` is zero.
         """
 
-        args = [False, room_id]
+        args: List[Any] = [room_id]
 
         order, from_bound, to_bound = generate_pagination_bounds(
             direction, from_token, to_token
@@ -1475,7 +1475,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
                 event.topological_ordering, event.stream_ordering
             FROM events AS event
             %(join_clause)s
-            WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+            WHERE event.outlier = FALSE AND event.room_id = ? AND %(bounds)s
             ORDER BY event.topological_ordering %(order)s,
             event.stream_ordering %(order)s LIMIT ?
         """ % {
diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py
index 15fce165b6..807dc2f21c 100644
--- a/tests/appservice/test_api.py
+++ b/tests/appservice/test_api.py
@@ -16,7 +16,6 @@ from unittest.mock import Mock
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.errors import HttpResponseException
 from synapse.appservice import ApplicationService
 from synapse.server import HomeServer
 from synapse.types import JsonDict
@@ -107,58 +106,6 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase):
         self.assertEqual(self.request_url, URL_LOCATION)
         self.assertEqual(result, SUCCESS_RESULT_LOCATION)
 
-    def test_fallback(self) -> None:
-        """
-        Tests that the fallback to legacy URLs works.
-        """
-        SUCCESS_RESULT_USER = [
-            {
-                "protocol": PROTOCOL,
-                "userid": "@a:user",
-                "fields": {
-                    "more": "fields",
-                },
-            }
-        ]
-
-        URL_USER = f"{URL}/_matrix/app/v1/thirdparty/user/{PROTOCOL}"
-        FALLBACK_URL_USER = f"{URL}/_matrix/app/unstable/thirdparty/user/{PROTOCOL}"
-
-        self.request_url = None
-        self.v1_seen = False
-
-        async def get_json(
-            url: str,
-            args: Mapping[Any, Any],
-            headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]],
-        ) -> List[JsonDict]:
-            # Ensure the access token is passed as both a header and query arg.
-            if not headers.get("Authorization") or not args.get(b"access_token"):
-                raise RuntimeError("Access token not provided")
-
-            self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"])
-            self.assertEqual(args.get(b"access_token"), TOKEN)
-            self.request_url = url
-            if url == URL_USER:
-                self.v1_seen = True
-                raise HttpResponseException(404, "NOT_FOUND", b"NOT_FOUND")
-            elif url == FALLBACK_URL_USER:
-                return SUCCESS_RESULT_USER
-            else:
-                raise RuntimeError(
-                    "URL provided was invalid. This should never be seen."
-                )
-
-        # We assign to a method, which mypy doesn't like.
-        self.api.get_json = Mock(side_effect=get_json)  # type: ignore[assignment]
-
-        result = self.get_success(
-            self.api.query_3pe(self.service, "user", PROTOCOL, {b"some": [b"field"]})
-        )
-        self.assertTrue(self.v1_seen)
-        self.assertEqual(self.request_url, FALLBACK_URL_USER)
-        self.assertEqual(result, SUCCESS_RESULT_USER)
-
     def test_claim_keys(self) -> None:
         """
         Tests that the /keys/claim response is properly parsed for missing
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 196ceb0b82..ec2f5d30be 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -179,6 +179,16 @@ class ProfileTestCase(unittest.HomeserverTestCase):
 
         self.assertEqual("http://my.server/me.png", avatar_url)
 
+    def test_get_profile_empty_displayname(self) -> None:
+        self.get_success(self.store.set_profile_displayname(self.frank, None))
+        self.get_success(
+            self.store.set_profile_avatar_url(self.frank, "http://my.server/me.png")
+        )
+
+        profile = self.get_success(self.handler.get_profile(self.frank.to_string()))
+
+        self.assertEqual("http://my.server/me.png", profile["avatar_url"])
+
     def test_set_my_avatar(self) -> None:
         self.get_success(
             self.handler.set_avatar_url(
diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py
new file mode 100644
index 0000000000..73e548726c
--- /dev/null
+++ b/tests/handlers/test_worker_lock.py
@@ -0,0 +1,74 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+
+
+class WorkerLockTestCase(unittest.HomeserverTestCase):
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        self.worker_lock_handler = self.hs.get_worker_locks_handler()
+
+    def test_wait_for_lock_locally(self) -> None:
+        """Test waiting for a lock on a single worker"""
+
+        lock1 = self.worker_lock_handler.acquire_lock("name", "key")
+        self.get_success(lock1.__aenter__())
+
+        lock2 = self.worker_lock_handler.acquire_lock("name", "key")
+        d2 = defer.ensureDeferred(lock2.__aenter__())
+        self.assertNoResult(d2)
+
+        self.get_success(lock1.__aexit__(None, None, None))
+
+        self.get_success(d2)
+        self.get_success(lock2.__aexit__(None, None, None))
+
+
+class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase):
+    def prepare(
+        self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
+    ) -> None:
+        self.main_worker_lock_handler = self.hs.get_worker_locks_handler()
+
+    def test_wait_for_lock_worker(self) -> None:
+        """Test waiting for a lock on another worker"""
+
+        worker = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            extra_config={
+                "redis": {"enabled": True},
+            },
+        )
+        worker_lock_handler = worker.get_worker_locks_handler()
+
+        lock1 = self.main_worker_lock_handler.acquire_lock("name", "key")
+        self.get_success(lock1.__aenter__())
+
+        lock2 = worker_lock_handler.acquire_lock("name", "key")
+        d2 = defer.ensureDeferred(lock2.__aenter__())
+        self.assertNoResult(d2)
+
+        self.get_success(lock1.__aexit__(None, None, None))
+
+        self.get_success(d2)
+        self.get_success(lock2.__aexit__(None, None, None))
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index d013e75d55..4f6347be15 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -711,7 +711,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(30, channel.resource_usage.db_txn_count)
+        self.assertEqual(32, channel.resource_usage.db_txn_count)
 
     def test_post_room_initial_state(self) -> None:
         # POST with initial_state config key, expect new room id
@@ -724,7 +724,7 @@ class RoomsCreateTestCase(RoomBase):
         self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
         self.assertTrue("room_id" in channel.json_body)
         assert channel.resource_usage is not None
-        self.assertEqual(32, channel.resource_usage.db_txn_count)
+        self.assertEqual(34, channel.resource_usage.db_txn_count)
 
     def test_post_room_visibility_key(self) -> None:
         # POST with visibility config key, expect new room id
diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py
index ad454f6dd8..383da83dfb 100644
--- a/tests/storage/databases/main/test_lock.py
+++ b/tests/storage/databases/main/test_lock.py
@@ -448,3 +448,55 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase):
         self.get_success(self.store._on_shutdown())
 
         self.assertEqual(self.store._live_read_write_lock_tokens, {})
+
+    def test_acquire_multiple_locks(self) -> None:
+        """Tests that acquiring multiple locks at once works."""
+
+        # Take out multiple locks and ensure that we can't get those locks out
+        # again.
+        lock = self.get_success(
+            self.store.try_acquire_multi_read_write_lock(
+                [("name1", "key1"), ("name2", "key2")], write=True
+            )
+        )
+        self.assertIsNotNone(lock)
+
+        assert lock is not None
+        self.get_success(lock.__aenter__())
+
+        lock2 = self.get_success(
+            self.store.try_acquire_read_write_lock("name1", "key1", write=True)
+        )
+        self.assertIsNone(lock2)
+
+        lock3 = self.get_success(
+            self.store.try_acquire_read_write_lock("name2", "key2", write=False)
+        )
+        self.assertIsNone(lock3)
+
+        # Overlapping lock attempts will fail, and won't acquire any of the locks.
+        lock4 = self.get_success(
+            self.store.try_acquire_multi_read_write_lock(
+                [("name1", "key1"), ("name3", "key3")], write=True
+            )
+        )
+        self.assertIsNone(lock4)
+
+        lock5 = self.get_success(
+            self.store.try_acquire_read_write_lock("name3", "key3", write=True)
+        )
+        self.assertIsNotNone(lock5)
+        assert lock5 is not None
+        self.get_success(lock5.__aenter__())
+        self.get_success(lock5.__aexit__(None, None, None))
+
+        # Once we release the lock we can take out the locks again.
+        self.get_success(lock.__aexit__(None, None, None))
+
+        lock6 = self.get_success(
+            self.store.try_acquire_read_write_lock("name1", "key1", write=True)
+        )
+        self.assertIsNotNone(lock6)
+        assert lock6 is not None
+        self.get_success(lock6.__aenter__())
+        self.get_success(lock6.__aexit__(None, None, None))