-rwxr-xr-x  .ci/scripts/setup_complement_prerequisites.sh | 2
-rw-r--r--  .github/workflows/latest_deps.yml | 2
-rw-r--r--  .github/workflows/tests.yml | 24
-rw-r--r--  .github/workflows/twisted_trunk.yml | 2
-rw-r--r--  changelog.d/14255.misc | 1
-rw-r--r--  changelog.d/14435.bugfix | 1
-rw-r--r--  changelog.d/14473.misc | 1
-rw-r--r--  changelog.d/14490.feature | 1
-rw-r--r--  changelog.d/14493.doc | 1
-rw-r--r--  changelog.d/14517.doc | 1
-rw-r--r--  changelog.d/14525.feature | 1
-rw-r--r--  changelog.d/14528.misc | 1
-rw-r--r--  changelog.d/14549.misc | 1
-rw-r--r--  changelog.d/14551.feature | 1
-rw-r--r--  changelog.d/14568.misc | 1
-rw-r--r--  changelog.d/14576.feature | 1
-rw-r--r--  changelog.d/14591.misc | 1
-rw-r--r--  changelog.d/14592.bugfix | 1
-rw-r--r--  changelog.d/14597.misc | 1
-rw-r--r--  changelog.d/14598.feature | 1
-rw-r--r--  changelog.d/14600.bugfix | 1
-rw-r--r--  changelog.d/14602.misc | 1
-rw-r--r--  changelog.d/14604.bugfix | 1
-rw-r--r--  changelog.d/14607.misc | 1
-rw-r--r--  changelog.d/14611.misc | 1
-rw-r--r--  changelog.d/14612.misc | 1
-rw-r--r--  changelog.d/14613.misc | 1
-rw-r--r--  changelog.d/14614.misc | 1
-rw-r--r--  changelog.d/14615.misc | 1
-rw-r--r--  changelog.d/14616.misc | 1
-rw-r--r--  changelog.d/14619.doc | 1
-rw-r--r--  changelog.d/14620.bugfix | 1
-rw-r--r--  docs/setup/installation.md | 19
-rw-r--r--  docs/usage/configuration/config_documentation.md | 80
-rw-r--r--  docs/workers.md | 8
-rw-r--r--  mypy.ini | 13
-rw-r--r--  poetry.lock | 114
-rw-r--r--  pyproject.toml | 3
-rw-r--r--  rust/benches/evaluator.rs | 16
-rw-r--r--  rust/src/push/evaluator.rs | 22
-rw-r--r--  synapse/api/constants.py | 3
-rw-r--r--  synapse/app/generic_worker.py | 74
-rw-r--r--  synapse/config/push.py | 1
-rw-r--r--  synapse/crypto/keyring.py | 107
-rw-r--r--  synapse/federation/federation_client.py | 29
-rw-r--r--  synapse/federation/sender/__init__.py | 2
-rw-r--r--  synapse/federation/sender/per_destination_queue.py | 2
-rw-r--r--  synapse/handlers/appservice.py | 7
-rw-r--r--  synapse/handlers/device.py | 2
-rw-r--r--  synapse/handlers/devicemessage.py | 36
-rw-r--r--  synapse/handlers/federation.py | 4
-rw-r--r--  synapse/handlers/presence.py | 16
-rw-r--r--  synapse/handlers/sync.py | 32
-rw-r--r--  synapse/handlers/typing.py | 8
-rw-r--r--  synapse/logging/opentracing.py | 11
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py | 3
-rw-r--r--  synapse/push/push_tools.py | 28
-rw-r--r--  synapse/replication/tcp/streams/__init__.py | 3
-rw-r--r--  synapse/replication/tcp/streams/partial_state.py | 48
-rw-r--r--  synapse/rest/__init__.py | 59
-rw-r--r--  synapse/rest/client/account.py | 26
-rw-r--r--  synapse/rest/client/devices.py | 10
-rw-r--r--  synapse/rest/client/keys.py | 5
-rw-r--r--  synapse/rest/client/register.py | 9
-rw-r--r--  synapse/rest/client/room.py | 20
-rw-r--r--  synapse/rest/client/sendtodevice.py | 1
-rw-r--r--  synapse/rest/client/versions.py | 1
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py | 92
-rw-r--r--  synapse/storage/databases/main/devices.py | 96
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py | 149
-rw-r--r--  synapse/storage/databases/main/room.py | 237
-rw-r--r--  synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql | 32
-rw-r--r--  synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres | 20
-rw-r--r--  synapse/util/caches/stream_change_cache.py | 182
-rw-r--r--  tests/crypto/test_keyring.py | 14
-rw-r--r--  tests/events/test_presence_router.py | 16
-rw-r--r--  tests/federation/test_federation_catch_up.py | 21
-rw-r--r--  tests/federation/test_federation_sender.py | 31
-rw-r--r--  tests/handlers/test_appservice.py | 7
-rw-r--r--  tests/handlers/test_presence.py | 3
-rw-r--r--  tests/handlers/test_typing.py | 6
-rw-r--r--  tests/handlers/test_user_directory.py | 7
-rw-r--r--  tests/module_api/test_api.py | 3
-rw-r--r--  tests/push/test_bulk_push_rule_evaluator.py | 45
-rw-r--r--  tests/push/test_email.py | 1
-rw-r--r--  tests/push/test_http.py | 7
-rw-r--r--  tests/replication/_base.py | 2
-rw-r--r--  tests/replication/tcp/streams/test_federation.py | 5
-rw-r--r--  tests/replication/test_auth.py | 4
-rw-r--r--  tests/replication/test_client_reader_shard.py | 14
-rw-r--r--  tests/replication/test_federation_ack.py | 5
-rw-r--r--  tests/replication/test_federation_sender_shard.py | 59
-rw-r--r--  tests/replication/test_pusher_shard.py | 15
-rw-r--r--  tests/rest/key/v2/test_remote_key_resource.py | 5
-rw-r--r--  tests/storage/test_event_push_actions.py | 47
-rw-r--r--  tests/util/test_async_helpers.py | 118
-rw-r--r--  tests/util/test_batching_queue.py | 30
-rw-r--r--  tests/util/test_check_dependencies.py | 29
-rw-r--r--  tests/util/test_dict_cache.py | 20
-rw-r--r--  tests/util/test_expiring_cache.py | 26
-rw-r--r--  tests/util/test_file_consumer.py | 103
-rw-r--r--  tests/util/test_itertools.py | 24
-rw-r--r--  tests/util/test_logcontext.py | 86
-rw-r--r--  tests/util/test_logformatter.py | 2
-rw-r--r--  tests/util/test_lrucache.py | 80
-rw-r--r--  tests/util/test_macaroons.py | 8
-rw-r--r--  tests/util/test_ratelimitutils.py | 15
-rw-r--r--  tests/util/test_retryutils.py | 4
-rw-r--r--  tests/util/test_rwlock.py | 14
-rw-r--r--  tests/util/test_stream_change_cache.py | 58
-rw-r--r--  tests/util/test_stringutils.py | 4
-rw-r--r--  tests/util/test_threepids.py | 16
-rw-r--r--  tests/util/test_treecache.py | 14
-rw-r--r--  tests/util/test_wheel_timer.py | 16
-rw-r--r--  tests/utils.py | 8
115 files changed, 1739 insertions, 941 deletions
diff --git a/.ci/scripts/setup_complement_prerequisites.sh b/.ci/scripts/setup_complement_prerequisites.sh
index 42ef654167..3778478da6 100755
--- a/.ci/scripts/setup_complement_prerequisites.sh
+++ b/.ci/scripts/setup_complement_prerequisites.sh
@@ -21,7 +21,7 @@ endblock
 
 block Install Complement Dependencies
   sudo apt-get -qq update && sudo apt-get install -qqy libolm3 libolm-dev
-  go get -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
+  go install -v github.com/gotesttools/gotestfmt/v2/cmd/gotestfmt@latest
 endblock
 
 block Install custom gotestfmt template
diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index a7097d5eae..4bc4266c4d 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -208,7 +208,7 @@ jobs:
 
     steps:
       - uses: actions/checkout@v3
-      - uses: JasonEtco/create-an-issue@5d9504915f79f9cc6d791934b8ef34f2353dd74d # v2.5.0, 2020-12-06
+      - uses: JasonEtco/create-an-issue@77399b6110ef82b94c1c9f9f615acf9e604f7f56 # v2.5.0, 2020-12-06
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         with:
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index b687eb002d..4cb2459b37 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -109,7 +109,29 @@ jobs:
             components: clippy
       - uses: Swatinem/rust-cache@v2
 
-      - run: cargo clippy
+      - run: cargo clippy -- -D warnings
+
+  # We also lint against a nightly rustc so that we can lint the benchmark
+  # suite, which requires a nightly compiler.
+  lint-clippy-nightly:
+    runs-on: ubuntu-latest
+    needs: changes
+    if: ${{ needs.changes.outputs.rust == 'true' }}
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - name: Install Rust
+        # There don't seem to be versioned releases of this action per se: for each rust
+        # version there is a branch which gets constantly rebased on top of master.
+        # We pin to a specific commit for paranoia's sake.
+        uses: dtolnay/rust-toolchain@e645b0cf01249a964ec099494d38d2da0f0b349f
+        with:
+            toolchain: nightly-2022-12-01
+            components: clippy
+      - uses: Swatinem/rust-cache@v2
+
+      - run: cargo clippy --all-features -- -D warnings
 
   lint-rustfmt:
     runs-on: ubuntu-latest
diff --git a/.github/workflows/twisted_trunk.yml b/.github/workflows/twisted_trunk.yml
index bbbe52d697..262b17a20d 100644
--- a/.github/workflows/twisted_trunk.yml
+++ b/.github/workflows/twisted_trunk.yml
@@ -174,7 +174,7 @@ jobs:
 
     steps:
       - uses: actions/checkout@v3
-      - uses: JasonEtco/create-an-issue@5d9504915f79f9cc6d791934b8ef34f2353dd74d # v2.5.0, 2020-12-06
+      - uses: JasonEtco/create-an-issue@77399b6110ef82b94c1c9f9f615acf9e604f7f56 # v2.5.0, 2020-12-06
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
         with:
diff --git a/changelog.d/14255.misc b/changelog.d/14255.misc
new file mode 100644
index 0000000000..39924659c7
--- /dev/null
+++ b/changelog.d/14255.misc
@@ -0,0 +1 @@
+Optimise push badge count calculations. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/14435.bugfix b/changelog.d/14435.bugfix
new file mode 100644
index 0000000000..149ee99dd7
--- /dev/null
+++ b/changelog.d/14435.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
diff --git a/changelog.d/14473.misc b/changelog.d/14473.misc
new file mode 100644
index 0000000000..deccd4e91a
--- /dev/null
+++ b/changelog.d/14473.misc
@@ -0,0 +1 @@
+Faster remote room joins: stream the un-partial-stating of rooms over replication.
\ No newline at end of file
diff --git a/changelog.d/14490.feature b/changelog.d/14490.feature
new file mode 100644
index 0000000000..c7cb571294
--- /dev/null
+++ b/changelog.d/14490.feature
@@ -0,0 +1 @@
+Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
diff --git a/changelog.d/14493.doc b/changelog.d/14493.doc
new file mode 100644
index 0000000000..e26c68ffc2
--- /dev/null
+++ b/changelog.d/14493.doc
@@ -0,0 +1 @@
+Update worker settings for `pusher` and `federation_sender` functionality.
diff --git a/changelog.d/14517.doc b/changelog.d/14517.doc
new file mode 100644
index 0000000000..2c9de68971
--- /dev/null
+++ b/changelog.d/14517.doc
@@ -0,0 +1 @@
+Add links to third party package repositories, and point to the bug which highlights Ubuntu's out-of-date packages.
diff --git a/changelog.d/14525.feature b/changelog.d/14525.feature
new file mode 100644
index 0000000000..c7cb571294
--- /dev/null
+++ b/changelog.d/14525.feature
@@ -0,0 +1 @@
+Stop using deprecated `keyIds` parameter when calling `/_matrix/key/v2/server`.
diff --git a/changelog.d/14528.misc b/changelog.d/14528.misc
new file mode 100644
index 0000000000..4f233feab6
--- /dev/null
+++ b/changelog.d/14528.misc
@@ -0,0 +1 @@
+Share the `ClientRestResource` for both workers and the main process.
diff --git a/changelog.d/14549.misc b/changelog.d/14549.misc
new file mode 100644
index 0000000000..d9d863dd20
--- /dev/null
+++ b/changelog.d/14549.misc
@@ -0,0 +1 @@
+Faster joins: use servers list approximation to send read receipts when in partial state instead of waiting for the full state of the room.
\ No newline at end of file
diff --git a/changelog.d/14551.feature b/changelog.d/14551.feature
new file mode 100644
index 0000000000..43b91d2e57
--- /dev/null
+++ b/changelog.d/14551.feature
@@ -0,0 +1 @@
+Add new `push.enabled` config option to allow opting out of push notification calculation.
\ No newline at end of file
diff --git a/changelog.d/14568.misc b/changelog.d/14568.misc
new file mode 100644
index 0000000000..99973de1c1
--- /dev/null
+++ b/changelog.d/14568.misc
@@ -0,0 +1 @@
+Modernize the unit test configuration related to workers.
diff --git a/changelog.d/14576.feature b/changelog.d/14576.feature
new file mode 100644
index 0000000000..4fe8cb2667
--- /dev/null
+++ b/changelog.d/14576.feature
@@ -0,0 +1 @@
+Advertise support for Matrix 1.5 on `/_matrix/client/versions`.
diff --git a/changelog.d/14591.misc b/changelog.d/14591.misc
new file mode 100644
index 0000000000..053d868ba6
--- /dev/null
+++ b/changelog.d/14591.misc
@@ -0,0 +1 @@
+Bump jsonschema from 4.17.0 to 4.17.3.
diff --git a/changelog.d/14592.bugfix b/changelog.d/14592.bugfix
new file mode 100644
index 0000000000..149ee99dd7
--- /dev/null
+++ b/changelog.d/14592.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
diff --git a/changelog.d/14597.misc b/changelog.d/14597.misc
new file mode 100644
index 0000000000..d44571b731
--- /dev/null
+++ b/changelog.d/14597.misc
@@ -0,0 +1 @@
+Add missing type hints.
diff --git a/changelog.d/14598.feature b/changelog.d/14598.feature
new file mode 100644
index 0000000000..88d561e286
--- /dev/null
+++ b/changelog.d/14598.feature
@@ -0,0 +1 @@
+Improve opentracing and logging for to-device message handling.
\ No newline at end of file
diff --git a/changelog.d/14600.bugfix b/changelog.d/14600.bugfix
new file mode 100644
index 0000000000..c4bf405684
--- /dev/null
+++ b/changelog.d/14600.bugfix
@@ -0,0 +1 @@
+Suppress a spurious warning when `POST /rooms/<room_id>/<membership>/`, `POST /join/<room_id_or_alias>`, or the unspecced `PUT /join/<room_id_or_alias>/<txn_id>` receive an empty HTTP request body.
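
The fix described above can be illustrated with Synapse's `parse_json_object_from_request` helper from `synapse/http/servlet.py`, which accepts an `allow_empty_body` flag. The snippet below is a minimal sketch of that pattern; the servlet class and method body are illustrative, not the exact change made in `synapse/rest/client/room.py`:

```python
# Minimal sketch: treat an empty request body as an empty JSON object instead
# of raising (which previously produced a spurious warning).
# `allow_empty_body` is a real parameter of parse_json_object_from_request;
# the servlet below is an illustrative stub.
from synapse.http.servlet import parse_json_object_from_request

class JoinRoomAliasServletSketch:
    async def on_POST(self, request, room_identifier):
        # With allow_empty_body=True, an empty POST body parses as {}.
        content = parse_json_object_from_request(request, allow_empty_body=True)
        return 200, {"room_id": room_identifier, "echo": content}
```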
diff --git a/changelog.d/14602.misc b/changelog.d/14602.misc
new file mode 100644
index 0000000000..092ba609d8
--- /dev/null
+++ b/changelog.d/14602.misc
@@ -0,0 +1 @@
+Fix Rust lint CI.
diff --git a/changelog.d/14604.bugfix b/changelog.d/14604.bugfix
new file mode 100644
index 0000000000..149ee99dd7
--- /dev/null
+++ b/changelog.d/14604.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where a device list update might not be sent to clients in certain circumstances.
diff --git a/changelog.d/14607.misc b/changelog.d/14607.misc
new file mode 100644
index 0000000000..e255eee31f
--- /dev/null
+++ b/changelog.d/14607.misc
@@ -0,0 +1 @@
+Bump JasonEtco/create-an-issue from 2.5.0 to 2.8.1.
diff --git a/changelog.d/14611.misc b/changelog.d/14611.misc
new file mode 100644
index 0000000000..e4959d00f7
--- /dev/null
+++ b/changelog.d/14611.misc
@@ -0,0 +1 @@
+Switch to Go's recommended installation method for `gotestfmt` in CI.
diff --git a/changelog.d/14612.misc b/changelog.d/14612.misc
new file mode 100644
index 0000000000..74dae5684e
--- /dev/null
+++ b/changelog.d/14612.misc
@@ -0,0 +1 @@
+Bump phonenumbers from 8.13.0 to 8.13.1.
diff --git a/changelog.d/14613.misc b/changelog.d/14613.misc
new file mode 100644
index 0000000000..c719231815
--- /dev/null
+++ b/changelog.d/14613.misc
@@ -0,0 +1 @@
+Bump types-setuptools from 65.5.0.3 to 65.6.0.1.
diff --git a/changelog.d/14614.misc b/changelog.d/14614.misc
new file mode 100644
index 0000000000..189dd156e4
--- /dev/null
+++ b/changelog.d/14614.misc
@@ -0,0 +1 @@
+Bump twine from 4.0.1 to 4.0.2.
diff --git a/changelog.d/14615.misc b/changelog.d/14615.misc
new file mode 100644
index 0000000000..9d400a6100
--- /dev/null
+++ b/changelog.d/14615.misc
@@ -0,0 +1 @@
+Bump types-requests from 2.28.11.2 to 2.28.11.5.
diff --git a/changelog.d/14616.misc b/changelog.d/14616.misc
new file mode 100644
index 0000000000..a2a57a1948
--- /dev/null
+++ b/changelog.d/14616.misc
@@ -0,0 +1 @@
+Bump cryptography from 38.0.3 to 38.0.4.
diff --git a/changelog.d/14619.doc b/changelog.d/14619.doc
new file mode 100644
index 0000000000..f25e5494c0
--- /dev/null
+++ b/changelog.d/14619.doc
@@ -0,0 +1 @@
+Add new `push.enabled` config option to allow opting out of push notification calculation.
diff --git a/changelog.d/14620.bugfix b/changelog.d/14620.bugfix
new file mode 100644
index 0000000000..cb95a87d92
--- /dev/null
+++ b/changelog.d/14620.bugfix
@@ -0,0 +1 @@
+Return spec-compliant JSON errors when unknown endpoints are requested.
diff --git a/docs/setup/installation.md b/docs/setup/installation.md
index dcd8f17c5e..436041f8a8 100644
--- a/docs/setup/installation.md
+++ b/docs/setup/installation.md
@@ -84,7 +84,9 @@ file when you upgrade the Debian package to a later version.
 
 ##### Downstream Debian packages
 
-Andrej Shadura maintains a `matrix-synapse` package in the Debian repositories.
+Andrej Shadura maintains a
+[`matrix-synapse`](https://packages.debian.org/sid/matrix-synapse) package in
+the Debian repositories.
 For `bookworm` and `sid`, it can be installed simply with:
 
 ```sh
@@ -100,23 +102,27 @@ for information on how to use backports.
 ##### Downstream Ubuntu packages
 
 We do not recommend using the packages in the default Ubuntu repository
-at this time, as they are old and suffer from known security vulnerabilities.
+at this time, as they are [old and suffer from known security vulnerabilities](
+    https://bugs.launchpad.net/ubuntu/+source/matrix-synapse/+bug/1848709
+).
 The latest version of Synapse can be installed from [our repository](#matrixorg-packages).
 
 #### Fedora
 
-Synapse is in the Fedora repositories as `matrix-synapse`:
+Synapse is in the Fedora repositories as
+[`matrix-synapse`](https://src.fedoraproject.org/rpms/matrix-synapse):
 
 ```sh
 sudo dnf install matrix-synapse
 ```
 
-Oleg Girko provides Fedora RPMs at
+Additionally, Oleg Girko provides Fedora RPMs at
 <https://obs.infoserver.lv/project/monitor/matrix-synapse>
 
 #### OpenSUSE
 
-Synapse is in the OpenSUSE repositories as `matrix-synapse`:
+Synapse is in the OpenSUSE repositories as
+[`matrix-synapse`](https://software.opensuse.org/package/matrix-synapse):
 
 ```sh
 sudo zypper install matrix-synapse
@@ -151,7 +157,8 @@ sudo pip install py-bcrypt
 
 #### Void Linux
 
-Synapse can be found in the void repositories as 'synapse':
+Synapse can be found in the void repositories as
+['synapse'](https://github.com/void-linux/void-packages/tree/master/srcpkgs/synapse):
 
 ```sh
 xbps-install -Su
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 749af12aac..dc5e5ac597 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -858,7 +858,7 @@ which are older than the room's maximum retention period. Synapse will also
 filter events received over federation so that events that should have been
 purged are ignored and not stored again.
 
-The message retention policies feature is disabled by default. Please be advised 
+The message retention policies feature is disabled by default. Please be advised
 that enabling this feature carries some risk. There are known bugs with the implementation
 which can cause database corruption. Setting retention to delete older history
 is less risky than deleting newer history but in general caution is advised when enabling this
@@ -3003,7 +3003,7 @@ Options for each entry include:
      which is set to the claims returned by the UserInfo Endpoint and/or
      in the ID Token.
 
-* `backchannel_logout_enabled`: set to `true` to process OIDC Back-Channel Logout notifications. 
+* `backchannel_logout_enabled`: set to `true` to process OIDC Back-Channel Logout notifications.
   Those notifications are expected to be received on `/_synapse/client/oidc/backchannel_logout`.
   Defaults to `false`.
 
@@ -3355,6 +3355,10 @@ Configuration settings related to push notifications
 This setting defines options for push notifications.
 
 This option has a number of sub-options. They are as follows:
+* `enabled`: Enables or disables push notification calculation. Note that disabling this
+   will also stop unread counts being calculated for rooms. This mode of operation is
+   intended for homeservers which may only have bots or appservice users connected, or
+   are otherwise not interested in push/unread counters. Defaults to `true`.
 * `include_content`: Clients requesting push notifications can either have the body of
    the message sent in the notification poke along with other details
    like the sender, or just the event ID and room ID (`event_id_only`).
@@ -3375,6 +3379,7 @@ This option has a number of sub-options. They are as follows:
 Example configuration:
 ```yaml
 push:
+  enabled: true
   include_content: false
   group_unread_count_by_room: false
 ```
@@ -3420,7 +3425,7 @@ This option has the following sub-options:
     NB. If you set this to true, and the last time the user_directory search
     indexes were (re)built was before Synapse 1.44, you'll have to
     rebuild the indexes in order to search through all known users.
-    
+
     These indexes are built the first time Synapse starts; admins can
     manually trigger a rebuild via the API following the instructions
     [for running background updates](../administration/admin_api/background_updates.md#run),
@@ -3679,7 +3684,7 @@ As a result, the worker configuration is divided into two parts.
 
 1. The first part (in this section of the manual) defines which shardable tasks
    are delegated to privileged workers. This allows unprivileged workers to make
-   request a privileged worker to act on their behalf.
+   requests to a privileged worker to act on their behalf.
 1. [The second part](#individual-worker-configuration)
    controls the behaviour of individual workers in isolation.
 
@@ -3691,7 +3696,7 @@ For guidance on setting up workers, see the [worker documentation](../../workers
 A shared secret used by the replication APIs on the main process to authenticate
 HTTP requests from workers.
 
-The default, this value is omitted (equivalently `null`), which means that 
+By default, this value is omitted (equivalently `null`), which means that
 traffic between the workers and the main process is not authenticated.
 
 Example configuration:
@@ -3701,6 +3706,8 @@ worker_replication_secret: "secret_secret"
 ---
 ### `start_pushers`
 
+This option is unnecessary if using [`pusher_instances`](#pusher_instances) with [`generic_workers`](../../workers.md#synapseappgeneric_worker).
+
 Controls sending of push notifications on the main process. Set to `false`
 if using a [pusher worker](../../workers.md#synapseapppusher). Defaults to `true`.
 
@@ -3711,25 +3718,30 @@ start_pushers: false
 ---
 ### `pusher_instances`
 
-It is possible to run multiple [pusher workers](../../workers.md#synapseapppusher),
-in which case the work is balanced across them. Use this setting to list the pushers by
-[`worker_name`](#worker_name). Ensure the main process and all pusher workers are
-restarted after changing this option.
+It is possible to scale the processes that handle sending push notifications to [sygnal](https://github.com/matrix-org/sygnal)
+and email by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to
+a `pusher_instances` map. Doing so will remove handling of this function from the main
+process. Multiple workers can be added to this map, in which case the work is balanced
+across them. Ensure the main process and all pusher workers are restarted after changing
+this option.
 
-If no or only one pusher worker is configured, this setting is not necessary.
-The main process will send out push notifications by default if you do not disable
-it by setting [`start_pushers: false`](#start_pushers).
-
-Example configuration:
+Example configuration for a single worker:
+```yaml
+pusher_instances:
+  - pusher_worker1
+```
+And for multiple workers:
 ```yaml
-start_pushers: false
 pusher_instances:
   - pusher_worker1
   - pusher_worker2
 ```
+
 ---
 ### `send_federation`
 
+This option is unnecessary if using [`federation_sender_instances`](#federation_sender_instances) with [`generic_workers`](../../workers.md#synapseappgeneric_worker).
+
 Controls sending of outbound federation transactions on the main process.
 Set to `false` if using a [federation sender worker](../../workers.md#synapseappfederation_sender).
 Defaults to `true`.
@@ -3741,29 +3753,36 @@ send_federation: false
 ---
 ### `federation_sender_instances`
 
-It is possible to run multiple
-[federation sender worker](../../workers.md#synapseappfederation_sender), in which
-case the work is balanced across them. Use this setting to list the senders.
+It is possible to scale the processes that handle sending outbound federation requests
+by running a [`generic_worker`](../../workers.md#synapseappgeneric_worker) and adding its [`worker_name`](#worker_name) to
+a `federation_sender_instances` map. Doing so will remove handling of this function from
+the main process. Multiple workers can be added to this map, in which case the work is
+balanced across them.
 
-This configuration setting must be shared between all federation sender workers, and if
-changed all federation sender workers must be stopped at the same time and then
-started, to ensure that all instances are running with the same config (otherwise
+This configuration setting must be shared between all workers handling federation
+sending, and if changed all federation sender workers must be stopped at the same time
+and then started, to ensure that all instances are running with the same config (otherwise
 events may be dropped).
 
-Example configuration:
+Example configuration for a single worker:
 ```yaml
-send_federation: false
 federation_sender_instances:
   - federation_sender1
 ```
+And for multiple workers:
+```yaml
+federation_sender_instances:
+  - federation_sender1
+  - federation_sender2
+```
 ---
 ### `instance_map`
 
 When using workers this should be a map from [`worker_name`](#worker_name) to the
 HTTP replication listener of the worker, if configured.
-Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs 
+Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
 a HTTP replication listener, and that listener should be included in the `instance_map`.
-(The main process also needs an HTTP replication listener, but it should not be 
+(The main process also needs an HTTP replication listener, but it should not be
 listed in the `instance_map`.)
 
 Example configuration:
@@ -3897,8 +3916,8 @@ worker_replication_http_tls: true
 ---
 ### `worker_listeners`
 
-A worker can handle HTTP requests. To do so, a `worker_listeners` option 
-must be declared, in the same way as the [`listeners` option](#listeners) 
+A worker can handle HTTP requests. To do so, a `worker_listeners` option
+must be declared, in the same way as the [`listeners` option](#listeners)
 in the shared config.
 
 Workers declared in [`stream_writers`](#stream_writers) will need to include a
@@ -3917,7 +3936,7 @@ worker_listeners:
 ### `worker_daemonize`
 
 Specifies whether the worker should be started as a daemon process.
-If Synapse is being managed by [systemd](../../systemd-with-workers/README.md), this option 
+If Synapse is being managed by [systemd](../../systemd-with-workers/README.md), this option
 must be omitted or set to `false`.
 
 Defaults to `false`.
@@ -3929,11 +3948,11 @@ worker_daemonize: true
 ---
 ### `worker_pid_file`
 
-When running a worker as a daemon, we need a place to store the 
+When running a worker as a daemon, we need a place to store the
 [PID](https://en.wikipedia.org/wiki/Process_identifier) of the worker.
 This option defines the location of that "pid file".
 
-This option is required if `worker_daemonize` is `true` and ignored 
+This option is required if `worker_daemonize` is `true` and ignored
 otherwise. It has no default.
 
 See also the [`pid_file` option](#pid_file) option for the main Synapse process.
@@ -3983,4 +4002,3 @@ background_updates:
     min_batch_size: 10
     default_batch_size: 50
 ```
-
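
For context on how listing several worker names under `pusher_instances` or `federation_sender_instances` balances the work, here is a minimal sketch of deterministic hash-based assignment. The crc32-mod scheme is an assumption for illustration only; Synapse's actual sharding logic (`ShardedWorkerHandlingConfig`) may differ in detail:

```python
# Minimal sketch of balancing work across the worker names listed under
# pusher_instances / federation_sender_instances. The crc32-mod assignment
# is illustrative, not Synapse's exact algorithm.
from typing import List
from zlib import crc32

def pick_instance(instances: List[str], key: str) -> str:
    """Deterministically map a key (e.g. a pusher's user ID or a federation
    destination) to one configured worker, so every process agrees on the
    assignment without coordination."""
    if not instances:
        return "master"  # nothing configured: the main process does the work
    return instances[crc32(key.encode("utf-8")) % len(instances)]

assert pick_instance(["pusher_worker1", "pusher_worker2"], "@alice:example.com") in (
    "pusher_worker1",
    "pusher_worker2",
)
```

Because the assignment is a pure function of the configured list, every worker must see the same list; this is why the docs above require restarting all affected workers together after changing the option.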
diff --git a/docs/workers.md b/docs/workers.md
index 2b65acb5ed..59a6487e0d 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -505,6 +505,9 @@ worker application type.
 
 ### `synapse.app.pusher`
 
+It is likely this option will be deprecated in the future and is not recommended for new
+installations. Instead, [use `synapse.app.generic_worker` with the `pusher_instances`](usage/configuration/config_documentation.md#pusher_instances).
+
 Handles sending push notifications to sygnal and email. Doesn't handle any
 REST endpoints itself, but you should set
 [`start_pushers: false`](usage/configuration/config_documentation.md#start_pushers) in the
@@ -543,6 +546,9 @@ Note this worker cannot be load-balanced: only one instance should be active.
 
 ### `synapse.app.federation_sender`
 
+It is likely this option will be deprecated in the future and is not recommended for
+new installations. Instead, [use `synapse.app.generic_worker` with the `federation_sender_instances`](usage/configuration/config_documentation.md#federation_sender_instances).
+
 Handles sending federation traffic to other servers. Doesn't handle any
 REST endpoints itself, but you should set
 [`send_federation: false`](usage/configuration/config_documentation.md#send_federation)
@@ -639,7 +645,9 @@ equivalent to `synapse.app.generic_worker`:
  * `synapse.app.client_reader`
  * `synapse.app.event_creator`
  * `synapse.app.federation_reader`
+ * `synapse.app.federation_sender`
  * `synapse.app.frontend_proxy`
+ * `synapse.app.pusher`
  * `synapse.app.synchrotron`
 
 
diff --git a/mypy.ini b/mypy.ini
index 0b6e7df267..c3fbd1a955 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -59,16 +59,6 @@ exclude = (?x)
    |tests/server_notices/test_resource_limits_server_notices.py
    |tests/test_state.py
    |tests/test_terms_auth.py
-   |tests/util/test_async_helpers.py
-   |tests/util/test_batching_queue.py
-   |tests/util/test_dict_cache.py
-   |tests/util/test_expiring_cache.py
-   |tests/util/test_file_consumer.py
-   |tests/util/test_linearizer.py
-   |tests/util/test_logcontext.py
-   |tests/util/test_lrucache.py
-   |tests/util/test_rwlock.py
-   |tests/util/test_wheel_timer.py
    )$
 
 [mypy-synapse.federation.transport.client]
@@ -137,6 +127,9 @@ disallow_untyped_defs = True
 [mypy-tests.util.caches.test_descriptors]
 disallow_untyped_defs = False
 
+[mypy-tests.util.*]
+disallow_untyped_defs = True
+
 [mypy-tests.utils]
 disallow_untyped_defs = True
 
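
The new `[mypy-tests.util.*]` stanza enforces full type annotations across the `tests/util` modules that were previously excluded. As a hypothetical example (not taken from the Synapse test suite), this is the kind of signature the setting requires:

```python
# Hypothetical illustration of what disallow_untyped_defs = True demands:
# every function, including test methods and setUp, must be fully annotated
# or mypy reports an error.
from unittest import TestCase

class ExampleCacheTestCase(TestCase):
    def setUp(self) -> None:  # a bare "def setUp(self):" would now fail
        self.cache: dict = {}

    def test_insert_and_get(self) -> None:
        self.cache["key"] = 1
        self.assertEqual(self.cache["key"], 1)
```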
diff --git a/poetry.lock b/poetry.lock
index d9e4803a5f..90b363a548 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -186,7 +186,7 @@ python-versions = "*"
 
 [[package]]
 name = "cryptography"
-version = "38.0.3"
+version = "38.0.4"
 description = "cryptography is a package which provides cryptographic recipes and primitives to Python developers."
 category = "main"
 optional = false
@@ -452,7 +452,7 @@ i18n = ["Babel (>=2.7)"]
 
 [[package]]
 name = "jsonschema"
-version = "4.17.0"
+version = "4.17.3"
 description = "An implementation of JSON Schema validation for Python"
 category = "main"
 optional = false
@@ -663,7 +663,7 @@ python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7"
 
 [[package]]
 name = "phonenumbers"
-version = "8.13.0"
+version = "8.13.1"
 description = "Python version of Google's common library for parsing, formatting, storing and validating international phone numbers."
 category = "main"
 optional = false
@@ -888,17 +888,17 @@ tests = ["hypothesis (>=3.27.0)", "pytest (>=3.2.1,!=3.3.0)"]
 
 [[package]]
 name = "pyopenssl"
-version = "22.0.0"
+version = "22.1.0"
 description = "Python wrapper module around the OpenSSL library"
 category = "main"
 optional = false
 python-versions = ">=3.6"
 
 [package.dependencies]
-cryptography = ">=35.0"
+cryptography = ">=38.0.0,<39"
 
 [package.extras]
-docs = ["sphinx", "sphinx-rtd-theme"]
+docs = ["sphinx (!=5.2.0,!=5.2.0.post0)", "sphinx-rtd-theme"]
 test = ["flaky", "pretend", "pytest (>=3.0.1)"]
 
 [[package]]
@@ -1076,7 +1076,7 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "1.11.0"
+version = "1.11.1"
 description = "Python client for Sentry (https://sentry.io)"
 category = "main"
 optional = true
@@ -1295,7 +1295,7 @@ docs = ["sphinx (>=1.4.8)"]
 
 [[package]]
 name = "twine"
-version = "4.0.1"
+version = "4.0.2"
 description = "Collection of utilities for publishing packages on PyPI"
 category = "dev"
 optional = false
@@ -1380,7 +1380,7 @@ python-versions = ">=3.6"
 
 [[package]]
 name = "types-bleach"
-version = "5.0.3"
+version = "5.0.3.1"
 description = "Typing stubs for bleach"
 category = "dev"
 optional = false
@@ -1448,7 +1448,7 @@ python-versions = "*"
 
 [[package]]
 name = "types-psycopg2"
-version = "2.9.21.1"
+version = "2.9.21.2"
 description = "Typing stubs for psycopg2"
 category = "dev"
 optional = false
@@ -1475,7 +1475,7 @@ python-versions = "*"
 
 [[package]]
 name = "types-requests"
-version = "2.28.11.2"
+version = "2.28.11.5"
 description = "Typing stubs for requests"
 category = "dev"
 optional = false
@@ -1486,7 +1486,7 @@ types-urllib3 = "<1.27"
 
 [[package]]
 name = "types-setuptools"
-version = "65.5.0.3"
+version = "65.6.0.1"
 description = "Typing stubs for setuptools"
 category = "dev"
 optional = false
@@ -1639,7 +1639,7 @@ url-preview = ["lxml"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.7.1"
-content-hash = "27811bd21d56ceeb0f68ded5a00375efcd1a004928f0736f5b02927ce8594cb0"
+content-hash = "8c44ceeb9df5c3ab43040400e0a6b895de49417e61293a1ba027640b34f03263"
 
 [metadata.files]
 attrs = [
@@ -1788,32 +1788,32 @@ constantly = [
     {file = "constantly-15.1.0.tar.gz", hash = "sha256:586372eb92059873e29eba4f9dec8381541b4d3834660707faf8ba59146dfc35"},
 ]
 cryptography = [
-    {file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:984fe150f350a3c91e84de405fe49e688aa6092b3525f407a18b9646f6612320"},
-    {file = "cryptography-38.0.3-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:ed7b00096790213e09eb11c97cc6e2b757f15f3d2f85833cd2d3ec3fe37c1722"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:bbf203f1a814007ce24bd4d51362991d5cb90ba0c177a9c08825f2cc304d871f"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:554bec92ee7d1e9d10ded2f7e92a5d70c1f74ba9524947c0ba0c850c7b011828"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b1b52c9e5f8aa2b802d48bd693190341fae201ea51c7a167d69fc48b60e8a959"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:728f2694fa743a996d7784a6194da430f197d5c58e2f4e278612b359f455e4a2"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:dfb4f4dd568de1b6af9f4cda334adf7d72cf5bc052516e1b2608b683375dd95c"},
-    {file = "cryptography-38.0.3-cp36-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:5419a127426084933076132d317911e3c6eb77568a1ce23c3ac1e12d111e61e0"},
-    {file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:9b24bcff7853ed18a63cfb0c2b008936a9554af24af2fb146e16d8e1aed75748"},
-    {file = "cryptography-38.0.3-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:25c1d1f19729fb09d42e06b4bf9895212292cb27bb50229f5aa64d039ab29146"},
-    {file = "cryptography-38.0.3-cp36-abi3-win32.whl", hash = "sha256:7f836217000342d448e1c9a342e9163149e45d5b5eca76a30e84503a5a96cab0"},
-    {file = "cryptography-38.0.3-cp36-abi3-win_amd64.whl", hash = "sha256:c46837ea467ed1efea562bbeb543994c2d1f6e800785bd5a2c98bc096f5cb220"},
-    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06fc3cc7b6f6cca87bd56ec80a580c88f1da5306f505876a71c8cfa7050257dd"},
-    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:65535bc550b70bd6271984d9863a37741352b4aad6fb1b3344a54e6950249b55"},
-    {file = "cryptography-38.0.3-pp37-pypy37_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:5e89468fbd2fcd733b5899333bc54d0d06c80e04cd23d8c6f3e0542358c6060b"},
-    {file = "cryptography-38.0.3-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:6ab9516b85bebe7aa83f309bacc5f44a61eeb90d0b4ec125d2d003ce41932d36"},
-    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:068147f32fa662c81aebab95c74679b401b12b57494872886eb5c1139250ec5d"},
-    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:402852a0aea73833d982cabb6d0c3bb582c15483d29fb7085ef2c42bfa7e38d7"},
-    {file = "cryptography-38.0.3-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:b1b35d9d3a65542ed2e9d90115dfd16bbc027b3f07ee3304fc83580f26e43249"},
-    {file = "cryptography-38.0.3-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:6addc3b6d593cd980989261dc1cce38263c76954d758c3c94de51f1e010c9a50"},
-    {file = "cryptography-38.0.3-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:be243c7e2bfcf6cc4cb350c0d5cdf15ca6383bbcb2a8ef51d3c9411a9d4386f0"},
-    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:78cf5eefac2b52c10398a42765bfa981ce2372cbc0457e6bf9658f41ec3c41d8"},
-    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:4e269dcd9b102c5a3d72be3c45d8ce20377b8076a43cbed6f660a1afe365e436"},
-    {file = "cryptography-38.0.3-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:8d41a46251bf0634e21fac50ffd643216ccecfaf3701a063257fe0b2be1b6548"},
-    {file = "cryptography-38.0.3-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:785e4056b5a8b28f05a533fab69febf5004458e20dad7e2e13a3120d8ecec75a"},
-    {file = "cryptography-38.0.3.tar.gz", hash = "sha256:bfbe6ee19615b07a98b1d2287d6a6073f734735b49ee45b11324d85efc4d5cbd"},
+    {file = "cryptography-38.0.4-cp36-abi3-macosx_10_10_universal2.whl", hash = "sha256:2fa36a7b2cc0998a3a4d5af26ccb6273f3df133d61da2ba13b3286261e7efb70"},
+    {file = "cryptography-38.0.4-cp36-abi3-macosx_10_10_x86_64.whl", hash = "sha256:1f13ddda26a04c06eb57119caf27a524ccae20533729f4b1e4a69b54e07035eb"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.manylinux_2_24_aarch64.whl", hash = "sha256:2ec2a8714dd005949d4019195d72abed84198d877112abb5a27740e217e0ea8d"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:50a1494ed0c3f5b4d07650a68cd6ca62efe8b596ce743a5c94403e6f11bf06c1"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a10498349d4c8eab7357a8f9aa3463791292845b79597ad1b98a543686fb1ec8"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_24_x86_64.whl", hash = "sha256:10652dd7282de17990b88679cb82f832752c4e8237f0c714be518044269415db"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_28_aarch64.whl", hash = "sha256:bfe6472507986613dc6cc00b3d492b2f7564b02b3b3682d25ca7f40fa3fd321b"},
+    {file = "cryptography-38.0.4-cp36-abi3-manylinux_2_28_x86_64.whl", hash = "sha256:ce127dd0a6a0811c251a6cddd014d292728484e530d80e872ad9806cfb1c5b3c"},
+    {file = "cryptography-38.0.4-cp36-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:53049f3379ef05182864d13bb9686657659407148f901f3f1eee57a733fb4b00"},
+    {file = "cryptography-38.0.4-cp36-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:8a4b2bdb68a447fadebfd7d24855758fe2d6fecc7fed0b78d190b1af39a8e3b0"},
+    {file = "cryptography-38.0.4-cp36-abi3-win32.whl", hash = "sha256:1d7e632804a248103b60b16fb145e8df0bc60eed790ece0d12efe8cd3f3e7744"},
+    {file = "cryptography-38.0.4-cp36-abi3-win_amd64.whl", hash = "sha256:8e45653fb97eb2f20b8c96f9cd2b3a0654d742b47d638cf2897afbd97f80fa6d"},
+    {file = "cryptography-38.0.4-pp37-pypy37_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ca57eb3ddaccd1112c18fc80abe41db443cc2e9dcb1917078e02dfa010a4f353"},
+    {file = "cryptography-38.0.4-pp37-pypy37_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:c9e0d79ee4c56d841bd4ac6e7697c8ff3c8d6da67379057f29e66acffcd1e9a7"},
+    {file = "cryptography-38.0.4-pp37-pypy37_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:0e70da4bdff7601b0ef48e6348339e490ebfb0cbe638e083c9c41fb49f00c8bd"},
+    {file = "cryptography-38.0.4-pp38-pypy38_pp73-macosx_10_10_x86_64.whl", hash = "sha256:998cd19189d8a747b226d24c0207fdaa1e6658a1d3f2494541cb9dfbf7dcb6d2"},
+    {file = "cryptography-38.0.4-pp38-pypy38_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:67461b5ebca2e4c2ab991733f8ab637a7265bb582f07c7c88914b5afb88cb95b"},
+    {file = "cryptography-38.0.4-pp38-pypy38_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:4eb85075437f0b1fd8cd66c688469a0c4119e0ba855e3fef86691971b887caf6"},
+    {file = "cryptography-38.0.4-pp38-pypy38_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:3178d46f363d4549b9a76264f41c6948752183b3f587666aff0555ac50fd7876"},
+    {file = "cryptography-38.0.4-pp38-pypy38_pp73-win_amd64.whl", hash = "sha256:6391e59ebe7c62d9902c24a4d8bcbc79a68e7c4ab65863536127c8a9cd94043b"},
+    {file = "cryptography-38.0.4-pp39-pypy39_pp73-macosx_10_10_x86_64.whl", hash = "sha256:78e47e28ddc4ace41dd38c42e6feecfdadf9c3be2af389abbfeef1ff06822285"},
+    {file = "cryptography-38.0.4-pp39-pypy39_pp73-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fb481682873035600b5502f0015b664abc26466153fab5c6bc92c1ea69d478b"},
+    {file = "cryptography-38.0.4-pp39-pypy39_pp73-manylinux_2_24_x86_64.whl", hash = "sha256:4367da5705922cf7070462e964f66e4ac24162e22ab0a2e9d31f1b270dd78083"},
+    {file = "cryptography-38.0.4-pp39-pypy39_pp73-manylinux_2_28_x86_64.whl", hash = "sha256:b4cad0cea995af760f82820ab4ca54e5471fc782f70a007f31531957f43e9dee"},
+    {file = "cryptography-38.0.4-pp39-pypy39_pp73-win_amd64.whl", hash = "sha256:80ca53981ceeb3241998443c4964a387771588c4e4a5d92735a493af868294f9"},
+    {file = "cryptography-38.0.4.tar.gz", hash = "sha256:175c1a818b87c9ac80bb7377f5520b7f31b3ef2a0004e2420319beadedb67290"},
 ]
 defusedxml = [
     {file = "defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61"},
@@ -2013,8 +2013,8 @@ jinja2 = [
     {file = "Jinja2-3.1.2.tar.gz", hash = "sha256:31351a702a408a9e7595a8fc6150fc3f43bb6bf7e319770cbc0db9df9437e852"},
 ]
 jsonschema = [
-    {file = "jsonschema-4.17.0-py3-none-any.whl", hash = "sha256:f660066c3966db7d6daeaea8a75e0b68237a48e51cf49882087757bb59916248"},
-    {file = "jsonschema-4.17.0.tar.gz", hash = "sha256:5bfcf2bca16a087ade17e02b282d34af7ccd749ef76241e7f9bd7c0cb8a9424d"},
+    {file = "jsonschema-4.17.3-py3-none-any.whl", hash = "sha256:a870ad254da1a8ca84b6a2905cac29d265f805acc57af304784962a2aa6508f6"},
+    {file = "jsonschema-4.17.3.tar.gz", hash = "sha256:0f864437ab8b6076ba6707453ef8f98a6a0d512a80e93f8abdb676f737ecb60d"},
 ]
 keyring = [
     {file = "keyring-23.5.0-py3-none-any.whl", hash = "sha256:b0d28928ac3ec8e42ef4cc227822647a19f1d544f21f96457965dc01cf555261"},
@@ -2258,8 +2258,8 @@ pathspec = [
     {file = "pathspec-0.9.0.tar.gz", hash = "sha256:e564499435a2673d586f6b2130bb5b95f04a3ba06f81b8f895b651a3c76aabb1"},
 ]
 phonenumbers = [
-    {file = "phonenumbers-8.13.0-py2.py3-none-any.whl", hash = "sha256:dbaea9e4005a976bcf18fbe2bb87cb9cd0a3f119136f04188ac412d7741cebf0"},
-    {file = "phonenumbers-8.13.0.tar.gz", hash = "sha256:93745d7afd38e246660bb601b07deac54eeb76c8e5e43f5e83333b0383a0a1e4"},
+    {file = "phonenumbers-8.13.1-py2.py3-none-any.whl", hash = "sha256:07a95c2f178687fd1c3f722cf792b3d33e3a225ae71577e500c99b28544cd6d0"},
+    {file = "phonenumbers-8.13.1.tar.gz", hash = "sha256:7cadfe900e833857500b7bafa3e5a7eddc3263eb66b66a767870b33e44665f92"},
 ]
 pillow = [
     {file = "Pillow-9.3.0-1-cp37-cp37m-win32.whl", hash = "sha256:e6ea6b856a74d560d9326c0f5895ef8050126acfdc7ca08ad703eb0081e82b74"},
@@ -2452,8 +2452,8 @@ pynacl = [
     {file = "PyNaCl-1.5.0.tar.gz", hash = "sha256:8ac7448f09ab85811607bdd21ec2464495ac8b7c66d146bf545b0f08fb9220ba"},
 ]
 pyopenssl = [
-    {file = "pyOpenSSL-22.0.0-py2.py3-none-any.whl", hash = "sha256:ea252b38c87425b64116f808355e8da644ef9b07e429398bfece610f893ee2e0"},
-    {file = "pyOpenSSL-22.0.0.tar.gz", hash = "sha256:660b1b1425aac4a1bea1d94168a85d99f0b3144c869dd4390d27629d0087f1bf"},
+    {file = "pyOpenSSL-22.1.0-py3-none-any.whl", hash = "sha256:b28437c9773bb6c6958628cf9c3bebe585de661dba6f63df17111966363dd15e"},
+    {file = "pyOpenSSL-22.1.0.tar.gz", hash = "sha256:7a83b7b272dd595222d672f5ce29aa030f1fb837630ef229f62e72e395ce8968"},
 ]
 pyparsing = [
     {file = "pyparsing-3.0.7-py3-none-any.whl", hash = "sha256:a6c06a88f252e6c322f65faf8f418b16213b51bdfaece0524c1c1bc30c63c484"},
@@ -2569,8 +2569,8 @@ semantic-version = [
     {file = "semantic_version-2.10.0.tar.gz", hash = "sha256:bdabb6d336998cbb378d4b9db3a4b56a1e3235701dc05ea2690d9a997ed5041c"},
 ]
 sentry-sdk = [
-    {file = "sentry-sdk-1.11.0.tar.gz", hash = "sha256:e7b78a1ddf97a5f715a50ab8c3f7a93f78b114c67307785ee828ef67a5d6f117"},
-    {file = "sentry_sdk-1.11.0-py2.py3-none-any.whl", hash = "sha256:f467e6c7fac23d4d42bc83eb049c400f756cd2d65ab44f0cc1165d0c7c3d40bc"},
+    {file = "sentry-sdk-1.11.1.tar.gz", hash = "sha256:675f6279b6bb1fea09fd61751061f9a90dca3b5929ef631dd50dc8b3aeb245e9"},
+    {file = "sentry_sdk-1.11.1-py2.py3-none-any.whl", hash = "sha256:8b4ff696c0bdcceb3f70bbb87a57ba84fd3168b1332d493fcd16c137f709578c"},
 ]
 service-identity = [
     {file = "service-identity-21.1.0.tar.gz", hash = "sha256:6e6c6086ca271dc11b033d17c3a8bea9f24ebff920c587da090afc9519419d34"},
@@ -2729,8 +2729,8 @@ treq = [
     {file = "treq-22.2.0.tar.gz", hash = "sha256:df757e3f141fc782ede076a604521194ffcb40fa2645cf48e5a37060307f52ec"},
 ]
 twine = [
-    {file = "twine-4.0.1-py3-none-any.whl", hash = "sha256:42026c18e394eac3e06693ee52010baa5313e4811d5a11050e7d48436cf41b9e"},
-    {file = "twine-4.0.1.tar.gz", hash = "sha256:96b1cf12f7ae611a4a40b6ae8e9570215daff0611828f5fe1f37a16255ab24a0"},
+    {file = "twine-4.0.2-py3-none-any.whl", hash = "sha256:929bc3c280033347a00f847236564d1c52a3e61b1ac2516c97c48f3ceab756d8"},
+    {file = "twine-4.0.2.tar.gz", hash = "sha256:9e102ef5fdd5a20661eb88fad46338806c3bd32cf1db729603fe3697b1bc83c8"},
 ]
 twisted = [
     {file = "Twisted-22.10.0-py3-none-any.whl", hash = "sha256:86c55f712cc5ab6f6d64e02503352464f0400f66d4f079096d744080afcccbd0"},
@@ -2781,8 +2781,8 @@ typed-ast = [
     {file = "typed_ast-1.5.2.tar.gz", hash = "sha256:525a2d4088e70a9f75b08b3f87a51acc9cde640e19cc523c7e41aa355564ae27"},
 ]
 types-bleach = [
-    {file = "types-bleach-5.0.3.tar.gz", hash = "sha256:f7b3df8278efe176d9670d0f063a66c866c77577f71f54b9c7a320e31b1a7bbd"},
-    {file = "types_bleach-5.0.3-py3-none-any.whl", hash = "sha256:5931525d03571f36b2bb40210c34b662c4d26c8fd6f2b1e1e83fe4d2d2fd63c7"},
+    {file = "types-bleach-5.0.3.1.tar.gz", hash = "sha256:ce8772ea5126dab1883851b41e3aeff229aa5213ced36096990344e632e92373"},
+    {file = "types_bleach-5.0.3.1-py3-none-any.whl", hash = "sha256:af5f1b3a54ff279f54c29eccb2e6988ebb6718bc4061469588a5fd4880a79287"},
 ]
 types-commonmark = [
     {file = "types-commonmark-0.9.2.tar.gz", hash = "sha256:b894b67750c52fd5abc9a40a9ceb9da4652a391d75c1b480bba9cef90f19fc86"},
@@ -2813,8 +2813,8 @@ types-pillow = [
     {file = "types_Pillow-9.3.0.1-py3-none-any.whl", hash = "sha256:79837755fe9659f29efd1016e9903ac4a500e0c73260483f07296bd6ca47668b"},
 ]
 types-psycopg2 = [
-    {file = "types-psycopg2-2.9.21.1.tar.gz", hash = "sha256:f5532cf15afdc6b5ebb1e59b7d896617217321f488fd1fbd74e7efb94decfab6"},
-    {file = "types_psycopg2-2.9.21.1-py3-none-any.whl", hash = "sha256:858838f1972f39da2a6e28274201fed8619a40a235dd86e7f66f4548ec474395"},
+    {file = "types-psycopg2-2.9.21.2.tar.gz", hash = "sha256:bff045579642ce00b4a3c8f2e401b7f96dfaa34939f10be64b0dd3b53feca57d"},
+    {file = "types_psycopg2-2.9.21.2-py3-none-any.whl", hash = "sha256:084558d6bc4b2cfa249b06be0fdd9a14a69d307bae5bb5809a2f14cfbaa7a23f"},
 ]
 types-pyopenssl = [
     {file = "types-pyOpenSSL-22.1.0.2.tar.gz", hash = "sha256:7a350e29e55bc3ee4571f996b4b1c18c4e4098947db45f7485b016eaa35b44bc"},
@@ -2825,12 +2825,12 @@ types-pyyaml = [
     {file = "types_PyYAML-6.0.12.2-py3-none-any.whl", hash = "sha256:1e94e80aafee07a7e798addb2a320e32956a373f376655128ae20637adb2655b"},
 ]
 types-requests = [
-    {file = "types-requests-2.28.11.2.tar.gz", hash = "sha256:fdcd7bd148139fb8eef72cf4a41ac7273872cad9e6ada14b11ff5dfdeee60ed3"},
-    {file = "types_requests-2.28.11.2-py3-none-any.whl", hash = "sha256:14941f8023a80b16441b3b46caffcbfce5265fd14555844d6029697824b5a2ef"},
+    {file = "types-requests-2.28.11.5.tar.gz", hash = "sha256:a7df37cc6fb6187a84097da951f8e21d335448aa2501a6b0a39cbd1d7ca9ee2a"},
+    {file = "types_requests-2.28.11.5-py3-none-any.whl", hash = "sha256:091d4a5a33c1b4f20d8b1b952aa8fa27a6e767c44c3cf65e56580df0b05fd8a9"},
 ]
 types-setuptools = [
-    {file = "types-setuptools-65.5.0.3.tar.gz", hash = "sha256:17769171f5f2a2dc69b25c0d3106552a5cda767bbf6b36cb6212b26dae5aa9fc"},
-    {file = "types_setuptools-65.5.0.3-py3-none-any.whl", hash = "sha256:9254c32b0cc91c486548e7d7561243b5bd185402a383e93c6691e1b9bc8d86e2"},
+    {file = "types-setuptools-65.6.0.1.tar.gz", hash = "sha256:a03cf72f336929c9405f485dd90baef31a401776675f785f69a5a519f0b099ca"},
+    {file = "types_setuptools-65.6.0.1-py3-none-any.whl", hash = "sha256:c957599502195ab98e90f0560466fa963f6a23373905e6d4e1772dbfaf1e44b7"},
 ]
 types-urllib3 = [
     {file = "types-urllib3-1.26.10.tar.gz", hash = "sha256:a26898f530e6c3f43f25b907f2b884486868ffd56a9faa94cbf9b3eb6e165d6a"},
diff --git a/pyproject.toml b/pyproject.toml
index 5537d4a0f7..df59fa0562 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -141,7 +141,8 @@ pyasn1 = ">=0.1.9"
 pyasn1-modules = ">=0.0.7"
 bcrypt = ">=3.1.7"
 Pillow = ">=5.4.0"
-sortedcontainers = ">=1.4.4"
+# We use SortedDict.peekitem(), which was added in sortedcontainers 1.5.2.
+sortedcontainers = ">=1.5.2"
 pymacaroons = ">=0.13.0"
 msgpack = ">=0.5.2"
 phonenumbers = ">=8.2.0"
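
The comment above pins the new floor to `SortedDict.peekitem()`; for reference, a quick sketch of that call (available since sortedcontainers 1.5.2):

```python
# SortedDict.peekitem(index=-1) returns the (key, value) pair at the given
# position without removing it; the default is the entry with the largest key.
from sortedcontainers import SortedDict

sd = SortedDict({3: "c", 1: "a", 2: "b"})
assert sd.peekitem() == (3, "c")   # largest key
assert sd.peekitem(0) == (1, "a")  # smallest key
```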
diff --git a/rust/benches/evaluator.rs b/rust/benches/evaluator.rs
index ed411461d1..442a79348f 100644
--- a/rust/benches/evaluator.rs
+++ b/rust/benches/evaluator.rs
@@ -33,10 +33,12 @@ fn bench_match_exact(b: &mut Bencher) {
     let eval = PushRuleEvaluator::py_new(
         flattened_keys,
         10,
-        0,
+        Some(0),
         Default::default(),
         Default::default(),
         true,
+        vec![],
+        false,
     )
     .unwrap();
 
@@ -67,10 +69,12 @@ fn bench_match_word(b: &mut Bencher) {
     let eval = PushRuleEvaluator::py_new(
         flattened_keys,
         10,
-        0,
+        Some(0),
         Default::default(),
         Default::default(),
         true,
+        vec![],
+        false,
     )
     .unwrap();
 
@@ -101,10 +105,12 @@ fn bench_match_word_miss(b: &mut Bencher) {
     let eval = PushRuleEvaluator::py_new(
         flattened_keys,
         10,
-        0,
+        Some(0),
         Default::default(),
         Default::default(),
         true,
+        vec![],
+        false,
     )
     .unwrap();
 
@@ -135,10 +141,12 @@ fn bench_eval_message(b: &mut Bencher) {
     let eval = PushRuleEvaluator::py_new(
         flattened_keys,
         10,
-        0,
+        Some(0),
         Default::default(),
         Default::default(),
         true,
+        vec![],
+        false,
     )
     .unwrap();
 
diff --git a/rust/src/push/evaluator.rs b/rust/src/push/evaluator.rs
index 1cd54f7e2c..c901c0fbcc 100644
--- a/rust/src/push/evaluator.rs
+++ b/rust/src/push/evaluator.rs
@@ -12,10 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use std::borrow::Cow;
 use std::collections::BTreeMap;
 
-use crate::push::{PushRule, PushRules};
 use anyhow::{Context, Error};
 use lazy_static::lazy_static;
 use log::warn;
@@ -98,6 +96,7 @@ pub struct PushRuleEvaluator {
 #[pymethods]
 impl PushRuleEvaluator {
     /// Create a new `PushRuleEvaluator`. See struct docstring for details.
+    #[allow(clippy::too_many_arguments)]
     #[new]
     pub fn py_new(
         flattened_keys: BTreeMap<String, String>,
@@ -153,15 +152,12 @@ impl PushRuleEvaluator {
             let mut has_rver_condition = false;
 
             for condition in push_rule.conditions.iter() {
-                has_rver_condition = has_rver_condition
-                    || match condition {
-                        Condition::Known(known) => match known {
-                            // per MSC3932, we just need *any* room version condition to match
-                            KnownCondition::RoomVersionSupports { feature: _ } => true,
-                            _ => false,
-                        },
-                        _ => false,
-                    };
+                has_rver_condition |= matches!(
+                    condition,
+                    // per MSC3932, we just need *any* room version condition to match
+                    Condition::Known(KnownCondition::RoomVersionSupports { feature: _ }),
+                );
+
                 match self.match_condition(condition, user_id, display_name) {
                     Ok(true) => {}
                     Ok(false) => continue 'outer,
@@ -444,6 +440,10 @@ fn push_rule_evaluator() {
 
 #[test]
 fn test_requires_room_version_supports_condition() {
+    use std::borrow::Cow;
+
+    use crate::push::{PushRule, PushRules};
+
     let mut flattened_keys = BTreeMap::new();
     flattened_keys.insert("content.body".to_string(), "foo bar bob hello".to_string());
     let flags = vec![RoomVersionFeatures::ExtensibleEvents.as_str().to_string()];
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index bc04a0755b..89723d24fa 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -230,6 +230,9 @@ class EventContentFields:
     # The authorising user for joining a restricted room.
     AUTHORISING_USER: Final = "join_authorised_via_users_server"
 
+    # an unspecced field added to to-device messages to identify them uniquely-ish
+    TO_DEVICE_MSGID: Final = "org.matrix.msgid"
+
 
 class RoomTypes:
     """Understood values of the room_type field of m.room.create events."""
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 46dc731696..bcc8abe20c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -44,40 +44,8 @@ from synapse.http.server import JsonResource, OptionsResource
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
+from synapse.rest import ClientRestResource
 from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.client import (
-    account_data,
-    events,
-    initial_sync,
-    login,
-    presence,
-    profile,
-    push_rule,
-    read_marker,
-    receipts,
-    relations,
-    room,
-    room_batch,
-    room_keys,
-    sendtodevice,
-    sync,
-    tags,
-    user_directory,
-    versions,
-    voip,
-)
-from synapse.rest.client.account import ThreepidRestServlet, WhoamiRestServlet
-from synapse.rest.client.devices import DevicesRestServlet
-from synapse.rest.client.keys import (
-    KeyChangesServlet,
-    KeyQueryServlet,
-    KeyUploadServlet,
-    OneTimeKeyServlet,
-)
-from synapse.rest.client.register import (
-    RegisterRestServlet,
-    RegistrationTokenValidityRestServlet,
-)
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyResource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
@@ -200,45 +168,7 @@ class GenericWorkerServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
-                    resource = JsonResource(self, canonical_json=False)
-
-                    RegisterRestServlet(self).register(resource)
-                    RegistrationTokenValidityRestServlet(self).register(resource)
-                    login.register_servlets(self, resource)
-                    ThreepidRestServlet(self).register(resource)
-                    WhoamiRestServlet(self).register(resource)
-                    DevicesRestServlet(self).register(resource)
-
-                    # Read-only
-                    KeyUploadServlet(self).register(resource)
-                    KeyQueryServlet(self).register(resource)
-                    KeyChangesServlet(self).register(resource)
-                    OneTimeKeyServlet(self).register(resource)
-
-                    voip.register_servlets(self, resource)
-                    push_rule.register_servlets(self, resource)
-                    versions.register_servlets(self, resource)
-
-                    profile.register_servlets(self, resource)
-
-                    sync.register_servlets(self, resource)
-                    events.register_servlets(self, resource)
-                    room.register_servlets(self, resource, is_worker=True)
-                    relations.register_servlets(self, resource)
-                    room.register_deprecated_servlets(self, resource)
-                    initial_sync.register_servlets(self, resource)
-                    room_batch.register_servlets(self, resource)
-                    room_keys.register_servlets(self, resource)
-                    tags.register_servlets(self, resource)
-                    account_data.register_servlets(self, resource)
-                    receipts.register_servlets(self, resource)
-                    read_marker.register_servlets(self, resource)
-
-                    sendtodevice.register_servlets(self, resource)
-
-                    user_directory.register_servlets(self, resource)
-
-                    presence.register_servlets(self, resource)
+                    resource: Resource = ClientRestResource(self)
 
                     resources[CLIENT_API_PREFIX] = resource
 
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 979b128eae..3b5378e6ea 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -26,6 +26,7 @@ class PushConfig(Config):
     def read_config(self, config: JsonDict, **kwargs: Any) -> None:
         push_config = config.get("push") or {}
         self.push_include_content = push_config.get("include_content", True)
+        self.enable_push = push_config.get("enabled", True)
         self.push_group_unread_count_by_room = push_config.get(
             "group_unread_count_by_room", True
         )
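
The new `enabled` flag sits under the existing `push` section of the homeserver config, next to `include_content` and `group_unread_count_by_room`. A minimal sketch of the parsing this hunk adds, using a hypothetical config dict:

    # Sketch of the option parsing above; the sample config is illustrative.
    # Both options default to True when the key is absent.
    config = {"push": {"enabled": False, "include_content": True}}

    push_config = config.get("push") or {}
    enable_push = push_config.get("enabled", True)
    push_include_content = push_config.get("include_content", True)

    assert enable_push is False
    assert push_include_content is True
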
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index ed15f88350..69310d9035 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -14,7 +14,6 @@
 
 import abc
 import logging
-import urllib
 from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Tuple
 
 import attr
@@ -813,31 +812,27 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
 
         results = {}
 
-        async def get_key(key_to_fetch_item: _FetchKeyRequest) -> None:
+        async def get_keys(key_to_fetch_item: _FetchKeyRequest) -> None:
             server_name = key_to_fetch_item.server_name
-            key_ids = key_to_fetch_item.key_ids
 
             try:
-                keys = await self.get_server_verify_key_v2_direct(server_name, key_ids)
+                keys = await self.get_server_verify_keys_v2_direct(server_name)
                 results[server_name] = keys
             except KeyLookupError as e:
-                logger.warning(
-                    "Error looking up keys %s from %s: %s", key_ids, server_name, e
-                )
+                logger.warning("Error looking up keys from %s: %s", server_name, e)
             except Exception:
-                logger.exception("Error getting keys %s from %s", key_ids, server_name)
+                logger.exception("Error getting keys from %s", server_name)
 
-        await yieldable_gather_results(get_key, keys_to_fetch)
+        await yieldable_gather_results(get_keys, keys_to_fetch)
         return results
 
-    async def get_server_verify_key_v2_direct(
-        self, server_name: str, key_ids: Iterable[str]
+    async def get_server_verify_keys_v2_direct(
+        self, server_name: str
     ) -> Dict[str, FetchKeyResult]:
         """
 
         Args:
-            server_name:
-            key_ids:
+            server_name: Server to request keys from
 
         Returns:
             Map from key ID to lookup result
@@ -845,57 +840,41 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
         Raises:
             KeyLookupError if there was a problem making the lookup
         """
-        keys: Dict[str, FetchKeyResult] = {}
-
-        for requested_key_id in key_ids:
-            # we may have found this key as a side-effect of asking for another.
-            if requested_key_id in keys:
-                continue
-
-            time_now_ms = self.clock.time_msec()
-            try:
-                response = await self.client.get_json(
-                    destination=server_name,
-                    path="/_matrix/key/v2/server/"
-                    + urllib.parse.quote(requested_key_id, safe=""),
-                    ignore_backoff=True,
-                    # we only give the remote server 10s to respond. It should be an
-                    # easy request to handle, so if it doesn't reply within 10s, it's
-                    # probably not going to.
-                    #
-                    # Furthermore, when we are acting as a notary server, we cannot
-                    # wait all day for all of the origin servers, as the requesting
-                    # server will otherwise time out before we can respond.
-                    #
-                    # (Note that get_json may make 4 attempts, so this can still take
-                    # almost 45 seconds to fetch the headers, plus up to another 60s to
-                    # read the response).
-                    timeout=10000,
-                )
-            except (NotRetryingDestination, RequestSendFailed) as e:
-                # these both have str() representations which we can't really improve
-                # upon
-                raise KeyLookupError(str(e))
-            except HttpResponseException as e:
-                raise KeyLookupError("Remote server returned an error: %s" % (e,))
-
-            assert isinstance(response, dict)
-            if response["server_name"] != server_name:
-                raise KeyLookupError(
-                    "Expected a response for server %r not %r"
-                    % (server_name, response["server_name"])
-                )
-
-            response_keys = await self.process_v2_response(
-                from_server=server_name,
-                response_json=response,
-                time_added_ms=time_now_ms,
+        time_now_ms = self.clock.time_msec()
+        try:
+            response = await self.client.get_json(
+                destination=server_name,
+                path="/_matrix/key/v2/server",
+                ignore_backoff=True,
+                # we only give the remote server 10s to respond. It should be an
+                # easy request to handle, so if it doesn't reply within 10s, it's
+                # probably not going to.
+                #
+                # Furthermore, when we are acting as a notary server, we cannot
+                # wait all day for all of the origin servers, as the requesting
+                # server will otherwise time out before we can respond.
+                #
+                # (Note that get_json may make 4 attempts, so this can still take
+                # almost 45 seconds to fetch the headers, plus up to another 60s to
+                # read the response).
+                timeout=10000,
             )
-            await self.store.store_server_verify_keys(
-                server_name,
-                time_now_ms,
-                ((server_name, key_id, key) for key_id, key in response_keys.items()),
+        except (NotRetryingDestination, RequestSendFailed) as e:
+            # these both have str() representations which we can't really improve
+            # upon
+            raise KeyLookupError(str(e))
+        except HttpResponseException as e:
+            raise KeyLookupError("Remote server returned an error: %s" % (e,))
+
+        assert isinstance(response, dict)
+        if response["server_name"] != server_name:
+            raise KeyLookupError(
+                "Expected a response for server %r not %r"
+                % (server_name, response["server_name"])
             )
-            keys.update(response_keys)
 
-        return keys
+        return await self.process_v2_response(
+            from_server=server_name,
+            response_json=response,
+            time_added_ms=time_now_ms,
+        )
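
With this change a single request to `/_matrix/key/v2/server` replaces one request per key ID, so the method now returns every key the server advertises rather than only the requested ones. A hedged sketch of the filtering a caller might apply afterwards; the helper and type variable are illustrative, not from the source:

    from typing import Dict, Iterable, TypeVar

    V = TypeVar("V")  # stands in for synapse's FetchKeyResult

    def filter_requested_keys(
        fetched: Dict[str, V], requested: Iterable[str]
    ) -> Dict[str, V]:
        # Keep only the key IDs the caller asked about. IDs the server no
        # longer advertises are simply absent, which the caller treats as a
        # failed lookup for that key.
        return {key_id: fetched[key_id] for key_id in requested if key_id in fetched}
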
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 8bccc9c60d..137cfb3346 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -771,17 +771,28 @@ class FederationClient(FederationBase):
         """
         if synapse_error is None:
             synapse_error = e.to_synapse_error()
-        # There is no good way to detect an "unknown" endpoint.
+        # MSC3743 specifies that servers should return a 404 or 405 with an errcode
+        # of M_UNRECOGNIZED when they receive a request to an unknown endpoint or
+        # to an unknown method, respectively.
         #
-        # Dendrite returns a 404 (with a body of "404 page not found");
-        # Conduit returns a 404 (with no body); and Synapse returns a 400
-        # with M_UNRECOGNIZED.
-        #
-        # This needs to be rather specific as some endpoints truly do return 404
-        # errors.
+        # Older versions of servers don't properly handle this. This needs to be
+        # rather specific as some endpoints truly do return 404 errors.
         return (
-            e.code == 404 and (not e.response or e.response == b"404 page not found")
-        ) or (e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED)
+            # 404 is an unknown endpoint; 405 is a known endpoint but an unknown method.
+            (e.code == 404 or e.code == 405)
+            and (
+                # Older Dendrites returned a text or empty body.
+                # Older Conduit returned an empty body.
+                not e.response
+                or e.response == b"404 page not found"
+                # The proper response JSON with M_UNRECOGNIZED errcode.
+                or synapse_error.errcode == Codes.UNRECOGNIZED
+            )
+        ) or (
+            # Older Synapses returned a 400 error.
+            e.code == 400
+            and synapse_error.errcode == Codes.UNRECOGNIZED
+        )
 
     async def _try_destination_list(
         self,
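
To make the reworked predicate concrete, here is a self-contained restatement with a few illustrative cases; `code`, `body`, and `errcode` are simplified stand-ins for the exception fields the real method inspects:

    from typing import Optional

    M_UNRECOGNIZED = "M_UNRECOGNIZED"

    def is_unknown_endpoint(code: int, body: bytes, errcode: Optional[str]) -> bool:
        # Mirrors the condition above: MSC3743-style 404/405 responses, the
        # legacy Dendrite/Conduit bodies, and the legacy Synapse 400.
        return (
            code in (404, 405)
            and (not body or body == b"404 page not found" or errcode == M_UNRECOGNIZED)
        ) or (code == 400 and errcode == M_UNRECOGNIZED)

    assert is_unknown_endpoint(404, b"", None)                    # older Conduit
    assert is_unknown_endpoint(404, b"404 page not found", None)  # older Dendrite
    assert is_unknown_endpoint(405, b"{}", M_UNRECOGNIZED)        # MSC3743-compliant
    assert is_unknown_endpoint(400, b"{}", M_UNRECOGNIZED)        # older Synapse
    assert not is_unknown_endpoint(404, b'{"errcode":"M_NOT_FOUND"}', "M_NOT_FOUND")
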
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index fc1d8c88a7..30ebd62883 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -647,7 +647,7 @@ class FederationSender(AbstractFederationSender):
         room_id = receipt.room_id
 
         # Work out which remote servers should be poked and poke them.
-        domains_set = await self._storage_controllers.state.get_current_hosts_in_room(
+        domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
         )
         domains = [
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 5af2784f1e..ffc9d95ee7 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -641,7 +641,7 @@ class PerDestinationQueue:
             if not message_id:
                 continue
 
-            set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+            set_tag(SynapseTags.TO_DEVICE_EDU_ID, message_id)
 
         edus = [
             Edu(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 66f5b8d108..5d1d21cdc8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -578,9 +578,6 @@ class ApplicationServicesHandler:
             device_id,
         ), messages in recipient_device_to_messages.items():
             for message_json in messages:
-                # Remove 'message_id' from the to-device message, as it's an internal ID
-                message_json.pop("message_id", None)
-
                 message_payload.append(
                     {
                         "to_user_id": user_id,
@@ -615,8 +612,8 @@ class ApplicationServicesHandler:
         )
 
         # Fetch the users who have modified their device list since then.
-        users_with_changed_device_lists = (
-            await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
+        users_with_changed_device_lists = await self.store.get_all_devices_changed(
+            from_key, to_key=new_key
         )
 
         # Filter out any users the application service is not interested in
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b1e55e1b9e..d4750a32e6 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -996,7 +996,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater):
         # Check if we are partially joining any rooms. If so we need to store
         # all device list updates so that we can handle them correctly once we
         # know who is in the room.
-        # TODO(faster joins): this fetches and processes a bunch of data that we don't
+        # TODO(faster_joins): this fetches and processes a bunch of data that we don't
         # use. Could be replaced by a tighter query e.g.
         #   SELECT EXISTS(SELECT 1 FROM partial_state_rooms)
         partial_rooms = await self.store.get_partial_state_room_resync_info()
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 444c08bc2e..75e89850f5 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,7 +15,7 @@
 import logging
 from typing import TYPE_CHECKING, Any, Dict
 
-from synapse.api.constants import EduTypes, ToDeviceEventTypes
+from synapse.api.constants import EduTypes, EventContentFields, ToDeviceEventTypes
 from synapse.api.errors import SynapseError
 from synapse.api.ratelimiting import Ratelimiter
 from synapse.logging.context import run_in_background
@@ -216,14 +216,24 @@ class DeviceMessageHandler:
         """
         sender_user_id = requester.user.to_string()
 
-        message_id = random_string(16)
-        set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
-
-        log_kv({"number_of_to_device_messages": len(messages)})
-        set_tag("sender", sender_user_id)
+        set_tag(SynapseTags.TO_DEVICE_TYPE, message_type)
+        set_tag(SynapseTags.TO_DEVICE_SENDER, sender_user_id)
         local_messages = {}
         remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
         for user_id, by_device in messages.items():
+            # add an opentracing log entry for each message
+            for device_id, message_content in by_device.items():
+                log_kv(
+                    {
+                        "event": "send_to_device_message",
+                        "user_id": user_id,
+                        "device_id": device_id,
+                        EventContentFields.TO_DEVICE_MSGID: message_content.get(
+                            EventContentFields.TO_DEVICE_MSGID
+                        ),
+                    }
+                )
+
             # Ratelimit local cross-user key requests by the sending device.
             if (
                 message_type == ToDeviceEventTypes.RoomKeyRequest
@@ -233,6 +243,7 @@ class DeviceMessageHandler:
                     requester, (sender_user_id, requester.device_id)
                 )
                 if not allowed:
+                    log_kv({"message": f"dropping key requests to {user_id}"})
                     logger.info(
                         "Dropping room_key_request from %s to %s due to rate limit",
                         sender_user_id,
@@ -247,18 +258,11 @@ class DeviceMessageHandler:
                         "content": message_content,
                         "type": message_type,
                         "sender": sender_user_id,
-                        "message_id": message_id,
                     }
                     for device_id, message_content in by_device.items()
                 }
                 if messages_by_device:
                     local_messages[user_id] = messages_by_device
-                    log_kv(
-                        {
-                            "user_id": user_id,
-                            "device_id": list(messages_by_device),
-                        }
-                    )
             else:
                 destination = get_domain_from_id(user_id)
                 remote_messages.setdefault(destination, {})[user_id] = by_device
@@ -267,7 +271,11 @@ class DeviceMessageHandler:
 
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            log_kv({"destination": destination})
+            # The EDU contains a "message_id" property which is used for
+            # idempotence. Make up a random one.
+            message_id = random_string(16)
+            log_kv({"destination": destination, "message_id": message_id})
+
             remote_edu_contents[destination] = {
                 "messages": messages,
                 "sender": sender_user_id,
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d92582fd5c..3398fcaf7d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -152,6 +152,7 @@ class FederationHandler:
         self._federation_event_handler = hs.get_federation_event_handler()
         self._device_handler = hs.get_device_handler()
         self._bulk_push_rule_evaluator = hs.get_bulk_push_rule_evaluator()
+        self._notifier = hs.get_notifier()
 
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
@@ -1692,6 +1693,9 @@ class FederationHandler:
                     self._storage_controllers.state.notify_room_un_partial_stated(
                         room_id
                     )
+                    # Poke the notifier so that other workers see the write to
+                    # the un-partial-stated rooms stream.
+                    self._notifier.notify_replication()
 
                     # TODO(faster_joins) update room stats and user directory?
                     #   https://github.com/matrix-org/synapse/issues/12814
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cf08737d11..2af90b25a3 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1692,10 +1692,12 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
 
             if from_key is not None:
                 # First get all users that have had a presence update
-                updated_users = stream_change_cache.get_all_entities_changed(from_key)
+                result = stream_change_cache.get_all_entities_changed(from_key)
 
                 # Cross-reference users we're interested in with those that have had updates.
-                if updated_users is not None:
+                if result.hit:
+                    updated_users = result.entities
+
                     # If we have the full list of changes for presence we can
                     # simply check which ones share a room with the user.
                     get_updates_counter.labels("stream").inc()
@@ -1764,14 +1766,14 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
         Returns:
             A list of presence states for the given user to receive.
         """
+        updated_users = None
         if from_key:
             # Only return updates since the last sync
-            updated_users = self.store.presence_stream_cache.get_all_entities_changed(
-                from_key
-            )
-            if not updated_users:
-                updated_users = []
+            result = self.store.presence_stream_cache.get_all_entities_changed(from_key)
+            if result.hit:
+                updated_users = result.entities
 
+        if updated_users is not None:
             # Get the actual presence update for each change
             users_to_state = await self.get_presence_handler().current_state_for_users(
                 updated_users
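
The `result.hit` / `result.entities` pattern used in these hunks comes from the reworked `StreamChangeCache.get_all_entities_changed` (see `synapse/util/caches/stream_change_cache.py` in this commit), which now returns a small result object rather than `Optional[List[str]]`. A hedged sketch of the shape the callers rely on; the exact field names are an assumption:

    from typing import List, Optional

    import attr

    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class AllEntitiesChangedResult:
        _entities: Optional[List[str]]

        @property
        def hit(self) -> bool:
            # True if the cache was able to answer the query at all.
            return self._entities is not None

        @property
        def entities(self) -> List[str]:
            # Only meaningful when hit is True, hence the `if result.hit:`
            # guards in the callers above.
            assert self._entities is not None
            return self._entities
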
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c8858b22dd..dace9b606f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -31,14 +31,20 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging.context import current_context
-from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.databases.main.event_push_actions import RoomNotifCounts
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
@@ -1528,10 +1534,12 @@ class SyncHandler:
             #
             # If we don't have that info cached then we get all the users that
             # share a room with our user and check if those users have changed.
-            changed_users = self.store.get_cached_device_list_changes(
+            cache_result = self.store.get_cached_device_list_changes(
                 since_token.device_list_key
             )
-            if changed_users is not None:
+            if cache_result.hit:
+                changed_users = cache_result.entities
+
                 result = await self.store.get_rooms_for_users(changed_users)
 
                 for changed_user_id, entries in result.items():
@@ -1584,6 +1592,7 @@ class SyncHandler:
         else:
             return DeviceListUpdates()
 
+    @trace
     async def _generate_sync_entry_for_to_device(
         self, sync_result_builder: "SyncResultBuilder"
     ) -> None:
@@ -1603,11 +1612,16 @@ class SyncHandler:
             )
 
             for message in messages:
-                # We pop here as we shouldn't be sending the message ID down
-                # `/sync`
-                message_id = message.pop("message_id", None)
-                if message_id:
-                    set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+                log_kv(
+                    {
+                        "event": "to_device_message",
+                        "sender": message["sender"],
+                        "type": message["type"],
+                        EventContentFields.TO_DEVICE_MSGID: message["content"].get(
+                            EventContentFields.TO_DEVICE_MSGID
+                        ),
+                    }
+                )
 
             logger.debug(
                 "Returning %d to-device messages between %d and %d (current token: %d)",
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a0ea719430..3f656ea4f5 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -420,11 +420,11 @@ class TypingWriterHandler(FollowerTypingHandler):
         if last_id == current_id:
             return [], current_id, False
 
-        changed_rooms: Optional[
-            Iterable[str]
-        ] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
+        result = self._typing_stream_change_cache.get_all_entities_changed(last_id)
 
-        if changed_rooms is None:
+        if result.hit:
+            changed_rooms: Iterable[str] = result.entities
+        else:
             changed_rooms = self._room_serials
 
         rows = []
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b69060854f..a705af8356 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -292,8 +292,15 @@ logger = logging.getLogger(__name__)
 
 
 class SynapseTags:
-    # The message ID of any to_device message processed
-    TO_DEVICE_MESSAGE_ID = "to_device.message_id"
+    # The message ID of any to_device EDU processed
+    TO_DEVICE_EDU_ID = "to_device.edu_id"
+
+    # Details about to-device messages
+    TO_DEVICE_TYPE = "to_device.type"
+    TO_DEVICE_SENDER = "to_device.sender"
+    TO_DEVICE_RECIPIENT = "to_device.recipient"
+    TO_DEVICE_RECIPIENT_DEVICE = "to_device.recipient_device"
+    TO_DEVICE_MSGID = "to_device.msgid"  # client-generated ID
 
     # Whether the sync response has new data to be returned to the client.
     SYNC_RESULT = "sync.new_data"
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index d6b377860f..9ed35d8461 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -106,6 +106,7 @@ class BulkPushRuleEvaluator:
         self.store = hs.get_datastores().main
         self.clock = hs.get_clock()
         self._event_auth_handler = hs.get_event_auth_handler()
+        self.should_calculate_push_rules = self.hs.config.push.enable_push
 
         self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
 
@@ -269,6 +270,8 @@ class BulkPushRuleEvaluator:
         for each event, check if the message should increment the unread count, and
         insert the results into the event_push_actions_staging table.
         """
+        if not self.should_calculate_push_rules:
+            return
         # For batched events the power level events may not have been persisted yet,
         # so we pass in the batched events. Thus if the event cannot be found in the
         # database we can check in the batch.
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index edeba27a45..7ee07e4bee 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,7 +17,6 @@ from synapse.events import EventBase
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage.controllers import StorageControllers
 from synapse.storage.databases.main import DataStore
-from synapse.util.async_helpers import concurrently_execute
 
 
 async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -26,23 +25,12 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
 
     badge = len(invites)
 
-    room_notifs = []
-
-    async def get_room_unread_count(room_id: str) -> None:
-        room_notifs.append(
-            await store.get_unread_event_push_actions_by_room_for_user(
-                room_id,
-                user_id,
-            )
-        )
-
-    await concurrently_execute(get_room_unread_count, joins, 10)
-
-    for notifs in room_notifs:
-        # Combine the counts from all the threads.
-        notify_count = notifs.main_timeline.notify_count + sum(
-            n.notify_count for n in notifs.threads.values()
-        )
+    room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
+    for room_id, notify_count in room_to_count.items():
+        # room_to_count may include rooms which the user has left,
+        # ignore those.
+        if room_id not in joins:
+            continue
 
         if notify_count == 0:
             continue
@@ -51,8 +39,10 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
             # return one badge count per conversation
             badge += 1
         else:
-            # increment the badge count by the number of unread messages in the room
+            # Increase badge by number of notifications in room
+            # NOTE: this includes threaded and unthreaded notifications.
             badge += notify_count
+
     return badge
 
 
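A worked example of the reworked loop, with hypothetical rooms and counts; room `!b` is skipped because the user has left it, so only `!a` and `!c` contribute:

    # Worked example of get_badge_count's loop with hypothetical inputs.
    invites = ["!invite:example.org"]
    joins = {"!a:example.org", "!c:example.org"}
    room_to_count = {"!a:example.org": 3, "!b:example.org": 5, "!c:example.org": 2}

    def badge_for(group_by_room: bool) -> int:
        badge = len(invites)
        for room_id, notify_count in room_to_count.items():
            if room_id not in joins or notify_count == 0:
                continue  # rooms the user left, and silent rooms, don't count
            badge += 1 if group_by_room else notify_count
        return badge

    assert badge_for(group_by_room=True) == 3   # 1 invite + 2 notifying rooms
    assert badge_for(group_by_room=False) == 6  # 1 invite + 3 + 2 notifications
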
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index b1cd55bf6f..8575666d9c 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -42,6 +42,7 @@ from synapse.replication.tcp.streams._base import (
 )
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.replication.tcp.streams.federation import FederationStream
+from synapse.replication.tcp.streams.partial_state import UnPartialStatedRoomStream
 
 STREAMS_MAP = {
     stream.NAME: stream
@@ -61,6 +62,7 @@ STREAMS_MAP = {
         TagAccountDataStream,
         AccountDataStream,
         UserSignatureStream,
+        UnPartialStatedRoomStream,
     )
 }
 
@@ -80,4 +82,5 @@ __all__ = [
     "TagAccountDataStream",
     "AccountDataStream",
     "UserSignatureStream",
+    "UnPartialStatedRoomStream",
 ]
diff --git a/synapse/replication/tcp/streams/partial_state.py b/synapse/replication/tcp/streams/partial_state.py
new file mode 100644
index 0000000000..18f087ffa2
--- /dev/null
+++ b/synapse/replication/tcp/streams/partial_state.py
@@ -0,0 +1,48 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+import attr
+
+from synapse.replication.tcp.streams import Stream
+from synapse.replication.tcp.streams._base import current_token_without_instance
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnPartialStatedRoomStreamRow:
+    # ID of the room that has been un-partial-stated.
+    room_id: str
+
+
+class UnPartialStatedRoomStream(Stream):
+    """
+    Stream to notify about rooms becoming un-partial-stated;
+    that is, when the background sync finishes such that we now have full state for
+    the room.
+    """
+
+    NAME = "un_partial_stated_room"
+    ROW_TYPE = UnPartialStatedRoomStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        store = hs.get_datastores().main
+        super().__init__(
+            hs.get_instance_name(),
+            # TODO(faster_joins, multiple writers): we need to account for instance names
+            current_token_without_instance(store.get_un_partial_stated_rooms_token),
+            store.get_un_partial_stated_rooms_from_stream,
+        )
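
For context, rows on this stream reach other workers over the ordinary replication connection. A rough sketch of a consumer, assuming the generic `on_rdata`-style callback used for other streams; what a worker actually does with each row is an assumption here:

    from typing import Iterable

    from synapse.replication.tcp.streams import UnPartialStatedRoomStream

    def on_rdata(stream_name: str, instance_name: str, token: int, rows: Iterable) -> None:
        # Dispatch on the stream name, as for other replication streams.
        if stream_name == UnPartialStatedRoomStream.NAME:
            for row in rows:
                # Each row is an UnPartialStatedRoomStreamRow; a consumer
                # might clear partial-state caches for the room here.
                print(f"room {row.room_id} now has full state (token {token})")
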
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 28542cd774..14c4e6ebbb 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -29,7 +29,7 @@ from synapse.rest.client import (
     initial_sync,
     keys,
     knock,
-    login as v1_login,
+    login,
     login_token_request,
     logout,
     mutual_rooms,
@@ -82,6 +82,10 @@ class ClientRestResource(JsonResource):
 
     @staticmethod
     def register_servlets(client_resource: HttpServer, hs: "HomeServer") -> None:
+        # Some servlets are only registered on the main process (and not worker
+        # processes).
+        is_main_process = hs.config.worker.worker_app is None
+
         versions.register_servlets(hs, client_resource)
 
         # Deprecated in r0
@@ -92,45 +96,58 @@ class ClientRestResource(JsonResource):
         events.register_servlets(hs, client_resource)
 
         room.register_servlets(hs, client_resource)
-        v1_login.register_servlets(hs, client_resource)
+        login.register_servlets(hs, client_resource)
         profile.register_servlets(hs, client_resource)
         presence.register_servlets(hs, client_resource)
-        directory.register_servlets(hs, client_resource)
+        if is_main_process:
+            directory.register_servlets(hs, client_resource)
         voip.register_servlets(hs, client_resource)
-        pusher.register_servlets(hs, client_resource)
+        if is_main_process:
+            pusher.register_servlets(hs, client_resource)
         push_rule.register_servlets(hs, client_resource)
-        logout.register_servlets(hs, client_resource)
+        if is_main_process:
+            logout.register_servlets(hs, client_resource)
         sync.register_servlets(hs, client_resource)
-        filter.register_servlets(hs, client_resource)
+        if is_main_process:
+            filter.register_servlets(hs, client_resource)
         account.register_servlets(hs, client_resource)
         register.register_servlets(hs, client_resource)
-        auth.register_servlets(hs, client_resource)
+        if is_main_process:
+            auth.register_servlets(hs, client_resource)
         receipts.register_servlets(hs, client_resource)
         read_marker.register_servlets(hs, client_resource)
         room_keys.register_servlets(hs, client_resource)
         keys.register_servlets(hs, client_resource)
-        tokenrefresh.register_servlets(hs, client_resource)
+        if is_main_process:
+            tokenrefresh.register_servlets(hs, client_resource)
         tags.register_servlets(hs, client_resource)
         account_data.register_servlets(hs, client_resource)
-        report_event.register_servlets(hs, client_resource)
-        openid.register_servlets(hs, client_resource)
-        notifications.register_servlets(hs, client_resource)
+        if is_main_process:
+            report_event.register_servlets(hs, client_resource)
+            openid.register_servlets(hs, client_resource)
+            notifications.register_servlets(hs, client_resource)
         devices.register_servlets(hs, client_resource)
-        thirdparty.register_servlets(hs, client_resource)
+        if is_main_process:
+            thirdparty.register_servlets(hs, client_resource)
         sendtodevice.register_servlets(hs, client_resource)
         user_directory.register_servlets(hs, client_resource)
-        room_upgrade_rest_servlet.register_servlets(hs, client_resource)
+        if is_main_process:
+            room_upgrade_rest_servlet.register_servlets(hs, client_resource)
         room_batch.register_servlets(hs, client_resource)
-        capabilities.register_servlets(hs, client_resource)
-        account_validity.register_servlets(hs, client_resource)
+        if is_main_process:
+            capabilities.register_servlets(hs, client_resource)
+            account_validity.register_servlets(hs, client_resource)
         relations.register_servlets(hs, client_resource)
-        password_policy.register_servlets(hs, client_resource)
-        knock.register_servlets(hs, client_resource)
+        if is_main_process:
+            password_policy.register_servlets(hs, client_resource)
+            knock.register_servlets(hs, client_resource)
 
         # moving to /_synapse/admin
-        admin.register_servlets_for_client_rest_resource(hs, client_resource)
+        if is_main_process:
+            admin.register_servlets_for_client_rest_resource(hs, client_resource)
 
         # unstable
-        mutual_rooms.register_servlets(hs, client_resource)
-        login_token_request.register_servlets(hs, client_resource)
-        rendezvous.register_servlets(hs, client_resource)
+        if is_main_process:
+            mutual_rooms.register_servlets(hs, client_resource)
+            login_token_request.register_servlets(hs, client_resource)
+            rendezvous.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 44f622bcce..b4b92f0c99 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -875,19 +875,21 @@ class AccountStatusRestServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    EmailPasswordRequestTokenRestServlet(hs).register(http_server)
-    PasswordRestServlet(hs).register(http_server)
-    DeactivateAccountRestServlet(hs).register(http_server)
-    EmailThreepidRequestTokenRestServlet(hs).register(http_server)
-    MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
-    AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
-    AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+        PasswordRestServlet(hs).register(http_server)
+        DeactivateAccountRestServlet(hs).register(http_server)
+        EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+        MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
+        AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
+        AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
-    ThreepidAddRestServlet(hs).register(http_server)
-    ThreepidBindRestServlet(hs).register(http_server)
-    ThreepidUnbindRestServlet(hs).register(http_server)
-    ThreepidDeleteRestServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        ThreepidAddRestServlet(hs).register(http_server)
+        ThreepidBindRestServlet(hs).register(http_server)
+        ThreepidUnbindRestServlet(hs).register(http_server)
+        ThreepidDeleteRestServlet(hs).register(http_server)
     WhoamiRestServlet(hs).register(http_server)
 
-    if hs.config.experimental.msc3720_enabled:
+    if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
         AccountStatusRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 69b803f9f8..486c6dbbc5 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -342,8 +342,10 @@ class ClaimDehydratedDeviceServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    DeleteDevicesRestServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
-    DeviceRestServlet(hs).register(http_server)
-    DehydratedDeviceServlet(hs).register(http_server)
-    ClaimDehydratedDeviceServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        DeviceRestServlet(hs).register(http_server)
+        DehydratedDeviceServlet(hs).register(http_server)
+        ClaimDehydratedDeviceServlet(hs).register(http_server)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index ee038c7192..7873b363c0 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,5 +376,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     KeyQueryServlet(hs).register(http_server)
     KeyChangesServlet(hs).register(http_server)
     OneTimeKeyServlet(hs).register(http_server)
-    SigningKeyUploadServlet(hs).register(http_server)
-    SignaturesUploadServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        SigningKeyUploadServlet(hs).register(http_server)
+        SignaturesUploadServlet(hs).register(http_server)
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index de810ae3ec..3cb1e7e375 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -949,9 +949,10 @@ def _calculate_registration_flows(
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    EmailRegisterRequestTokenRestServlet(hs).register(http_server)
-    MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
-    UsernameAvailabilityRestServlet(hs).register(http_server)
-    RegistrationSubmitTokenServlet(hs).register(http_server)
+    if hs.config.worker.worker_app is None:
+        EmailRegisterRequestTokenRestServlet(hs).register(http_server)
+        MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
+        UsernameAvailabilityRestServlet(hs).register(http_server)
+        RegistrationSubmitTokenServlet(hs).register(http_server)
     RegistrationTokenValidityRestServlet(hs).register(http_server)
     RegisterRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 636cc62877..514eb6afc8 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -396,12 +396,7 @@ class JoinRoomAliasServlet(ResolveRoomIdMixin, TransactionRestServlet):
     ) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
 
-        try:
-            content = parse_json_object_from_request(request)
-        except Exception:
-            # Turns out we used to ignore the body entirely, and some clients
-            # cheekily send invalid bodies.
-            content = {}
+        content = parse_json_object_from_request(request, allow_empty_body=True)
 
         # twisted.web.server.Request.args is incorrectly defined as Optional[Any]
         args: Dict[bytes, List[bytes]] = request.args  # type: ignore
@@ -952,12 +947,7 @@ class RoomMembershipRestServlet(TransactionRestServlet):
         }:
             raise AuthError(403, "Guest access not allowed")
 
-        try:
-            content = parse_json_object_from_request(request)
-        except Exception:
-            # Turns out we used to ignore the body entirely, and some clients
-            # cheekily send invalid bodies.
-            content = {}
+        content = parse_json_object_from_request(request, allow_empty_body=True)
 
         if membership_action == "invite" and all(
             key in content for key in ("medium", "address")
@@ -1395,9 +1385,7 @@ class RoomSummaryRestServlet(ResolveRoomIdMixin, RestServlet):
         )
 
 
-def register_servlets(
-    hs: "HomeServer", http_server: HttpServer, is_worker: bool = False
-) -> None:
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RoomStateEventRestServlet(hs).register(http_server)
     RoomMemberListRestServlet(hs).register(http_server)
     JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1421,7 +1409,7 @@ def register_servlets(
     TimestampLookupRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
-    if not is_worker:
+    if hs.config.worker.worker_app is None:
         RoomForgetRestServlet(hs).register(http_server)
 
 
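Both hunks above fold the old try/except into the `allow_empty_body` flag of `parse_json_object_from_request`: an empty body still yields `{}`, but a malformed non-empty body is now rejected rather than silently ignored. A simplified sketch of the flag's semantics (not the real servlet helper):

    import json

    def parse_json_object(body: bytes, allow_empty_body: bool = False) -> dict:
        if not body and allow_empty_body:
            return {}                   # old leniency: no body at all is fine
        obj = json.loads(body)          # malformed JSON now raises, by design
        if not isinstance(obj, dict):
            raise ValueError("body must be a JSON object")
        return obj

    assert parse_json_object(b"", allow_empty_body=True) == {}
    assert parse_json_object(b'{"reason": "bye"}') == {"reason": "bye"}
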
diff --git a/synapse/rest/client/sendtodevice.py b/synapse/rest/client/sendtodevice.py
index 46a8b03829..55d52f0b28 100644
--- a/synapse/rest/client/sendtodevice.py
+++ b/synapse/rest/client/sendtodevice.py
@@ -46,7 +46,6 @@ class SendToDeviceRestServlet(servlet.RestServlet):
     def on_PUT(
         self, request: SynapseRequest, message_type: str, txn_id: str
     ) -> Awaitable[Tuple[int, JsonDict]]:
-        set_tag("message_type", message_type)
         set_tag("txn_id", txn_id)
         return self.txns.fetch_or_execute_request(
             request, self._put, request, message_type, txn_id
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 3c0a90010b..e19c0946c0 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -77,6 +77,7 @@ class VersionsRestServlet(RestServlet):
                     "v1.2",
                     "v1.3",
                     "v1.4",
+                    "v1.5",
                 ],
                 # as per MSC1497:
                 "unstable_features": {
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 73c95ffb6f..48a54d9cb8 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -26,8 +26,15 @@ from typing import (
     cast,
 )
 
+from synapse.api.constants import EventContentFields
 from synapse.logging import issue9533_logger
-from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    log_kv,
+    set_tag,
+    start_active_span,
+    trace,
+)
 from synapse.replication.tcp.streams import ToDeviceStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -397,6 +404,17 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     (recipient_user_id, recipient_device_id), []
                 ).append(message_dict)
 
+                # start a new span for each message, so that we can tag each separately
+                with start_active_span("get_to_device_message"):
+                    set_tag(SynapseTags.TO_DEVICE_TYPE, message_dict["type"])
+                    set_tag(SynapseTags.TO_DEVICE_SENDER, message_dict["sender"])
+                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT, recipient_user_id)
+                    set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, recipient_device_id)
+                    set_tag(
+                        SynapseTags.TO_DEVICE_MSGID,
+                        message_dict["content"].get(EventContentFields.TO_DEVICE_MSGID),
+                    )
+
             if limit is not None and rowcount == limit:
                 # We ended up bumping up against the message limit. There may be more messages
                 # to retrieve. Return what we have, as well as the last stream position that
@@ -678,12 +696,35 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 ],
             )
 
-            if remote_messages_by_destination:
-                issue9533_logger.debug(
-                    "Queued outgoing to-device messages with stream_id %i for %s",
-                    stream_id,
-                    list(remote_messages_by_destination.keys()),
-                )
+            for destination, edu in remote_messages_by_destination.items():
+                if issue9533_logger.isEnabledFor(logging.DEBUG):
+                    issue9533_logger.debug(
+                        "Queued outgoing to-device messages with "
+                        "stream_id %i, EDU message_id %s, type %s for %s: %s",
+                        stream_id,
+                        edu["message_id"],
+                        edu["type"],
+                        destination,
+                        [
+                            f"{user_id}/{device_id} (msgid "
+                            f"{msg.get(EventContentFields.TO_DEVICE_MSGID)})"
+                            for (user_id, messages_by_device) in edu["messages"].items()
+                            for (device_id, msg) in messages_by_device.items()
+                        ],
+                    )
+
+                for (user_id, messages_by_device) in edu["messages"].items():
+                    for (device_id, msg) in messages_by_device.items():
+                        with start_active_span("store_outgoing_to_device_message"):
+                            set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["sender"])
+                            set_tag(SynapseTags.TO_DEVICE_EDU_ID, edu["message_id"])
+                            set_tag(SynapseTags.TO_DEVICE_TYPE, edu["type"])
+                            set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+                            set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+                            set_tag(
+                                SynapseTags.TO_DEVICE_MSGID,
+                                msg.get(EventContentFields.TO_DEVICE_MSGID),
+                            )
 
         async with self._device_inbox_id_gen.get_next() as stream_id:
             now_ms = self._clock.time_msec()
@@ -801,7 +842,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device_id = row["device_id"]
-                    message_json = json_encoder.encode(messages_by_device[device_id])
+
+                    with start_active_span("serialise_to_device_message"):
+                        msg = messages_by_device[device_id]
+                        set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
+                        set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
+                        set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
+                        set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
+                        set_tag(
+                            SynapseTags.TO_DEVICE_MSGID,
+                            msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
+                        )
+                        message_json = json_encoder.encode(msg)
+
                     messages_json_for_user[device_id] = message_json
 
             if messages_json_for_user:
@@ -821,15 +874,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             ],
         )
 
-        issue9533_logger.debug(
-            "Stored to-device messages with stream_id %i for %s",
-            stream_id,
-            [
-                (user_id, device_id)
-                for (user_id, messages_by_device) in local_by_user_then_device.items()
-                for device_id in messages_by_device.keys()
-            ],
-        )
+        if issue9533_logger.isEnabledFor(logging.DEBUG):
+            issue9533_logger.debug(
+                "Stored to-device messages with stream_id %i: %s",
+                stream_id,
+                [
+                    f"{user_id}/{device_id} (msgid "
+                    f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
+                    for (
+                        user_id,
+                        messages_by_device,
+                    ) in messages_by_user_then_device.items()
+                    for (device_id, msg) in messages_by_device.items()
+                ],
+            )
 
 
 class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
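
One detail worth noting in these logging hunks: the new `isEnabledFor(logging.DEBUG)` guard means the per-message list comprehension is only built when the issue-9533 logger is actually emitting. Lazy `%` formatting cannot help here, because the argument expressions are evaluated before `debug()` decides to drop the record. A minimal illustration with a hypothetical logger name:

    import logging

    logger = logging.getLogger("example.9533_debug")  # illustrative name

    def expensive_summary() -> list:
        # Imagine a comprehension over every queued to-device message.
        return [f"user{i}/device{i}" for i in range(10_000)]

    # Without the guard, expensive_summary() runs even when DEBUG is off.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("Stored to-device messages: %s", expensive_summary())
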
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 534f7fc04a..a5bb4d404e 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -58,7 +58,10 @@ from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.caches.stream_change_cache import (
+    AllEntitiesChangedResult,
+    StreamChangeCache,
+)
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
@@ -799,7 +802,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
     def get_cached_device_list_changes(
         self,
         from_key: int,
-    ) -> Optional[List[str]]:
-        """Get set of users whose devices have changed since `from_key`, or None
-        if that information is not in our cache.
-        """
+    ) -> AllEntitiesChangedResult:
+        """Get set of users whose devices have changed since `from_key`.
+
+        The result is a cache miss (`hit` is False) if that information is
+        not in our cache.
+        """
@@ -807,10 +810,58 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         return self._device_list_stream_cache.get_all_entities_changed(from_key)
 
     @cancellable
+    async def get_all_devices_changed(
+        self,
+        from_key: int,
+        to_key: int,
+    ) -> Set[str]:
+        """Get all users whose devices have changed in the given range.
+
+        Args:
+            from_key: The minimum device lists stream token to query device list
+                changes for, exclusive.
+            to_key: The maximum device lists stream token to query device list
+                changes for, inclusive.
+
+        Returns:
+            The set of user_ids whose devices have changed since `from_key`
+            (exclusive) until `to_key` (inclusive).
+        """
+
+        result = self._device_list_stream_cache.get_all_entities_changed(from_key)
+
+        if result.hit:
+            # We know which users might have changed devices.
+            if not result.entities:
+                # If no users then we can return early.
+                return set()
+
+            # Otherwise we need to filter down the list
+            return await self.get_users_whose_devices_changed(
+                from_key, result.entities, to_key
+            )
+
+        # If the cache didn't tell us anything, we just need to query the full
+        # range.
+        sql = """
+            SELECT DISTINCT user_id FROM device_lists_stream
+            WHERE ? < stream_id AND stream_id <= ?
+        """
+
+        rows = await self.db_pool.execute(
+            "get_all_devices_changed",
+            None,
+            sql,
+            from_key,
+            to_key,
+        )
+        return {u for u, in rows}
+
+    @cancellable
     async def get_users_whose_devices_changed(
         self,
         from_key: int,
-        user_ids: Optional[Collection[str]] = None,
+        user_ids: Collection[str],
         to_key: Optional[int] = None,
     ) -> Set[str]:
         """Get set of users whose devices have changed since `from_key` that
@@ -830,46 +881,31 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
         """
         # Get set of users who *may* have changed. Users not in the returned
         # list have definitely not changed.
-        user_ids_to_check: Optional[Collection[str]]
-        if user_ids is None:
-            # Get set of all users that have had device list changes since 'from_key'
-            user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
-                from_key
-            )
-        else:
-            # The same as above, but filter results to only those users in 'user_ids'
-            user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
-                user_ids, from_key
-            )
+        user_ids_to_check = self._device_list_stream_cache.get_entities_changed(
+            user_ids, from_key
+        )
 
+        # If an empty set was returned, there's nothing to do.
         if not user_ids_to_check:
             return set()
 
-        def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
-            changes: Set[str] = set()
-
-            stream_id_where_clause = "stream_id > ?"
-            sql_args = [from_key]
-
-            if to_key:
-                stream_id_where_clause += " AND stream_id <= ?"
-                sql_args.append(to_key)
+        if to_key is None:
+            to_key = self._device_list_id_gen.get_current_token()
 
-            sql = f"""
+        def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
+            sql = """
                 SELECT DISTINCT user_id FROM device_lists_stream
-                WHERE {stream_id_where_clause}
-                AND
+                WHERE ? < stream_id AND stream_id <= ? AND %s
             """
 
+            changes: Set[str] = set()
+
             # Query device changes with a batch of users at a time
-            # Assertion for mypy's benefit; see also
-            # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
-            assert user_ids_to_check is not None
             for chunk in batch_iter(user_ids_to_check, 100):
                 clause, args = make_in_list_sql_clause(
                     txn.database_engine, "user_id", chunk
                 )
-                txn.execute(sql + clause, sql_args + args)
+                txn.execute(sql % (clause,), [from_key, to_key] + args)
                 changes.update(user_id for user_id, in txn)
 
             return changes
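
Both queries in this file use the half-open convention spelled out in the new docstring: `from_key` exclusive, `to_key` inclusive, i.e. `from_key < stream_id <= to_key`. A tiny worked example with hypothetical stream rows:

    # Rows at stream IDs 3, 4, 5; a query over (3, 5] sees only 4 and 5.
    rows = [(3, "@a:example.org"), (4, "@b:example.org"), (5, "@c:example.org")]
    from_key, to_key = 3, 5
    changed = {user for stream_id, user in rows if from_key < stream_id <= to_key}
    assert changed == {"@b:example.org", "@c:example.org"}
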
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b283ab0f9c..7ebe34f773 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,6 +74,7 @@ receipt.
 """
 
 import logging
+from collections import defaultdict
 from typing import (
     TYPE_CHECKING,
     Collection,
@@ -95,6 +96,7 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
+    PostgresEngine,
 )
 from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
@@ -463,6 +465,153 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
 
         return result
 
+    async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]:
+        """Get the notification count by room for a user. Only considers notifications,
+        not highlight or unread counts, and threads are currently aggregated under their room.
+
+        This function is intentionally not cached because it is called to calculate the
+        unread badge for push notifications and thus the result is expected to change.
+
+        Note that this function assumes the user is a member of the room. Because
+        summary rows are not removed when a user leaves a room, the caller must
+        filter out rooms the user has since left from the result.
+
+        Returns:
+            A map of room ID to notification counts for the given user.
+        """
+        return await self.db_pool.runInteraction(
+            "get_unread_counts_by_room_for_user",
+            self._get_unread_counts_by_room_for_user_txn,
+            user_id,
+        )
+
+    def _get_unread_counts_by_room_for_user_txn(
+        self, txn: LoggingTransaction, user_id: str
+    ) -> Dict[str, int]:
+        receipt_types_clause, args = make_in_list_sql_clause(
+            self.database_engine,
+            "receipt_type",
+            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+        )
+        args.extend([user_id, user_id])
+
+        receipts_cte = f"""
+            WITH all_receipts AS (
+                SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    {receipt_types_clause}
+                    AND user_id = ?
+                GROUP BY room_id, thread_id
+            )
+        """
+
+        receipts_joins = """
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS threaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NOT NULL
+            ) AS threaded_receipts USING (room_id, thread_id)
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NULL
+            ) AS unthreaded_receipts USING (room_id)
+        """
+
+        # First get summary counts by room / thread for the user. We use the max receipt
+        # stream ordering of both threaded & unthreaded receipts to compare against the
+        # summary table.
+        #
+        # PostgreSQL and SQLite differ in comparing scalar numerics.
+        if isinstance(self.database_engine, PostgresEngine):
+            # GREATEST ignores NULLs.
+            max_clause = """GREATEST(
+                threaded_receipt_stream_ordering,
+                unthreaded_receipt_stream_ordering
+            )"""
+        else:
+            # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+            max_clause = """MAX(
+                COALESCE(threaded_receipt_stream_ordering, 0),
+                COALESCE(unthreaded_receipt_stream_ordering, 0)
+            )"""
+
+        sql = f"""
+            {receipts_cte}
+            SELECT eps.room_id, eps.thread_id, notif_count
+            FROM event_push_summary AS eps
+            {receipts_joins}
+            WHERE user_id = ?
+                AND notif_count != 0
+                AND (
+                    (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
+                    OR last_receipt_stream_ordering = {max_clause}
+                )
+        """
+        txn.execute(sql, args)
+
+        seen_thread_ids = set()
+        room_to_count: Dict[str, int] = defaultdict(int)
+
+        for room_id, thread_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
+            seen_thread_ids.add(thread_id)
+
+        # Now get any event push actions that haven't yet been rotated into the
+        # summary table, using the same receipt joins and only counting actions
+        # after the stream ordering up to which the summary table has been rotated.
+        sql = f"""
+            {receipts_cte}
+            SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+                AND epa.notif = 1
+                AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
+                AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+                AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id, epa.thread_id
+        """
+        txn.execute(sql, args)
+
+        for room_id, thread_id, notif_count in txn:
+            # Note: only count push actions for threads that already have a summary
+            # row with an up-to-date receipt; the remaining threads are handled by
+            # the final query below.
+            if thread_id not in seen_thread_ids:
+                continue
+            room_to_count[room_id] += notif_count
+
+        thread_id_clause, thread_ids_args = make_in_list_sql_clause(
+            self.database_engine, "epa.thread_id", seen_thread_ids
+        )
+
+        # Finally, re-check event_push_actions for any threads not in the summary,
+        # ignoring the rotated up-to position. This handles the case where a read
+        # receipt has arrived but has not yet been rotated, meaning the summary
+        # table is out of date; in that case we fall back to the push actions table.
+        sql = f"""
+            {receipts_cte}
+            SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+            AND NOT {thread_id_clause}
+            AND epa.notif = 1
+            AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+            AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id
+        """
+
+        args.extend(thread_ids_args)
+        txn.execute(sql, args)
+
+        for room_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
+
+        return room_to_count
+
     @cached(tree=True, max_entries=5000, iterable=True)
     async def get_unread_event_push_actions_by_room_for_user(
         self,
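As the docstring notes, callers of `get_unread_counts_by_room_for_user` must filter out rooms the user has left. A hedged sketch of a hypothetical caller computing a push badge, assuming Synapse's existing `get_rooms_for_user` membership helper:

```python
# Hypothetical caller; not part of this change.
async def unread_badge_count(store, user_id: str) -> int:
    counts = await store.get_unread_counts_by_room_for_user(user_id)
    joined_rooms = await store.get_rooms_for_user(user_id)
    return sum(
        notif_count
        for room_id, notif_count in counts.items()
        if room_id in joined_rooms  # drop summary rows for rooms we've left
    )
```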
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 1309bfd374..78906a5e1d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1,5 +1,5 @@
 # Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019, 2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -50,8 +50,14 @@ from synapse.storage.database import (
     LoggingTransaction,
 )
 from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.storage.util.id_generators import IdGenerator
+from synapse.storage.util.id_generators import (
+    AbstractStreamIdGenerator,
+    IdGenerator,
+    MultiWriterIdGenerator,
+    StreamIdGenerator,
+)
 from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
@@ -114,6 +120,26 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         self.config: HomeServerConfig = hs.config
 
+        self._un_partial_stated_rooms_stream_id_gen: AbstractStreamIdGenerator
+
+        if isinstance(database.engine, PostgresEngine):
+            self._un_partial_stated_rooms_stream_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="un_partial_stated_room_stream",
+                instance_name=self._instance_name,
+                tables=[
+                    ("un_partial_stated_room_stream", "instance_name", "stream_id")
+                ],
+                sequence_name="un_partial_stated_room_stream_sequence",
+                # TODO(faster_joins, multiple writers) Support multiple writers.
+                writers=["master"],
+            )
+        else:
+            self._un_partial_stated_rooms_stream_id_gen = StreamIdGenerator(
+                db_conn, "un_partial_stated_room_stream", "stream_id"
+            )
+
     async def store_room(
         self,
         room_id: str,
@@ -1216,70 +1242,6 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
 
         return room_servers
 
-    async def clear_partial_state_room(self, room_id: str) -> bool:
-        """Clears the partial state flag for a room.
-
-        Args:
-            room_id: The room whose partial state flag is to be cleared.
-
-        Returns:
-            `True` if the partial state flag has been cleared successfully.
-
-            `False` if the partial state flag could not be cleared because the room
-            still contains events with partial state.
-        """
-        try:
-            await self.db_pool.runInteraction(
-                "clear_partial_state_room", self._clear_partial_state_room_txn, room_id
-            )
-            return True
-        except self.db_pool.engine.module.IntegrityError as e:
-            # Assume that any `IntegrityError`s are due to partial state events.
-            logger.info(
-                "Exception while clearing lazy partial-state-room %s, retrying: %s",
-                room_id,
-                e,
-            )
-            return False
-
-    def _clear_partial_state_room_txn(
-        self, txn: LoggingTransaction, room_id: str
-    ) -> None:
-        DatabasePool.simple_delete_txn(
-            txn,
-            table="partial_state_rooms_servers",
-            keyvalues={"room_id": room_id},
-        )
-        DatabasePool.simple_delete_one_txn(
-            txn,
-            table="partial_state_rooms",
-            keyvalues={"room_id": room_id},
-        )
-        self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
-        self._invalidate_cache_and_stream(
-            txn, self.get_partial_state_servers_at_join, (room_id,)
-        )
-
-        # We now delete anything from `device_lists_remote_pending` with a
-        # stream ID less than the minimum
-        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
-        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
-            txn,
-            table="partial_state_rooms",
-            keyvalues={},
-            retcol="MIN(device_lists_stream_id)",
-            allow_none=True,
-        )
-        if device_lists_stream_id is None:
-            # There are no rooms being currently partially joined, so we delete everything.
-            txn.execute("DELETE FROM device_lists_remote_pending")
-        else:
-            sql = """
-                DELETE FROM device_lists_remote_pending
-                WHERE stream_id <= ?
-            """
-            txn.execute(sql, (device_lists_stream_id,))
-
     @cached()
     async def is_partial_state_room(self, room_id: str) -> bool:
         """Checks if this room has partial state.
@@ -1315,6 +1277,66 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
         )
         return result["join_event_id"], result["device_lists_stream_id"]
 
+    def get_un_partial_stated_rooms_token(self) -> int:
+        # TODO(faster_joins, multiple writers): This is inappropriate if there
+        #     are multiple writers because workers that don't write often will
+        #     hold all readers up.
+        #     (See `MultiWriterIdGenerator.get_persisted_upto_position` for an
+        #      explanation.)
+        return self._un_partial_stated_rooms_stream_id_gen.get_current_token()
+
+    async def get_un_partial_stated_rooms_from_stream(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+        """Get updates for caches replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updates.
+
+            The updates are a list of 2-tuples of stream ID and the row data.
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_un_partial_stated_rooms_from_stream_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]:
+            sql = """
+                SELECT stream_id, room_id
+                FROM un_partial_stated_room_stream
+                WHERE ? < stream_id AND stream_id <= ? AND instance_name = ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, instance_name, limit))
+            updates = [(row[0], (row[1],)) for row in txn]
+            limited = False
+            upto_token = current_id
+            if len(updates) >= limit:
+                upto_token = updates[-1][0]
+                limited = True
+
+            return updates, upto_token, limited
+
+        return await self.db_pool.runInteraction(
+            "get_un_partial_stated_rooms_from_stream",
+            get_un_partial_stated_rooms_from_stream_txn,
+        )
+
 
 class _BackgroundUpdates:
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
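A hedged sketch of how a reader might page through the new stream using the token semantics documented above (`last_id` exclusive, `current_id` inclusive); the loop and limit are illustrative, not part of this change:

```python
# Hypothetical consumer of the un-partial-stated rooms stream.
async def drain_un_partial_stated_rooms(store, instance_name: str) -> None:
    last_id = 0
    current_id = store.get_un_partial_stated_rooms_token()
    while True:
        updates, upto_token, limited = await store.get_un_partial_stated_rooms_from_stream(
            instance_name, last_id, current_id, limit=100
        )
        for stream_id, (room_id,) in updates:
            # Each update is a 2-tuple of stream ID and the row data.
            print(f"room {room_id} un-partial-stated at {stream_id}")
        last_id = upto_token
        if not limited:
            break
```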
@@ -1806,6 +1828,8 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
 
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
 
+        self._instance_name = hs.get_instance_name()
+
     async def upsert_room_on_join(
         self, room_id: str, room_version: RoomVersion, state_events: List[EventBase]
     ) -> None:
@@ -2270,3 +2294,84 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
             self.is_room_blocked,
             (room_id,),
         )
+
+    async def clear_partial_state_room(self, room_id: str) -> bool:
+        """Clears the partial state flag for a room.
+
+        Args:
+            room_id: The room whose partial state flag is to be cleared.
+
+        Returns:
+            `True` if the partial state flag has been cleared successfully.
+
+            `False` if the partial state flag could not be cleared because the room
+            still contains events with partial state.
+        """
+        try:
+            async with self._un_partial_stated_rooms_stream_id_gen.get_next() as un_partial_state_room_stream_id:
+                await self.db_pool.runInteraction(
+                    "clear_partial_state_room",
+                    self._clear_partial_state_room_txn,
+                    room_id,
+                    un_partial_state_room_stream_id,
+                )
+                return True
+        except self.db_pool.engine.module.IntegrityError as e:
+            # Assume that any `IntegrityError`s are due to partial state events.
+            logger.info(
+                "Exception while clearing lazy partial-state-room %s, retrying: %s",
+                room_id,
+                e,
+            )
+            return False
+
+    def _clear_partial_state_room_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        un_partial_state_room_stream_id: int,
+    ) -> None:
+        DatabasePool.simple_delete_txn(
+            txn,
+            table="partial_state_rooms_servers",
+            keyvalues={"room_id": room_id},
+        )
+        DatabasePool.simple_delete_one_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={"room_id": room_id},
+        )
+        self._invalidate_cache_and_stream(txn, self.is_partial_state_room, (room_id,))
+        self._invalidate_cache_and_stream(
+            txn, self.get_partial_state_servers_at_join, (room_id,)
+        )
+
+        DatabasePool.simple_insert_txn(
+            txn,
+            "un_partial_stated_room_stream",
+            {
+                "stream_id": un_partial_state_room_stream_id,
+                "instance_name": self._instance_name,
+                "room_id": room_id,
+            },
+        )
+
+        # We now delete anything from `device_lists_remote_pending` with a
+        # stream ID less than the minimum
+        # `partial_state_rooms.device_lists_stream_id`, as we no longer need them.
+        device_lists_stream_id = DatabasePool.simple_select_one_onecol_txn(
+            txn,
+            table="partial_state_rooms",
+            keyvalues={},
+            retcol="MIN(device_lists_stream_id)",
+            allow_none=True,
+        )
+        if device_lists_stream_id is None:
+            # There are no rooms being currently partially joined, so we delete everything.
+            txn.execute("DELETE FROM device_lists_remote_pending")
+        else:
+            sql = """
+                DELETE FROM device_lists_remote_pending
+                WHERE stream_id <= ?
+            """
+            txn.execute(sql, (device_lists_stream_id,))
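Given the `True`/`False` contract on `clear_partial_state_room`, callers are expected to retry until no events with partial state remain. A minimal sketch, assuming Synapse's `Clock` for the (arbitrary) back-off:

```python
# Hypothetical retry loop; the 5s delay is illustrative.
async def clear_when_ready(store, clock, room_id: str) -> None:
    # clear_partial_state_room returns False while events with partial
    # state remain, so poll until it succeeds.
    while not await store.clear_partial_state_room(room_id):
        await clock.sleep(5)  # back off before retrying
```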
diff --git a/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
new file mode 100644
index 0000000000..743196cfe3
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/20_un_partial_stated_room_stream.sql
@@ -0,0 +1,32 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Stream for notifying that a room has become un-partial-stated.
+CREATE TABLE un_partial_stated_room_stream(
+    -- Position in the stream
+    stream_id BIGINT PRIMARY KEY NOT NULL,
+
+    -- Which instance wrote this entry.
+    instance_name TEXT NOT NULL,
+
+    -- Which room has been un-partial-stated.
+    room_id TEXT NOT NULL REFERENCES rooms(room_id) ON DELETE CASCADE
+);
+
+-- We want an index here because of the foreign key constraint:
+-- upon deleting a room, the database needs to be able to check here.
+-- This index is not unique because we can join a room multiple times in a server's lifetime,
+-- so the same room could be un-partial-stated multiple times!
+CREATE INDEX un_partial_stated_room_stream_room_id ON un_partial_stated_room_stream (room_id);
diff --git a/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
new file mode 100644
index 0000000000..c1aac0b385
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/21_un_partial_stated_room_stream_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS un_partial_stated_room_stream_sequence;
+
+SELECT setval('un_partial_stated_room_stream_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM un_partial_stated_room_stream
+));
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 666f4b6895..c8b17acb59 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -16,6 +16,7 @@ import logging
 import math
 from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
+import attr
 from sortedcontainers import SortedDict
 
 from synapse.util import caches
@@ -26,14 +27,41 @@ logger = logging.getLogger(__name__)
 EntityType = str
 
 
+@attr.s(auto_attribs=True, frozen=True, slots=True)
+class AllEntitiesChangedResult:
+    """Return type of `get_all_entities_changed`.
+
+    Callers must check that there was a cache hit, via `result.hit`, before
+    using the entities in `result.entities`.
+
+    This specifically does *not* implement helpers such as `__bool__` to ensure
+    that callers do the correct checks.
+    """
+
+    _entities: Optional[List[EntityType]]
+
+    @property
+    def hit(self) -> bool:
+        return self._entities is not None
+
+    @property
+    def entities(self) -> List[EntityType]:
+        assert self._entities is not None
+        return self._entities
+
+
 class StreamChangeCache:
-    """Keeps track of the stream positions of the latest change in a set of entities.
+    """
+    Keeps track of the stream positions of the latest change in a set of entities.
 
-    Typically the entity will be a room or user id.
+    The entity is typically a room ID or user ID, but can be any string.
 
-    Given a list of entities and a stream position, it will give a subset of
-    entities that may have changed since that position. If position key is too
-    old then the cache will simply return all given entities.
+    Can be queried for whether a specific entity has changed after a stream position
+    or for a list of changed entities after a stream position. See the individual
+    methods for more information.
+
+    Only tracks up to a maximum cache size; any position earlier than the
+    earliest known stream position must be treated as unknown.
     """
 
     def __init__(
@@ -45,16 +73,20 @@ class StreamChangeCache:
     ) -> None:
         self._original_max_size: int = max_size
         self._max_size = math.floor(max_size)
-        self._entity_to_key: Dict[EntityType, int] = {}
 
-        # map from stream id to the a set of entities which changed at that stream id.
+        # map from stream id to the set of entities which changed at that stream id.
         self._cache: SortedDict[int, Set[EntityType]] = SortedDict()
+        # map from entity to the stream ID of the latest change for that entity.
+        #
+        # Must be kept in sync with _cache.
+        self._entity_to_key: Dict[EntityType, int] = {}
 
         # the earliest stream_pos for which we can reliably answer
         # get_all_entities_changed. In other words, one less than the earliest
         # stream_pos for which we know _cache is valid.
         #
         self._earliest_known_stream_pos = current_stream_pos
+
         self.name = name
         self.metrics = caches.register_cache(
             "cache", self.name, self._cache, resize_callback=self.set_cache_factor
@@ -82,22 +114,46 @@ class StreamChangeCache:
         return False
 
     def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
-        """Returns True if the entity may have been updated since stream_pos"""
+        """
+        Returns True if the entity may have been updated after stream_pos.
+
+        Args:
+            entity: The entity to check for changes.
+            stream_pos: The stream position to check for changes after.
+
+        Return:
+            True if the entity may have been updated; this happens if:
+                * The given stream position is at or earlier than the earliest
+                  known stream position.
+                * The given stream position is earlier than the latest change for
+                  the entity.
+
+            False otherwise:
+                * The entity is unknown.
+                * The given stream position is at or later than the latest change
+                  for the entity.
+        """
         assert isinstance(stream_pos, int)
 
-        if stream_pos < self._earliest_known_stream_pos:
+        # _cache is not valid at or before the earliest known stream position, so
+        # return that the entity has changed.
+        if stream_pos <= self._earliest_known_stream_pos:
             self.metrics.inc_misses()
             return True
 
+        # If the entity is unknown, it hasn't changed.
         latest_entity_change_pos = self._entity_to_key.get(entity, None)
         if latest_entity_change_pos is None:
             self.metrics.inc_hits()
             return False
 
+        # This is a known entity; return True if the stream position is earlier
+        # than its last change.
         if stream_pos < latest_entity_change_pos:
             self.metrics.inc_misses()
             return True
 
+        # Otherwise, the stream position is at or after the latest change: return False.
         self.metrics.inc_hits()
         return False
 
@@ -105,23 +161,35 @@ class StreamChangeCache:
         self, entities: Collection[EntityType], stream_pos: int
     ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
         """
-        Returns subset of entities that have had new things since the given
-        position.  Entities unknown to the cache will be returned.  If the
-        position is too old it will just return the given list.
+        Returns the subset of the given entities that have had changes after the given position.
+
+        Entities unknown to the cache will be returned.
+
+        If the position is too old, it will just return all of the given entities.
+
+        Args:
+            entities: Entities to check for changes.
+            stream_pos: The stream position to check for changes after.
+
+        Return:
+            A subset of entities which have changed after the given stream position.
+
+            This will be all entities if the given stream position is at or earlier
+            than the earliest known stream position.
         """
-        changed_entities = self.get_all_entities_changed(stream_pos)
-        if changed_entities is not None:
+        cache_result = self.get_all_entities_changed(stream_pos)
+        if cache_result.hit:
             # We now do an intersection, trying to do so in the most efficient
             # way possible (some of these sets are *large*). First check if the
-            # given iterable is already set that we can reuse, otherwise we
+            # given iterable is already a set that we can reuse, otherwise we
             # create a set of the *smallest* of the two iterables and call
             # `intersection(..)` on it (this can be twice as fast as the reverse).
             if isinstance(entities, (set, frozenset)):
-                result = entities.intersection(changed_entities)
-            elif len(changed_entities) < len(entities):
-                result = set(changed_entities).intersection(entities)
+                result = entities.intersection(cache_result.entities)
+            elif len(cache_result.entities) < len(entities):
+                result = set(cache_result.entities).intersection(entities)
             else:
-                result = set(entities).intersection(changed_entities)
+                result = set(entities).intersection(cache_result.entities)
             self.metrics.inc_hits()
         else:
             result = set(entities)
@@ -130,43 +198,75 @@ class StreamChangeCache:
         return result
 
     def has_any_entity_changed(self, stream_pos: int) -> bool:
-        """Returns if any entity has changed"""
-        assert type(stream_pos) is int
+        """
+        Returns True if any entity has changed after the given stream position.
+
+        Args:
+            stream_pos: The stream position to check for changes after.
+
+        Return:
+            True if any entity has changed after the given stream position or
+            if the given stream position is at or earlier than the earliest
+            known stream position.
+
+            False otherwise.
+        """
+        assert isinstance(stream_pos, int)
 
         if not self._cache:
             # If the cache is empty, nothing can have changed.
             return False
 
-        if stream_pos >= self._earliest_known_stream_pos:
-            self.metrics.inc_hits()
-            return self._cache.bisect_right(stream_pos) < len(self._cache)
-        else:
+        # _cache is not valid at or before the earliest known stream position, so
+        # return that an entity has changed.
+        if stream_pos <= self._earliest_known_stream_pos:
             self.metrics.inc_misses()
             return True
 
-    def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
-        """Returns all entities that have had new things since the given
-        position. If the position is too old it will return None.
+        self.metrics.inc_hits()
+        return stream_pos < self._cache.peekitem()[0]
+
+    def get_all_entities_changed(self, stream_pos: int) -> AllEntitiesChangedResult:
+        """
+        Returns all entities that have had changes after the given position.
+
+        If the stream change cache does not go far enough back, i.e. the
+        position is too old, the returned result will be a cache miss.
 
         Returns the entities in the order that they were changed.
+
+        Args:
+            stream_pos: The stream position to check for changes after.
+
+        Return:
+            A result indicating whether the requested data is cached; if so, it
+            includes the entities in the order they were changed.
         """
-        assert type(stream_pos) is int
+        assert isinstance(stream_pos, int)
 
-        if stream_pos < self._earliest_known_stream_pos:
-            return None
+        # _cache is not valid at or before the earliest known stream position, so
+        # return a cache miss to mark that it is unknown if an entity has changed.
+        if stream_pos <= self._earliest_known_stream_pos:
+            return AllEntitiesChangedResult(None)
 
         changed_entities: List[EntityType] = []
 
         for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
             changed_entities.extend(self._cache[k])
-        return changed_entities
+        return AllEntitiesChangedResult(changed_entities)
 
     def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
-        """Informs the cache that the entity has been changed at the given
-        position.
         """
-        assert type(stream_pos) is int
+        Informs the cache that the entity has been changed at the given position.
+
+        Args:
+            entity: The entity to mark as changed.
+            stream_pos: The stream position to update the entity to.
+        """
+        assert isinstance(stream_pos, int)
 
+        # For a change before _cache is valid (i.e. at or before the earliest known
+        # stream position) there's nothing to do.
         if stream_pos <= self._earliest_known_stream_pos:
             return
 
@@ -189,6 +289,11 @@ class StreamChangeCache:
         self._evict()
 
     def _evict(self) -> None:
+        """
+        Ensure the cache has not exceeded the maximum size.
+
+        Evicts entries until it is at the maximum size.
+        """
         # if the cache is too big, remove entries
         while len(self._cache) > self._max_size:
             k, r = self._cache.popitem(0)
@@ -199,5 +304,12 @@ class StreamChangeCache:
     def get_max_pos_of_last_change(self, entity: EntityType) -> int:
         """Returns an upper bound of the stream id of the last change to an
         entity.
+
+        Args:
+            entity: The entity to check.
+
+        Return:
+            The stream position of the latest change for the given entity or
+            the earliest known stream position if the entity is unknown.
         """
         return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
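Because `AllEntitiesChangedResult` deliberately omits `__bool__`, callers must check `hit` explicitly before reading `entities`. A small usage sketch of the new API:

```python
# Illustrative only; entity names are made up.
cache = StreamChangeCache("ExampleCache", current_stream_pos=0)
cache.entity_has_changed("@alice:example.org", 5)

result = cache.get_all_entities_changed(3)
if result.hit:
    # Entities are returned in the order they changed.
    for entity in result.entities:
        print(entity)
else:
    # Cache miss: the position predates the cache, so every entity
    # must be treated as potentially changed.
    pass
```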
diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py
index 63628aa6b0..f7c309cad0 100644
--- a/tests/crypto/test_keyring.py
+++ b/tests/crypto/test_keyring.py
@@ -433,7 +433,7 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
 
         async def get_json(destination, path, **kwargs):
             self.assertEqual(destination, SERVER_NAME)
-            self.assertEqual(path, "/_matrix/key/v2/server/key1")
+            self.assertEqual(path, "/_matrix/key/v2/server")
             return response
 
         self.http_client.get_json.side_effect = get_json
@@ -469,18 +469,6 @@ class ServerKeyFetcherTestCase(unittest.HomeserverTestCase):
         keys = self.get_success(fetcher.get_keys(SERVER_NAME, ["key1"], 0))
         self.assertEqual(keys, {})
 
-    def test_keyid_containing_forward_slash(self) -> None:
-        """We should url-encode any url unsafe chars in key ids.
-
-        Detects https://github.com/matrix-org/synapse/issues/14488.
-        """
-        fetcher = ServerKeyFetcher(self.hs)
-        self.get_success(fetcher.get_keys("example.com", ["key/potato"], 0))
-
-        self.http_client.get_json.assert_called_once()
-        args, kwargs = self.http_client.get_json.call_args
-        self.assertEqual(kwargs["path"], "/_matrix/key/v2/server/key%2Fpotato")
-
 
 class PerspectivesKeyFetcherTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
index 685a9a6d52..b703e4472e 100644
--- a/tests/events/test_presence_router.py
+++ b/tests/events/test_presence_router.py
@@ -126,6 +126,13 @@ class PresenceRouterTestModule:
 
 
 class PresenceRouterTestCase(FederatingHomeserverTestCase):
+    """
+    Test cases using a custom PresenceRouter.
+
+    By default in test cases, federation sending is disabled. This class re-enables it
+    for the main process by setting `federation_sender_instances` to None.
+    """
+
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -150,6 +157,11 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
         self.sync_handler = self.hs.get_sync_handler()
         self.module_api = homeserver.get_module_api()
 
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     @override_config(
         {
             "presence": {
@@ -162,7 +174,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 }
             },
-            "send_federation": True,
         }
     )
     def test_receiving_all_presence_legacy(self):
@@ -180,7 +191,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 },
             ],
-            "send_federation": True,
         }
     )
     def test_receiving_all_presence(self):
@@ -290,7 +300,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 }
             },
-            "send_federation": True,
         }
     )
     def test_send_local_online_presence_to_with_module_legacy(self):
@@ -310,7 +319,6 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
                     },
                 },
             ],
-            "send_federation": True,
         }
     )
     def test_send_local_online_presence_to_with_module(self):
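The remaining test diffs below repeat this same migration away from `send_federation: True`. For reference, the two equivalent patterns this commit uses to keep federation sending on the main process in tests (`ExampleTestCase` is hypothetical):

```python
from tests.unittest import HomeserverTestCase, override_config

class ExampleTestCase(HomeserverTestCase):
    # Option 1: enable for every test in the class.
    def default_config(self) -> dict:
        config = super().default_config()
        # None (rather than a list of worker names) keeps federation
        # sending on the main process.
        config["federation_sender_instances"] = None
        return config

    # Option 2: enable for a single test.
    @override_config({"federation_sender_instances": None})
    def test_something_federates(self) -> None:
        ...
```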
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 2873b4d430..b8fee72898 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -7,13 +7,21 @@ from synapse.federation.sender import PerDestinationQueue, TransactionManager
 from synapse.federation.units import Edu
 from synapse.rest import admin
 from synapse.rest.client import login, room
+from synapse.types import JsonDict
 from synapse.util.retryutils import NotRetryingDestination
 
 from tests.test_utils import event_injection, make_awaitable
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import FederatingHomeserverTestCase
 
 
 class FederationCatchUpTestCases(FederatingHomeserverTestCase):
+    """
+    Test cases for catching up over federation.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     servlets = [
         admin.register_servlets,
         room.register_servlets,
@@ -42,6 +50,11 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             self.record_transaction
         )
 
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     async def record_transaction(self, txn, json_cb):
         if self.is_online:
             data = json_cb()
@@ -79,7 +92,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         )[0]
         return {"event_id": event_id, "stream_ordering": stream_ordering}
 
-    @override_config({"send_federation": True})
     def test_catch_up_destination_rooms_tracking(self):
         """
         Tests that we populate the `destination_rooms` table as needed.
@@ -105,7 +117,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         self.assertEqual(row_2["event_id"], event_id_2)
         self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
 
-    @override_config({"send_federation": True})
     def test_catch_up_last_successful_stream_ordering_tracking(self):
         """
         Tests that we populate the `destination_rooms` table as needed.
@@ -163,7 +174,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             "Send succeeded but not marked as last_successful_stream_ordering",
         )
 
-    @override_config({"send_federation": True})  # critical to federate
     def test_catch_up_from_blank_state(self):
         """
         Runs an overall test of federation catch-up from scratch.
@@ -260,7 +270,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
 
         return per_dest_queue, results_list
 
-    @override_config({"send_federation": True})
     def test_catch_up_loop(self):
         """
         Tests the behaviour of _catch_up_transmission_loop.
@@ -325,7 +334,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             event_5.internal_metadata.stream_ordering,
         )
 
-    @override_config({"send_federation": True})
     def test_catch_up_on_synapse_startup(self):
         """
         Tests the behaviour of get_catch_up_outstanding_destinations and
@@ -424,7 +432,6 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
         # - all destinations are woken exactly once; they appear once in woken.
         self.assertCountEqual(woken, server_names[:-1])
 
-    @override_config({"send_federation": True})
     def test_not_latest_event(self):
         """Test that we send the latest event in the room even if its not ours."""
 
diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py
index 01f147418b..8692d8190f 100644
--- a/tests/federation/test_federation_sender.py
+++ b/tests/federation/test_federation_sender.py
@@ -25,10 +25,17 @@ from synapse.rest.client import login
 from synapse.types import JsonDict, ReadReceipt
 
 from tests.test_utils import make_awaitable
-from tests.unittest import HomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase
 
 
 class FederationSenderReceiptsTestCases(HomeserverTestCase):
+    """
+    Test federation sending to update receipts.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     def make_homeserver(self, reactor, clock):
         hs = self.setup_test_homeserver(
             federation_transport_client=Mock(spec=["send_transaction"]),
@@ -38,9 +45,17 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             return_value=make_awaitable({"test", "host2"})
         )
 
+        hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (
+            hs.get_storage_controllers().state.get_current_hosts_in_room
+        )
+
         return hs
 
-    @override_config({"send_federation": True})
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        config["federation_sender_instances"] = None
+        return config
+
     def test_send_receipts(self):
         mock_send_transaction = (
             self.hs.get_federation_transport_client().send_transaction
@@ -83,7 +98,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
     def test_send_receipts_thread(self):
         mock_send_transaction = (
             self.hs.get_federation_transport_client().send_transaction
@@ -160,7 +174,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
     def test_send_receipts_with_backoff(self):
         """Send two receipts in quick succession; the second should be flushed, but
         only after 20ms"""
@@ -247,6 +260,13 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
 
 
 class FederationSenderDevicesTestCases(HomeserverTestCase):
+    """
+    Test federation sending to update devices.
+
+    By default, federation sending is disabled in test cases. This test class
+    re-enables it for the main process.
+    """
+
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -261,7 +281,8 @@ class FederationSenderDevicesTestCases(HomeserverTestCase):
 
     def default_config(self):
         c = super().default_config()
-        c["send_federation"] = True
+        # Enable federation sending on the main process.
+        c["federation_sender_instances"] = None
         return c
 
     def prepare(self, reactor, clock, hs):
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 9ed26d87a7..57bfbd7734 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -765,7 +765,12 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
         fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
         messages = {
             self.exclusive_as_user: {
-                device_id: to_device_message_content for device_id in fake_device_ids
+                device_id: {
+                    "type": "test_to_device_message",
+                    "sender": "@some:sender",
+                    "content": to_device_message_content,
+                }
+                for device_id in fake_device_ids
             }
         }
 
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index c5981ff965..584e7b8971 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -992,7 +992,8 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
 
     def default_config(self):
         config = super().default_config()
-        config["send_federation"] = True
+        # Enable federation sending on the main process.
+        config["federation_sender_instances"] = None
         return config
 
     def prepare(self, reactor, clock, hs):
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 9c821b3042..efbb5a8dbb 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -200,7 +200,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
             ],
         )
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_started_typing_remote_send(self) -> None:
         self.room_members = [U_APPLE, U_ONION]
 
@@ -305,7 +306,8 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
         self.assertEqual(events[0], [])
         self.assertEqual(events[1], 0)
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_stopped_typing(self) -> None:
         self.room_members = [U_APPLE, U_BANANA, U_ONION]
 
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 9e39cd97e5..75fc5a17a4 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -56,7 +56,8 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         config = self.default_config()
-        config["update_user_directory"] = True
+        # Re-enable updating the user directory, as that functionality is needed below.
+        config["update_user_directory_from_worker"] = None
 
         self.appservice = ApplicationService(
             token="i_am_an_app_service",
@@ -1045,7 +1046,9 @@ class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
 
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         config = self.default_config()
-        config["update_user_directory"] = True
+        # Re-enable updating the user directory, as that functionality is needed
+        # below. It will be force-disabled later.
+        config["update_user_directory_from_worker"] = None
         hs = self.setup_test_homeserver(config=config)
 
         self.config = hs.config
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 058ca57e55..b0f3f4374d 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -336,7 +336,8 @@ class ModuleApiTestCase(HomeserverTestCase):
         # Test sending local online presence to users from the main process
         _test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
 
-    @override_config({"send_federation": True})
+    # Enable federation sending on the main process.
+    @override_config({"federation_sender_instances": None})
     def test_send_local_online_presence_to_federation(self):
         """Tests that send_local_presence_to_users sends local online presence to remote users."""
         # Create a user who will send presence updates
diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py
index 594e7937a8..1cd453248e 100644
--- a/tests/push/test_bulk_push_rule_evaluator.py
+++ b/tests/push/test_bulk_push_rule_evaluator.py
@@ -6,10 +6,11 @@ from synapse.rest import admin
 from synapse.rest.client import login, register, room
 from synapse.types import create_requester
 
-from tests import unittest
+from tests.test_utils import simple_async_mock
+from tests.unittest import HomeserverTestCase, override_config
 
 
-class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
+class TestBulkPushRuleEvaluator(HomeserverTestCase):
 
     servlets = [
         admin.register_servlets_for_client_rest_resource,
@@ -72,3 +73,43 @@ class TestBulkPushRuleEvaluator(unittest.HomeserverTestCase):
         bulk_evaluator = BulkPushRuleEvaluator(self.hs)
         # should not raise
         self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)]))
+
+    @override_config({"push": {"enabled": False}})
+    def test_action_for_event_by_user_disabled_by_config(self) -> None:
+        """Ensure that push rules are not calculated when disabled in the config"""
+        # Create a new user and room.
+        alice = self.register_user("alice", "pass")
+        token = self.login(alice, "pass")
+
+        room_id = self.helper.create_room_as(
+            alice, room_version=RoomVersions.V9.identifier, tok=token
+        )
+
+        event_creation_handler = self.hs.get_event_creation_handler()
+        requester = create_requester(alice)
+
+        # Create a new message event, which push rules would normally be
+        # evaluated against.
+        event, context = self.get_success(
+            event_creation_handler.create_event(
+                requester,
+                {
+                    "type": "m.room.message",
+                    "room_id": room_id,
+                    "content": {
+                        "msgtype": "m.text",
+                        "body": "helo",
+                    },
+                    "sender": alice,
+                },
+            )
+        )
+
+        bulk_evaluator = BulkPushRuleEvaluator(self.hs)
+        bulk_evaluator._action_for_event_by_user = simple_async_mock()  # type: ignore[assignment]
+        # should not raise
+        self.get_success(bulk_evaluator.action_for_events_by_user([(event, context)]))
+        bulk_evaluator._action_for_event_by_user.assert_not_called()
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index fd14568f55..57b2f0536e 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -66,7 +66,6 @@ class EmailPusherTests(HomeserverTestCase):
             "riot_base_url": None,
         }
         config["public_baseurl"] = "http://aaa"
-        config["start_pushers"] = True
 
         hs = self.setup_test_homeserver(config=config)
 
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index b383b8401f..afaafe79aa 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Any, Dict, List, Optional, Tuple
+from typing import List, Optional, Tuple
 from unittest.mock import Mock
 
 from twisted.internet.defer import Deferred
@@ -41,11 +41,6 @@ class HTTPPusherTests(HomeserverTestCase):
     user_id = True
     hijack_auth = False
 
-    def default_config(self) -> Dict[str, Any]:
-        config = super().default_config()
-        config["start_pushers"] = True
-        return config
-
     def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
         self.push_attempts: List[Tuple[Deferred, str, dict]] = []
 
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 3029a16dda..6a7174b333 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -307,7 +307,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         stream to the master HS.
 
         Args:
-            worker_app: Type of worker, e.g. `synapse.app.federation_sender`.
+            worker_app: Type of worker, e.g. `synapse.app.generic_worker`.
             extra_config: Any extra config to use for this instances.
             **kwargs: Options that get passed to `self.setup_test_homeserver`,
                 useful to e.g. pass some mocks for things like `federation_http_client`
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index ffec06a0d6..bcb82c9c80 100644
--- a/tests/replication/tcp/streams/test_federation.py
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -22,9 +22,8 @@ class FederationStreamTestCase(BaseStreamTestCase):
     def _get_worker_hs_config(self) -> dict:
         # enable federation sending on the worker
         config = super()._get_worker_hs_config()
-        # TODO: make it so we don't need both of these
-        config["send_federation"] = False
-        config["worker_app"] = "synapse.app.federation_sender"
+        config["worker_name"] = "federation_sender1"
+        config["federation_sender_instances"] = ["federation_sender1"]
         return config
 
     def test_catchup(self):
diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py
index 43a16bb141..5d7a89e0c7 100644
--- a/tests/replication/test_auth.py
+++ b/tests/replication/test_auth.py
@@ -38,7 +38,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase):
 
     def _get_worker_hs_config(self) -> dict:
         config = self.default_config()
-        config["worker_app"] = "synapse.app.client_reader"
+        config["worker_app"] = "synapse.app.generic_worker"
         config["worker_replication_host"] = "testserv"
         config["worker_replication_http_port"] = "8765"
 
@@ -53,7 +53,7 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase):
         4. Return the final request.
 
         """
-        worker_hs = self.make_worker_hs("synapse.app.client_reader")
+        worker_hs = self.make_worker_hs("synapse.app.generic_worker")
         site = self._hs_to_site[worker_hs]
 
         channel_1 = make_request(
diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py
index 995097d72c..eb5b376534 100644
--- a/tests/replication/test_client_reader_shard.py
+++ b/tests/replication/test_client_reader_shard.py
@@ -22,20 +22,20 @@ logger = logging.getLogger(__name__)
 
 
 class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
-    """Test using one or more client readers for registration."""
+    """Test using one or more generic workers for registration."""
 
     servlets = [register.register_servlets]
 
     def _get_worker_hs_config(self) -> dict:
         config = self.default_config()
-        config["worker_app"] = "synapse.app.client_reader"
+        config["worker_app"] = "synapse.app.generic_worker"
         config["worker_replication_host"] = "testserv"
         config["worker_replication_http_port"] = "8765"
         return config
 
     def test_register_single_worker(self):
-        """Test that registration works when using a single client reader worker."""
-        worker_hs = self.make_worker_hs("synapse.app.client_reader")
+        """Test that registration works when using a single generic worker."""
+        worker_hs = self.make_worker_hs("synapse.app.generic_worker")
         site = self._hs_to_site[worker_hs]
 
         channel_1 = make_request(
@@ -64,9 +64,9 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
         self.assertEqual(channel_2.json_body["user_id"], "@user:test")
 
     def test_register_multi_worker(self):
-        """Test that registration works when using multiple client reader workers."""
-        worker_hs_1 = self.make_worker_hs("synapse.app.client_reader")
-        worker_hs_2 = self.make_worker_hs("synapse.app.client_reader")
+        """Test that registration works when using multiple generic workers."""
+        worker_hs_1 = self.make_worker_hs("synapse.app.generic_worker")
+        worker_hs_2 = self.make_worker_hs("synapse.app.generic_worker")
 
         site_1 = self._hs_to_site[worker_hs_1]
         channel_1 = make_request(
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 26b8bd512a..63b1dd40b5 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -25,8 +25,9 @@ from tests.unittest import HomeserverTestCase
 class FederationAckTestCase(HomeserverTestCase):
     def default_config(self) -> dict:
         config = super().default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = False
+        config["worker_app"] = "synapse.app.generic_worker"
+        config["worker_name"] = "federation_sender1"
+        config["federation_sender_instances"] = ["federation_sender1"]
         return config
 
     def make_homeserver(self, reactor, clock):
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 6104a55aa1..c28073b8f7 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -27,17 +27,19 @@ logger = logging.getLogger(__name__)
 
 
 class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
+    """
+    Various tests for federation sending on workers.
+
+    Federation sending is disabled by default; each test enables it by
+    updating 'federation_sender_instances'.
+    """
+
     servlets = [
         login.register_servlets,
         register_servlets_for_client_rest_resource,
         room.register_servlets,
     ]
 
-    def default_config(self):
-        conf = super().default_config()
-        conf["send_federation"] = False
-        return conf
-
     def test_send_event_single_sender(self):
         """Test that using a single federation sender worker correctly sends a
         new event.
@@ -46,8 +48,11 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client.put_json.return_value = make_awaitable({})
 
         self.make_worker_hs(
-            "synapse.app.federation_sender",
-            {"send_federation": False},
+            "synapse.app.generic_worker",
+            {
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": ["federation_sender1"],
+            },
             federation_http_client=mock_client,
         )
 
@@ -73,11 +78,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender1",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client1,
         )
@@ -85,11 +92,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender2",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender2",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client2,
         )
@@ -136,11 +145,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client1 = Mock(spec=["put_json"])
         mock_client1.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender1",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender1",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client1,
         )
@@ -148,11 +159,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         mock_client2 = Mock(spec=["put_json"])
         mock_client2.put_json.return_value = make_awaitable({})
         self.make_worker_hs(
-            "synapse.app.federation_sender",
+            "synapse.app.generic_worker",
             {
-                "send_federation": True,
-                "worker_name": "sender2",
-                "federation_sender_instances": ["sender1", "sender2"],
+                "worker_name": "federation_sender2",
+                "federation_sender_instances": [
+                    "federation_sender1",
+                    "federation_sender2",
+                ],
             },
             federation_http_client=mock_client2,
         )
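
With two senders named in federation_sender_instances, outbound traffic is sharded: each destination is owned by exactly one instance, chosen by a stable hash of the destination. Roughly the idea (pick_instance is a sketch; Synapse's real logic lives in its sharded-worker config handling):

    from hashlib import sha256
    from typing import List

    def pick_instance(instances: List[str], destination: str) -> str:
        # A stable hash of the destination picks the owning sender, so the
        # two workers above split remote servers deterministically.
        digest = sha256(destination.encode("utf8")).digest()
        return instances[int.from_bytes(digest, "little") % len(instances)]

    owner = pick_instance(
        ["federation_sender1", "federation_sender2"], "remote.example.com"
    )
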
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index 59fea93e49..ca18ad6553 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -38,11 +38,6 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         self.other_user_id = self.register_user("otheruser", "pass")
         self.other_access_token = self.login("otheruser", "pass")
 
-    def default_config(self):
-        conf = super().default_config()
-        conf["start_pushers"] = False
-        return conf
-
     def _create_pusher_and_send_msg(self, localpart):
         # Create a user that will get push notifications
         user_id = self.register_user(localpart, "pass")
@@ -92,8 +87,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
-            {"start_pushers": False},
+            "synapse.app.generic_worker",
+            {"worker_name": "pusher1", "pusher_instances": ["pusher1"]},
             proxied_blacklisted_http_client=http_client_mock,
         )
 
@@ -122,9 +117,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
+            "synapse.app.generic_worker",
             {
-                "start_pushers": True,
                 "worker_name": "pusher1",
                 "pusher_instances": ["pusher1", "pusher2"],
             },
@@ -137,9 +131,8 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
         )
 
         self.make_worker_hs(
-            "synapse.app.pusher",
+            "synapse.app.generic_worker",
             {
-                "start_pushers": True,
                 "worker_name": "pusher2",
                 "pusher_instances": ["pusher1", "pusher2"],
             },
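
The pusher tests follow the same pattern: start_pushers is dropped in favour of naming workers in pusher_instances, with pushers sharded across the named instances. The resulting worker config, in outline (values taken from the tests above):

    pusher_worker_config = {
        "worker_app": "synapse.app.generic_worker",
        "worker_name": "pusher1",
        # Pushers are split between the instances named here.
        "pusher_instances": ["pusher1", "pusher2"],
    }
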
diff --git a/tests/rest/key/v2/test_remote_key_resource.py b/tests/rest/key/v2/test_remote_key_resource.py
index 7f1fba1086..2bb6e27d94 100644
--- a/tests/rest/key/v2/test_remote_key_resource.py
+++ b/tests/rest/key/v2/test_remote_key_resource.py
@@ -11,7 +11,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import urllib.parse
 from io import BytesIO, StringIO
 from typing import Any, Dict, Optional, Union
 from unittest.mock import Mock
@@ -65,9 +64,7 @@ class BaseRemoteKeyResourceTestCase(unittest.HomeserverTestCase):
             self.assertTrue(ignore_backoff)
             self.assertEqual(destination, server_name)
             key_id = "%s:%s" % (signing_key.alg, signing_key.version)
-            self.assertEqual(
-                path, "/_matrix/key/v2/server/%s" % (urllib.parse.quote(key_id),)
-            )
+            self.assertEqual(path, "/_matrix/key/v2/server")
 
             response = {
                 "server_name": server_name,
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index ee48920f84..5fa8bd2d98 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -156,7 +156,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
 
         last_event_id: str
 
-        def _assert_counts(noitf_count: int, highlight_count: int) -> None:
+        def _assert_counts(notif_count: int, highlight_count: int) -> None:
             counts = self.get_success(
                 self.store.db_pool.runInteraction(
                     "get-unread-counts",
@@ -168,13 +168,22 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
             )
             self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(aggregate_counts[room_id], notif_count)
+
         def _create_event(highlight: bool = False) -> str:
             result = self.helper.send_event(
                 room_id,
@@ -283,7 +292,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         last_event_id: str
 
         def _assert_counts(
-            noitf_count: int,
+            notif_count: int,
             highlight_count: int,
             thread_notif_count: int,
             thread_highlight_count: int,
@@ -299,7 +308,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
@@ -318,6 +327,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             else:
                 self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                aggregate_counts[room_id], notif_count + thread_notif_count
+            )
+
         def _create_event(
             highlight: bool = False, thread_id: Optional[str] = None
         ) -> str:
@@ -454,7 +474,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         last_event_id: str
 
         def _assert_counts(
-            noitf_count: int,
+            notif_count: int,
             highlight_count: int,
             thread_notif_count: int,
             thread_highlight_count: int,
@@ -470,7 +490,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count,
+                    notify_count=notif_count,
                     unread_count=0,
                     highlight_count=highlight_count,
                 ),
@@ -489,6 +509,17 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             else:
                 self.assertEqual(counts.threads, {})
 
+            aggregate_counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-aggregate-unread-counts",
+                    self.store._get_unread_counts_by_room_for_user_txn,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                aggregate_counts[room_id], notif_count + thread_notif_count
+            )
+
         def _create_event(
             highlight: bool = False, thread_id: Optional[str] = None
         ) -> str:
@@ -646,7 +677,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             )
             return result["event_id"]
 
-        def _assert_counts(noitf_count: int, thread_notif_count: int) -> None:
+        def _assert_counts(notif_count: int, thread_notif_count: int) -> None:
             counts = self.get_success(
                 self.store.db_pool.runInteraction(
                     "get-unread-counts",
@@ -658,7 +689,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
             self.assertEqual(
                 counts.main_timeline,
                 NotifCounts(
-                    notify_count=noitf_count, unread_count=0, highlight_count=0
+                    notify_count=notif_count, unread_count=0, highlight_count=0
                 ),
             )
             if thread_notif_count:
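
Alongside the noitf_count typo fix, each _assert_counts helper gains a check of _get_unread_counts_by_room_for_user_txn, which returns per-room aggregate counts. The invariant the new assertions encode, as a sketch (check_aggregate is a hypothetical helper, for illustration only):

    from typing import Dict

    def check_aggregate(
        aggregate_counts: Dict[str, int],
        room_id: str,
        notif_count: int,
        thread_notif_count: int = 0,
    ) -> None:
        # The aggregate for a room should equal its main-timeline
        # notifications plus notifications across all of its threads.
        assert aggregate_counts[room_id] == notif_count + thread_notif_count
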
diff --git a/tests/util/test_async_helpers.py b/tests/util/test_async_helpers.py
index 9d5010bf92..91cac9822a 100644
--- a/tests/util/test_async_helpers.py
+++ b/tests/util/test_async_helpers.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import traceback
+from typing import Generator, List, NoReturn, Optional
 
 from parameterized import parameterized_class
 
@@ -41,8 +42,8 @@ from tests.unittest import TestCase
 
 
 class ObservableDeferredTest(TestCase):
-    def test_succeed(self):
-        origin_d = Deferred()
+    def test_succeed(self) -> None:
+        origin_d: "Deferred[int]" = Deferred()
         observable = ObservableDeferred(origin_d)
 
         observer1 = observable.observe()
@@ -52,16 +53,18 @@ class ObservableDeferredTest(TestCase):
         self.assertFalse(observer2.called)
 
         # check the first observer is called first
-        def check_called_first(res):
+        def check_called_first(res: int) -> int:
             self.assertFalse(observer2.called)
             return res
 
         observer1.addBoth(check_called_first)
 
         # store the results
-        results = [None, None]
+        results: List[Optional[ObservableDeferred[int]]] = [None, None]
 
-        def check_val(res, idx):
+        def check_val(
+            res: ObservableDeferred[int], idx: int
+        ) -> ObservableDeferred[int]:
             results[idx] = res
             return res
 
@@ -72,8 +75,8 @@ class ObservableDeferredTest(TestCase):
         self.assertEqual(results[0], 123, "observer 1 callback result")
         self.assertEqual(results[1], 123, "observer 2 callback result")
 
-    def test_failure(self):
-        origin_d = Deferred()
+    def test_failure(self) -> None:
+        origin_d: Deferred = Deferred()
         observable = ObservableDeferred(origin_d, consumeErrors=True)
 
         observer1 = observable.observe()
@@ -83,16 +86,16 @@ class ObservableDeferredTest(TestCase):
         self.assertFalse(observer2.called)
 
         # check the first observer is called first
-        def check_called_first(res):
+        def check_called_first(res: int) -> int:
             self.assertFalse(observer2.called)
             return res
 
         observer1.addBoth(check_called_first)
 
         # store the results
-        results = [None, None]
+        results: List[Optional[ObservableDeferred[str]]] = [None, None]
 
-        def check_val(res, idx):
+        def check_val(res: ObservableDeferred[str], idx: int) -> None:
             results[idx] = res
             return None
 
@@ -103,10 +106,12 @@ class ObservableDeferredTest(TestCase):
             raise Exception("gah!")
         except Exception as e:
             origin_d.errback(e)
+        assert results[0] is not None
         self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result")
+        assert results[1] is not None
         self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result")
 
-    def test_cancellation(self):
+    def test_cancellation(self) -> None:
         """Test that cancelling an observer does not affect other observers."""
         origin_d: "Deferred[int]" = Deferred()
         observable = ObservableDeferred(origin_d, consumeErrors=True)
@@ -136,37 +141,38 @@ class ObservableDeferredTest(TestCase):
 
 
 class TimeoutDeferredTest(TestCase):
-    def setUp(self):
+    def setUp(self) -> None:
         self.clock = Clock()
 
-    def test_times_out(self):
+    def test_times_out(self) -> None:
         """Basic test case that checks that the original deferred is cancelled and that
         the timing-out deferred is errbacked
         """
-        cancelled = [False]
+        cancelled = False
 
-        def canceller(_d):
-            cancelled[0] = True
+        def canceller(_d: Deferred) -> None:
+            nonlocal cancelled
+            cancelled = True
 
-        non_completing_d = Deferred(canceller)
+        non_completing_d: Deferred = Deferred(canceller)
         timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
 
         self.assertNoResult(timing_out_d)
-        self.assertFalse(cancelled[0], "deferred was cancelled prematurely")
+        self.assertFalse(cancelled, "deferred was cancelled prematurely")
 
         self.clock.pump((1.0,))
 
-        self.assertTrue(cancelled[0], "deferred was not cancelled by timeout")
+        self.assertTrue(cancelled, "deferred was not cancelled by timeout")
         self.failureResultOf(timing_out_d, defer.TimeoutError)
 
-    def test_times_out_when_canceller_throws(self):
+    def test_times_out_when_canceller_throws(self) -> None:
         """Test that we have successfully worked around
         https://twistedmatrix.com/trac/ticket/9534"""
 
-        def canceller(_d):
+        def canceller(_d: Deferred) -> None:
             raise Exception("can't cancel this deferred")
 
-        non_completing_d = Deferred(canceller)
+        non_completing_d: Deferred = Deferred(canceller)
         timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
 
         self.assertNoResult(timing_out_d)
@@ -175,22 +181,24 @@ class TimeoutDeferredTest(TestCase):
 
         self.failureResultOf(timing_out_d, defer.TimeoutError)
 
-    def test_logcontext_is_preserved_on_cancellation(self):
-        blocking_was_cancelled = [False]
+    def test_logcontext_is_preserved_on_cancellation(self) -> None:
+        blocking_was_cancelled = False
 
         @defer.inlineCallbacks
-        def blocking():
-            non_completing_d = Deferred()
+        def blocking() -> Generator["Deferred[object]", object, None]:
+            nonlocal blocking_was_cancelled
+
+            non_completing_d: Deferred = Deferred()
             with PreserveLoggingContext():
                 try:
                     yield non_completing_d
                 except CancelledError:
-                    blocking_was_cancelled[0] = True
+                    blocking_was_cancelled = True
                     raise
 
         with LoggingContext("one") as context_one:
             # the errbacks should be run in the test logcontext
-            def errback(res, deferred_name):
+            def errback(res: Failure, deferred_name: str) -> Failure:
                 self.assertIs(
                     current_context(),
                     context_one,
@@ -209,7 +217,7 @@ class TimeoutDeferredTest(TestCase):
             self.clock.pump((1.0,))
 
             self.assertTrue(
-                blocking_was_cancelled[0], "non-completing deferred was not cancelled"
+                blocking_was_cancelled, "non-completing deferred was not cancelled"
             )
             self.failureResultOf(timing_out_d, defer.TimeoutError)
             self.assertIs(current_context(), context_one)
@@ -220,13 +228,13 @@ class _TestException(Exception):
 
 
 class ConcurrentlyExecuteTest(TestCase):
-    def test_limits_runners(self):
+    def test_limits_runners(self) -> None:
         """If we have more tasks than runners, we should get the limit of runners"""
         started = 0
         waiters = []
         processed = []
 
-        async def callback(v):
+        async def callback(v: int) -> None:
             # when we first enter, bump the start count
             nonlocal started
             started += 1
@@ -235,7 +243,7 @@ class ConcurrentlyExecuteTest(TestCase):
             processed.append(v)
 
             # wait for the goahead before returning
-            d2 = Deferred()
+            d2: "Deferred[int]" = Deferred()
             waiters.append(d2)
             await d2
 
@@ -265,16 +273,16 @@ class ConcurrentlyExecuteTest(TestCase):
         self.assertCountEqual(processed, [1, 2, 3, 4, 5])
         self.successResultOf(d2)
 
-    def test_preserves_stacktraces(self):
+    def test_preserves_stacktraces(self) -> None:
         """Test that the stacktrace from an exception thrown in the callback is preserved"""
-        d1 = Deferred()
+        d1: "Deferred[int]" = Deferred()
 
-        async def callback(v):
+        async def callback(v: int) -> None:
             # alas, this doesn't work at all without an await here
             await d1
             raise _TestException("bah")
 
-        async def caller():
+        async def caller() -> None:
             try:
                 await concurrently_execute(callback, [1], 2)
             except _TestException as e:
@@ -290,17 +298,17 @@ class ConcurrentlyExecuteTest(TestCase):
         d1.callback(0)
         self.successResultOf(d2)
 
-    def test_preserves_stacktraces_on_preformed_failure(self):
+    def test_preserves_stacktraces_on_preformed_failure(self) -> None:
         """Test that the stacktrace on a Failure returned by the callback is preserved"""
-        d1 = Deferred()
+        d1: "Deferred[int]" = Deferred()
         f = Failure(_TestException("bah"))
 
-        async def callback(v):
+        async def callback(v: int) -> None:
             # alas, this doesn't work at all without an await here
             await d1
             await defer.fail(f)
 
-        async def caller():
+        async def caller() -> None:
             try:
                 await concurrently_execute(callback, [1], 2)
             except _TestException as e:
@@ -336,7 +344,7 @@ class CancellationWrapperTests(TestCase):
         else:
             raise ValueError(f"Unsupported wrapper type: {self.wrapper}")
 
-    def test_succeed(self):
+    def test_succeed(self) -> None:
         """Test that the new `Deferred` receives the result."""
         deferred: "Deferred[str]" = Deferred()
         wrapper_deferred = self.wrap_deferred(deferred)
@@ -346,7 +354,7 @@ class CancellationWrapperTests(TestCase):
         self.assertTrue(wrapper_deferred.called)
         self.assertEqual("success", self.successResultOf(wrapper_deferred))
 
-    def test_failure(self):
+    def test_failure(self) -> None:
         """Test that the new `Deferred` receives the `Failure`."""
         deferred: "Deferred[str]" = Deferred()
         wrapper_deferred = self.wrap_deferred(deferred)
@@ -361,7 +369,7 @@ class CancellationWrapperTests(TestCase):
 class StopCancellationTests(TestCase):
     """Tests for the `stop_cancellation` function."""
 
-    def test_cancellation(self):
+    def test_cancellation(self) -> None:
         """Test that cancellation of the new `Deferred` leaves the original running."""
         deferred: "Deferred[str]" = Deferred()
         wrapper_deferred = stop_cancellation(deferred)
@@ -384,7 +392,7 @@ class StopCancellationTests(TestCase):
 class DelayCancellationTests(TestCase):
     """Tests for the `delay_cancellation` function."""
 
-    def test_deferred_cancellation(self):
+    def test_deferred_cancellation(self) -> None:
         """Test that cancellation of the new `Deferred` waits for the original."""
         deferred: "Deferred[str]" = Deferred()
         wrapper_deferred = delay_cancellation(deferred)
@@ -405,12 +413,12 @@ class DelayCancellationTests(TestCase):
         # Now that the original `Deferred` has failed, we should get a `CancelledError`.
         self.failureResultOf(wrapper_deferred, CancelledError)
 
-    def test_coroutine_cancellation(self):
+    def test_coroutine_cancellation(self) -> None:
         """Test that cancellation of the new `Deferred` waits for the original."""
         blocking_deferred: "Deferred[None]" = Deferred()
         completion_deferred: "Deferred[None]" = Deferred()
 
-        async def task():
+        async def task() -> NoReturn:
             await blocking_deferred
             completion_deferred.callback(None)
             # Raise an exception. Twisted should consume it, otherwise unwanted
@@ -434,7 +442,7 @@ class DelayCancellationTests(TestCase):
         # Now that the original coroutine has failed, we should get a `CancelledError`.
         self.failureResultOf(wrapper_deferred, CancelledError)
 
-    def test_suppresses_second_cancellation(self):
+    def test_suppresses_second_cancellation(self) -> None:
         """Test that a second cancellation is suppressed.
 
         Identical to `test_cancellation` except the new `Deferred` is cancelled twice.
@@ -459,7 +467,7 @@ class DelayCancellationTests(TestCase):
         # Now that the original `Deferred` has failed, we should get a `CancelledError`.
         self.failureResultOf(wrapper_deferred, CancelledError)
 
-    def test_propagates_cancelled_error(self):
+    def test_propagates_cancelled_error(self) -> None:
         """Test that a `CancelledError` from the original `Deferred` gets propagated."""
         deferred: "Deferred[str]" = Deferred()
         wrapper_deferred = delay_cancellation(deferred)
@@ -472,14 +480,14 @@ class DelayCancellationTests(TestCase):
         self.assertTrue(wrapper_deferred.called)
         self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value)
 
-    def test_preserves_logcontext(self):
+    def test_preserves_logcontext(self) -> None:
         """Test that logging contexts are preserved."""
         blocking_d: "Deferred[None]" = Deferred()
 
-        async def inner():
+        async def inner() -> None:
             await make_deferred_yieldable(blocking_d)
 
-        async def outer():
+        async def outer() -> None:
             with LoggingContext("c") as c:
                 try:
                     await delay_cancellation(inner())
@@ -503,7 +511,7 @@ class DelayCancellationTests(TestCase):
 class AwakenableSleeperTests(TestCase):
     "Tests AwakenableSleeper"
 
-    def test_sleep(self):
+    def test_sleep(self) -> None:
         reactor, _ = get_clock()
         sleeper = AwakenableSleeper(reactor)
 
@@ -518,7 +526,7 @@ class AwakenableSleeperTests(TestCase):
         reactor.advance(0.6)
         self.assertTrue(d.called)
 
-    def test_explicit_wake(self):
+    def test_explicit_wake(self) -> None:
         reactor, _ = get_clock()
         sleeper = AwakenableSleeper(reactor)
 
@@ -535,7 +543,7 @@ class AwakenableSleeperTests(TestCase):
 
         reactor.advance(0.6)
 
-    def test_multiple_sleepers_timeout(self):
+    def test_multiple_sleepers_timeout(self) -> None:
         reactor, _ = get_clock()
         sleeper = AwakenableSleeper(reactor)
 
@@ -555,7 +563,7 @@ class AwakenableSleeperTests(TestCase):
         reactor.advance(0.6)
         self.assertTrue(d2.called)
 
-    def test_multiple_sleepers_wake(self):
+    def test_multiple_sleepers_wake(self) -> None:
         reactor, _ = get_clock()
         sleeper = AwakenableSleeper(reactor)
 
diff --git a/tests/util/test_batching_queue.py b/tests/util/test_batching_queue.py
index 07be57d72c..94ef91f645 100644
--- a/tests/util/test_batching_queue.py
+++ b/tests/util/test_batching_queue.py
@@ -11,6 +11,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import List, Tuple
+
+from prometheus_client import Gauge
+
 from twisted.internet import defer
 
 from synapse.logging.context import make_deferred_yieldable
@@ -26,7 +30,7 @@ from tests.unittest import TestCase
 
 
 class BatchingQueueTestCase(TestCase):
-    def setUp(self):
+    def setUp(self) -> None:
         self.clock, hs_clock = get_clock()
 
         # We ensure that we remove any existing metrics for "test_queue".
@@ -37,25 +41,27 @@ class BatchingQueueTestCase(TestCase):
         except KeyError:
             pass
 
-        self._pending_calls = []
-        self.queue = BatchingQueue("test_queue", hs_clock, self._process_queue)
+        self._pending_calls: List[Tuple[List[str], defer.Deferred]] = []
+        self.queue: BatchingQueue[str, str] = BatchingQueue(
+            "test_queue", hs_clock, self._process_queue
+        )
 
-    async def _process_queue(self, values):
-        d = defer.Deferred()
+    async def _process_queue(self, values: List[str]) -> str:
+        d: "defer.Deferred[str]" = defer.Deferred()
         self._pending_calls.append((values, d))
         return await make_deferred_yieldable(d)
 
-    def _get_sample_with_name(self, metric, name) -> int:
+    def _get_sample_with_name(self, metric: Gauge, name: str) -> float:
         """For a prometheus metric get the value of the sample that has a
         matching "name" label.
         """
-        for sample in metric.collect()[0].samples:
+        for sample in next(iter(metric.collect())).samples:
             if sample.labels.get("name") == name:
                 return sample.value
 
         self.fail("Found no matching sample")
 
-    def _assert_metrics(self, queued, keys, in_flight):
+    def _assert_metrics(self, queued: int, keys: int, in_flight: int) -> None:
         """Assert that the metrics are correct"""
 
         sample = self._get_sample_with_name(number_queued, self.queue._name)
@@ -75,7 +81,7 @@ class BatchingQueueTestCase(TestCase):
             "number_in_flight",
         )
 
-    def test_simple(self):
+    def test_simple(self) -> None:
         """Tests the basic case of calling `add_to_queue` once and having
         `_process_queue` return.
         """
@@ -106,7 +112,7 @@ class BatchingQueueTestCase(TestCase):
 
         self._assert_metrics(queued=0, keys=0, in_flight=0)
 
-    def test_batching(self):
+    def test_batching(self) -> None:
         """Test that multiple calls at the same time get batched up into one
         call to `_process_queue`.
         """
@@ -134,7 +140,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self.successResultOf(queue_d2), "bar")
         self._assert_metrics(queued=0, keys=0, in_flight=0)
 
-    def test_queuing(self):
+    def test_queuing(self) -> None:
         """Test that we queue up requests while a `_process_queue` is being
         called.
         """
@@ -184,7 +190,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self.successResultOf(queue_d3), "bar2")
         self._assert_metrics(queued=0, keys=0, in_flight=0)
 
-    def test_different_keys(self):
+    def test_different_keys(self) -> None:
         """Test that calls to different keys get processed in parallel."""
 
         self.assertFalse(self._pending_calls)
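
Metric.collect() is typed as an iterable rather than a list, so the sample lookup switches from [0] indexing to next(iter(...)); Prometheus sample values are floats, hence the corrected return type. The fixed helper shape, sketched standalone (get_sample_value mirrors the test helper above):

    from prometheus_client import Gauge

    def get_sample_value(metric: Gauge, name: str) -> float:
        # collect() yields metric families; take the first family and
        # scan its samples for the matching "name" label.
        for sample in next(iter(metric.collect())).samples:
            if sample.labels.get("name") == name:
                return sample.value
        raise KeyError(name)
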
diff --git a/tests/util/test_check_dependencies.py b/tests/util/test_check_dependencies.py
index 6913de24b9..aa20fe6780 100644
--- a/tests/util/test_check_dependencies.py
+++ b/tests/util/test_check_dependencies.py
@@ -1,5 +1,20 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from contextlib import contextmanager
-from typing import Generator, Optional
+from os import PathLike
+from typing import Generator, Optional, Union
 from unittest.mock import patch
 
 from synapse.util.check_dependencies import (
@@ -12,17 +27,17 @@ from tests.unittest import TestCase
 
 
 class DummyDistribution(metadata.Distribution):
-    def __init__(self, version: object):
+    def __init__(self, version: str):
         self._version = version
 
     @property
-    def version(self):
+    def version(self) -> str:
         return self._version
 
-    def locate_file(self, path):
+    def locate_file(self, path: Union[str, PathLike]) -> PathLike:
         raise NotImplementedError()
 
-    def read_text(self, filename):
+    def read_text(self, filename: str) -> None:
         raise NotImplementedError()
 
 
@@ -30,7 +45,7 @@ old = DummyDistribution("0.1.2")
 old_release_candidate = DummyDistribution("0.1.2rc3")
 new = DummyDistribution("1.2.3")
 new_release_candidate = DummyDistribution("1.2.3rc4")
-distribution_with_no_version = DummyDistribution(None)
+distribution_with_no_version = DummyDistribution(None)  # type: ignore[arg-type]
 
 # could probably use stdlib TestCase --- no need for twisted here
 
@@ -45,7 +60,7 @@ class TestDependencyChecker(TestCase):
         If `distribution = None`, we pretend that the package is not installed.
         """
 
-        def mock_distribution(name: str):
+        def mock_distribution(name: str) -> DummyDistribution:
             if distribution is None:
                 raise metadata.PackageNotFoundError
             else:
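
DummyDistribution.version keeps an honest str annotation, and the one deliberately broken fixture silences mypy at its call site rather than widening the type for every caller. The same trade-off in miniature (Dist is an illustrative class):

    class Dist:
        def __init__(self, version: str) -> None:
            self._version = version

    good = Dist("1.2.3")
    # Deliberately wrong for the test; opt out here rather than
    # annotating version as Optional[str] everywhere.
    broken = Dist(None)  # type: ignore[arg-type]
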
diff --git a/tests/util/test_dict_cache.py b/tests/util/test_dict_cache.py
index e8b6246ab5..acb251bfea 100644
--- a/tests/util/test_dict_cache.py
+++ b/tests/util/test_dict_cache.py
@@ -19,10 +19,12 @@ from tests import unittest
 
 
 class DictCacheTestCase(unittest.TestCase):
-    def setUp(self):
-        self.cache = DictionaryCache("foobar", max_entries=10)
+    def setUp(self) -> None:
+        self.cache: DictionaryCache[str, str, str] = DictionaryCache(
+            "foobar", max_entries=10
+        )
 
-    def test_simple_cache_hit_full(self):
+    def test_simple_cache_hit_full(self) -> None:
         key = "test_simple_cache_hit_full"
 
         v = self.cache.get(key)
@@ -37,7 +39,7 @@ class DictCacheTestCase(unittest.TestCase):
         c = self.cache.get(key)
         self.assertEqual(test_value, c.value)
 
-    def test_simple_cache_hit_partial(self):
+    def test_simple_cache_hit_partial(self) -> None:
         key = "test_simple_cache_hit_partial"
 
         seq = self.cache.sequence
@@ -47,7 +49,7 @@ class DictCacheTestCase(unittest.TestCase):
         c = self.cache.get(key, ["test"])
         self.assertEqual(test_value, c.value)
 
-    def test_simple_cache_miss_partial(self):
+    def test_simple_cache_miss_partial(self) -> None:
         key = "test_simple_cache_miss_partial"
 
         seq = self.cache.sequence
@@ -57,7 +59,7 @@ class DictCacheTestCase(unittest.TestCase):
         c = self.cache.get(key, ["test2"])
         self.assertEqual({}, c.value)
 
-    def test_simple_cache_hit_miss_partial(self):
+    def test_simple_cache_hit_miss_partial(self) -> None:
         key = "test_simple_cache_hit_miss_partial"
 
         seq = self.cache.sequence
@@ -71,7 +73,7 @@ class DictCacheTestCase(unittest.TestCase):
         c = self.cache.get(key, ["test2"])
         self.assertEqual({"test2": "test_simple_cache_hit_miss_partial2"}, c.value)
 
-    def test_multi_insert(self):
+    def test_multi_insert(self) -> None:
         key = "test_simple_cache_hit_miss_partial"
 
         seq = self.cache.sequence
@@ -92,7 +94,7 @@ class DictCacheTestCase(unittest.TestCase):
         )
         self.assertEqual(c.full, False)
 
-    def test_invalidation(self):
+    def test_invalidation(self) -> None:
         """Test that the partial dict and full dicts get invalidated
         separately.
         """
@@ -106,7 +108,7 @@ class DictCacheTestCase(unittest.TestCase):
         # entry for "a" warm.
         for i in range(20):
             self.cache.get(key, ["a"])
-            self.cache.update(seq, f"key{i}", {1: 2})
+            self.cache.update(seq, f"key{i}", {"1": "2"})
 
         # We should have evicted the full dict...
         r = self.cache.get(key)
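
DictionaryCache now takes three type parameters: the cache key, the inner dict's keys, and the inner dict's values, all str here, which is why the warm-up loop stores {"1": "2"} instead of {1: 2}. Usage in outline, grounded in the test above (import path assumed):

    from synapse.util.caches.dictionary_cache import DictionaryCache

    cache: "DictionaryCache[str, str, str]" = DictionaryCache(
        "example", max_entries=10
    )
    seq = cache.sequence
    # Inner keys and values must now be strings to match the annotation.
    cache.update(seq, "some-key", {"1": "2"})
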
diff --git a/tests/util/test_expiring_cache.py b/tests/util/test_expiring_cache.py
index 7f60aae5ba..9cf920daf8 100644
--- a/tests/util/test_expiring_cache.py
+++ b/tests/util/test_expiring_cache.py
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, cast
 
+from synapse.util import Clock
 from synapse.util.caches.expiringcache import ExpiringCache
 
 from tests.utils import MockClock
@@ -21,17 +23,21 @@ from .. import unittest
 
 
 class ExpiringCacheTestCase(unittest.HomeserverTestCase):
-    def test_get_set(self):
+    def test_get_set(self) -> None:
         clock = MockClock()
-        cache = ExpiringCache("test", clock, max_len=1)
+        cache: ExpiringCache[str, str] = ExpiringCache(
+            "test", cast(Clock, clock), max_len=1
+        )
 
         cache["key"] = "value"
         self.assertEqual(cache.get("key"), "value")
         self.assertEqual(cache["key"], "value")
 
-    def test_eviction(self):
+    def test_eviction(self) -> None:
         clock = MockClock()
-        cache = ExpiringCache("test", clock, max_len=2)
+        cache: ExpiringCache[str, str] = ExpiringCache(
+            "test", cast(Clock, clock), max_len=2
+        )
 
         cache["key"] = "value"
         cache["key2"] = "value2"
@@ -43,9 +49,11 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
         self.assertEqual(cache.get("key2"), "value2")
         self.assertEqual(cache.get("key3"), "value3")
 
-    def test_iterable_eviction(self):
+    def test_iterable_eviction(self) -> None:
         clock = MockClock()
-        cache = ExpiringCache("test", clock, max_len=5, iterable=True)
+        cache: ExpiringCache[str, List[int]] = ExpiringCache(
+            "test", cast(Clock, clock), max_len=5, iterable=True
+        )
 
         cache["key"] = [1]
         cache["key2"] = [2, 3]
@@ -61,9 +69,11 @@ class ExpiringCacheTestCase(unittest.HomeserverTestCase):
         self.assertEqual(cache.get("key3"), [4, 5])
         self.assertEqual(cache.get("key4"), [6, 7])
 
-    def test_time_eviction(self):
+    def test_time_eviction(self) -> None:
         clock = MockClock()
-        cache = ExpiringCache("test", clock, expiry_ms=1000)
+        cache: ExpiringCache[str, int] = ExpiringCache(
+            "test", cast(Clock, clock), expiry_ms=1000
+        )
 
         cache["key"] = 1
         clock.advance_time(0.5)
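
MockClock mimics Clock's interface without inheriting from it, so the typed ExpiringCache constructor calls cast it; runtime behaviour is unchanged. The pattern in isolation, grounded in the test above:

    from typing import cast

    from synapse.util import Clock
    from synapse.util.caches.expiringcache import ExpiringCache

    from tests.utils import MockClock

    clock = MockClock()
    # The cast satisfies mypy; MockClock quacks like Clock at runtime.
    cache: "ExpiringCache[str, str]" = ExpiringCache(
        "test", cast(Clock, clock), max_len=1
    )
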
diff --git a/tests/util/test_file_consumer.py b/tests/util/test_file_consumer.py
index 3bb4695405..4f3c983c15 100644
--- a/tests/util/test_file_consumer.py
+++ b/tests/util/test_file_consumer.py
@@ -12,22 +12,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
 import threading
-from io import StringIO
+from io import BytesIO
+from typing import BinaryIO, Generator, Optional, cast
 from unittest.mock import NonCallableMock
 
-from twisted.internet import defer, reactor
+from zope.interface import implementer
+
+from twisted.internet import defer, reactor as _reactor
+from twisted.internet.interfaces import IPullProducer
 
+from synapse.types import ISynapseReactor
 from synapse.util.file_consumer import BackgroundFileConsumer
 
 from tests import unittest
 
+reactor = cast(ISynapseReactor, _reactor)
+
 
 class FileConsumerTests(unittest.TestCase):
     @defer.inlineCallbacks
-    def test_pull_consumer(self):
-        string_file = StringIO()
+    def test_pull_consumer(self) -> Generator["defer.Deferred[object]", object, None]:
+        string_file = BytesIO()
         consumer = BackgroundFileConsumer(string_file, reactor=reactor)
 
         try:
@@ -35,55 +41,57 @@ class FileConsumerTests(unittest.TestCase):
 
             yield producer.register_with_consumer(consumer)
 
-            yield producer.write_and_wait("Foo")
+            yield producer.write_and_wait(b"Foo")
 
-            self.assertEqual(string_file.getvalue(), "Foo")
+            self.assertEqual(string_file.getvalue(), b"Foo")
 
-            yield producer.write_and_wait("Bar")
+            yield producer.write_and_wait(b"Bar")
 
-            self.assertEqual(string_file.getvalue(), "FooBar")
+            self.assertEqual(string_file.getvalue(), b"FooBar")
         finally:
             consumer.unregisterProducer()
 
-        yield consumer.wait()
+        yield consumer.wait()  # type: ignore[misc]
 
         self.assertTrue(string_file.closed)
 
     @defer.inlineCallbacks
-    def test_push_consumer(self):
-        string_file = BlockingStringWrite()
-        consumer = BackgroundFileConsumer(string_file, reactor=reactor)
+    def test_push_consumer(self) -> Generator["defer.Deferred[object]", object, None]:
+        string_file = BlockingBytesWrite()
+        consumer = BackgroundFileConsumer(cast(BinaryIO, string_file), reactor=reactor)
 
         try:
             producer = NonCallableMock(spec_set=[])
 
             consumer.registerProducer(producer, True)
 
-            consumer.write("Foo")
-            yield string_file.wait_for_n_writes(1)
+            consumer.write(b"Foo")
+            yield string_file.wait_for_n_writes(1)  # type: ignore[misc]
 
-            self.assertEqual(string_file.buffer, "Foo")
+            self.assertEqual(string_file.buffer, b"Foo")
 
-            consumer.write("Bar")
-            yield string_file.wait_for_n_writes(2)
+            consumer.write(b"Bar")
+            yield string_file.wait_for_n_writes(2)  # type: ignore[misc]
 
-            self.assertEqual(string_file.buffer, "FooBar")
+            self.assertEqual(string_file.buffer, b"FooBar")
         finally:
             consumer.unregisterProducer()
 
-        yield consumer.wait()
+        yield consumer.wait()  # type: ignore[misc]
 
         self.assertTrue(string_file.closed)
 
     @defer.inlineCallbacks
-    def test_push_producer_feedback(self):
-        string_file = BlockingStringWrite()
-        consumer = BackgroundFileConsumer(string_file, reactor=reactor)
+    def test_push_producer_feedback(
+        self,
+    ) -> Generator["defer.Deferred[object]", object, None]:
+        string_file = BlockingBytesWrite()
+        consumer = BackgroundFileConsumer(cast(BinaryIO, string_file), reactor=reactor)
 
         try:
             producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
 
-            resume_deferred = defer.Deferred()
+            resume_deferred: defer.Deferred = defer.Deferred()
             producer.resumeProducing.side_effect = lambda: resume_deferred.callback(
                 None
             )
@@ -93,65 +101,72 @@ class FileConsumerTests(unittest.TestCase):
             number_writes = 0
             with string_file.write_lock:
                 for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
-                    consumer.write("Foo")
+                    consumer.write(b"Foo")
                     number_writes += 1
 
                 producer.pauseProducing.assert_called_once()
 
-            yield string_file.wait_for_n_writes(number_writes)
+            yield string_file.wait_for_n_writes(number_writes)  # type: ignore[misc]
 
             yield resume_deferred
             producer.resumeProducing.assert_called_once()
         finally:
             consumer.unregisterProducer()
 
-        yield consumer.wait()
+        yield consumer.wait()  # type: ignore[misc]
 
         self.assertTrue(string_file.closed)
 
 
+@implementer(IPullProducer)
 class DummyPullProducer:
-    def __init__(self):
-        self.consumer = None
-        self.deferred = defer.Deferred()
+    def __init__(self) -> None:
+        self.consumer: Optional[BackgroundFileConsumer] = None
+        self.deferred: "defer.Deferred[object]" = defer.Deferred()
 
-    def resumeProducing(self):
+    def resumeProducing(self) -> None:
         d = self.deferred
         self.deferred = defer.Deferred()
         d.callback(None)
 
-    def write_and_wait(self, bytes):
+    def stopProducing(self) -> None:
+        raise RuntimeError("Unexpected call")
+
+    def write_and_wait(self, write_bytes: bytes) -> "defer.Deferred[object]":
+        assert self.consumer is not None
         d = self.deferred
-        self.consumer.write(bytes)
+        self.consumer.write(write_bytes)
         return d
 
-    def register_with_consumer(self, consumer):
+    def register_with_consumer(
+        self, consumer: BackgroundFileConsumer
+    ) -> "defer.Deferred[object]":
         d = self.deferred
         self.consumer = consumer
         self.consumer.registerProducer(self, False)
         return d
 
 
-class BlockingStringWrite:
-    def __init__(self):
-        self.buffer = ""
+class BlockingBytesWrite:
+    def __init__(self) -> None:
+        self.buffer = b""
         self.closed = False
         self.write_lock = threading.Lock()
 
-        self._notify_write_deferred = None
+        self._notify_write_deferred: Optional[defer.Deferred] = None
         self._number_of_writes = 0
 
-    def write(self, bytes):
+    def write(self, write_bytes: bytes) -> None:
         with self.write_lock:
-            self.buffer += bytes
+            self.buffer += write_bytes
             self._number_of_writes += 1
 
         reactor.callFromThread(self._notify_write)
 
-    def close(self):
+    def close(self) -> None:
         self.closed = True
 
-    def _notify_write(self):
+    def _notify_write(self) -> None:
         "Called by write to indicate a write happened"
         with self.write_lock:
             if not self._notify_write_deferred:
@@ -161,7 +176,9 @@ class BlockingStringWrite:
         d.callback(None)
 
     @defer.inlineCallbacks
-    def wait_for_n_writes(self, n):
+    def wait_for_n_writes(
+        self, n: int
+    ) -> Generator["defer.Deferred[object]", object, None]:
         "Wait for n writes to have happened"
         while True:
             with self.write_lock:
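
The consumer under test writes bytes, so the fixtures move from StringIO/str to BytesIO/bytes, and the dummy producer now formally declares IPullProducer (gaining the previously missing stopProducing). The fixture shape, as a minimal sketch (NullProducer is illustrative):

    from io import BytesIO

    from zope.interface import implementer

    from twisted.internet.interfaces import IPullProducer

    @implementer(IPullProducer)
    class NullProducer:
        def resumeProducing(self) -> None:
            pass

        def stopProducing(self) -> None:
            pass

    sink = BytesIO()
    sink.write(b"Foo")
    assert sink.getvalue() == b"Foo"
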
diff --git a/tests/util/test_itertools.py b/tests/util/test_itertools.py
index 3c0ddd4f18..406c16cdcf 100644
--- a/tests/util/test_itertools.py
+++ b/tests/util/test_itertools.py
@@ -19,7 +19,7 @@ from tests.unittest import TestCase
 
 
 class ChunkSeqTests(TestCase):
-    def test_short_seq(self):
+    def test_short_seq(self) -> None:
         parts = chunk_seq("123", 8)
 
         self.assertEqual(
@@ -27,7 +27,7 @@ class ChunkSeqTests(TestCase):
             ["123"],
         )
 
-    def test_long_seq(self):
+    def test_long_seq(self) -> None:
         parts = chunk_seq("abcdefghijklmnop", 8)
 
         self.assertEqual(
@@ -35,7 +35,7 @@ class ChunkSeqTests(TestCase):
             ["abcdefgh", "ijklmnop"],
         )
 
-    def test_uneven_parts(self):
+    def test_uneven_parts(self) -> None:
         parts = chunk_seq("abcdefghijklmnop", 5)
 
         self.assertEqual(
@@ -43,7 +43,7 @@ class ChunkSeqTests(TestCase):
             ["abcde", "fghij", "klmno", "p"],
         )
 
-    def test_empty_input(self):
+    def test_empty_input(self) -> None:
         parts: Iterable[Sequence] = chunk_seq([], 5)
 
         self.assertEqual(
@@ -53,13 +53,13 @@ class ChunkSeqTests(TestCase):
 
 
 class SortTopologically(TestCase):
-    def test_empty(self):
+    def test_empty(self) -> None:
         "Test that an empty graph works correctly"
 
         graph: Dict[int, List[int]] = {}
         self.assertEqual(list(sorted_topologically([], graph)), [])
 
-    def test_handle_empty_graph(self):
+    def test_handle_empty_graph(self) -> None:
         "Test that a graph where a node doesn't have an entry is treated as empty"
 
         graph: Dict[int, List[int]] = {}
@@ -67,7 +67,7 @@ class SortTopologically(TestCase):
         # For disconnected nodes the output is simply sorted.
         self.assertEqual(list(sorted_topologically([1, 2], graph)), [1, 2])
 
-    def test_disconnected(self):
+    def test_disconnected(self) -> None:
         "Test that a graph with no edges work"
 
         graph: Dict[int, List[int]] = {1: [], 2: []}
@@ -75,20 +75,20 @@ class SortTopologically(TestCase):
         # For disconnected nodes the output is simply sorted.
         self.assertEqual(list(sorted_topologically([1, 2], graph)), [1, 2])
 
-    def test_linear(self):
+    def test_linear(self) -> None:
         "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
 
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
 
         self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
 
-    def test_subset(self):
+    def test_subset(self) -> None:
         "Test that only sorting a subset of the graph works"
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
 
         self.assertEqual(list(sorted_topologically([4, 3], graph)), [3, 4])
 
-    def test_fork(self):
+    def test_fork(self) -> None:
         "Test that a forked graph works"
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
 
@@ -96,13 +96,13 @@ class SortTopologically(TestCase):
         # always get the same one.
         self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
 
-    def test_duplicates(self):
+    def test_duplicates(self) -> None:
         "Test that a graph with duplicate edges work"
         graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
 
         self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
 
-    def test_multiple_paths(self):
+    def test_multiple_paths(self) -> None:
         "Test that a graph with multiple paths between two nodes work"
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
 
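For reference, the graph convention these cases pin down: each node maps to the nodes that must sort before it, and disconnected nodes fall back to plain sorted order. A usage sketch (import path assumed):

    from synapse.util.iterutils import sorted_topologically

    graph = {1: [], 2: [1], 3: [1], 4: [2, 3]}
    # 1 precedes 2 and 3, both of which precede 4; ties sort numerically.
    assert list(sorted_topologically([4, 3, 2, 1], graph)) == [1, 2, 3, 4]
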
diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py
index 2ad321e184..d64c162e1d 100644
--- a/tests/util/test_logcontext.py
+++ b/tests/util/test_logcontext.py
@@ -1,5 +1,21 @@
+# Copyright 2014-2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Callable, Generator, cast
+
 import twisted.python.failure
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor as _reactor
 
 from synapse.logging.context import (
     SENTINEL_CONTEXT,
@@ -10,25 +26,30 @@ from synapse.logging.context import (
     nested_logging_context,
     run_in_background,
 )
+from synapse.types import ISynapseReactor
 from synapse.util import Clock
 
 from .. import unittest
 
+reactor = cast(ISynapseReactor, _reactor)
+
 
 class LoggingContextTestCase(unittest.TestCase):
-    def _check_test_key(self, value):
-        self.assertEqual(current_context().name, value)
+    def _check_test_key(self, value: str) -> None:
+        context = current_context()
+        assert isinstance(context, LoggingContext)
+        self.assertEqual(context.name, value)
 
-    def test_with_context(self):
+    def test_with_context(self) -> None:
         with LoggingContext("test"):
             self._check_test_key("test")
 
     @defer.inlineCallbacks
-    def test_sleep(self):
+    def test_sleep(self) -> Generator["defer.Deferred[object]", object, None]:
         clock = Clock(reactor)
 
         @defer.inlineCallbacks
-        def competing_callback():
+        def competing_callback() -> Generator["defer.Deferred[object]", object, None]:
             with LoggingContext("competing"):
                 yield clock.sleep(0)
                 self._check_test_key("competing")
@@ -39,17 +60,18 @@ class LoggingContextTestCase(unittest.TestCase):
             yield clock.sleep(0)
             self._check_test_key("one")
 
-    def _test_run_in_background(self, function):
+    def _test_run_in_background(self, function: Callable[[], object]) -> defer.Deferred:
         sentinel_context = current_context()
 
-        callback_completed = [False]
+        callback_completed = False
 
         with LoggingContext("one"):
             # fire off function, but don't wait on it.
             d2 = run_in_background(function)
 
-            def cb(res):
-                callback_completed[0] = True
+            def cb(res: object) -> object:
+                nonlocal callback_completed
+                callback_completed = True
                 return res
 
             d2.addCallback(cb)
@@ -60,8 +82,8 @@ class LoggingContextTestCase(unittest.TestCase):
         # the logcontext is left in a sane state.
         d2 = defer.Deferred()
 
-        def check_logcontext():
-            if not callback_completed[0]:
+        def check_logcontext() -> None:
+            if not callback_completed:
                 reactor.callLater(0.01, check_logcontext)
                 return
 
@@ -78,31 +100,31 @@ class LoggingContextTestCase(unittest.TestCase):
         # test is done once d2 finishes
         return d2
 
-    def test_run_in_background_with_blocking_fn(self):
+    def test_run_in_background_with_blocking_fn(self) -> defer.Deferred:
         @defer.inlineCallbacks
-        def blocking_function():
+        def blocking_function() -> Generator["defer.Deferred[object]", object, None]:
             yield Clock(reactor).sleep(0)
 
         return self._test_run_in_background(blocking_function)
 
-    def test_run_in_background_with_non_blocking_fn(self):
+    def test_run_in_background_with_non_blocking_fn(self) -> defer.Deferred:
         @defer.inlineCallbacks
-        def nonblocking_function():
+        def nonblocking_function() -> Generator["defer.Deferred[object]", object, None]:
             with PreserveLoggingContext():
                 yield defer.succeed(None)
 
         return self._test_run_in_background(nonblocking_function)
 
-    def test_run_in_background_with_chained_deferred(self):
+    def test_run_in_background_with_chained_deferred(self) -> defer.Deferred:
         # a function which returns a deferred which looks like it has been
         # called, but is actually paused
-        def testfunc():
+        def testfunc() -> defer.Deferred:
             return make_deferred_yieldable(_chained_deferred_function())
 
         return self._test_run_in_background(testfunc)
 
-    def test_run_in_background_with_coroutine(self):
-        async def testfunc():
+    def test_run_in_background_with_coroutine(self) -> defer.Deferred:
+        async def testfunc() -> None:
             self._check_test_key("one")
             d = Clock(reactor).sleep(0)
             self.assertIs(current_context(), SENTINEL_CONTEXT)
@@ -111,18 +133,20 @@ class LoggingContextTestCase(unittest.TestCase):
 
         return self._test_run_in_background(testfunc)
 
-    def test_run_in_background_with_nonblocking_coroutine(self):
-        async def testfunc():
+    def test_run_in_background_with_nonblocking_coroutine(self) -> defer.Deferred:
+        async def testfunc() -> None:
             self._check_test_key("one")
 
         return self._test_run_in_background(testfunc)
 
     @defer.inlineCallbacks
-    def test_make_deferred_yieldable(self):
+    def test_make_deferred_yieldable(
+        self,
+    ) -> Generator["defer.Deferred[object]", object, None]:
         # a function which returns an incomplete deferred, but doesn't follow
         # the synapse rules.
-        def blocking_function():
-            d = defer.Deferred()
+        def blocking_function() -> defer.Deferred:
+            d: defer.Deferred = defer.Deferred()
             reactor.callLater(0, d.callback, None)
             return d
 
@@ -139,7 +163,9 @@ class LoggingContextTestCase(unittest.TestCase):
             self._check_test_key("one")
 
     @defer.inlineCallbacks
-    def test_make_deferred_yieldable_with_chained_deferreds(self):
+    def test_make_deferred_yieldable_with_chained_deferreds(
+        self,
+    ) -> Generator["defer.Deferred[object]", object, None]:
         sentinel_context = current_context()
 
         with LoggingContext("one"):
@@ -152,7 +178,7 @@ class LoggingContextTestCase(unittest.TestCase):
             # now it should be restored
             self._check_test_key("one")
 
-    def test_nested_logging_context(self):
+    def test_nested_logging_context(self) -> None:
         with LoggingContext("foo"):
             nested_context = nested_logging_context(suffix="bar")
             self.assertEqual(nested_context.name, "foo-bar")
@@ -161,11 +187,11 @@ class LoggingContextTestCase(unittest.TestCase):
 # a function which returns a deferred which has been "called", but
 # which had a function which returned another incomplete deferred on
 # its callback list, so won't yet call any other new callbacks.
-def _chained_deferred_function():
+def _chained_deferred_function() -> defer.Deferred:
     d = defer.succeed(None)
 
-    def cb(res):
-        d2 = defer.Deferred()
+    def cb(res: object) -> defer.Deferred:
+        d2: defer.Deferred = defer.Deferred()
         reactor.callLater(0, d2.callback, res)
         return d2
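
The Generator annotations in this file follow from how @defer.inlineCallbacks
works: the decorated function is a generator that yields Deferreds and is sent
back their results, so the yield type is "defer.Deferred[object]", the send
type is object, and the return type is None. A minimal sketch of the
convention (illustrative, not part of the patch):

    from typing import Generator

    from twisted.internet import defer

    @defer.inlineCallbacks
    def do_work() -> Generator["defer.Deferred[object]", object, None]:
        # yield a Deferred; inlineCallbacks sends its result back in
        result = yield defer.succeed(42)
        assert result == 42
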
 
diff --git a/tests/util/test_logformatter.py b/tests/util/test_logformatter.py
index a2e08281e6..0dee69a6fe 100644
--- a/tests/util/test_logformatter.py
+++ b/tests/util/test_logformatter.py
@@ -23,7 +23,7 @@ class TestException(Exception):
 
 
 class LogFormatterTestCase(unittest.TestCase):
-    def test_formatter(self):
+    def test_formatter(self) -> None:
         formatter = LogFormatter()
 
         try:
diff --git a/tests/util/test_lrucache.py b/tests/util/test_lrucache.py
index 67173a4f5b..1fc5a473f0 100644
--- a/tests/util/test_lrucache.py
+++ b/tests/util/test_lrucache.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 
-from typing import List
+from typing import List, Tuple
 from unittest.mock import Mock, patch
 
 from synapse.metrics.jemalloc import JemallocStats
+from synapse.types import JsonDict
 from synapse.util.caches.lrucache import LruCache, setup_expire_lru_cache_entries
 from synapse.util.caches.treecache import TreeCache
 
@@ -25,14 +26,14 @@ from tests.unittest import override_config
 
 
 class LruCacheTestCase(unittest.HomeserverTestCase):
-    def test_get_set(self):
-        cache = LruCache(1)
+    def test_get_set(self) -> None:
+        cache: LruCache[str, str] = LruCache(1)
         cache["key"] = "value"
         self.assertEqual(cache.get("key"), "value")
         self.assertEqual(cache["key"], "value")
 
-    def test_eviction(self):
-        cache = LruCache(2)
+    def test_eviction(self) -> None:
+        cache: LruCache[int, int] = LruCache(2)
         cache[1] = 1
         cache[2] = 2
 
@@ -45,8 +46,8 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
         self.assertEqual(cache.get(2), 2)
         self.assertEqual(cache.get(3), 3)
 
-    def test_setdefault(self):
-        cache = LruCache(1)
+    def test_setdefault(self) -> None:
+        cache: LruCache[str, int] = LruCache(1)
         self.assertEqual(cache.setdefault("key", 1), 1)
         self.assertEqual(cache.get("key"), 1)
         self.assertEqual(cache.setdefault("key", 2), 1)
@@ -54,14 +55,15 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
         cache["key"] = 2  # Make sure overriding works.
         self.assertEqual(cache.get("key"), 2)
 
-    def test_pop(self):
-        cache = LruCache(1)
+    def test_pop(self) -> None:
+        cache: LruCache[str, int] = LruCache(1)
         cache["key"] = 1
         self.assertEqual(cache.pop("key"), 1)
         self.assertEqual(cache.pop("key"), None)
 
-    def test_del_multi(self):
-        cache = LruCache(4, cache_type=TreeCache)
+    def test_del_multi(self) -> None:
+    # The type here isn't quite correct, as the LruCache type hints don't
+    # handle TreeCache well.
+        cache: LruCache[Tuple[str, str], str] = LruCache(4, cache_type=TreeCache)
         cache[("animal", "cat")] = "mew"
         cache[("animal", "dog")] = "woof"
         cache[("vehicles", "car")] = "vroom"
@@ -71,7 +73,7 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
 
         self.assertEqual(cache.get(("animal", "cat")), "mew")
         self.assertEqual(cache.get(("vehicles", "car")), "vroom")
-        cache.del_multi(("animal",))
+        cache.del_multi(("animal",))  # type: ignore[arg-type]
         self.assertEqual(len(cache), 2)
         self.assertEqual(cache.get(("animal", "cat")), None)
         self.assertEqual(cache.get(("animal", "dog")), None)
@@ -79,22 +81,22 @@ class LruCacheTestCase(unittest.HomeserverTestCase):
         self.assertEqual(cache.get(("vehicles", "train")), "chuff")
         # Man from del_multi say "Yes".
 
-    def test_clear(self):
-        cache = LruCache(1)
+    def test_clear(self) -> None:
+        cache: LruCache[str, int] = LruCache(1)
         cache["key"] = 1
         cache.clear()
         self.assertEqual(len(cache), 0)
 
     @override_config({"caches": {"per_cache_factors": {"mycache": 10}}})
-    def test_special_size(self):
-        cache = LruCache(10, "mycache")
+    def test_special_size(self) -> None:
+        cache: LruCache = LruCache(10, "mycache")
         self.assertEqual(cache.max_size, 100)
 
 
 class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
-    def test_get(self):
+    def test_get(self) -> None:
         m = Mock()
-        cache = LruCache(1)
+        cache: LruCache[str, str] = LruCache(1)
 
         cache.set("key", "value")
         self.assertFalse(m.called)
@@ -111,9 +113,9 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         cache.set("key", "value")
         self.assertEqual(m.call_count, 1)
 
-    def test_multi_get(self):
+    def test_multi_get(self) -> None:
         m = Mock()
-        cache = LruCache(1)
+        cache: LruCache[str, str] = LruCache(1)
 
         cache.set("key", "value")
         self.assertFalse(m.called)
@@ -130,9 +132,9 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         cache.set("key", "value")
         self.assertEqual(m.call_count, 1)
 
-    def test_set(self):
+    def test_set(self) -> None:
         m = Mock()
-        cache = LruCache(1)
+        cache: LruCache[str, str] = LruCache(1)
 
         cache.set("key", "value", callbacks=[m])
         self.assertFalse(m.called)
@@ -146,9 +148,9 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         cache.set("key", "value")
         self.assertEqual(m.call_count, 1)
 
-    def test_pop(self):
+    def test_pop(self) -> None:
         m = Mock()
-        cache = LruCache(1)
+        cache: LruCache[str, str] = LruCache(1)
 
         cache.set("key", "value", callbacks=[m])
         self.assertFalse(m.called)
@@ -162,12 +164,13 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         cache.pop("key")
         self.assertEqual(m.call_count, 1)
 
-    def test_del_multi(self):
+    def test_del_multi(self) -> None:
         m1 = Mock()
         m2 = Mock()
         m3 = Mock()
         m4 = Mock()
-        cache = LruCache(4, cache_type=TreeCache)
+        # The type here isn't quite correct, as the LruCache type hints don't
+        # handle TreeCache well.
+        cache: LruCache[Tuple[str, str], str] = LruCache(4, cache_type=TreeCache)
 
         cache.set(("a", "1"), "value", callbacks=[m1])
         cache.set(("a", "2"), "value", callbacks=[m2])
@@ -179,17 +182,17 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         self.assertEqual(m3.call_count, 0)
         self.assertEqual(m4.call_count, 0)
 
-        cache.del_multi(("a",))
+        cache.del_multi(("a",))  # type: ignore[arg-type]
 
         self.assertEqual(m1.call_count, 1)
         self.assertEqual(m2.call_count, 1)
         self.assertEqual(m3.call_count, 0)
         self.assertEqual(m4.call_count, 0)
 
-    def test_clear(self):
+    def test_clear(self) -> None:
         m1 = Mock()
         m2 = Mock()
-        cache = LruCache(5)
+        cache: LruCache[str, str] = LruCache(5)
 
         cache.set("key1", "value", callbacks=[m1])
         cache.set("key2", "value", callbacks=[m2])
@@ -202,11 +205,11 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
         self.assertEqual(m1.call_count, 1)
         self.assertEqual(m2.call_count, 1)
 
-    def test_eviction(self):
+    def test_eviction(self) -> None:
         m1 = Mock(name="m1")
         m2 = Mock(name="m2")
         m3 = Mock(name="m3")
-        cache = LruCache(2)
+        cache: LruCache[str, str] = LruCache(2)
 
         cache.set("key1", "value", callbacks=[m1])
         cache.set("key2", "value", callbacks=[m2])
@@ -241,8 +244,8 @@ class LruCacheCallbacksTestCase(unittest.HomeserverTestCase):
 
 
 class LruCacheSizedTestCase(unittest.HomeserverTestCase):
-    def test_evict(self):
-        cache = LruCache(5, size_callback=len)
+    def test_evict(self) -> None:
+        cache: LruCache[str, List[int]] = LruCache(5, size_callback=len)
         cache["key1"] = [0]
         cache["key2"] = [1, 2]
         cache["key3"] = [3]
@@ -269,6 +272,7 @@ class LruCacheSizedTestCase(unittest.HomeserverTestCase):
         cache["key1"] = []
 
         self.assertEqual(len(cache), 0)
+        assert isinstance(cache.cache, dict)
         cache.cache["key1"].drop_from_cache()
         self.assertIsNone(
             cache.pop("key1"), "Cache entry should have been evicted but wasn't"
@@ -278,17 +282,17 @@ class LruCacheSizedTestCase(unittest.HomeserverTestCase):
 class TimeEvictionTestCase(unittest.HomeserverTestCase):
     """Test that time based eviction works correctly."""
 
-    def default_config(self):
+    def default_config(self) -> JsonDict:
         config = super().default_config()
 
         config.setdefault("caches", {})["expiry_time"] = "30m"
 
         return config
 
-    def test_evict(self):
+    def test_evict(self) -> None:
         setup_expire_lru_cache_entries(self.hs)
 
-        cache = LruCache(5, clock=self.hs.get_clock())
+        cache: LruCache[str, int] = LruCache(5, clock=self.hs.get_clock())
 
         # Check that we evict entries we haven't accessed for 30 minutes.
         cache["key1"] = 1
@@ -332,7 +336,7 @@ class MemoryEvictionTestCase(unittest.HomeserverTestCase):
         }
     )
     @patch("synapse.util.caches.lrucache.get_jemalloc_stats")
-    def test_evict_memory(self, jemalloc_interface) -> None:
+    def test_evict_memory(self, jemalloc_interface: Mock) -> None:
         mock_jemalloc_class = Mock(spec=JemallocStats)
         jemalloc_interface.return_value = mock_jemalloc_class
 
@@ -340,7 +344,7 @@ class MemoryEvictionTestCase(unittest.HomeserverTestCase):
         mock_jemalloc_class.get_stat.return_value = 924288000
 
         setup_expire_lru_cache_entries(self.hs)
-        cache = LruCache(4, clock=self.hs.get_clock())
+        cache: LruCache[str, int] = LruCache(4, clock=self.hs.get_clock())
 
         cache["key1"] = 1
         cache["key2"] = 2
diff --git a/tests/util/test_macaroons.py b/tests/util/test_macaroons.py
index 40754a4711..f68377a05a 100644
--- a/tests/util/test_macaroons.py
+++ b/tests/util/test_macaroons.py
@@ -21,14 +21,14 @@ from tests.unittest import TestCase
 
 
 class MacaroonGeneratorTestCase(TestCase):
-    def setUp(self):
+    def setUp(self) -> None:
         self.reactor, hs_clock = get_clock()
         self.macaroon_generator = MacaroonGenerator(hs_clock, "tesths", b"verysecret")
         self.other_macaroon_generator = MacaroonGenerator(
             hs_clock, "tesths", b"anothersecretkey"
         )
 
-    def test_guest_access_token(self):
+    def test_guest_access_token(self) -> None:
         """Test the generation and verification of guest access tokens"""
         token = self.macaroon_generator.generate_guest_access_token("@user:tesths")
         user_id = self.macaroon_generator.verify_guest_token(token)
@@ -47,7 +47,7 @@ class MacaroonGeneratorTestCase(TestCase):
         with self.assertRaises(MacaroonVerificationFailedException):
             self.macaroon_generator.verify_guest_token(token)
 
-    def test_delete_pusher_token(self):
+    def test_delete_pusher_token(self) -> None:
         """Test the generation and verification of delete_pusher tokens"""
         token = self.macaroon_generator.generate_delete_pusher_token(
             "@user:tesths", "m.mail", "john@example.com"
@@ -84,7 +84,7 @@ class MacaroonGeneratorTestCase(TestCase):
         )
         self.assertEqual(user_id, "@user:tesths")
 
-    def test_oidc_session_token(self):
+    def test_oidc_session_token(self) -> None:
         """Test the generation and verification of OIDC session cookies"""
         state = "arandomstate"
         session_data = OidcSessionData(
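
Each generate_* helper in these tests is paired with its verify_* counterpart,
and a generator built with a different secret key must fail verification
(hence other_macaroon_generator above). A condensed sketch of the round trip,
using only calls that appear in the tests (assuming MacaroonGenerator is
imported from synapse.util.macaroons):

    from synapse.util.macaroons import MacaroonGenerator

    from tests.server import get_clock

    _reactor, hs_clock = get_clock()
    generator = MacaroonGenerator(hs_clock, "tesths", b"verysecret")

    token = generator.generate_guest_access_token("@user:tesths")
    assert generator.verify_guest_token(token) == "@user:tesths"
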
diff --git a/tests/util/test_ratelimitutils.py b/tests/util/test_ratelimitutils.py
index 89d8656634..5b327b390e 100644
--- a/tests/util/test_ratelimitutils.py
+++ b/tests/util/test_ratelimitutils.py
@@ -13,16 +13,19 @@
 # limitations under the License.
 from typing import Optional
 
+from twisted.internet.defer import Deferred
+
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.ratelimiting import FederationRatelimitSettings
 from synapse.util.ratelimitutils import FederationRateLimiter
 
-from tests.server import get_clock
+from tests.server import ThreadedMemoryReactorClock, get_clock
 from tests.unittest import TestCase
 from tests.utils import default_config
 
 
 class FederationRateLimiterTestCase(TestCase):
-    def test_ratelimit(self):
+    def test_ratelimit(self) -> None:
         """A simple test with the default values"""
         reactor, clock = get_clock()
         rc_config = build_rc_config()
@@ -32,7 +35,7 @@ class FederationRateLimiterTestCase(TestCase):
             # shouldn't block
             self.successResultOf(d1)
 
-    def test_concurrent_limit(self):
+    def test_concurrent_limit(self) -> None:
         """Test what happens when we hit the concurrent limit"""
         reactor, clock = get_clock()
         rc_config = build_rc_config({"rc_federation": {"concurrent": 2}})
@@ -56,7 +59,7 @@ class FederationRateLimiterTestCase(TestCase):
             cm2.__exit__(None, None, None)
             self.successResultOf(d3)
 
-    def test_sleep_limit(self):
+    def test_sleep_limit(self) -> None:
         """Test what happens when we hit the sleep limit"""
         reactor, clock = get_clock()
         rc_config = build_rc_config(
@@ -79,7 +82,7 @@ class FederationRateLimiterTestCase(TestCase):
             self.assertAlmostEqual(sleep_time, 500, places=3)
 
 
-def _await_resolution(reactor, d):
+def _await_resolution(reactor: ThreadedMemoryReactorClock, d: Deferred) -> float:
     """advance the clock until the deferred completes.
 
     Returns the number of milliseconds it took to complete.
@@ -90,7 +93,7 @@ def _await_resolution(reactor, d):
     return (reactor.seconds() - start_time) * 1000
 
 
-def build_rc_config(settings: Optional[dict] = None):
+def build_rc_config(settings: Optional[dict] = None) -> FederationRatelimitSettings:
     config_dict = default_config("test")
     config_dict.update(settings or {})
     config = HomeServerConfig()
diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py
index 26cb71c640..9529ee53c8 100644
--- a/tests/util/test_retryutils.py
+++ b/tests/util/test_retryutils.py
@@ -22,7 +22,7 @@ from tests.unittest import HomeserverTestCase
 
 
 class RetryLimiterTestCase(HomeserverTestCase):
-    def test_new_destination(self):
+    def test_new_destination(self) -> None:
         """A happy-path case with a new destination and a successful operation"""
         store = self.hs.get_datastores().main
         limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
@@ -36,7 +36,7 @@ class RetryLimiterTestCase(HomeserverTestCase):
         new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
         self.assertIsNone(new_timings)
 
-    def test_limiter(self):
+    def test_limiter(self) -> None:
         """General test case which walks through the process of a failing request"""
         store = self.hs.get_datastores().main
 
diff --git a/tests/util/test_rwlock.py b/tests/util/test_rwlock.py
index 5da04362a9..bc93de62eb 100644
--- a/tests/util/test_rwlock.py
+++ b/tests/util/test_rwlock.py
@@ -49,7 +49,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         acquired_d: "Deferred[None]" = Deferred()
         unblock_d: "Deferred[None]" = Deferred()
 
-        async def reader_or_writer():
+        async def reader_or_writer() -> str:
             async with read_or_write(key):
                 acquired_d.callback(None)
                 await unblock_d
@@ -134,7 +134,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
                 d.called, msg="deferred %d was unexpectedly resolved" % (i + n)
             )
 
-    def test_rwlock(self):
+    def test_rwlock(self) -> None:
         rwlock = ReadWriteLock()
         key = "key"
 
@@ -197,7 +197,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         _, acquired_d = self._start_nonblocking_reader(rwlock, key, "last reader")
         self.assertTrue(acquired_d.called)
 
-    def test_lock_handoff_to_nonblocking_writer(self):
+    def test_lock_handoff_to_nonblocking_writer(self) -> None:
         """Test a writer handing the lock to another writer that completes instantly."""
         rwlock = ReadWriteLock()
         key = "key"
@@ -216,7 +216,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         d3, _ = self._start_nonblocking_writer(rwlock, key, "write 3 completed")
         self.assertTrue(d3.called)
 
-    def test_cancellation_while_holding_read_lock(self):
+    def test_cancellation_while_holding_read_lock(self) -> None:
         """Test cancellation while holding a read lock.
 
         A waiting writer should be given the lock when the reader holding the lock is
@@ -242,7 +242,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         )
         self.assertEqual("write completed", self.successResultOf(writer_d))
 
-    def test_cancellation_while_holding_write_lock(self):
+    def test_cancellation_while_holding_write_lock(self) -> None:
         """Test cancellation while holding a write lock.
 
         A waiting reader should be given the lock when the writer holding the lock is
@@ -268,7 +268,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         )
         self.assertEqual("read completed", self.successResultOf(reader_d))
 
-    def test_cancellation_while_waiting_for_read_lock(self):
+    def test_cancellation_while_waiting_for_read_lock(self) -> None:
         """Test cancellation while waiting for a read lock.
 
         Tests that cancelling a waiting reader:
@@ -319,7 +319,7 @@ class ReadWriteLockTestCase(unittest.TestCase):
         )
         self.assertEqual("write 2 completed", self.successResultOf(writer2_d))
 
-    def test_cancellation_while_waiting_for_write_lock(self):
+    def test_cancellation_while_waiting_for_write_lock(self) -> None:
         """Test cancellation while waiting for a write lock.
 
         Tests that cancelling a waiting writer:
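
For context on what the cancellation tests exercise, a minimal usage sketch of
the lock (assuming the ReadWriteLock API from synapse.util.async_helpers, as
exercised by the reader/writer helpers above):

    from synapse.util.async_helpers import ReadWriteLock

    async def reader(rwlock: ReadWriteLock) -> str:
        # any number of readers may hold the lock at once
        async with rwlock.read("key"):
            return "read completed"

    async def writer(rwlock: ReadWriteLock) -> str:
        # a writer waits for readers to release, and excludes other writers
        async with rwlock.write("key"):
            return "write completed"
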
diff --git a/tests/util/test_stream_change_cache.py b/tests/util/test_stream_change_cache.py
index 9ed01f7e0c..0305741c99 100644
--- a/tests/util/test_stream_change_cache.py
+++ b/tests/util/test_stream_change_cache.py
@@ -8,7 +8,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
     Tests for StreamChangeCache.
     """
 
-    def test_prefilled_cache(self):
+    def test_prefilled_cache(self) -> None:
         """
         Providing a prefilled cache to StreamChangeCache will result in a cache
         with the prefilled entries already present.
@@ -16,7 +16,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
         cache = StreamChangeCache("#test", 1, prefilled_cache={"user@foo.com": 2})
         self.assertTrue(cache.has_entity_changed("user@foo.com", 1))
 
-    def test_has_entity_changed(self):
+    def test_has_entity_changed(self) -> None:
         """
         StreamChangeCache.entity_has_changed will mark entities as changed, and
         has_entity_changed will observe the changed entities.
@@ -51,8 +51,10 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
         # return True, whether it's a known entity or not.
         self.assertTrue(cache.has_entity_changed("user@foo.com", 0))
         self.assertTrue(cache.has_entity_changed("not@here.website", 0))
+        self.assertTrue(cache.has_entity_changed("user@foo.com", 3))
+        self.assertTrue(cache.has_entity_changed("not@here.website", 3))
 
-    def test_entity_has_changed_pops_off_start(self):
+    def test_entity_has_changed_pops_off_start(self) -> None:
         """
         StreamChangeCache.entity_has_changed will respect the max size and
         purge the oldest items upon reaching that max size.
@@ -65,15 +67,16 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
 
         # The cache is at the max size, 2
         self.assertEqual(len(cache._cache), 2)
+        # The cache's earliest known position is 2.
+        self.assertEqual(cache._earliest_known_stream_pos, 2)
 
         # The oldest item has been popped off
         self.assertTrue("user@foo.com" not in cache._entity_to_key)
 
         self.assertEqual(
-            cache.get_all_entities_changed(2),
-            ["bar@baz.net", "user@elsewhere.org"],
+            cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
         )
-        self.assertIsNone(cache.get_all_entities_changed(1))
+        self.assertFalse(cache.get_all_entities_changed(2).hit)
 
         # If we update an existing entity, it keeps the two existing entities
         cache.entity_has_changed("bar@baz.net", 5)
@@ -81,12 +84,12 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
             {"bar@baz.net", "user@elsewhere.org"}, set(cache._entity_to_key)
         )
         self.assertEqual(
-            cache.get_all_entities_changed(2),
+            cache.get_all_entities_changed(3).entities,
             ["user@elsewhere.org", "bar@baz.net"],
         )
-        self.assertIsNone(cache.get_all_entities_changed(1))
+        self.assertFalse(cache.get_all_entities_changed(2).hit)
 
-    def test_get_all_entities_changed(self):
+    def test_get_all_entities_changed(self) -> None:
         """
         StreamChangeCache.get_all_entities_changed will return all changed
         entities since the given position.  If the position is before the start
@@ -99,28 +102,17 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
         cache.entity_has_changed("anotheruser@foo.com", 3)
         cache.entity_has_changed("user@elsewhere.org", 4)
 
-        r = cache.get_all_entities_changed(1)
-
-        # either of these are valid
-        ok1 = [
-            "user@foo.com",
-            "bar@baz.net",
-            "anotheruser@foo.com",
-            "user@elsewhere.org",
-        ]
-        ok2 = [
-            "user@foo.com",
-            "anotheruser@foo.com",
-            "bar@baz.net",
-            "user@elsewhere.org",
-        ]
-        self.assertTrue(r == ok1 or r == ok2)
-
         r = cache.get_all_entities_changed(2)
-        self.assertTrue(r == ok1[1:] or r == ok2[1:])
 
-        self.assertEqual(cache.get_all_entities_changed(3), ["user@elsewhere.org"])
-        self.assertEqual(cache.get_all_entities_changed(0), None)
+        # Results are ordered, so either of these is valid.
+        ok1 = ["bar@baz.net", "anotheruser@foo.com", "user@elsewhere.org"]
+        ok2 = ["anotheruser@foo.com", "bar@baz.net", "user@elsewhere.org"]
+        self.assertTrue(r.entities == ok1 or r.entities == ok2)
+
+        self.assertEqual(
+            cache.get_all_entities_changed(3).entities, ["user@elsewhere.org"]
+        )
+        self.assertFalse(cache.get_all_entities_changed(1).hit)
 
         # ... later, things get more updates
         cache.entity_has_changed("user@foo.com", 5)
@@ -140,9 +132,9 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
             "anotheruser@foo.com",
         ]
         r = cache.get_all_entities_changed(3)
-        self.assertTrue(r == ok1 or r == ok2)
+        self.assertTrue(r.entities == ok1 or r.entities == ok2)
 
-    def test_has_any_entity_changed(self):
+    def test_has_any_entity_changed(self) -> None:
         """
         StreamChangeCache.has_any_entity_changed will return True if any
         entities have been changed since the provided stream position, and
@@ -168,7 +160,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
         self.assertFalse(cache.has_any_entity_changed(2))
         self.assertFalse(cache.has_any_entity_changed(3))
 
-    def test_get_entities_changed(self):
+    def test_get_entities_changed(self) -> None:
         """
         StreamChangeCache.get_entities_changed will return the entities in the
         given list that have changed since the provided stream ID.  If the
@@ -228,7 +220,7 @@ class StreamChangeCacheTests(unittest.HomeserverTestCase):
             {"bar@baz.net"},
         )
 
-    def test_max_pos(self):
+    def test_max_pos(self) -> None:
         """
         StreamChangeCache.get_max_pos_of_last_change will return the most
         recent point where the entity could have changed.  If the entity is not
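
The assertion changes in this file track a behavioural change:
get_all_entities_changed no longer returns Optional[list] but a small result
object whose hit flag says whether the cache could answer at the requested
position, with the ordered entities alongside. A hypothetical sketch of such a
result type, for illustration only (the real definition lives in
synapse/util/caches/stream_change_cache.py and may differ):

    from typing import List

    import attr

    @attr.s(auto_attribs=True, frozen=True, slots=True)
    class AllEntitiesChangedResult:
        # hypothetical field layout, inferred from the assertions above
        hit: bool  # False when the position predates the cache's knowledge
        entities: List[str]  # ordered by stream position; valid when hit
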
diff --git a/tests/util/test_stringutils.py b/tests/util/test_stringutils.py
index ad4dd7f007..f137e05191 100644
--- a/tests/util/test_stringutils.py
+++ b/tests/util/test_stringutils.py
@@ -19,7 +19,7 @@ from .. import unittest
 
 
 class StringUtilsTestCase(unittest.TestCase):
-    def test_client_secret_regex(self):
+    def test_client_secret_regex(self) -> None:
         """Ensure that client_secret does not contain illegal characters"""
         good = [
             "abcde12345",
@@ -46,7 +46,7 @@ class StringUtilsTestCase(unittest.TestCase):
             with self.assertRaises(SynapseError):
                 assert_valid_client_secret(client_secret)
 
-    def test_base62_encode(self):
+    def test_base62_encode(self) -> None:
         self.assertEqual("0", base62_encode(0))
         self.assertEqual("10", base62_encode(62))
         self.assertEqual("1c", base62_encode(100))
diff --git a/tests/util/test_threepids.py b/tests/util/test_threepids.py
index d957b953bb..3b35b8e4ec 100644
--- a/tests/util/test_threepids.py
+++ b/tests/util/test_threepids.py
@@ -18,31 +18,31 @@ from tests.unittest import HomeserverTestCase
 
 
 class CanonicaliseEmailTests(HomeserverTestCase):
-    def test_no_at(self):
+    def test_no_at(self) -> None:
         with self.assertRaises(ValueError):
             canonicalise_email("address-without-at.bar")
 
-    def test_two_at(self):
+    def test_two_at(self) -> None:
         with self.assertRaises(ValueError):
             canonicalise_email("foo@foo@test.bar")
 
-    def test_bad_format(self):
+    def test_bad_format(self) -> None:
         with self.assertRaises(ValueError):
             canonicalise_email("user@bad.example.net@good.example.com")
 
-    def test_valid_format(self):
+    def test_valid_format(self) -> None:
         self.assertEqual(canonicalise_email("foo@test.bar"), "foo@test.bar")
 
-    def test_domain_to_lower(self):
+    def test_domain_to_lower(self) -> None:
         self.assertEqual(canonicalise_email("foo@TEST.BAR"), "foo@test.bar")
 
-    def test_domain_with_umlaut(self):
+    def test_domain_with_umlaut(self) -> None:
         self.assertEqual(canonicalise_email("foo@Öumlaut.com"), "foo@öumlaut.com")
 
-    def test_address_casefold(self):
+    def test_address_casefold(self) -> None:
         self.assertEqual(
             canonicalise_email("Strauß@Example.com"), "strauss@example.com"
         )
 
-    def test_address_trim(self):
+    def test_address_trim(self) -> None:
         self.assertEqual(canonicalise_email(" foo@test.bar "), "foo@test.bar")
diff --git a/tests/util/test_treecache.py b/tests/util/test_treecache.py
index 567cb18468..fe3b4dc6a4 100644
--- a/tests/util/test_treecache.py
+++ b/tests/util/test_treecache.py
@@ -19,7 +19,7 @@ from .. import unittest
 
 
 class TreeCacheTestCase(unittest.TestCase):
-    def test_get_set_onelevel(self):
+    def test_get_set_onelevel(self) -> None:
         cache = TreeCache()
         cache[("a",)] = "A"
         cache[("b",)] = "B"
@@ -27,7 +27,7 @@ class TreeCacheTestCase(unittest.TestCase):
         self.assertEqual(cache.get(("b",)), "B")
         self.assertEqual(len(cache), 2)
 
-    def test_pop_onelevel(self):
+    def test_pop_onelevel(self) -> None:
         cache = TreeCache()
         cache[("a",)] = "A"
         cache[("b",)] = "B"
@@ -36,7 +36,7 @@ class TreeCacheTestCase(unittest.TestCase):
         self.assertEqual(cache.get(("b",)), "B")
         self.assertEqual(len(cache), 1)
 
-    def test_get_set_twolevel(self):
+    def test_get_set_twolevel(self) -> None:
         cache = TreeCache()
         cache[("a", "a")] = "AA"
         cache[("a", "b")] = "AB"
@@ -46,7 +46,7 @@ class TreeCacheTestCase(unittest.TestCase):
         self.assertEqual(cache.get(("b", "a")), "BA")
         self.assertEqual(len(cache), 3)
 
-    def test_pop_twolevel(self):
+    def test_pop_twolevel(self) -> None:
         cache = TreeCache()
         cache[("a", "a")] = "AA"
         cache[("a", "b")] = "AB"
@@ -58,7 +58,7 @@ class TreeCacheTestCase(unittest.TestCase):
         self.assertEqual(cache.pop(("b", "a")), None)
         self.assertEqual(len(cache), 1)
 
-    def test_pop_mixedlevel(self):
+    def test_pop_mixedlevel(self) -> None:
         cache = TreeCache()
         cache[("a", "a")] = "AA"
         cache[("a", "b")] = "AB"
@@ -72,14 +72,14 @@ class TreeCacheTestCase(unittest.TestCase):
 
         self.assertEqual({"AA", "AB"}, set(iterate_tree_cache_entry(popped)))
 
-    def test_clear(self):
+    def test_clear(self) -> None:
         cache = TreeCache()
         cache[("a",)] = "A"
         cache[("b",)] = "B"
         cache.clear()
         self.assertEqual(len(cache), 0)
 
-    def test_contains(self):
+    def test_contains(self) -> None:
         cache = TreeCache()
         cache[("a",)] = "A"
         self.assertTrue(("a",) in cache)
diff --git a/tests/util/test_wheel_timer.py b/tests/util/test_wheel_timer.py
index 0d5039de04..c9d22b6d8c 100644
--- a/tests/util/test_wheel_timer.py
+++ b/tests/util/test_wheel_timer.py
@@ -18,8 +18,8 @@ from .. import unittest
 
 
 class WheelTimerTestCase(unittest.TestCase):
-    def test_single_insert_fetch(self):
-        wheel = WheelTimer(bucket_size=5)
+    def test_single_insert_fetch(self) -> None:
+        wheel: WheelTimer[object] = WheelTimer(bucket_size=5)
 
         obj = object()
         wheel.insert(100, obj, 150)
@@ -32,8 +32,8 @@ class WheelTimerTestCase(unittest.TestCase):
         self.assertListEqual(wheel.fetch(156), [obj])
         self.assertListEqual(wheel.fetch(170), [])
 
-    def test_multi_insert(self):
-        wheel = WheelTimer(bucket_size=5)
+    def test_multi_insert(self) -> None:
+        wheel: WheelTimer[object] = WheelTimer(bucket_size=5)
 
         obj1 = object()
         obj2 = object()
@@ -50,15 +50,15 @@ class WheelTimerTestCase(unittest.TestCase):
         self.assertListEqual(wheel.fetch(200), [obj3])
         self.assertListEqual(wheel.fetch(210), [])
 
-    def test_insert_past(self):
-        wheel = WheelTimer(bucket_size=5)
+    def test_insert_past(self) -> None:
+        wheel: WheelTimer[object] = WheelTimer(bucket_size=5)
 
         obj = object()
         wheel.insert(100, obj, 50)
         self.assertListEqual(wheel.fetch(120), [obj])
 
-    def test_insert_past_multi(self):
-        wheel = WheelTimer(bucket_size=5)
+    def test_insert_past_multi(self) -> None:
+        wheel: WheelTimer[object] = WheelTimer(bucket_size=5)
 
         obj1 = object()
         obj2 = object()
diff --git a/tests/utils.py b/tests/utils.py
index 045a8b5fa7..d76bf9716a 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -125,7 +125,8 @@ def default_config(
     """
     config_dict = {
         "server_name": name,
-        "send_federation": False,
+        # Setting this to an empty list turns off federation sending.
+        "federation_sender_instances": [],
         "media_store_path": "media",
         # the test signing key is just an arbitrary ed25519 key to keep the config
         # parser happy
@@ -183,8 +184,9 @@ def default_config(
         # rooms will fail.
         "default_room_version": DEFAULT_ROOM_VERSION,
         # disable user directory updates, because they get done in the
-        # background, which upsets the test runner.
-        "update_user_directory": False,
+        # background, which upsets the test runner. Setting this to an
+        # (obviously) fake worker name disables updating the user directory.
+        "update_user_directory_from_worker": "does_not_exist_worker_name",
         "caches": {"global_factor": 1, "sync_response_cache_duration": 0},
         "listeners": [{"port": 0, "type": "http"}],
     }