summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/11157.misc1
-rw-r--r--changelog.d/11188.bugfix1
-rw-r--r--changelog.d/11207.bugfix1
-rw-r--r--changelog.d/11210.feature1
-rw-r--r--changelog.d/11228.feature1
-rw-r--r--changelog.d/11233.misc1
-rw-r--r--changelog.d/11234.bugfix1
-rw-r--r--changelog.d/11236.feature1
-rw-r--r--changelog.d/11237.misc1
-rw-r--r--changelog.d/11239.misc1
-rw-r--r--changelog.d/11240.bugfix1
-rw-r--r--changelog.d/11242.misc1
-rw-r--r--changelog.d/11244.misc1
-rw-r--r--changelog.d/11246.misc1
-rw-r--r--changelog.d/11247.misc1
-rw-r--r--changelog.d/11253.misc1
-rw-r--r--changelog.d/11255.bugfix1
-rw-r--r--changelog.d/11257.doc1
-rw-r--r--changelog.d/11262.bugfix1
-rw-r--r--changelog.d/11263.feature1
-rw-r--r--changelog.d/11269.misc1
-rw-r--r--changelog.d/11270.misc1
-rw-r--r--changelog.d/11273.misc1
-rw-r--r--changelog.d/11276.bugfix1
-rw-r--r--changelog.d/11278.misc1
-rw-r--r--changelog.d/11282.misc1
-rw-r--r--changelog.d/11285.misc1
-rw-r--r--changelog.d/11286.doc1
-rwxr-xr-xdebian/build_virtualenv1
-rw-r--r--debian/changelog6
-rw-r--r--debian/control2
-rw-r--r--debian/matrix-synapse.service2
-rw-r--r--debian/test/.gitignore2
-rw-r--r--debian/test/provision.sh24
-rw-r--r--debian/test/stretch/Vagrantfile13
-rw-r--r--debian/test/xenial/Vagrantfile10
-rw-r--r--docs/SUMMARY.md1
-rw-r--r--docs/admin_api/rooms.md18
-rw-r--r--docs/admin_api/user_admin_api.md2
-rw-r--r--docs/modules/password_auth_provider_callbacks.md2
-rw-r--r--docs/openid.md38
-rw-r--r--docs/sample_config.yaml13
-rw-r--r--docs/systemd-with-workers/system/matrix-synapse-worker@.service2
-rw-r--r--docs/systemd-with-workers/system/matrix-synapse.service2
-rw-r--r--docs/usage/administration/admin_api/background_updates.md84
-rw-r--r--mypy.ini242
-rwxr-xr-xsetup.py8
-rw-r--r--synapse/api/filtering.py115
-rw-r--r--synapse/api/urls.py3
-rw-r--r--synapse/config/account_validity.py4
-rw-r--r--synapse/config/cas.py10
-rw-r--r--synapse/config/emailconfig.py8
-rw-r--r--synapse/config/oidc.py2
-rw-r--r--synapse/config/registration.py15
-rw-r--r--synapse/config/saml2.py5
-rw-r--r--synapse/config/server.py45
-rw-r--r--synapse/config/sso.py18
-rw-r--r--synapse/config/workers.py18
-rw-r--r--synapse/federation/federation_client.py80
-rw-r--r--synapse/federation/federation_server.py9
-rw-r--r--synapse/handlers/appservice.py93
-rw-r--r--synapse/handlers/auth.py4
-rw-r--r--synapse/handlers/devicemessage.py31
-rw-r--r--synapse/handlers/e2e_keys.py211
-rw-r--r--synapse/handlers/identity.py4
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/room.py59
-rw-r--r--synapse/handlers/search.py8
-rw-r--r--synapse/handlers/sync.py18
-rw-r--r--synapse/handlers/typing.py6
-rw-r--r--synapse/replication/tcp/handler.py2
-rw-r--r--synapse/replication/tcp/streams/_base.py3
-rw-r--r--synapse/rest/admin/__init__.py6
-rw-r--r--synapse/rest/admin/background_updates.py107
-rw-r--r--synapse/rest/admin/rooms.py26
-rw-r--r--synapse/rest/client/receipts.py12
-rw-r--r--synapse/rest/client/relations.py4
-rw-r--r--synapse/rest/client/room.py12
-rw-r--r--synapse/rest/client/room_batch.py29
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/rest/well_known.py3
-rw-r--r--synapse/server.py4
-rw-r--r--synapse/storage/background_updates.py65
-rw-r--r--synapse/storage/database.py4
-rw-r--r--synapse/storage/databases/main/appservice.py8
-rw-r--r--synapse/storage/databases/main/deviceinbox.py23
-rw-r--r--synapse/storage/databases/main/events_worker.py56
-rw-r--r--synapse/storage/databases/main/lock.py31
-rw-r--r--synapse/storage/databases/main/relations.py58
-rw-r--r--synapse/storage/databases/main/room.py7
-rw-r--r--synapse/storage/databases/main/stream.py86
-rw-r--r--synapse/storage/prepare_database.py23
-rw-r--r--sytest-blacklist3
-rw-r--r--tests/api/test_filtering.py107
-rw-r--r--tests/handlers/test_appservice.py55
-rw-r--r--tests/handlers/test_e2e_keys.py151
-rw-r--r--tests/handlers/test_sync.py5
-rw-r--r--tests/module_api/test_api.py1
-rw-r--r--tests/push/test_email.py2
-rw-r--r--tests/rest/admin/test_background_updates.py218
-rw-r--r--tests/rest/admin/test_room.py28
-rw-r--r--tests/rest/client/test_consent.py1
-rw-r--r--tests/rest/client/test_register.py1
-rw-r--r--tests/rest/client/test_rooms.py154
-rw-r--r--tests/rest/client/test_sync.py30
-rw-r--r--tests/storage/test_rollback_worker.py69
-rw-r--r--tests/storage/test_stream.py207
-rw-r--r--tests/util/caches/test_descriptors.py43
108 files changed, 2344 insertions, 574 deletions
diff --git a/changelog.d/11157.misc b/changelog.d/11157.misc
new file mode 100644
index 0000000000..75444c51d1
--- /dev/null
+++ b/changelog.d/11157.misc
@@ -0,0 +1 @@
+Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu.
diff --git a/changelog.d/11188.bugfix b/changelog.d/11188.bugfix
new file mode 100644
index 0000000000..0688743c00
--- /dev/null
+++ b/changelog.d/11188.bugfix
@@ -0,0 +1 @@
+Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`.
diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix
new file mode 100644
index 0000000000..7e98d565a1
--- /dev/null
+++ b/changelog.d/11207.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.
diff --git a/changelog.d/11210.feature b/changelog.d/11210.feature
new file mode 100644
index 0000000000..8f8e386415
--- /dev/null
+++ b/changelog.d/11210.feature
@@ -0,0 +1 @@
+Calculate a default value for `public_baseurl` based on `server_name`.
diff --git a/changelog.d/11228.feature b/changelog.d/11228.feature
new file mode 100644
index 0000000000..33c1756b50
--- /dev/null
+++ b/changelog.d/11228.feature
@@ -0,0 +1 @@
+Allow the admin [Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api) to block a room without the need to join it.
diff --git a/changelog.d/11233.misc b/changelog.d/11233.misc
new file mode 100644
index 0000000000..fdf9e5642e
--- /dev/null
+++ b/changelog.d/11233.misc
@@ -0,0 +1 @@
+Add `twine` and `towncrier` as dev dependencies, as they're used by the release script.
diff --git a/changelog.d/11234.bugfix b/changelog.d/11234.bugfix
new file mode 100644
index 0000000000..c0c02a58f6
--- /dev/null
+++ b/changelog.d/11234.bugfix
@@ -0,0 +1 @@
+Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried.
diff --git a/changelog.d/11236.feature b/changelog.d/11236.feature
new file mode 100644
index 0000000000..e7aeee2aa6
--- /dev/null
+++ b/changelog.d/11236.feature
@@ -0,0 +1 @@
+Support filtering by relation senders & types per [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440).
diff --git a/changelog.d/11237.misc b/changelog.d/11237.misc
new file mode 100644
index 0000000000..b90efc6535
--- /dev/null
+++ b/changelog.d/11237.misc
@@ -0,0 +1 @@
+Allow `stream_writers.typing` config to be a list of one worker.
diff --git a/changelog.d/11239.misc b/changelog.d/11239.misc
new file mode 100644
index 0000000000..48a796bed0
--- /dev/null
+++ b/changelog.d/11239.misc
@@ -0,0 +1 @@
+Remove debugging statement in tests.
diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix
new file mode 100644
index 0000000000..94d73f67e3
--- /dev/null
+++ b/changelog.d/11240.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection.
diff --git a/changelog.d/11242.misc b/changelog.d/11242.misc
new file mode 100644
index 0000000000..3a98259edf
--- /dev/null
+++ b/changelog.d/11242.misc
@@ -0,0 +1 @@
+Split out federated PDU retrieval function into a non-cached version.
diff --git a/changelog.d/11244.misc b/changelog.d/11244.misc
new file mode 100644
index 0000000000..c6e65df97f
--- /dev/null
+++ b/changelog.d/11244.misc
@@ -0,0 +1 @@
+Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers.
diff --git a/changelog.d/11246.misc b/changelog.d/11246.misc
new file mode 100644
index 0000000000..e5e912c1b0
--- /dev/null
+++ b/changelog.d/11246.misc
@@ -0,0 +1 @@
+Add an additional test for the `cachedList` method decorator.
diff --git a/changelog.d/11247.misc b/changelog.d/11247.misc
new file mode 100644
index 0000000000..5ce701560e
--- /dev/null
+++ b/changelog.d/11247.misc
@@ -0,0 +1 @@
+Clean up code relating to to-device messages and sending ephemeral events to application services.
\ No newline at end of file
diff --git a/changelog.d/11253.misc b/changelog.d/11253.misc
new file mode 100644
index 0000000000..71c55a2751
--- /dev/null
+++ b/changelog.d/11253.misc
@@ -0,0 +1 @@
+Make minor correction to the type of `auth_checkers` callbacks.
diff --git a/changelog.d/11255.bugfix b/changelog.d/11255.bugfix
new file mode 100644
index 0000000000..ce72592624
--- /dev/null
+++ b/changelog.d/11255.bugfix
@@ -0,0 +1 @@
+Fix rolling back Synapse version when using workers.
diff --git a/changelog.d/11257.doc b/changelog.d/11257.doc
new file mode 100644
index 0000000000..1205be2add
--- /dev/null
+++ b/changelog.d/11257.doc
@@ -0,0 +1 @@
+Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr.
diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix
new file mode 100644
index 0000000000..768fbb8973
--- /dev/null
+++ b/changelog.d/11262.bugfix
@@ -0,0 +1 @@
+Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1.
diff --git a/changelog.d/11263.feature b/changelog.d/11263.feature
new file mode 100644
index 0000000000..831e76ec9f
--- /dev/null
+++ b/changelog.d/11263.feature
@@ -0,0 +1 @@
+Add some background update admin APIs.
diff --git a/changelog.d/11269.misc b/changelog.d/11269.misc
new file mode 100644
index 0000000000..a2149c2d2d
--- /dev/null
+++ b/changelog.d/11269.misc
@@ -0,0 +1 @@
+Clean up trivial aspects of the Debian package build tooling.
diff --git a/changelog.d/11270.misc b/changelog.d/11270.misc
new file mode 100644
index 0000000000..e2181b9b2a
--- /dev/null
+++ b/changelog.d/11270.misc
@@ -0,0 +1 @@
+Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse.
diff --git a/changelog.d/11273.misc b/changelog.d/11273.misc
new file mode 100644
index 0000000000..a2149c2d2d
--- /dev/null
+++ b/changelog.d/11273.misc
@@ -0,0 +1 @@
+Clean up trivial aspects of the Debian package build tooling.
diff --git a/changelog.d/11276.bugfix b/changelog.d/11276.bugfix
new file mode 100644
index 0000000000..ce72592624
--- /dev/null
+++ b/changelog.d/11276.bugfix
@@ -0,0 +1 @@
+Fix rolling back Synapse version when using workers.
diff --git a/changelog.d/11278.misc b/changelog.d/11278.misc
new file mode 100644
index 0000000000..9b014bc8b4
--- /dev/null
+++ b/changelog.d/11278.misc
@@ -0,0 +1 @@
+Fix a small typo in the error response when a relation type other than 'm.annotation' is passed to `GET /rooms/{room_id}/aggregations/{event_id}`.
\ No newline at end of file
diff --git a/changelog.d/11282.misc b/changelog.d/11282.misc
new file mode 100644
index 0000000000..4720519cbc
--- /dev/null
+++ b/changelog.d/11282.misc
@@ -0,0 +1 @@
+Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.
diff --git a/changelog.d/11285.misc b/changelog.d/11285.misc
new file mode 100644
index 0000000000..4720519cbc
--- /dev/null
+++ b/changelog.d/11285.misc
@@ -0,0 +1 @@
+Require all files in synapse/ and tests/ to pass mypy unless specifically excluded.
diff --git a/changelog.d/11286.doc b/changelog.d/11286.doc
new file mode 100644
index 0000000000..890d7b4ee4
--- /dev/null
+++ b/changelog.d/11286.doc
@@ -0,0 +1 @@
+Fix typo in the word `available` and fix HTTP method (should be `GET`) for the `username_available` admin API. Contributed by Stanislav Motylkov.
diff --git a/debian/build_virtualenv b/debian/build_virtualenv
index 3097371d59..e691163619 100755
--- a/debian/build_virtualenv
+++ b/debian/build_virtualenv
@@ -40,6 +40,7 @@ dh_virtualenv \
     --upgrade-pip \
     --preinstall="lxml" \
     --preinstall="mock" \
+    --preinstall="wheel" \
     --extra-pip-arg="--no-cache-dir" \
     --extra-pip-arg="--compile" \
     --extras="all,systemd,test"
diff --git a/debian/changelog b/debian/changelog
index 14748f8c25..74a98f0866 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,8 +1,12 @@
 matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium
 
   * Update scripts to pass Shellcheck lints.
+  * Remove unused Vagrant scripts from debian/ directory.
+  * Allow building Debian packages for any architecture, not just amd64.
+  * Preinstall the "wheel" package when building virtualenvs.
+  * Do not error if /etc/default/matrix-synapse is missing.
 
- -- root <root@cae79a6e79d7>  Fri, 22 Oct 2021 22:20:31 +0000
+ -- Dan Callahan <danc@element.io>  Fri, 22 Oct 2021 22:20:31 +0000
 
 matrix-synapse-py3 (1.46.0) stable; urgency=medium
 
diff --git a/debian/control b/debian/control
index 763fabd6f6..412a9e1d4c 100644
--- a/debian/control
+++ b/debian/control
@@ -19,7 +19,7 @@ Standards-Version: 3.9.8
 Homepage: https://github.com/matrix-org/synapse
 
 Package: matrix-synapse-py3
-Architecture: amd64
+Architecture: any
 Provides: matrix-synapse
 Conflicts:
  matrix-synapse (<< 0.34.0.1-0matrix2),
diff --git a/debian/matrix-synapse.service b/debian/matrix-synapse.service
index 553babf549..bde1c6cb9f 100644
--- a/debian/matrix-synapse.service
+++ b/debian/matrix-synapse.service
@@ -5,7 +5,7 @@ Description=Synapse Matrix homeserver
 Type=notify
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
 ExecReload=/bin/kill -HUP $MAINPID
diff --git a/debian/test/.gitignore b/debian/test/.gitignore
deleted file mode 100644
index 95eda73fcc..0000000000
--- a/debian/test/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-.vagrant
-*.log
diff --git a/debian/test/provision.sh b/debian/test/provision.sh
deleted file mode 100644
index 55d7b8e03a..0000000000
--- a/debian/test/provision.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/bash
-#
-# provisioning script for vagrant boxes for testing the matrix-synapse debs.
-#
-# Will install the most recent matrix-synapse-py3 deb for this platform from
-# the /debs directory.
-
-set -e
-
-apt-get update
-apt-get install -y lsb-release
-
-deb=$(find /debs -name "matrix-synapse-py3_*+$(lsb_release -cs)*.deb" | sort | tail -n1)
-
-debconf-set-selections <<EOF
-matrix-synapse matrix-synapse/report-stats boolean false
-matrix-synapse matrix-synapse/server-name string localhost:18448
-EOF
-
-dpkg -i "$deb"
-
-sed -i -e 's/port: 8448$/port: 18448/; s/port: 8008$/port: 18008' /etc/matrix-synapse/homeserver.yaml
-echo 'registration_shared_secret: secret' >> /etc/matrix-synapse/homeserver.yaml
-systemctl restart matrix-synapse
diff --git a/debian/test/stretch/Vagrantfile b/debian/test/stretch/Vagrantfile
deleted file mode 100644
index d8eff6fe11..0000000000
--- a/debian/test/stretch/Vagrantfile
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-ver = `cd ../../..; dpkg-parsechangelog -S Version`.strip()
-
-Vagrant.configure("2") do |config|
-  config.vm.box = "debian/stretch64"
-
-  config.vm.synced_folder ".", "/vagrant", disabled: true
-  config.vm.synced_folder "../../../../debs", "/debs", type: "nfs"
-
-  config.vm.provision "shell", path: "../provision.sh"
-end
diff --git a/debian/test/xenial/Vagrantfile b/debian/test/xenial/Vagrantfile
deleted file mode 100644
index 189236da17..0000000000
--- a/debian/test/xenial/Vagrantfile
+++ /dev/null
@@ -1,10 +0,0 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-Vagrant.configure("2") do |config|
-  config.vm.box = "ubuntu/xenial64"
-
-  config.vm.synced_folder ".", "/vagrant", disabled: true
-  config.vm.synced_folder "../../../../debs", "/debs"
-  config.vm.provision "shell", path: "../provision.sh"
-end
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 35412ea92c..04320ab07b 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -51,6 +51,7 @@
   - [Administration](usage/administration/README.md)
     - [Admin API](usage/administration/admin_api/README.md)
       - [Account Validity](admin_api/account_validity.md)
+      - [Background Updates](usage/administration/admin_api/background_updates.md)
       - [Delete Group](admin_api/delete_group.md)
       - [Event Reports](admin_api/event_reports.md)
       - [Media](admin_api/media_admin_api.md)
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index 1fc3cc3c42..41a4961d00 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -385,7 +385,7 @@ A response body like the following is returned:
 
 # Delete Room API
 
-The Delete Room admin API allows server admins to remove rooms from server
+The Delete Room admin API allows server admins to remove rooms from the server
 and block these rooms.
 
 Shuts down a room. Moves all local users and room aliases automatically to a
@@ -396,13 +396,17 @@ The new room will be created with the user specified by the `new_room_user_id` p
 as room administrator and will contain a message explaining what happened. Users invited
 to the new room will have power level `-10` by default, and thus be unable to speak.
 
-If `block` is `True` it prevents new joins to the old room.
+If `block` is `true`, users will be prevented from joining the old room.
+This option can also be used to pre-emptively block a room, even if it's unknown
+to this homeserver. In this case, the room will be blocked, and no further action
+will be taken. If `block` is `false`, attempting to delete an unknown room is
+invalid and will be rejected as a bad request.
 
 This API will remove all trace of the old room from your database after removing
 all local users. If `purge` is `true` (the default), all traces of the old room will
 be removed from your database after removing all local users. If you do not want
 this to happen, set `purge` to `false`.
-Depending on the amount of history being purged a call to the API may take
+Depending on the amount of history being purged, a call to the API may take
 several minutes or longer.
 
 The local server will only have the power to move local user and room aliases to
@@ -464,8 +468,9 @@ The following JSON body parameters are available:
               `new_room_user_id` in the new room. Ideally this will clearly convey why the
                original room was shut down. Defaults to `Sharing illegal content on this server
                is not permitted and rooms in violation will be blocked.`
-* `block` - Optional. If set to `true`, this room will be added to a blocking list, preventing
-            future attempts to join the room. Defaults to `false`.
+* `block` - Optional. If set to `true`, this room will be added to a blocking list,
+            preventing future attempts to join the room. Rooms can be blocked
+            even if they're not yet known to the homeserver. Defaults to `false`.
 * `purge` - Optional. If set to `true`, it will remove all traces of the room from your database.
             Defaults to `true`.
 * `force_purge` - Optional, and ignored unless `purge` is `true`. If set to `true`, it
@@ -483,7 +488,8 @@ The following fields are returned in the JSON response body:
 * `failed_to_kick_users` - An array of users (`user_id`) that that were not kicked.
 * `local_aliases` - An array of strings representing the local aliases that were migrated from
                     the old room to the new.
-* `new_room_id` - A string representing the room ID of the new room.
+* `new_room_id` - A string representing the room ID of the new room, or `null` if
+                  no such room was created.
 
 
 ## Undoing room deletions
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index f03539c9f0..16ec33b3c1 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -1107,7 +1107,7 @@ This endpoint will work even if registration is disabled on the server, unlike
 The API is:
 
 ```
-POST /_synapse/admin/v1/username_availabile?username=$localpart
+GET /_synapse/admin/v1/username_available?username=$localpart
 ```
 
 The request and response format is the same as the [/_matrix/client/r0/register/available](https://matrix.org/docs/spec/client_server/r0.6.0#get-matrix-client-r0-register-available) API.
diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md
index 0de60b128a..e53abf6409 100644
--- a/docs/modules/password_auth_provider_callbacks.md
+++ b/docs/modules/password_auth_provider_callbacks.md
@@ -11,7 +11,7 @@ registered by using the Module API's `register_password_auth_provider_callbacks`
 _First introduced in Synapse v1.46.0_
 
 ```python
- auth_checkers: Dict[Tuple[str,Tuple], Callable]
+auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Callable]
 ```
 
 A dict mapping from tuples of a login type identifier (such as `m.login.password`) and a
diff --git a/docs/openid.md b/docs/openid.md
index 4a340ef107..c74e8bda60 100644
--- a/docs/openid.md
+++ b/docs/openid.md
@@ -22,6 +22,7 @@ such as [Github][github-idp].
 [google-idp]: https://developers.google.com/identity/protocols/oauth2/openid-connect
 [auth0]: https://auth0.com/
 [authentik]: https://goauthentik.io/
+[lemonldap]: https://lemonldap-ng.org/
 [okta]: https://www.okta.com/
 [dex-idp]: https://github.com/dexidp/dex
 [keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
@@ -243,6 +244,43 @@ oidc_providers:
         display_name_template: "{{ user.preferred_username|capitalize }}" # TO BE FILLED: If your users have names in Authentik and you want those in Synapse, this should be replaced with user.name|capitalize.
 ```
 
+### LemonLDAP
+
+[LemonLDAP::NG][lemonldap] is an open-source IdP solution.
+
+1. Create an OpenID Connect Relying Parties in LemonLDAP::NG
+2. The parameters are:
+- Client ID under the basic menu of the new Relying Parties (`Options > Basic >
+  Client ID`)
+- Client secret (`Options > Basic > Client secret`)
+- JWT Algorithm: RS256 within the security menu of the new Relying Parties
+  (`Options > Security > ID Token signature algorithm` and `Options > Security >
+  Access Token signature algorithm`)
+- Scopes: OpenID, Email and Profile
+- Allowed redirection addresses for login (`Options > Basic > Allowed
+  redirection addresses for login` ) :
+  `[synapse public baseurl]/_synapse/client/oidc/callback`
+
+Synapse config:
+```yaml
+oidc_providers:
+  - idp_id: lemonldap
+    idp_name: lemonldap
+    discover: true
+    issuer: "https://auth.example.org/" # TO BE FILLED: replace with your domain
+    client_id: "your client id" # TO BE FILLED
+    client_secret: "your client secret" # TO BE FILLED
+    scopes:
+      - "openid"
+      - "profile"
+      - "email"
+    user_mapping_provider:
+      config:
+        localpart_template: "{{ user.preferred_username }}}"
+        # TO BE FILLED: If your users have names in LemonLDAP::NG and you want those in Synapse, this should be replaced with user.name|capitalize or any valid filter.
+        display_name_template: "{{ user.preferred_username|capitalize }}"
+```
+
 ### GitHub
 
 [GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index c3a4148f74..d48c08f1d9 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -91,6 +91,8 @@ pid_file: DATADIR/homeserver.pid
 # Otherwise, it should be the URL to reach Synapse's client HTTP listener (see
 # 'listeners' below).
 #
+# Defaults to 'https://<server_name>/'.
+#
 #public_baseurl: https://example.com/
 
 # Uncomment the following to tell other servers to send federation traffic on
@@ -1265,7 +1267,7 @@ oembed:
 # in on this server.
 #
 # (By default, no suggestion is made, so it is left up to the client.
-# This setting is ignored unless public_baseurl is also set.)
+# This setting is ignored unless public_baseurl is also explicitly set.)
 #
 #default_identity_server: https://matrix.org
 
@@ -1290,8 +1292,6 @@ oembed:
 # by the Matrix Identity Service API specification:
 # https://matrix.org/docs/spec/identity_service/latest
 #
-# If a delegate is specified, the config option public_baseurl must also be filled out.
-#
 account_threepid_delegates:
     #email: https://example.com     # Delegate email sending to example.com
     #msisdn: http://localhost:8090  # Delegate SMS sending to this local process
@@ -1981,11 +1981,10 @@ sso:
     # phishing attacks from evil.site. To avoid this, include a slash after the
     # hostname: "https://my.client/".
     #
-    # If public_baseurl is set, then the login fallback page (used by clients
-    # that don't natively support the required login flows) is whitelisted in
-    # addition to any URLs in this list.
+    # The login fallback page (used by clients that don't natively support the
+    # required login flows) is whitelisted in addition to any URLs in this list.
     #
-    # By default, this list is empty.
+    # By default, this list contains only the login fallback page.
     #
     #client_whitelist:
     #  - https://riot.im/develop
diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
index d164e8ce1f..8f5c44c9d4 100644
--- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service
+++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
@@ -15,7 +15,7 @@ Type=notify
 NotifyAccess=main
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.generic_worker --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
 ExecReload=/bin/kill -HUP $MAINPID
 Restart=always
diff --git a/docs/systemd-with-workers/system/matrix-synapse.service b/docs/systemd-with-workers/system/matrix-synapse.service
index f6b6dfd3ce..0c73fb55fb 100644
--- a/docs/systemd-with-workers/system/matrix-synapse.service
+++ b/docs/systemd-with-workers/system/matrix-synapse.service
@@ -10,7 +10,7 @@ Type=notify
 NotifyAccess=main
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
 ExecReload=/bin/kill -HUP $MAINPID
diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md
new file mode 100644
index 0000000000..b36d7fe398
--- /dev/null
+++ b/docs/usage/administration/admin_api/background_updates.md
@@ -0,0 +1,84 @@
+# Background Updates API
+
+This API allows a server administrator to manage the background updates being
+run against the database.
+
+## Status
+
+This API gets the current status of the background updates.
+
+
+The API is:
+
+```
+GET /_synapse/admin/v1/background_updates/status
+```
+
+Returning:
+
+```json
+{
+    "enabled": true,
+    "current_updates": {
+        "<db_name>": {
+            "name": "<background_update_name>",
+            "total_item_count": 50,
+            "total_duration_ms": 10000.0,
+            "average_items_per_ms": 2.2,
+        },
+    }
+}
+```
+
+`enabled` whether the background updates are enabled or disabled.
+
+`db_name` the database name (usually Synapse is configured with a single database named 'master').
+
+For each update:
+
+`name` the name of the update.
+`total_item_count` total number of "items" processed (the meaning of 'items' depends on the update in question).
+`total_duration_ms` how long the background process has been running, not including time spent sleeping.
+`average_items_per_ms` how many items are processed per millisecond based on an exponential average.
+
+
+
+## Enabled
+
+This API allow pausing background updates.
+
+Background updates should *not* be paused for significant periods of time, as
+this can affect the performance of Synapse.
+
+*Note*: This won't persist over restarts.
+
+*Note*: This won't cancel any update query that is currently running. This is
+usually fine since most queries are short lived, except for `CREATE INDEX`
+background updates which won't be cancelled once started.
+
+
+The API is:
+
+```
+POST /_synapse/admin/v1/background_updates/enabled
+```
+
+with the following body:
+
+```json
+{
+    "enabled": false
+}
+```
+
+`enabled` sets whether the background updates are enabled or disabled.
+
+The API returns the `enabled` param.
+
+```json
+{
+    "enabled": false
+}
+```
+
+There is also a `GET` version which returns the `enabled` state.
diff --git a/mypy.ini b/mypy.ini
index 600402a5d3..1752b82bc5 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -10,82 +10,173 @@ warn_unreachable = True
 local_partial_types = True
 no_implicit_optional = True
 
-# To find all folders that pass mypy you run:
-#
-#   find synapse/* -type d -not -name __pycache__ -exec bash -c "mypy '{}' > /dev/null"  \; -print
-
 files =
   scripts-dev/sign_json,
-  synapse/__init__.py,
-  synapse/api,
-  synapse/appservice,
-  synapse/config,
-  synapse/crypto,
-  synapse/event_auth.py,
-  synapse/events,
-  synapse/federation,
-  synapse/groups,
-  synapse/handlers,
-  synapse/http,
-  synapse/logging,
-  synapse/metrics,
-  synapse/module_api,
-  synapse/notifier.py,
-  synapse/push,
-  synapse/replication,
-  synapse/rest,
-  synapse/server.py,
-  synapse/server_notices,
-  synapse/spam_checker_api,
-  synapse/state,
-  synapse/storage/__init__.py,
-  synapse/storage/_base.py,
-  synapse/storage/background_updates.py,
-  synapse/storage/databases/main/appservice.py,
-  synapse/storage/databases/main/client_ips.py,
-  synapse/storage/databases/main/events.py,
-  synapse/storage/databases/main/keys.py,
-  synapse/storage/databases/main/pusher.py,
-  synapse/storage/databases/main/registration.py,
-  synapse/storage/databases/main/relations.py,
-  synapse/storage/databases/main/session.py,
-  synapse/storage/databases/main/stream.py,
-  synapse/storage/databases/main/ui_auth.py,
-  synapse/storage/databases/state,
-  synapse/storage/database.py,
-  synapse/storage/engines,
-  synapse/storage/keys.py,
-  synapse/storage/persist_events.py,
-  synapse/storage/prepare_database.py,
-  synapse/storage/purge_events.py,
-  synapse/storage/push_rule.py,
-  synapse/storage/relations.py,
-  synapse/storage/roommember.py,
-  synapse/storage/state.py,
-  synapse/storage/types.py,
-  synapse/storage/util,
-  synapse/streams,
-  synapse/types.py,
-  synapse/util,
-  synapse/visibility.py,
-  tests/replication,
-  tests/test_event_auth.py,
-  tests/test_utils,
-  tests/handlers/test_password_providers.py,
-  tests/handlers/test_room.py,
-  tests/handlers/test_room_summary.py,
-  tests/handlers/test_send_email.py,
-  tests/handlers/test_sync.py,
-  tests/handlers/test_user_directory.py,
-  tests/rest/client/test_login.py,
-  tests/rest/client/test_auth.py,
-  tests/rest/client/test_relations.py,
-  tests/rest/media/v1/test_filepath.py,
-  tests/rest/media/v1/test_oembed.py,
-  tests/storage/test_state.py,
-  tests/storage/test_user_directory.py,
-  tests/util/test_itertools.py,
-  tests/util/test_stream_change_cache.py
+  setup.py,
+  synapse/,
+  tests/
+
+# Note: Better exclusion syntax coming in mypy > 0.910
+# https://github.com/python/mypy/pull/11329
+#
+# For now, set the (?x) flag enable "verbose" regexes
+# https://docs.python.org/3/library/re.html#re.X
+exclude = (?x)
+  ^(
+   |synapse/_scripts/register_new_matrix_user.py
+   |synapse/_scripts/review_recent_signups.py
+   |synapse/app/__init__.py
+   |synapse/app/_base.py
+   |synapse/app/admin_cmd.py
+   |synapse/app/appservice.py
+   |synapse/app/client_reader.py
+   |synapse/app/event_creator.py
+   |synapse/app/federation_reader.py
+   |synapse/app/federation_sender.py
+   |synapse/app/frontend_proxy.py
+   |synapse/app/generic_worker.py
+   |synapse/app/homeserver.py
+   |synapse/app/media_repository.py
+   |synapse/app/phone_stats_home.py
+   |synapse/app/pusher.py
+   |synapse/app/synchrotron.py
+   |synapse/app/user_dir.py
+   |synapse/storage/databases/__init__.py
+   |synapse/storage/databases/main/__init__.py
+   |synapse/storage/databases/main/account_data.py
+   |synapse/storage/databases/main/cache.py
+   |synapse/storage/databases/main/censor_events.py
+   |synapse/storage/databases/main/deviceinbox.py
+   |synapse/storage/databases/main/devices.py
+   |synapse/storage/databases/main/directory.py
+   |synapse/storage/databases/main/e2e_room_keys.py
+   |synapse/storage/databases/main/end_to_end_keys.py
+   |synapse/storage/databases/main/event_federation.py
+   |synapse/storage/databases/main/event_push_actions.py
+   |synapse/storage/databases/main/events_bg_updates.py
+   |synapse/storage/databases/main/events_forward_extremities.py
+   |synapse/storage/databases/main/events_worker.py
+   |synapse/storage/databases/main/filtering.py
+   |synapse/storage/databases/main/group_server.py
+   |synapse/storage/databases/main/lock.py
+   |synapse/storage/databases/main/media_repository.py
+   |synapse/storage/databases/main/metrics.py
+   |synapse/storage/databases/main/monthly_active_users.py
+   |synapse/storage/databases/main/openid.py
+   |synapse/storage/databases/main/presence.py
+   |synapse/storage/databases/main/profile.py
+   |synapse/storage/databases/main/purge_events.py
+   |synapse/storage/databases/main/push_rule.py
+   |synapse/storage/databases/main/receipts.py
+   |synapse/storage/databases/main/rejections.py
+   |synapse/storage/databases/main/room.py
+   |synapse/storage/databases/main/room_batch.py
+   |synapse/storage/databases/main/roommember.py
+   |synapse/storage/databases/main/search.py
+   |synapse/storage/databases/main/signatures.py
+   |synapse/storage/databases/main/state.py
+   |synapse/storage/databases/main/state_deltas.py
+   |synapse/storage/databases/main/stats.py
+   |synapse/storage/databases/main/tags.py
+   |synapse/storage/databases/main/transactions.py
+   |synapse/storage/databases/main/user_directory.py
+   |synapse/storage/databases/main/user_erasure_store.py
+   |synapse/storage/schema/
+
+   |tests/api/test_auth.py
+   |tests/api/test_ratelimiting.py
+   |tests/app/test_openid_listener.py
+   |tests/appservice/test_scheduler.py
+   |tests/config/test_cache.py
+   |tests/config/test_tls.py
+   |tests/crypto/test_keyring.py
+   |tests/events/test_presence_router.py
+   |tests/events/test_utils.py
+   |tests/federation/test_federation_catch_up.py
+   |tests/federation/test_federation_sender.py
+   |tests/federation/test_federation_server.py
+   |tests/federation/transport/test_knocking.py
+   |tests/federation/transport/test_server.py
+   |tests/handlers/test_cas.py
+   |tests/handlers/test_directory.py
+   |tests/handlers/test_e2e_keys.py
+   |tests/handlers/test_federation.py
+   |tests/handlers/test_oidc.py
+   |tests/handlers/test_presence.py
+   |tests/handlers/test_profile.py
+   |tests/handlers/test_saml.py
+   |tests/handlers/test_typing.py
+   |tests/http/federation/test_matrix_federation_agent.py
+   |tests/http/federation/test_srv_resolver.py
+   |tests/http/test_fedclient.py
+   |tests/http/test_proxyagent.py
+   |tests/http/test_servlet.py
+   |tests/http/test_site.py
+   |tests/logging/__init__.py
+   |tests/logging/test_terse_json.py
+   |tests/module_api/test_api.py
+   |tests/push/test_email.py
+   |tests/push/test_http.py
+   |tests/push/test_presentable_names.py
+   |tests/push/test_push_rule_evaluator.py
+   |tests/rest/admin/test_admin.py
+   |tests/rest/admin/test_device.py
+   |tests/rest/admin/test_media.py
+   |tests/rest/admin/test_server_notice.py
+   |tests/rest/admin/test_user.py
+   |tests/rest/admin/test_username_available.py
+   |tests/rest/client/test_account.py
+   |tests/rest/client/test_events.py
+   |tests/rest/client/test_filter.py
+   |tests/rest/client/test_groups.py
+   |tests/rest/client/test_register.py
+   |tests/rest/client/test_report_event.py
+   |tests/rest/client/test_rooms.py
+   |tests/rest/client/test_third_party_rules.py
+   |tests/rest/client/test_transactions.py
+   |tests/rest/client/test_typing.py
+   |tests/rest/client/utils.py
+   |tests/rest/key/v2/test_remote_key_resource.py
+   |tests/rest/media/v1/test_base.py
+   |tests/rest/media/v1/test_media_storage.py
+   |tests/rest/media/v1/test_url_preview.py
+   |tests/scripts/test_new_matrix_user.py
+   |tests/server.py
+   |tests/server_notices/test_resource_limits_server_notices.py
+   |tests/state/test_v2.py
+   |tests/storage/test_account_data.py
+   |tests/storage/test_appservice.py
+   |tests/storage/test_background_update.py
+   |tests/storage/test_base.py
+   |tests/storage/test_client_ips.py
+   |tests/storage/test_database.py
+   |tests/storage/test_event_federation.py
+   |tests/storage/test_id_generators.py
+   |tests/storage/test_roommember.py
+   |tests/test_metrics.py
+   |tests/test_phone_home.py
+   |tests/test_server.py
+   |tests/test_state.py
+   |tests/test_terms_auth.py
+   |tests/test_visibility.py
+   |tests/unittest.py
+   |tests/util/caches/test_cached_call.py
+   |tests/util/caches/test_deferred_cache.py
+   |tests/util/caches/test_descriptors.py
+   |tests/util/caches/test_response_cache.py
+   |tests/util/caches/test_ttlcache.py
+   |tests/util/test_async_helpers.py
+   |tests/util/test_batching_queue.py
+   |tests/util/test_dict_cache.py
+   |tests/util/test_expiring_cache.py
+   |tests/util/test_file_consumer.py
+   |tests/util/test_linearizer.py
+   |tests/util/test_logcontext.py
+   |tests/util/test_lrucache.py
+   |tests/util/test_rwlock.py
+   |tests/util/test_wheel_timer.py
+   |tests/utils.py
+   )$
 
 [mypy-synapse.api.*]
 disallow_untyped_defs = True
@@ -272,6 +363,9 @@ ignore_missing_imports = True
 [mypy-opentracing]
 ignore_missing_imports = True
 
+[mypy-parameterized.*]
+ignore_missing_imports = True
+
 [mypy-phonenumbers.*]
 ignore_missing_imports = True
 
diff --git a/setup.py b/setup.py
index 220084a49d..5d602db240 100755
--- a/setup.py
+++ b/setup.py
@@ -17,6 +17,7 @@
 # limitations under the License.
 import glob
 import os
+from typing import Any, Dict
 
 from setuptools import Command, find_packages, setup
 
@@ -49,8 +50,6 @@ here = os.path.abspath(os.path.dirname(__file__))
 # [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command
 # [2]: https://pypi.python.org/pypi/setuptools_trial
 class TestCommand(Command):
-    user_options = []
-
     def initialize_options(self):
         pass
 
@@ -75,7 +74,7 @@ def read_file(path_segments):
 
 def exec_file(path_segments):
     """Execute a single python file to get the variables defined in it"""
-    result = {}
+    result: Dict[str, Any] = {}
     code = read_file(path_segments)
     exec(code, result)
     return result
@@ -132,6 +131,9 @@ CONDITIONAL_REQUIREMENTS["dev"] = (
         "GitPython==3.1.14",
         "commonmark==0.9.1",
         "pygithub==1.55",
+        # The following are executed as commands by the release script.
+        "twine",
+        "towncrier",
     ]
 )
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 4b0a9b2974..13dd6ce248 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -1,7 +1,7 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2018-2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -86,6 +86,9 @@ ROOM_EVENT_FILTER_SCHEMA = {
         # cf https://github.com/matrix-org/matrix-doc/pull/2326
         "org.matrix.labels": {"type": "array", "items": {"type": "string"}},
         "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}},
+        # MSC3440, filtering by event relations.
+        "io.element.relation_senders": {"type": "array", "items": {"type": "string"}},
+        "io.element.relation_types": {"type": "array", "items": {"type": "string"}},
     },
 }
 
@@ -146,14 +149,16 @@ def matrix_user_id_validator(user_id_str: str) -> UserID:
 
 class Filtering:
     def __init__(self, hs: "HomeServer"):
-        super().__init__()
+        self._hs = hs
         self.store = hs.get_datastore()
 
+        self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})
+
     async def get_user_filter(
         self, user_localpart: str, filter_id: Union[int, str]
     ) -> "FilterCollection":
         result = await self.store.get_user_filter(user_localpart, filter_id)
-        return FilterCollection(result)
+        return FilterCollection(self._hs, result)
 
     def add_user_filter(
         self, user_localpart: str, user_filter: JsonDict
@@ -191,21 +196,22 @@ FilterEvent = TypeVar("FilterEvent", EventBase, UserPresenceState, JsonDict)
 
 
 class FilterCollection:
-    def __init__(self, filter_json: JsonDict):
+    def __init__(self, hs: "HomeServer", filter_json: JsonDict):
         self._filter_json = filter_json
 
         room_filter_json = self._filter_json.get("room", {})
 
         self._room_filter = Filter(
-            {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")}
+            hs,
+            {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")},
         )
 
-        self._room_timeline_filter = Filter(room_filter_json.get("timeline", {}))
-        self._room_state_filter = Filter(room_filter_json.get("state", {}))
-        self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {}))
-        self._room_account_data = Filter(room_filter_json.get("account_data", {}))
-        self._presence_filter = Filter(filter_json.get("presence", {}))
-        self._account_data = Filter(filter_json.get("account_data", {}))
+        self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {}))
+        self._room_state_filter = Filter(hs, room_filter_json.get("state", {}))
+        self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {}))
+        self._room_account_data = Filter(hs, room_filter_json.get("account_data", {}))
+        self._presence_filter = Filter(hs, filter_json.get("presence", {}))
+        self._account_data = Filter(hs, filter_json.get("account_data", {}))
 
         self.include_leave = filter_json.get("room", {}).get("include_leave", False)
         self.event_fields = filter_json.get("event_fields", [])
@@ -232,25 +238,37 @@ class FilterCollection:
     def include_redundant_members(self) -> bool:
         return self._room_state_filter.include_redundant_members
 
-    def filter_presence(
+    async def filter_presence(
         self, events: Iterable[UserPresenceState]
     ) -> List[UserPresenceState]:
-        return self._presence_filter.filter(events)
+        return await self._presence_filter.filter(events)
 
-    def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._account_data.filter(events)
+    async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+        return await self._account_data.filter(events)
 
-    def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
-        return self._room_state_filter.filter(self._room_filter.filter(events))
+    async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]:
+        return await self._room_state_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_timeline(self, events: Iterable[EventBase]) -> List[EventBase]:
-        return self._room_timeline_filter.filter(self._room_filter.filter(events))
+    async def filter_room_timeline(
+        self, events: Iterable[EventBase]
+    ) -> List[EventBase]:
+        return await self._room_timeline_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._room_ephemeral_filter.filter(self._room_filter.filter(events))
+    async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]:
+        return await self._room_ephemeral_filter.filter(
+            await self._room_filter.filter(events)
+        )
 
-    def filter_room_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]:
-        return self._room_account_data.filter(self._room_filter.filter(events))
+    async def filter_room_account_data(
+        self, events: Iterable[JsonDict]
+    ) -> List[JsonDict]:
+        return await self._room_account_data.filter(
+            await self._room_filter.filter(events)
+        )
 
     def blocks_all_presence(self) -> bool:
         return (
@@ -274,7 +292,9 @@ class FilterCollection:
 
 
 class Filter:
-    def __init__(self, filter_json: JsonDict):
+    def __init__(self, hs: "HomeServer", filter_json: JsonDict):
+        self._hs = hs
+        self._store = hs.get_datastore()
         self.filter_json = filter_json
 
         self.limit = filter_json.get("limit", 10)
@@ -297,6 +317,20 @@ class Filter:
         self.labels = filter_json.get("org.matrix.labels", None)
         self.not_labels = filter_json.get("org.matrix.not_labels", [])
 
+        # Ideally these would be rejected at the endpoint if they were provided
+        # and not supported, but that would involve modifying the JSON schema
+        # based on the homeserver configuration.
+        if hs.config.experimental.msc3440_enabled:
+            self.relation_senders = self.filter_json.get(
+                "io.element.relation_senders", None
+            )
+            self.relation_types = self.filter_json.get(
+                "io.element.relation_types", None
+            )
+        else:
+            self.relation_senders = None
+            self.relation_types = None
+
     def filters_all_types(self) -> bool:
         return "*" in self.not_types
 
@@ -306,7 +340,7 @@ class Filter:
     def filters_all_rooms(self) -> bool:
         return "*" in self.not_rooms
 
-    def check(self, event: FilterEvent) -> bool:
+    def _check(self, event: FilterEvent) -> bool:
         """Checks whether the filter matches the given event.
 
         Args:
@@ -420,8 +454,30 @@ class Filter:
 
         return room_ids
 
-    def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
-        return list(filter(self.check, events))
+    async def _check_event_relations(
+        self, events: Iterable[FilterEvent]
+    ) -> List[FilterEvent]:
+        # The event IDs to check, mypy doesn't understand the ifinstance check.
+        event_ids = [event.event_id for event in events if isinstance(event, EventBase)]  # type: ignore[attr-defined]
+        event_ids_to_keep = set(
+            await self._store.events_have_relations(
+                event_ids, self.relation_senders, self.relation_types
+            )
+        )
+
+        return [
+            event
+            for event in events
+            if not isinstance(event, EventBase) or event.event_id in event_ids_to_keep
+        ]
+
+    async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]:
+        result = [event for event in events if self._check(event)]
+
+        if self.relation_senders or self.relation_types:
+            return await self._check_event_relations(result)
+
+        return result
 
     def with_room_ids(self, room_ids: Iterable[str]) -> "Filter":
         """Returns a new filter with the given room IDs appended.
@@ -433,7 +489,7 @@ class Filter:
             filter: A new filter including the given rooms and the old
                     filter's rooms.
         """
-        newFilter = Filter(self.filter_json)
+        newFilter = Filter(self._hs, self.filter_json)
         newFilter.rooms += room_ids
         return newFilter
 
@@ -444,6 +500,3 @@ def _matches_wildcard(actual_value: Optional[str], filter_value: str) -> bool:
         return actual_value.startswith(type_prefix)
     else:
         return actual_value == filter_value
-
-
-DEFAULT_FILTER_COLLECTION = FilterCollection({})
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 6e84b1524f..4486b3bc7d 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -38,9 +38,6 @@ class ConsentURIBuilder:
     def __init__(self, hs_config: HomeServerConfig):
         if hs_config.key.form_secret is None:
             raise ConfigError("form_secret not set in config")
-        if hs_config.server.public_baseurl is None:
-            raise ConfigError("public_baseurl not set in config")
-
         self._hmac_secret = hs_config.key.form_secret.encode("utf-8")
         self._public_baseurl = hs_config.server.public_baseurl
 
diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py
index b56c2a24df..c533452cab 100644
--- a/synapse/config/account_validity.py
+++ b/synapse/config/account_validity.py
@@ -75,10 +75,6 @@ class AccountValidityConfig(Config):
                 self.account_validity_period * 10.0 / 100.0
             )
 
-        if self.account_validity_renew_by_email_enabled:
-            if not self.root.server.public_baseurl:
-                raise ConfigError("Can't send renewal emails without 'public_baseurl'")
-
         # Load account validity templates.
         account_validity_template_dir = account_validity_config.get("template_dir")
         if account_validity_template_dir is not None:
diff --git a/synapse/config/cas.py b/synapse/config/cas.py
index 9b58ecf3d8..3f81814043 100644
--- a/synapse/config/cas.py
+++ b/synapse/config/cas.py
@@ -16,7 +16,7 @@ from typing import Any, List
 
 from synapse.config.sso import SsoAttributeRequirement
 
-from ._base import Config, ConfigError
+from ._base import Config
 from ._util import validate_config
 
 
@@ -35,14 +35,10 @@ class CasConfig(Config):
         if self.cas_enabled:
             self.cas_server_url = cas_config["server_url"]
 
-            # The public baseurl is required because it is used by the redirect
-            # template.
-            public_baseurl = self.root.server.public_baseurl
-            if not public_baseurl:
-                raise ConfigError("cas_config requires a public_baseurl to be set")
-
             # TODO Update this to a _synapse URL.
+            public_baseurl = self.root.server.public_baseurl
             self.cas_service_url = public_baseurl + "_matrix/client/r0/login/cas/ticket"
+
             self.cas_displayname_attribute = cas_config.get("displayname_attribute")
             required_attributes = cas_config.get("required_attributes") or {}
             self.cas_required_attributes = _parsed_required_attributes_def(
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 8ff59aa2f8..afd65fecd3 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -186,11 +186,6 @@ class EmailConfig(Config):
             if not self.email_notif_from:
                 missing.append("email.notif_from")
 
-            # public_baseurl is required to build password reset and validation links that
-            # will be emailed to users
-            if config.get("public_baseurl") is None:
-                missing.append("public_baseurl")
-
             if missing:
                 raise ConfigError(
                     MISSING_PASSWORD_RESET_CONFIG_ERROR % (", ".join(missing),)
@@ -296,9 +291,6 @@ class EmailConfig(Config):
             if not self.email_notif_from:
                 missing.append("email.notif_from")
 
-            if config.get("public_baseurl") is None:
-                missing.append("public_baseurl")
-
             if missing:
                 raise ConfigError(
                     "email.enable_notifs is True but required keys are missing: %s"
diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py
index 10f5796330..42f113cd24 100644
--- a/synapse/config/oidc.py
+++ b/synapse/config/oidc.py
@@ -59,8 +59,6 @@ class OIDCConfig(Config):
                 )
 
         public_baseurl = self.root.server.public_baseurl
-        if public_baseurl is None:
-            raise ConfigError("oidc_config requires a public_baseurl to be set")
         self.oidc_callback_url = public_baseurl + "_synapse/client/oidc/callback"
 
     @property
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index a3d2a38c4c..5379e80715 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -45,17 +45,6 @@ class RegistrationConfig(Config):
         account_threepid_delegates = config.get("account_threepid_delegates") or {}
         self.account_threepid_delegate_email = account_threepid_delegates.get("email")
         self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn")
-        if (
-            self.account_threepid_delegate_msisdn
-            and not self.root.server.public_baseurl
-        ):
-            raise ConfigError(
-                "The configuration option `public_baseurl` is required if "
-                "`account_threepid_delegate.msisdn` is set, such that "
-                "clients know where to submit validation tokens to. Please "
-                "configure `public_baseurl`."
-            )
-
         self.default_identity_server = config.get("default_identity_server")
         self.allow_guest_access = config.get("allow_guest_access", False)
 
@@ -240,7 +229,7 @@ class RegistrationConfig(Config):
         # in on this server.
         #
         # (By default, no suggestion is made, so it is left up to the client.
-        # This setting is ignored unless public_baseurl is also set.)
+        # This setting is ignored unless public_baseurl is also explicitly set.)
         #
         #default_identity_server: https://matrix.org
 
@@ -265,8 +254,6 @@ class RegistrationConfig(Config):
         # by the Matrix Identity Service API specification:
         # https://matrix.org/docs/spec/identity_service/latest
         #
-        # If a delegate is specified, the config option public_baseurl must also be filled out.
-        #
         account_threepid_delegates:
             #email: https://example.com     # Delegate email sending to example.com
             #msisdn: http://localhost:8090  # Delegate SMS sending to this local process
diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py
index 9c51b6a25a..ba2b0905ff 100644
--- a/synapse/config/saml2.py
+++ b/synapse/config/saml2.py
@@ -199,14 +199,11 @@ class SAML2Config(Config):
         """
         import saml2
 
-        public_baseurl = self.root.server.public_baseurl
-        if public_baseurl is None:
-            raise ConfigError("saml2_config requires a public_baseurl to be set")
-
         if self.saml2_grandfathered_mxid_source_attribute:
             optional_attributes.add(self.saml2_grandfathered_mxid_source_attribute)
         optional_attributes -= required_attributes
 
+        public_baseurl = self.root.server.public_baseurl
         metadata_url = public_baseurl + "_synapse/client/saml2/metadata.xml"
         response_url = public_baseurl + "_synapse/client/saml2/authn_response"
         return {
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a387fd9310..7bc0030a9e 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -16,6 +16,7 @@ import itertools
 import logging
 import os.path
 import re
+import urllib.parse
 from textwrap import indent
 from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union
 
@@ -264,10 +265,44 @@ class ServerConfig(Config):
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
         self.serve_server_wellknown = config.get("serve_server_wellknown", False)
 
-        self.public_baseurl = config.get("public_baseurl")
-        if self.public_baseurl is not None:
-            if self.public_baseurl[-1] != "/":
-                self.public_baseurl += "/"
+        # Whether we should serve a "client well-known":
+        #  (a) at .well-known/matrix/client on our client HTTP listener
+        #  (b) in the response to /login
+        #
+        # ... which together help ensure that clients use our public_baseurl instead of
+        # whatever they were told by the user.
+        #
+        # For the sake of backwards compatibility with existing installations, this is
+        # True if public_baseurl is specified explicitly, and otherwise False. (The
+        # reasoning here is that we have no way of knowing that the default
+        # public_baseurl is actually correct for existing installations - many things
+        # will not work correctly, but that's (probably?) better than sending clients
+        # to a completely broken URL.
+        self.serve_client_wellknown = False
+
+        public_baseurl = config.get("public_baseurl")
+        if public_baseurl is None:
+            public_baseurl = f"https://{self.server_name}/"
+            logger.info("Using default public_baseurl %s", public_baseurl)
+        else:
+            self.serve_client_wellknown = True
+            if public_baseurl[-1] != "/":
+                public_baseurl += "/"
+        self.public_baseurl = public_baseurl
+
+        # check that public_baseurl is valid
+        try:
+            splits = urllib.parse.urlsplit(self.public_baseurl)
+        except Exception as e:
+            raise ConfigError(f"Unable to parse URL: {e}", ("public_baseurl",))
+        if splits.scheme not in ("https", "http"):
+            raise ConfigError(
+                f"Invalid scheme '{splits.scheme}': only https and http are supported"
+            )
+        if splits.query or splits.fragment:
+            raise ConfigError(
+                "public_baseurl cannot contain query parameters or a #-fragment"
+            )
 
         # Whether to enable user presence.
         presence_config = config.get("presence") or {}
@@ -773,6 +808,8 @@ class ServerConfig(Config):
         # Otherwise, it should be the URL to reach Synapse's client HTTP listener (see
         # 'listeners' below).
         #
+        # Defaults to 'https://<server_name>/'.
+        #
         #public_baseurl: https://example.com/
 
         # Uncomment the following to tell other servers to send federation traffic on
diff --git a/synapse/config/sso.py b/synapse/config/sso.py
index 11a9b76aa0..60aacb13ea 100644
--- a/synapse/config/sso.py
+++ b/synapse/config/sso.py
@@ -101,13 +101,10 @@ class SSOConfig(Config):
         # gracefully to the client). This would make it pointless to ask the user for
         # confirmation, since the URL the confirmation page would be showing wouldn't be
         # the client's.
-        # public_baseurl is an optional setting, so we only add the fallback's URL to the
-        # list if it's provided (because we can't figure out what that URL is otherwise).
-        if self.root.server.public_baseurl:
-            login_fallback_url = (
-                self.root.server.public_baseurl + "_matrix/static/client/login"
-            )
-            self.sso_client_whitelist.append(login_fallback_url)
+        login_fallback_url = (
+            self.root.server.public_baseurl + "_matrix/static/client/login"
+        )
+        self.sso_client_whitelist.append(login_fallback_url)
 
     def generate_config_section(self, **kwargs):
         return """\
@@ -128,11 +125,10 @@ class SSOConfig(Config):
             # phishing attacks from evil.site. To avoid this, include a slash after the
             # hostname: "https://my.client/".
             #
-            # If public_baseurl is set, then the login fallback page (used by clients
-            # that don't natively support the required login flows) is whitelisted in
-            # addition to any URLs in this list.
+            # The login fallback page (used by clients that don't natively support the
+            # required login flows) is whitelisted in addition to any URLs in this list.
             #
-            # By default, this list is empty.
+            # By default, this list contains only the login fallback page.
             #
             #client_whitelist:
             #  - https://riot.im/develop
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 462630201d..4507992031 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -63,7 +63,8 @@ class WriterLocations:
 
     Attributes:
         events: The instances that write to the event and backfill streams.
-        typing: The instance that writes to the typing stream.
+        typing: The instances that write to the typing stream. Currently
+            can only be a single instance.
         to_device: The instances that write to the to_device stream. Currently
             can only be a single instance.
         account_data: The instances that write to the account data streams. Currently
@@ -75,9 +76,15 @@ class WriterLocations:
     """
 
     events = attr.ib(
-        default=["master"], type=List[str], converter=_instance_to_list_converter
+        default=["master"],
+        type=List[str],
+        converter=_instance_to_list_converter,
+    )
+    typing = attr.ib(
+        default=["master"],
+        type=List[str],
+        converter=_instance_to_list_converter,
     )
-    typing = attr.ib(default="master", type=str)
     to_device = attr.ib(
         default=["master"],
         type=List[str],
@@ -217,6 +224,11 @@ class WorkerConfig(Config):
                         % (instance, stream)
                     )
 
+        if len(self.writers.typing) != 1:
+            raise ConfigError(
+                "Must only specify one instance to handle `typing` messages."
+            )
+
         if len(self.writers.to_device) != 1:
             raise ConfigError(
                 "Must only specify one instance to handle `to_device` messages."
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 670186f548..3b85b135e0 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -277,6 +277,58 @@ class FederationClient(FederationBase):
 
         return pdus
 
+    async def get_pdu_from_destination_raw(
+        self,
+        destination: str,
+        event_id: str,
+        room_version: RoomVersion,
+        outlier: bool = False,
+        timeout: Optional[int] = None,
+    ) -> Optional[EventBase]:
+        """Requests the PDU with given origin and ID from the remote home
+        server. Does not have any caching or rate limiting!
+
+        Args:
+            destination: Which homeserver to query
+            event_id: event to fetch
+            room_version: version of the room
+            outlier: Indicates whether the PDU is an `outlier`, i.e. if
+                it's from an arbitrary point in the context as opposed to part
+                of the current block of PDUs. Defaults to `False`
+            timeout: How long to try (in ms) each destination for before
+                moving to the next destination. None indicates no timeout.
+
+        Returns:
+            The requested PDU, or None if we were unable to find it.
+
+        Raises:
+            SynapseError, NotRetryingDestination, FederationDeniedError
+        """
+        transaction_data = await self.transport_layer.get_event(
+            destination, event_id, timeout=timeout
+        )
+
+        logger.debug(
+            "retrieved event id %s from %s: %r",
+            event_id,
+            destination,
+            transaction_data,
+        )
+
+        pdu_list: List[EventBase] = [
+            event_from_pdu_json(p, room_version, outlier=outlier)
+            for p in transaction_data["pdus"]
+        ]
+
+        if pdu_list and pdu_list[0]:
+            pdu = pdu_list[0]
+
+            # Check signatures are correct.
+            signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
+            return signed_pdu
+
+        return None
+
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -321,30 +373,14 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                transaction_data = await self.transport_layer.get_event(
-                    destination, event_id, timeout=timeout
-                )
-
-                logger.debug(
-                    "retrieved event id %s from %s: %r",
-                    event_id,
-                    destination,
-                    transaction_data,
+                signed_pdu = await self.get_pdu_from_destination_raw(
+                    destination=destination,
+                    event_id=event_id,
+                    room_version=room_version,
+                    outlier=outlier,
+                    timeout=timeout,
                 )
 
-                pdu_list: List[EventBase] = [
-                    event_from_pdu_json(p, room_version, outlier=outlier)
-                    for p in transaction_data["pdus"]
-                ]
-
-                if pdu_list and pdu_list[0]:
-                    pdu = pdu_list[0]
-
-                    # Check signatures are correct.
-                    signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
-
-                    break
-
                 pdu_attempts[destination] = now
 
             except SynapseError as e:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a75993d9..9a8758e9a6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -213,6 +213,11 @@ class FederationServer(FederationBase):
             self._started_handling_of_staged_events = True
             self._handle_old_staged_events()
 
+            # Start a periodic check for old staged events. This is to handle
+            # the case where locks time out, e.g. if another process gets killed
+            # without dropping its locks.
+            self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
@@ -1232,10 +1237,6 @@ class FederationHandlerRegistry:
 
         self.query_handlers[query_type] = handler
 
-    def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
-        """Register that the EDU handler is on a different instance than master."""
-        self._edu_type_to_instance[edu_type] = [instance_name]
-
     def register_instances_for_edu(
         self, edu_type: str, instance_names: List[str]
     ) -> None:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 67f8ffcaff..9abdad262b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
 from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
+        self._ephemeral_events_linearizer = Linearizer(
+            name="appservice_ephemeral_events"
+        )
+
     def notify_interested_services(self, max_token: RoomStreamToken) -> None:
         """Notifies (pushes) all application services interested in this event.
 
@@ -183,7 +188,7 @@ class ApplicationServicesHandler:
         self,
         stream_key: str,
         new_token: Union[int, RoomStreamToken],
-        users: Optional[Collection[Union[str, UserID]]] = None,
+        users: Collection[Union[str, UserID]],
     ) -> None:
         """
         This is called by the notifier in the background when an ephemeral event is handled
@@ -198,7 +203,9 @@ class ApplicationServicesHandler:
                 value for `stream_key` will cause this function to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
-                them.
+                receiving them by setting `push_ephemeral` to true in their registration
+                file. Note that while MSC2409 is experimental, this option is called
+                `de.sorunome.msc2409.push_ephemeral`.
 
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
@@ -209,6 +216,7 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
+        # Ignore any unsupported streams
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
@@ -225,18 +233,25 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Check whether there are any appservices which have registered to receive
+        # ephemeral events.
+        #
+        # Note that whether these events are actually relevant to these appservices
+        # is decided later on.
         services = [
             service
             for service in self.store.get_app_services()
             if service.supports_ephemeral
         ]
         if not services:
+            # Bail out early if none of the target appservices have explicitly registered
+            # to receive these ephemeral events.
             return
 
         # We only start a new background process if necessary rather than
         # optimistically (to cut down on overhead).
         self._notify_interested_services_ephemeral(
-            services, stream_key, new_token, users or []
+            services, stream_key, new_token, users
         )
 
     @wrap_as_background_process("notify_interested_services_ephemeral")
@@ -247,7 +262,7 @@ class ApplicationServicesHandler:
         new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
-        logger.debug("Checking interested services for %s" % (stream_key))
+        logger.debug("Checking interested services for %s", stream_key)
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
                 if stream_key == "typing_key":
@@ -260,26 +275,37 @@ class ApplicationServicesHandler:
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    continue
 
-                elif stream_key == "receipt_key":
-                    events = await self._handle_receipts(service)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
-
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "read_receipt", new_token
+                # Since we read/update the stream position for this AS/stream
+                with (
+                    await self._ephemeral_events_linearizer.queue(
+                        (service.id, stream_key)
                     )
+                ):
+                    if stream_key == "receipt_key":
+                        events = await self._handle_receipts(service, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
 
-                elif stream_key == "presence_key":
-                    events = await self._handle_presence(service, users)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    elif stream_key == "presence_key":
+                        events = await self._handle_presence(service, users, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
 
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "presence", new_token
-                    )
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
 
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
@@ -316,7 +342,9 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+    async def _handle_receipts(
+        self, service: ApplicationService, new_token: Optional[int]
+    ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.
 
@@ -327,6 +355,9 @@ class ApplicationServicesHandler:
 
         Args:
             service: The application service to check for which events it should receive.
+            new_token: A receipts event stream token. Purely used to double-check that the
+                from_token we pull from the database isn't greater than or equal to this
+                token. Prevents accidentally duplicating work.
 
         Returns:
             A list of JSON dictionaries containing data derived from the read receipts that
@@ -335,6 +366,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
             service=service, from_key=from_key
@@ -342,7 +379,10 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[Union[str, UserID]]
+        self,
+        service: ApplicationService,
+        users: Collection[Union[str, UserID]],
+        new_token: Optional[int],
     ) -> List[JsonDict]:
         """
         Return the latest presence updates that the given application service should receive.
@@ -355,6 +395,9 @@ class ApplicationServicesHandler:
         Args:
             service: The application service that ephemeral events are being sent to.
             users: The users that should receive the presence update.
+            new_token: A presence update stream token. Purely used to double-check that the
+                from_token we pull from the database isn't greater than or equal to this
+                token. Prevents accidentally duplicating work.
 
         Returns:
             A list of json dictionaries containing data derived from the presence events
@@ -365,6 +408,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "presence"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         for user in users:
             if isinstance(user, str):
                 user = UserID.from_string(user)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d508d7d32a..60e59d11a0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
-        auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
+        auth_checkers: Optional[
+            Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
+        ] = None,
     ) -> None:
         # Register check_3pid_auth callback
         if check_3pid_auth is not None:
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index b6a2a34ab7..b582266af9 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -89,6 +89,13 @@ class DeviceMessageHandler:
         )
 
     async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
+        """
+        Handle receiving to-device messages from remote homeservers.
+
+        Args:
+            origin: The remote homeserver.
+            content: The JSON dictionary containing the to-device messages.
+        """
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -135,12 +142,16 @@ class DeviceMessageHandler:
                 message_type, sender_user_id, by_device
             )
 
-        stream_id = await self.store.add_messages_from_remote_to_device_inbox(
+        # Add messages to the database.
+        # Retrieve the stream id of the last-processed to-device message.
+        last_stream_id = await self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
         )
 
+        # Notify listeners that there are new to-device messages to process,
+        # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", stream_id, users=local_messages.keys()
+            "to_device_key", last_stream_id, users=local_messages.keys()
         )
 
     async def _check_for_unknown_devices(
@@ -195,6 +206,14 @@ class DeviceMessageHandler:
         message_type: str,
         messages: Dict[str, Dict[str, JsonDict]],
     ) -> None:
+        """
+        Handle a request from a user to send to-device message(s).
+
+        Args:
+            requester: The user that is sending the to-device messages.
+            message_type: The type of to-device messages that are being sent.
+            messages: A dictionary containing recipients mapped to messages intended for them.
+        """
         sender_user_id = requester.user.to_string()
 
         message_id = random_string(16)
@@ -257,12 +276,16 @@ class DeviceMessageHandler:
                 "org.matrix.opentracing_context": json_encoder.encode(context),
             }
 
-        stream_id = await self.store.add_messages_to_device_inbox(
+        # Add messages to the database.
+        # Retrieve the stream id of the last-processed to-device message.
+        last_stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
 
+        # Notify listeners that there are new to-device messages to process,
+        # handing them the latest stream id.
         self.notifier.on_new_event(
-            "to_device_key", stream_id, users=local_messages.keys()
+            "to_device_key", last_stream_id, users=local_messages.keys()
         )
 
         if self.federation_sender:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d0fb2fc7dc..60c11e3d21 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -201,95 +201,19 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
-            @trace
-            async def do_remote_query(destination: str) -> None:
-                """This is called when we are querying the device list of a user on
-                a remote homeserver and their device list is not in the device list
-                cache. If we share a room with this user and we're not querying for
-                specific user we will update the cache with their device list.
-                """
-
-                destination_query = remote_queries_not_in_cache[destination]
-
-                # We first consider whether we wish to update the device list cache with
-                # the users device list. We want to track a user's devices when the
-                # authenticated user shares a room with the queried user and the query
-                # has not specified a particular device.
-                # If we update the cache for the queried user we remove them from further
-                # queries. We use the more efficient batched query_client_keys for all
-                # remaining users
-                user_ids_updated = []
-                for (user_id, device_list) in destination_query.items():
-                    if user_id in user_ids_updated:
-                        continue
-
-                    if device_list:
-                        continue
-
-                    room_ids = await self.store.get_rooms_for_user(user_id)
-                    if not room_ids:
-                        continue
-
-                    # We've decided we're sharing a room with this user and should
-                    # probably be tracking their device lists. However, we haven't
-                    # done an initial sync on the device list so we do it now.
-                    try:
-                        if self._is_master:
-                            user_devices = await self.device_handler.device_list_updater.user_device_resync(
-                                user_id
-                            )
-                        else:
-                            user_devices = await self._user_device_resync_client(
-                                user_id=user_id
-                            )
-
-                        user_devices = user_devices["devices"]
-                        user_results = results.setdefault(user_id, {})
-                        for device in user_devices:
-                            user_results[device["device_id"]] = device["keys"]
-                        user_ids_updated.append(user_id)
-                    except Exception as e:
-                        failures[destination] = _exception_to_failure(e)
-
-                if len(destination_query) == len(user_ids_updated):
-                    # We've updated all the users in the query and we do not need to
-                    # make any further remote calls.
-                    return
-
-                # Remove all the users from the query which we have updated
-                for user_id in user_ids_updated:
-                    destination_query.pop(user_id)
-
-                try:
-                    remote_result = await self.federation.query_client_keys(
-                        destination, {"device_keys": destination_query}, timeout=timeout
-                    )
-
-                    for user_id, keys in remote_result["device_keys"].items():
-                        if user_id in destination_query:
-                            results[user_id] = keys
-
-                    if "master_keys" in remote_result:
-                        for user_id, key in remote_result["master_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["master_keys"][user_id] = key
-
-                    if "self_signing_keys" in remote_result:
-                        for user_id, key in remote_result["self_signing_keys"].items():
-                            if user_id in destination_query:
-                                cross_signing_keys["self_signing_keys"][user_id] = key
-
-                except Exception as e:
-                    failure = _exception_to_failure(e)
-                    failures[destination] = failure
-                    set_tag("error", True)
-                    set_tag("reason", failure)
-
             await make_deferred_yieldable(
                 defer.gatherResults(
                     [
-                        run_in_background(do_remote_query, destination)
-                        for destination in remote_queries_not_in_cache
+                        run_in_background(
+                            self._query_devices_for_destination,
+                            results,
+                            cross_signing_keys,
+                            failures,
+                            destination,
+                            queries,
+                            timeout,
+                        )
+                        for destination, queries in remote_queries_not_in_cache.items()
                     ],
                     consumeErrors=True,
                 ).addErrback(unwrapFirstError)
@@ -301,6 +225,121 @@ class E2eKeysHandler:
 
             return ret
 
+    @trace
+    async def _query_devices_for_destination(
+        self,
+        results: JsonDict,
+        cross_signing_keys: JsonDict,
+        failures: Dict[str, JsonDict],
+        destination: str,
+        destination_query: Dict[str, Iterable[str]],
+        timeout: int,
+    ) -> None:
+        """This is called when we are querying the device list of a user on
+        a remote homeserver and their device list is not in the device list
+        cache. If we share a room with this user and we're not querying for
+        specific user we will update the cache with their device list.
+
+        Args:
+            results: A map from user ID to their device keys, which gets
+                updated with the newly fetched keys.
+            cross_signing_keys: Map from user ID to their cross signing keys,
+                which gets updated with the newly fetched keys.
+            failures: Map of destinations to failures that have occurred while
+                attempting to fetch keys.
+            destination: The remote server to query
+            destination_query: The query dict of devices to query the remote
+                server for.
+            timeout: The timeout for remote HTTP requests.
+        """
+
+        # We first consider whether we wish to update the device list cache with
+        # the users device list. We want to track a user's devices when the
+        # authenticated user shares a room with the queried user and the query
+        # has not specified a particular device.
+        # If we update the cache for the queried user we remove them from further
+        # queries. We use the more efficient batched query_client_keys for all
+        # remaining users
+        user_ids_updated = []
+        for (user_id, device_list) in destination_query.items():
+            if user_id in user_ids_updated:
+                continue
+
+            if device_list:
+                continue
+
+            room_ids = await self.store.get_rooms_for_user(user_id)
+            if not room_ids:
+                continue
+
+            # We've decided we're sharing a room with this user and should
+            # probably be tracking their device lists. However, we haven't
+            # done an initial sync on the device list so we do it now.
+            try:
+                if self._is_master:
+                    resync_results = await self.device_handler.device_list_updater.user_device_resync(
+                        user_id
+                    )
+                else:
+                    resync_results = await self._user_device_resync_client(
+                        user_id=user_id
+                    )
+
+                # Add the device keys to the results.
+                user_devices = resync_results["devices"]
+                user_results = results.setdefault(user_id, {})
+                for device in user_devices:
+                    user_results[device["device_id"]] = device["keys"]
+                user_ids_updated.append(user_id)
+
+                # Add any cross signing keys to the results.
+                master_key = resync_results.get("master_key")
+                self_signing_key = resync_results.get("self_signing_key")
+
+                if master_key:
+                    cross_signing_keys["master_keys"][user_id] = master_key
+
+                if self_signing_key:
+                    cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
+            except Exception as e:
+                failures[destination] = _exception_to_failure(e)
+
+        if len(destination_query) == len(user_ids_updated):
+            # We've updated all the users in the query and we do not need to
+            # make any further remote calls.
+            return
+
+        # Remove all the users from the query which we have updated
+        for user_id in user_ids_updated:
+            destination_query.pop(user_id)
+
+        try:
+            remote_result = await self.federation.query_client_keys(
+                destination, {"device_keys": destination_query}, timeout=timeout
+            )
+
+            for user_id, keys in remote_result["device_keys"].items():
+                if user_id in destination_query:
+                    results[user_id] = keys
+
+            if "master_keys" in remote_result:
+                for user_id, key in remote_result["master_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["master_keys"][user_id] = key
+
+            if "self_signing_keys" in remote_result:
+                for user_id, key in remote_result["self_signing_keys"].items():
+                    if user_id in destination_query:
+                        cross_signing_keys["self_signing_keys"][user_id] = key
+
+        except Exception as e:
+            failure = _exception_to_failure(e)
+            failures[destination] = failure
+            set_tag("error", True)
+            set_tag("reason", failure)
+
+        return
+
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 6a315117ba..3dbe611f95 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -537,10 +537,6 @@ class IdentityHandler:
         except RequestTimedOutError:
             raise SynapseError(500, "Timed out contacting identity server")
 
-        # It is already checked that public_baseurl is configured since this code
-        # should only be used if account_threepid_delegate_msisdn is true.
-        assert self.hs.config.server.public_baseurl
-
         # we need to tell the client to send the token back to us, since it doesn't
         # otherwise know where to send it, so add submit_url response parameter
         # (see also MSC2078)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index abfe7be0e3..aa26911aed 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -424,7 +424,7 @@ class PaginationHandler:
 
         if events:
             if event_filter:
-                events = event_filter.filter(events)
+                events = await event_filter.filter(events)
 
             events = await filter_events_for_client(
                 self.storage, user_id, events, is_peeking=(member_event_id is None)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 969eb3b9b0..11af30eee7 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -12,8 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""Contains functions for performing events on rooms."""
-
+"""Contains functions for performing actions on rooms."""
 import itertools
 import logging
 import math
@@ -31,6 +30,8 @@ from typing import (
     Tuple,
 )
 
+from typing_extensions import TypedDict
+
 from synapse.api.constants import (
     EventContentFields,
     EventTypes,
@@ -1158,8 +1159,10 @@ class RoomContextHandler:
         )
 
         if event_filter:
-            results["events_before"] = event_filter.filter(results["events_before"])
-            results["events_after"] = event_filter.filter(results["events_after"])
+            results["events_before"] = await event_filter.filter(
+                results["events_before"]
+            )
+            results["events_after"] = await event_filter.filter(results["events_after"])
 
         results["events_before"] = await filter_evts(results["events_before"])
         results["events_after"] = await filter_evts(results["events_after"])
@@ -1195,7 +1198,7 @@ class RoomContextHandler:
 
         state_events = list(state[last_event_id].values())
         if event_filter:
-            state_events = event_filter.filter(state_events)
+            state_events = await event_filter.filter(state_events)
 
         results["state"] = await filter_evts(state_events)
 
@@ -1275,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         return self.store.get_room_events_max_id(room_id)
 
 
+class ShutdownRoomResponse(TypedDict):
+    kicked_users: List[str]
+    failed_to_kick_users: List[str]
+    local_aliases: List[str]
+    new_room_id: Optional[str]
+
+
 class RoomShutdownHandler:
 
     DEFAULT_MESSAGE = (
@@ -1300,7 +1310,7 @@ class RoomShutdownHandler:
         new_room_name: Optional[str] = None,
         message: Optional[str] = None,
         block: bool = False,
-    ) -> dict:
+    ) -> ShutdownRoomResponse:
         """
         Shuts down a room. Moves all local users and room aliases automatically
         to a new room if `new_room_user_id` is set. Otherwise local users only
@@ -1334,8 +1344,13 @@ class RoomShutdownHandler:
                 Defaults to `Sharing illegal content on this server is not
                 permitted and rooms in violation will be blocked.`
             block:
-                If set to `true`, this room will be added to a blocking list,
-                preventing future attempts to join the room. Defaults to `false`.
+                If set to `True`, users will be prevented from joining the old
+                room. This option can also be used to pre-emptively block a room,
+                even if it's unknown to this homeserver. In this case, the room
+                will be blocked, and no further action will be taken. If `False`,
+                attempting to delete an unknown room is invalid.
+
+                Defaults to `False`.
 
         Returns: a dict containing the following keys:
             kicked_users: An array of users (`user_id`) that were kicked.
@@ -1344,7 +1359,9 @@ class RoomShutdownHandler:
             local_aliases:
                 An array of strings representing the local aliases that were
                 migrated from the old room to the new.
-            new_room_id: A string representing the room ID of the new room.
+            new_room_id:
+                A string representing the room ID of the new room, or None if
+                no such room was created.
         """
 
         if not new_room_name:
@@ -1355,14 +1372,28 @@ class RoomShutdownHandler:
         if not RoomID.is_valid(room_id):
             raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
 
-        if not await self.store.get_room(room_id):
-            raise NotFoundError("Unknown room id %s" % (room_id,))
-
-        # This will work even if the room is already blocked, but that is
-        # desirable in case the first attempt at blocking the room failed below.
+        # Action the block first (even if the room doesn't exist yet)
         if block:
+            # This will work even if the room is already blocked, but that is
+            # desirable in case the first attempt at blocking the room failed below.
             await self.store.block_room(room_id, requester_user_id)
 
+        if not await self.store.get_room(room_id):
+            if block:
+                # We allow you to block an unknown room.
+                return {
+                    "kicked_users": [],
+                    "failed_to_kick_users": [],
+                    "local_aliases": [],
+                    "new_room_id": None,
+                }
+            else:
+                # But if you don't want to preventatively block another room,
+                # this function can't do anything useful.
+                raise NotFoundError(
+                    "Cannot shut down room: unknown room id %s" % (room_id,)
+                )
+
         if new_room_user_id is not None:
             if not self.hs.is_mine_id(new_room_user_id):
                 raise SynapseError(
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6e4dff8056..ab7eaab2fb 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -180,7 +180,7 @@ class SearchHandler:
                 % (set(group_keys) - {"room_id", "sender"},),
             )
 
-        search_filter = Filter(filter_dict)
+        search_filter = Filter(self.hs, filter_dict)
 
         # TODO: Search through left rooms too
         rooms = await self.store.get_rooms_for_local_user_where_membership_is(
@@ -242,7 +242,7 @@ class SearchHandler:
 
             rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
-            filtered_events = search_filter.filter([r["event"] for r in results])
+            filtered_events = await search_filter.filter([r["event"] for r in results])
 
             events = await filter_events_for_client(
                 self.storage, user.to_string(), filtered_events
@@ -292,7 +292,9 @@ class SearchHandler:
 
                 rank_map.update({r["event"].event_id: r["rank"] for r in results})
 
-                filtered_events = search_filter.filter([r["event"] for r in results])
+                filtered_events = await search_filter.filter(
+                    [r["event"] for r in results]
+                )
 
                 events = await filter_events_for_client(
                     self.storage, user.to_string(), filtered_events
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2c7c6d63a9..891435c14d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,7 +510,7 @@ class SyncHandler:
             log_kv({"limited": limited})
 
             if potential_recents:
-                recents = sync_config.filter_collection.filter_room_timeline(
+                recents = await sync_config.filter_collection.filter_room_timeline(
                     potential_recents
                 )
                 log_kv({"recents_after_sync_filtering": len(recents)})
@@ -575,8 +575,8 @@ class SyncHandler:
 
                 log_kv({"loaded_recents": len(events)})
 
-                loaded_recents = sync_config.filter_collection.filter_room_timeline(
-                    events
+                loaded_recents = (
+                    await sync_config.filter_collection.filter_room_timeline(events)
                 )
 
                 log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})
@@ -1015,7 +1015,7 @@ class SyncHandler:
 
         return {
             (e.type, e.state_key): e
-            for e in sync_config.filter_collection.filter_room_state(
+            for e in await sync_config.filter_collection.filter_room_state(
                 list(state.values())
             )
             if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
@@ -1383,7 +1383,7 @@ class SyncHandler:
                 sync_config.user
             )
 
-        account_data_for_user = sync_config.filter_collection.filter_account_data(
+        account_data_for_user = await sync_config.filter_collection.filter_account_data(
             [
                 {"type": account_data_type, "content": content}
                 for account_data_type, content in account_data.items()
@@ -1448,7 +1448,7 @@ class SyncHandler:
             # Deduplicate the presence entries so that there's at most one per user
             presence = list({p.user_id: p for p in presence}.values())
 
-        presence = sync_config.filter_collection.filter_presence(presence)
+        presence = await sync_config.filter_collection.filter_presence(presence)
 
         sync_result_builder.presence = presence
 
@@ -2021,12 +2021,14 @@ class SyncHandler:
                 )
 
             account_data_events = (
-                sync_config.filter_collection.filter_room_account_data(
+                await sync_config.filter_collection.filter_room_account_data(
                     account_data_events
                 )
             )
 
-            ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral)
+            ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
+                ephemeral
+            )
 
             if not (
                 always_include
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d69924..22c6174821 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
         if hs.should_send_federation():
             self.federation = hs.get_federation_sender()
 
-        if hs.config.worker.writers.typing != hs.get_instance_name():
-            hs.get_federation_registry().register_instance_for_edu(
+        if hs.get_instance_name() not in hs.config.worker.writers.typing:
+            hs.get_federation_registry().register_instances_for_edu(
                 "m.typing",
                 hs.config.worker.writers.typing,
             )
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        assert hs.config.worker.writers.typing == hs.get_instance_name()
+        assert hs.get_instance_name() in hs.config.worker.writers.typing
 
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 06fd06fdf3..21293038ef 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -138,7 +138,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, TypingStream):
                 # Only add TypingStream as a source on the instance in charge of
                 # typing.
-                if hs.config.worker.writers.typing == hs.get_instance_name():
+                if hs.get_instance_name() in hs.config.worker.writers.typing:
                     self._streams_to_replicate.append(stream)
 
                 continue
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index c8b188ae4e..743a01da08 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -328,8 +328,7 @@ class TypingStream(Stream):
     ROW_TYPE = TypingStreamRow
 
     def __init__(self, hs: "HomeServer"):
-        writer_instance = hs.config.worker.writers.typing
-        if writer_instance == hs.get_instance_name():
+        if hs.get_instance_name() in hs.config.worker.writers.typing:
             # On the writer, query the typing handler
             typing_writer_handler = hs.get_typing_writer_handler()
             update_function: Callable[
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 70514e814f..81e98f81d6 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -25,6 +25,10 @@ from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
 from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
+from synapse.rest.admin.background_updates import (
+    BackgroundUpdateEnabledRestServlet,
+    BackgroundUpdateRestServlet,
+)
 from synapse.rest.admin.devices import (
     DeleteDevicesRestServlet,
     DeviceRestServlet,
@@ -247,6 +251,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
         SendServerNoticeServlet(hs).register(http_server)
+        BackgroundUpdateEnabledRestServlet(hs).register(http_server)
+        BackgroundUpdateRestServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(
diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py
new file mode 100644
index 0000000000..0d0183bf20
--- /dev/null
+++ b/synapse/rest/admin/background_updates.py
@@ -0,0 +1,107 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.api.errors import SynapseError
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.site import SynapseRequest
+from synapse.rest.admin._base import admin_patterns, assert_user_is_admin
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class BackgroundUpdateEnabledRestServlet(RestServlet):
+    """Allows temporarily disabling background updates"""
+
+    PATTERNS = admin_patterns("/background_updates/enabled")
+
+    def __init__(self, hs: "HomeServer"):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+        self.data_stores = hs.get_datastores()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        # We need to check that all configured databases have updates enabled.
+        # (They *should* all be in sync.)
+        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+
+        return 200, {"enabled": enabled}
+
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        body = parse_json_object_from_request(request)
+
+        enabled = body.get("enabled", True)
+
+        if not isinstance(enabled, bool):
+            raise SynapseError(400, "'enabled' parameter must be a boolean")
+
+        for db in self.data_stores.databases:
+            db.updates.enabled = enabled
+
+            # If we're re-enabling them ensure that we start the background
+            # process again.
+            if enabled:
+                db.updates.start_doing_background_updates()
+
+        return 200, {"enabled": enabled}
+
+
+class BackgroundUpdateRestServlet(RestServlet):
+    """Fetch information about background updates"""
+
+    PATTERNS = admin_patterns("/background_updates/status")
+
+    def __init__(self, hs: "HomeServer"):
+        self.group_server = hs.get_groups_server_handler()
+        self.is_mine_id = hs.is_mine_id
+        self.auth = hs.get_auth()
+
+        self.data_stores = hs.get_datastores()
+
+    async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        # We need to check that all configured databases have updates enabled.
+        # (They *should* all be in sync.)
+        enabled = all(db.updates.enabled for db in self.data_stores.databases)
+
+        current_updates = {}
+
+        for db in self.data_stores.databases:
+            update = db.updates.get_current_update()
+            if not update:
+                continue
+
+            current_updates[db.name()] = {
+                "name": update.name,
+                "total_item_count": update.total_item_count,
+                "total_duration_ms": update.total_duration_ms,
+                "average_items_per_ms": update.average_items_per_ms(),
+            }
+
+        return 200, {"enabled": enabled, "current_updates": current_updates}
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 05c5b4bf0c..a2f4edebb8 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import logging
 from http import HTTPStatus
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 from urllib import parse as urlparse
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
@@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet):
 
         # Purge room
         if purge:
-            await pagination_handler.purge_room(room_id, force=force_purge)
-
-        return 200, ret
+            try:
+                await pagination_handler.purge_room(room_id, force=force_purge)
+            except NotFoundError:
+                if block:
+                    # We can block unknown rooms with this endpoint, in which case
+                    # a failed purge is expected.
+                    pass
+                else:
+                    # But otherwise, we expect this purge to have succeeded.
+                    raise
+
+        # Cast safety: cast away the knowledge that this is a TypedDict.
+        # See https://github.com/python/mypy/issues/4976#issuecomment-579883622
+        # for some discussion on why this is necessary. Either way,
+        # `ret` is an opaque dictionary blob as far as the rest of the app cares.
+        return 200, cast(JsonDict, ret)
 
 
 class RoomMembersRestServlet(RestServlet):
@@ -583,6 +596,7 @@ class RoomEventContextServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.clock = hs.get_clock()
         self.room_context_handler = hs.get_room_context_handler()
         self._event_serializer = hs.get_event_client_serializer()
@@ -600,7 +614,9 @@ class RoomEventContextServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
         else:
             event_filter = None
 
diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py
index 9770413c61..2b25b9aad6 100644
--- a/synapse/rest/client/receipts.py
+++ b/synapse/rest/client/receipts.py
@@ -13,10 +13,12 @@
 # limitations under the License.
 
 import logging
+import re
 from typing import TYPE_CHECKING, Tuple
 
 from synapse.api.constants import ReadReceiptEventFields
 from synapse.api.errors import Codes, SynapseError
+from synapse.http import get_request_user_agent
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
@@ -24,6 +26,8 @@ from synapse.types import JsonDict
 
 from ._base import client_patterns
 
+pattern = re.compile(r"(?:Element|SchildiChat)/1\.[012]\.")
+
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
@@ -52,7 +56,13 @@ class ReceiptRestServlet(RestServlet):
         if receipt_type != "m.read":
             raise SynapseError(400, "Receipt type must be 'm.read'")
 
-        body = parse_json_object_from_request(request, allow_empty_body=True)
+        # Do not allow older SchildiChat and Element Android clients (prior to Element/1.[012].x) to send an empty body.
+        user_agent = get_request_user_agent(request)
+        allow_empty_body = False
+        if "Android" in user_agent:
+            if pattern.match(user_agent) or "Riot" in user_agent:
+                allow_empty_body = True
+        body = parse_json_object_from_request(request, allow_empty_body)
         hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False)
 
         if not isinstance(hidden, bool):
diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py
index 58f6699073..184cfbe196 100644
--- a/synapse/rest/client/relations.py
+++ b/synapse/rest/client/relations.py
@@ -298,7 +298,9 @@ class RelationAggregationPaginationServlet(RestServlet):
             raise SynapseError(404, "Unknown parent event.")
 
         if relation_type not in (RelationTypes.ANNOTATION, None):
-            raise SynapseError(400, "Relation type must be 'annotation'")
+            raise SynapseError(
+                400, f"Relation type must be '{RelationTypes.ANNOTATION}'"
+            )
 
         limit = parse_integer(request, "limit", default=5)
         from_token_str = parse_string(request, "from")
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index ed95189b6d..03a353d53c 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -550,6 +550,7 @@ class RoomMessageListRestServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.pagination_handler = hs.get_pagination_handler()
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
@@ -567,7 +568,9 @@ class RoomMessageListRestServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
             if (
                 event_filter
                 and event_filter.filter_json.get("event_format", "client")
@@ -672,6 +675,7 @@ class RoomEventContextServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
+        self._hs = hs
         self.clock = hs.get_clock()
         self.room_context_handler = hs.get_room_context_handler()
         self._event_serializer = hs.get_event_client_serializer()
@@ -688,7 +692,9 @@ class RoomEventContextServlet(RestServlet):
         filter_str = parse_string(request, "filter", encoding="utf-8")
         if filter_str:
             filter_json = urlparse.unquote(filter_str)
-            event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json))
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
         else:
             event_filter = None
 
@@ -914,7 +920,7 @@ class RoomTypingRestServlet(RestServlet):
         # If we're not on the typing writer instance we should scream if we get
         # requests.
         self._is_typing_writer = (
-            hs.config.worker.writers.typing == hs.get_instance_name()
+            hs.get_instance_name() in hs.config.worker.writers.typing
         )
 
     async def on_PUT(
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 982763f2d8..c9509d2ae3 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -139,20 +139,22 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 errcode=Codes.INVALID_PARAM,
             )
 
+        state_event_ids_at_start = []
         # Create and persist all of the state events that float off on their own
         # before the batch. These will most likely be all of the invite/member
         # state events used to auth the upcoming historical messages.
-        state_event_ids_at_start = (
-            await self.room_batch_handler.persist_state_events_at_start(
-                state_events_at_start=body["state_events_at_start"],
-                room_id=room_id,
-                initial_auth_event_ids=auth_event_ids,
-                app_service_requester=requester,
+        if body["state_events_at_start"]:
+            state_event_ids_at_start = (
+                await self.room_batch_handler.persist_state_events_at_start(
+                    state_events_at_start=body["state_events_at_start"],
+                    room_id=room_id,
+                    initial_auth_event_ids=auth_event_ids,
+                    app_service_requester=requester,
+                )
             )
-        )
-        # Update our ongoing auth event ID list with all of the new state we
-        # just created
-        auth_event_ids.extend(state_event_ids_at_start)
+            # Update our ongoing auth event ID list with all of the new state we
+            # just created
+            auth_event_ids.extend(state_event_ids_at_start)
 
         inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
             prev_event_ids_from_query
@@ -205,8 +207,11 @@ class RoomBatchSendEventRestServlet(RestServlet):
 
         # Also connect the historical event chain to the end of the floating
         # state chain, which causes the HS to ask for the state at the start of
-        # the batch later.
-        prev_event_ids = [state_event_ids_at_start[-1]]
+        # the batch later. If there is no state chain to connect to, just make
+        # the insertion event float itself.
+        prev_event_ids = []
+        if len(state_event_ids_at_start):
+            prev_event_ids = [state_event_ids_at_start[-1]]
 
         # Create and persist all of the historical events as well as insertion
         # and batch meta events to make the batch navigable in the DAG.
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 913216a7c4..8c0fdb1940 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -29,7 +29,7 @@ from typing import (
 
 from synapse.api.constants import Membership, PresenceState
 from synapse.api.errors import Codes, StoreError, SynapseError
-from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
+from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.events.utils import (
@@ -150,7 +150,7 @@ class SyncRestServlet(RestServlet):
         request_key = (user, timeout, since, filter_id, full_state, device_id)
 
         if filter_id is None:
-            filter_collection = DEFAULT_FILTER_COLLECTION
+            filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
         elif filter_id.startswith("{"):
             try:
                 filter_object = json_decoder.decode(filter_id)
@@ -160,7 +160,7 @@ class SyncRestServlet(RestServlet):
             except Exception:
                 raise SynapseError(400, "Invalid filter JSON")
             self.filtering.check_valid_filter(filter_object)
-            filter_collection = FilterCollection(filter_object)
+            filter_collection = FilterCollection(self.hs, filter_object)
         else:
             try:
                 filter_collection = await self.filtering.get_user_filter(
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index edbf5ce5d0..04b035a1b1 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -34,8 +34,7 @@ class WellKnownBuilder:
         self._config = hs.config
 
     def get_well_known(self) -> Optional[JsonDict]:
-        # if we don't have a public_baseurl, we can't help much here.
-        if self._config.server.public_baseurl is None:
+        if not self._config.server.serve_client_wellknown:
             return None
 
         result = {"m.homeserver": {"base_url": self._config.server.public_baseurl}}
diff --git a/synapse/server.py b/synapse/server.py
index 0fbf36ba99..013a7bacaa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_typing_writer_handler(self) -> TypingWriterHandler:
-        if self.config.worker.writers.typing == self.get_instance_name():
+        if self.get_instance_name() in self.config.worker.writers.typing:
             return TypingWriterHandler(self)
         else:
             raise Exception("Workers cannot write typing")
@@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_typing_handler(self) -> FollowerTypingHandler:
-        if self.config.worker.writers.typing == self.get_instance_name():
+        if self.get_instance_name() in self.config.worker.writers.typing:
             # Use get_typing_writer_handler to ensure that we use the same
             # cached version.
             return self.get_typing_writer_handler()
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 82b31d24f1..b9a8ca997e 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -100,29 +100,58 @@ class BackgroundUpdater:
         ] = {}
         self._all_done = False
 
+        # Whether we're currently running updates
+        self._running = False
+
+        # Whether background updates are enabled. This allows us to
+        # enable/disable background updates via the admin API.
+        self.enabled = True
+
+    def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
+        """Returns the current background update, if any."""
+
+        update_name = self._current_background_update
+        if not update_name:
+            return None
+
+        perf = self._background_update_performance.get(update_name)
+        if not perf:
+            perf = BackgroundUpdatePerformance(update_name)
+
+        return perf
+
     def start_doing_background_updates(self) -> None:
-        run_as_background_process("background_updates", self.run_background_updates)
+        if self.enabled:
+            run_as_background_process("background_updates", self.run_background_updates)
 
     async def run_background_updates(self, sleep: bool = True) -> None:
-        logger.info("Starting background schema updates")
-        while True:
-            if sleep:
-                await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+        if self._running or not self.enabled:
+            return
 
-            try:
-                result = await self.do_next_background_update(
-                    self.BACKGROUND_UPDATE_DURATION_MS
-                )
-            except Exception:
-                logger.exception("Error doing update")
-            else:
-                if result:
-                    logger.info(
-                        "No more background updates to do."
-                        " Unscheduling background update task."
+        self._running = True
+
+        try:
+            logger.info("Starting background schema updates")
+            while self.enabled:
+                if sleep:
+                    await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+
+                try:
+                    result = await self.do_next_background_update(
+                        self.BACKGROUND_UPDATE_DURATION_MS
                     )
-                    self._all_done = True
-                    return None
+                except Exception:
+                    logger.exception("Error doing update")
+                else:
+                    if result:
+                        logger.info(
+                            "No more background updates to do."
+                            " Unscheduling background update task."
+                        )
+                        self._all_done = True
+                        return None
+        finally:
+            self._running = False
 
     async def has_completed_background_updates(self) -> bool:
         """Check if all the background updates have completed
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 5c71e27518..d4cab69ebf 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -446,6 +446,10 @@ class DatabasePool:
                 self._check_safe_to_upsert,
             )
 
+    def name(self) -> str:
+        "Return the name of this database"
+        return self._database_config.name
+
     def is_running(self) -> bool:
         """Is the database pool currently running"""
         return self._db_pool.running
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2da2659f41..baec35ee27 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def set_type_stream_id_for_appservice(
-        self, service: ApplicationService, type: str, pos: Optional[int]
+        self, service: ApplicationService, stream_type: str, pos: Optional[int]
     ) -> None:
-        if type not in ("read_receipt", "presence"):
+        if stream_type not in ("read_receipt", "presence"):
             raise ValueError(
                 "Expected type to be a valid application stream id type, got %s"
-                % (type,)
+                % (stream_type,)
             )
 
         def set_type_stream_id_for_appservice_txn(txn):
-            stream_id_type = "%s_stream_id" % type
+            stream_id_type = "%s_stream_id" % stream_type
             txn.execute(
                 "UPDATE application_services_state SET %s = ? WHERE as_id=?"
                 % stream_id_type,
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 264e625bd7..ae3afdd5d2 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             limit: The maximum number of messages to retrieve.
 
         Returns:
-            A list of messages for the device and where in the stream the messages got to.
+            A tuple containing:
+                * A list of messages for the device.
+                * The max stream token of these messages. There may be more to retrieve
+                  if the given limit was reached.
         """
         has_changed = self._device_inbox_stream_cache.has_entity_changed(
             user_id, last_stream_id
@@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             txn.execute(
                 sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
             )
+
             messages = []
+            stream_pos = current_stream_id
+
             for row in txn:
                 stream_pos = row[0]
                 messages.append(db_to_json(row[1]))
+
+            # If the limit was not reached we know that there's no more data for this
+            # user/device pair up to current_stream_id.
             if len(messages) < limit:
                 stream_pos = current_stream_id
+
             return messages, stream_pos
 
         return await self.db_pool.runInteraction(
@@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (destination, last_stream_id, current_stream_id, limit))
+
             messages = []
+            stream_pos = current_stream_id
+
             for row in txn:
                 stream_pos = row[0]
                 messages.append(db_to_json(row[1]))
+
+            # If the limit was not reached we know that there's no more data for this
+            # user/device pair up to current_stream_id.
             if len(messages) < limit:
                 log_kv({"message": "Set stream position to current position"})
                 stream_pos = current_stream_id
+
             return messages, stream_pos
 
         return await self.db_pool.runInteraction(
@@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         """Used to send messages from this server.
 
         Args:
-            local_messages_by_user_and_device:
-                Dictionary of user_id to device_id to message.
+            local_messages_by_user_then_device:
+                Dictionary of recipient user_id to recipient device_id to message.
             remote_messages_by_destination:
                 Dictionary of destination server_name to the EDU JSON to send.
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index ae37901be9..c6bf316d5b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -28,6 +28,7 @@ from typing import (
 
 import attr
 from constantly import NamedConstant, Names
+from prometheus_client import Gauge
 from typing_extensions import Literal
 
 from twisted.internet import defer
@@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+event_fetch_ongoing_gauge = Gauge(
+    "synapse_event_fetch_ongoing",
+    "The number of event fetchers that are running",
+)
+
+
 @attr.s(slots=True, auto_attribs=True)
 class _EventCacheEntry:
     event: EventBase
@@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
+        event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
         # We define this sequence here so that it can be referenced from both
         # the DataStore and PersistEventStore.
@@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        i = 0
-        while True:
-            with self._event_fetch_lock:
-                event_list = self._event_fetch_list
-                self._event_fetch_list = []
-
-                if not event_list:
-                    single_threaded = self.database_engine.single_threaded
-                    if (
-                        not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
-                        or single_threaded
-                        or i > EVENT_QUEUE_ITERATIONS
-                    ):
-                        self._event_fetch_ongoing -= 1
-                        return
-                    else:
-                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                        i += 1
-                        continue
-                i = 0
-
-            self._fetch_event_list(conn, event_list)
+        try:
+            i = 0
+            while True:
+                with self._event_fetch_lock:
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+
+                    if not event_list:
+                        single_threaded = self.database_engine.single_threaded
+                        if (
+                            not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
+                            or single_threaded
+                            or i > EVENT_QUEUE_ITERATIONS
+                        ):
+                            break
+                        else:
+                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                            i += 1
+                            continue
+                    i = 0
+
+                self._fetch_event_list(conn, event_list)
+        finally:
+            self._event_fetch_ongoing -= 1
+            event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
     def _fetch_event_list(
         self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore):
 
             if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
                 self._event_fetch_ongoing += 1
+                event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
                 should_start = True
             else:
                 should_start = False
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 3d1dff660b..3d0df0cbd4 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -14,6 +14,7 @@
 import logging
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
+from weakref import WeakValueDictionary
 
 from twisted.internet.interfaces import IReactorCore
 
@@ -61,7 +62,7 @@ class LockStore(SQLBaseStore):
 
         # A map from `(lock_name, lock_key)` to the token of any locks that we
         # think we currently hold.
-        self._live_tokens: Dict[Tuple[str, str], str] = {}
+        self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()
 
         # When we shut down we want to remove the locks. Technically this can
         # lead to a race, as we may drop the lock while we are still processing.
@@ -80,10 +81,10 @@ class LockStore(SQLBaseStore):
 
         # We need to take a copy of the tokens dict as dropping the locks will
         # cause the dictionary to change.
-        tokens = dict(self._live_tokens)
+        locks = dict(self._live_tokens)
 
-        for (lock_name, lock_key), token in tokens.items():
-            await self._drop_lock(lock_name, lock_key, token)
+        for lock in locks.values():
+            await lock.release()
 
         logger.info("Dropped locks due to shutdown")
 
@@ -93,6 +94,11 @@ class LockStore(SQLBaseStore):
         used (otherwise the lock will leak).
         """
 
+        # Check if this process has taken out a lock and if it's still valid.
+        lock = self._live_tokens.get((lock_name, lock_key))
+        if lock and await lock.is_still_valid():
+            return None
+
         now = self._clock.time_msec()
         token = random_string(6)
 
@@ -100,7 +106,9 @@ class LockStore(SQLBaseStore):
 
             def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
                 # We take out the lock if either a) there is no row for the lock
-                # already or b) the existing row has timed out.
+                # already, b) the existing row has timed out, or c) the row is
+                # for this instance (which means the process got killed and
+                # restarted)
                 sql = """
                     INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
                     VALUES (?, ?, ?, ?, ?)
@@ -112,6 +120,7 @@ class LockStore(SQLBaseStore):
                             last_renewed_ts = EXCLUDED.last_renewed_ts
                         WHERE
                             worker_locks.last_renewed_ts < ?
+                            OR worker_locks.instance_name = EXCLUDED.instance_name
                 """
                 txn.execute(
                     sql,
@@ -148,11 +157,11 @@ class LockStore(SQLBaseStore):
                     WHERE
                         lock_name = ?
                         AND lock_key = ?
-                        AND last_renewed_ts < ?
+                        AND (last_renewed_ts < ? OR instance_name = ?)
                 """
                 txn.execute(
                     sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
+                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
                 )
 
                 inserted = self.db_pool.simple_upsert_txn_emulated(
@@ -179,9 +188,7 @@ class LockStore(SQLBaseStore):
             if not did_lock:
                 return None
 
-        self._live_tokens[(lock_name, lock_key)] = token
-
-        return Lock(
+        lock = Lock(
             self._reactor,
             self._clock,
             self,
@@ -190,6 +197,10 @@ class LockStore(SQLBaseStore):
             token=token,
         )
 
+        self._live_tokens[(lock_name, lock_key)] = lock
+
+        return lock
+
     async def _is_lock_still_valid(
         self, lock_name: str, lock_key: str, token: str
     ) -> bool:
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 53576ad52f..907af10995 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -20,7 +20,7 @@ import attr
 from synapse.api.constants import RelationTypes
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import LoggingTransaction
+from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
 from synapse.storage.relations import (
     AggregationPaginationToken,
@@ -334,6 +334,62 @@ class RelationsWorkerStore(SQLBaseStore):
 
         return count, latest_event
 
+    async def events_have_relations(
+        self,
+        parent_ids: List[str],
+        relation_senders: Optional[List[str]],
+        relation_types: Optional[List[str]],
+    ) -> List[str]:
+        """Check which events have a relationship from the given senders of the
+        given types.
+
+        Args:
+            parent_ids: The events being annotated
+            relation_senders: The relation senders to check.
+            relation_types: The relation types to check.
+
+        Returns:
+            True if the event has at least one relationship from one of the given senders of the given type.
+        """
+        # If no restrictions are given then the event has the required relations.
+        if not relation_senders and not relation_types:
+            return parent_ids
+
+        sql = """
+            SELECT relates_to_id FROM event_relations
+            INNER JOIN events USING (event_id)
+            WHERE
+                %s;
+        """
+
+        def _get_if_event_has_relations(txn) -> List[str]:
+            clauses: List[str] = []
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "relates_to_id", parent_ids
+            )
+            clauses.append(clause)
+
+            if relation_senders:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "sender", relation_senders
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+            if relation_types:
+                clause, temp_args = make_in_list_sql_clause(
+                    txn.database_engine, "relation_type", relation_types
+                )
+                clauses.append(clause)
+                args.extend(temp_args)
+
+            txn.execute(sql % " AND ".join(clauses), args)
+
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_if_event_has_relations", _get_if_event_has_relations
+        )
+
     async def has_user_annotated_event(
         self, parent_id: str, event_type: str, aggregation_key: str, sender: str
     ) -> bool:
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index cefc77fa0f..17b398bb69 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1751,7 +1751,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         )
 
     async def block_room(self, room_id: str, user_id: str) -> None:
-        """Marks the room as blocked. Can be called multiple times.
+        """Marks the room as blocked.
+
+        Can be called multiple times (though we'll only track the last user to
+        block this room).
+
+        Can be called on a room unknown to this homeserver.
 
         Args:
             room_id: Room to block
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index dc7884b1c0..42dc807d17 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -272,31 +272,37 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     args = []
 
     if event_filter.types:
-        clauses.append("(%s)" % " OR ".join("type = ?" for _ in event_filter.types))
+        clauses.append(
+            "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types)
+        )
         args.extend(event_filter.types)
 
     for typ in event_filter.not_types:
-        clauses.append("type != ?")
+        clauses.append("event.type != ?")
         args.append(typ)
 
     if event_filter.senders:
-        clauses.append("(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders))
+        clauses.append(
+            "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders)
+        )
         args.extend(event_filter.senders)
 
     for sender in event_filter.not_senders:
-        clauses.append("sender != ?")
+        clauses.append("event.sender != ?")
         args.append(sender)
 
     if event_filter.rooms:
-        clauses.append("(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms))
+        clauses.append(
+            "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms)
+        )
         args.extend(event_filter.rooms)
 
     for room_id in event_filter.not_rooms:
-        clauses.append("room_id != ?")
+        clauses.append("event.room_id != ?")
         args.append(room_id)
 
     if event_filter.contains_url:
-        clauses.append("contains_url = ?")
+        clauses.append("event.contains_url = ?")
         args.append(event_filter.contains_url)
 
     # We're only applying the "labels" filter on the database query, because applying the
@@ -307,6 +313,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
         clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels))
         args.extend(event_filter.labels)
 
+    # Filter on relation_senders / relation types from the joined tables.
+    if event_filter.relation_senders:
+        clauses.append(
+            "(%s)"
+            % " OR ".join(
+                "related_event.sender = ?" for _ in event_filter.relation_senders
+            )
+        )
+        args.extend(event_filter.relation_senders)
+
+    if event_filter.relation_types:
+        clauses.append(
+            "(%s)"
+            % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+        )
+        args.extend(event_filter.relation_types)
+
     return " AND ".join(clauses), args
 
 
@@ -1116,7 +1139,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         bounds = generate_pagination_where_clause(
             direction=direction,
-            column_names=("topological_ordering", "stream_ordering"),
+            column_names=("event.topological_ordering", "event.stream_ordering"),
             from_token=from_bound,
             to_token=to_bound,
             engine=self.database_engine,
@@ -1133,32 +1156,51 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
 
         select_keywords = "SELECT"
         join_clause = ""
+        # Using DISTINCT in this SELECT query is quite expensive, because it
+        # requires the engine to sort on the entire (not limited) result set,
+        # i.e. the entire events table. Only use it in scenarios that could result
+        # in the same event ID occurring multiple times in the results.
+        needs_distinct = False
         if event_filter and event_filter.labels:
             # If we're not filtering on a label, then joining on event_labels will
             # return as many row for a single event as the number of labels it has. To
             # avoid this, only join if we're filtering on at least one label.
-            join_clause = """
+            join_clause += """
                 LEFT JOIN event_labels
                 USING (event_id, room_id, topological_ordering)
             """
             if len(event_filter.labels) > 1:
-                # Using DISTINCT in this SELECT query is quite expensive, because it
-                # requires the engine to sort on the entire (not limited) result set,
-                # i.e. the entire events table. We only need to use it when we're
-                # filtering on more than two labels, because that's the only scenario
-                # in which we can possibly to get multiple times the same event ID in
-                # the results.
-                select_keywords += "DISTINCT"
+                # Multiple labels could cause the same event to appear multiple times.
+                needs_distinct = True
+
+        # If there is a filter on relation_senders and relation_types join to the
+        # relations table.
+        if event_filter and (
+            event_filter.relation_senders or event_filter.relation_types
+        ):
+            # Filtering by relations could cause the same event to appear multiple
+            # times (since there's no limit on the number of relations to an event).
+            needs_distinct = True
+            join_clause += """
+                LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
+            """
+            if event_filter.relation_senders:
+                join_clause += """
+                    LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
+                """
+
+        if needs_distinct:
+            select_keywords += " DISTINCT"
 
         sql = """
             %(select_keywords)s
-                event_id, instance_name,
-                topological_ordering, stream_ordering
-            FROM events
+                event.event_id, event.instance_name,
+                event.topological_ordering, event.stream_ordering
+            FROM events AS event
             %(join_clause)s
-            WHERE outlier = ? AND room_id = ? AND %(bounds)s
-            ORDER BY topological_ordering %(order)s,
-            stream_ordering %(order)s LIMIT ?
+            WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s
+            ORDER BY event.topological_ordering %(order)s,
+            event.stream_ordering %(order)s LIMIT ?
         """ % {
             "select_keywords": select_keywords,
             "join_clause": join_clause,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1629d2a53c..8b9c6adae2 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,22 +133,23 @@ def prepare_database(
 
             # if it's a worker app, refuse to upgrade the database, to avoid multiple
             # workers doing it at once.
-            if (
-                config.worker.worker_app is not None
-                and version_info.current_version != SCHEMA_VERSION
-            ):
+            if config.worker.worker_app is None:
+                _upgrade_existing_database(
+                    cur,
+                    version_info,
+                    database_engine,
+                    config,
+                    databases=databases,
+                )
+            elif version_info.current_version < SCHEMA_VERSION:
+                # If the DB is on an older version than we expect then we refuse
+                # to start the worker (as the main process needs to run first to
+                # update the schema).
                 raise UpgradeDatabaseException(
                     OUTDATED_SCHEMA_ON_WORKER_ERROR
                     % (SCHEMA_VERSION, version_info.current_version)
                 )
 
-            _upgrade_existing_database(
-                cur,
-                version_info,
-                database_engine,
-                config,
-                databases=databases,
-            )
         else:
             logger.info("%r: Initialising new database", databases)
 
diff --git a/sytest-blacklist b/sytest-blacklist
index 65bf1774e3..57e603a4a6 100644
--- a/sytest-blacklist
+++ b/sytest-blacklist
@@ -32,3 +32,6 @@ We can't peek into rooms with invited history_visibility
 We can't peek into rooms with joined history_visibility
 Local users can peek by room alias
 Peeked rooms only turn up in the sync for the device who peeked them
+
+# Validation needs to be added to Synapse: #10554
+Rejects invalid device keys
diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py
index f44c91a373..b7fc33dc94 100644
--- a/tests/api/test_filtering.py
+++ b/tests/api/test_filtering.py
@@ -15,6 +15,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from unittest.mock import patch
+
 import jsonschema
 
 from synapse.api.constants import EventContentFields
@@ -51,9 +53,8 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             {"presence": {"senders": ["@bar;pik.test.com"]}},
         ]
         for filter in invalid_filters:
-            with self.assertRaises(SynapseError) as check_filter_error:
+            with self.assertRaises(SynapseError):
                 self.filtering.check_valid_filter(filter)
-                self.assertIsInstance(check_filter_error.exception, SynapseError)
 
     def test_valid_filters(self):
         valid_filters = [
@@ -119,12 +120,12 @@ class FilteringTestCase(unittest.HomeserverTestCase):
         definition = {"types": ["m.room.message", "org.matrix.foo.bar"]}
         event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
 
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_types_works_with_wildcards(self):
         definition = {"types": ["m.*", "org.matrix.foo.bar"]}
         event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_types_works_with_unknowns(self):
         definition = {"types": ["m.room.message", "org.matrix.foo.bar"]}
@@ -133,24 +134,24 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="now.for.something.completely.different",
             room_id="!foo:bar",
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_types_works_with_literals(self):
         definition = {"not_types": ["m.room.message", "org.matrix.foo.bar"]}
         event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar")
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_types_works_with_wildcards(self):
         definition = {"not_types": ["m.room.message", "org.matrix.*"]}
         event = MockEvent(
             sender="@foo:bar", type="org.matrix.custom.event", room_id="!foo:bar"
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_types_works_with_unknowns(self):
         definition = {"not_types": ["m.*", "org.*"]}
         event = MockEvent(sender="@foo:bar", type="com.nom.nom.nom", room_id="!foo:bar")
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_types_takes_priority_over_types(self):
         definition = {
@@ -158,35 +159,35 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             "types": ["m.room.message", "m.room.topic"],
         }
         event = MockEvent(sender="@foo:bar", type="m.room.topic", room_id="!foo:bar")
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_senders_works_with_literals(self):
         definition = {"senders": ["@flibble:wibble"]}
         event = MockEvent(
             sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar"
         )
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_senders_works_with_unknowns(self):
         definition = {"senders": ["@flibble:wibble"]}
         event = MockEvent(
             sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar"
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_senders_works_with_literals(self):
         definition = {"not_senders": ["@flibble:wibble"]}
         event = MockEvent(
             sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar"
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_senders_works_with_unknowns(self):
         definition = {"not_senders": ["@flibble:wibble"]}
         event = MockEvent(
             sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar"
         )
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_senders_takes_priority_over_senders(self):
         definition = {
@@ -196,14 +197,14 @@ class FilteringTestCase(unittest.HomeserverTestCase):
         event = MockEvent(
             sender="@misspiggy:muppets", type="m.room.topic", room_id="!foo:bar"
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_rooms_works_with_literals(self):
         definition = {"rooms": ["!secretbase:unknown"]}
         event = MockEvent(
             sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown"
         )
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_rooms_works_with_unknowns(self):
         definition = {"rooms": ["!secretbase:unknown"]}
@@ -212,7 +213,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",
             room_id="!anothersecretbase:unknown",
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_rooms_works_with_literals(self):
         definition = {"not_rooms": ["!anothersecretbase:unknown"]}
@@ -221,7 +222,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",
             room_id="!anothersecretbase:unknown",
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_rooms_works_with_unknowns(self):
         definition = {"not_rooms": ["!secretbase:unknown"]}
@@ -230,7 +231,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",
             room_id="!anothersecretbase:unknown",
         )
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_not_rooms_takes_priority_over_rooms(self):
         definition = {
@@ -240,7 +241,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
         event = MockEvent(
             sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown"
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_combined_event(self):
         definition = {
@@ -256,7 +257,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",  # yup
             room_id="!stage:unknown",  # yup
         )
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_definition_combined_event_bad_sender(self):
         definition = {
@@ -272,7 +273,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",  # yup
             room_id="!stage:unknown",  # yup
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_combined_event_bad_room(self):
         definition = {
@@ -288,7 +289,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="m.room.message",  # yup
             room_id="!piggyshouse:muppets",  # nope
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_definition_combined_event_bad_type(self):
         definition = {
@@ -304,7 +305,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             type="muppets.misspiggy.kisses",  # nope
             room_id="!stage:unknown",  # yup
         )
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_filter_labels(self):
         definition = {"org.matrix.labels": ["#fun"]}
@@ -315,7 +316,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             content={EventContentFields.LABELS: ["#fun"]},
         )
 
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
         event = MockEvent(
             sender="@foo:bar",
@@ -324,7 +325,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             content={EventContentFields.LABELS: ["#notfun"]},
         )
 
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
     def test_filter_not_labels(self):
         definition = {"org.matrix.not_labels": ["#fun"]}
@@ -335,7 +336,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             content={EventContentFields.LABELS: ["#fun"]},
         )
 
-        self.assertFalse(Filter(definition).check(event))
+        self.assertFalse(Filter(self.hs, definition)._check(event))
 
         event = MockEvent(
             sender="@foo:bar",
@@ -344,7 +345,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             content={EventContentFields.LABELS: ["#notfun"]},
         )
 
-        self.assertTrue(Filter(definition).check(event))
+        self.assertTrue(Filter(self.hs, definition)._check(event))
 
     def test_filter_presence_match(self):
         user_filter_json = {"presence": {"types": ["m.*"]}}
@@ -362,7 +363,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             )
         )
 
-        results = user_filter.filter_presence(events=events)
+        results = self.get_success(user_filter.filter_presence(events=events))
         self.assertEquals(events, results)
 
     def test_filter_presence_no_match(self):
@@ -386,7 +387,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             )
         )
 
-        results = user_filter.filter_presence(events=events)
+        results = self.get_success(user_filter.filter_presence(events=events))
         self.assertEquals([], results)
 
     def test_filter_room_state_match(self):
@@ -405,7 +406,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             )
         )
 
-        results = user_filter.filter_room_state(events=events)
+        results = self.get_success(user_filter.filter_room_state(events=events))
         self.assertEquals(events, results)
 
     def test_filter_room_state_no_match(self):
@@ -426,7 +427,7 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             )
         )
 
-        results = user_filter.filter_room_state(events)
+        results = self.get_success(user_filter.filter_room_state(events))
         self.assertEquals([], results)
 
     def test_filter_rooms(self):
@@ -441,10 +442,52 @@ class FilteringTestCase(unittest.HomeserverTestCase):
             "!not_included:example.com",  # Disallowed because not in rooms.
         ]
 
-        filtered_room_ids = list(Filter(definition).filter_rooms(room_ids))
+        filtered_room_ids = list(Filter(self.hs, definition).filter_rooms(room_ids))
 
         self.assertEquals(filtered_room_ids, ["!allowed:example.com"])
 
+    @unittest.override_config({"experimental_features": {"msc3440_enabled": True}})
+    def test_filter_relations(self):
+        events = [
+            # An event without a relation.
+            MockEvent(
+                event_id="$no_relation",
+                sender="@foo:bar",
+                type="org.matrix.custom.event",
+                room_id="!foo:bar",
+            ),
+            # An event with a relation.
+            MockEvent(
+                event_id="$with_relation",
+                sender="@foo:bar",
+                type="org.matrix.custom.event",
+                room_id="!foo:bar",
+            ),
+            # Non-EventBase objects get passed through.
+            {},
+        ]
+
+        # For the following tests we patch the datastore method (intead of injecting
+        # events). This is a bit cheeky, but tests the logic of _check_event_relations.
+
+        # Filter for a particular sender.
+        definition = {
+            "io.element.relation_senders": ["@foo:bar"],
+        }
+
+        async def events_have_relations(*args, **kwargs):
+            return ["$with_relation"]
+
+        with patch.object(
+            self.datastore, "events_have_relations", new=events_have_relations
+        ):
+            filtered_events = list(
+                self.get_success(
+                    Filter(self.hs, definition)._check_event_relations(events)
+                )
+            )
+        self.assertEquals(filtered_events, events[1:])
+
     def test_add_filter(self):
         user_filter_json = {"room": {"state": {"types": ["m.*"]}}}
 
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 43998020b2..d6f14e2dba 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -40,6 +40,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         hs.get_application_service_scheduler.return_value = self.mock_scheduler
         hs.get_clock.return_value = MockClock()
         self.handler = ApplicationServicesHandler(hs)
+        self.event_source = hs.get_event_sources()
 
     def test_notify_interested_services(self):
         interested_service = self._mkservice(is_interested=True)
@@ -252,6 +253,60 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             },
         )
 
+    def test_notify_interested_services_ephemeral(self):
+        """
+        Test sending ephemeral events to the appservice handler are scheduled
+        to be pushed out to interested appservices, and that the stream ID is
+        updated accordingly.
+        """
+        interested_service = self._mkservice(is_interested=True)
+        services = [interested_service]
+
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+            579
+        )
+
+        event = Mock(event_id="event_1")
+        self.event_source.sources.receipt.get_new_events_as.return_value = (
+            make_awaitable(([event], None))
+        )
+
+        self.handler.notify_interested_services_ephemeral(
+            "receipt_key", 580, ["@fakerecipient:example.com"]
+        )
+        self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
+            interested_service, [event]
+        )
+        self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
+            interested_service,
+            "read_receipt",
+            580,
+        )
+
+    def test_notify_interested_services_ephemeral_out_of_order(self):
+        """
+        Test sending out of order ephemeral events to the appservice handler
+        are ignored.
+        """
+        interested_service = self._mkservice(is_interested=True)
+        services = [interested_service]
+
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+            580
+        )
+
+        event = Mock(event_id="event_1")
+        self.event_source.sources.receipt.get_new_events_as.return_value = (
+            make_awaitable(([event], None))
+        )
+
+        self.handler.notify_interested_services_ephemeral(
+            "receipt_key", 580, ["@fakerecipient:example.com"]
+        )
+        self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
+
     def _mkservice(self, is_interested, protocols=None):
         service = Mock()
         service.is_interested.return_value = make_awaitable(is_interested)
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 39e7b1ab25..0c3b86fda9 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -17,6 +17,8 @@ from unittest import mock
 
 from signedjson import key as key, sign as sign
 
+from twisted.internet import defer
+
 from synapse.api.constants import RoomEncryptionAlgorithms
 from synapse.api.errors import Codes, SynapseError
 
@@ -630,3 +632,152 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             ],
             other_master_key["signatures"][local_user]["ed25519:" + usersigning_pubkey],
         )
+
+    def test_query_devices_remote_no_sync(self):
+        """Tests that querying keys for a remote user that we don't share a room
+        with returns the cross signing keys correctly.
+        """
+
+        remote_user_id = "@test:other"
+        local_user_id = "@test:test"
+
+        remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+        remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
+
+        self.hs.get_federation_client().query_client_keys = mock.Mock(
+            return_value=defer.succeed(
+                {
+                    "device_keys": {remote_user_id: {}},
+                    "master_keys": {
+                        remote_user_id: {
+                            "user_id": remote_user_id,
+                            "usage": ["master"],
+                            "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                        },
+                    },
+                    "self_signing_keys": {
+                        remote_user_id: {
+                            "user_id": remote_user_id,
+                            "usage": ["self_signing"],
+                            "keys": {
+                                "ed25519:"
+                                + remote_self_signing_key: remote_self_signing_key
+                            },
+                        }
+                    },
+                }
+            )
+        )
+
+        e2e_handler = self.hs.get_e2e_keys_handler()
+
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(query_result["failures"], {})
+        self.assertEqual(
+            query_result["master_keys"],
+            {
+                remote_user_id: {
+                    "user_id": remote_user_id,
+                    "usage": ["master"],
+                    "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                },
+            },
+        )
+        self.assertEqual(
+            query_result["self_signing_keys"],
+            {
+                remote_user_id: {
+                    "user_id": remote_user_id,
+                    "usage": ["self_signing"],
+                    "keys": {
+                        "ed25519:" + remote_self_signing_key: remote_self_signing_key
+                    },
+                }
+            },
+        )
+
+    def test_query_devices_remote_sync(self):
+        """Tests that querying keys for a remote user that we share a room with,
+        but haven't yet fetched the keys for, returns the cross signing keys
+        correctly.
+        """
+
+        remote_user_id = "@test:other"
+        local_user_id = "@test:test"
+
+        self.store.get_rooms_for_user = mock.Mock(
+            return_value=defer.succeed({"some_room_id"})
+        )
+
+        remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+        remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
+
+        self.hs.get_federation_client().query_user_devices = mock.Mock(
+            return_value=defer.succeed(
+                {
+                    "user_id": remote_user_id,
+                    "stream_id": 1,
+                    "devices": [],
+                    "master_key": {
+                        "user_id": remote_user_id,
+                        "usage": ["master"],
+                        "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                    },
+                    "self_signing_key": {
+                        "user_id": remote_user_id,
+                        "usage": ["self_signing"],
+                        "keys": {
+                            "ed25519:"
+                            + remote_self_signing_key: remote_self_signing_key
+                        },
+                    },
+                }
+            )
+        )
+
+        e2e_handler = self.hs.get_e2e_keys_handler()
+
+        query_result = self.get_success(
+            e2e_handler.query_devices(
+                {
+                    "device_keys": {remote_user_id: []},
+                },
+                timeout=10,
+                from_user_id=local_user_id,
+                from_device_id="some_device_id",
+            )
+        )
+
+        self.assertEqual(query_result["failures"], {})
+        self.assertEqual(
+            query_result["master_keys"],
+            {
+                remote_user_id: {
+                    "user_id": remote_user_id,
+                    "usage": ["master"],
+                    "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                }
+            },
+        )
+        self.assertEqual(
+            query_result["self_signing_keys"],
+            {
+                remote_user_id: {
+                    "user_id": remote_user_id,
+                    "usage": ["self_signing"],
+                    "keys": {
+                        "ed25519:" + remote_self_signing_key: remote_self_signing_key
+                    },
+                }
+            },
+        )
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 339c039914..638186f173 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -13,10 +13,11 @@
 # limitations under the License.
 
 from typing import Optional
+from unittest.mock import Mock
 
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.api.errors import Codes, ResourceLimitError
-from synapse.api.filtering import DEFAULT_FILTER_COLLECTION
+from synapse.api.filtering import Filtering
 from synapse.api.room_versions import RoomVersions
 from synapse.handlers.sync import SyncConfig
 from synapse.rest import admin
@@ -197,7 +198,7 @@ def generate_sync_config(
     _request_key += 1
     return SyncConfig(
         user=UserID.from_string(user_id),
-        filter_collection=DEFAULT_FILTER_COLLECTION,
+        filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION,
         is_guest=False,
         request_key=("request_key", _request_key),
         device_id=device_id,
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 525b83141b..d16cd141a7 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -116,7 +116,6 @@ class ModuleApiTestCase(HomeserverTestCase):
 
         # Insert a second ip, agent at a later date. We should be able to retrieve it.
         last_seen_2 = last_seen_1 + 10000
-        print("%s => %s" % (last_seen_1, last_seen_2))
         self.get_success(
             self.store.insert_client_ip(
                 user_id, "access_token", "ip_2", "user_agent_2", "device_2", last_seen_2
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index fa8018e5a7..90f800e564 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -65,7 +65,7 @@ class EmailPusherTests(HomeserverTestCase):
             "notif_from": "test@example.com",
             "riot_base_url": None,
         }
-        config["public_baseurl"] = "aaa"
+        config["public_baseurl"] = "http://aaa"
         config["start_pushers"] = True
 
         hs = self.setup_test_homeserver(config=config)
diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py
new file mode 100644
index 0000000000..78c48db552
--- /dev/null
+++ b/tests/rest/admin/test_background_updates.py
@@ -0,0 +1,218 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse.rest.admin
+from synapse.rest.client import login
+from synapse.server import HomeServer
+
+from tests import unittest
+
+
+class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs: HomeServer):
+        self.store = hs.get_datastore()
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+    def _register_bg_update(self):
+        "Adds a bg update but doesn't start it"
+
+        async def _fake_update(progress, batch_size) -> int:
+            await self.clock.sleep(0.2)
+            return batch_size
+
+        self.store.db_pool.updates.register_background_update_handler(
+            "test_update",
+            _fake_update,
+        )
+
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="background_updates",
+                values={
+                    "update_name": "test_update",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+    def test_status_empty(self):
+        """Test the status API works."""
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled, but none should be running.
+        self.assertDictEqual(
+            channel.json_body, {"current_updates": {}, "enabled": True}
+        )
+
+    def test_status_bg_update(self):
+        """Test the status API works with a background update."""
+
+        # Create a new background update
+
+        self._register_bg_update()
+
+        self.store.db_pool.updates.start_doing_background_updates()
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled, and one should be running.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": True,
+            },
+        )
+
+    def test_enabled(self):
+        """Test the enabled API works."""
+
+        # Create a new background update
+
+        self._register_bg_update()
+        self.store.db_pool.updates.start_doing_background_updates()
+
+        # Test that GET works and returns enabled is True.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/enabled",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(channel.json_body, {"enabled": True})
+
+        # Disable the BG updates
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/enabled",
+            content={"enabled": False},
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(channel.json_body, {"enabled": False})
+
+        # Advance a bit and get the current status, note this will finish the in
+        # flight background update so we call it the status API twice and check
+        # there was no change.
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": False,
+            },
+        )
+
+        # Run the reactor for a bit so the BG updates would have a chance to run
+        # if they were to.
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # There should be no change from the previous /status response.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 1000.0,
+                        "total_item_count": 100,
+                    }
+                },
+                "enabled": False,
+            },
+        )
+
+        # Re-enable the background updates.
+
+        channel = self.make_request(
+            "POST",
+            "/_synapse/admin/v1/background_updates/enabled",
+            content={"enabled": True},
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        self.assertDictEqual(channel.json_body, {"enabled": True})
+
+        self.reactor.pump([1.0, 1.0])
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/background_updates/status",
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Background updates should be enabled and making progress.
+        self.assertDictEqual(
+            channel.json_body,
+            {
+                "current_updates": {
+                    "master": {
+                        "name": "test_update",
+                        "average_items_per_ms": 0.1,
+                        "total_duration_ms": 2000.0,
+                        "total_item_count": 200,
+                    }
+                },
+                "enabled": True,
+            },
+        )
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 46116644ce..11ec54c82e 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -14,9 +14,12 @@
 
 import json
 import urllib.parse
+from http import HTTPStatus
 from typing import List, Optional
 from unittest.mock import Mock
 
+from parameterized import parameterized
+
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import Codes
@@ -281,6 +284,31 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         self._is_blocked(self.room_id, expect=True)
         self._has_no_members(self.room_id)
 
+    @parameterized.expand([(True,), (False,)])
+    def test_block_unknown_room(self, purge: bool) -> None:
+        """
+        We can block an unknown room. In this case, the `purge` argument
+        should be ignored.
+        """
+        room_id = "!unknown:test"
+
+        # The room isn't already in the blocked rooms table
+        self._is_blocked(room_id, expect=False)
+
+        # Request the room be blocked.
+        channel = self.make_request(
+            "DELETE",
+            f"/_synapse/admin/v1/rooms/{room_id}",
+            {"block": True, "purge": purge},
+            access_token=self.admin_user_tok,
+        )
+
+        # The room is now blocked.
+        self.assertEqual(
+            HTTPStatus.OK, int(channel.result["code"]), msg=channel.result["body"]
+        )
+        self._is_blocked(room_id)
+
     def test_shutdown_room_consent(self):
         """Test that we can shutdown rooms with local users who have not
         yet accepted the privacy policy. This used to fail when we tried to
diff --git a/tests/rest/client/test_consent.py b/tests/rest/client/test_consent.py
index 84d092ca82..fcdc565814 100644
--- a/tests/rest/client/test_consent.py
+++ b/tests/rest/client/test_consent.py
@@ -35,7 +35,6 @@ class ConsentResourceTestCase(unittest.HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
 
         config = self.default_config()
-        config["public_baseurl"] = "aaaa"
         config["form_secret"] = "123abc"
 
         # Make some temporary templates...
diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py
index 66dcfc9f88..6e7c0f11df 100644
--- a/tests/rest/client/test_register.py
+++ b/tests/rest/client/test_register.py
@@ -891,7 +891,6 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase):
             "smtp_pass": None,
             "notif_from": "test@example.com",
         }
-        config["public_baseurl"] = "aaa"
 
         self.hs = self.setup_test_homeserver(config=config)
 
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index 376853fd65..10a4a4dc5e 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -25,7 +25,12 @@ from urllib import parse as urlparse
 from twisted.internet import defer
 
 import synapse.rest.admin
-from synapse.api.constants import EventContentFields, EventTypes, Membership
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    Membership,
+    RelationTypes,
+)
 from synapse.api.errors import Codes, HttpResponseException
 from synapse.handlers.pagination import PurgeStatus
 from synapse.rest import admin
@@ -2157,6 +2162,153 @@ class LabelsTestCase(unittest.HomeserverTestCase):
         return event_id
 
 
+class RelationsTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self):
+        config = super().default_config()
+        config["experimental_features"] = {"msc3440_enabled": True}
+        return config
+
+    def prepare(self, reactor, clock, homeserver):
+        self.user_id = self.register_user("test", "test")
+        self.tok = self.login("test", "test")
+        self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
+
+        self.second_user_id = self.register_user("second", "test")
+        self.second_tok = self.login("second", "test")
+        self.helper.join(
+            room=self.room_id, user=self.second_user_id, tok=self.second_tok
+        )
+
+        self.third_user_id = self.register_user("third", "test")
+        self.third_tok = self.login("third", "test")
+        self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok)
+
+        # An initial event with a relation from second user.
+        res = self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "Message 1"},
+            tok=self.tok,
+        )
+        self.event_id_1 = res["event_id"]
+        self.helper.send_event(
+            room_id=self.room_id,
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": RelationTypes.ANNOTATION,
+                    "event_id": self.event_id_1,
+                    "key": "👍",
+                }
+            },
+            tok=self.second_tok,
+        )
+
+        # Another event with a relation from third user.
+        res = self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "Message 2"},
+            tok=self.tok,
+        )
+        self.event_id_2 = res["event_id"]
+        self.helper.send_event(
+            room_id=self.room_id,
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": RelationTypes.REFERENCE,
+                    "event_id": self.event_id_2,
+                }
+            },
+            tok=self.third_tok,
+        )
+
+        # An event with no relations.
+        self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "No relations"},
+            tok=self.tok,
+        )
+
+    def _filter_messages(self, filter: JsonDict) -> List[JsonDict]:
+        """Make a request to /messages with a filter, returns the chunk of events."""
+        channel = self.make_request(
+            "GET",
+            "/rooms/%s/messages?filter=%s&dir=b" % (self.room_id, json.dumps(filter)),
+            access_token=self.tok,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        return channel.json_body["chunk"]
+
+    def test_filter_relation_senders(self):
+        # Messages which second user reacted to.
+        filter = {"io.element.relation_senders": [self.second_user_id]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+        # Messages which third user reacted to.
+        filter = {"io.element.relation_senders": [self.third_user_id]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0]["event_id"], self.event_id_2)
+
+        # Messages which either user reacted to.
+        filter = {
+            "io.element.relation_senders": [self.second_user_id, self.third_user_id]
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 2, chunk)
+        self.assertCountEqual(
+            [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2]
+        )
+
+    def test_filter_relation_type(self):
+        # Messages which have annotations.
+        filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+        # Messages which have references.
+        filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0]["event_id"], self.event_id_2)
+
+        # Messages which have either annotations or references.
+        filter = {
+            "io.element.relation_types": [
+                RelationTypes.ANNOTATION,
+                RelationTypes.REFERENCE,
+            ]
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 2, chunk)
+        self.assertCountEqual(
+            [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2]
+        )
+
+    def test_filter_relation_senders_and_type(self):
+        # Messages which second user reacted to.
+        filter = {
+            "io.element.relation_senders": [self.second_user_id],
+            "io.element.relation_types": [RelationTypes.ANNOTATION],
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0]["event_id"], self.event_id_1)
+
+
 class ContextTestCase(unittest.HomeserverTestCase):
 
     servlets = [
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 95be369d4b..c427686376 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 import json
 
+from parameterized import parameterized
+
 import synapse.rest.admin
 from synapse.api.constants import (
     EventContentFields,
@@ -417,7 +419,30 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase):
         # Test that the first user can't see the other user's hidden read receipt
         self.assertEqual(self._get_read_receipt(), None)
 
-    def test_read_receipt_with_empty_body(self):
+    @parameterized.expand(
+        [
+            # Old Element version, expected to send an empty body
+            (
+                "agent1",
+                "Element/1.2.2 (Linux; U; Android 9; MatrixAndroidSDK_X 0.0.1)",
+                200,
+            ),
+            # Old SchildiChat version, expected to send an empty body
+            ("agent2", "SchildiChat/1.2.1 (Android 10)", 200),
+            # Expected 400: Denies empty body starting at version 1.3+
+            ("agent3", "Element/1.3.6 (Android 10)", 400),
+            ("agent4", "SchildiChat/1.3.6 (Android 11)", 400),
+            # Contains "Riot": Receipts with empty bodies expected
+            ("agent5", "Element (Riot.im) (Android 9)", 200),
+            # Expected 400: Does not contain "Android"
+            ("agent6", "Element/1.2.1", 400),
+            # Expected 400: Different format, missing "/" after Element; existing build that should allow empty bodies, but minimal ongoing usage
+            ("agent7", "Element dbg/1.1.8-dev (Android)", 400),
+        ]
+    )
+    def test_read_receipt_with_empty_body(
+        self, name, user_agent: str, expected_status_code: int
+    ):
         # Send a message as the first user
         res = self.helper.send(self.room_id, body="hello", tok=self.tok)
 
@@ -426,8 +451,9 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase):
             "POST",
             "/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]),
             access_token=self.tok2,
+            custom_headers=[("User-Agent", user_agent)],
         )
-        self.assertEqual(channel.code, 200)
+        self.assertEqual(channel.code, expected_status_code)
 
     def _get_read_receipt(self):
         """Syncs and returns the read receipt."""
diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
new file mode 100644
index 0000000000..a6be9a1bb1
--- /dev/null
+++ b/tests/storage/test_rollback_worker.py
@@ -0,0 +1,69 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.app.generic_worker import GenericWorkerServer
+from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
+from synapse.storage.schema import SCHEMA_VERSION
+
+from tests.unittest import HomeserverTestCase
+
+
+class WorkerSchemaTests(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver(
+            federation_http_client=None, homeserver_to_use=GenericWorkerServer
+        )
+        return hs
+
+    def default_config(self):
+        conf = super().default_config()
+
+        # Mark this as a worker app.
+        conf["worker_app"] = "yes"
+
+        return conf
+
+    def test_rolling_back(self):
+        """Test that workers can start if the DB is a newer schema version"""
+
+        db_pool = self.hs.get_datastore().db_pool
+        db_conn = LoggingDatabaseConnection(
+            db_pool._db_pool.connect(),
+            db_pool.engine,
+            "tests",
+        )
+
+        cur = db_conn.cursor()
+        cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION + 1,))
+
+        db_conn.commit()
+
+        prepare_database(db_conn, db_pool.engine, self.hs.config)
+
+    def test_not_upgraded(self):
+        """Test that workers don't start if the DB has an older schema version"""
+        db_pool = self.hs.get_datastore().db_pool
+        db_conn = LoggingDatabaseConnection(
+            db_pool._db_pool.connect(),
+            db_pool.engine,
+            "tests",
+        )
+
+        cur = db_conn.cursor()
+        cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION - 1,))
+
+        db_conn.commit()
+
+        with self.assertRaises(PrepareDatabaseException):
+            prepare_database(db_conn, db_pool.engine, self.hs.config)
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
new file mode 100644
index 0000000000..ce782c7e1d
--- /dev/null
+++ b/tests/storage/test_stream.py
@@ -0,0 +1,207 @@
+#  Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from typing import List
+
+from synapse.api.constants import EventTypes, RelationTypes
+from synapse.api.filtering import Filter
+from synapse.events import EventBase
+from synapse.rest import admin
+from synapse.rest.client import login, room
+from synapse.types import JsonDict
+
+from tests.unittest import HomeserverTestCase
+
+
+class PaginationTestCase(HomeserverTestCase):
+    """
+    Test the pre-filtering done in the pagination code.
+
+    This is similar to some of the tests in tests.rest.client.test_rooms but here
+    we ensure that the filtering done in the database is applied successfully.
+    """
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self):
+        config = super().default_config()
+        config["experimental_features"] = {"msc3440_enabled": True}
+        return config
+
+    def prepare(self, reactor, clock, homeserver):
+        self.user_id = self.register_user("test", "test")
+        self.tok = self.login("test", "test")
+        self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok)
+
+        self.second_user_id = self.register_user("second", "test")
+        self.second_tok = self.login("second", "test")
+        self.helper.join(
+            room=self.room_id, user=self.second_user_id, tok=self.second_tok
+        )
+
+        self.third_user_id = self.register_user("third", "test")
+        self.third_tok = self.login("third", "test")
+        self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok)
+
+        # An initial event with a relation from second user.
+        res = self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "Message 1"},
+            tok=self.tok,
+        )
+        self.event_id_1 = res["event_id"]
+        self.helper.send_event(
+            room_id=self.room_id,
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": RelationTypes.ANNOTATION,
+                    "event_id": self.event_id_1,
+                    "key": "👍",
+                }
+            },
+            tok=self.second_tok,
+        )
+
+        # Another event with a relation from third user.
+        res = self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "Message 2"},
+            tok=self.tok,
+        )
+        self.event_id_2 = res["event_id"]
+        self.helper.send_event(
+            room_id=self.room_id,
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": RelationTypes.REFERENCE,
+                    "event_id": self.event_id_2,
+                }
+            },
+            tok=self.third_tok,
+        )
+
+        # An event with no relations.
+        self.helper.send_event(
+            room_id=self.room_id,
+            type=EventTypes.Message,
+            content={"msgtype": "m.text", "body": "No relations"},
+            tok=self.tok,
+        )
+
+    def _filter_messages(self, filter: JsonDict) -> List[EventBase]:
+        """Make a request to /messages with a filter, returns the chunk of events."""
+
+        from_token = self.get_success(
+            self.hs.get_event_sources().get_current_token_for_pagination()
+        )
+
+        events, next_key = self.get_success(
+            self.hs.get_datastore().paginate_room_events(
+                room_id=self.room_id,
+                from_key=from_token.room_key,
+                to_key=None,
+                direction="b",
+                limit=10,
+                event_filter=Filter(self.hs, filter),
+            )
+        )
+
+        return events
+
+    def test_filter_relation_senders(self):
+        # Messages which second user reacted to.
+        filter = {"io.element.relation_senders": [self.second_user_id]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+        # Messages which third user reacted to.
+        filter = {"io.element.relation_senders": [self.third_user_id]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_2)
+
+        # Messages which either user reacted to.
+        filter = {
+            "io.element.relation_senders": [self.second_user_id, self.third_user_id]
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 2, chunk)
+        self.assertCountEqual(
+            [c.event_id for c in chunk], [self.event_id_1, self.event_id_2]
+        )
+
+    def test_filter_relation_type(self):
+        # Messages which have annotations.
+        filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+        # Messages which have references.
+        filter = {"io.element.relation_types": [RelationTypes.REFERENCE]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_2)
+
+        # Messages which have either annotations or references.
+        filter = {
+            "io.element.relation_types": [
+                RelationTypes.ANNOTATION,
+                RelationTypes.REFERENCE,
+            ]
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 2, chunk)
+        self.assertCountEqual(
+            [c.event_id for c in chunk], [self.event_id_1, self.event_id_2]
+        )
+
+    def test_filter_relation_senders_and_type(self):
+        # Messages which second user reacted to.
+        filter = {
+            "io.element.relation_senders": [self.second_user_id],
+            "io.element.relation_types": [RelationTypes.ANNOTATION],
+        }
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_1)
+
+    def test_duplicate_relation(self):
+        """An event should only be returned once if there are multiple relations to it."""
+        self.helper.send_event(
+            room_id=self.room_id,
+            type="m.reaction",
+            content={
+                "m.relates_to": {
+                    "rel_type": RelationTypes.ANNOTATION,
+                    "event_id": self.event_id_1,
+                    "key": "A",
+                }
+            },
+            tok=self.second_tok,
+        )
+
+        filter = {"io.element.relation_senders": [self.second_user_id]}
+        chunk = self._filter_messages(filter)
+        self.assertEqual(len(chunk), 1, chunk)
+        self.assertEqual(chunk[0].event_id, self.event_id_1)
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index 39947a166b..ced3efd93f 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -17,6 +17,7 @@ from typing import Set
 from unittest import mock
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import Deferred
 
 from synapse.api.errors import SynapseError
 from synapse.logging.context import (
@@ -703,6 +704,48 @@ class CachedListDescriptorTestCase(unittest.TestCase):
             obj.mock.assert_called_once_with((40,), 2)
             self.assertEqual(r, {10: "fish", 40: "gravy"})
 
+    def test_concurrent_lookups(self):
+        """All concurrent lookups should get the same result"""
+
+        class Cls:
+            def __init__(self):
+                self.mock = mock.Mock()
+
+            @descriptors.cached()
+            def fn(self, arg1):
+                pass
+
+            @descriptors.cachedList("fn", "args1")
+            def list_fn(self, args1) -> "Deferred[dict]":
+                return self.mock(args1)
+
+        obj = Cls()
+        deferred_result = Deferred()
+        obj.mock.return_value = deferred_result
+
+        # start off several concurrent lookups of the same key
+        d1 = obj.list_fn([10])
+        d2 = obj.list_fn([10])
+        d3 = obj.list_fn([10])
+
+        # the mock should have been called exactly once
+        obj.mock.assert_called_once_with((10,))
+        obj.mock.reset_mock()
+
+        # ... and none of the calls should yet be complete
+        self.assertFalse(d1.called)
+        self.assertFalse(d2.called)
+        self.assertFalse(d3.called)
+
+        # complete the lookup. @cachedList functions need to complete with a map
+        # of input->result
+        deferred_result.callback({10: "peas"})
+
+        # ... which should give the right result to all the callers
+        self.assertEqual(self.successResultOf(d1), {10: "peas"})
+        self.assertEqual(self.successResultOf(d2), {10: "peas"})
+        self.assertEqual(self.successResultOf(d3), {10: "peas"})
+
     @defer.inlineCallbacks
     def test_invalidate(self):
         """Make sure that invalidation callbacks are called."""