summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--CHANGES.md71
-rw-r--r--Cargo.lock8
-rw-r--r--changelog.d/15708.feature1
-rw-r--r--changelog.d/15820.bugfix1
-rw-r--r--changelog.d/15884.misc1
-rw-r--r--changelog.d/15909.misc1
-rw-r--r--changelog.d/15911.feature1
-rw-r--r--changelog.d/15921.doc1
-rw-r--r--changelog.d/15922.misc1
-rw-r--r--changelog.d/15924.feature1
-rw-r--r--changelog.d/15925.bugfix1
-rw-r--r--changelog.d/15926.misc1
-rw-r--r--changelog.d/15928.removal1
-rw-r--r--changelog.d/15938.doc1
-rw-r--r--changelog.d/15940.misc1
-rw-r--r--contrib/grafana/synapse.json2
-rw-r--r--debian/changelog12
-rw-r--r--docs/setup/installation.md4
-rw-r--r--docs/usage/configuration/config_documentation.md33
-rw-r--r--docs/workers.md24
-rw-r--r--poetry.lock169
-rw-r--r--pyproject.toml2
-rwxr-xr-xscripts-dev/build_debian_packages.py1
-rwxr-xr-xscripts-dev/complement.sh2
-rw-r--r--synapse/api/errors.py7
-rw-r--r--synapse/api/room_versions.py349
-rw-r--r--synapse/app/_base.py2
-rw-r--r--synapse/app/generic_worker.py1
-rw-r--r--synapse/app/homeserver.py1
-rw-r--r--synapse/config/experimental.py21
-rw-r--r--synapse/config/workers.py45
-rw-r--r--synapse/event_auth.py28
-rw-r--r--synapse/events/__init__.py2
-rw-r--r--synapse/events/builder.py2
-rw-r--r--synapse/events/utils.py39
-rw-r--r--synapse/federation/federation_base.py2
-rw-r--r--synapse/federation/federation_client.py6
-rw-r--r--synapse/federation/federation_server.py6
-rw-r--r--synapse/handlers/device.py13
-rw-r--r--synapse/handlers/devicemessage.py108
-rw-r--r--synapse/handlers/event_auth.py4
-rw-r--r--synapse/handlers/federation.py8
-rw-r--r--synapse/handlers/message.py13
-rw-r--r--synapse/handlers/presence.py37
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/room_member.py15
-rw-r--r--synapse/handlers/room_summary.py4
-rw-r--r--synapse/http/client.py7
-rw-r--r--synapse/http/connectproxyclient.py20
-rw-r--r--synapse/http/matrixfederationclient.py142
-rw-r--r--synapse/http/proxy.py283
-rw-r--r--synapse/http/proxyagent.py141
-rw-r--r--synapse/http/server.py51
-rw-r--r--synapse/http/site.py27
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py2
-rw-r--r--synapse/rest/client/devices.py232
-rw-r--r--synapse/rest/client/room.py4
-rw-r--r--synapse/state/__init__.py3
-rw-r--r--synapse/state/v2.py9
-rw-r--r--synapse/storage/controllers/state.py137
-rw-r--r--synapse/storage/database.py2
-rw-r--r--synapse/storage/databases/main/__init__.py4
-rw-r--r--synapse/storage/databases/main/filtering.py5
-rw-r--r--synapse/storage/databases/main/profile.py12
-rw-r--r--synapse/storage/databases/main/room.py2
-rw-r--r--synapse/storage/databases/main/roommember.py122
-rw-r--r--synapse/storage/databases/main/stats.py4
-rw-r--r--synapse/storage/databases/main/user_directory.py13
-rw-r--r--synapse/storage/schema/__init__.py7
-rw-r--r--synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py50
-rw-r--r--synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py54
-rw-r--r--synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres (renamed from synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres)64
-rw-r--r--synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite (renamed from synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite)59
-rw-r--r--synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py70
-rw-r--r--synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres69
-rw-r--r--synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite65
-rw-r--r--tests/app/test_openid_listener.py8
-rw-r--r--tests/events/test_utils.py39
-rw-r--r--tests/handlers/test_device.py102
-rw-r--r--tests/handlers/test_federation.py2
-rw-r--r--tests/handlers/test_presence.py1
-rw-r--r--tests/handlers/test_typing.py10
-rw-r--r--tests/http/test_matrixfederationclient.py284
-rw-r--r--tests/http/test_proxy.py53
-rw-r--r--tests/http/test_proxyagent.py4
-rw-r--r--tests/replication/_base.py3
-rw-r--r--tests/replication/test_federation_sender_shard.py22
-rw-r--r--tests/rest/admin/test_user.py2
-rw-r--r--tests/rest/client/test_devices.py150
-rw-r--r--tests/rest/client/test_presence.py1
-rw-r--r--tests/rest/client/test_redactions.py21
-rw-r--r--tests/rest/client/test_rooms.py2
-rw-r--r--tests/storage/test_e2e_room_keys.py2
-rw-r--r--tests/storage/test_profile.py63
-rw-r--r--tests/storage/test_purge.py2
-rw-r--r--tests/storage/test_rollback_worker.py4
-rw-r--r--tests/storage/test_user_filters.py94
-rw-r--r--tests/test_server.py33
-rw-r--r--tests/unittest.py1
99 files changed, 2690 insertions, 929 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 22d56a9a01..c0570b1fd0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,4 +1,65 @@
-# Synapse 1.88.0rc1 (2023-07-11)
+# Synapse 1.89.0rc1 (2023-07-25)
+
+### Features
+
+- Add Unix Socket support for HTTP Replication Listeners. [Document and provide usage instructions](https://matrix-org.github.io/synapse/v1.89/usage/configuration/config_documentation.html#listeners) for utilizing Unix sockets in Synapse. Contributed by Jason Little. ([\#15708](https://github.com/matrix-org/synapse/issues/15708), [\#15924](https://github.com/matrix-org/synapse/issues/15924))
+- Allow `+` in Matrix IDs, per [MSC4009](https://github.com/matrix-org/matrix-spec-proposals/pull/4009). ([\#15911](https://github.com/matrix-org/synapse/issues/15911))
+- Support room version 11 from [MSC3820](https://github.com/matrix-org/matrix-spec-proposals/pull/3820). ([\#15912](https://github.com/matrix-org/synapse/issues/15912))
+- Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`. ([\#15913](https://github.com/matrix-org/synapse/issues/15913), [\#15969](https://github.com/matrix-org/synapse/issues/15969))
+- Implement [MSC3814](https://github.com/matrix-org/matrix-spec-proposals/pull/3814), dehydrated devices v2/shrivelled sessions and move [MSC2697](https://github.com/matrix-org/matrix-spec-proposals/pull/2697) behind a config flag. Contributed by Nico from Famedly, H-Shay and poljar. ([\#15929](https://github.com/matrix-org/synapse/issues/15929))
+
+### Bugfixes
+
+- Fix a long-standing bug where remote invites weren't correctly pushed. ([\#15820](https://github.com/matrix-org/synapse/issues/15820))
+- Fix background schema updates failing over a large upgrade gap. ([\#15887](https://github.com/matrix-org/synapse/issues/15887))
+- Fix a bug introduced in 1.86.0 where Synapse starting with an empty `experimental_features` configuration setting. ([\#15925](https://github.com/matrix-org/synapse/issues/15925))
+- Fixed deploy annotations in the provided Grafana dashboard config, so that it shows for any homeserver and not just matrix.org. Contributed by @wrjlewis. ([\#15957](https://github.com/matrix-org/synapse/issues/15957))
+- Ensure a long state res does not starve CPU by occasionally yielding to the reactor. ([\#15960](https://github.com/matrix-org/synapse/issues/15960))
+- Properly handle redactions of creation events. ([\#15973](https://github.com/matrix-org/synapse/issues/15973))
+- Fix a bug where resyncing stale device lists could block responding to federation transactions, and thus delay receiving new data from the remote server. ([\#15975](https://github.com/matrix-org/synapse/issues/15975))
+
+### Improved Documentation
+
+- Better clarify how to run a worker instance (pass both configs). ([\#15921](https://github.com/matrix-org/synapse/issues/15921))
+- Improve [the documentation](https://matrix-org.github.io/synapse/v1.89/admin_api/user_admin_api.html#login-as-a-user) for the login as a user admin API. ([\#15938](https://github.com/matrix-org/synapse/issues/15938))
+- Fix broken Arch Linux package link. Contributed by @SnipeXandrej. ([\#15981](https://github.com/matrix-org/synapse/issues/15981))
+
+### Deprecations and Removals
+
+- Remove support for calling the `/register` endpoint with an unspecced `user` property for application services. ([\#15928](https://github.com/matrix-org/synapse/issues/15928))
+
+### Internal Changes
+
+- Mark `get_user_in_directory` private since it is only used in tests. Also remove the cache from it. ([\#15884](https://github.com/matrix-org/synapse/issues/15884))
+- Document which Python version runs on a given Linux distribution so we can more easily clean up later. ([\#15909](https://github.com/matrix-org/synapse/issues/15909))
+- Add details to warning in log when we fail to fetch an alias. ([\#15922](https://github.com/matrix-org/synapse/issues/15922))
+- Remove unneeded `__init__`. ([\#15926](https://github.com/matrix-org/synapse/issues/15926))
+- Fix bug with read/write lock implementation. This is currently unused so has no observable effects. ([\#15933](https://github.com/matrix-org/synapse/issues/15933), [\#15958](https://github.com/matrix-org/synapse/issues/15958))
+- Unbreak the nix development environment by pinning the Rust version to 1.70.0. ([\#15940](https://github.com/matrix-org/synapse/issues/15940))
+- Update presence metrics to differentiate remote vs local users. ([\#15952](https://github.com/matrix-org/synapse/issues/15952))
+- Stop reading from column `user_id` of table `profiles`. ([\#15955](https://github.com/matrix-org/synapse/issues/15955))
+- Build packages for Debian Trixie. ([\#15961](https://github.com/matrix-org/synapse/issues/15961))
+- Reduce the amount of state we pull out. ([\#15968](https://github.com/matrix-org/synapse/issues/15968))
+- Speed up updating state in large rooms. ([\#15971](https://github.com/matrix-org/synapse/issues/15971))
+
+### Updates to locked dependencies
+
+* Bump anyhow from 1.0.71 to 1.0.72. ([\#15949](https://github.com/matrix-org/synapse/issues/15949))
+* Bump click from 8.1.3 to 8.1.6. ([\#15984](https://github.com/matrix-org/synapse/issues/15984))
+* Bump cryptography from 41.0.1 to 41.0.2. ([\#15943](https://github.com/matrix-org/synapse/issues/15943))
+* Bump jsonschema from 4.17.3 to 4.18.3. ([\#15948](https://github.com/matrix-org/synapse/issues/15948))
+* Bump pillow from 9.4.0 to 10.0.0. ([\#15986](https://github.com/matrix-org/synapse/issues/15986))
+* Bump prometheus-client from 0.17.0 to 0.17.1. ([\#15945](https://github.com/matrix-org/synapse/issues/15945))
+* Bump pydantic from 1.10.10 to 1.10.11. ([\#15946](https://github.com/matrix-org/synapse/issues/15946))
+* Bump pygithub from 1.58.2 to 1.59.0. ([\#15834](https://github.com/matrix-org/synapse/issues/15834))
+* Bump pyo3-log from 0.8.2 to 0.8.3. ([\#15951](https://github.com/matrix-org/synapse/issues/15951))
+* Bump sentry-sdk from 1.26.0 to 1.28.1. ([\#15985](https://github.com/matrix-org/synapse/issues/15985))
+* Bump serde_json from 1.0.100 to 1.0.103. ([\#15950](https://github.com/matrix-org/synapse/issues/15950))
+* Bump types-pillow from 9.5.0.4 to 10.0.0.1. ([\#15932](https://github.com/matrix-org/synapse/issues/15932))
+* Bump types-requests from 2.31.0.1 to 2.31.0.2. ([\#15983](https://github.com/matrix-org/synapse/issues/15983))
+* Bump typing-extensions from 4.5.0 to 4.7.1. ([\#15947](https://github.com/matrix-org/synapse/issues/15947))
+
+# Synapse 1.88.0 (2023-07-18)
 
 This release
  - raises the minimum supported version of Python to 3.8, as Python 3.7 is now [end-of-life](https://devguide.python.org/versions/), and
@@ -6,6 +67,14 @@ This release
 
 See [the upgrade notes](https://github.com/matrix-org/synapse/blob/release-v1.88/docs/upgrade.md#upgrading-to-v1880) for more information.
 
+
+### Bugfixes
+
+- Revert "Stop writing to column `user_id` of tables `profiles` and `user_filters`", which was introduced in Synapse 1.88.0rc1. ([\#15953](https://github.com/matrix-org/synapse/issues/15953))
+
+
+# Synapse 1.88.0rc1 (2023-07-11)
+
 ### Features
 
 - Add `not_user_type` param to the [list accounts admin API](https://matrix-org.github.io/synapse/v1.88/admin_api/user_admin_api.html#list-accounts). ([\#15844](https://github.com/matrix-org/synapse/issues/15844))
diff --git a/Cargo.lock b/Cargo.lock
index 2264e67245..b29a72a3b8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -332,18 +332,18 @@ checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd"
 
 [[package]]
 name = "serde"
-version = "1.0.171"
+version = "1.0.175"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30e27d1e4fd7659406c492fd6cfaf2066ba8773de45ca75e855590f856dc34a9"
+checksum = "5d25439cd7397d044e2748a6fe2432b5e85db703d6d097bd014b3c0ad1ebff0b"
 dependencies = [
  "serde_derive",
 ]
 
 [[package]]
 name = "serde_derive"
-version = "1.0.171"
+version = "1.0.175"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "389894603bd18c46fa56231694f8d827779c0951a667087194cf9de94ed24682"
+checksum = "b23f7ade6f110613c0d63858ddb8b94c1041f550eab58a16b371bdf2c9c80ab4"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/changelog.d/15708.feature b/changelog.d/15708.feature
deleted file mode 100644
index 06a6c959ab..0000000000
--- a/changelog.d/15708.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little.
diff --git a/changelog.d/15820.bugfix b/changelog.d/15820.bugfix
deleted file mode 100644
index d259d32061..0000000000
--- a/changelog.d/15820.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix long-standing bug where remote invites weren't correctly pushed.
diff --git a/changelog.d/15884.misc b/changelog.d/15884.misc
deleted file mode 100644
index 8e73a9a6cd..0000000000
--- a/changelog.d/15884.misc
+++ /dev/null
@@ -1 +0,0 @@
-Mark `get_user_in_directory` private since it is only used in tests. Also remove the cache from it.
diff --git a/changelog.d/15909.misc b/changelog.d/15909.misc
deleted file mode 100644
index ba36a97442..0000000000
--- a/changelog.d/15909.misc
+++ /dev/null
@@ -1 +0,0 @@
-Document which Python version runs on a given Linux distribution so we can more easily clean up later.
diff --git a/changelog.d/15911.feature b/changelog.d/15911.feature
deleted file mode 100644
index b24077c6c3..0000000000
--- a/changelog.d/15911.feature
+++ /dev/null
@@ -1 +0,0 @@
-Allow `+` in Matrix IDs, per [MSC4009](https://github.com/matrix-org/matrix-spec-proposals/pull/4009).
diff --git a/changelog.d/15921.doc b/changelog.d/15921.doc
deleted file mode 100644
index 02f34c73d5..0000000000
--- a/changelog.d/15921.doc
+++ /dev/null
@@ -1 +0,0 @@
-Better clarify how to run a worker instance (pass both configs).
diff --git a/changelog.d/15922.misc b/changelog.d/15922.misc
deleted file mode 100644
index 93fc644877..0000000000
--- a/changelog.d/15922.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add details to warning in log when we fail to fetch an alias.
diff --git a/changelog.d/15924.feature b/changelog.d/15924.feature
deleted file mode 100644
index 06a6c959ab..0000000000
--- a/changelog.d/15924.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little.
diff --git a/changelog.d/15925.bugfix b/changelog.d/15925.bugfix
deleted file mode 100644
index e3ef783576..0000000000
--- a/changelog.d/15925.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug introduced in 1.86.0 where Synapse starting with an empty `experimental_features` configuration setting.
diff --git a/changelog.d/15926.misc b/changelog.d/15926.misc
deleted file mode 100644
index bf4c0fa5d0..0000000000
--- a/changelog.d/15926.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove unneeded `__init__`.
diff --git a/changelog.d/15928.removal b/changelog.d/15928.removal
deleted file mode 100644
index 5563213d31..0000000000
--- a/changelog.d/15928.removal
+++ /dev/null
@@ -1 +0,0 @@
-Remove support for calling the `/register` endpoint with an unspecced `user` property for application services.
diff --git a/changelog.d/15938.doc b/changelog.d/15938.doc
deleted file mode 100644
index 8d99e5f4ea..0000000000
--- a/changelog.d/15938.doc
+++ /dev/null
@@ -1 +0,0 @@
-Improve the documentation for the login as a user admin API.
diff --git a/changelog.d/15940.misc b/changelog.d/15940.misc
deleted file mode 100644
index eac008eb3e..0000000000
--- a/changelog.d/15940.misc
+++ /dev/null
@@ -1 +0,0 @@
-Unbreak the nix development environment by pinning the Rust version to 1.70.0.
\ No newline at end of file
diff --git a/contrib/grafana/synapse.json b/contrib/grafana/synapse.json
index f3253b32b9..90f449aa76 100644
--- a/contrib/grafana/synapse.json
+++ b/contrib/grafana/synapse.json
@@ -63,7 +63,7 @@
           "uid": "${DS_PROMETHEUS}"
         },
         "enable": true,
-        "expr": "changes(process_start_time_seconds{instance=\"matrix.org\",job=~\"synapse\"}[$bucket_size]) * on (instance, job) group_left(version) synapse_build_info{instance=\"matrix.org\",job=\"synapse\"}",
+        "expr": "changes(process_start_time_seconds{instance=\"$instance\",job=~\"synapse\"}[$bucket_size]) * on (instance, job) group_left(version) synapse_build_info{instance=\"$instance\",job=\"synapse\"}",
         "iconColor": "purple",
         "name": "deploys",
         "titleFormat": "Deployed {{version}}"
diff --git a/debian/changelog b/debian/changelog
index 763edb8ec2..384edbdab1 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,15 @@
+matrix-synapse-py3 (1.89.0~rc1) stable; urgency=medium
+
+  * New Synapse release 1.89.0rc1.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 25 Jul 2023 14:31:07 +0200
+
+matrix-synapse-py3 (1.88.0) stable; urgency=medium
+
+  * New Synapse release 1.88.0.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 18 Jul 2023 13:59:28 +0100
+
 matrix-synapse-py3 (1.88.0~rc1) stable; urgency=medium
 
   * New Synapse release 1.88.0rc1.
diff --git a/docs/setup/installation.md b/docs/setup/installation.md
index 4ca8c6b697..479f7ea543 100644
--- a/docs/setup/installation.md
+++ b/docs/setup/installation.md
@@ -135,8 +135,8 @@ Unofficial package are built for SLES 15 in the openSUSE:Backports:SLE-15 reposi
 
 #### ArchLinux
 
-The quickest way to get up and running with ArchLinux is probably with the community package
-<https://archlinux.org/packages/community/x86_64/matrix-synapse/>, which should pull in most of
+The quickest way to get up and running with ArchLinux is probably with the package provided by ArchLinux
+<https://archlinux.org/packages/extra/x86_64/matrix-synapse/>, which should pull in most of
 the necessary dependencies.
 
 pip may be outdated (6.0.7-1 and needs to be upgraded to 6.0.8-1 ):
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 22cd1772dc..4e6fcd085a 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3960,13 +3960,14 @@ federation_sender_instances:
 ---
 ### `instance_map`
 
-When using workers this should be a map from [`worker_name`](#worker_name) to the
-HTTP replication listener of the worker, if configured, and to the main process.
-Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
-a HTTP replication listener, and that listener should be included in the `instance_map`.
-The main process also needs an entry on the `instance_map`, and it should be listed under
-`main` **if even one other worker exists**. Ensure the port matches with what is declared 
-inside the `listener` block for a `replication` listener.
+When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
+replication listener of the worker, if configured, and to the main process. Each worker
+declared under [`stream_writers`](../../workers.md#stream-writers) and
+[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP
+replication listener, and that listener should be included in the `instance_map`. The
+main process also needs an entry on the `instance_map`, and it should be listed under
+`main` **if even one other worker exists**. Ensure the port matches with what is
+declared inside the `listener` block for a `replication` listener.
 
 
 Example configuration:
@@ -4004,6 +4005,24 @@ stream_writers:
   typing: worker1
 ```
 ---
+### `outbound_federation_restricted_to`
+
+When using workers, you can restrict outbound federation traffic to only go through a
+specific subset of workers. Any worker specified here must also be in the
+[`instance_map`](#instance_map).
+[`worker_replication_secret`](#worker_replication_secret) must also be configured to
+authorize inter-worker communication.
+
+```yaml
+outbound_federation_restricted_to:
+  - federation_sender1
+  - federation_sender2
+```
+
+Also see the [worker
+documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
+for more info.
+---
 ### `run_background_tasks_on`
 
 The [worker](../../workers.md#background-tasks) that is used to run
diff --git a/docs/workers.md b/docs/workers.md
index cf9c0add82..24bd22724e 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -531,6 +531,30 @@ the stream writer for the `presence` stream:
 
     ^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
 
+#### Restrict outbound federation traffic to a specific set of workers
+
+The
+[`outbound_federation_restricted_to`](usage/configuration/config_documentation.md#outbound_federation_restricted_to)
+configuration is useful to make sure outbound federation traffic only goes through a
+specified subset of workers. This allows you to set more strict access controls (like a
+firewall) for all workers and only allow the `federation_sender`'s to contact the
+outside world.
+
+```yaml
+instance_map:
+    main:
+        host: localhost
+        port: 8030
+    federation_sender1:
+        host: localhost
+        port: 8034
+
+outbound_federation_restricted_to:
+  - federation_sender1
+
+worker_replication_secret: "secret_secret"
+```
+
 #### Background tasks
 
 There is also support for moving background tasks to a separate
diff --git a/poetry.lock b/poetry.lock
index 27c8b103e5..d5b30a11c4 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -397,13 +397,13 @@ files = [
 
 [[package]]
 name = "click"
-version = "8.1.3"
+version = "8.1.6"
 description = "Composable command line interface toolkit"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "click-8.1.3-py3-none-any.whl", hash = "sha256:bb4d8133cb15a609f44e8213d9b391b0809795062913b383c62be0ee95b1db48"},
-    {file = "click-8.1.3.tar.gz", hash = "sha256:7682dc8afb30297001674575ea00d1814d808d6a36af415a82bd481d37ba7b8e"},
+    {file = "click-8.1.6-py3-none-any.whl", hash = "sha256:fa244bb30b3b5ee2cae3da8f55c9e5e0c0e86093306301fb418eb9dc40fbded5"},
+    {file = "click-8.1.6.tar.gz", hash = "sha256:48ee849951919527a045bfe3bf7baa8a959c423134e1a5b98c05c20ba75a1cbd"},
 ]
 
 [package.dependencies]
@@ -1621,92 +1621,71 @@ files = [
 
 [[package]]
 name = "pillow"
-version = "9.4.0"
+version = "10.0.0"
 description = "Python Imaging Library (Fork)"
 optional = false
-python-versions = ">=3.7"
+python-versions = ">=3.8"
 files = [
-    {file = "Pillow-9.4.0-1-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:1b4b4e9dda4f4e4c4e6896f93e84a8f0bcca3b059de9ddf67dac3c334b1195e1"},
-    {file = "Pillow-9.4.0-1-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:fb5c1ad6bad98c57482236a21bf985ab0ef42bd51f7ad4e4538e89a997624e12"},
-    {file = "Pillow-9.4.0-1-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:f0caf4a5dcf610d96c3bd32932bfac8aee61c96e60481c2a0ea58da435e25acd"},
-    {file = "Pillow-9.4.0-1-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:3f4cc516e0b264c8d4ccd6b6cbc69a07c6d582d8337df79be1e15a5056b258c9"},
-    {file = "Pillow-9.4.0-1-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:b8c2f6eb0df979ee99433d8b3f6d193d9590f735cf12274c108bd954e30ca858"},
-    {file = "Pillow-9.4.0-1-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:b70756ec9417c34e097f987b4d8c510975216ad26ba6e57ccb53bc758f490dab"},
-    {file = "Pillow-9.4.0-1-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:43521ce2c4b865d385e78579a082b6ad1166ebed2b1a2293c3be1d68dd7ca3b9"},
-    {file = "Pillow-9.4.0-2-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:9d9a62576b68cd90f7075876f4e8444487db5eeea0e4df3ba298ee38a8d067b0"},
-    {file = "Pillow-9.4.0-2-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:87708d78a14d56a990fbf4f9cb350b7d89ee8988705e58e39bdf4d82c149210f"},
-    {file = "Pillow-9.4.0-2-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:8a2b5874d17e72dfb80d917213abd55d7e1ed2479f38f001f264f7ce7bae757c"},
-    {file = "Pillow-9.4.0-2-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:83125753a60cfc8c412de5896d10a0a405e0bd88d0470ad82e0869ddf0cb3848"},
-    {file = "Pillow-9.4.0-2-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:9e5f94742033898bfe84c93c831a6f552bb629448d4072dd312306bab3bd96f1"},
-    {file = "Pillow-9.4.0-2-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:013016af6b3a12a2f40b704677f8b51f72cb007dac785a9933d5c86a72a7fe33"},
-    {file = "Pillow-9.4.0-2-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:99d92d148dd03fd19d16175b6d355cc1b01faf80dae93c6c3eb4163709edc0a9"},
-    {file = "Pillow-9.4.0-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:2968c58feca624bb6c8502f9564dd187d0e1389964898f5e9e1fbc8533169157"},
-    {file = "Pillow-9.4.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:c5c1362c14aee73f50143d74389b2c158707b4abce2cb055b7ad37ce60738d47"},
-    {file = "Pillow-9.4.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:bd752c5ff1b4a870b7661234694f24b1d2b9076b8bf337321a814c612665f343"},
-    {file = "Pillow-9.4.0-cp310-cp310-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:9a3049a10261d7f2b6514d35bbb7a4dfc3ece4c4de14ef5876c4b7a23a0e566d"},
-    {file = "Pillow-9.4.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:16a8df99701f9095bea8a6c4b3197da105df6f74e6176c5b410bc2df2fd29a57"},
-    {file = "Pillow-9.4.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:94cdff45173b1919350601f82d61365e792895e3c3a3443cf99819e6fbf717a5"},
-    {file = "Pillow-9.4.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:ed3e4b4e1e6de75fdc16d3259098de7c6571b1a6cc863b1a49e7d3d53e036070"},
-    {file = "Pillow-9.4.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d5b2f8a31bd43e0f18172d8ac82347c8f37ef3e0b414431157718aa234991b28"},
-    {file = "Pillow-9.4.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:09b89ddc95c248ee788328528e6a2996e09eaccddeeb82a5356e92645733be35"},
-    {file = "Pillow-9.4.0-cp310-cp310-win32.whl", hash = "sha256:f09598b416ba39a8f489c124447b007fe865f786a89dbfa48bb5cf395693132a"},
-    {file = "Pillow-9.4.0-cp310-cp310-win_amd64.whl", hash = "sha256:f6e78171be3fb7941f9910ea15b4b14ec27725865a73c15277bc39f5ca4f8391"},
-    {file = "Pillow-9.4.0-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:3fa1284762aacca6dc97474ee9c16f83990b8eeb6697f2ba17140d54b453e133"},
-    {file = "Pillow-9.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:eaef5d2de3c7e9b21f1e762f289d17b726c2239a42b11e25446abf82b26ac132"},
-    {file = "Pillow-9.4.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a4dfdae195335abb4e89cc9762b2edc524f3c6e80d647a9a81bf81e17e3fb6f0"},
-    {file = "Pillow-9.4.0-cp311-cp311-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:6abfb51a82e919e3933eb137e17c4ae9c0475a25508ea88993bb59faf82f3b35"},
-    {file = "Pillow-9.4.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:451f10ef963918e65b8869e17d67db5e2f4ab40e716ee6ce7129b0cde2876eab"},
-    {file = "Pillow-9.4.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:6663977496d616b618b6cfa43ec86e479ee62b942e1da76a2c3daa1c75933ef4"},
-    {file = "Pillow-9.4.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:60e7da3a3ad1812c128750fc1bc14a7ceeb8d29f77e0a2356a8fb2aa8925287d"},
-    {file = "Pillow-9.4.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:19005a8e58b7c1796bc0167862b1f54a64d3b44ee5d48152b06bb861458bc0f8"},
-    {file = "Pillow-9.4.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:f715c32e774a60a337b2bb8ad9839b4abf75b267a0f18806f6f4f5f1688c4b5a"},
-    {file = "Pillow-9.4.0-cp311-cp311-win32.whl", hash = "sha256:b222090c455d6d1a64e6b7bb5f4035c4dff479e22455c9eaa1bdd4c75b52c80c"},
-    {file = "Pillow-9.4.0-cp311-cp311-win_amd64.whl", hash = "sha256:ba6612b6548220ff5e9df85261bddc811a057b0b465a1226b39bfb8550616aee"},
-    {file = "Pillow-9.4.0-cp37-cp37m-macosx_10_10_x86_64.whl", hash = "sha256:5f532a2ad4d174eb73494e7397988e22bf427f91acc8e6ebf5bb10597b49c493"},
-    {file = "Pillow-9.4.0-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5dd5a9c3091a0f414a963d427f920368e2b6a4c2f7527fdd82cde8ef0bc7a327"},
-    {file = "Pillow-9.4.0-cp37-cp37m-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:ef21af928e807f10bf4141cad4746eee692a0dd3ff56cfb25fce076ec3cc8abe"},
-    {file = "Pillow-9.4.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:847b114580c5cc9ebaf216dd8c8dbc6b00a3b7ab0131e173d7120e6deade1f57"},
-    {file = "Pillow-9.4.0-cp37-cp37m-manylinux_2_28_aarch64.whl", hash = "sha256:653d7fb2df65efefbcbf81ef5fe5e5be931f1ee4332c2893ca638c9b11a409c4"},
-    {file = "Pillow-9.4.0-cp37-cp37m-manylinux_2_28_x86_64.whl", hash = "sha256:46f39cab8bbf4a384ba7cb0bc8bae7b7062b6a11cfac1ca4bc144dea90d4a9f5"},
-    {file = "Pillow-9.4.0-cp37-cp37m-win32.whl", hash = "sha256:7ac7594397698f77bce84382929747130765f66406dc2cd8b4ab4da68ade4c6e"},
-    {file = "Pillow-9.4.0-cp37-cp37m-win_amd64.whl", hash = "sha256:46c259e87199041583658457372a183636ae8cd56dbf3f0755e0f376a7f9d0e6"},
-    {file = "Pillow-9.4.0-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:0e51f608da093e5d9038c592b5b575cadc12fd748af1479b5e858045fff955a9"},
-    {file = "Pillow-9.4.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:765cb54c0b8724a7c12c55146ae4647e0274a839fb6de7bcba841e04298e1011"},
-    {file = "Pillow-9.4.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:519e14e2c49fcf7616d6d2cfc5c70adae95682ae20f0395e9280db85e8d6c4df"},
-    {file = "Pillow-9.4.0-cp38-cp38-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d197df5489004db87d90b918033edbeee0bd6df3848a204bca3ff0a903bef837"},
-    {file = "Pillow-9.4.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0845adc64fe9886db00f5ab68c4a8cd933ab749a87747555cec1c95acea64b0b"},
-    {file = "Pillow-9.4.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:e1339790c083c5a4de48f688b4841f18df839eb3c9584a770cbd818b33e26d5d"},
-    {file = "Pillow-9.4.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:a96e6e23f2b79433390273eaf8cc94fec9c6370842e577ab10dabdcc7ea0a66b"},
-    {file = "Pillow-9.4.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:7cfc287da09f9d2a7ec146ee4d72d6ea1342e770d975e49a8621bf54eaa8f30f"},
-    {file = "Pillow-9.4.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:d7081c084ceb58278dd3cf81f836bc818978c0ccc770cbbb202125ddabec6628"},
-    {file = "Pillow-9.4.0-cp38-cp38-win32.whl", hash = "sha256:df41112ccce5d47770a0c13651479fbcd8793f34232a2dd9faeccb75eb5d0d0d"},
-    {file = "Pillow-9.4.0-cp38-cp38-win_amd64.whl", hash = "sha256:7a21222644ab69ddd9967cfe6f2bb420b460dae4289c9d40ff9a4896e7c35c9a"},
-    {file = "Pillow-9.4.0-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:0f3269304c1a7ce82f1759c12ce731ef9b6e95b6df829dccd9fe42912cc48569"},
-    {file = "Pillow-9.4.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:cb362e3b0976dc994857391b776ddaa8c13c28a16f80ac6522c23d5257156bed"},
-    {file = "Pillow-9.4.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a2e0f87144fcbbe54297cae708c5e7f9da21a4646523456b00cc956bd4c65815"},
-    {file = "Pillow-9.4.0-cp39-cp39-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:28676836c7796805914b76b1837a40f76827ee0d5398f72f7dcc634bae7c6264"},
-    {file = "Pillow-9.4.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0884ba7b515163a1a05440a138adeb722b8a6ae2c2b33aea93ea3118dd3a899e"},
-    {file = "Pillow-9.4.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:53dcb50fbdc3fb2c55431a9b30caeb2f7027fcd2aeb501459464f0214200a503"},
-    {file = "Pillow-9.4.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:e8c5cf126889a4de385c02a2c3d3aba4b00f70234bfddae82a5eaa3ee6d5e3e6"},
-    {file = "Pillow-9.4.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:6c6b1389ed66cdd174d040105123a5a1bc91d0aa7059c7261d20e583b6d8cbd2"},
-    {file = "Pillow-9.4.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:0dd4c681b82214b36273c18ca7ee87065a50e013112eea7d78c7a1b89a739153"},
-    {file = "Pillow-9.4.0-cp39-cp39-win32.whl", hash = "sha256:6d9dfb9959a3b0039ee06c1a1a90dc23bac3b430842dcb97908ddde05870601c"},
-    {file = "Pillow-9.4.0-cp39-cp39-win_amd64.whl", hash = "sha256:54614444887e0d3043557d9dbc697dbb16cfb5a35d672b7a0fcc1ed0cf1c600b"},
-    {file = "Pillow-9.4.0-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:b9b752ab91e78234941e44abdecc07f1f0d8f51fb62941d32995b8161f68cfe5"},
-    {file = "Pillow-9.4.0-pp38-pypy38_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d3b56206244dc8711f7e8b7d6cad4663917cd5b2d950799425076681e8766286"},
-    {file = "Pillow-9.4.0-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aabdab8ec1e7ca7f1434d042bf8b1e92056245fb179790dc97ed040361f16bfd"},
-    {file = "Pillow-9.4.0-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:db74f5562c09953b2c5f8ec4b7dfd3f5421f31811e97d1dbc0a7c93d6e3a24df"},
-    {file = "Pillow-9.4.0-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:e9d7747847c53a16a729b6ee5e737cf170f7a16611c143d95aa60a109a59c336"},
-    {file = "Pillow-9.4.0-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:b52ff4f4e002f828ea6483faf4c4e8deea8d743cf801b74910243c58acc6eda3"},
-    {file = "Pillow-9.4.0-pp39-pypy39_pp73-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:575d8912dca808edd9acd6f7795199332696d3469665ef26163cd090fa1f8bfa"},
-    {file = "Pillow-9.4.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c3c4ed2ff6760e98d262e0cc9c9a7f7b8a9f61aa4d47c58835cdaf7b0b8811bb"},
-    {file = "Pillow-9.4.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:e621b0246192d3b9cb1dc62c78cfa4c6f6d2ddc0ec207d43c0dedecb914f152a"},
-    {file = "Pillow-9.4.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:8f127e7b028900421cad64f51f75c051b628db17fb00e099eb148761eed598c9"},
-    {file = "Pillow-9.4.0.tar.gz", hash = "sha256:a1c2d7780448eb93fbcc3789bf3916aa5720d942e37945f4056680317f1cd23e"},
-]
-
-[package.extras]
-docs = ["furo", "olefile", "sphinx (>=2.4)", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-issues (>=3.0.1)", "sphinx-removed-in", "sphinxext-opengraph"]
+    {file = "Pillow-10.0.0-cp310-cp310-macosx_10_10_x86_64.whl", hash = "sha256:1f62406a884ae75fb2f818694469519fb685cc7eaff05d3451a9ebe55c646891"},
+    {file = "Pillow-10.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:d5db32e2a6ccbb3d34d87c87b432959e0db29755727afb37290e10f6e8e62614"},
+    {file = "Pillow-10.0.0-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:edf4392b77bdc81f36e92d3a07a5cd072f90253197f4a52a55a8cec48a12483b"},
+    {file = "Pillow-10.0.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:520f2a520dc040512699f20fa1c363eed506e94248d71f85412b625026f6142c"},
+    {file = "Pillow-10.0.0-cp310-cp310-manylinux_2_28_aarch64.whl", hash = "sha256:8c11160913e3dd06c8ffdb5f233a4f254cb449f4dfc0f8f4549eda9e542c93d1"},
+    {file = "Pillow-10.0.0-cp310-cp310-manylinux_2_28_x86_64.whl", hash = "sha256:a74ba0c356aaa3bb8e3eb79606a87669e7ec6444be352870623025d75a14a2bf"},
+    {file = "Pillow-10.0.0-cp310-cp310-musllinux_1_1_aarch64.whl", hash = "sha256:d5d0dae4cfd56969d23d94dc8e89fb6a217be461c69090768227beb8ed28c0a3"},
+    {file = "Pillow-10.0.0-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:22c10cc517668d44b211717fd9775799ccec4124b9a7f7b3635fc5386e584992"},
+    {file = "Pillow-10.0.0-cp310-cp310-win_amd64.whl", hash = "sha256:dffe31a7f47b603318c609f378ebcd57f1554a3a6a8effbc59c3c69f804296de"},
+    {file = "Pillow-10.0.0-cp311-cp311-macosx_10_10_x86_64.whl", hash = "sha256:9fb218c8a12e51d7ead2a7c9e101a04982237d4855716af2e9499306728fb485"},
+    {file = "Pillow-10.0.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:d35e3c8d9b1268cbf5d3670285feb3528f6680420eafe35cccc686b73c1e330f"},
+    {file = "Pillow-10.0.0-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3ed64f9ca2f0a95411e88a4efbd7a29e5ce2cea36072c53dd9d26d9c76f753b3"},
+    {file = "Pillow-10.0.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0b6eb5502f45a60a3f411c63187db83a3d3107887ad0d036c13ce836f8a36f1d"},
+    {file = "Pillow-10.0.0-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:c1fbe7621c167ecaa38ad29643d77a9ce7311583761abf7836e1510c580bf3dd"},
+    {file = "Pillow-10.0.0-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:cd25d2a9d2b36fcb318882481367956d2cf91329f6892fe5d385c346c0649629"},
+    {file = "Pillow-10.0.0-cp311-cp311-musllinux_1_1_aarch64.whl", hash = "sha256:3b08d4cc24f471b2c8ca24ec060abf4bebc6b144cb89cba638c720546b1cf538"},
+    {file = "Pillow-10.0.0-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:d737a602fbd82afd892ca746392401b634e278cb65d55c4b7a8f48e9ef8d008d"},
+    {file = "Pillow-10.0.0-cp311-cp311-win_amd64.whl", hash = "sha256:3a82c40d706d9aa9734289740ce26460a11aeec2d9c79b7af87bb35f0073c12f"},
+    {file = "Pillow-10.0.0-cp311-cp311-win_arm64.whl", hash = "sha256:bc2ec7c7b5d66b8ec9ce9f720dbb5fa4bace0f545acd34870eff4a369b44bf37"},
+    {file = "Pillow-10.0.0-cp312-cp312-macosx_10_10_x86_64.whl", hash = "sha256:d80cf684b541685fccdd84c485b31ce73fc5c9b5d7523bf1394ce134a60c6883"},
+    {file = "Pillow-10.0.0-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:76de421f9c326da8f43d690110f0e79fe3ad1e54be811545d7d91898b4c8493e"},
+    {file = "Pillow-10.0.0-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:81ff539a12457809666fef6624684c008e00ff6bf455b4b89fd00a140eecd640"},
+    {file = "Pillow-10.0.0-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce543ed15570eedbb85df19b0a1a7314a9c8141a36ce089c0a894adbfccb4568"},
+    {file = "Pillow-10.0.0-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:685ac03cc4ed5ebc15ad5c23bc555d68a87777586d970c2c3e216619a5476223"},
+    {file = "Pillow-10.0.0-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:d72e2ecc68a942e8cf9739619b7f408cc7b272b279b56b2c83c6123fcfa5cdff"},
+    {file = "Pillow-10.0.0-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:d50b6aec14bc737742ca96e85d6d0a5f9bfbded018264b3b70ff9d8c33485551"},
+    {file = "Pillow-10.0.0-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:00e65f5e822decd501e374b0650146063fbb30a7264b4d2744bdd7b913e0cab5"},
+    {file = "Pillow-10.0.0-cp312-cp312-win_amd64.whl", hash = "sha256:f31f9fdbfecb042d046f9d91270a0ba28368a723302786c0009ee9b9f1f60199"},
+    {file = "Pillow-10.0.0-cp312-cp312-win_arm64.whl", hash = "sha256:1ce91b6ec08d866b14413d3f0bbdea7e24dfdc8e59f562bb77bc3fe60b6144ca"},
+    {file = "Pillow-10.0.0-cp38-cp38-macosx_10_10_x86_64.whl", hash = "sha256:349930d6e9c685c089284b013478d6f76e3a534e36ddfa912cde493f235372f3"},
+    {file = "Pillow-10.0.0-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:3a684105f7c32488f7153905a4e3015a3b6c7182e106fe3c37fbb5ef3e6994c3"},
+    {file = "Pillow-10.0.0-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:b4f69b3700201b80bb82c3a97d5e9254084f6dd5fb5b16fc1a7b974260f89f43"},
+    {file = "Pillow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:3f07ea8d2f827d7d2a49ecf1639ec02d75ffd1b88dcc5b3a61bbb37a8759ad8d"},
+    {file = "Pillow-10.0.0-cp38-cp38-manylinux_2_28_aarch64.whl", hash = "sha256:040586f7d37b34547153fa383f7f9aed68b738992380ac911447bb78f2abe530"},
+    {file = "Pillow-10.0.0-cp38-cp38-manylinux_2_28_x86_64.whl", hash = "sha256:f88a0b92277de8e3ca715a0d79d68dc82807457dae3ab8699c758f07c20b3c51"},
+    {file = "Pillow-10.0.0-cp38-cp38-musllinux_1_1_aarch64.whl", hash = "sha256:c7cf14a27b0d6adfaebb3ae4153f1e516df54e47e42dcc073d7b3d76111a8d86"},
+    {file = "Pillow-10.0.0-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:3400aae60685b06bb96f99a21e1ada7bc7a413d5f49bce739828ecd9391bb8f7"},
+    {file = "Pillow-10.0.0-cp38-cp38-win_amd64.whl", hash = "sha256:dbc02381779d412145331789b40cc7b11fdf449e5d94f6bc0b080db0a56ea3f0"},
+    {file = "Pillow-10.0.0-cp39-cp39-macosx_10_10_x86_64.whl", hash = "sha256:9211e7ad69d7c9401cfc0e23d49b69ca65ddd898976d660a2fa5904e3d7a9baa"},
+    {file = "Pillow-10.0.0-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:faaf07ea35355b01a35cb442dd950d8f1bb5b040a7787791a535de13db15ed90"},
+    {file = "Pillow-10.0.0-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:c9f72a021fbb792ce98306ffb0c348b3c9cb967dce0f12a49aa4c3d3fdefa967"},
+    {file = "Pillow-10.0.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9f7c16705f44e0504a3a2a14197c1f0b32a95731d251777dcb060aa83022cb2d"},
+    {file = "Pillow-10.0.0-cp39-cp39-manylinux_2_28_aarch64.whl", hash = "sha256:76edb0a1fa2b4745fb0c99fb9fb98f8b180a1bbceb8be49b087e0b21867e77d3"},
+    {file = "Pillow-10.0.0-cp39-cp39-manylinux_2_28_x86_64.whl", hash = "sha256:368ab3dfb5f49e312231b6f27b8820c823652b7cd29cfbd34090565a015e99ba"},
+    {file = "Pillow-10.0.0-cp39-cp39-musllinux_1_1_aarch64.whl", hash = "sha256:608bfdee0d57cf297d32bcbb3c728dc1da0907519d1784962c5f0c68bb93e5a3"},
+    {file = "Pillow-10.0.0-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5c6e3df6bdd396749bafd45314871b3d0af81ff935b2d188385e970052091017"},
+    {file = "Pillow-10.0.0-cp39-cp39-win_amd64.whl", hash = "sha256:7be600823e4c8631b74e4a0d38384c73f680e6105a7d3c6824fcf226c178c7e6"},
+    {file = "Pillow-10.0.0-pp310-pypy310_pp73-macosx_10_10_x86_64.whl", hash = "sha256:92be919bbc9f7d09f7ae343c38f5bb21c973d2576c1d45600fce4b74bafa7ac0"},
+    {file = "Pillow-10.0.0-pp310-pypy310_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8f8182b523b2289f7c415f589118228d30ac8c355baa2f3194ced084dac2dbba"},
+    {file = "Pillow-10.0.0-pp310-pypy310_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:38250a349b6b390ee6047a62c086d3817ac69022c127f8a5dc058c31ccef17f3"},
+    {file = "Pillow-10.0.0-pp310-pypy310_pp73-win_amd64.whl", hash = "sha256:88af2003543cc40c80f6fca01411892ec52b11021b3dc22ec3bc9d5afd1c5334"},
+    {file = "Pillow-10.0.0-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:c189af0545965fa8d3b9613cfdb0cd37f9d71349e0f7750e1fd704648d475ed2"},
+    {file = "Pillow-10.0.0-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ce7b031a6fc11365970e6a5686d7ba8c63e4c1cf1ea143811acbb524295eabed"},
+    {file = "Pillow-10.0.0-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:db24668940f82321e746773a4bc617bfac06ec831e5c88b643f91f122a785684"},
+    {file = "Pillow-10.0.0-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:efe8c0681042536e0d06c11f48cebe759707c9e9abf880ee213541c5b46c5bf3"},
+    {file = "Pillow-10.0.0.tar.gz", hash = "sha256:9c82b5b3e043c7af0d95792d0d20ccf68f61a1fec6b3530e718b688422727396"},
+]
+
+[package.extras]
+docs = ["furo", "olefile", "sphinx (>=2.4)", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-removed-in", "sphinxext-opengraph"]
 tests = ["check-manifest", "coverage", "defusedxml", "markdown2", "olefile", "packaging", "pyroma", "pytest", "pytest-cov", "pytest-timeout"]
 
 [[package]]
@@ -1902,13 +1881,13 @@ email = ["email-validator (>=1.0.3)"]
 
 [[package]]
 name = "pygithub"
-version = "1.58.2"
+version = "1.59.0"
 description = "Use the full Github API v3"
 optional = false
 python-versions = ">=3.7"
 files = [
-    {file = "PyGithub-1.58.2-py3-none-any.whl", hash = "sha256:f435884af617c6debaa76cbc355372d1027445a56fbc39972a3b9ed4968badc8"},
-    {file = "PyGithub-1.58.2.tar.gz", hash = "sha256:1e6b1b7afe31f75151fb81f7ab6b984a7188a852bdb123dbb9ae90023c3ce60f"},
+    {file = "PyGithub-1.59.0-py3-none-any.whl", hash = "sha256:126bdbae72087d8d038b113aab6b059b4553cb59348e3024bb1a1cae406ace9e"},
+    {file = "PyGithub-1.59.0.tar.gz", hash = "sha256:6e05ff49bac3caa7d1d6177a10c6e55a3e20c85b92424cc198571fd0cf786690"},
 ]
 
 [package.dependencies]
@@ -2406,13 +2385,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "1.26.0"
+version = "1.28.1"
 description = "Python client for Sentry (https://sentry.io)"
 optional = true
 python-versions = "*"
 files = [
-    {file = "sentry-sdk-1.26.0.tar.gz", hash = "sha256:760e4fb6d01c994110507133e08ecd4bdf4d75ee4be77f296a3579796cf73134"},
-    {file = "sentry_sdk-1.26.0-py2.py3-none-any.whl", hash = "sha256:0c9f858337ec3781cf4851972ef42bba8c9828aea116b0dbed8f38c5f9a1896c"},
+    {file = "sentry-sdk-1.28.1.tar.gz", hash = "sha256:dcd88c68aa64dae715311b5ede6502fd684f70d00a7cd4858118f0ba3153a3ae"},
+    {file = "sentry_sdk-1.28.1-py2.py3-none-any.whl", hash = "sha256:6bdb25bd9092478d3a817cb0d01fa99e296aea34d404eac3ca0037faa5c2aa0a"},
 ]
 
 [package.dependencies]
@@ -3059,13 +3038,13 @@ files = [
 
 [[package]]
 name = "types-requests"
-version = "2.31.0.1"
+version = "2.31.0.2"
 description = "Typing stubs for requests"
 optional = false
 python-versions = "*"
 files = [
-    {file = "types-requests-2.31.0.1.tar.gz", hash = "sha256:3de667cffa123ce698591de0ad7db034a5317457a596eb0b4944e5a9d9e8d1ac"},
-    {file = "types_requests-2.31.0.1-py3-none-any.whl", hash = "sha256:afb06ef8f25ba83d59a1d424bd7a5a939082f94b94e90ab5e6116bd2559deaa3"},
+    {file = "types-requests-2.31.0.2.tar.gz", hash = "sha256:6aa3f7faf0ea52d728bb18c0a0d1522d9bfd8c72d26ff6f61bfc3d06a411cf40"},
+    {file = "types_requests-2.31.0.2-py3-none-any.whl", hash = "sha256:56d181c85b5925cbc59f4489a57e72a8b2166f18273fd8ba7b6fe0c0b986f12a"},
 ]
 
 [package.dependencies]
diff --git a/pyproject.toml b/pyproject.toml
index d56602b2df..89c5edb4db 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.88.0rc1"
+version = "1.89.0rc1"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "Apache-2.0"
diff --git a/scripts-dev/build_debian_packages.py b/scripts-dev/build_debian_packages.py
index 1954835474..bb89ba581c 100755
--- a/scripts-dev/build_debian_packages.py
+++ b/scripts-dev/build_debian_packages.py
@@ -34,6 +34,7 @@ DISTS = (
     "ubuntu:jammy",  # 22.04 LTS (EOL 2027-04) (our EOL forced by Python 3.10 is 2026-10-04)
     "ubuntu:kinetic",  # 22.10 (EOL 2023-07-20) (our EOL forced by Python 3.10 is 2026-10-04)
     "ubuntu:lunar",  # 23.04 (EOL 2024-01) (our EOL forced by Python 3.11 is 2027-10-24)
+    "debian:trixie",  # (EOL not specified yet)
 )
 
 DESC = """\
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index fea76cb5af..8416b55674 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -214,7 +214,7 @@ fi
 
 extra_test_args=()
 
-test_tags="synapse_blacklist,msc3787,msc3874,msc3890,msc3391,msc3930,faster_joins"
+test_tags="synapse_blacklist,msc3874,msc3890,msc3391,msc3930,faster_joins"
 
 # All environment variables starting with PASS_ will be shared.
 # (The prefix is stripped off before reaching the container.)
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index af894243f8..3546aaf7c3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -217,6 +217,13 @@ class InvalidAPICallError(SynapseError):
         super().__init__(HTTPStatus.BAD_REQUEST, msg, Codes.BAD_JSON)
 
 
+class InvalidProxyCredentialsError(SynapseError):
+    """Error raised when the proxy credentials are invalid."""
+
+    def __init__(self, msg: str, errcode: str = Codes.UNKNOWN):
+        super().__init__(401, msg, errcode)
+
+
 class ProxiedRequestError(SynapseError):
     """An error from a general matrix endpoint, eg. from a proxied Matrix API call.
 
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index d9fdda9745..9b8a19ece5 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -80,36 +80,29 @@ class RoomVersion:
     # MSC2209: Check 'notifications' key while verifying
     # m.room.power_levels auth rules.
     limit_notifications_power_levels: bool
-    # MSC2175: No longer include the creator in m.room.create events.
-    msc2175_implicit_room_creator: bool
-    # MSC2174/MSC2176: Apply updated redaction rules algorithm, move redacts to
-    # content property.
-    msc2176_redaction_rules: bool
-    # MSC3083: Support the 'restricted' join_rule.
-    msc3083_join_rules: bool
-    # MSC3375: Support for the proper redaction rules for MSC3083. This mustn't
-    #          be enabled if MSC3083 is not.
-    msc3375_redaction_rules: bool
-    # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending
-    # m.room.membership event with membership 'knock'.
-    msc2403_knocking: bool
+    # No longer include the creator in m.room.create events.
+    implicit_room_creator: bool
+    # Apply updated redaction rules algorithm from room version 11.
+    updated_redaction_rules: bool
+    # Support the 'restricted' join rule.
+    restricted_join_rule: bool
+    # Support for the proper redaction rules for the restricted join rule. This requires
+    # restricted_join_rule to be enabled.
+    restricted_join_rule_fix: bool
+    # Support the 'knock' join rule.
+    knock_join_rule: bool
     # MSC3389: Protect relation information from redaction.
     msc3389_relation_redactions: bool
-    # MSC3787: Adds support for a `knock_restricted` join rule, mixing concepts of
-    # knocks and restricted join rules into the same join condition.
-    msc3787_knock_restricted_join_rule: bool
-    # MSC3667: Enforce integer power levels
-    msc3667_int_only_power_levels: bool
-    # MSC3821: Do not redact the third_party_invite content field for membership events.
-    msc3821_redaction_rules: bool
+    # Support the 'knock_restricted' join rule.
+    knock_restricted_join_rule: bool
+    # Enforce integer power levels
+    enforce_int_power_levels: bool
     # MSC3931: Adds a push rule condition for "room version feature flags", making
     # some push rules room version dependent. Note that adding a flag to this list
     # is not enough to mark it "supported": the push rule evaluator also needs to
     # support the flag. Unknown flags are ignored by the evaluator, making conditions
     # fail if used.
     msc3931_push_features: Tuple[str, ...]  # values from PushRuleRoomFlag
-    # MSC3989: Redact the origin field.
-    msc3989_redaction_rules: bool
     linearized_matrix: bool
 
 
@@ -123,17 +116,15 @@ class RoomVersions:
         special_case_aliases_auth=True,
         strict_canonicaljson=False,
         limit_notifications_power_levels=False,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V2 = RoomVersion(
@@ -145,17 +136,15 @@ class RoomVersions:
         special_case_aliases_auth=True,
         strict_canonicaljson=False,
         limit_notifications_power_levels=False,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V3 = RoomVersion(
@@ -167,17 +156,15 @@ class RoomVersions:
         special_case_aliases_auth=True,
         strict_canonicaljson=False,
         limit_notifications_power_levels=False,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V4 = RoomVersion(
@@ -189,17 +176,15 @@ class RoomVersions:
         special_case_aliases_auth=True,
         strict_canonicaljson=False,
         limit_notifications_power_levels=False,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V5 = RoomVersion(
@@ -211,17 +196,15 @@ class RoomVersions:
         special_case_aliases_auth=True,
         strict_canonicaljson=False,
         limit_notifications_power_levels=False,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V6 = RoomVersion(
@@ -233,39 +216,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=False,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
-        linearized_matrix=False,
-    )
-    MSC2176 = RoomVersion(
-        "org.matrix.msc2176",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=True,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=False,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V7 = RoomVersion(
@@ -277,17 +236,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=True,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=False,
+        restricted_join_rule_fix=False,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V8 = RoomVersion(
@@ -299,17 +256,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=True,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=True,
+        restricted_join_rule_fix=False,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V9 = RoomVersion(
@@ -321,61 +276,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=False,
-        linearized_matrix=False,
-    )
-    MSC3787 = RoomVersion(
-        "org.matrix.msc3787",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=True,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=False,
-        linearized_matrix=False,
-    )
-    MSC3821 = RoomVersion(
-        "org.matrix.msc3821.opt1",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=True,
+        restricted_join_rule_fix=True,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=False,
-        msc3821_redaction_rules=True,
+        knock_restricted_join_rule=False,
+        enforce_int_power_levels=False,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     V10 = RoomVersion(
@@ -387,17 +296,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=True,
+        restricted_join_rule_fix=True,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=True,
-        msc3667_int_only_power_levels=True,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=True,
+        enforce_int_power_levels=True,
         msc3931_push_features=(),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
     MSC1767v10 = RoomVersion(
@@ -410,62 +317,35 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
+        implicit_room_creator=False,
+        updated_redaction_rules=False,
+        restricted_join_rule=True,
+        restricted_join_rule_fix=True,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=True,
-        msc3667_int_only_power_levels=True,
-        msc3821_redaction_rules=False,
+        knock_restricted_join_rule=True,
+        enforce_int_power_levels=True,
         msc3931_push_features=(PushRuleRoomFlag.EXTENSIBLE_EVENTS,),
-        msc3989_redaction_rules=False,
         linearized_matrix=False,
     )
-    MSC3989 = RoomVersion(
-        "org.matrix.msc3989",
-        RoomDisposition.UNSTABLE,
-        EventFormatVersions.ROOM_V4_PLUS,
-        StateResolutionVersions.V2,
-        enforce_key_validity=True,
-        special_case_aliases_auth=False,
-        strict_canonicaljson=True,
-        limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=False,
-        msc2176_redaction_rules=False,
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
-        msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=True,
-        msc3667_int_only_power_levels=True,
-        msc3821_redaction_rules=False,
-        msc3931_push_features=(),
-        msc3989_redaction_rules=True,
-        linearized_matrix=False,
-    )
-    MSC3820opt2 = RoomVersion(
-        # Based upon v10
-        "org.matrix.msc3820.opt2",
-        RoomDisposition.UNSTABLE,
+    V11 = RoomVersion(
+        "11",
+        RoomDisposition.STABLE,
         EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=True,  # Used by MSC3820
-        msc2176_redaction_rules=True,  # Used by MSC3820
-        msc3083_join_rules=True,
-        msc3375_redaction_rules=True,
-        msc2403_knocking=True,
+        implicit_room_creator=True,  # Used by MSC3820
+        updated_redaction_rules=True,  # Used by MSC3820
+        restricted_join_rule=True,
+        restricted_join_rule_fix=True,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=True,
-        msc3667_int_only_power_levels=True,
-        msc3821_redaction_rules=True,  # Used by MSC3820
+        knock_restricted_join_rule=True,
+        enforce_int_power_levels=True,
         msc3931_push_features=(),
-        msc3989_redaction_rules=True,  # Used by MSC3820
         linearized_matrix=False,
     )
     # Based on room version 11:
@@ -482,17 +362,15 @@ class RoomVersions:
         special_case_aliases_auth=False,
         strict_canonicaljson=True,
         limit_notifications_power_levels=True,
-        msc2175_implicit_room_creator=True,
-        msc2176_redaction_rules=True,
-        msc3083_join_rules=False,
-        msc3375_redaction_rules=False,
-        msc2403_knocking=True,
+        implicit_room_creator=True,
+        updated_redaction_rules=True,
+        restricted_join_rule=True,
+        restricted_join_rule_fix=True,
+        knock_join_rule=True,
         msc3389_relation_redactions=False,
-        msc3787_knock_restricted_join_rule=False,
-        msc3667_int_only_power_levels=True,
-        msc3821_redaction_rules=True,
+        knock_restricted_join_rule=True,
+        enforce_int_power_levels=True,
         msc3931_push_features=(),
-        msc3989_redaction_rules=True,
         linearized_matrix=True,
     )
 
@@ -506,14 +384,11 @@ KNOWN_ROOM_VERSIONS: Dict[str, RoomVersion] = {
         RoomVersions.V4,
         RoomVersions.V5,
         RoomVersions.V6,
-        RoomVersions.MSC2176,
         RoomVersions.V7,
         RoomVersions.V8,
         RoomVersions.V9,
-        RoomVersions.MSC3787,
         RoomVersions.V10,
-        RoomVersions.MSC3989,
-        RoomVersions.MSC3820opt2,
+        RoomVersions.V11,
         RoomVersions.LINEARIZED,
     )
 }
@@ -543,12 +418,12 @@ MSC3244_CAPABILITIES = {
         RoomVersionCapability(
             "knock",
             RoomVersions.V7,
-            lambda room_version: room_version.msc2403_knocking,
+            lambda room_version: room_version.knock_join_rule,
         ),
         RoomVersionCapability(
             "restricted",
             RoomVersions.V9,
-            lambda room_version: room_version.msc3083_join_rules,
+            lambda room_version: room_version.restricted_join_rule,
         ),
     )
 }
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 936b1b0430..a94b57a671 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -386,6 +386,7 @@ def listen_unix(
 
 
 def listen_http(
+    hs: "HomeServer",
     listener_config: ListenerConfig,
     root_resource: Resource,
     version_string: str,
@@ -406,6 +407,7 @@ def listen_http(
         version_string,
         max_request_body_size=max_request_body_size,
         reactor=reactor,
+        hs=hs,
     )
 
     if isinstance(listener_config, TCPListenerConfig):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 7406c3948c..dc79efcc14 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -221,6 +221,7 @@ class GenericWorkerServer(HomeServer):
         root_resource = create_resource_tree(resources, OptionsResource())
 
         _base.listen_http(
+            self,
             listener_config,
             root_resource,
             self.version_string,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 84236ac299..f188c7265a 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -139,6 +139,7 @@ class SynapseHomeServer(HomeServer):
             root_resource = OptionsResource()
 
         ports = listen_http(
+            self,
             listener_config,
             create_resource_tree(resources, root_resource),
             self.version_string,
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 0970f22a75..1695ed8ca3 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -247,6 +247,27 @@ class ExperimentalConfig(Config):
         # MSC3026 (busy presence state)
         self.msc3026_enabled: bool = experimental.get("msc3026_enabled", False)
 
+        # MSC2697 (device dehydration)
+        # Enabled by default since this option was added after adding the feature.
+        # It is not recommended that both MSC2697 and MSC3814 both be enabled at
+        # once.
+        self.msc2697_enabled: bool = experimental.get("msc2697_enabled", True)
+
+        # MSC3814 (dehydrated devices with SSSS)
+        # This is an alternative method to achieve the same goals as MSC2697.
+        # It is not recommended that both MSC2697 and MSC3814 both be enabled at
+        # once.
+        self.msc3814_enabled: bool = experimental.get("msc3814_enabled", False)
+
+        if self.msc2697_enabled and self.msc3814_enabled:
+            raise ConfigError(
+                "MSC2697 and MSC3814 should not both be enabled.",
+                (
+                    "experimental_features",
+                    "msc3814_enabled",
+                ),
+            )
+
         # MSC3244 (room version capabilities)
         self.msc3244_enabled: bool = experimental.get("msc3244_enabled", True)
 
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index e55ca12a36..6567fb6bb0 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,7 @@
 
 import argparse
 import logging
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union
 
 import attr
 from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
@@ -171,6 +171,27 @@ class WriterLocations:
     )
 
 
+@attr.s(auto_attribs=True)
+class OutboundFederationRestrictedTo:
+    """Whether we limit outbound federation to a certain set of instances.
+
+    Attributes:
+        instances: optional list of instances that can make outbound federation
+            requests. If None then all instances can make federation requests.
+        locations: list of instance locations to connect to proxy via.
+    """
+
+    instances: Optional[List[str]]
+    locations: List[InstanceLocationConfig] = attr.Factory(list)
+
+    def __contains__(self, instance: str) -> bool:
+        # It feels a bit dirty to return `True` if `instances` is `None`, but it makes
+        # sense in downstream usage in the sense that if
+        # `outbound_federation_restricted_to` is not configured, then any instance can
+        # talk to federation (no restrictions so always return `True`).
+        return self.instances is None or instance in self.instances
+
+
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
     They have their own pid_file and listener configuration. They use the
@@ -385,6 +406,28 @@ class WorkerConfig(Config):
             new_option_name="update_user_directory_from_worker",
         )
 
+        outbound_federation_restricted_to = config.get(
+            "outbound_federation_restricted_to", None
+        )
+        self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
+            outbound_federation_restricted_to
+        )
+        if outbound_federation_restricted_to:
+            if not self.worker_replication_secret:
+                raise ConfigError(
+                    "`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`."
+                )
+
+            for instance in outbound_federation_restricted_to:
+                if instance not in self.instance_map:
+                    raise ConfigError(
+                        "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
+                        % (instance,)
+                    )
+                self.outbound_federation_restricted_to.locations.append(
+                    self.instance_map[instance]
+                )
+
     def _should_this_worker_perform_duty(
         self,
         config: Dict[str, Any],
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 1f9c572a48..97e69c56c9 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -135,7 +135,7 @@ def validate_event_for_room_version(event: "EventBase") -> None:
             raise AuthError(403, "Event not signed by hub server")
 
     is_invite_via_allow_rule = (
-        event.room_version.msc3083_join_rules
+        event.room_version.restricted_join_rule
         and event.type == EventTypes.Member
         and event.membership == Membership.JOIN
         and EventContentFields.AUTHORISING_USER in event.content
@@ -368,11 +368,9 @@ LENIENT_EVENT_BYTE_LIMITS_ROOM_VERSIONS = {
     RoomVersions.V4,
     RoomVersions.V5,
     RoomVersions.V6,
-    RoomVersions.MSC2176,
     RoomVersions.V7,
     RoomVersions.V8,
     RoomVersions.V9,
-    RoomVersions.MSC3787,
     RoomVersions.V10,
     RoomVersions.MSC1767v10,
 }
@@ -465,7 +463,7 @@ def _check_create(event: "EventBase") -> None:
 
     # 1.4 If content has no creator field, reject if the room version requires it.
     if (
-        not event.room_version.msc2175_implicit_room_creator
+        not event.room_version.implicit_room_creator
         and EventContentFields.ROOM_CREATOR not in event.content
     ):
         raise AuthError(403, "Create event lacks a 'creator' property")
@@ -502,7 +500,7 @@ def _is_membership_change_allowed(
         key = (EventTypes.Create, "")
         create = auth_events.get(key)
         if create and event.prev_event_ids()[0] == create.event_id:
-            if room_version.msc2175_implicit_room_creator:
+            if room_version.implicit_room_creator:
                 creator = create.sender
             else:
                 creator = create.content[EventContentFields.ROOM_CREATOR]
@@ -525,7 +523,7 @@ def _is_membership_change_allowed(
     caller_invited = caller and caller.membership == Membership.INVITE
     caller_knocked = (
         caller
-        and room_version.msc2403_knocking
+        and room_version.knock_join_rule
         and caller.membership == Membership.KNOCK
     )
 
@@ -625,9 +623,9 @@ def _is_membership_change_allowed(
         elif join_rule == JoinRules.PUBLIC:
             pass
         elif (
-            room_version.msc3083_join_rules and join_rule == JoinRules.RESTRICTED
+            room_version.restricted_join_rule and join_rule == JoinRules.RESTRICTED
         ) or (
-            room_version.msc3787_knock_restricted_join_rule
+            room_version.knock_restricted_join_rule
             and join_rule == JoinRules.KNOCK_RESTRICTED
         ):
             # This is the same as public, but the event must contain a reference
@@ -657,9 +655,9 @@ def _is_membership_change_allowed(
 
         elif (
             join_rule == JoinRules.INVITE
-            or (room_version.msc2403_knocking and join_rule == JoinRules.KNOCK)
+            or (room_version.knock_join_rule and join_rule == JoinRules.KNOCK)
             or (
-                room_version.msc3787_knock_restricted_join_rule
+                room_version.knock_restricted_join_rule
                 and join_rule == JoinRules.KNOCK_RESTRICTED
             )
         ):
@@ -693,9 +691,9 @@ def _is_membership_change_allowed(
                 "You don't have permission to ban",
                 errcode=Codes.INSUFFICIENT_POWER,
             )
-    elif room_version.msc2403_knocking and Membership.KNOCK == membership:
+    elif room_version.knock_join_rule and Membership.KNOCK == membership:
         if join_rule != JoinRules.KNOCK and (
-            not room_version.msc3787_knock_restricted_join_rule
+            not room_version.knock_restricted_join_rule
             or join_rule != JoinRules.KNOCK_RESTRICTED
         ):
             raise AuthError(403, "You don't have permission to knock")
@@ -852,7 +850,7 @@ def _check_power_levels(
     # Reject events with stringy power levels if required by room version
     if (
         event.type == EventTypes.PowerLevels
-        and room_version_obj.msc3667_int_only_power_levels
+        and room_version_obj.enforce_int_power_levels
     ):
         for k, v in event.content.items():
             if k in {
@@ -988,7 +986,7 @@ def get_user_power_level(user_id: str, auth_events: StateMap["EventBase"]) -> in
         key = (EventTypes.Create, "")
         create_event = auth_events.get(key)
         if create_event is not None:
-            if create_event.room_version.msc2175_implicit_room_creator:
+            if create_event.room_version.implicit_room_creator:
                 creator = create_event.sender
             else:
                 creator = create_event.content[EventContentFields.ROOM_CREATOR]
@@ -1132,7 +1130,7 @@ def auth_types_for_event(
                 )
                 auth_types.add(key)
 
-        if room_version.msc3083_join_rules and membership == Membership.JOIN:
+        if room_version.restricted_join_rule and membership == Membership.JOIN:
             if EventContentFields.AUTHORISING_USER in event.content:
                 key = (
                     EventTypes.Member,
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index f95ec7dad7..eaf6fcf2f9 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -353,7 +353,7 @@ class EventBase(metaclass=abc.ABCMeta):
     @property
     def redacts(self) -> Optional[str]:
         """MSC2176 moved the redacts field into the content."""
-        if self.room_version.msc2176_redaction_rules:
+        if self.room_version.updated_redaction_rules:
             return self.content.get("redacts")
         return self.get("redacts")
 
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 9efaff77c9..e4ed46c756 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -183,7 +183,7 @@ class EventBuilder:
 
         # MSC2174 moves the redacts property to the content, it is invalid to
         # provide it as a top-level property.
-        if self._redacts is not None and not self.room_version.msc2176_redaction_rules:
+        if self._redacts is not None and not self.room_version.updated_redaction_rules:
             event_dict["redacts"] = self._redacts
 
         if self._origin_server_ts is not None:
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index c692b4008d..f834d5dad8 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -108,19 +108,16 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
         "origin_server_ts",
     ]
 
-    # Room versions from before MSC2176 had additional allowed keys.
-    if not room_version.msc2176_redaction_rules:
-        allowed_keys.extend(["prev_state", "membership"])
+    # Earlier room versions from had additional allowed keys.
+    if not room_version.updated_redaction_rules:
+        allowed_keys.extend(["prev_state", "membership", "origin"])
+
     # The hub server should not be redacted for linear matrix.
     if room_version.linearized_matrix:
         allowed_keys.append("hub_server")
     else:
         allowed_keys.append("depth")
 
-    # Room versions before MSC3989 kept the origin field.
-    if not room_version.msc3989_redaction_rules:
-        allowed_keys.append("origin")
-
     event_type = event_dict["type"]
 
     new_content = {}
@@ -132,9 +129,9 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
 
     if event_type == EventTypes.Member:
         add_fields("membership")
-        if room_version.msc3375_redaction_rules:
+        if room_version.restricted_join_rule_fix:
             add_fields(EventContentFields.AUTHORISING_USER)
-        if room_version.msc3821_redaction_rules:
+        if room_version.updated_redaction_rules:
             # Preserve the signed field under third_party_invite.
             third_party_invite = event_dict["content"].get("third_party_invite")
             if isinstance(third_party_invite, collections.abc.Mapping):
@@ -145,15 +142,16 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
                     ]
 
     elif event_type == EventTypes.Create:
-        # MSC2176 rules state that create events cannot have their `content` redacted.
-        if room_version.msc2176_redaction_rules:
+        if room_version.updated_redaction_rules:
+            # MSC2176 rules state that create events cannot have their `content` redacted.
             new_content = event_dict["content"]
-
-        if not room_version.msc2175_implicit_room_creator:
+        elif not room_version.implicit_room_creator:
+            # Some room versions give meaning to `creator`
             add_fields("creator")
+
     elif event_type == EventTypes.JoinRules:
         add_fields("join_rule")
-        if room_version.msc3083_join_rules:
+        if room_version.restricted_join_rule:
             add_fields("allow")
     elif event_type == EventTypes.PowerLevels:
         add_fields(
@@ -167,14 +165,14 @@ def prune_event_dict(room_version: RoomVersion, event_dict: JsonDict) -> JsonDic
             "redact",
         )
 
-        if room_version.msc2176_redaction_rules:
+        if room_version.updated_redaction_rules:
             add_fields("invite")
 
     elif event_type == EventTypes.Aliases and room_version.special_case_aliases_auth:
         add_fields("aliases")
     elif event_type == EventTypes.RoomHistoryVisibility:
         add_fields("history_visibility")
-    elif event_type == EventTypes.Redaction and room_version.msc2176_redaction_rules:
+    elif event_type == EventTypes.Redaction and room_version.updated_redaction_rules:
         add_fields("redacts")
 
     # Protect the rel_type and event_id fields under the m.relates_to field.
@@ -483,6 +481,15 @@ def serialize_event(
     if config.as_client_event:
         d = config.event_format(d)
 
+    # If the event is a redaction, copy the redacts field from the content to
+    # top-level for backwards compatibility.
+    if (
+        e.type == EventTypes.Redaction
+        and e.room_version.updated_redaction_rules
+        and e.redacts is not None
+    ):
+        d["redacts"] = e.redacts
+
     only_event_fields = config.only_event_fields
     if only_event_fields:
         if not isinstance(only_event_fields, list) or not all(
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 21cb3e4675..400b74006c 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -232,7 +232,7 @@ async def _check_sigs_on_pdu(
     # If this is a join event for a restricted room it may have been authorised
     # via a different server from the sending server. Check those signatures.
     if (
-        room_version.msc3083_join_rules
+        room_version.restricted_join_rule
         and pdu.type == EventTypes.Member
         and pdu.membership == Membership.JOIN
         and EventContentFields.AUTHORISING_USER in pdu.content
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index b473a5c31b..5c79ddee0c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1008,7 +1008,7 @@ class FederationClient(FederationBase):
             if not room_version:
                 raise UnsupportedRoomVersionError()
 
-            if not room_version.msc2403_knocking and membership == Membership.KNOCK:
+            if not room_version.knock_join_rule and membership == Membership.KNOCK:
                 raise SynapseError(
                     400,
                     "This room version does not support knocking",
@@ -1094,7 +1094,7 @@ class FederationClient(FederationBase):
             # * Ensure the signatures are good.
             #
             # Otherwise, fallback to the provided event.
-            if room_version.msc3083_join_rules and response.event:
+            if room_version.restricted_join_rule and response.event:
                 event = response.event
 
                 valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
@@ -1220,7 +1220,7 @@ class FederationClient(FederationBase):
 
         # MSC3083 defines additional error codes for room joins.
         failover_errcodes = None
-        if room_version.msc3083_join_rules:
+        if room_version.restricted_join_rule:
             failover_errcodes = (
                 Codes.UNABLE_AUTHORISE_JOIN,
                 Codes.UNABLE_TO_GRANT_JOIN,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2a8b177162..24f726a4dd 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -857,7 +857,7 @@ class FederationServer(FederationBase):
             raise IncompatibleRoomVersionError(room_version=room_version.identifier)
 
         # Check that this room supports knocking as defined by its room version
-        if not room_version.msc2403_knocking:
+        if not room_version.knock_join_rule:
             raise SynapseError(
                 403,
                 "This room version does not support knocking",
@@ -965,7 +965,7 @@ class FederationServer(FederationBase):
                 errcode=Codes.NOT_FOUND,
             )
 
-        if membership_type == Membership.KNOCK and not room_version.msc2403_knocking:
+        if membership_type == Membership.KNOCK and not room_version.knock_join_rule:
             raise SynapseError(
                 403,
                 "This room version does not support knocking",
@@ -994,7 +994,7 @@ class FederationServer(FederationBase):
         # the event is valid to be sent into the room. Currently this is only done
         # if the user is being joined via restricted join rules.
         if (
-            room_version.msc3083_join_rules
+            room_version.restricted_join_rule
             and event.membership == Membership.JOIN
             and EventContentFields.AUTHORISING_USER in event.content
         ):
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 5d12a39e26..f3a713f5fa 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -653,6 +653,7 @@ class DeviceHandler(DeviceWorkerHandler):
     async def store_dehydrated_device(
         self,
         user_id: str,
+        device_id: Optional[str],
         device_data: JsonDict,
         initial_device_display_name: Optional[str] = None,
     ) -> str:
@@ -661,6 +662,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         Args:
             user_id: the user that we are storing the device for
+            device_id: device id supplied by client
             device_data: the dehydrated device information
             initial_device_display_name: The display name to use for the device
         Returns:
@@ -668,7 +670,7 @@ class DeviceHandler(DeviceWorkerHandler):
         """
         device_id = await self.check_device_registered(
             user_id,
-            None,
+            device_id,
             initial_device_display_name,
         )
         old_device_id = await self.store.store_dehydrated_device(
@@ -1124,7 +1126,14 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
                 )
 
             if resync:
-                await self.multi_user_device_resync([user_id])
+                # We mark as stale up front in case we get restarted.
+                await self.store.mark_remote_users_device_caches_as_stale([user_id])
+                run_as_background_process(
+                    "_maybe_retry_device_resync",
+                    self.multi_user_device_resync,
+                    [user_id],
+                    False,
+                )
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 3caf9b31cc..15e94a03cb 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, Any, Dict
+from http import HTTPStatus
+from typing import TYPE_CHECKING, Any, Dict, Optional
 
 from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
-from synapse.api.errors import SynapseError
+from synapse.api.errors import Codes, SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
@@ -48,6 +49,9 @@ class DeviceMessageHandler:
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
         self.is_mine = hs.is_mine
+        if hs.config.experimental.msc3814_enabled:
+            self.event_sources = hs.get_event_sources()
+            self.device_handler = hs.get_device_handler()
 
         # We only need to poke the federation sender explicitly if its on the
         # same instance. Other federation sender instances will get notified by
@@ -303,3 +307,103 @@ class DeviceMessageHandler:
                 # Enqueue a new federation transaction to send the new
                 # device messages to each remote destination.
                 self.federation_sender.send_device_messages(destination)
+
+    async def get_events_for_dehydrated_device(
+        self,
+        requester: Requester,
+        device_id: str,
+        since_token: Optional[str],
+        limit: int,
+    ) -> JsonDict:
+        """Fetches up to `limit` events sent to `device_id` starting from `since_token`
+        and returns the new since token. If there are no more messages, returns an empty
+        array.
+
+        Args:
+            requester: the user requesting the messages
+            device_id: ID of the dehydrated device
+            since_token: stream id to start from when fetching messages
+            limit: the number of messages to fetch
+        Returns:
+            A dict containing the to-device messages, as well as a token that the client
+            can provide in the next call to fetch the next batch of messages
+        """
+
+        user_id = requester.user.to_string()
+
+        # only allow fetching messages for the dehydrated device id currently associated
+        # with the user
+        dehydrated_device = await self.device_handler.get_dehydrated_device(user_id)
+        if dehydrated_device is None:
+            raise SynapseError(
+                HTTPStatus.FORBIDDEN,
+                "No dehydrated device exists",
+                Codes.FORBIDDEN,
+            )
+
+        dehydrated_device_id, _ = dehydrated_device
+        if device_id != dehydrated_device_id:
+            raise SynapseError(
+                HTTPStatus.FORBIDDEN,
+                "You may only fetch messages for your dehydrated device",
+                Codes.FORBIDDEN,
+            )
+
+        since_stream_id = 0
+        if since_token:
+            if not since_token.startswith("d"):
+                raise SynapseError(
+                    HTTPStatus.BAD_REQUEST,
+                    "from parameter %r has an invalid format" % (since_token,),
+                    errcode=Codes.INVALID_PARAM,
+                )
+
+            try:
+                since_stream_id = int(since_token[1:])
+            except Exception:
+                raise SynapseError(
+                    HTTPStatus.BAD_REQUEST,
+                    "from parameter %r has an invalid format" % (since_token,),
+                    errcode=Codes.INVALID_PARAM,
+                )
+
+            # if we have a since token, delete any to-device messages before that token
+            # (since we now know that the device has received them)
+            deleted = await self.store.delete_messages_for_device(
+                user_id, device_id, since_stream_id
+            )
+            logger.debug(
+                "Deleted %d to-device messages up to %d for user_id %s device_id %s",
+                deleted,
+                since_stream_id,
+                user_id,
+                device_id,
+            )
+
+        to_token = self.event_sources.get_current_token().to_device_key
+
+        messages, stream_id = await self.store.get_messages_for_device(
+            user_id, device_id, since_stream_id, to_token, limit
+        )
+
+        for message in messages:
+            # Remove the message id before sending to client
+            message_id = message.pop("message_id", None)
+            if message_id:
+                set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
+
+        logger.debug(
+            "Returning %d to-device messages between %d and %d (current token: %d) for "
+            "dehydrated device %s, user_id %s",
+            len(messages),
+            since_stream_id,
+            stream_id,
+            to_token,
+            device_id,
+            user_id,
+        )
+
+        return {
+            "events": messages,
+            "next_batch": f"d{stream_id}",
+        }
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index 3e37c0cbe2..82a7617a08 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -277,7 +277,7 @@ class EventAuthHandler:
             True if the proper room version and join rules are set for restricted access.
         """
         # This only applies to room versions which support the new join rule.
-        if not room_version.msc3083_join_rules:
+        if not room_version.restricted_join_rule:
             return False
 
         # If there's no join rule, then it defaults to invite (so this doesn't apply).
@@ -292,7 +292,7 @@ class EventAuthHandler:
             return True
 
         # also check for MSC3787 behaviour
-        if room_version.msc3787_knock_restricted_join_rule:
+        if room_version.knock_restricted_join_rule:
             return content_join_rule == JoinRules.KNOCK_RESTRICTED
 
         return False
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f0f61dbdb0..58cbf9be13 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -959,7 +959,7 @@ class FederationHandler:
         # Note that this requires the /send_join request to come back to the
         # same server.
         prev_event_ids = None
-        if room_version.msc3083_join_rules:
+        if room_version.restricted_join_rule:
             # Note that the room's state can change out from under us and render our
             # nice join rules-conformant event non-conformant by the time we build the
             # event. When this happens, our validation at the end fails and we respond
@@ -1582,9 +1582,7 @@ class FederationHandler:
             event.content["third_party_invite"]["signed"]["token"],
         )
         original_invite = None
-        prev_state_ids = await context.get_prev_state_ids(
-            StateFilter.from_types([(EventTypes.ThirdPartyInvite, None)])
-        )
+        prev_state_ids = await context.get_prev_state_ids(StateFilter.from_types([key]))
         original_invite_id = prev_state_ids.get(key)
         if original_invite_id:
             original_invite = await self.store.get_event(
@@ -1637,7 +1635,7 @@ class FederationHandler:
         token = signed["token"]
 
         prev_state_ids = await context.get_prev_state_ids(
-            StateFilter.from_types([(EventTypes.ThirdPartyInvite, None)])
+            StateFilter.from_types([(EventTypes.ThirdPartyInvite, token)])
         )
         invite_event_id = prev_state_ids.get((EventTypes.ThirdPartyInvite, token))
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4292b47037..fff0b5fa12 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -738,7 +738,7 @@ class EventCreationHandler:
                 prev_event_id = state_map.get((EventTypes.Member, event.sender))
             else:
                 prev_state_ids = await unpersisted_context.get_prev_state_ids(
-                    StateFilter.from_types([(EventTypes.Member, None)])
+                    StateFilter.from_types([(EventTypes.Member, event.sender)])
                 )
                 prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
@@ -860,7 +860,7 @@ class EventCreationHandler:
             return None
 
         prev_state_ids = await context.get_prev_state_ids(
-            StateFilter.from_types([(event.type, None)])
+            StateFilter.from_types([(event.type, event.state_key)])
         )
         prev_event_id = prev_state_ids.get((event.type, event.state_key))
         if not prev_event_id:
@@ -1565,12 +1565,11 @@ class EventCreationHandler:
                 if state_entry.state_group in self._external_cache_joined_hosts_updates:
                     return
 
-                state = await state_entry.get_state(
-                    self._storage_controllers.state, StateFilter.all()
-                )
                 with opentracing.start_active_span("get_joined_hosts"):
-                    joined_hosts = await self.store.get_joined_hosts(
-                        event.room_id, state, state_entry
+                    joined_hosts = (
+                        await self._storage_controllers.state.get_joined_hosts(
+                            event.room_id, state_entry
+                        )
                     )
 
                 # Note that the expiry times must be larger than the expiry time in
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0a219b7962..cd7df0525f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -95,13 +95,12 @@ bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time",
 get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
 
 notify_reason_counter = Counter(
-    "synapse_handler_presence_notify_reason", "", ["reason"]
+    "synapse_handler_presence_notify_reason", "", ["locality", "reason"]
 )
 state_transition_counter = Counter(
-    "synapse_handler_presence_state_transition", "", ["from", "to"]
+    "synapse_handler_presence_state_transition", "", ["locality", "from", "to"]
 )
 
-
 # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
 # "currently_active"
 LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -567,8 +566,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         for new_state in states:
             old_state = self.user_to_current_state.get(new_state.user_id)
             self.user_to_current_state[new_state.user_id] = new_state
-
-            if not old_state or should_notify(old_state, new_state):
+            is_mine = self.is_mine_id(new_state.user_id)
+            if not old_state or should_notify(old_state, new_state, is_mine):
                 state_to_notify.append(new_state)
 
         stream_id = token
@@ -1499,23 +1498,31 @@ class PresenceHandler(BasePresenceHandler):
             )
 
 
-def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
+def should_notify(
+    old_state: UserPresenceState, new_state: UserPresenceState, is_mine: bool
+) -> bool:
     """Decides if a presence state change should be sent to interested parties."""
+    user_location = "remote"
+    if is_mine:
+        user_location = "local"
+
     if old_state == new_state:
         return False
 
     if old_state.status_msg != new_state.status_msg:
-        notify_reason_counter.labels("status_msg_change").inc()
+        notify_reason_counter.labels(user_location, "status_msg_change").inc()
         return True
 
     if old_state.state != new_state.state:
-        notify_reason_counter.labels("state_change").inc()
-        state_transition_counter.labels(old_state.state, new_state.state).inc()
+        notify_reason_counter.labels(user_location, "state_change").inc()
+        state_transition_counter.labels(
+            user_location, old_state.state, new_state.state
+        ).inc()
         return True
 
     if old_state.state == PresenceState.ONLINE:
         if new_state.currently_active != old_state.currently_active:
-            notify_reason_counter.labels("current_active_change").inc()
+            notify_reason_counter.labels(user_location, "current_active_change").inc()
             return True
 
         if (
@@ -1524,12 +1531,16 @@ def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) ->
         ):
             # Only notify about last active bumps if we're not currently active
             if not new_state.currently_active:
-                notify_reason_counter.labels("last_active_change_online").inc()
+                notify_reason_counter.labels(
+                    user_location, "last_active_change_online"
+                ).inc()
                 return True
 
     elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
         # Always notify for a transition where last active gets bumped.
-        notify_reason_counter.labels("last_active_change_not_online").inc()
+        notify_reason_counter.labels(
+            user_location, "last_active_change_not_online"
+        ).inc()
         return True
 
     return False
@@ -1989,7 +2000,7 @@ def handle_update(
         )
 
     # Check whether the change was something worth notifying about
-    if should_notify(prev_state, new_state):
+    if should_notify(prev_state, new_state, is_mine):
         new_state = new_state.copy_and_replace(last_federation_update_ts=now)
         persist_and_notify = True
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index bf907b7881..0513e28aab 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1116,7 +1116,7 @@ class RoomCreationHandler:
         preset_config, config = self._room_preset_config(room_config)
 
         # MSC2175 removes the creator field from the create event.
-        if not room_version.msc2175_implicit_room_creator:
+        if not room_version.implicit_room_creator:
             creation_content["creator"] = creator_id
         creation_event, unpersisted_creation_context = await create_event(
             EventTypes.Create, creation_content, False
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 82e4fa7363..496e701f13 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -473,7 +473,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 )
                 context = await unpersisted_context.persist(event)
                 prev_state_ids = await context.get_prev_state_ids(
-                    StateFilter.from_types([(EventTypes.Member, None)])
+                    StateFilter.from_types([(EventTypes.Member, user_id)])
                 )
 
                 prev_member_event_id = prev_state_ids.get(
@@ -1340,7 +1340,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             requester = types.create_requester(target_user)
 
         prev_state_ids = await context.get_prev_state_ids(
-            StateFilter.from_types([(EventTypes.GuestAccess, None)])
+            StateFilter.from_types([(EventTypes.GuestAccess, "")])
         )
         if event.membership == Membership.JOIN:
             if requester.is_guest:
@@ -1362,11 +1362,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ratelimit=ratelimit,
         )
 
-        prev_member_event_id = prev_state_ids.get(
-            (EventTypes.Member, event.state_key), None
-        )
-
         if event.membership == Membership.LEAVE:
+            prev_state_ids = await context.get_prev_state_ids(
+                StateFilter.from_types([(EventTypes.Member, event.state_key)])
+            )
+            prev_member_event_id = prev_state_ids.get(
+                (EventTypes.Member, event.state_key), None
+            )
+
             if prev_member_event_id:
                 prev_member_event = await self.store.get_event(prev_member_event_id)
                 if prev_member_event.membership == Membership.JOIN:
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 807245160d..dad3e23470 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -564,9 +564,9 @@ class RoomSummaryHandler:
             join_rule = join_rules_event.content.get("join_rule")
             if (
                 join_rule == JoinRules.PUBLIC
-                or (room_version.msc2403_knocking and join_rule == JoinRules.KNOCK)
+                or (room_version.knock_join_rule and join_rule == JoinRules.KNOCK)
                 or (
-                    room_version.msc3787_knock_restricted_join_rule
+                    room_version.knock_restricted_join_rule
                     and join_rule == JoinRules.KNOCK_RESTRICTED
                 )
             ):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 09ea93e10d..ca2cdbc6e2 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1037,7 +1037,12 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
         if reason.check(ResponseDone):
             self.deferred.callback(self.length)
         elif reason.check(PotentialDataLoss):
-            # stolen from https://github.com/twisted/treq/pull/49/files
+            # This applies to requests which don't set `Content-Length` or a
+            # `Transfer-Encoding` in the response because in this case the end of the
+            # response is indicated by the connection being closed, an event which may
+            # also be due to a transient network problem or other error. But since this
+            # behavior is expected of some servers (like YouTube), let's ignore it.
+            # Stolen from https://github.com/twisted/treq/pull/49/files
             # http://twistedmatrix.com/trac/ticket/4840
             self.deferred.callback(self.length)
         else:
diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py
index 23a60af171..636efc33e8 100644
--- a/synapse/http/connectproxyclient.py
+++ b/synapse/http/connectproxyclient.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import base64
 import logging
 from typing import Optional, Union
@@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError):
     pass
 
 
-@attr.s(auto_attribs=True)
 class ProxyCredentials:
+    @abc.abstractmethod
+    def as_proxy_authorization_value(self) -> bytes:
+        raise NotImplementedError()
+
+
+@attr.s(auto_attribs=True)
+class BasicProxyCredentials(ProxyCredentials):
     username_password: bytes
 
     def as_proxy_authorization_value(self) -> bytes:
@@ -55,6 +62,17 @@ class ProxyCredentials:
         return b"Basic " + base64.encodebytes(self.username_password)
 
 
+@attr.s(auto_attribs=True)
+class BearerProxyCredentials(ProxyCredentials):
+    access_token: bytes
+
+    def as_proxy_authorization_value(self) -> bytes:
+        """
+        Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx').
+        """
+        return b"Bearer " + self.access_token
+
+
 @implementer(IStreamClientEndpoint)
 class HTTPConnectProxyEndpoint:
     """An Endpoint implementation which will send a CONNECT request to an http proxy
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b973246adc..9c18512813 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
 from twisted.internet.task import Cooperator
 from twisted.web.client import ResponseFailed
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -71,7 +71,9 @@ from synapse.http.client import (
     encode_query_args,
     read_body_with_max_size,
 )
+from synapse.http.connectproxyclient import BearerProxyCredentials
 from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
 from synapse.http.types import QueryParams
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -393,17 +395,41 @@ class MatrixFederationHttpClient:
         if hs.config.server.user_agent_suffix:
             user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
 
-        federation_agent = MatrixFederationAgent(
-            self.reactor,
-            tls_client_options_factory,
-            user_agent.encode("ascii"),
-            hs.config.server.federation_ip_range_allowlist,
-            hs.config.server.federation_ip_range_blocklist,
+        outbound_federation_restricted_to = (
+            hs.config.worker.outbound_federation_restricted_to
         )
+        if hs.get_instance_name() in outbound_federation_restricted_to:
+            # Talk to federation directly
+            federation_agent: IAgent = MatrixFederationAgent(
+                self.reactor,
+                tls_client_options_factory,
+                user_agent.encode("ascii"),
+                hs.config.server.federation_ip_range_allowlist,
+                hs.config.server.federation_ip_range_blocklist,
+            )
+        else:
+            proxy_authorization_secret = hs.config.worker.worker_replication_secret
+            assert (
+                proxy_authorization_secret is not None
+            ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
+            federation_proxy_credentials = BearerProxyCredentials(
+                proxy_authorization_secret.encode("ascii")
+            )
+
+            # We need to talk to federation via the proxy via one of the configured
+            # locations
+            federation_proxy_locations = outbound_federation_restricted_to.locations
+            federation_agent = ProxyAgent(
+                self.reactor,
+                self.reactor,
+                tls_client_options_factory,
+                federation_proxy_locations=federation_proxy_locations,
+                federation_proxy_credentials=federation_proxy_credentials,
+            )
 
         # Use a BlocklistingAgentWrapper to prevent circumventing the IP
         # blocking via IP literals in server names
-        self.agent = BlocklistingAgentWrapper(
+        self.agent: IAgent = BlocklistingAgentWrapper(
             federation_agent,
             ip_blocklist=hs.config.server.federation_ip_range_blocklist,
         )
@@ -412,7 +438,6 @@ class MatrixFederationHttpClient:
         self._store = hs.get_datastores().main
         self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
-
         self.max_long_retry_delay_seconds = (
             hs.config.federation.max_long_retry_delay_ms / 1000
         )
@@ -1176,6 +1201,101 @@ class MatrixFederationHttpClient:
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
+        json_dict, _ = await self.get_json_with_headers(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+            timeout=timeout,
+            ignore_backoff=ignore_backoff,
+            try_trailing_slash_on_400=try_trailing_slash_on_400,
+            parser=parser,
+        )
+        return json_dict
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Literal[None] = None,
+    ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+        ...
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = ...,
+        retry_on_dns_fail: bool = ...,
+        timeout: Optional[int] = ...,
+        ignore_backoff: bool = ...,
+        try_trailing_slash_on_400: bool = ...,
+        parser: ByteParser[T] = ...,
+    ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+        ...
+
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+        """GETs some json from the given host homeserver and path
+
+        Args:
+            destination: The remote server to send the HTTP request to.
+
+            path: The HTTP path.
+
+            args: A dictionary used to create query strings, defaults to
+                None.
+
+            retry_on_dns_fail: true if the request should be retried on DNS failures
+
+            timeout: number of milliseconds to wait for the response.
+                self._default_timeout (60s) by default.
+
+                Note that we may make several attempts to send the request; this
+                timeout applies to the time spent waiting for response headers for
+                *each* attempt (including connection time) as well as the time spent
+                reading the response body after a 200 response.
+
+            ignore_backoff: true to ignore the historical backoff data
+                and try the request anyway.
+
+            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+                response we should try appending a trailing slash to the end of
+                the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+            parser: The parser to use to decode the response. Defaults to
+                parsing as JSON.
+
+        Returns:
+            Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+            decoded JSON body and a dict of the response headers.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
+        """
         request = MatrixFederationRequest(
             method="GET", destination=destination, path=path, query=args
         )
@@ -1191,6 +1311,8 @@ class MatrixFederationHttpClient:
             timeout=timeout,
         )
 
+        headers = dict(response.headers.getAllRawHeaders())
+
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
@@ -1208,7 +1330,7 @@ class MatrixFederationHttpClient:
             parser=parser,
         )
 
-        return body
+        return body, headers
 
     async def delete_json(
         self,
diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py
new file mode 100644
index 0000000000..c9f51e51bc
--- /dev/null
+++ b/synapse/http/proxy.py
@@ -0,0 +1,283 @@
+#  Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+import json
+import logging
+import urllib.parse
+from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
+
+from twisted.internet import protocol
+from twisted.internet.interfaces import ITCPTransport
+from twisted.internet.protocol import connectionDone
+from twisted.python import failure
+from twisted.python.failure import Failure
+from twisted.web.client import ResponseDone
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
+from twisted.web.resource import IResource
+from twisted.web.server import Request, Site
+
+from synapse.api.errors import Codes, InvalidProxyCredentialsError
+from synapse.http import QuieterFileBodyProducer
+from synapse.http.server import _AsyncResource
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.types import ISynapseReactor
+from synapse.util.async_helpers import timeout_deferred
+
+if TYPE_CHECKING:
+    from synapse.http.site import SynapseRequest
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
+# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
+# consumed by the immediate recipient and not be forwarded on.
+HOP_BY_HOP_HEADERS = {
+    "Connection",
+    "Keep-Alive",
+    "Proxy-Authenticate",
+    "Proxy-Authorization",
+    "TE",
+    "Trailers",
+    "Transfer-Encoding",
+    "Upgrade",
+}
+
+
+def parse_connection_header_value(
+    connection_header_value: Optional[bytes],
+) -> Set[str]:
+    """
+    Parse the `Connection` header to determine which headers we should not be copied
+    over from the remote response.
+
+    As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
+
+    Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
+
+    Even though "close" is a special directive, let's just treat it as just another
+    header for simplicity. If people want to check for this directive, they can simply
+    check for `"Close" in headers`.
+
+    Args:
+        connection_header_value: The value of the `Connection` header.
+
+    Returns:
+        The set of header names that should not be copied over from the remote response.
+        The keys are capitalized in canonical capitalization.
+    """
+    headers = Headers()
+    extra_headers_to_remove: Set[str] = set()
+    if connection_header_value:
+        extra_headers_to_remove = {
+            headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
+            for connection_option in connection_header_value.split(b",")
+        }
+
+    return extra_headers_to_remove
+
+
+class ProxyResource(_AsyncResource):
+    """
+    A stub resource that proxies any requests with a `matrix-federation://` scheme
+    through the given `federation_agent` to the remote homeserver and ferries back the
+    info.
+    """
+
+    isLeaf = True
+
+    def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
+        super().__init__(True)
+
+        self.reactor = reactor
+        self.agent = hs.get_federation_http_client().agent
+
+        self._proxy_authorization_secret = hs.config.worker.worker_replication_secret
+
+    def _check_auth(self, request: Request) -> None:
+        # The `matrix-federation://` proxy functionality can only be used with auth.
+        # Protect homserver admins forgetting to configure a secret.
+        assert self._proxy_authorization_secret is not None
+
+        # Get the authorization header.
+        auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")
+
+        if not auth_headers:
+            raise InvalidProxyCredentialsError(
+                "Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
+            )
+        if len(auth_headers) > 1:
+            raise InvalidProxyCredentialsError(
+                "Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
+            )
+        parts = auth_headers[0].split(b" ")
+        if parts[0] == b"Bearer" and len(parts) == 2:
+            received_secret = parts[1].decode("ascii")
+            if self._proxy_authorization_secret == received_secret:
+                # Success!
+                return
+
+        raise InvalidProxyCredentialsError(
+            "Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
+        )
+
+    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
+        uri = urllib.parse.urlparse(request.uri)
+        assert uri.scheme == b"matrix-federation"
+
+        # Check the authorization headers before handling the request.
+        self._check_auth(request)
+
+        headers = Headers()
+        for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
+            header_value = request.getHeader(header_name)
+            if header_value:
+                headers.addRawHeader(header_name, header_value)
+
+        request_deferred = run_in_background(
+            self.agent.request,
+            request.method,
+            request.uri,
+            headers=headers,
+            bodyProducer=QuieterFileBodyProducer(request.content),
+        )
+        request_deferred = timeout_deferred(
+            request_deferred,
+            # This should be set longer than the timeout in `MatrixFederationHttpClient`
+            # so that it has enough time to complete and pass us the data before we give
+            # up.
+            timeout=90,
+            reactor=self.reactor,
+        )
+
+        response = await make_deferred_yieldable(request_deferred)
+
+        return response.code, response
+
+    def _send_response(
+        self,
+        request: "SynapseRequest",
+        code: int,
+        response_object: Any,
+    ) -> None:
+        response = cast(IResponse, response_object)
+        response_headers = cast(Headers, response.headers)
+
+        request.setResponseCode(code)
+
+        # The `Connection` header also defines which headers should not be copied over.
+        connection_header = response_headers.getRawHeaders(b"connection")
+        extra_headers_to_remove = parse_connection_header_value(
+            connection_header[0] if connection_header else None
+        )
+
+        # Copy headers.
+        for k, v in response_headers.getAllRawHeaders():
+            # Do not copy over any hop-by-hop headers. These are meant to only be
+            # consumed by the immediate recipient and not be forwarded on.
+            header_key = k.decode("ascii")
+            if (
+                header_key in HOP_BY_HOP_HEADERS
+                or header_key in extra_headers_to_remove
+            ):
+                continue
+
+            request.responseHeaders.setRawHeaders(k, v)
+
+        response.deliverBody(_ProxyResponseBody(request))
+
+    def _send_error_response(
+        self,
+        f: failure.Failure,
+        request: "SynapseRequest",
+    ) -> None:
+        if isinstance(f.value, InvalidProxyCredentialsError):
+            error_response_code = f.value.code
+            error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
+        else:
+            error_response_code = 502
+            error_response_json = {
+                "errcode": Codes.UNKNOWN,
+                "err": "ProxyResource: Error when proxying request: %s %s -> %s"
+                % (
+                    request.method.decode("ascii"),
+                    request.uri.decode("ascii"),
+                    f,
+                ),
+            }
+
+        request.setResponseCode(error_response_code)
+        request.setHeader(b"Content-Type", b"application/json")
+        request.write((json.dumps(error_response_json)).encode())
+        request.finish()
+
+
+class _ProxyResponseBody(protocol.Protocol):
+    """
+    A protocol that proxies the given remote response data back out to the given local
+    request.
+    """
+
+    transport: Optional[ITCPTransport] = None
+
+    def __init__(self, request: "SynapseRequest") -> None:
+        self._request = request
+
+    def dataReceived(self, data: bytes) -> None:
+        # Avoid sending response data to the local request that already disconnected
+        if self._request._disconnected and self.transport is not None:
+            # Close the connection (forcefully) since all the data will get
+            # discarded anyway.
+            self.transport.abortConnection()
+            return
+
+        self._request.write(data)
+
+    def connectionLost(self, reason: Failure = connectionDone) -> None:
+        # If the local request is already finished (successfully or failed), don't
+        # worry about sending anything back.
+        if self._request.finished:
+            return
+
+        if reason.check(ResponseDone):
+            self._request.finish()
+        else:
+            # Abort the underlying request since our remote request also failed.
+            self._request.transport.abortConnection()
+
+
+class ProxySite(Site):
+    """
+    Proxies any requests with a `matrix-federation://` scheme through the given
+    `federation_agent`. Otherwise, behaves like a normal `Site`.
+    """
+
+    def __init__(
+        self,
+        resource: IResource,
+        reactor: ISynapseReactor,
+        hs: "HomeServer",
+    ):
+        super().__init__(resource, reactor=reactor)
+
+        self._proxy_resource = ProxyResource(reactor, hs=hs)
+
+    def getResourceFor(self, request: "SynapseRequest") -> IResource:
+        uri = urllib.parse.urlparse(request.uri)
+        if uri.scheme == b"matrix-federation":
+            return self._proxy_resource
+
+        return super().getResourceFor(request)
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 7bdc4acae7..59ab8fad35 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import random
 import re
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
 from urllib.parse import urlparse
 from urllib.request import (  # type: ignore[attr-defined]
     getproxies_environment,
@@ -23,8 +24,17 @@ from urllib.request import (  # type: ignore[attr-defined]
 from zope.interface import implementer
 
 from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    UNIXClientEndpoint,
+    wrapClientTLS,
+)
+from twisted.internet.interfaces import (
+    IProtocol,
+    IProtocolFactory,
+    IReactorCore,
+    IStreamClientEndpoint,
+)
 from twisted.python.failure import Failure
 from twisted.web.client import (
     URI,
@@ -36,8 +46,18 @@ from twisted.web.error import SchemeNotSupported
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
 
+from synapse.config.workers import (
+    InstanceLocationConfig,
+    InstanceTcpLocationConfig,
+    InstanceUnixLocationConfig,
+)
 from synapse.http import redact_uri
-from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
+from synapse.http.connectproxyclient import (
+    BasicProxyCredentials,
+    HTTPConnectProxyEndpoint,
+    ProxyCredentials,
+)
+from synapse.logging.context import run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -74,6 +94,14 @@ class ProxyAgent(_AgentBase):
         use_proxy: Whether proxy settings should be discovered and used
             from conventional environment variables.
 
+        federation_proxy_locations: An optional list of locations to proxy outbound federation
+            traffic through (only requests that use the `matrix-federation://` scheme
+            will be proxied).
+
+        federation_proxy_credentials: Required if `federation_proxy_locations` is set. The
+            credentials to use when proxying outbound federation traffic through another
+            worker.
+
     Raises:
         ValueError if use_proxy is set and the environment variables
             contain an invalid proxy specification.
@@ -89,6 +117,8 @@ class ProxyAgent(_AgentBase):
         bindAddress: Optional[bytes] = None,
         pool: Optional[HTTPConnectionPool] = None,
         use_proxy: bool = False,
+        federation_proxy_locations: Collection[InstanceLocationConfig] = (),
+        federation_proxy_credentials: Optional[ProxyCredentials] = None,
     ):
         contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
 
@@ -127,6 +157,47 @@ class ProxyAgent(_AgentBase):
         self._policy_for_https = contextFactory
         self._reactor = reactor
 
+        self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
+        self._federation_proxy_credentials: Optional[ProxyCredentials] = None
+        if federation_proxy_locations:
+            assert (
+                federation_proxy_credentials is not None
+            ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
+
+            endpoints: List[IStreamClientEndpoint] = []
+            for federation_proxy_location in federation_proxy_locations:
+                endpoint: IStreamClientEndpoint
+                if isinstance(federation_proxy_location, InstanceTcpLocationConfig):
+                    endpoint = HostnameEndpoint(
+                        self.proxy_reactor,
+                        federation_proxy_location.host,
+                        federation_proxy_location.port,
+                    )
+                    if federation_proxy_location.tls:
+                        tls_connection_creator = (
+                            self._policy_for_https.creatorForNetloc(
+                                federation_proxy_location.host.encode("utf-8"),
+                                federation_proxy_location.port,
+                            )
+                        )
+                        endpoint = wrapClientTLS(tls_connection_creator, endpoint)
+
+                elif isinstance(federation_proxy_location, InstanceUnixLocationConfig):
+                    endpoint = UNIXClientEndpoint(
+                        self.proxy_reactor, federation_proxy_location.path
+                    )
+
+                else:
+                    # It is supremely unlikely we ever hit this
+                    raise SchemeNotSupported(
+                        f"Unknown type of Endpoint requested, check {federation_proxy_location}"
+                    )
+
+                endpoints.append(endpoint)
+
+            self._federation_proxy_endpoint = _RandomSampleEndpoints(endpoints)
+            self._federation_proxy_credentials = federation_proxy_credentials
+
     def request(
         self,
         method: bytes,
@@ -214,6 +285,25 @@ class ProxyAgent(_AgentBase):
                 parsed_uri.port,
                 self.https_proxy_creds,
             )
+        elif (
+            parsed_uri.scheme == b"matrix-federation"
+            and self._federation_proxy_endpoint
+        ):
+            assert (
+                self._federation_proxy_credentials is not None
+            ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
+
+            # Set a Proxy-Authorization header
+            if headers is None:
+                headers = Headers()
+            # We always need authentication for the outbound federation proxy
+            headers.addRawHeader(
+                b"Proxy-Authorization",
+                self._federation_proxy_credentials.as_proxy_authorization_value(),
+            )
+
+            endpoint = self._federation_proxy_endpoint
+            request_path = uri
         else:
             # not using a proxy
             endpoint = HostnameEndpoint(
@@ -233,6 +323,11 @@ class ProxyAgent(_AgentBase):
             endpoint = wrapClientTLS(tls_connection_creator, endpoint)
         elif parsed_uri.scheme == b"http":
             pass
+        elif (
+            parsed_uri.scheme == b"matrix-federation"
+            and self._federation_proxy_endpoint
+        ):
+            pass
         else:
             return defer.fail(
                 Failure(
@@ -334,6 +429,42 @@ def parse_proxy(
 
     credentials = None
     if url.username and url.password:
-        credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
+        credentials = BasicProxyCredentials(
+            b"".join([url.username, b":", url.password])
+        )
 
     return url.scheme, url.hostname, url.port or default_port, credentials
+
+
+@implementer(IStreamClientEndpoint)
+class _RandomSampleEndpoints:
+    """An endpoint that randomly iterates through a given list of endpoints at
+    each connection attempt.
+    """
+
+    def __init__(
+        self,
+        endpoints: Sequence[IStreamClientEndpoint],
+    ) -> None:
+        assert endpoints
+        self._endpoints = endpoints
+
+    def __repr__(self) -> str:
+        return f"<_RandomSampleEndpoints endpoints={self._endpoints}>"
+
+    def connect(
+        self, protocol_factory: IProtocolFactory
+    ) -> "defer.Deferred[IProtocol]":
+        """Implements IStreamClientEndpoint interface"""
+
+        return run_in_background(self._do_connect, protocol_factory)
+
+    async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
+        failures: List[Failure] = []
+        for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
+            try:
+                return await endpoint.connect(protocol_factory)
+            except Exception:
+                failures.append(Failure())
+
+        failures.pop().raiseException()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index e411ac7e62..5109cec983 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ import html
 import logging
 import types
 import urllib
+import urllib.parse
 from http import HTTPStatus
 from http.client import FOUND
 from inspect import isawaitable
@@ -65,7 +66,6 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.config.homeserver import HomeServerConfig
-from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
 from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
 from synapse.util import json_encoder
@@ -76,6 +76,7 @@ from synapse.util.iterutils import chunk_seq
 if TYPE_CHECKING:
     import opentracing
 
+    from synapse.http.site import SynapseRequest
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -102,7 +103,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499
 
 
 def return_json_error(
-    f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
+    f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig]
 ) -> None:
     """Sends a JSON error response to clients."""
 
@@ -220,8 +221,8 @@ def return_html_error(
 
 
 def wrap_async_request_handler(
-    h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]]
-) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]:
+    h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
+) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
     """Wraps an async request handler so that it calls request.processing.
 
     This helps ensure that work done by the request handler after the request is completed
@@ -235,7 +236,7 @@ def wrap_async_request_handler(
     """
 
     async def wrapped_async_request_handler(
-        self: "_AsyncResource", request: SynapseRequest
+        self: "_AsyncResource", request: "SynapseRequest"
     ) -> None:
         with request.processing():
             await h(self, request)
@@ -300,7 +301,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
 
         self._extract_context = extract_context
 
-    def render(self, request: SynapseRequest) -> int:
+    def render(self, request: "SynapseRequest") -> int:
         """This gets called by twisted every time someone sends us a request."""
         request.render_deferred = defer.ensureDeferred(
             self._async_render_wrapper(request)
@@ -308,7 +309,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
         return NOT_DONE_YET
 
     @wrap_async_request_handler
-    async def _async_render_wrapper(self, request: SynapseRequest) -> None:
+    async def _async_render_wrapper(self, request: "SynapseRequest") -> None:
         """This is a wrapper that delegates to `_async_render` and handles
         exceptions, return values, metrics, etc.
         """
@@ -328,7 +329,9 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
             f = failure.Failure()
             self._send_error_response(f, request)
 
-    async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]:
+    async def _async_render(
+        self, request: "SynapseRequest"
+    ) -> Optional[Tuple[int, Any]]:
         """Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
         no appropriate method exists. Can be overridden in sub classes for
         different routing.
@@ -358,7 +361,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -368,7 +371,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         raise NotImplementedError()
 
@@ -384,7 +387,7 @@ class DirectServeJsonResource(_AsyncResource):
 
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -401,7 +404,7 @@ class DirectServeJsonResource(_AsyncResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_json_error(f, request, None)
@@ -473,7 +476,7 @@ class JsonResource(DirectServeJsonResource):
             )
 
     def _get_handler_for_request(
-        self, request: SynapseRequest
+        self, request: "SynapseRequest"
     ) -> Tuple[ServletCallback, str, Dict[str, str]]:
         """Finds a callback method to handle the given request.
 
@@ -503,7 +506,7 @@ class JsonResource(DirectServeJsonResource):
         # Huh. No one wanted to handle that? Fiiiiiine.
         raise UnrecognizedRequestError(code=404)
 
-    async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
+    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
         callback, servlet_classname, group_dict = self._get_handler_for_request(request)
 
         request.is_render_cancellable = is_function_cancellable(callback)
@@ -535,7 +538,7 @@ class JsonResource(DirectServeJsonResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_json_error(f, request, self.hs.config)
@@ -551,7 +554,7 @@ class DirectServeHtmlResource(_AsyncResource):
 
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -565,7 +568,7 @@ class DirectServeHtmlResource(_AsyncResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_html_error(f, request, self.ERROR_TEMPLATE)
@@ -592,7 +595,7 @@ class UnrecognizedRequestResource(resource.Resource):
     errcode of M_UNRECOGNIZED.
     """
 
-    def render(self, request: SynapseRequest) -> int:
+    def render(self, request: "SynapseRequest") -> int:
         f = failure.Failure(UnrecognizedRequestError(code=404))
         return_json_error(f, request, None)
         # A response has already been sent but Twisted requires either NOT_DONE_YET
@@ -622,7 +625,7 @@ class RootRedirect(resource.Resource):
 class OptionsResource(resource.Resource):
     """Responds to OPTION requests for itself and all children."""
 
-    def render_OPTIONS(self, request: SynapseRequest) -> bytes:
+    def render_OPTIONS(self, request: "SynapseRequest") -> bytes:
         request.setResponseCode(204)
         request.setHeader(b"Content-Length", b"0")
 
@@ -737,7 +740,7 @@ def _encode_json_bytes(json_object: object) -> bytes:
 
 
 def respond_with_json(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     code: int,
     json_object: Any,
     send_cors: bool = False,
@@ -787,7 +790,7 @@ def respond_with_json(
 
 
 def respond_with_json_bytes(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     code: int,
     json_bytes: bytes,
     send_cors: bool = False,
@@ -825,7 +828,7 @@ def respond_with_json_bytes(
 
 
 async def _async_write_json_to_request_in_thread(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     json_encoder: Callable[[Any], bytes],
     json_object: Any,
 ) -> None:
@@ -883,7 +886,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
     _ByteProducer(request, bytes_generator)
 
 
-def set_cors_headers(request: SynapseRequest) -> None:
+def set_cors_headers(request: "SynapseRequest") -> None:
     """Set the CORS headers so that javascript running in a web browsers can
     use this API
 
@@ -981,7 +984,7 @@ def set_clickjacking_protection_headers(request: Request) -> None:
 
 
 def respond_with_redirect(
-    request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False
+    request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False
 ) -> None:
     """
     Write a 302 (or other specified status code) response to the request, if it is still alive.
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5b5a7c1e59..a388d6cf7f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -21,25 +21,29 @@ from zope.interface import implementer
 
 from twisted.internet.address import UNIXAddress
 from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress, IReactorTime
+from twisted.internet.interfaces import IAddress
 from twisted.python.failure import Failure
 from twisted.web.http import HTTPChannel
 from twisted.web.resource import IResource, Resource
-from twisted.web.server import Request, Site
+from twisted.web.server import Request
 
 from synapse.config.server import ListenerConfig
 from synapse.http import get_request_user_agent, redact_uri
+from synapse.http.proxy import ProxySite
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import (
     ContextRequest,
     LoggingContext,
     PreserveLoggingContext,
 )
-from synapse.types import Requester
+from synapse.types import ISynapseReactor, Requester
 
 if TYPE_CHECKING:
     import opentracing
 
+    from synapse.server import HomeServer
+
+
 logger = logging.getLogger(__name__)
 
 _next_request_seq = 0
@@ -102,7 +106,7 @@ class SynapseRequest(Request):
         # A boolean indicating whether `render_deferred` should be cancelled if the
         # client disconnects early. Expected to be set by the coroutine started by
         # `Resource.render`, if rendering is asynchronous.
-        self.is_render_cancellable = False
+        self.is_render_cancellable: bool = False
 
         global _next_request_seq
         self.request_seq = _next_request_seq
@@ -601,7 +605,7 @@ class _XForwardedForAddress:
     host: str
 
 
-class SynapseSite(Site):
+class SynapseSite(ProxySite):
     """
     Synapse-specific twisted http Site
 
@@ -623,7 +627,8 @@ class SynapseSite(Site):
         resource: IResource,
         server_version_string: str,
         max_request_body_size: int,
-        reactor: IReactorTime,
+        reactor: ISynapseReactor,
+        hs: "HomeServer",
     ):
         """
 
@@ -638,7 +643,11 @@ class SynapseSite(Site):
                 dropping the connection
             reactor: reactor to be used to manage connection timeouts
         """
-        Site.__init__(self, resource, reactor=reactor)
+        super().__init__(
+            resource=resource,
+            reactor=reactor,
+            hs=hs,
+        )
 
         self.site_tag = site_tag
         self.reactor = reactor
@@ -649,7 +658,9 @@ class SynapseSite(Site):
 
         request_id_header = config.http_options.request_id_header
 
-        self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886
+        self.experimental_cors_msc3886: bool = (
+            config.http_options.experimental_cors_msc3886
+        )
 
         def request_factory(channel: HTTPChannel, queued: bool) -> Request:
             return request_class(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 67377c647b..990c079c81 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -375,7 +375,7 @@ class BulkPushRuleEvaluator:
         # _get_power_levels_and_sender_level in its call to get_user_power_level
         # (even for room V10.)
         notification_levels = power_levels.get("notifications", {})
-        if not event.room_version.msc3667_int_only_power_levels:
+        if not event.room_version.enforce_int_power_levels:
             keys = list(notification_levels.keys())
             for key in keys:
                 level = notification_levels.get(key, SENTINEL)
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 38dff9703f..690d2ec406 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -14,19 +14,22 @@
 # limitations under the License.
 
 import logging
+from http import HTTPStatus
 from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from pydantic import Extra, StrictStr
 
 from synapse.api import errors
-from synapse.api.errors import NotFoundError, UnrecognizedRequestError
+from synapse.api.errors import NotFoundError, SynapseError, UnrecognizedRequestError
 from synapse.handlers.device import DeviceHandler
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
     parse_and_validate_json_object_from_request,
+    parse_integer,
 )
 from synapse.http.site import SynapseRequest
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.rest.client.models import AuthenticationData
 from synapse.rest.models import RequestBodyModel
@@ -229,6 +232,8 @@ class DehydratedDeviceDataModel(RequestBodyModel):
 class DehydratedDeviceServlet(RestServlet):
     """Retrieve or store a dehydrated device.
 
+    Implements either MSC2697 or MSC3814.
+
     GET /org.matrix.msc2697.v2/dehydrated_device
 
     HTTP/1.1 200 OK
@@ -261,9 +266,7 @@ class DehydratedDeviceServlet(RestServlet):
 
     """
 
-    PATTERNS = client_patterns("/org.matrix.msc2697.v2/dehydrated_device$", releases=())
-
-    def __init__(self, hs: "HomeServer"):
+    def __init__(self, hs: "HomeServer", msc2697: bool = True):
         super().__init__()
         self.hs = hs
         self.auth = hs.get_auth()
@@ -271,6 +274,13 @@ class DehydratedDeviceServlet(RestServlet):
         assert isinstance(handler, DeviceHandler)
         self.device_handler = handler
 
+        self.PATTERNS = client_patterns(
+            "/org.matrix.msc2697.v2/dehydrated_device$"
+            if msc2697
+            else "/org.matrix.msc3814.v1/dehydrated_device$",
+            releases=(),
+        )
+
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request)
         dehydrated_device = await self.device_handler.get_dehydrated_device(
@@ -293,6 +303,7 @@ class DehydratedDeviceServlet(RestServlet):
 
         device_id = await self.device_handler.store_dehydrated_device(
             requester.user.to_string(),
+            None,
             submission.device_data.dict(),
             submission.initial_device_display_name,
         )
@@ -347,6 +358,210 @@ class ClaimDehydratedDeviceServlet(RestServlet):
         return 200, result
 
 
+class DehydratedDeviceEventsServlet(RestServlet):
+    PATTERNS = client_patterns(
+        "/org.matrix.msc3814.v1/dehydrated_device/(?P<device_id>[^/]*)/events$",
+        releases=(),
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.message_handler = hs.get_device_message_handler()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+
+    class PostBody(RequestBodyModel):
+        next_batch: Optional[StrictStr]
+
+    async def on_POST(
+        self, request: SynapseRequest, device_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+
+        next_batch = parse_and_validate_json_object_from_request(
+            request, self.PostBody
+        ).next_batch
+        limit = parse_integer(request, "limit", 100)
+
+        msgs = await self.message_handler.get_events_for_dehydrated_device(
+            requester=requester,
+            device_id=device_id,
+            since_token=next_batch,
+            limit=limit,
+        )
+
+        return 200, msgs
+
+
+class DehydratedDeviceV2Servlet(RestServlet):
+    """Upload, retrieve, or delete a dehydrated device.
+
+    GET /org.matrix.msc3814.v1/dehydrated_device
+
+    HTTP/1.1 200 OK
+    Content-Type: application/json
+
+    {
+      "device_id": "dehydrated_device_id",
+      "device_data": {
+        "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm",
+        "account": "dehydrated_device"
+      }
+    }
+
+    PUT /org.matrix.msc3814.v1/dehydrated_device
+    Content-Type: application/json
+
+    {
+        "device_id": "dehydrated_device_id",
+        "device_data": {
+            "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm",
+            "account": "dehydrated_device"
+        },
+        "device_keys": {
+            "user_id": "<user_id>",
+            "device_id": "<device_id>",
+            "valid_until_ts": <millisecond_timestamp>,
+            "algorithms": [
+                "m.olm.curve25519-aes-sha2",
+            ]
+            "keys": {
+                "<algorithm>:<device_id>": "<key_base64>",
+            },
+            "signatures:" {
+                "<user_id>" {
+                    "<algorithm>:<device_id>": "<signature_base64>"
+                }
+            }
+        },
+        "fallback_keys": {
+            "<algorithm>:<device_id>": "<key_base64>",
+            "signed_<algorithm>:<device_id>": {
+                "fallback": true,
+                "key": "<key_base64>",
+                "signatures": {
+                    "<user_id>": {
+                        "<algorithm>:<device_id>": "<key_base64>"
+                    }
+                }
+            }
+        }
+        "one_time_keys": {
+            "<algorithm>:<key_id>": "<key_base64>"
+        },
+
+    }
+
+    HTTP/1.1 200 OK
+    Content-Type: application/json
+
+    {
+      "device_id": "dehydrated_device_id"
+    }
+
+    DELETE /org.matrix.msc3814.v1/dehydrated_device
+
+    HTTP/1.1 200 OK
+    Content-Type: application/json
+
+    {
+      "device_id": "dehydrated_device_id",
+    }
+    """
+
+    PATTERNS = [
+        *client_patterns("/org.matrix.msc3814.v1/dehydrated_device$", releases=()),
+    ]
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.hs = hs
+        self.auth = hs.get_auth()
+        handler = hs.get_device_handler()
+        assert isinstance(handler, DeviceHandler)
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
+        self.device_handler = handler
+
+        if hs.config.worker.worker_app is None:
+            # if main process
+            self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
+        else:
+            # then a worker
+            self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+
+        dehydrated_device = await self.device_handler.get_dehydrated_device(
+            requester.user.to_string()
+        )
+
+        if dehydrated_device is not None:
+            (device_id, device_data) = dehydrated_device
+            result = {"device_id": device_id, "device_data": device_data}
+            return 200, result
+        else:
+            raise errors.NotFoundError("No dehydrated device available")
+
+    async def on_DELETE(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+
+        dehydrated_device = await self.device_handler.get_dehydrated_device(
+            requester.user.to_string()
+        )
+
+        if dehydrated_device is not None:
+            (device_id, device_data) = dehydrated_device
+
+            result = await self.device_handler.rehydrate_device(
+                requester.user.to_string(),
+                self.auth.get_access_token_from_request(request),
+                device_id,
+            )
+
+            result = {"device_id": device_id}
+
+            return 200, result
+        else:
+            raise errors.NotFoundError("No dehydrated device available")
+
+    class PutBody(RequestBodyModel):
+        device_data: DehydratedDeviceDataModel
+        device_id: StrictStr
+        initial_device_display_name: Optional[StrictStr]
+
+        class Config:
+            extra = Extra.allow
+
+    async def on_PUT(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        submission = parse_and_validate_json_object_from_request(request, self.PutBody)
+        requester = await self.auth.get_user_by_req(request)
+        user_id = requester.user.to_string()
+
+        device_info = submission.dict()
+        if "device_keys" not in device_info.keys():
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Device key(s) not found, these must be provided.",
+            )
+
+        # TODO: Those two operations, creating a device and storing the
+        # device's keys should be atomic.
+        device_id = await self.device_handler.store_dehydrated_device(
+            requester.user.to_string(),
+            submission.device_id,
+            submission.device_data.dict(),
+            submission.initial_device_display_name,
+        )
+
+        # TODO: Do we need to do something with the result here?
+        await self.key_uploader(
+            user_id=user_id, device_id=submission.device_id, keys=submission.dict()
+        )
+
+        return 200, {"device_id": device_id}
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     if (
         hs.config.worker.worker_app is None
@@ -354,7 +569,12 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ):
         DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
+
     if hs.config.worker.worker_app is None:
         DeviceRestServlet(hs).register(http_server)
-        DehydratedDeviceServlet(hs).register(http_server)
-        ClaimDehydratedDeviceServlet(hs).register(http_server)
+        if hs.config.experimental.msc2697_enabled:
+            DehydratedDeviceServlet(hs, msc2697=True).register(http_server)
+            ClaimDehydratedDeviceServlet(hs).register(http_server)
+        if hs.config.experimental.msc3814_enabled:
+            DehydratedDeviceV2Servlet(hs).register(http_server)
+            DehydratedDeviceEventsServlet(hs).register(http_server)
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 951bd033f5..dc498001e4 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -1117,7 +1117,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
         # Ensure the redacts property in the content matches the one provided in
         # the URL.
         room_version = await self._store.get_room_version(room_id)
-        if room_version.msc2176_redaction_rules:
+        if room_version.updated_redaction_rules:
             if "redacts" in content and content["redacts"] != event_id:
                 raise SynapseError(
                     400,
@@ -1151,7 +1151,7 @@ class RoomRedactEventRestServlet(TransactionRestServlet):
                     "sender": requester.user.to_string(),
                 }
                 # Earlier room versions had a top-level redacts property.
-                if not room_version.msc2176_redaction_rules:
+                if not room_version.updated_redaction_rules:
                     event_dict["redacts"] = event_id
 
                 (
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 9bc0c3b7b9..1b91cf5eaa 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -268,8 +268,7 @@ class StateHandler:
             The hosts in the room at the given events
         """
         entry = await self.resolve_state_groups_for_events(room_id, event_ids)
-        state = await entry.get_state(self._state_storage_controller, StateFilter.all())
-        return await self.store.get_joined_hosts(room_id, state, entry)
+        return await self._state_storage_controller.get_joined_hosts(room_id, entry)
 
     @trace
     @tag_args
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 1b9d7d8457..44c49274a9 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -667,7 +667,7 @@ async def _mainline_sort(
     order_map = {}
     for idx, ev_id in enumerate(event_ids, start=1):
         depth = await _get_mainline_depth_for_event(
-            event_map[ev_id], mainline_map, event_map, state_res_store
+            clock, event_map[ev_id], mainline_map, event_map, state_res_store
         )
         order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
 
@@ -682,6 +682,7 @@ async def _mainline_sort(
 
 
 async def _get_mainline_depth_for_event(
+    clock: Clock,
     event: EventBase,
     mainline_map: Dict[str, int],
     event_map: Dict[str, EventBase],
@@ -704,6 +705,7 @@ async def _get_mainline_depth_for_event(
 
     # We do an iterative search, replacing `event with the power level in its
     # auth events (if any)
+    idx = 0
     while tmp_event:
         depth = mainline_map.get(tmp_event.event_id)
         if depth is not None:
@@ -720,6 +722,11 @@ async def _get_mainline_depth_for_event(
                 tmp_event = aev
                 break
 
+        idx += 1
+
+        if idx % _AWAIT_AFTER_ITERATIONS == 0:
+            await clock.sleep(0)
+
     # Didn't find a power level auth event, so we just return 0
     return 0
 
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 233df7cce2..278c7832ba 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -19,14 +20,16 @@ from typing import (
     Callable,
     Collection,
     Dict,
+    FrozenSet,
     Iterable,
     List,
     Mapping,
     Optional,
     Tuple,
+    Union,
 )
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.logging.opentracing import tag_args, trace
 from synapse.storage.roommember import ProfileInfo
@@ -34,14 +37,20 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
     PartialStateEventsTracker,
 )
-from synapse.types import MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateMap, get_domain_from_id
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches import intern_string
+from synapse.util.caches.descriptors import cached
 from synapse.util.cancellation import cancellable
+from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.state import _StateCacheEntry
     from synapse.storage.databases import Databases
 
+
 logger = logging.getLogger(__name__)
 
 
@@ -52,10 +61,15 @@ class StateStorageController:
 
     def __init__(self, hs: "HomeServer", stores: "Databases"):
         self._is_mine_id = hs.is_mine_id
+        self._clock = hs.get_clock()
         self.stores = stores
         self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
         self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
 
+        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+        # at a time. Keyed by room_id.
+        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
     def notify_event_un_partial_stated(self, event_id: str) -> None:
         self._partial_state_events_tracker.notify_un_partial_stated(event_id)
 
@@ -627,3 +641,122 @@ class StateStorageController:
         await self._partial_state_room_tracker.await_full_state(room_id)
 
         return await self.stores.main.get_users_in_room_with_profiles(room_id)
+
+    async def get_joined_hosts(
+        self, room_id: str, state_entry: "_StateCacheEntry"
+    ) -> FrozenSet[str]:
+        state_group: Union[object, int] = state_entry.state_group
+        if not state_group:
+            # If state_group is None it means it has yet to be assigned a
+            # state group, i.e. we need to make sure that calls with a state_group
+            # of None don't hit previous cached calls with a None state_group.
+            # To do this we set the state_group to a new object as object() != object()
+            state_group = object()
+
+        assert state_group is not None
+        with Measure(self._clock, "get_joined_hosts"):
+            return await self._get_joined_hosts(
+                room_id, state_group, state_entry=state_entry
+            )
+
+    @cached(num_args=2, max_entries=10000, iterable=True)
+    async def _get_joined_hosts(
+        self,
+        room_id: str,
+        state_group: Union[object, int],
+        state_entry: "_StateCacheEntry",
+    ) -> FrozenSet[str]:
+        # We don't use `state_group`, it's there so that we can cache based on
+        # it. However, its important that its never None, since two
+        # current_state's with a state_group of None are likely to be different.
+        #
+        # The `state_group` must match the `state_entry.state_group` (if not None).
+        assert state_group is not None
+        assert state_entry.state_group is None or state_entry.state_group == state_group
+
+        # We use a secondary cache of previous work to allow us to build up the
+        # joined hosts for the given state group based on previous state groups.
+        #
+        # We cache one object per room containing the results of the last state
+        # group we got joined hosts for. The idea is that generally
+        # `get_joined_hosts` is called with the "current" state group for the
+        # room, and so consecutive calls will be for consecutive state groups
+        # which point to the previous state group.
+        cache = await self.stores.main._get_joined_hosts_cache(room_id)
+
+        # If the state group in the cache matches, we already have the data we need.
+        if state_entry.state_group == cache.state_group:
+            return frozenset(cache.hosts_to_joined_users)
+
+        # Since we'll mutate the cache we need to lock.
+        async with self._joined_host_linearizer.queue(room_id):
+            if state_entry.state_group == cache.state_group:
+                # Same state group, so nothing to do. We've already checked for
+                # this above, but the cache may have changed while waiting on
+                # the lock.
+                pass
+            elif state_entry.prev_group == cache.state_group:
+                # The cached work is for the previous state group, so we work out
+                # the delta.
+                assert state_entry.delta_ids is not None
+                for (typ, state_key), event_id in state_entry.delta_ids.items():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+                    event = await self.stores.main.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            cache.hosts_to_joined_users.pop(host, None)
+            else:
+                # The cache doesn't match the state group or prev state group,
+                # so we calculate the result from first principles.
+                #
+                # We need to fetch all hosts joined to the room according to `state` by
+                # inspecting all join memberships in `state`. However, if the `state` is
+                # relatively recent then many of its events are likely to be held in
+                # the current state of the room, which is easily available and likely
+                # cached.
+                #
+                # We therefore compute the set of `state` events not in the
+                # current state and only fetch those.
+                current_memberships = (
+                    await self.stores.main._get_approximate_current_memberships_in_room(
+                        room_id
+                    )
+                )
+                unknown_state_events = {}
+                joined_users_in_current_state = []
+
+                state = await state_entry.get_state(
+                    self, StateFilter.from_types([(EventTypes.Member, None)])
+                )
+
+                for (type, state_key), event_id in state.items():
+                    if event_id not in current_memberships:
+                        unknown_state_events[type, state_key] = event_id
+                    elif current_memberships[event_id] == Membership.JOIN:
+                        joined_users_in_current_state.append(state_key)
+
+                joined_user_ids = await self.stores.main.get_joined_user_ids_from_state(
+                    room_id, unknown_state_events
+                )
+
+                cache.hosts_to_joined_users = {}
+                for user_id in chain(joined_user_ids, joined_users_in_current_state):
+                    host = intern_string(get_domain_from_id(user_id))
+                    cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+            if state_entry.state_group:
+                cache.state_group = state_entry.state_group
+            else:
+                cache.state_group = object()
+
+        return frozenset(cache.hosts_to_joined_users)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index c9d687fb2f..a1c8fb0f46 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -98,8 +98,6 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
     "event_push_summary": "event_push_summary_unique_index2",
     "receipts_linearized": "receipts_linearized_unique_index",
     "receipts_graph": "receipts_graph_unique_index",
-    "profiles": "profiles_full_user_id_key_idx",
-    "user_filters": "full_users_filters_unique_idx",
 }
 
 
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index b6028853c9..be67d1ff22 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 
 from synapse.api.constants import Direction
 from synapse.config.homeserver import HomeServerConfig
@@ -196,7 +196,7 @@ class DataStore(
             txn: LoggingTransaction,
         ) -> Tuple[List[JsonDict], int]:
             filters = []
-            args: List[Union[str, int]] = []
+            args: list = []
 
             # Set ordering
             order_by_column = UserSortOrder(order_by).value
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index 75f7fe8756..fff417f9e3 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -188,13 +188,14 @@ class FilteringWorkerStore(SQLBaseStore):
                 filter_id = max_id + 1
 
             sql = (
-                "INSERT INTO user_filters (full_user_id, filter_id, filter_json)"
-                "VALUES(?, ?, ?)"
+                "INSERT INTO user_filters (full_user_id, user_id, filter_id, filter_json)"
+                "VALUES(?, ?, ?, ?)"
             )
             txn.execute(
                 sql,
                 (
                     user_id.to_string(),
+                    user_id.localpart,
                     filter_id,
                     bytearray(def_json),
                 ),
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 660a5507b7..3ba9cc8853 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -173,9 +173,10 @@ class ProfileWorkerStore(SQLBaseStore):
         )
 
     async def create_profile(self, user_id: UserID) -> None:
+        user_localpart = user_id.localpart
         await self.db_pool.simple_insert(
             table="profiles",
-            values={"full_user_id": user_id.to_string()},
+            values={"user_id": user_localpart, "full_user_id": user_id.to_string()},
             desc="create_profile",
         )
 
@@ -190,11 +191,13 @@ class ProfileWorkerStore(SQLBaseStore):
             new_displayname: The new display name. If this is None, the user's display
                 name is removed.
         """
+        user_localpart = user_id.localpart
         await self.db_pool.simple_upsert(
             table="profiles",
-            keyvalues={"full_user_id": user_id.to_string()},
+            keyvalues={"user_id": user_localpart},
             values={
                 "displayname": new_displayname,
+                "full_user_id": user_id.to_string(),
             },
             desc="set_profile_displayname",
         )
@@ -210,10 +213,11 @@ class ProfileWorkerStore(SQLBaseStore):
             new_avatar_url: The new avatar URL. If this is None, the user's avatar is
                 removed.
         """
+        user_localpart = user_id.localpart
         await self.db_pool.simple_upsert(
             table="profiles",
-            keyvalues={"full_user_id": user_id.to_string()},
-            values={"avatar_url": new_avatar_url},
+            keyvalues={"user_id": user_localpart},
+            values={"avatar_url": new_avatar_url, "full_user_id": user_id.to_string()},
             desc="set_profile_avatar_url",
         )
 
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index ca8be8c80d..830658f328 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2136,7 +2136,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             raise StoreError(400, "No create event in state")
 
         # Before MSC2175, the room creator was a separate field.
-        if not room_version.msc2175_implicit_room_creator:
+        if not room_version.implicit_room_creator:
             room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
 
             if not isinstance(room_creator, str):
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 582875c91a..fff259f74c 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from itertools import chain
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -57,15 +56,12 @@ from synapse.types import (
     StrCollection,
     get_domain_from_id,
 )
-from synapse.util.async_helpers import Linearizer
-from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
-    from synapse.state import _StateCacheEntry
 
 logger = logging.getLogger(__name__)
 
@@ -91,10 +87,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
     ):
         super().__init__(database, db_conn, hs)
 
-        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
-        # at a time. Keyed by room_id.
-        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
-
         self._server_notices_mxid = hs.config.servernotices.server_notices_mxid
 
         if (
@@ -1057,120 +1049,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             "get_current_hosts_in_room_ordered", get_current_hosts_in_room_ordered_txn
         )
 
-    async def get_joined_hosts(
-        self, room_id: str, state: StateMap[str], state_entry: "_StateCacheEntry"
-    ) -> FrozenSet[str]:
-        state_group: Union[object, int] = state_entry.state_group
-        if not state_group:
-            # If state_group is None it means it has yet to be assigned a
-            # state group, i.e. we need to make sure that calls with a state_group
-            # of None don't hit previous cached calls with a None state_group.
-            # To do this we set the state_group to a new object as object() != object()
-            state_group = object()
-
-        assert state_group is not None
-        with Measure(self._clock, "get_joined_hosts"):
-            return await self._get_joined_hosts(
-                room_id, state_group, state, state_entry=state_entry
-            )
-
-    @cached(num_args=2, max_entries=10000, iterable=True)
-    async def _get_joined_hosts(
-        self,
-        room_id: str,
-        state_group: Union[object, int],
-        state: StateMap[str],
-        state_entry: "_StateCacheEntry",
-    ) -> FrozenSet[str]:
-        # We don't use `state_group`, it's there so that we can cache based on
-        # it. However, its important that its never None, since two
-        # current_state's with a state_group of None are likely to be different.
-        #
-        # The `state_group` must match the `state_entry.state_group` (if not None).
-        assert state_group is not None
-        assert state_entry.state_group is None or state_entry.state_group == state_group
-
-        # We use a secondary cache of previous work to allow us to build up the
-        # joined hosts for the given state group based on previous state groups.
-        #
-        # We cache one object per room containing the results of the last state
-        # group we got joined hosts for. The idea is that generally
-        # `get_joined_hosts` is called with the "current" state group for the
-        # room, and so consecutive calls will be for consecutive state groups
-        # which point to the previous state group.
-        cache = await self._get_joined_hosts_cache(room_id)
-
-        # If the state group in the cache matches, we already have the data we need.
-        if state_entry.state_group == cache.state_group:
-            return frozenset(cache.hosts_to_joined_users)
-
-        # Since we'll mutate the cache we need to lock.
-        async with self._joined_host_linearizer.queue(room_id):
-            if state_entry.state_group == cache.state_group:
-                # Same state group, so nothing to do. We've already checked for
-                # this above, but the cache may have changed while waiting on
-                # the lock.
-                pass
-            elif state_entry.prev_group == cache.state_group:
-                # The cached work is for the previous state group, so we work out
-                # the delta.
-                assert state_entry.delta_ids is not None
-                for (typ, state_key), event_id in state_entry.delta_ids.items():
-                    if typ != EventTypes.Member:
-                        continue
-
-                    host = intern_string(get_domain_from_id(state_key))
-                    user_id = state_key
-                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
-
-                    event = await self.get_event(event_id)
-                    if event.membership == Membership.JOIN:
-                        known_joins.add(user_id)
-                    else:
-                        known_joins.discard(user_id)
-
-                        if not known_joins:
-                            cache.hosts_to_joined_users.pop(host, None)
-            else:
-                # The cache doesn't match the state group or prev state group,
-                # so we calculate the result from first principles.
-                #
-                # We need to fetch all hosts joined to the room according to `state` by
-                # inspecting all join memberships in `state`. However, if the `state` is
-                # relatively recent then many of its events are likely to be held in
-                # the current state of the room, which is easily available and likely
-                # cached.
-                #
-                # We therefore compute the set of `state` events not in the
-                # current state and only fetch those.
-                current_memberships = (
-                    await self._get_approximate_current_memberships_in_room(room_id)
-                )
-                unknown_state_events = {}
-                joined_users_in_current_state = []
-
-                for (type, state_key), event_id in state.items():
-                    if event_id not in current_memberships:
-                        unknown_state_events[type, state_key] = event_id
-                    elif current_memberships[event_id] == Membership.JOIN:
-                        joined_users_in_current_state.append(state_key)
-
-                joined_user_ids = await self.get_joined_user_ids_from_state(
-                    room_id, unknown_state_events
-                )
-
-                cache.hosts_to_joined_users = {}
-                for user_id in chain(joined_user_ids, joined_users_in_current_state):
-                    host = intern_string(get_domain_from_id(user_id))
-                    cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
-
-            if state_entry.state_group:
-                cache.state_group = state_entry.state_group
-            else:
-                cache.state_group = object()
-
-        return frozenset(cache.hosts_to_joined_users)
-
     async def _get_approximate_current_memberships_in_room(
         self, room_id: str
     ) -> Mapping[str, Optional[str]]:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 97c4dc2603..f34b7ce8f4 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -697,7 +697,7 @@ class StatsStore(StateDeltasStore):
             txn: LoggingTransaction,
         ) -> Tuple[List[JsonDict], int]:
             filters = []
-            args = [self.hs.config.server.server_name]
+            args: list = []
 
             if search_term:
                 filters.append("(lmr.user_id LIKE ? OR displayname LIKE ?)")
@@ -733,7 +733,7 @@ class StatsStore(StateDeltasStore):
 
             sql_base = """
                 FROM local_media_repository as lmr
-                LEFT JOIN profiles AS p ON lmr.user_id = '@' || p.user_id || ':' || ?
+                LEFT JOIN profiles AS p ON lmr.user_id = p.full_user_id
                 {}
                 GROUP BY lmr.user_id, displayname
             """.format(
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 924022c95c..2a136f2ff6 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -409,23 +409,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                 txn, users_to_work_on
             )
 
-            # Next fetch their profiles. Note that the `user_id` here is the
-            # *localpart*, and that not all users have profiles.
+            # Next fetch their profiles. Note that not all users have profiles.
             profile_rows = self.db_pool.simple_select_many_txn(
                 txn,
                 table="profiles",
-                column="user_id",
-                iterable=[get_localpart_from_id(u) for u in users_to_insert],
+                column="full_user_id",
+                iterable=list(users_to_insert),
                 retcols=(
-                    "user_id",
+                    "full_user_id",
                     "displayname",
                     "avatar_url",
                 ),
                 keyvalues={},
             )
             profiles = {
-                f"@{row['user_id']}:{self.server_name}": _UserDirProfile(
-                    f"@{row['user_id']}:{self.server_name}",
+                row["full_user_id"]: _UserDirProfile(
+                    row["full_user_id"],
                     row["displayname"],
                     row["avatar_url"],
                 )
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 6d14963c0a..d3ec648f6d 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -108,7 +108,8 @@ Changes in SCHEMA_VERSION = 78
     - Validate check (full_user_id IS NOT NULL) on tables profiles and user_filters
 
 Changes in SCHEMA_VERSION = 79
-    - We no longer write to column user_id of tables profiles and user_filters
+    - Add tables to handle in DB read-write locks.
+    - Add some mitigations for a painful race between foreground and background updates, cf #15677.
 """
 
 
@@ -121,9 +122,7 @@ SCHEMA_COMPAT_VERSION = (
     #
     # insertions to the column `full_user_id` of tables profiles and user_filters can no
     # longer be null
-    #
-    # we no longer write to column `full_user_id` of tables profiles and user_filters
-    78
+    76
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py b/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py
deleted file mode 100644
index 3541266f7d..0000000000
--- a/synapse/storage/schema/main/delta/79/01_drop_user_id_constraint_profiles.py
+++ /dev/null
@@ -1,50 +0,0 @@
-from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-
-
-def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    """
-    Update to drop the NOT NULL constraint on column user_id so that we can cease to
-    write to it without inserts to other columns triggering the constraint
-    """
-
-    if isinstance(database_engine, PostgresEngine):
-        drop_sql = """
-        ALTER TABLE profiles ALTER COLUMN user_id DROP NOT NULL
-        """
-        cur.execute(drop_sql)
-    else:
-        # irritatingly in SQLite we need to rewrite the table to drop the constraint.
-        cur.execute("DROP TABLE IF EXISTS temp_profiles")
-
-        create_sql = """
-        CREATE TABLE temp_profiles (
-            full_user_id text NOT NULL,
-            user_id text,
-            displayname text,
-            avatar_url text,
-            UNIQUE (full_user_id),
-            UNIQUE (user_id)
-        )
-        """
-        cur.execute(create_sql)
-
-        copy_sql = """
-        INSERT INTO temp_profiles (
-            user_id,
-            displayname,
-            avatar_url,
-            full_user_id)
-            SELECT user_id, displayname, avatar_url, full_user_id FROM profiles
-        """
-        cur.execute(copy_sql)
-
-        drop_sql = """
-        DROP TABLE profiles
-        """
-        cur.execute(drop_sql)
-
-        rename_sql = """
-        ALTER TABLE temp_profiles RENAME to profiles
-        """
-        cur.execute(rename_sql)
diff --git a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py b/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py
deleted file mode 100644
index 8e7569c470..0000000000
--- a/synapse/storage/schema/main/delta/79/02_drop_user_id_constraint_user_filters.py
+++ /dev/null
@@ -1,54 +0,0 @@
-from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-
-
-def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    """
-    Update to drop the NOT NULL constraint on column user_id so that we can cease to
-    write to it without inserts to other columns triggering the constraint
-    """
-    if isinstance(database_engine, PostgresEngine):
-        drop_sql = """
-        ALTER TABLE user_filters ALTER COLUMN user_id DROP NOT NULL
-        """
-        cur.execute(drop_sql)
-
-    else:
-        # irritatingly in SQLite we need to rewrite the table to drop the constraint.
-        cur.execute("DROP TABLE IF EXISTS temp_user_filters")
-
-        create_sql = """
-        CREATE TABLE temp_user_filters (
-            full_user_id text NOT NULL,
-            user_id text,
-            filter_id bigint NOT NULL,
-            filter_json bytea NOT NULL
-        )
-        """
-        cur.execute(create_sql)
-
-        index_sql = """
-            CREATE UNIQUE INDEX IF NOT EXISTS user_filters_full_user_id_unique ON
-            temp_user_filters (full_user_id, filter_id)
-        """
-        cur.execute(index_sql)
-
-        copy_sql = """
-            INSERT INTO temp_user_filters (
-                user_id,
-                filter_id,
-                filter_json,
-                full_user_id)
-            SELECT user_id, filter_id, filter_json, full_user_id FROM user_filters
-        """
-        cur.execute(copy_sql)
-
-        drop_sql = """
-        DROP TABLE user_filters
-        """
-        cur.execute(drop_sql)
-
-        rename_sql = """
-        ALTER TABLE temp_user_filters RENAME to user_filters
-        """
-        cur.execute(rename_sql)
diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres
index e1a41be9c9..7df07ab0da 100644
--- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.postgres
+++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.postgres
@@ -44,7 +44,7 @@
 
 -- A table to track whether a lock is currently acquired, and if so whether its
 -- in read or write mode.
-CREATE TABLE worker_read_write_locks_mode (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- Whether this lock is in read (false) or write (true) mode
@@ -55,14 +55,14 @@ CREATE TABLE worker_read_write_locks_mode (
 );
 
 -- Ensure that we can only have one row per lock
-CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
 -- We need this (redundant) constraint so that we can have a foreign key
 -- constraint against this table.
-CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
 
 
 -- A table to track who has currently acquired a given lock.
-CREATE TABLE worker_read_write_locks (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- We write the instance name to ease manual debugging, we don't ever read
@@ -84,9 +84,9 @@ CREATE TABLE worker_read_write_locks (
     FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
 );
 
-CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
 -- Ensures that only one instance can acquire a lock in write mode at a time.
-CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
 
 
 -- Add a foreign key constraint to ensure that if a lock is in
@@ -97,56 +97,6 @@ CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lo
 -- We only add to PostgreSQL as SQLite does not support adding constraints
 -- after table creation, and so doesn't support "circular" foreign key
 -- constraints.
+ALTER TABLE worker_read_write_locks_mode DROP CONSTRAINT IF EXISTS worker_read_write_locks_mode_foreign;
 ALTER TABLE worker_read_write_locks_mode ADD CONSTRAINT worker_read_write_locks_mode_foreign
     FOREIGN KEY (lock_name, lock_key, token) REFERENCES worker_read_write_locks(lock_name, lock_key, token) DEFERRABLE INITIALLY DEFERRED;
-
-
--- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
--- and acquire a lock, i.e. insert into `worker_read_write_locks`,
-CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
-BEGIN
-    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
-        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
-        ON CONFLICT (lock_name, lock_key)
-        DO NOTHING;
-    RETURN NEW;
-END
-$$
-LANGUAGE plpgsql;
-
-CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
-    FOR EACH ROW
-    EXECUTE PROCEDURE upsert_read_write_lock_parent();
-
-
--- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
--- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
--- update the `worker_read_write_locks_mode.token` to match another instance
--- that has currently acquired the lock, or we delete the row if nobody has
--- currently acquired a lock.
-CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
-DECLARE
-    new_token TEXT;
-BEGIN
-    SELECT token INTO new_token FROM worker_read_write_locks
-        WHERE
-            lock_name = OLD.lock_name
-            AND lock_key = OLD.lock_key;
-
-    IF NOT FOUND THEN
-        DELETE FROM worker_read_write_locks_mode
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-    ELSE
-        UPDATE worker_read_write_locks_mode
-            SET token = new_token
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-    END IF;
-
-    RETURN NEW;
-END
-$$
-LANGUAGE plpgsql;
-
-CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
-    FOR EACH ROW
-    EXECUTE PROCEDURE delete_read_write_lock_parent();
diff --git a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite
index be2dfbbb8a..95f9dbf120 100644
--- a/synapse/storage/schema/main/delta/78/04_read_write_locks_triggers.sql.sqlite
+++ b/synapse/storage/schema/main/delta/79/03_read_write_locks_triggers.sql.sqlite
@@ -22,7 +22,7 @@
 
 -- A table to track whether a lock is currently acquired, and if so whether its
 -- in read or write mode.
-CREATE TABLE worker_read_write_locks_mode (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks_mode (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- Whether this lock is in read (false) or write (true) mode
@@ -38,14 +38,14 @@ CREATE TABLE worker_read_write_locks_mode (
 );
 
 -- Ensure that we can only have one row per lock
-CREATE UNIQUE INDEX worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_key ON worker_read_write_locks_mode (lock_name, lock_key);
 -- We need this (redundant) constraint so that we can have a foreign key
 -- constraint against this table.
-CREATE UNIQUE INDEX worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_mode_type ON worker_read_write_locks_mode (lock_name, lock_key, write_lock);
 
 
 -- A table to track who has currently acquired a given lock.
-CREATE TABLE worker_read_write_locks (
+CREATE TABLE IF NOT EXISTS worker_read_write_locks (
     lock_name TEXT NOT NULL,
     lock_key TEXT NOT NULL,
     -- We write the instance name to ease manual debugging, we don't ever read
@@ -67,53 +67,6 @@ CREATE TABLE worker_read_write_locks (
     FOREIGN KEY (lock_name, lock_key, write_lock) REFERENCES worker_read_write_locks_mode (lock_name, lock_key, write_lock)
 );
 
-CREATE UNIQUE INDEX worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_key ON worker_read_write_locks (lock_name, lock_key, token);
 -- Ensures that only one instance can acquire a lock in write mode at a time.
-CREATE UNIQUE INDEX worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
-
-
--- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
--- and acquire a lock, i.e. insert into `worker_read_write_locks`,
-CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger
-BEFORE INSERT ON worker_read_write_locks
-FOR EACH ROW
-BEGIN
-    -- First ensure that `worker_read_write_locks_mode` doesn't have stale
-    -- entries in it, as on SQLite we don't have the foreign key constraint to
-    -- enforce this.
-    DELETE FROM worker_read_write_locks_mode
-        WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
-        AND NOT EXISTS (
-            SELECT 1 FROM worker_read_write_locks
-            WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
-        );
-
-    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
-        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
-        ON CONFLICT (lock_name, lock_key)
-        DO NOTHING;
-END;
-
--- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
--- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
--- update the `worker_read_write_locks_mode.token` to match another instance
--- that has currently acquired the lock, or we delete the row if nobody has
--- currently acquired a lock.
-CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger
-AFTER DELETE ON worker_read_write_locks
-FOR EACH ROW
-BEGIN
-    DELETE FROM worker_read_write_locks_mode
-        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        AND NOT EXISTS (
-            SELECT 1 FROM worker_read_write_locks
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        );
-
-    UPDATE worker_read_write_locks_mode
-        SET token = (
-            SELECT token FROM worker_read_write_locks
-            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
-        )
-        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
-END;
+CREATE UNIQUE INDEX IF NOT EXISTS worker_read_write_locks_write ON worker_read_write_locks (lock_name, lock_key) WHERE write_lock;
diff --git a/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py
new file mode 100644
index 0000000000..ae63585847
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/04_mitigate_stream_ordering_update_race.py
@@ -0,0 +1,70 @@
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+
+
+def run_create(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+) -> None:
+    """
+    An attempt to mitigate a painful race between foreground and background updates
+    touching the `stream_ordering` column of the events table. More info can be found
+    at https://github.com/matrix-org/synapse/issues/15677.
+    """
+
+    # technically the bg update we're concerned with below should only have been added in
+    # postgres but it doesn't hurt to be extra careful
+    if isinstance(database_engine, PostgresEngine):
+        select_sql = """
+            SELECT 1 FROM background_updates
+                WHERE update_name = 'replace_stream_ordering_column'
+        """
+        cur.execute(select_sql)
+        res = cur.fetchone()
+
+        # if the background update `replace_stream_ordering_column` is still pending, we need
+        # to drop the indexes added in 7403, and re-add them to the column `stream_ordering2`
+        # with the idea that they will be preserved when the column is renamed `stream_ordering`
+        # after the background update has finished
+        if res:
+            drop_cse_sql = """
+            ALTER TABLE current_state_events DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_cse_sql)
+
+            drop_lcm_sql = """
+            ALTER TABLE local_current_membership DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_lcm_sql)
+
+            drop_rm_sql = """
+            ALTER TABLE room_memberships DROP CONSTRAINT IF EXISTS event_stream_ordering_fkey
+            """
+            cur.execute(drop_rm_sql)
+
+            add_cse_sql = """
+            ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_cse_sql)
+
+            add_lcm_sql = """
+            ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_lcm_sql)
+
+            add_rm_sql = """
+            ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey
+            FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering2) NOT VALID;
+            """
+            cur.execute(add_rm_sql)
diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres
new file mode 100644
index 0000000000..ea3496ef2d
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.postgres
@@ -0,0 +1,69 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
+
+-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
+-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
+CREATE OR REPLACE FUNCTION upsert_read_write_lock_parent() RETURNS trigger AS $$
+BEGIN
+    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
+        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
+        ON CONFLICT (lock_name, lock_key)
+        DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token;
+    RETURN NEW;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger ON worker_read_write_locks;
+CREATE TRIGGER upsert_read_write_lock_parent_trigger BEFORE INSERT ON worker_read_write_locks
+    FOR EACH ROW
+    EXECUTE PROCEDURE upsert_read_write_lock_parent();
+
+
+-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
+-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
+-- update the `worker_read_write_locks_mode.token` to match another instance
+-- that has currently acquired the lock, or we delete the row if nobody has
+-- currently acquired a lock.
+CREATE OR REPLACE FUNCTION delete_read_write_lock_parent() RETURNS trigger AS $$
+DECLARE
+    new_token TEXT;
+BEGIN
+    SELECT token INTO new_token FROM worker_read_write_locks
+        WHERE
+            lock_name = OLD.lock_name
+            AND lock_key = OLD.lock_key
+        LIMIT 1 FOR UPDATE;
+
+    IF NOT FOUND THEN
+        DELETE FROM worker_read_write_locks_mode
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key AND token = OLD.token;
+    ELSE
+        UPDATE worker_read_write_locks_mode
+            SET token = new_token
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
+    END IF;
+
+    RETURN NEW;
+END
+$$
+LANGUAGE plpgsql;
+
+DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger ON worker_read_write_locks;
+CREATE TRIGGER delete_read_write_lock_parent_trigger AFTER DELETE ON worker_read_write_locks
+    FOR EACH ROW
+    EXECUTE PROCEDURE delete_read_write_lock_parent();
diff --git a/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite
new file mode 100644
index 0000000000..acb1a77c80
--- /dev/null
+++ b/synapse/storage/schema/main/delta/79/05_read_write_locks_triggers.sql.sqlite
@@ -0,0 +1,65 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Fix up the triggers that were in `78/04_read_write_locks_triggers.sql`
+
+-- Add a trigger to UPSERT into `worker_read_write_locks_mode` whenever we try
+-- and acquire a lock, i.e. insert into `worker_read_write_locks`,
+DROP TRIGGER IF EXISTS upsert_read_write_lock_parent_trigger;
+CREATE TRIGGER IF NOT EXISTS upsert_read_write_lock_parent_trigger
+BEFORE INSERT ON worker_read_write_locks
+FOR EACH ROW
+BEGIN
+    -- First ensure that `worker_read_write_locks_mode` doesn't have stale
+    -- entries in it, as on SQLite we don't have the foreign key constraint to
+    -- enforce this.
+    DELETE FROM worker_read_write_locks_mode
+        WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
+        AND NOT EXISTS (
+            SELECT 1 FROM worker_read_write_locks
+            WHERE lock_name = NEW.lock_name AND lock_key = NEW.lock_key
+        );
+
+    INSERT INTO worker_read_write_locks_mode (lock_name, lock_key, write_lock, token)
+        VALUES (NEW.lock_name, NEW.lock_key, NEW.write_lock, NEW.token)
+        ON CONFLICT (lock_name, lock_key)
+        DO UPDATE SET write_lock = NEW.write_lock, token = NEW.token;
+END;
+
+-- Ensure that we keep `worker_read_write_locks_mode` up to date whenever a lock
+-- is released (i.e. a row deleted from `worker_read_write_locks`). Either we
+-- update the `worker_read_write_locks_mode.token` to match another instance
+-- that has currently acquired the lock, or we delete the row if nobody has
+-- currently acquired a lock.
+DROP TRIGGER IF EXISTS delete_read_write_lock_parent_trigger;
+CREATE TRIGGER IF NOT EXISTS delete_read_write_lock_parent_trigger
+AFTER DELETE ON worker_read_write_locks
+FOR EACH ROW
+BEGIN
+    DELETE FROM worker_read_write_locks_mode
+        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+            AND token = OLD.token
+        AND NOT EXISTS (
+            SELECT 1 FROM worker_read_write_locks
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+        );
+
+    UPDATE worker_read_write_locks_mode
+        SET token = (
+            SELECT token FROM worker_read_write_locks
+            WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key
+        )
+        WHERE lock_name = OLD.lock_name AND lock_key = OLD.lock_key;
+END;
diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py
index 5a965f233b..21c5309740 100644
--- a/tests/app/test_openid_listener.py
+++ b/tests/app/test_openid_listener.py
@@ -31,9 +31,7 @@ from tests.unittest import HomeserverTestCase
 
 class FederationReaderOpenIDListenerTests(HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver(
-            federation_http_client=None, homeserver_to_use=GenericWorkerServer
-        )
+        hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
         return hs
 
     def default_config(self) -> JsonDict:
@@ -91,9 +89,7 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase):
 @patch("synapse.app.homeserver.KeyResource", new=Mock())
 class SynapseHomeserverOpenIDListenerTests(HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver(
-            federation_http_client=None, homeserver_to_use=SynapseHomeServer
-        )
+        hs = self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer)
         return hs
 
     @parameterized.expand(
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py
index c9a610db9a..978612e432 100644
--- a/tests/events/test_utils.py
+++ b/tests/events/test_utils.py
@@ -140,18 +140,16 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
             },
         )
 
-        # As of MSC2176 we now redact the membership and prev_states keys.
+        # As of room versions we now redact the membership, prev_states, and origin keys.
         self.run_test(
-            {"type": "A", "prev_state": "prev_state", "membership": "join"},
-            {"type": "A", "content": {}, "signatures": {}, "unsigned": {}},
-            room_version=RoomVersions.MSC2176,
-        )
-
-        # As of MSC3989 we now redact the origin key.
-        self.run_test(
-            {"type": "A", "origin": "example.com"},
+            {
+                "type": "A",
+                "prev_state": "prev_state",
+                "membership": "join",
+                "origin": "example.com",
+            },
             {"type": "A", "content": {}, "signatures": {}, "unsigned": {}},
-            room_version=RoomVersions.MSC3989,
+            room_version=RoomVersions.V11,
         )
 
     def test_unsigned(self) -> None:
@@ -227,16 +225,21 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
             },
         )
 
-        # After MSC2176, create events get nothing redacted.
+        # After MSC2176, create events should preserve field `content`
         self.run_test(
-            {"type": "m.room.create", "content": {"not_a_real_key": True}},
+            {
+                "type": "m.room.create",
+                "content": {"not_a_real_key": True},
+                "origin": "some_homeserver",
+                "nonsense_field": "some_random_garbage",
+            },
             {
                 "type": "m.room.create",
                 "content": {"not_a_real_key": True},
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC2176,
+            room_version=RoomVersions.V11,
         )
 
     def test_power_levels(self) -> None:
@@ -286,7 +289,7 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC2176,
+            room_version=RoomVersions.V11,
         )
 
     def test_alias_event(self) -> None:
@@ -349,7 +352,7 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC2176,
+            room_version=RoomVersions.V11,
         )
 
     def test_join_rules(self) -> None:
@@ -472,7 +475,7 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC3821,
+            room_version=RoomVersions.V11,
         )
 
         # Ensure this doesn't break if an invalid field is sent.
@@ -491,7 +494,7 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC3821,
+            room_version=RoomVersions.V11,
         )
 
         self.run_test(
@@ -509,7 +512,7 @@ class PruneEventTestCase(stdlib_unittest.TestCase):
                 "signatures": {},
                 "unsigned": {},
             },
-            room_version=RoomVersions.MSC3821,
+            room_version=RoomVersions.V11,
         )
 
     def test_relations(self) -> None:
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index ee48f9e546..647ee09279 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -17,15 +17,18 @@
 from typing import Optional
 from unittest import mock
 
+from twisted.internet.defer import ensureDeferred
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import RoomEncryptionAlgorithms
 from synapse.api.errors import NotFoundError, SynapseError
 from synapse.appservice import ApplicationService
 from synapse.handlers.device import MAX_DEVICE_DISPLAY_NAME_LEN, DeviceHandler
+from synapse.rest import admin
+from synapse.rest.client import devices, login, register
 from synapse.server import HomeServer
 from synapse.storage.databases.main.appservice import _make_exclusive_regex
-from synapse.types import JsonDict
+from synapse.types import JsonDict, create_requester
 from synapse.util import Clock
 
 from tests import unittest
@@ -41,7 +44,6 @@ class DeviceTestCase(unittest.HomeserverTestCase):
         self.appservice_api = mock.Mock()
         hs = self.setup_test_homeserver(
             "server",
-            federation_http_client=None,
             application_service_api=self.appservice_api,
         )
         handler = hs.get_device_handler()
@@ -400,11 +402,19 @@ class DeviceTestCase(unittest.HomeserverTestCase):
 
 
 class DehydrationTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+        register.register_servlets,
+        devices.register_servlets,
+    ]
+
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver("server", federation_http_client=None)
+        hs = self.setup_test_homeserver("server")
         handler = hs.get_device_handler()
         assert isinstance(handler, DeviceHandler)
         self.handler = handler
+        self.message_handler = hs.get_device_message_handler()
         self.registration = hs.get_registration_handler()
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
@@ -419,6 +429,7 @@ class DehydrationTestCase(unittest.HomeserverTestCase):
         stored_dehydrated_device_id = self.get_success(
             self.handler.store_dehydrated_device(
                 user_id=user_id,
+                device_id=None,
                 device_data={"device_data": {"foo": "bar"}},
                 initial_device_display_name="dehydrated device",
             )
@@ -482,3 +493,88 @@ class DehydrationTestCase(unittest.HomeserverTestCase):
         ret = self.get_success(self.handler.get_dehydrated_device(user_id=user_id))
 
         self.assertIsNone(ret)
+
+    @unittest.override_config(
+        {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}}
+    )
+    def test_dehydrate_v2_and_fetch_events(self) -> None:
+        user_id = "@boris:server"
+
+        self.get_success(self.store.register_user(user_id, "foobar"))
+
+        # First check if we can store and fetch a dehydrated device
+        stored_dehydrated_device_id = self.get_success(
+            self.handler.store_dehydrated_device(
+                user_id=user_id,
+                device_id=None,
+                device_data={"device_data": {"foo": "bar"}},
+                initial_device_display_name="dehydrated device",
+            )
+        )
+
+        device_info = self.get_success(
+            self.handler.get_dehydrated_device(user_id=user_id)
+        )
+        assert device_info is not None
+        retrieved_device_id, device_data = device_info
+        self.assertEqual(retrieved_device_id, stored_dehydrated_device_id)
+        self.assertEqual(device_data, {"device_data": {"foo": "bar"}})
+
+        # Create a new login for the user
+        device_id, access_token, _expiration_time, _refresh_token = self.get_success(
+            self.registration.register_device(
+                user_id=user_id,
+                device_id=None,
+                initial_display_name="new device",
+            )
+        )
+
+        requester = create_requester(user_id, device_id=device_id)
+
+        # Fetching messages for a non-existing device should return an error
+        self.get_failure(
+            self.message_handler.get_events_for_dehydrated_device(
+                requester=requester,
+                device_id="not the right device ID",
+                since_token=None,
+                limit=10,
+            ),
+            SynapseError,
+        )
+
+        # Send a message to the dehydrated device
+        ensureDeferred(
+            self.message_handler.send_device_message(
+                requester=requester,
+                message_type="test.message",
+                messages={user_id: {stored_dehydrated_device_id: {"body": "foo"}}},
+            )
+        )
+        self.pump()
+
+        # Fetch the message of the dehydrated device
+        res = self.get_success(
+            self.message_handler.get_events_for_dehydrated_device(
+                requester=requester,
+                device_id=stored_dehydrated_device_id,
+                since_token=None,
+                limit=10,
+            )
+        )
+
+        self.assertTrue(len(res["next_batch"]) > 1)
+        self.assertEqual(len(res["events"]), 1)
+        self.assertEqual(res["events"][0]["content"]["body"], "foo")
+
+        # Fetch the message of the dehydrated device again, which should return nothing
+        # and delete the old messages
+        res = self.get_success(
+            self.message_handler.get_events_for_dehydrated_device(
+                requester=requester,
+                device_id=stored_dehydrated_device_id,
+                since_token=res["next_batch"],
+                limit=10,
+            )
+        )
+        self.assertTrue(len(res["next_batch"]) > 1)
+        self.assertEqual(len(res["events"]), 0)
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index e5f67693b8..1b6ef39120 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -57,7 +57,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
     ]
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver(federation_http_client=None)
+        hs = self.setup_test_homeserver()
         self.handler = hs.get_federation_handler()
         self.store = hs.get_datastores().main
         return hs
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 19f5322317..fd66d573d2 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -993,7 +993,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         hs = self.setup_test_homeserver(
             "server",
-            federation_http_client=None,
             federation_sender=Mock(spec=FederationSender),
         )
         return hs
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 94518a7196..5da1d95f0b 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -17,6 +17,8 @@ import json
 from typing import Dict, List, Set
 from unittest.mock import ANY, Mock, call
 
+from netaddr import IPSet
+
 from twisted.test.proto_helpers import MemoryReactor
 from twisted.web.resource import Resource
 
@@ -24,6 +26,7 @@ from synapse.api.constants import EduTypes
 from synapse.api.errors import AuthError
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.handlers.typing import TypingWriterHandler
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.server import HomeServer
 from synapse.types import JsonDict, Requester, UserID, create_requester
 from synapse.util import Clock
@@ -76,6 +79,13 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
         # we mock out the federation client too
         self.mock_federation_client = Mock(spec=["put_json"])
         self.mock_federation_client.put_json.return_value = make_awaitable((200, "OK"))
+        self.mock_federation_client.agent = MatrixFederationAgent(
+            reactor,
+            tls_client_options_factory=None,
+            user_agent=b"SynapseInTrialTest/0.0.0",
+            ip_allowlist=None,
+            ip_blocklist=IPSet(),
+        )
 
         # the tests assume that we are starting at unix time 1000
         reactor.pump((1000,))
diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py
index b5f4a60fe5..ab94f3f67a 100644
--- a/tests/http/test_matrixfederationclient.py
+++ b/tests/http/test_matrixfederationclient.py
@@ -11,8 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Generator
-from unittest.mock import Mock
+from typing import Any, Dict, Generator
+from unittest.mock import ANY, Mock, create_autospec
 
 from netaddr import IPSet
 from parameterized import parameterized
@@ -21,10 +21,12 @@ from twisted.internet import defer
 from twisted.internet.defer import Deferred, TimeoutError
 from twisted.internet.error import ConnectingCancelledError, DNSLookupError
 from twisted.test.proto_helpers import MemoryReactor, StringTransport
-from twisted.web.client import ResponseNeverReceived
+from twisted.web.client import Agent, ResponseNeverReceived
 from twisted.web.http import HTTPChannel
+from twisted.web.http_headers import Headers
 
-from synapse.api.errors import RequestSendFailed
+from synapse.api.errors import HttpResponseException, RequestSendFailed
+from synapse.config._base import ConfigError
 from synapse.http.matrixfederationclient import (
     ByteParser,
     MatrixFederationHttpClient,
@@ -39,7 +41,9 @@ from synapse.logging.context import (
 from synapse.server import HomeServer
 from synapse.util import Clock
 
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.server import FakeTransport
+from tests.test_utils import FakeResponse
 from tests.unittest import HomeserverTestCase, override_config
 
 
@@ -658,3 +662,275 @@ class FederationClientTests(HomeserverTestCase):
         self.assertEqual(self.cl.max_short_retry_delay_seconds, 7)
         self.assertEqual(self.cl.max_long_retries, 20)
         self.assertEqual(self.cl.max_short_retries, 5)
+
+
+class FederationClientProxyTests(BaseMultiWorkerStreamTestCase):
+    def default_config(self) -> Dict[str, Any]:
+        conf = super().default_config()
+        conf["instance_map"] = {
+            "main": {"host": "testserv", "port": 8765},
+            "federation_sender": {"host": "testserv", "port": 1001},
+        }
+        return conf
+
+    @override_config(
+        {
+            "outbound_federation_restricted_to": ["federation_sender"],
+            "worker_replication_secret": "secret",
+        }
+    )
+    def test_proxy_requests_through_federation_sender_worker(self) -> None:
+        """
+        Test that all outbound federation requests go through the `federation_sender`
+        worker
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = (
+            lambda *args, **kwargs: defer.succeed(
+                FakeResponse.json(
+                    payload={
+                        "foo": "bar",
+                    }
+                )
+            )
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
+        )
+
+        # Pump the reactor so our deferred goes through the motions
+        self.pump()
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_once_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        # Make sure the response is as expected back on the main worker
+        res = self.successResultOf(test_request_from_main_process_d)
+        self.assertEqual(res, {"foo": "bar"})
+
+    @override_config(
+        {
+            "outbound_federation_restricted_to": ["federation_sender"],
+            "worker_replication_secret": "secret",
+        }
+    )
+    def test_proxy_request_with_network_error_through_federation_sender_worker(
+        self,
+    ) -> None:
+        """
+        Test that when the outbound federation request fails with a network related
+        error, a sensible error makes its way back to the main process.
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = (
+            lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error"))
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
+        )
+
+        # Pump the reactor so our deferred goes through the motions. We pump with 10
+        # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries
+        # and finally passes along the error response.
+        self.pump(0.1)
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        # Make sure we get some sort of error back on the main worker
+        failure_res = self.failureResultOf(test_request_from_main_process_d)
+        self.assertIsInstance(failure_res.value, RequestSendFailed)
+        self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException)
+        self.assertEqual(failure_res.value.inner_exception.code, 502)
+
+    @override_config(
+        {
+            "outbound_federation_restricted_to": ["federation_sender"],
+            "worker_replication_secret": "secret",
+        }
+    )
+    def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None:
+        """
+        Test to make sure hop-by-hop headers and addional headers defined in the
+        `Connection` header are discarded when proxying requests
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed(
+            FakeResponse(
+                code=200,
+                body=b'{"foo": "bar"}',
+                headers=Headers(
+                    {
+                        "Content-Type": ["application/json"],
+                        "Connection": ["close, X-Foo, X-Bar"],
+                        # Should be removed because it's defined in the `Connection` header
+                        "X-Foo": ["foo"],
+                        "X-Bar": ["bar"],
+                        # Should be removed because it's a hop-by-hop header
+                        "Proxy-Authorization": "abcdef",
+                    }
+                ),
+            )
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json_with_headers(
+                "remoteserv:8008", "foo/bar"
+            )
+        )
+
+        # Pump the reactor so our deferred goes through the motions
+        self.pump()
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_once_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        res, headers = self.successResultOf(test_request_from_main_process_d)
+        header_names = set(headers.keys())
+
+        # Make sure the response does not include the hop-by-hop headers
+        self.assertNotIn(b"X-Foo", header_names)
+        self.assertNotIn(b"X-Bar", header_names)
+        self.assertNotIn(b"Proxy-Authorization", header_names)
+        # Make sure the response is as expected back on the main worker
+        self.assertEqual(res, {"foo": "bar"})
+
+    @override_config(
+        {
+            "outbound_federation_restricted_to": ["federation_sender"],
+            # `worker_replication_secret` is set here so that the test setup is able to pass
+            # but the actual homserver creation test is in the test body below
+            "worker_replication_secret": "secret",
+        }
+    )
+    def test_not_able_to_proxy_requests_through_federation_sender_worker_when_no_secret_configured(
+        self,
+    ) -> None:
+        """
+        Test that we aren't able to proxy any outbound federation requests when
+        `worker_replication_secret` is not configured.
+        """
+        with self.assertRaises(ConfigError):
+            # Create the `federation_sender` worker
+            self.make_worker_hs(
+                "synapse.app.generic_worker",
+                {
+                    "worker_name": "federation_sender",
+                    # Test that we aren't able to proxy any outbound federation requests
+                    # when `worker_replication_secret` is not configured.
+                    "worker_replication_secret": None,
+                },
+            )
+
+    @override_config(
+        {
+            "outbound_federation_restricted_to": ["federation_sender"],
+            "worker_replication_secret": "secret",
+        }
+    )
+    def test_not_able_to_proxy_requests_through_federation_sender_worker_when_wrong_auth_given(
+        self,
+    ) -> None:
+        """
+        Test that we aren't able to proxy any outbound federation requests when the
+        wrong authorization is given.
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {
+                "worker_name": "federation_sender",
+                # Test that we aren't able to proxy any outbound federation requests
+                # when `worker_replication_secret` is wrong.
+                "worker_replication_secret": "wrong",
+            },
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off but will fail here because it's using the wrong
+        # authorization.
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
+        )
+
+        # Pump the reactor so our deferred goes through the motions. We pump with 10
+        # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries
+        # and finally passes along the error response.
+        self.pump(0.1)
+
+        # Make sure that the request was *NOT* proxied through the `federation_sender`
+        # worker
+        mock_agent_on_federation_sender.request.assert_not_called()
+
+        failure_res = self.failureResultOf(test_request_from_main_process_d)
+        self.assertIsInstance(failure_res.value, HttpResponseException)
+        self.assertEqual(failure_res.value.code, 401)
diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py
new file mode 100644
index 0000000000..0dc9ba8e05
--- /dev/null
+++ b/tests/http/test_proxy.py
@@ -0,0 +1,53 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Set
+
+from parameterized import parameterized
+
+from synapse.http.proxy import parse_connection_header_value
+
+from tests.unittest import TestCase
+
+
+class ProxyTests(TestCase):
+    @parameterized.expand(
+        [
+            [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # No whitespace
+            [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # More whitespace
+            [b"close,    X-Foo,      X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # "close" directive in not the first position
+            [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}],
+            # Normalizes header capitalization
+            [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}],
+            # Handles header names with whitespace
+            [
+                b"keep-alive, x  foo, x bar",
+                {"Keep-Alive", "X  foo", "X bar"},
+            ],
+        ]
+    )
+    def test_parse_connection_header_value(
+        self,
+        connection_header_value: bytes,
+        expected_extra_headers_to_remove: Set[str],
+    ) -> None:
+        """
+        Tests that the connection header value is parsed correctly
+        """
+        self.assertEqual(
+            expected_extra_headers_to_remove,
+            parse_connection_header_value(connection_header_value),
+        )
diff --git a/tests/http/test_proxyagent.py b/tests/http/test_proxyagent.py
index e0ae5a88ff..8164b0b78e 100644
--- a/tests/http/test_proxyagent.py
+++ b/tests/http/test_proxyagent.py
@@ -33,7 +33,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory, TLSMemoryBIOProtocol
 from twisted.web.http import HTTPChannel
 
 from synapse.http.client import BlocklistingReactorWrapper
-from synapse.http.connectproxyclient import ProxyCredentials
+from synapse.http.connectproxyclient import BasicProxyCredentials
 from synapse.http.proxyagent import ProxyAgent, parse_proxy
 
 from tests.http import (
@@ -205,7 +205,7 @@ class ProxyParserTests(TestCase):
         """
         proxy_cred = None
         if expected_credentials:
-            proxy_cred = ProxyCredentials(expected_credentials)
+            proxy_cred = BasicProxyCredentials(expected_credentials)
         self.assertEqual(
             (
                 expected_scheme,
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 39aadb9ed5..6712ac485d 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -70,10 +70,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         # Make a new HomeServer object for the worker
         self.reactor.lookups["testserv"] = "1.2.3.4"
         self.worker_hs = self.setup_test_homeserver(
-            federation_http_client=None,
             homeserver_to_use=GenericWorkerServer,
             config=self._get_worker_hs_config(),
             reactor=self.reactor,
+            federation_http_client=None,
         )
 
         # Since we use sqlite in memory databases we need to make sure the
@@ -385,6 +385,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
             server_version_string="1",
             max_request_body_size=8192,
             reactor=self.reactor,
+            hs=worker_hs,
         )
 
         worker_hs.get_replication_command_handler().start_replication(worker_hs)
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 08703206a9..a324b4d31d 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -14,14 +14,18 @@
 import logging
 from unittest.mock import Mock
 
+from netaddr import IPSet
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.builder import EventBuilderFactory
 from synapse.handlers.typing import TypingWriterHandler
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.rest.admin import register_servlets_for_client_rest_resource
 from synapse.rest.client import login, room
 from synapse.types import UserID, create_requester
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.server import get_clock
 from tests.test_utils import make_awaitable
 
 logger = logging.getLogger(__name__)
@@ -41,13 +45,25 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         room.register_servlets,
     ]
 
+    def setUp(self) -> None:
+        super().setUp()
+
+        reactor, _ = get_clock()
+        self.matrix_federation_agent = MatrixFederationAgent(
+            reactor,
+            tls_client_options_factory=None,
+            user_agent=b"SynapseInTrialTest/0.0.0",
+            ip_allowlist=None,
+            ip_blocklist=IPSet(),
+        )
+
     def test_send_event_single_sender(self) -> None:
         """Test that using a single federation sender worker correctly sends a
         new event.
         """
         mock_client = Mock(spec=["put_json"])
         mock_client.put_json.return_value = make_awaitable({})
-
+        mock_client.agent = self.matrix_federation_agent
         self.make_worker_hs(
             "synapse.app.generic_worker",
             {
@@ -78,6 +94,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         """
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
+        mock_client1.agent = self.matrix_federation_agent
         self.make_worker_hs(
             "synapse.app.generic_worker",
             {
@@ -92,6 +109,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
 
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
+        mock_client2.agent = self.matrix_federation_agent
         self.make_worker_hs(
             "synapse.app.generic_worker",
             {
@@ -145,6 +163,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         """
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
+        mock_client1.agent = self.matrix_federation_agent
         self.make_worker_hs(
             "synapse.app.generic_worker",
             {
@@ -159,6 +178,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
 
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
+        mock_client2.agent = self.matrix_federation_agent
         self.make_worker_hs(
             "synapse.app.generic_worker",
             {
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 6f7b4bf642..9af9db6e3e 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -1418,7 +1418,7 @@ class DeactivateAccountTestCase(unittest.HomeserverTestCase):
         # To test deactivation for users without a profile, we delete the profile information for our user.
         self.get_success(
             self.store.db_pool.simple_delete_one(
-                table="profiles", keyvalues={"user_id": "user"}
+                table="profiles", keyvalues={"full_user_id": "@user:test"}
             )
         )
 
diff --git a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py
index d80eea17d3..b7d420cfec 100644
--- a/tests/rest/client/test_devices.py
+++ b/tests/rest/client/test_devices.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 from http import HTTPStatus
 
+from twisted.internet.defer import ensureDeferred
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.errors import NotFoundError
 from synapse.rest import admin, devices, room, sync
-from synapse.rest.client import account, login, register
+from synapse.rest.client import account, keys, login, register
 from synapse.server import HomeServer
+from synapse.types import JsonDict, create_requester
 from synapse.util import Clock
 
 from tests import unittest
@@ -208,8 +210,13 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase):
         login.register_servlets,
         register.register_servlets,
         devices.register_servlets,
+        keys.register_servlets,
     ]
 
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.registration = hs.get_registration_handler()
+        self.message_handler = hs.get_device_message_handler()
+
     def test_PUT(self) -> None:
         """Sanity-check that we can PUT a dehydrated device.
 
@@ -226,7 +233,21 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase):
                 "device_data": {
                     "algorithm": "org.matrix.msc2697.v1.dehydration.v1.olm",
                     "account": "dehydrated_device",
-                }
+                },
+                "device_keys": {
+                    "user_id": "@alice:test",
+                    "device_id": "device1",
+                    "valid_until_ts": "80",
+                    "algorithms": [
+                        "m.olm.curve25519-aes-sha2",
+                    ],
+                    "keys": {
+                        "<algorithm>:<device_id>": "<key_base64>",
+                    },
+                    "signatures": {
+                        "<user_id>": {"<algorithm>:<device_id>": "<signature_base64>"}
+                    },
+                },
             },
             access_token=token,
             shorthand=False,
@@ -234,3 +255,128 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
         device_id = channel.json_body.get("device_id")
         self.assertIsInstance(device_id, str)
+
+    @unittest.override_config(
+        {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}}
+    )
+    def test_dehydrate_msc3814(self) -> None:
+        user = self.register_user("mikey", "pass")
+        token = self.login(user, "pass", device_id="device1")
+        content: JsonDict = {
+            "device_data": {
+                "algorithm": "m.dehydration.v1.olm",
+            },
+            "device_id": "device1",
+            "initial_device_display_name": "foo bar",
+            "device_keys": {
+                "user_id": "@mikey:test",
+                "device_id": "device1",
+                "valid_until_ts": "80",
+                "algorithms": [
+                    "m.olm.curve25519-aes-sha2",
+                ],
+                "keys": {
+                    "<algorithm>:<device_id>": "<key_base64>",
+                },
+                "signatures": {
+                    "<user_id>": {"<algorithm>:<device_id>": "<signature_base64>"}
+                },
+            },
+        }
+        channel = self.make_request(
+            "PUT",
+            "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device",
+            content=content,
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+        device_id = channel.json_body.get("device_id")
+        assert device_id is not None
+        self.assertIsInstance(device_id, str)
+        self.assertEqual("device1", device_id)
+
+        # test that we can now GET the dehydrated device info
+        channel = self.make_request(
+            "GET",
+            "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device",
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+        returned_device_id = channel.json_body.get("device_id")
+        self.assertEqual(returned_device_id, device_id)
+        device_data = channel.json_body.get("device_data")
+        expected_device_data = {
+            "algorithm": "m.dehydration.v1.olm",
+        }
+        self.assertEqual(device_data, expected_device_data)
+
+        # create another device for the user
+        (
+            new_device_id,
+            _,
+            _,
+            _,
+        ) = self.get_success(
+            self.registration.register_device(
+                user_id=user,
+                device_id=None,
+                initial_display_name="new device",
+            )
+        )
+        requester = create_requester(user, device_id=new_device_id)
+
+        # Send a message to the dehydrated device
+        ensureDeferred(
+            self.message_handler.send_device_message(
+                requester=requester,
+                message_type="test.message",
+                messages={user: {device_id: {"body": "test_message"}}},
+            )
+        )
+        self.pump()
+
+        # make sure we can fetch the message with our dehydrated device id
+        channel = self.make_request(
+            "POST",
+            f"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device/{device_id}/events",
+            content={},
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+        expected_content = {"body": "test_message"}
+        self.assertEqual(channel.json_body["events"][0]["content"], expected_content)
+        next_batch_token = channel.json_body.get("next_batch")
+
+        # fetch messages again and make sure that the message was deleted and we are returned an
+        # empty array
+        content = {"next_batch": next_batch_token}
+        channel = self.make_request(
+            "POST",
+            f"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device/{device_id}/events",
+            content=content,
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.json_body["events"], [])
+
+        # make sure we can delete the dehydrated device
+        channel = self.make_request(
+            "DELETE",
+            "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device",
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+
+        # ...and after deleting it is no longer available
+        channel = self.make_request(
+            "GET",
+            "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device",
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 404)
diff --git a/tests/rest/client/test_presence.py b/tests/rest/client/test_presence.py
index dcbb125a3b..e12098102b 100644
--- a/tests/rest/client/test_presence.py
+++ b/tests/rest/client/test_presence.py
@@ -40,7 +40,6 @@ class PresenceTestCase(unittest.HomeserverTestCase):
 
         hs = self.setup_test_homeserver(
             "red",
-            federation_http_client=None,
             federation_client=Mock(),
             presence_handler=self.presence_handler,
         )
diff --git a/tests/rest/client/test_redactions.py b/tests/rest/client/test_redactions.py
index b43e95292c..6028886bd6 100644
--- a/tests/rest/client/test_redactions.py
+++ b/tests/rest/client/test_redactions.py
@@ -20,6 +20,8 @@ from synapse.api.room_versions import RoomVersions
 from synapse.rest import admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
+from synapse.storage._base import db_to_json
+from synapse.storage.database import LoggingTransaction
 from synapse.types import JsonDict
 from synapse.util import Clock
 
@@ -573,7 +575,7 @@ class RedactionsTestCase(HomeserverTestCase):
         room_id = self.helper.create_room_as(
             self.mod_user_id,
             tok=self.mod_access_token,
-            room_version=RoomVersions.MSC2176.identifier,
+            room_version=RoomVersions.V11.identifier,
         )
 
         # Create an event.
@@ -597,5 +599,20 @@ class RedactionsTestCase(HomeserverTestCase):
         redact_event = timeline[-1]
         self.assertEqual(redact_event["type"], EventTypes.Redaction)
         # The redacts key should be in the content.
-        self.assertNotIn("redacts", redact_event)
         self.assertEquals(redact_event["content"]["redacts"], event_id)
+
+        # It should also be copied as the top-level redacts field for backwards
+        # compatibility.
+        self.assertEquals(redact_event["redacts"], event_id)
+
+        # But it isn't actually part of the event.
+        def get_event(txn: LoggingTransaction) -> JsonDict:
+            return db_to_json(
+                main_datastore._fetch_event_rows(txn, [event_id])[event_id].json
+            )
+
+        main_datastore = self.hs.get_datastores().main
+        event_json = self.get_success(
+            main_datastore.db_pool.runInteraction("get_event", get_event)
+        )
+        self.assertNotIn("redacts", event_json)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index f1b4e1ad2f..d013e75d55 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -67,8 +67,6 @@ class RoomBase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.hs = self.setup_test_homeserver(
             "red",
-            federation_http_client=None,
-            federation_client=Mock(),
         )
 
         self.hs.get_federation_handler = Mock()  # type: ignore[assignment]
diff --git a/tests/storage/test_e2e_room_keys.py b/tests/storage/test_e2e_room_keys.py
index 9cb326d90a..f6df31aba4 100644
--- a/tests/storage/test_e2e_room_keys.py
+++ b/tests/storage/test_e2e_room_keys.py
@@ -31,7 +31,7 @@ room_key: RoomKey = {
 
 class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver("server", federation_http_client=None)
+        hs = self.setup_test_homeserver("server")
         self.store = hs.get_datastores().main
         return hs
 
diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py
index bbe8bd88bc..fe5bb77913 100644
--- a/tests/storage/test_profile.py
+++ b/tests/storage/test_profile.py
@@ -15,6 +15,8 @@
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.server import HomeServer
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import PostgresEngine
 from synapse.types import UserID
 from synapse.util import Clock
 
@@ -62,3 +64,64 @@ class ProfileStoreTestCase(unittest.HomeserverTestCase):
         self.assertIsNone(
             self.get_success(self.store.get_profile_avatar_url(self.u_frank))
         )
+
+    def test_profiles_bg_migration(self) -> None:
+        """
+        Test background job that copies entries from column user_id to full_user_id, adding
+        the hostname in the process.
+        """
+        updater = self.hs.get_datastores().main.db_pool.updates
+
+        # drop the constraint so we can insert nulls in full_user_id to populate the test
+        if isinstance(self.store.database_engine, PostgresEngine):
+
+            def f(txn: LoggingTransaction) -> None:
+                txn.execute(
+                    "ALTER TABLE profiles DROP CONSTRAINT full_user_id_not_null"
+                )
+
+            self.get_success(self.store.db_pool.runInteraction("", f))
+
+        for i in range(0, 70):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "profiles",
+                    {"user_id": f"hello{i:02}"},
+                )
+            )
+
+        # re-add the constraint so that when it's validated it actually exists
+        if isinstance(self.store.database_engine, PostgresEngine):
+
+            def f(txn: LoggingTransaction) -> None:
+                txn.execute(
+                    "ALTER TABLE profiles ADD CONSTRAINT full_user_id_not_null CHECK (full_user_id IS NOT NULL) NOT VALID"
+                )
+
+            self.get_success(self.store.db_pool.runInteraction("", f))
+
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                values={
+                    "update_name": "populate_full_user_id_profiles",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+        self.get_success(
+            updater.run_background_updates(False),
+        )
+
+        expected_values = []
+        for i in range(0, 70):
+            expected_values.append((f"@hello{i:02}:{self.hs.hostname}",))
+
+        res = self.get_success(
+            self.store.db_pool.execute(
+                "", None, "SELECT full_user_id from profiles ORDER BY full_user_id"
+            )
+        )
+        self.assertEqual(len(res), len(expected_values))
+        self.assertEqual(res, expected_values)
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index 857e2caf2e..0282673167 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -27,7 +27,7 @@ class PurgeTests(HomeserverTestCase):
     servlets = [room.register_servlets]
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver("server", federation_http_client=None)
+        hs = self.setup_test_homeserver("server")
         return hs
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
index 6861d3a6c9..809c9f175d 100644
--- a/tests/storage/test_rollback_worker.py
+++ b/tests/storage/test_rollback_worker.py
@@ -45,9 +45,7 @@ def fake_listdir(filepath: str) -> List[str]:
 
 class WorkerSchemaTests(HomeserverTestCase):
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
-        hs = self.setup_test_homeserver(
-            federation_http_client=None, homeserver_to_use=GenericWorkerServer
-        )
+        hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
         return hs
 
     def default_config(self) -> JsonDict:
diff --git a/tests/storage/test_user_filters.py b/tests/storage/test_user_filters.py
new file mode 100644
index 0000000000..bab802f56e
--- /dev/null
+++ b/tests/storage/test_user_filters.py
@@ -0,0 +1,94 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import PostgresEngine
+from synapse.util import Clock
+
+from tests import unittest
+
+
+class UserFiltersStoreTestCase(unittest.HomeserverTestCase):
+    """
+    Test background migration that copies entries from column user_id to full_user_id, adding
+    the hostname in the process.
+    """
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+    def test_bg_migration(self) -> None:
+        updater = self.hs.get_datastores().main.db_pool.updates
+
+        # drop the constraint so we can insert nulls in full_user_id to populate the test
+        if isinstance(self.store.database_engine, PostgresEngine):
+
+            def f(txn: LoggingTransaction) -> None:
+                txn.execute(
+                    "ALTER TABLE user_filters DROP CONSTRAINT full_user_id_not_null"
+                )
+
+            self.get_success(self.store.db_pool.runInteraction("", f))
+
+        for i in range(0, 70):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "user_filters",
+                    {
+                        "user_id": f"hello{i:02}",
+                        "filter_id": i,
+                        "filter_json": bytearray(i),
+                    },
+                )
+            )
+
+        # re-add the constraint so that when it's validated it actually exists
+        if isinstance(self.store.database_engine, PostgresEngine):
+
+            def f(txn: LoggingTransaction) -> None:
+                txn.execute(
+                    "ALTER TABLE user_filters ADD CONSTRAINT full_user_id_not_null CHECK (full_user_id IS NOT NULL) NOT VALID"
+                )
+
+            self.get_success(self.store.db_pool.runInteraction("", f))
+
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                values={
+                    "update_name": "populate_full_user_id_user_filters",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+        self.get_success(
+            updater.run_background_updates(False),
+        )
+
+        expected_values = []
+        for i in range(0, 70):
+            expected_values.append((f"@hello{i:02}:{self.hs.hostname}",))
+
+        res = self.get_success(
+            self.store.db_pool.execute(
+                "", None, "SELECT full_user_id from user_filters ORDER BY full_user_id"
+            )
+        )
+        self.assertEqual(len(res), len(expected_values))
+        self.assertEqual(res, expected_values)
diff --git a/tests/test_server.py b/tests/test_server.py
index dc491e06ed..36162cd1f5 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -38,7 +38,7 @@ from tests.http.server._base import test_disconnect
 from tests.server import (
     FakeChannel,
     FakeSite,
-    ThreadedMemoryReactorClock,
+    get_clock,
     make_request,
     setup_test_homeserver,
 )
@@ -46,12 +46,11 @@ from tests.server import (
 
 class JsonResourceTests(unittest.TestCase):
     def setUp(self) -> None:
-        self.reactor = ThreadedMemoryReactorClock()
-        self.hs_clock = Clock(self.reactor)
+        reactor, clock = get_clock()
+        self.reactor = reactor
         self.homeserver = setup_test_homeserver(
             self.addCleanup,
-            federation_http_client=None,
-            clock=self.hs_clock,
+            clock=clock,
             reactor=self.reactor,
         )
 
@@ -209,7 +208,13 @@ class JsonResourceTests(unittest.TestCase):
 
 class OptionsResourceTests(unittest.TestCase):
     def setUp(self) -> None:
-        self.reactor = ThreadedMemoryReactorClock()
+        reactor, clock = get_clock()
+        self.reactor = reactor
+        self.homeserver = setup_test_homeserver(
+            self.addCleanup,
+            clock=clock,
+            reactor=self.reactor,
+        )
 
         class DummyResource(Resource):
             isLeaf = True
@@ -242,6 +247,7 @@ class OptionsResourceTests(unittest.TestCase):
             "1.0",
             max_request_body_size=4096,
             reactor=self.reactor,
+            hs=self.homeserver,
         )
 
         # render the request and return the channel
@@ -344,7 +350,8 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
             await self.callback(request)
 
     def setUp(self) -> None:
-        self.reactor = ThreadedMemoryReactorClock()
+        reactor, _ = get_clock()
+        self.reactor = reactor
 
     def test_good_response(self) -> None:
         async def callback(request: SynapseRequest) -> None:
@@ -462,9 +469,9 @@ class DirectServeJsonResourceCancellationTests(unittest.TestCase):
     """Tests for `DirectServeJsonResource` cancellation."""
 
     def setUp(self) -> None:
-        self.reactor = ThreadedMemoryReactorClock()
-        self.clock = Clock(self.reactor)
-        self.resource = CancellableDirectServeJsonResource(self.clock)
+        reactor, clock = get_clock()
+        self.reactor = reactor
+        self.resource = CancellableDirectServeJsonResource(clock)
         self.site = FakeSite(self.resource, self.reactor)
 
     def test_cancellable_disconnect(self) -> None:
@@ -496,9 +503,9 @@ class DirectServeHtmlResourceCancellationTests(unittest.TestCase):
     """Tests for `DirectServeHtmlResource` cancellation."""
 
     def setUp(self) -> None:
-        self.reactor = ThreadedMemoryReactorClock()
-        self.clock = Clock(self.reactor)
-        self.resource = CancellableDirectServeHtmlResource(self.clock)
+        reactor, clock = get_clock()
+        self.reactor = reactor
+        self.resource = CancellableDirectServeHtmlResource(clock)
         self.site = FakeSite(self.resource, self.reactor)
 
     def test_cancellable_disconnect(self) -> None:
diff --git a/tests/unittest.py b/tests/unittest.py
index c73195b32b..b0721e060c 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -358,6 +358,7 @@ class HomeserverTestCase(TestCase):
             server_version_string="1",
             max_request_body_size=4096,
             reactor=self.reactor,
+            hs=self.hs,
         )
 
         from tests.rest.client.utils import RestHelper