author    Erik Johnston <erik@matrix.org>  2021-11-08 11:34:03 +0000
committer Erik Johnston <erik@matrix.org>  2021-11-08 11:34:03 +0000
commit    58864265d15809e824bb6cb02e4c3a9f533e4aae (patch)
tree      7578438369ce85ce6c189acee879578ad77ec007
parent    Add spans for sync (diff)
parent    Blacklist new sytest validation test (#11270) (diff)
download  synapse-58864265d15809e824bb6cb02e4c3a9f533e4aae.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/slow_sync_diag
-rw-r--r--  .github/PULL_REQUEST_TEMPLATE.md | 9
-rw-r--r--  CHANGES.md | 13
-rw-r--r--  changelog.d/11033.bugfix | 1
-rw-r--r--  changelog.d/11098.misc | 1
-rw-r--r--  changelog.d/11099.feature | 1
-rw-r--r--  changelog.d/11137.misc | 1
-rw-r--r--  changelog.d/11188.bugfix | 1
-rw-r--r--  changelog.d/11199.bugfix | 1
-rw-r--r--  changelog.d/11200.bugfix | 1
-rw-r--r--  changelog.d/11207.bugfix | 1
-rw-r--r--  changelog.d/11211.feature | 1
-rw-r--r--  changelog.d/11213.removal | 1
-rw-r--r--  changelog.d/11217.bugfix | 1
-rw-r--r--  changelog.d/11225.misc | 1
-rw-r--r--  changelog.d/11226.misc | 1
-rw-r--r--  changelog.d/11229.misc | 1
-rw-r--r--  changelog.d/11231.misc | 1
-rw-r--r--  changelog.d/11233.misc | 1
-rw-r--r--  changelog.d/11237.misc | 1
-rw-r--r--  changelog.d/11239.misc | 1
-rw-r--r--  changelog.d/11240.bugfix | 1
-rw-r--r--  changelog.d/11246.misc | 1
-rw-r--r--  changelog.d/11253.misc | 1
-rw-r--r--  changelog.d/11255.bugfix | 1
-rw-r--r--  changelog.d/11257.doc | 1
-rw-r--r--  changelog.d/11262.bugfix | 1
-rw-r--r--  changelog.d/11269.misc | 1
-rw-r--r--  changelog.d/11270.misc | 1
-rwxr-xr-x  debian/build_virtualenv | 1
-rw-r--r--  debian/changelog | 14
-rw-r--r--  debian/control | 2
-rw-r--r--  debian/matrix-synapse.service | 2
-rwxr-xr-x  debian/rules | 6
-rw-r--r--  debian/test/.gitignore | 2
-rw-r--r--  debian/test/provision.sh | 24
-rw-r--r--  debian/test/stretch/Vagrantfile | 13
-rw-r--r--  debian/test/xenial/Vagrantfile | 10
-rw-r--r--  docs/admin_api/rooms.md | 23
-rw-r--r--  docs/delegate.md | 82
-rw-r--r--  docs/modules/password_auth_provider_callbacks.md | 2
-rw-r--r--  docs/modules/third_party_rules_callbacks.md | 8
-rw-r--r--  docs/openid.md | 38
-rw-r--r--  docs/sample_config.yaml | 18
-rw-r--r--  docs/systemd-with-workers/system/matrix-synapse-worker@.service | 2
-rw-r--r--  docs/systemd-with-workers/system/matrix-synapse.service | 2
-rw-r--r--  docs/upgrade.md | 10
-rw-r--r--  mypy.ini | 8
-rwxr-xr-x  scripts/synapse_port_db | 2
-rwxr-xr-x  setup.py | 3
-rw-r--r--  synapse/__init__.py | 2
-rw-r--r--  synapse/api/errors.py | 7
-rw-r--r--  synapse/app/generic_worker.py | 3
-rw-r--r--  synapse/app/homeserver.py | 4
-rw-r--r--  synapse/config/server.py | 19
-rw-r--r--  synapse/config/workers.py | 18
-rw-r--r--  synapse/events/__init__.py | 227
-rw-r--r--  synapse/events/third_party_rules.py | 9
-rw-r--r--  synapse/events/validator.py | 2
-rw-r--r--  synapse/federation/federation_server.py | 9
-rw-r--r--  synapse/federation/transport/client.py | 3
-rw-r--r--  synapse/handlers/appservice.py | 91
-rw-r--r--  synapse/handlers/auth.py | 4
-rw-r--r--  synapse/handlers/federation_event.py | 2
-rw-r--r--  synapse/handlers/message.py | 14
-rw-r--r--  synapse/handlers/room.py | 2
-rw-r--r--  synapse/handlers/room_batch.py | 2
-rw-r--r--  synapse/handlers/room_member.py | 4
-rw-r--r--  synapse/handlers/typing.py | 6
-rw-r--r--  synapse/notifier.py | 38
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py | 4
-rw-r--r--  synapse/push/push_rule_evaluator.py | 10
-rw-r--r--  synapse/replication/tcp/handler.py | 2
-rw-r--r--  synapse/replication/tcp/streams/_base.py | 3
-rw-r--r--  synapse/rest/admin/__init__.py | 2
-rw-r--r--  synapse/rest/admin/rooms.py | 141
-rw-r--r--  synapse/rest/client/room.py | 2
-rw-r--r--  synapse/rest/client/room_batch.py | 31
-rw-r--r--  synapse/rest/media/v1/media_repository.py | 12
-rw-r--r--  synapse/rest/media/v1/preview_url_resource.py | 9
-rw-r--r--  synapse/rest/media/v1/upload_resource.py | 2
-rw-r--r--  synapse/rest/well_known.py | 47
-rw-r--r--  synapse/server.py | 4
-rw-r--r--  synapse/state/__init__.py | 2
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py | 89
-rw-r--r--  synapse/storage/databases/main/devices.py | 4
-rw-r--r--  synapse/storage/databases/main/events.py | 7
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 56
-rw-r--r--  synapse/storage/databases/main/group_server.py | 17
-rw-r--r--  synapse/storage/databases/main/lock.py | 31
-rw-r--r--  synapse/storage/databases/main/presence.py | 2
-rw-r--r--  synapse/storage/databases/main/room.py | 29
-rw-r--r--  synapse/storage/databases/main/roommember.py | 8
-rw-r--r--  synapse/storage/prepare_database.py | 23
-rw-r--r--  synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql | 22
-rw-r--r--  synapse/storage/schema/main/delta/65/04_local_group_updates.sql | 18
-rw-r--r--  synapse/util/async_helpers.py | 34
-rw-r--r--  sytest-blacklist | 3
-rw-r--r--  tests/handlers/test_appservice.py | 51
-rw-r--r--  tests/module_api/test_api.py | 1
-rw-r--r--  tests/rest/admin/test_room.py | 131
-rw-r--r--  tests/rest/client/test_third_party_rules.py | 16
-rw-r--r--  tests/rest/media/v1/test_media_storage.py | 18
-rw-r--r--  tests/rest/test_well_known.py | 32
-rw-r--r--  tests/storage/databases/main/test_deviceinbox.py | 74
-rw-r--r--  tests/storage/test_rollback_worker.py | 69
-rw-r--r--  tests/test_preview.py | 15
-rw-r--r--  tests/util/caches/test_deferred_cache.py | 4
-rw-r--r--  tests/util/caches/test_descriptors.py | 43
-rw-r--r--  tests/util/test_async_helpers.py (renamed from tests/util/test_async_utils.py) | 69
109 files changed, 1352 insertions, 571 deletions
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
index fc22d89426..6c3a998499 100644
--- a/.github/PULL_REQUEST_TEMPLATE.md
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -1,12 +1,13 @@
 ### Pull Request Checklist
 
-<!-- Please read CONTRIBUTING.md before submitting your pull request -->
+<!-- Please read https://matrix-org.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request -->
 
 * [ ] Pull request is based on the develop branch
-* [ ] Pull request includes a [changelog file](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#changelog). The entry should:
+* [ ] Pull request includes a [changelog file](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should:
   - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
   - Use markdown where necessary, mostly for `code blocks`.
   - End with either a period (.) or an exclamation mark (!).
   - Start with a capital letter.
-* [ ] Pull request includes a [sign off](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#sign-off)
-* [ ] Code style is correct (run the [linters](https://github.com/matrix-org/synapse/blob/master/CONTRIBUTING.md#code-style))
+* [ ] Pull request includes a [sign off](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#sign-off)
+* [ ] [Code style](https://matrix-org.github.io/synapse/latest/code_style.html) is correct
+  (run the [linters](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
diff --git a/CHANGES.md b/CHANGES.md
index f61d5c706f..e74544f489 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,8 +1,17 @@
-Synapse 1.46.0rc1 (2021-10-27)
-==============================
+Synapse 1.46.0 (2021-11-02)
+===========================
 
 The cause of the [performance regression affecting Synapse 1.44](https://github.com/matrix-org/synapse/issues/11049) has been identified and fixed. ([\#11177](https://github.com/matrix-org/synapse/issues/11177))
 
+Bugfixes
+--------
+
+- Fix a bug introduced in v1.46.0rc1 where URL previews of some XML documents would fail. ([\#11196](https://github.com/matrix-org/synapse/issues/11196))
+
+
+Synapse 1.46.0rc1 (2021-10-27)
+==============================
+
 Features
 --------
 
diff --git a/changelog.d/11033.bugfix b/changelog.d/11033.bugfix
new file mode 100644
index 0000000000..fa99f187b8
--- /dev/null
+++ b/changelog.d/11033.bugfix
@@ -0,0 +1 @@
+Do not accept events if a third-party rule module API callback raises an exception.
diff --git a/changelog.d/11098.misc b/changelog.d/11098.misc
new file mode 100644
index 0000000000..1e337bee54
--- /dev/null
+++ b/changelog.d/11098.misc
@@ -0,0 +1 @@
+Add type hints to `synapse.events`.
diff --git a/changelog.d/11099.feature b/changelog.d/11099.feature
new file mode 100644
index 0000000000..c9126d4a9d
--- /dev/null
+++ b/changelog.d/11099.feature
@@ -0,0 +1 @@
+Add search by room ID and room alias to the List Room admin API.
\ No newline at end of file
diff --git a/changelog.d/11137.misc b/changelog.d/11137.misc
new file mode 100644
index 0000000000..f0d6476f48
--- /dev/null
+++ b/changelog.d/11137.misc
@@ -0,0 +1 @@
+Remove and document unnecessary `RoomStreamToken` checks in application service ephemeral event code.
\ No newline at end of file
diff --git a/changelog.d/11188.bugfix b/changelog.d/11188.bugfix
new file mode 100644
index 0000000000..0688743c00
--- /dev/null
+++ b/changelog.d/11188.bugfix
@@ -0,0 +1 @@
+Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`.
diff --git a/changelog.d/11199.bugfix b/changelog.d/11199.bugfix
new file mode 100644
index 0000000000..dc3ea8d515
--- /dev/null
+++ b/changelog.d/11199.bugfix
@@ -0,0 +1 @@
+Delete `to_device` messages for hidden devices that will never be read, reducing database size.
\ No newline at end of file
diff --git a/changelog.d/11200.bugfix b/changelog.d/11200.bugfix
new file mode 100644
index 0000000000..c855081986
--- /dev/null
+++ b/changelog.d/11200.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug wherein a missing `Content-Type` header when downloading remote media would cause Synapse to throw an error.
\ No newline at end of file
diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix
new file mode 100644
index 0000000000..7e98d565a1
--- /dev/null
+++ b/changelog.d/11207.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper.
diff --git a/changelog.d/11211.feature b/changelog.d/11211.feature
new file mode 100644
index 0000000000..feeb0cf089
--- /dev/null
+++ b/changelog.d/11211.feature
@@ -0,0 +1 @@
+Add support for serving `/.well-known/matrix/server` files, to redirect federation traffic to port 443.
diff --git a/changelog.d/11213.removal b/changelog.d/11213.removal
new file mode 100644
index 0000000000..9e5ec936e3
--- /dev/null
+++ b/changelog.d/11213.removal
@@ -0,0 +1 @@
+Remove deprecated admin API to delete rooms (`POST /_synapse/admin/v1/rooms/<room_id>/delete`).
\ No newline at end of file
diff --git a/changelog.d/11217.bugfix b/changelog.d/11217.bugfix
new file mode 100644
index 0000000000..67ebb0d0e3
--- /dev/null
+++ b/changelog.d/11217.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in 1.35.0 which made it impossible to join rooms that return a `send_join` response containing floats.
\ No newline at end of file
diff --git a/changelog.d/11225.misc b/changelog.d/11225.misc
new file mode 100644
index 0000000000..f14f65f9d4
--- /dev/null
+++ b/changelog.d/11225.misc
@@ -0,0 +1 @@
+Replace outdated links in the pull request checklist with links to the rendered documentation.
diff --git a/changelog.d/11226.misc b/changelog.d/11226.misc
new file mode 100644
index 0000000000..9ed4760ae0
--- /dev/null
+++ b/changelog.d/11226.misc
@@ -0,0 +1 @@
+Fix a bug in unit test `test_block_room_and_not_purge`.
diff --git a/changelog.d/11229.misc b/changelog.d/11229.misc
new file mode 100644
index 0000000000..7bb01cf079
--- /dev/null
+++ b/changelog.d/11229.misc
@@ -0,0 +1 @@
+`ObservableDeferred`: run registered observers in order.
diff --git a/changelog.d/11231.misc b/changelog.d/11231.misc
new file mode 100644
index 0000000000..c7fca7071e
--- /dev/null
+++ b/changelog.d/11231.misc
@@ -0,0 +1 @@
+Minor speed up to start up times and getting updates for groups by adding missing index to `local_group_updates.stream_id`.
diff --git a/changelog.d/11233.misc b/changelog.d/11233.misc
new file mode 100644
index 0000000000..fdf9e5642e
--- /dev/null
+++ b/changelog.d/11233.misc
@@ -0,0 +1 @@
+Add `twine` and `towncrier` as dev dependencies, as they're used by the release script.
diff --git a/changelog.d/11237.misc b/changelog.d/11237.misc
new file mode 100644
index 0000000000..b90efc6535
--- /dev/null
+++ b/changelog.d/11237.misc
@@ -0,0 +1 @@
+Allow `stream_writers.typing` config to be a list of one worker.
diff --git a/changelog.d/11239.misc b/changelog.d/11239.misc
new file mode 100644
index 0000000000..48a796bed0
--- /dev/null
+++ b/changelog.d/11239.misc
@@ -0,0 +1 @@
+Remove debugging statement in tests.
diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix
new file mode 100644
index 0000000000..94d73f67e3
--- /dev/null
+++ b/changelog.d/11240.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection.
diff --git a/changelog.d/11246.misc b/changelog.d/11246.misc
new file mode 100644
index 0000000000..e5e912c1b0
--- /dev/null
+++ b/changelog.d/11246.misc
@@ -0,0 +1 @@
+Add an additional test for the `cachedList` method decorator.
diff --git a/changelog.d/11253.misc b/changelog.d/11253.misc
new file mode 100644
index 0000000000..71c55a2751
--- /dev/null
+++ b/changelog.d/11253.misc
@@ -0,0 +1 @@
+Make minor correction to the type of `auth_checkers` callbacks.
diff --git a/changelog.d/11255.bugfix b/changelog.d/11255.bugfix
new file mode 100644
index 0000000000..ce72592624
--- /dev/null
+++ b/changelog.d/11255.bugfix
@@ -0,0 +1 @@
+Fix rolling back Synapse version when using workers.
diff --git a/changelog.d/11257.doc b/changelog.d/11257.doc
new file mode 100644
index 0000000000..1205be2add
--- /dev/null
+++ b/changelog.d/11257.doc
@@ -0,0 +1 @@
+Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr.
diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix
new file mode 100644
index 0000000000..768fbb8973
--- /dev/null
+++ b/changelog.d/11262.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.37.1 where a remote event that was being processed by a worker when that worker was killed would not be processed again on restart.
diff --git a/changelog.d/11269.misc b/changelog.d/11269.misc
new file mode 100644
index 0000000000..a2149c2d2d
--- /dev/null
+++ b/changelog.d/11269.misc
@@ -0,0 +1 @@
+Clean up trivial aspects of the Debian package build tooling.
diff --git a/changelog.d/11270.misc b/changelog.d/11270.misc
new file mode 100644
index 0000000000..e2181b9b2a
--- /dev/null
+++ b/changelog.d/11270.misc
@@ -0,0 +1 @@
+Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse.
diff --git a/debian/build_virtualenv b/debian/build_virtualenv
index 3097371d59..e691163619 100755
--- a/debian/build_virtualenv
+++ b/debian/build_virtualenv
@@ -40,6 +40,7 @@ dh_virtualenv \
     --upgrade-pip \
     --preinstall="lxml" \
     --preinstall="mock" \
+    --preinstall="wheel" \
     --extra-pip-arg="--no-cache-dir" \
     --extra-pip-arg="--compile" \
     --extras="all,systemd,test"
diff --git a/debian/changelog b/debian/changelog
index 24842192b8..7e41bde858 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,9 +1,23 @@
 matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium
 
   * Update scripts to pass Shellcheck lints.
+  * Remove unused Vagrant scripts from debian/ directory.
+  * Change package Architecture to any.
+  * Preinstall the "wheel" package when building virtualenvs.
+  * Do not error if /etc/default/matrix-synapse is missing.
 
  -- root <root@cae79a6e79d7>  Fri, 22 Oct 2021 22:20:31 +0000
 
+matrix-synapse-py3 (1.46.0) stable; urgency=medium
+
+  [ Richard van der Hoff ]
+  * Compress debs with xz, to fix incompatibility of impish debs with reprepro.
+
+  [ Synapse Packaging team ]
+  * New synapse release 1.46.0.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 02 Nov 2021 13:22:53 +0000
+
 matrix-synapse-py3 (1.46.0~rc1) stable; urgency=medium
 
   * New synapse release 1.46.0~rc1.
diff --git a/debian/control b/debian/control
index 763fabd6f6..412a9e1d4c 100644
--- a/debian/control
+++ b/debian/control
@@ -19,7 +19,7 @@ Standards-Version: 3.9.8
 Homepage: https://github.com/matrix-org/synapse
 
 Package: matrix-synapse-py3
-Architecture: amd64
+Architecture: any
 Provides: matrix-synapse
 Conflicts:
  matrix-synapse (<< 0.34.0.1-0matrix2),
diff --git a/debian/matrix-synapse.service b/debian/matrix-synapse.service
index 553babf549..bde1c6cb9f 100644
--- a/debian/matrix-synapse.service
+++ b/debian/matrix-synapse.service
@@ -5,7 +5,7 @@ Description=Synapse Matrix homeserver
 Type=notify
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
 ExecReload=/bin/kill -HUP $MAINPID
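For context on the change above: in systemd unit files, prefixing an `EnvironmentFile=` path with `-` tells systemd to silently skip the file if it is absent, rather than failing to start the unit. A minimal sketch of the two behaviours (hypothetical unit fragment, not part of this commit):

```ini
[Service]
# Hard requirement: the unit fails to start if the file is missing.
EnvironmentFile=/etc/default/matrix-synapse
# Optional: a missing file is silently ignored (the behaviour adopted here).
EnvironmentFile=-/etc/default/matrix-synapse
```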
diff --git a/debian/rules b/debian/rules
index b9d490adc9..5baf2475f0 100755
--- a/debian/rules
+++ b/debian/rules
@@ -51,5 +51,11 @@ override_dh_shlibdeps:
 override_dh_virtualenv:
 	./debian/build_virtualenv
 
+override_dh_builddeb:
+        # force the compression to xz, to stop dpkg-deb on impish defaulting to zstd
+        # (which requires reprepro 5.3.0-1.3, which is currently only in 'experimental' in Debian:
+        # https://metadata.ftp-master.debian.org/changelogs/main/r/reprepro/reprepro_5.3.0-1.3_changelog)
+	dh_builddeb -- -Zxz
+
 %:
 	dh $@ --with python-virtualenv
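Since a `.deb` is an `ar` archive, the effect of `-Zxz` can be spot-checked by listing the members of a built package. A hedged sketch (the filename is hypothetical and member compression suffixes can vary by dpkg version):

```
$ ar t matrix-synapse-py3_1.47.0+nmu1_amd64.deb
debian-binary
control.tar.xz
data.tar.xz
```

On impish, a default `dpkg-deb` build would instead produce `data.tar.zst`, which the reprepro versions mentioned in the comment above cannot ingest.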
diff --git a/debian/test/.gitignore b/debian/test/.gitignore
deleted file mode 100644
index 95eda73fcc..0000000000
--- a/debian/test/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-.vagrant
-*.log
diff --git a/debian/test/provision.sh b/debian/test/provision.sh
deleted file mode 100644
index 55d7b8e03a..0000000000
--- a/debian/test/provision.sh
+++ /dev/null
@@ -1,24 +0,0 @@
-#!/bin/bash
-#
-# provisioning script for vagrant boxes for testing the matrix-synapse debs.
-#
-# Will install the most recent matrix-synapse-py3 deb for this platform from
-# the /debs directory.
-
-set -e
-
-apt-get update
-apt-get install -y lsb-release
-
-deb=$(find /debs -name "matrix-synapse-py3_*+$(lsb_release -cs)*.deb" | sort | tail -n1)
-
-debconf-set-selections <<EOF
-matrix-synapse matrix-synapse/report-stats boolean false
-matrix-synapse matrix-synapse/server-name string localhost:18448
-EOF
-
-dpkg -i "$deb"
-
-sed -i -e 's/port: 8448$/port: 18448/; s/port: 8008$/port: 18008' /etc/matrix-synapse/homeserver.yaml
-echo 'registration_shared_secret: secret' >> /etc/matrix-synapse/homeserver.yaml
-systemctl restart matrix-synapse
diff --git a/debian/test/stretch/Vagrantfile b/debian/test/stretch/Vagrantfile
deleted file mode 100644
index d8eff6fe11..0000000000
--- a/debian/test/stretch/Vagrantfile
+++ /dev/null
@@ -1,13 +0,0 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-ver = `cd ../../..; dpkg-parsechangelog -S Version`.strip()
-
-Vagrant.configure("2") do |config|
-  config.vm.box = "debian/stretch64"
-
-  config.vm.synced_folder ".", "/vagrant", disabled: true
-  config.vm.synced_folder "../../../../debs", "/debs", type: "nfs"
-
-  config.vm.provision "shell", path: "../provision.sh"
-end
diff --git a/debian/test/xenial/Vagrantfile b/debian/test/xenial/Vagrantfile
deleted file mode 100644
index 189236da17..0000000000
--- a/debian/test/xenial/Vagrantfile
+++ /dev/null
@@ -1,10 +0,0 @@
-# -*- mode: ruby -*-
-# vi: set ft=ruby :
-
-Vagrant.configure("2") do |config|
-  config.vm.box = "ubuntu/xenial64"
-
-  config.vm.synced_folder ".", "/vagrant", disabled: true
-  config.vm.synced_folder "../../../../debs", "/debs"
-  config.vm.provision "shell", path: "../provision.sh"
-end
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index acf1cab2a2..ab6b82a082 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -38,9 +38,14 @@ The following query parameters are available:
   - `history_visibility` - Rooms are ordered alphabetically by visibility of history of the room.
   - `state_events` - Rooms are ordered by number of state events. Largest to smallest.
 * `dir` - Direction of room order. Either `f` for forwards or `b` for backwards. Setting
-          this value to `b` will reverse the above sort order. Defaults to `f`.
-* `search_term` - Filter rooms by their room name. Search term can be contained in any
-                  part of the room name. Defaults to no filtering.
+  this value to `b` will reverse the above sort order. Defaults to `f`.
+* `search_term` - Filter rooms by their room name, canonical alias and room id.
+  Specifically, rooms are selected if the search term is contained in
+  - the room's name,
+  - the local part of the room's canonical alias, or
+  - the complete (local and server part) room's id (case sensitive).
+
+  Defaults to no filtering.
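As an illustration of the new behaviour (hypothetical values), a room can now be found by its full, case-sensitive room ID or by the local part of its canonical alias:

```
GET /_synapse/admin/v1/rooms?search_term=!abcdefg:example.com
GET /_synapse/admin/v1/rooms?search_term=mycoolroom
```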
 
 **Response**
 
@@ -380,7 +385,7 @@ A response body like the following is returned:
 
 # Delete Room API
 
-The Delete Room admin API allows server admins to remove rooms from server
+The Delete Room admin API allows server admins to remove rooms from the server
 and block these rooms.
 
 Shuts down a room. Moves all local users and room aliases automatically to a
@@ -520,16 +525,6 @@ With all that being said, if you still want to try and recover the room:
 4. If `new_room_user_id` was given, a 'Content Violation' will have been
    created. Consider whether you want to delete that room.
 
-## Deprecated endpoint
-
-The previous deprecated API will be removed in a future release, it was:
-
-```
-POST /_synapse/admin/v1/rooms/<room_id>/delete
-```
-
-It behaves the same way than the current endpoint except the path and the method.
-
 # Make Room Admin API
 
 Grants another user the highest power available to a local user who is in the room.
diff --git a/docs/delegate.md b/docs/delegate.md
index f3f89075d1..ee9cbb3b1c 100644
--- a/docs/delegate.md
+++ b/docs/delegate.md
@@ -1,4 +1,8 @@
-# Delegation
+# Delegation of incoming federation traffic
+
+In the following documentation, we use the term `server_name` to refer to that setting
+in your homeserver configuration file. It appears at the ends of user ids, and tells
+other homeservers where they can find your server.
 
 By default, other homeservers will expect to be able to reach yours via
 your `server_name`, on port 8448. For example, if you set your `server_name`
@@ -12,13 +16,21 @@ to a different server and/or port (e.g. `synapse.example.com:443`).
 
 ## .well-known delegation
 
-To use this method, you need to be able to alter the
-`server_name` 's https server to serve the `/.well-known/matrix/server`
-URL. Having an active server (with a valid TLS certificate) serving your
-`server_name` domain is out of the scope of this documentation.
+To use this method, you need to be able to configure the server at
+`https://<server_name>` to serve a file at
+`https://<server_name>/.well-known/matrix/server`.  There are two ways to do this, shown below.
+
+Note that the `.well-known` file is hosted on the default port for `https` (port 443).
+
+### External server
+
+For maximum flexibility, you need to configure an external server such as nginx, Apache
+or HAProxy to serve the `https://<server_name>/.well-known/matrix/server` file. Setting
+up such a server is out of the scope of this documentation, but note that it is often
+possible to configure your [reverse proxy](reverse_proxy.md) for this.
 
-The URL `https://<server_name>/.well-known/matrix/server` should
-return a JSON structure containing the key `m.server` like so:
+The URL `https://<server_name>/.well-known/matrix/server` should be configured to
+return a JSON structure containing the key `m.server` like this:
 
 ```json
 {
@@ -26,8 +38,9 @@ return a JSON structure containing the key `m.server` like so:
 }
 ```
 
-In our example, this would mean that URL `https://example.com/.well-known/matrix/server`
-should return:
+In our example (where we want federation traffic to be routed to
+`https://synapse.example.com`, on port 443), this would mean that
+`https://example.com/.well-known/matrix/server` should return:
 
 ```json
 {
@@ -38,16 +51,29 @@ should return:
 Note, specifying a port is optional. If no port is specified, then it defaults
 to 8448.
 
-With .well-known delegation, federating servers will check for a valid TLS
-certificate for the delegated hostname (in our example: `synapse.example.com`).
+### Serving a `.well-known/matrix/server` file with Synapse
+
+If you are able to set up your domain so that `https://<server_name>` is routed to
+Synapse (i.e., the only change needed is to direct federation traffic to port 443
+instead of port 8448), then it is possible to configure Synapse to serve a suitable
+`.well-known/matrix/server` file. To do so, add the following to your `homeserver.yaml`
+file:
+
+```yaml
+serve_server_wellknown: true
+```
+
+**Note**: this *only* works if `https://<server_name>` is routed to Synapse, so is
+generally not suitable if Synapse is hosted at a subdomain such as
+`https://synapse.example.com`.
 
 ## SRV DNS record delegation
 
-It is also possible to do delegation using a SRV DNS record. However, that is
-considered an advanced topic since it's a bit complex to set up, and `.well-known`
-delegation is already enough in most cases.
+It is also possible to do delegation using an SRV DNS record. However, that is generally
+not recommended, as it can be difficult to configure the TLS certificates correctly in
+this case, and it offers little advantage over `.well-known` delegation.
 
-However, if you really need it, you can find some documentation on how such a
+However, if you really need it, you can find some documentation on what such a
 record should look like and how Synapse will use it in [the Matrix
 specification](https://matrix.org/docs/spec/server_server/latest#resolving-server-names).
 
@@ -68,27 +94,9 @@ wouldn't need any delegation set up.
 domain `server_name` points to, you will need to let other servers know how to
 find it using delegation.
 
-### Do you still recommend against using a reverse proxy on the federation port?
-
-We no longer actively recommend against using a reverse proxy. Many admins will
-find it easier to direct federation traffic to a reverse proxy and manage their
-own TLS certificates, and this is a supported configuration.
+### Should I use a reverse proxy for federation traffic?
 
-See [the reverse proxy documentation](reverse_proxy.md) for information on setting up a
+Generally, using a reverse proxy for both the federation and client traffic is a good
+idea, since it saves handling TLS traffic in Synapse. See
+[the reverse proxy documentation](reverse_proxy.md) for information on setting up a
 reverse proxy.
-
-### Do I still need to give my TLS certificates to Synapse if I am using a reverse proxy?
-
-This is no longer necessary. If you are using a reverse proxy for all of your
-TLS traffic, then you can set `no_tls: True` in the Synapse config.
-
-In that case, the only reason Synapse needs the certificate is to populate a legacy
-`tls_fingerprints` field in the federation API. This is ignored by Synapse 0.99.0
-and later, and the only time pre-0.99 Synapses will check it is when attempting to
-fetch the server keys - and generally this is delegated via `matrix.org`, which
-is running a modern version of Synapse.
-
-### Do I need the same certificate for the client and federation port?
-
-No. There is nothing stopping you from using different certificates,
-particularly if you are using a reverse proxy.
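To make the "External server" option above concrete, here is a hedged sketch of serving the well-known file from nginx on the `server_name` apex, using the hostnames from the example above (adapt to your deployment; TLS directives omitted):

```nginx
server {
    listen 443 ssl;
    server_name example.com;

    # Answer federation discovery directly from nginx, pointing other
    # homeservers at synapse.example.com on port 443.
    location = /.well-known/matrix/server {
        default_type application/json;
        return 200 '{"m.server": "synapse.example.com:443"}';
    }
}
```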
diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md
index 0de60b128a..e53abf6409 100644
--- a/docs/modules/password_auth_provider_callbacks.md
+++ b/docs/modules/password_auth_provider_callbacks.md
@@ -11,7 +11,7 @@ registered by using the Module API's `register_password_auth_provider_callbacks`
 _First introduced in Synapse v1.46.0_
 
 ```python
- auth_checkers: Dict[Tuple[str,Tuple], Callable]
+auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Callable]
 ```
 
 A dict mapping from tuples of a login type identifier (such as `m.login.password`) and a
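For orientation, a hedged sketch of a module registering a checker under the corrected type (the module name, credential check, and return value are illustrative; the registration method is the one named at the top of this page):

```python
from typing import Optional, Tuple

import synapse.module_api


class MyAuthProvider:
    def __init__(self, config: dict, api: synapse.module_api.ModuleApi):
        api.register_password_auth_provider_callbacks(
            auth_checkers={
                # The key type is Tuple[str, Tuple[str, ...]]: the login type
                # plus the tuple of field names it expects in the login dict.
                ("m.login.password", ("password",)): self.check_pass,
            },
        )

    async def check_pass(
        self, username: str, login_type: str, login_dict: dict
    ) -> Optional[Tuple[str, None]]:
        # Illustrative only; a real module would verify the credentials
        # against an external store.
        if login_dict.get("password") == "s3cret":
            return username, None
        return None
```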
diff --git a/docs/modules/third_party_rules_callbacks.md b/docs/modules/third_party_rules_callbacks.md
index a16e272f79..a3a17096a8 100644
--- a/docs/modules/third_party_rules_callbacks.md
+++ b/docs/modules/third_party_rules_callbacks.md
@@ -43,6 +43,14 @@ event with new data by returning the new event's data as a dictionary. In order
 that, it is recommended the module calls `event.get_dict()` to get the current event as a
 dictionary, and modify the returned dictionary accordingly.
 
+If `check_event_allowed` raises an exception, the module is assumed to have failed.
+The event will not be accepted but is not treated as explicitly rejected, either.
+An HTTP request causing the module check will likely result in a 500 Internal
+Server Error.
+
+When the boolean returned by the module is `False`, the event is rejected.
+(Module developers should not use exceptions for rejection.)
+
 Note that replacing the event only works for events sent by local users, not for events
 received over federation.
 
diff --git a/docs/openid.md b/docs/openid.md
index 4a340ef107..c74e8bda60 100644
--- a/docs/openid.md
+++ b/docs/openid.md
@@ -22,6 +22,7 @@ such as [Github][github-idp].
 [google-idp]: https://developers.google.com/identity/protocols/oauth2/openid-connect
 [auth0]: https://auth0.com/
 [authentik]: https://goauthentik.io/
+[lemonldap]: https://lemonldap-ng.org/
 [okta]: https://www.okta.com/
 [dex-idp]: https://github.com/dexidp/dex
 [keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols
@@ -243,6 +244,43 @@ oidc_providers:
         display_name_template: "{{ user.preferred_username|capitalize }}" # TO BE FILLED: If your users have names in Authentik and you want those in Synapse, this should be replaced with user.name|capitalize.
 ```
 
+### LemonLDAP
+
+[LemonLDAP::NG][lemonldap] is an open-source IdP solution.
+
+1. Create an OpenID Connect Relying Party in LemonLDAP::NG
+2. The parameters are:
+- Client ID under the basic menu of the new Relying Party (`Options > Basic >
+  Client ID`)
+- Client secret (`Options > Basic > Client secret`)
+- JWT Algorithm: RS256 within the security menu of the new Relying Party
+  (`Options > Security > ID Token signature algorithm` and `Options > Security >
+  Access Token signature algorithm`)
+- Scopes: OpenID, Email and Profile
+- Allowed redirection addresses for login (`Options > Basic > Allowed
+  redirection addresses for login`):
+  `[synapse public baseurl]/_synapse/client/oidc/callback`
+
+Synapse config:
+```yaml
+oidc_providers:
+  - idp_id: lemonldap
+    idp_name: lemonldap
+    discover: true
+    issuer: "https://auth.example.org/" # TO BE FILLED: replace with your domain
+    client_id: "your client id" # TO BE FILLED
+    client_secret: "your client secret" # TO BE FILLED
+    scopes:
+      - "openid"
+      - "profile"
+      - "email"
+    user_mapping_provider:
+      config:
+        localpart_template: "{{ user.preferred_username }}"
+        # TO BE FILLED: If your users have names in LemonLDAP::NG and you want those in Synapse, this should be replaced with user.name|capitalize or any valid filter.
+        display_name_template: "{{ user.preferred_username|capitalize }}"
+```
+
 ### GitHub
 
 [GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index b90ed62d61..c3a4148f74 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -93,6 +93,24 @@ pid_file: DATADIR/homeserver.pid
 #
 #public_baseurl: https://example.com/
 
+# Uncomment the following to tell other servers to send federation traffic on
+# port 443.
+#
+# By default, other servers will try to reach our server on port 8448, which can
+# be inconvenient in some environments.
+#
+# Provided 'https://<server_name>/' on port 443 is routed to Synapse, this
+# option configures Synapse to serve a file at
+# 'https://<server_name>/.well-known/matrix/server'. This will tell other
+# servers to send traffic to port 443 instead.
+#
+# See https://matrix-org.github.io/synapse/latest/delegate.html for more
+# information.
+#
+# Defaults to 'false'.
+#
+#serve_server_wellknown: true
+
 # Set the soft limit on the number of file descriptors synapse can use
 # Zero is used to indicate synapse should set the soft limit to the
 # hard limit.
diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
index d164e8ce1f..8f5c44c9d4 100644
--- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service
+++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service
@@ -15,7 +15,7 @@ Type=notify
 NotifyAccess=main
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.generic_worker --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml
 ExecReload=/bin/kill -HUP $MAINPID
 Restart=always
diff --git a/docs/systemd-with-workers/system/matrix-synapse.service b/docs/systemd-with-workers/system/matrix-synapse.service
index f6b6dfd3ce..0c73fb55fb 100644
--- a/docs/systemd-with-workers/system/matrix-synapse.service
+++ b/docs/systemd-with-workers/system/matrix-synapse.service
@@ -10,7 +10,7 @@ Type=notify
 NotifyAccess=main
 User=matrix-synapse
 WorkingDirectory=/var/lib/matrix-synapse
-EnvironmentFile=/etc/default/matrix-synapse
+EnvironmentFile=-/etc/default/matrix-synapse
 ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys
 ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/
 ExecReload=/bin/kill -HUP $MAINPID
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 06f479f86c..136c806c41 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -87,6 +87,16 @@ process, for example:
 
 # Upgrading to v1.47.0
 
+## Removal of old Room Admin API
+
+The following admin APIs were deprecated in [Synapse 1.34](https://github.com/matrix-org/synapse/blob/v1.34.0/CHANGES.md#deprecations-and-removals)
+(released on 2021-05-17) and have now been removed:
+
+- `POST /_synapse/admin/v1/rooms/<room_id>/delete`
+
+Any scripts still using the above APIs should be converted to use the
+[Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api).
+
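As a hedged illustration (the room ID is a placeholder), migrating a script amounts to switching the method and path over to the current Delete Room API:

```
# Removed:
POST /_synapse/admin/v1/rooms/!roomid:example.com/delete
# Current:
DELETE /_synapse/admin/v1/rooms/!roomid:example.com
```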
 ## Deprecation of the `user_may_create_room_with_invites` module callback
 
 The `user_may_create_room_with_invites` is deprecated and will be removed in a future
diff --git a/mypy.ini b/mypy.ini
index 119a7d8c91..600402a5d3 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -22,13 +22,7 @@ files =
   synapse/config,
   synapse/crypto,
   synapse/event_auth.py,
-  synapse/events/builder.py,
-  synapse/events/presence_router.py,
-  synapse/events/snapshot.py,
-  synapse/events/spamcheck.py,
-  synapse/events/third_party_rules.py,
-  synapse/events/utils.py,
-  synapse/events/validator.py,
+  synapse/events,
   synapse/federation,
   synapse/groups,
   synapse/handlers,
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 349866eb9a..640ff15277 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -43,6 +43,7 @@ from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyBackground
 from synapse.storage.databases.main.events_bg_updates import (
     EventsBackgroundUpdatesStore,
 )
+from synapse.storage.databases.main.group_server import GroupServerWorkerStore
 from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
@@ -181,6 +182,7 @@ class Store(
     StatsStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    GroupServerWorkerStore,
 ):
     def execute(self, f, *args, **kwargs):
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/setup.py b/setup.py
index 220084a49d..345cff09c3 100755
--- a/setup.py
+++ b/setup.py
@@ -132,6 +132,9 @@ CONDITIONAL_REQUIREMENTS["dev"] = (
         "GitPython==3.1.14",
         "commonmark==0.9.1",
         "pygithub==1.55",
+        # The following are executed as commands by the release script.
+        "twine",
+        "towncrier",
     ]
 )
 
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 355b36fc63..5ef34bce40 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.46.0rc1"
+__version__ = "1.46.0"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 685d1c25cf..85302163da 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -596,3 +596,10 @@ class ShadowBanError(Exception):
 
     This should be caught and a proper "fake" success response sent to the user.
     """
+
+
+class ModuleFailedException(Exception):
+    """
+    Raised when a module API callback fails, for example because it raised an
+    exception.
+    """
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 51eadf122d..218826741e 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -100,6 +100,7 @@ from synapse.rest.client.register import (
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
+from synapse.rest.well_known import well_known_resource
 from synapse.server import HomeServer
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
@@ -318,6 +319,8 @@ class GenericWorkerServer(HomeServer):
                     resources.update({CLIENT_API_PREFIX: resource})
 
                     resources.update(build_synapse_client_resource_tree(self))
+                    resources.update({"/.well-known": well_known_resource(self)})
+
                 elif name == "federation":
                     resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
                 elif name == "media":
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 93e2299266..336c279a44 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -66,7 +66,7 @@ from synapse.rest.admin import AdminRestResource
 from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.synapse.client import build_synapse_client_resource_tree
-from synapse.rest.well_known import WellKnownResource
+from synapse.rest.well_known import well_known_resource
 from synapse.server import HomeServer
 from synapse.storage import DataStore
 from synapse.util.httpresourcetree import create_resource_tree
@@ -189,7 +189,7 @@ class SynapseHomeServer(HomeServer):
                     "/_matrix/client/unstable": client_resource,
                     "/_matrix/client/v2_alpha": client_resource,
                     "/_matrix/client/versions": client_resource,
-                    "/.well-known/matrix/client": WellKnownResource(self),
+                    "/.well-known": well_known_resource(self),
                     "/_synapse/admin": AdminRestResource(self),
                     **build_synapse_client_resource_tree(self),
                 }
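With this wiring in place and `serve_server_wellknown` enabled, the new endpoint can be smoke-tested against the client-facing port. A hedged example (hostname and exact response are illustrative of the expected shape):

```
$ curl https://example.com/.well-known/matrix/server
{"m.server": "example.com:443"}
```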
diff --git a/synapse/config/server.py b/synapse/config/server.py
index ed094bdc44..a387fd9310 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -262,6 +262,7 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
+        self.serve_server_wellknown = config.get("serve_server_wellknown", False)
 
         self.public_baseurl = config.get("public_baseurl")
         if self.public_baseurl is not None:
@@ -774,6 +775,24 @@ class ServerConfig(Config):
         #
         #public_baseurl: https://example.com/
 
+        # Uncomment the following to tell other servers to send federation traffic on
+        # port 443.
+        #
+        # By default, other servers will try to reach our server on port 8448, which can
+        # be inconvenient in some environments.
+        #
+        # Provided 'https://<server_name>/' on port 443 is routed to Synapse, this
+        # option configures Synapse to serve a file at
+        # 'https://<server_name>/.well-known/matrix/server'. This will tell other
+        # servers to send traffic to port 443 instead.
+        #
+        # See https://matrix-org.github.io/synapse/latest/delegate.html for more
+        # information.
+        #
+        # Defaults to 'false'.
+        #
+        #serve_server_wellknown: true
+
         # Set the soft limit on the number of file descriptors synapse can use
         # Zero is used to indicate synapse should set the soft limit to the
         # hard limit.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 462630201d..4507992031 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -63,7 +63,8 @@ class WriterLocations:
 
     Attributes:
         events: The instances that write to the event and backfill streams.
-        typing: The instance that writes to the typing stream.
+        typing: The instances that write to the typing stream. Currently
+            can only be a single instance.
         to_device: The instances that write to the to_device stream. Currently
             can only be a single instance.
         account_data: The instances that write to the account data streams. Currently
@@ -75,9 +76,15 @@ class WriterLocations:
     """
 
     events = attr.ib(
-        default=["master"], type=List[str], converter=_instance_to_list_converter
+        default=["master"],
+        type=List[str],
+        converter=_instance_to_list_converter,
+    )
+    typing = attr.ib(
+        default=["master"],
+        type=List[str],
+        converter=_instance_to_list_converter,
     )
-    typing = attr.ib(default="master", type=str)
     to_device = attr.ib(
         default=["master"],
         type=List[str],
@@ -217,6 +224,11 @@ class WorkerConfig(Config):
                         % (instance, stream)
                     )
 
+        if len(self.writers.typing) != 1:
+            raise ConfigError(
+                "Must only specify one instance to handle `typing` messages."
+            )
+
         if len(self.writers.to_device) != 1:
             raise ConfigError(
                 "Must only specify one instance to handle `to_device` messages."
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 157669ea88..38f3cf4d33 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -16,8 +16,23 @@
 
 import abc
 import os
-from typing import Dict, Optional, Tuple, Type
-
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Generic,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    TypeVar,
+    Union,
+    overload,
+)
+
+from typing_extensions import Literal
 from unpaddedbase64 import encode_base64
 
 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
@@ -26,6 +41,9 @@ from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 from synapse.util.stringutils import strtobool
 
+if TYPE_CHECKING:
+    from synapse.events.builder import EventBuilder
+
 # Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
 # bugs where we accidentally share e.g. signature dicts. However, converting a
 # dict to frozen_dicts is expensive.
@@ -37,7 +55,23 @@ from synapse.util.stringutils import strtobool
 USE_FROZEN_DICTS = strtobool(os.environ.get("SYNAPSE_USE_FROZEN_DICTS", "0"))
 
 
-class DictProperty:
+T = TypeVar("T")
+
+
+# DictProperty (and DefaultDictProperty) require the classes they're used with to
+# have a _dict property to pull properties from.
+#
+# TODO _DictPropertyInstance should not include EventBuilder but due to
+# https://github.com/python/mypy/issues/5570 it thinks the DictProperty and
+# DefaultDictProperty get applied to EventBuilder when it is in a Union with
+# EventBase. This is the least invasive hack to get mypy to comply.
+#
+# Note that DictProperty/DefaultDictProperty cannot actually be used with
+# EventBuilder as it lacks a _dict property.
+_DictPropertyInstance = Union["_EventInternalMetadata", "EventBase", "EventBuilder"]
+
+
+class DictProperty(Generic[T]):
     """An object property which delegates to the `_dict` within its parent object."""
 
     __slots__ = ["key"]
@@ -45,12 +79,33 @@ class DictProperty:
     def __init__(self, key: str):
         self.key = key
 
-    def __get__(self, instance, owner=None):
+    @overload
+    def __get__(
+        self,
+        instance: Literal[None],
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> "DictProperty":
+        ...
+
+    @overload
+    def __get__(
+        self,
+        instance: _DictPropertyInstance,
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> T:
+        ...
+
+    def __get__(
+        self,
+        instance: Optional[_DictPropertyInstance],
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> Union[T, "DictProperty"]:
         # if the property is accessed as a class property rather than an instance
         # property, return the property itself rather than the value
         if instance is None:
             return self
         try:
+            assert isinstance(instance, (EventBase, _EventInternalMetadata))
             return instance._dict[self.key]
         except KeyError as e1:
             # We want this to look like a regular attribute error (mostly so that
@@ -65,10 +120,12 @@ class DictProperty:
                 "'%s' has no '%s' property" % (type(instance), self.key)
             ) from e1.__context__
 
-    def __set__(self, instance, v):
+    def __set__(self, instance: _DictPropertyInstance, v: T) -> None:
+        assert isinstance(instance, (EventBase, _EventInternalMetadata))
         instance._dict[self.key] = v
 
-    def __delete__(self, instance):
+    def __delete__(self, instance: _DictPropertyInstance) -> None:
+        assert isinstance(instance, (EventBase, _EventInternalMetadata))
         try:
             del instance._dict[self.key]
         except KeyError as e1:
@@ -77,7 +134,7 @@ class DictProperty:
             ) from e1.__context__
 
 
-class DefaultDictProperty(DictProperty):
+class DefaultDictProperty(DictProperty, Generic[T]):
     """An extension of DictProperty which provides a default if the property is
     not present in the parent's _dict.
 
@@ -86,13 +143,34 @@ class DefaultDictProperty(DictProperty):
 
     __slots__ = ["default"]
 
-    def __init__(self, key, default):
+    def __init__(self, key: str, default: T):
         super().__init__(key)
         self.default = default
 
-    def __get__(self, instance, owner=None):
+    @overload
+    def __get__(
+        self,
+        instance: Literal[None],
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> "DefaultDictProperty":
+        ...
+
+    @overload
+    def __get__(
+        self,
+        instance: _DictPropertyInstance,
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> T:
+        ...
+
+    def __get__(
+        self,
+        instance: Optional[_DictPropertyInstance],
+        owner: Optional[Type[_DictPropertyInstance]] = None,
+    ) -> Union[T, "DefaultDictProperty"]:
         if instance is None:
             return self
+        assert isinstance(instance, (EventBase, _EventInternalMetadata))
         return instance._dict.get(self.key, self.default)
 
 
@@ -111,22 +189,22 @@ class _EventInternalMetadata:
         # in the DAG)
         self.outlier = False
 
-    out_of_band_membership: bool = DictProperty("out_of_band_membership")
-    send_on_behalf_of: str = DictProperty("send_on_behalf_of")
-    recheck_redaction: bool = DictProperty("recheck_redaction")
-    soft_failed: bool = DictProperty("soft_failed")
-    proactively_send: bool = DictProperty("proactively_send")
-    redacted: bool = DictProperty("redacted")
-    txn_id: str = DictProperty("txn_id")
-    token_id: int = DictProperty("token_id")
-    historical: bool = DictProperty("historical")
+    out_of_band_membership: DictProperty[bool] = DictProperty("out_of_band_membership")
+    send_on_behalf_of: DictProperty[str] = DictProperty("send_on_behalf_of")
+    recheck_redaction: DictProperty[bool] = DictProperty("recheck_redaction")
+    soft_failed: DictProperty[bool] = DictProperty("soft_failed")
+    proactively_send: DictProperty[bool] = DictProperty("proactively_send")
+    redacted: DictProperty[bool] = DictProperty("redacted")
+    txn_id: DictProperty[str] = DictProperty("txn_id")
+    token_id: DictProperty[int] = DictProperty("token_id")
+    historical: DictProperty[bool] = DictProperty("historical")
 
     # XXX: These are set by StreamWorkerStore._set_before_and_after.
     # I'm pretty sure that these are never persisted to the database, so shouldn't
     # be here
-    before: RoomStreamToken = DictProperty("before")
-    after: RoomStreamToken = DictProperty("after")
-    order: Tuple[int, int] = DictProperty("order")
+    before: DictProperty[RoomStreamToken] = DictProperty("before")
+    after: DictProperty[RoomStreamToken] = DictProperty("after")
+    order: DictProperty[Tuple[int, int]] = DictProperty("order")
 
     def get_dict(self) -> JsonDict:
         return dict(self._dict)
@@ -162,9 +240,6 @@ class _EventInternalMetadata:
 
         If the sender of the redaction event is allowed to redact any event
         due to auth rules, then this will always return false.
-
-        Returns:
-            bool
         """
         return self._dict.get("recheck_redaction", False)
 
@@ -176,32 +251,23 @@ class _EventInternalMetadata:
                sent to clients.
             2. They should not be added to the forward extremities (and
                therefore not to current state).
-
-        Returns:
-            bool
         """
         return self._dict.get("soft_failed", False)
 
-    def should_proactively_send(self):
+    def should_proactively_send(self) -> bool:
         """Whether the event, if ours, should be sent to other clients and
         servers.
 
         This is used for sending dummy events internally. Servers and clients
         can still explicitly fetch the event.
-
-        Returns:
-            bool
         """
         return self._dict.get("proactively_send", True)
 
-    def is_redacted(self):
+    def is_redacted(self) -> bool:
         """Whether the event has been redacted.
 
         This is used for efficiently checking whether an event has been
         marked as redacted without needing to make another database call.
-
-        Returns:
-            bool
         """
         return self._dict.get("redacted", False)
 
@@ -241,29 +307,31 @@ class EventBase(metaclass=abc.ABCMeta):
 
         self.internal_metadata = _EventInternalMetadata(internal_metadata_dict)
 
-    auth_events = DictProperty("auth_events")
-    depth = DictProperty("depth")
-    content = DictProperty("content")
-    hashes = DictProperty("hashes")
-    origin = DictProperty("origin")
-    origin_server_ts = DictProperty("origin_server_ts")
-    prev_events = DictProperty("prev_events")
-    redacts = DefaultDictProperty("redacts", None)
-    room_id = DictProperty("room_id")
-    sender = DictProperty("sender")
-    state_key = DictProperty("state_key")
-    type = DictProperty("type")
-    user_id = DictProperty("sender")
+    depth: DictProperty[int] = DictProperty("depth")
+    content: DictProperty[JsonDict] = DictProperty("content")
+    hashes: DictProperty[Dict[str, str]] = DictProperty("hashes")
+    origin: DictProperty[str] = DictProperty("origin")
+    origin_server_ts: DictProperty[int] = DictProperty("origin_server_ts")
+    redacts: DefaultDictProperty[Optional[str]] = DefaultDictProperty("redacts", None)
+    room_id: DictProperty[str] = DictProperty("room_id")
+    sender: DictProperty[str] = DictProperty("sender")
+    # TODO state_key should be Optional[str], this is generally asserted in Synapse
+    # by calling is_state() first (which ensures this), but it is hard (not possible?)
+    # to properly annotate that calling is_state() asserts that state_key exists
+    # and is non-None.
+    state_key: DictProperty[str] = DictProperty("state_key")
+    type: DictProperty[str] = DictProperty("type")
+    user_id: DictProperty[str] = DictProperty("sender")
 
     @property
     def event_id(self) -> str:
         raise NotImplementedError()
 
     @property
-    def membership(self):
+    def membership(self) -> str:
         return self.content["membership"]
 
-    def is_state(self):
+    def is_state(self) -> bool:
         return hasattr(self, "state_key") and self.state_key is not None
 
     def get_dict(self) -> JsonDict:
@@ -272,13 +340,13 @@ class EventBase(metaclass=abc.ABCMeta):
 
         return d
 
-    def get(self, key, default=None):
+    def get(self, key: str, default: Optional[Any] = None) -> Any:
         return self._dict.get(key, default)
 
-    def get_internal_metadata_dict(self):
+    def get_internal_metadata_dict(self) -> JsonDict:
         return self.internal_metadata.get_dict()
 
-    def get_pdu_json(self, time_now=None) -> JsonDict:
+    def get_pdu_json(self, time_now: Optional[int] = None) -> JsonDict:
         pdu_json = self.get_dict()
 
         if time_now is not None and "age_ts" in pdu_json["unsigned"]:
@@ -305,49 +373,46 @@ class EventBase(metaclass=abc.ABCMeta):
 
         return template_json
 
-    def __set__(self, instance, value):
-        raise AttributeError("Unrecognized attribute %s" % (instance,))
-
-    def __getitem__(self, field):
+    def __getitem__(self, field: str) -> Optional[Any]:
         return self._dict[field]
 
-    def __contains__(self, field):
+    def __contains__(self, field: str) -> bool:
         return field in self._dict
 
-    def items(self):
+    def items(self) -> List[Tuple[str, Optional[Any]]]:
         return list(self._dict.items())
 
-    def keys(self):
+    def keys(self) -> Iterable[str]:
         return self._dict.keys()
 
-    def prev_event_ids(self):
+    def prev_event_ids(self) -> Sequence[str]:
         """Returns the list of prev event IDs. The order matches the order
         specified in the event, though there is no meaning to it.
 
         Returns:
-            list[str]: The list of event IDs of this event's prev_events
+            The list of event IDs of this event's prev_events
         """
-        return [e for e, _ in self.prev_events]
+        return [e for e, _ in self._dict["prev_events"]]
 
-    def auth_event_ids(self):
+    def auth_event_ids(self) -> Sequence[str]:
         """Returns the list of auth event IDs. The order matches the order
         specified in the event, though there is no meaning to it.
 
         Returns:
-            list[str]: The list of event IDs of this event's auth_events
+            The list of event IDs of this event's auth_events
         """
-        return [e for e, _ in self.auth_events]
+        return [e for e, _ in self._dict["auth_events"]]
 
-    def freeze(self):
+    def freeze(self) -> None:
         """'Freeze' the event dict, so it cannot be modified by accident"""
 
         # this will be a no-op if the event dict is already frozen.
         self._dict = freeze(self._dict)
 
-    def __str__(self):
+    def __str__(self) -> str:
         return self.__repr__()
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         rejection = f"REJECTED={self.rejected_reason}, " if self.rejected_reason else ""
 
         return (
@@ -443,7 +508,7 @@ class FrozenEventV2(EventBase):
         else:
             frozen_dict = event_dict
 
-        self._event_id = None
+        self._event_id: Optional[str] = None
 
         super().__init__(
             frozen_dict,
@@ -455,7 +520,7 @@ class FrozenEventV2(EventBase):
         )
 
     @property
-    def event_id(self):
+    def event_id(self) -> str:
         # We have to import this here as otherwise we get an import loop which
         # is hard to break.
         from synapse.crypto.event_signing import compute_event_reference_hash
@@ -465,23 +530,23 @@ class FrozenEventV2(EventBase):
         self._event_id = "$" + encode_base64(compute_event_reference_hash(self)[1])
         return self._event_id
 
-    def prev_event_ids(self):
+    def prev_event_ids(self) -> Sequence[str]:
         """Returns the list of prev event IDs. The order matches the order
         specified in the event, though there is no meaning to it.
 
         Returns:
-            list[str]: The list of event IDs of this event's prev_events
+            The list of event IDs of this event's prev_events
         """
-        return self.prev_events
+        return self._dict["prev_events"]
 
-    def auth_event_ids(self):
+    def auth_event_ids(self) -> Sequence[str]:
         """Returns the list of auth event IDs. The order matches the order
         specified in the event, though there is no meaning to it.
 
         Returns:
-            list[str]: The list of event IDs of this event's auth_events
+            The list of event IDs of this event's auth_events
         """
-        return self.auth_events
+        return self._dict["auth_events"]
 
 
 class FrozenEventV3(FrozenEventV2):
@@ -490,7 +555,7 @@ class FrozenEventV3(FrozenEventV2):
     format_version = EventFormatVersions.V3  # All events of this type are V3
 
     @property
-    def event_id(self):
+    def event_id(self) -> str:
         # We have to import this here as otherwise we get an import loop which
         # is hard to break.
         from synapse.crypto.event_signing import compute_event_reference_hash
@@ -503,12 +568,14 @@ class FrozenEventV3(FrozenEventV2):
         return self._event_id
 
 
-def _event_type_from_format_version(format_version: int) -> Type[EventBase]:
+def _event_type_from_format_version(
+    format_version: int,
+) -> Type[Union[FrozenEvent, FrozenEventV2, FrozenEventV3]]:
     """Returns the python type to use to construct an Event object for the
     given event format version.
 
     Args:
-        format_version (int): The event format version
+        format_version: The event format version
 
     Returns:
         type: A type that can be initialized as per the initializer of
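
The hunk above annotates each field as `DictProperty[str]`. A minimal sketch of how such a generic descriptor can be written and typed (a simplified, hypothetical stand-in for Synapse's real `DictProperty`); the point is that mypy applies the descriptor protocol, so `event.sender` is seen as `str` rather than as the descriptor itself:

    from typing import Any, Dict, Generic, Optional, Type, TypeVar

    T = TypeVar("T")

    class DictProperty(Generic[T]):
        """Descriptor that proxies attribute access to the instance's `_dict`."""

        def __init__(self, key: str):
            self.key = key

        def __get__(self, instance: Any, owner: Optional[Type] = None) -> T:
            try:
                return instance._dict[self.key]
            except KeyError:
                # Surface missing keys as AttributeError so hasattr() works.
                raise AttributeError(self.key) from None

    class Event:
        sender: DictProperty[str] = DictProperty("sender")

        def __init__(self, d: Dict[str, Any]):
            self._dict = d

    e = Event({"sender": "@alice:example.com"})
    print(e.sender)  # "@alice:example.com", typed as str
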
diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
index 8816ef4b76..1bb8ca7145 100644
--- a/synapse/events/third_party_rules.py
+++ b/synapse/events/third_party_rules.py
@@ -14,7 +14,7 @@
 import logging
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, List, Optional, Tuple
 
-from synapse.api.errors import SynapseError
+from synapse.api.errors import ModuleFailedException, SynapseError
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.types import Requester, StateMap
@@ -233,9 +233,10 @@ class ThirdPartyEventRules:
                 # This module callback needs a rework so that hacks such as
                 # this one are not necessary.
                 raise e
-            except Exception as e:
-                logger.warning("Failed to run module API callback %s: %s", callback, e)
-                continue
+            except Exception:
+                raise ModuleFailedException(
+                    "Failed to run `check_event_allowed` module API callback"
+                )
 
             # Return if the event shouldn't be allowed or if the module came up with a
             # replacement dict for the event.
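
This hunk makes `check_event_allowed` fail closed: a crashing module callback now aborts the event instead of being logged and skipped. A self-contained sketch of that pattern (the callback names are illustrative):

    import asyncio

    class ModuleFailedException(Exception):
        pass

    async def check_event_allowed(callbacks, event):
        for callback in callbacks:
            try:
                allowed = await callback(event)
            except Exception:
                # Fail closed: a crashing module must not let the event through.
                raise ModuleFailedException(
                    "Failed to run `check_event_allowed` module API callback"
                )
            if not allowed:
                return False
        return True

    async def main():
        async def ok(event):
            return True

        async def broken(event):
            raise RuntimeError("boom")

        print(await check_event_allowed([ok], {"type": "m.room.message"}))
        try:
            await check_event_allowed([broken], {})
        except ModuleFailedException as e:
            print("rejected:", e)

    asyncio.run(main())
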
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 4d459c17f1..cf86934968 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -55,7 +55,7 @@ class EventValidator:
         ]
 
         for k in required:
-            if not hasattr(event, k):
+            if k not in event:
                 raise SynapseError(400, "Event does not have key %s" % (k,))
 
         # Check that the following keys have string values
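
The validator now checks key membership (`k not in event`) rather than `hasattr`. A toy stand-in for why the two can disagree once fields are declared at class level, as the events refactor above does (this `Event` is a deliberate simplification):

    class Event:
        type = "m.room.message"  # class-level attribute, not event content

        def __init__(self, d):
            self._dict = d

        def __contains__(self, field):
            return field in self._dict

    e = Event({"room_id": "!r:example.com"})
    print(hasattr(e, "type"))  # True  - satisfied by the class attribute
    print("type" in e)         # False - the event body really lacks "type"
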
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a75993d9..9a8758e9a6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -213,6 +213,11 @@ class FederationServer(FederationBase):
             self._started_handling_of_staged_events = True
             self._handle_old_staged_events()
 
+            # Start a periodic check for old staged events. This is to handle
+            # the case where locks time out, e.g. if another process gets killed
+            # without dropping its locks.
+            self._clock.looping_call(self._handle_old_staged_events, 60 * 1000)
+
         # keep this as early as possible to make the calculated origin ts as
         # accurate as possible.
         request_time = self._clock.time_msec()
@@ -1232,10 +1237,6 @@ class FederationHandlerRegistry:
 
         self.query_handlers[query_type] = handler
 
-    def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None:
-        """Register that the EDU handler is on a different instance than master."""
-        self._edu_type_to_instance[edu_type] = [instance_name]
-
     def register_instances_for_edu(
         self, edu_type: str, instance_names: List[str]
     ) -> None:
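
Synapse's `Clock.looping_call` takes a millisecond interval; the underlying Twisted primitive takes seconds. A minimal demo of the periodic re-check this hunk schedules, assuming a plain Twisted reactor:

    from twisted.internet import reactor, task

    def handle_old_staged_events():
        print("checking for stale staged events...")

    loop = task.LoopingCall(handle_old_staged_events)
    loop.start(60.0, now=True)  # run immediately, then every 60 seconds

    reactor.callLater(5, reactor.stop)  # stop this demo after a few seconds
    reactor.run()
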
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index d963178838..10b5aa5af8 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1310,14 +1310,17 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
         self._coro_state = ijson.items_coro(
             _event_list_parser(room_version, self._response.state),
             prefix + "state.item",
+            use_float=True,
         )
         self._coro_auth = ijson.items_coro(
             _event_list_parser(room_version, self._response.auth_events),
             prefix + "auth_chain.item",
+            use_float=True,
         )
         self._coro_event = ijson.kvitems_coro(
             _event_parser(self._response.event_dict),
             prefix + "org.matrix.msc3083.v2.event",
+            use_float=True,
         )
 
     def write(self, data: bytes) -> int:
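
`use_float=True` makes ijson yield plain floats instead of `decimal.Decimal`, which the stdlib `json` encoder refuses to serialise. A quick demonstration, assuming ijson 3.x (where the flag exists):

    import io

    import ijson

    data = b'{"ts": 1.5}'

    print(list(ijson.items(io.BytesIO(data), "ts")))
    # [Decimal('1.5')] by default; json.dumps() would raise TypeError on this

    print(list(ijson.items(io.BytesIO(data), "ts", use_float=True)))
    # [1.5] - a plain float
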
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206dae6..ddc9105ee9 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
 from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
+        self._ephemeral_events_linearizer = Linearizer(
+            name="appservice_ephemeral_events"
+        )
+
     def notify_interested_services(self, max_token: RoomStreamToken) -> None:
         """Notifies (pushes) all application services interested in this event.
 
@@ -182,7 +187,7 @@ class ApplicationServicesHandler:
     def notify_interested_services_ephemeral(
         self,
         stream_key: str,
-        new_token: Optional[int],
+        new_token: Union[int, RoomStreamToken],
         users: Optional[Collection[Union[str, UserID]]] = None,
     ) -> None:
         """
@@ -203,7 +208,7 @@ class ApplicationServicesHandler:
                 Appservices will only receive ephemeral events that fall within their
                 registered user and room namespaces.
 
-            new_token: The latest stream token.
+            new_token: The stream token of the event.
             users: The users that should be informed of the new event, if any.
         """
         if not self.notify_appservices:
@@ -212,6 +217,19 @@ class ApplicationServicesHandler:
         if stream_key not in ("typing_key", "receipt_key", "presence_key"):
             return
 
+        # Assert that new_token is an integer (and not a RoomStreamToken).
+        # All of the supported streams that this function handles use an
+        # integer to track progress (rather than a RoomStreamToken - a
+        # vector clock implementation) as they don't support multiple
+        # stream writers.
+        #
+        # As a result, we simply assert that new_token is an integer.
+        # If we do end up needing to pass a RoomStreamToken down here
+        # in the future, using RoomStreamToken.stream (the minimum stream
+        # position) to convert to an ascending integer value should work.
+        # Additional context: https://github.com/matrix-org/synapse/pull/11137
+        assert isinstance(new_token, int)
+
         services = [
             service
             for service in self.store.get_app_services()
@@ -231,14 +249,13 @@ class ApplicationServicesHandler:
         self,
         services: List[ApplicationService],
         stream_key: str,
-        new_token: Optional[int],
+        new_token: int,
         users: Collection[Union[str, UserID]],
     ) -> None:
         logger.debug("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
-                # Only handle typing if we have the latest token
-                if stream_key == "typing_key" and new_token is not None:
+                if stream_key == "typing_key":
                     # Note that we don't persist the token (via set_type_stream_id_for_appservice)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
@@ -248,26 +265,37 @@ class ApplicationServicesHandler:
                     events = await self._handle_typing(service, new_token)
                     if events:
                         self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    continue
 
-                elif stream_key == "receipt_key":
-                    events = await self._handle_receipts(service)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
-
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "read_receipt", new_token
+                # Since we read/update the stream position for this AS/stream, serialise access to it.
+                with (
+                    await self._ephemeral_events_linearizer.queue(
+                        (service.id, stream_key)
                     )
+                ):
+                    if stream_key == "receipt_key":
+                        events = await self._handle_receipts(service, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "read_receipt", new_token
+                        )
 
-                elif stream_key == "presence_key":
-                    events = await self._handle_presence(service, users)
-                    if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                    elif stream_key == "presence_key":
+                        events = await self._handle_presence(service, users, new_token)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
 
-                    # Persist the latest handled stream token for this appservice
-                    await self.store.set_type_stream_id_for_appservice(
-                        service, "presence", new_token
-                    )
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "presence", new_token
+                        )
 
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
@@ -304,7 +332,9 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+    async def _handle_receipts(
+        self, service: ApplicationService, new_token: Optional[int]
+    ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.
 
@@ -323,6 +353,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
             service=service, from_key=from_key
@@ -330,7 +366,10 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[Union[str, UserID]]
+        self,
+        service: ApplicationService,
+        users: Collection[Union[str, UserID]],
+        new_token: Optional[int],
     ) -> List[JsonDict]:
         """
         Return the latest presence updates that the given application service should receive.
@@ -353,6 +392,12 @@ class ApplicationServicesHandler:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "presence"
         )
+        if new_token is not None and new_token <= from_key:
+            logger.debug(
+                "Rejecting token lower than or equal to stored: %s" % (new_token,)
+            )
+            return []
+
         for user in users:
             if isinstance(user, str):
                 user = UserID.from_string(user)
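
The linearizer keyed on `(service.id, stream_key)` ensures that reading and persisting an appservice's stream position cannot interleave. Synapse's `Linearizer` is Twisted-based; here is a hypothetical asyncio stand-in for the same idea:

    import asyncio
    from collections import defaultdict

    class Linearizer:
        """Hypothetical per-key lock: callers queue on a shared key."""

        def __init__(self):
            self._locks = defaultdict(asyncio.Lock)

        def queue(self, key):
            return self._locks[key]

    linearizer = Linearizer()

    async def handle(service_id, stream_key, token):
        async with linearizer.queue((service_id, stream_key)):
            # Read the stream position, fetch events, persist the new
            # position; no other task can interleave for this key.
            await asyncio.sleep(0)
            print(service_id, stream_key, token)

    async def main():
        await asyncio.gather(*(handle("as1", "receipt_key", t) for t in range(3)))

    asyncio.run(main())
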
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d508d7d32a..60e59d11a0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
-        auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
+        auth_checkers: Optional[
+            Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
+        ] = None,
     ) -> None:
         # Register check_3pid_auth callback
         if check_3pid_auth is not None:
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index e617db4c0d..1a1cd93b1a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -1643,7 +1643,7 @@ class FederationEventHandler:
             event: the event whose auth_events we want
 
         Returns:
-            all of the events in `event.auth_events`, after deduplication
+            all of the events listed in `event.auth_event_ids()`, after deduplication
 
         Raises:
             AuthError if we were unable to fetch the auth_events for any reason.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4a0fccfcc6..b7bc187169 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1318,6 +1318,8 @@ class EventCreationHandler:
             # user is actually admin or not).
             is_admin_redaction = False
             if event.type == EventTypes.Redaction:
+                assert event.redacts is not None
+
                 original_event = await self.store.get_event(
                     event.redacts,
                     redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1413,6 +1415,8 @@ class EventCreationHandler:
                 )
 
         if event.type == EventTypes.Redaction:
+            assert event.redacts is not None
+
             original_event = await self.store.get_event(
                 event.redacts,
                 redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1500,11 +1504,13 @@ class EventCreationHandler:
                 next_batch_id = event.content.get(
                     EventContentFields.MSC2716_NEXT_BATCH_ID
                 )
-                conflicting_insertion_event_id = (
-                    await self.store.get_insertion_event_by_batch_id(
-                        event.room_id, next_batch_id
+                conflicting_insertion_event_id = None
+                if next_batch_id:
+                    conflicting_insertion_event_id = (
+                        await self.store.get_insertion_event_by_batch_id(
+                            event.room_id, next_batch_id
+                        )
                     )
-                )
                 if conflicting_insertion_event_id is not None:
                     # The current insertion event that we're processing is invalid
                     # because an insertion event already exists in the room with the
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 99e9b37344..969eb3b9b0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -525,7 +525,7 @@ class RoomCreationHandler:
             ):
                 await self.room_member_handler.update_membership(
                     requester,
-                    UserID.from_string(old_event["state_key"]),
+                    UserID.from_string(old_event.state_key),
                     new_room_id,
                     "ban",
                     ratelimit=False,
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 2f5a3e4d19..0723286383 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -355,7 +355,7 @@ class RoomBatchHandler:
         for (event, context) in reversed(events_to_persist):
             await self.event_creation_handler.handle_new_client_event(
                 await self.create_requester_for_user_id_from_app_service(
-                    event["sender"], app_service_requester.app_service
+                    event.sender, app_service_requester.app_service
                 ),
                 event=event,
                 context=context,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 74e6c7eca6..08244b690d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         #
         # the prev_events consist solely of the previous membership event.
         prev_event_ids = [previous_membership_event.event_id]
-        auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
+        auth_event_ids = (
+            list(previous_membership_event.auth_event_ids()) + prev_event_ids
+        )
 
         event, context = await self.event_creation_handler.create_event(
             requester,
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d69924..22c6174821 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
         if hs.should_send_federation():
             self.federation = hs.get_federation_sender()
 
-        if hs.config.worker.writers.typing != hs.get_instance_name():
-            hs.get_federation_registry().register_instance_for_edu(
+        if hs.get_instance_name() not in hs.config.worker.writers.typing:
+            hs.get_federation_registry().register_instances_for_edu(
                 "m.typing",
                 hs.config.worker.writers.typing,
             )
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        assert hs.config.worker.writers.typing == hs.get_instance_name()
+        assert hs.get_instance_name() in hs.config.worker.writers.typing
 
         self.auth = hs.get_auth()
         self.notifier = hs.get_notifier()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1882fffd2a..60e5409895 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -383,29 +383,6 @@ class Notifier:
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    def _notify_app_services_ephemeral(
-        self,
-        stream_key: str,
-        new_token: Union[int, RoomStreamToken],
-        users: Optional[Collection[Union[str, UserID]]] = None,
-    ) -> None:
-        """Notify application services of ephemeral event activity.
-
-        Args:
-            stream_key: The stream the event came from.
-            new_token: The value of the new stream token.
-            users: The users that should be informed of the new event, if any.
-        """
-        try:
-            stream_token = None
-            if isinstance(new_token, int):
-                stream_token = new_token
-            self.appservice_handler.notify_interested_services_ephemeral(
-                stream_key, stream_token, users or []
-            )
-        except Exception:
-            logger.exception("Error notifying application services of event")
-
     def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
             self._pusher_pool.on_new_notifications(max_room_stream_token)
@@ -467,12 +444,15 @@ class Notifier:
 
             self.notify_replication()
 
-            # Notify appservices
-            self._notify_app_services_ephemeral(
-                stream_key,
-                new_token,
-                users,
-            )
+            # Notify appservices.
+            try:
+                self.appservice_handler.notify_interested_services_ephemeral(
+                    stream_key,
+                    new_token,
+                    users,
+                )
+            except Exception:
+                logger.exception("Error notifying application services of event")
 
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 0622a37ae8..009d8e77b0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -232,6 +232,8 @@ class BulkPushRuleEvaluator:
                 # that user, as they might not be already joined.
                 if event.type == EventTypes.Member and event.state_key == uid:
                     display_name = event.content.get("displayname", None)
+                    if not isinstance(display_name, str):
+                        display_name = None
 
             if count_as_unread:
                 # Add an element for the current user if the event needs to be marked as
@@ -268,7 +270,7 @@ def _condition_checker(
     evaluator: PushRuleEvaluatorForEvent,
     conditions: List[dict],
     uid: str,
-    display_name: str,
+    display_name: Optional[str],
     cache: Dict[str, bool],
 ) -> bool:
     for cond in conditions:
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 7a8dc63976..7f68092ec5 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -18,7 +18,7 @@ import re
 from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
 
 from synapse.events import EventBase
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
 from synapse.util import glob_to_regex, re_word_boundary
 from synapse.util.caches.lrucache import LruCache
 
@@ -129,7 +129,7 @@ class PushRuleEvaluatorForEvent:
         self._value_cache = _flatten_dict(event)
 
     def matches(
-        self, condition: Dict[str, Any], user_id: str, display_name: str
+        self, condition: Dict[str, Any], user_id: str, display_name: Optional[str]
     ) -> bool:
         if condition["kind"] == "event_match":
             return self._event_match(condition, user_id)
@@ -172,7 +172,7 @@ class PushRuleEvaluatorForEvent:
 
             return _glob_matches(pattern, haystack)
 
-    def _contains_display_name(self, display_name: str) -> bool:
+    def _contains_display_name(self, display_name: Optional[str]) -> bool:
         if not display_name:
             return False
 
@@ -222,7 +222,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
 
 
 def _flatten_dict(
-    d: Union[EventBase, dict],
+    d: Union[EventBase, JsonDict],
     prefix: Optional[List[str]] = None,
     result: Optional[Dict[str, str]] = None,
 ) -> Dict[str, str]:
@@ -233,7 +233,7 @@ def _flatten_dict(
     for key, value in d.items():
         if isinstance(value, str):
             result[".".join(prefix + [key])] = value.lower()
-        elif hasattr(value, "items"):
+        elif isinstance(value, dict):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
 
     return result
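
For reference, a self-contained version of `_flatten_dict` with the narrowed `isinstance(value, dict)` check, showing the dotted-key map the push-rule evaluator matches against:

    from typing import Dict, List, Optional

    def flatten_dict(
        d: dict,
        prefix: Optional[List[str]] = None,
        result: Optional[Dict[str, str]] = None,
    ) -> Dict[str, str]:
        if prefix is None:
            prefix = []
        if result is None:
            result = {}
        for key, value in d.items():
            if isinstance(value, str):
                result[".".join(prefix + [key])] = value.lower()
            elif isinstance(value, dict):
                # Only recurse into real dicts, rather than anything that
                # happens to expose an .items() method.
                flatten_dict(value, prefix=prefix + [key], result=result)
        return result

    print(flatten_dict({"content": {"body": "Hello", "msgtype": "m.text"}}))
    # {'content.body': 'hello', 'content.msgtype': 'm.text'}
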
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 06fd06fdf3..21293038ef 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -138,7 +138,7 @@ class ReplicationCommandHandler:
             if isinstance(stream, TypingStream):
                 # Only add TypingStream as a source on the instance in charge of
                 # typing.
-                if hs.config.worker.writers.typing == hs.get_instance_name():
+                if hs.get_instance_name() in hs.config.worker.writers.typing:
                     self._streams_to_replicate.append(stream)
 
                 continue
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index c8b188ae4e..743a01da08 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -328,8 +328,7 @@ class TypingStream(Stream):
     ROW_TYPE = TypingStreamRow
 
     def __init__(self, hs: "HomeServer"):
-        writer_instance = hs.config.worker.writers.typing
-        if writer_instance == hs.get_instance_name():
+        if hs.get_instance_name() in hs.config.worker.writers.typing:
             # On the writer, query the typing handler
             typing_writer_handler = hs.get_typing_writer_handler()
             update_function: Callable[
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index e1506deb2b..70514e814f 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -42,7 +42,6 @@ from synapse.rest.admin.registration_tokens import (
     RegistrationTokenRestServlet,
 )
 from synapse.rest.admin.rooms import (
-    DeleteRoomRestServlet,
     ForwardExtremitiesRestServlet,
     JoinRoomAliasServlet,
     ListRoomRestServlet,
@@ -221,7 +220,6 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RoomStateRestServlet(hs).register(http_server)
     RoomRestServlet(hs).register(http_server)
     RoomMembersRestServlet(hs).register(http_server)
-    DeleteRoomRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
     UserAdminServlet(hs).register(http_server)
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index a4823ca6e7..05c5b4bf0c 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -46,41 +46,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-class DeleteRoomRestServlet(RestServlet):
-    """Delete a room from server.
-
-    It is a combination and improvement of shutdown and purge room.
-
-    Shuts down a room by removing all local users from the room.
-    Blocking all future invites and joins to the room is optional.
-
-    If desired any local aliases will be repointed to a new room
-    created by `new_room_user_id` and kicked users will be auto-
-    joined to the new room.
-
-    If 'purge' is true, it will remove all traces of a room from the database.
-    """
-
-    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]+)/delete$")
-
-    def __init__(self, hs: "HomeServer"):
-        self.hs = hs
-        self.auth = hs.get_auth()
-        self.room_shutdown_handler = hs.get_room_shutdown_handler()
-        self.pagination_handler = hs.get_pagination_handler()
-
-    async def on_POST(
-        self, request: SynapseRequest, room_id: str
-    ) -> Tuple[int, JsonDict]:
-        return await _delete_room(
-            request,
-            room_id,
-            self.auth,
-            self.room_shutdown_handler,
-            self.pagination_handler,
-        )
-
-
 class ListRoomRestServlet(RestServlet):
     """
     List all rooms that are known to the homeserver. Results are returned
@@ -218,7 +183,7 @@ class RoomRestServlet(RestServlet):
     async def on_DELETE(
         self, request: SynapseRequest, room_id: str
     ) -> Tuple[int, JsonDict]:
-        return await _delete_room(
+        return await self._delete_room(
             request,
             room_id,
             self.auth,
@@ -226,6 +191,58 @@ class RoomRestServlet(RestServlet):
             self.pagination_handler,
         )
 
+    async def _delete_room(
+        self,
+        request: SynapseRequest,
+        room_id: str,
+        auth: "Auth",
+        room_shutdown_handler: "RoomShutdownHandler",
+        pagination_handler: "PaginationHandler",
+    ) -> Tuple[int, JsonDict]:
+        requester = await auth.get_user_by_req(request)
+        await assert_user_is_admin(auth, requester.user)
+
+        content = parse_json_object_from_request(request)
+
+        block = content.get("block", False)
+        if not isinstance(block, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'block' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        purge = content.get("purge", True)
+        if not isinstance(purge, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'purge' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        force_purge = content.get("force_purge", False)
+        if not isinstance(force_purge, bool):
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "Param 'force_purge' must be a boolean, if given",
+                Codes.BAD_JSON,
+            )
+
+        ret = await room_shutdown_handler.shutdown_room(
+            room_id=room_id,
+            new_room_user_id=content.get("new_room_user_id"),
+            new_room_name=content.get("room_name"),
+            message=content.get("message"),
+            requester_user_id=requester.user.to_string(),
+            block=block,
+        )
+
+        # Purge room
+        if purge:
+            await pagination_handler.purge_room(room_id, force=force_purge)
+
+        return 200, ret
+
 
 class RoomMembersRestServlet(RestServlet):
     """
@@ -617,55 +634,3 @@ class RoomEventContextServlet(RestServlet):
         )
 
         return 200, results
-
-
-async def _delete_room(
-    request: SynapseRequest,
-    room_id: str,
-    auth: "Auth",
-    room_shutdown_handler: "RoomShutdownHandler",
-    pagination_handler: "PaginationHandler",
-) -> Tuple[int, JsonDict]:
-    requester = await auth.get_user_by_req(request)
-    await assert_user_is_admin(auth, requester.user)
-
-    content = parse_json_object_from_request(request)
-
-    block = content.get("block", False)
-    if not isinstance(block, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'block' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    purge = content.get("purge", True)
-    if not isinstance(purge, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'purge' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    force_purge = content.get("force_purge", False)
-    if not isinstance(force_purge, bool):
-        raise SynapseError(
-            HTTPStatus.BAD_REQUEST,
-            "Param 'force_purge' must be a boolean, if given",
-            Codes.BAD_JSON,
-        )
-
-    ret = await room_shutdown_handler.shutdown_room(
-        room_id=room_id,
-        new_room_user_id=content.get("new_room_user_id"),
-        new_room_name=content.get("room_name"),
-        message=content.get("message"),
-        requester_user_id=requester.user.to_string(),
-        block=block,
-    )
-
-    # Purge room
-    if purge:
-        await pagination_handler.purge_room(room_id, force=force_purge)
-
-    return 200, ret
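
With `DeleteRoomRestServlet` removed, room deletion is served only via `DELETE` on the room resource itself. A hedged usage sketch; the `/_synapse/admin/v1/rooms/<room_id>` path, host, room ID, and token below are illustrative placeholders:

    import requests

    resp = requests.delete(
        "https://homeserver.example/_synapse/admin/v1/rooms/!room:example.com",
        headers={"Authorization": "Bearer <admin_access_token>"},
        json={
            "block": False,        # optional: block future joins/invites
            "purge": True,         # default: remove all traces from the DB
            "force_purge": False,  # optional: purge even if users remain
        },
    )
    print(resp.status_code, resp.json())
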
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index ed95189b6d..6a876cfa2f 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -914,7 +914,7 @@ class RoomTypingRestServlet(RestServlet):
         # If we're not on the typing writer instance we should scream if we get
         # requests.
         self._is_typing_writer = (
-            hs.config.worker.writers.typing == hs.get_instance_name()
+            hs.get_instance_name() in hs.config.worker.writers.typing
         )
 
     async def on_PUT(
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 99f8156ad0..46f033eee2 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -131,20 +131,22 @@ class RoomBatchSendEventRestServlet(RestServlet):
             prev_event_ids_from_query
         )
 
+        state_event_ids_at_start = []
         # Create and persist all of the state events that float off on their own
         # before the batch. These will most likely be all of the invite/member
         # state events used to auth the upcoming historical messages.
-        state_event_ids_at_start = (
-            await self.room_batch_handler.persist_state_events_at_start(
-                state_events_at_start=body["state_events_at_start"],
-                room_id=room_id,
-                initial_auth_event_ids=auth_event_ids,
-                app_service_requester=requester,
+        if body["state_events_at_start"]:
+            state_event_ids_at_start = (
+                await self.room_batch_handler.persist_state_events_at_start(
+                    state_events_at_start=body["state_events_at_start"],
+                    room_id=room_id,
+                    initial_auth_event_ids=auth_event_ids,
+                    app_service_requester=requester,
+                )
             )
-        )
-        # Update our ongoing auth event ID list with all of the new state we
-        # just created
-        auth_event_ids.extend(state_event_ids_at_start)
+            # Update our ongoing auth event ID list with all of the new state we
+            # just created
+            auth_event_ids.extend(state_event_ids_at_start)
 
         inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
             prev_event_ids_from_query
@@ -191,14 +193,17 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 depth=inherited_depth,
             )
 
-            batch_id_to_connect_to = base_insertion_event["content"][
+            batch_id_to_connect_to = base_insertion_event.content[
                 EventContentFields.MSC2716_NEXT_BATCH_ID
             ]
 
         # Also connect the historical event chain to the end of the floating
         # state chain, which causes the HS to ask for the state at the start of
-        # the batch later.
-        prev_event_ids = [state_event_ids_at_start[-1]]
+        # the batch later. If there is no state chain to connect to, just make
+        # the insertion event float itself.
+        prev_event_ids = []
+        if len(state_event_ids_at_start):
+            prev_event_ids = [state_event_ids_at_start[-1]]
 
         # Create and persist all of the historical events as well as insertion
         # and batch meta events to make the batch navigable in the DAG.
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index abd88a2d4f..244ba261bb 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -215,6 +215,8 @@ class MediaRepository:
         self.mark_recently_accessed(None, media_id)
 
         media_type = media_info["media_type"]
+        if not media_type:
+            media_type = "application/octet-stream"
         media_length = media_info["media_length"]
         upload_name = name if name else media_info["upload_name"]
         url_cache = media_info["url_cache"]
@@ -333,6 +335,9 @@ class MediaRepository:
                 logger.info("Media is quarantined")
                 raise NotFoundError()
 
+            if not media_info["media_type"]:
+                media_info["media_type"] = "application/octet-stream"
+
             responder = await self.media_storage.fetch_media(file_info)
             if responder:
                 return responder, media_info
@@ -354,6 +359,8 @@ class MediaRepository:
                 raise e
 
         file_id = media_info["filesystem_id"]
+        if not media_info["media_type"]:
+            media_info["media_type"] = "application/octet-stream"
         file_info = FileInfo(server_name, file_id)
 
         # We generate thumbnails even if another process downloaded the media
@@ -445,7 +452,10 @@ class MediaRepository:
 
             await finish()
 
-            media_type = headers[b"Content-Type"][0].decode("ascii")
+            if b"Content-Type" in headers:
+                media_type = headers[b"Content-Type"][0].decode("ascii")
+            else:
+                media_type = "application/octet-stream"
             upload_name = get_filename_from_headers(headers)
             time_now_ms = self.clock.time_msec()
 
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 278fd901e2..8ca97b5b18 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -718,9 +718,12 @@ def decode_body(
     if not body:
         return None
 
+    # The idea here is that multiple encodings are tried until one works.
+    # Unfortunately the decoded result is discarded, and lxml will then
+    # decode the body again using whichever encoding succeeded.
     for encoding in get_html_media_encodings(body, content_type):
         try:
-            body_str = body.decode(encoding)
+            body.decode(encoding)
         except Exception:
             pass
         else:
@@ -732,11 +735,11 @@ def decode_body(
     from lxml import etree
 
     # Create an HTML parser.
-    parser = etree.HTMLParser(recover=True, encoding="utf-8")
+    parser = etree.HTMLParser(recover=True, encoding=encoding)
 
     # Attempt to parse the body. Returns None if the body was successfully
     # parsed, but no tree was found.
-    return etree.fromstring(body_str, parser)
+    return etree.fromstring(body, parser)
 
 
 def _calc_og(tree: "etree.Element", media_uri: str) -> Dict[str, Optional[str]]:
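
The parser now receives the raw bytes plus the probed encoding instead of a pre-decoded string. A sketch of the flow, with `get_html_media_encodings` replaced by a hypothetical candidate list:

    from lxml import etree

    body = "<html><body><p>héllo</p></body></html>".encode("utf-8")

    # Hypothetical stand-in for get_html_media_encodings(): candidate
    # encodings in priority order.
    encoding = None
    for enc in ["ascii", "utf-8", "windows-1252"]:
        try:
            body.decode(enc)  # probe only; the decoded string is discarded
        except Exception:
            continue
        encoding = enc
        break

    parser = etree.HTMLParser(recover=True, encoding=encoding)
    tree = etree.fromstring(body, parser)  # lxml decodes the bytes itself
    print(etree.tostring(tree, encoding="unicode"))
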
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 7dcb1428e4..8162094cf6 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -80,7 +80,7 @@ class UploadResource(DirectServeJsonResource):
             assert content_type_headers  # for mypy
             media_type = content_type_headers[0].decode("ascii")
         else:
-            raise SynapseError(msg="Upload request missing 'Content-Type'", code=400)
+            media_type = "application/octet-stream"
 
         # if headers.hasHeader(b"Content-Disposition"):
         #     disposition = headers.getRawHeaders(b"Content-Disposition")[0]
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index 7ac01faab4..edbf5ce5d0 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -21,6 +21,7 @@ from twisted.web.server import Request
 from synapse.http.server import set_cors_headers
 from synapse.types import JsonDict
 from synapse.util import json_encoder
+from synapse.util.stringutils import parse_server_name
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -47,8 +48,8 @@ class WellKnownBuilder:
         return result
 
 
-class WellKnownResource(Resource):
-    """A Twisted web resource which renders the .well-known file"""
+class ClientWellKnownResource(Resource):
+    """A Twisted web resource which renders the .well-known/matrix/client file"""
 
     isLeaf = 1
 
@@ -67,3 +68,45 @@ class WellKnownResource(Resource):
         logger.debug("returning: %s", r)
         request.setHeader(b"Content-Type", b"application/json")
         return json_encoder.encode(r).encode("utf-8")
+
+
+class ServerWellKnownResource(Resource):
+    """Resource for .well-known/matrix/server, redirecting to port 443"""
+
+    isLeaf = 1
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self._serve_server_wellknown = hs.config.server.serve_server_wellknown
+
+        host, port = parse_server_name(hs.config.server.server_name)
+
+        # If we've got this far, then https://<server_name>/ must route to us, so
+        # we just redirect the traffic to port 443 instead of 8448.
+        if port is None:
+            port = 443
+
+        self._response = json_encoder.encode({"m.server": f"{host}:{port}"}).encode(
+            "utf-8"
+        )
+
+    def render_GET(self, request: Request) -> bytes:
+        if not self._serve_server_wellknown:
+            request.setResponseCode(404)
+            request.setHeader(b"Content-Type", b"text/plain")
+            return b"404. Is anything ever truly *well* known?\n"
+
+        request.setHeader(b"Content-Type", b"application/json")
+        return self._response
+
+
+def well_known_resource(hs: "HomeServer") -> Resource:
+    """Returns a Twisted web resource which handles '.well-known' requests"""
+    res = Resource()
+    matrix_resource = Resource()
+    res.putChild(b"matrix", matrix_resource)
+
+    matrix_resource.putChild(b"server", ServerWellKnownResource(hs))
+    matrix_resource.putChild(b"client", ClientWellKnownResource(hs))
+
+    return res
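
When the configured `server_name` carries no explicit port, the new resource advertises port 443. A sketch of the payload computation, with `parse_server_name` approximated by a simplified hypothetical helper (it ignores IPv6 literals):

    import json

    def parse_server_name(server_name):
        # Simplified stand-in for synapse.util.stringutils.parse_server_name.
        if ":" in server_name:
            host, port = server_name.rsplit(":", 1)
            return host, int(port)
        return server_name, None

    for name in ["example.com", "example.com:8448"]:
        host, port = parse_server_name(name)
        if port is None:
            # https://<server_name>/ routes to us, so point federation at 443.
            port = 443
        print(name, "->", json.dumps({"m.server": f"{host}:{port}"}))
    # example.com      -> {"m.server": "example.com:443"}
    # example.com:8448 -> {"m.server": "example.com:8448"}
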
diff --git a/synapse/server.py b/synapse/server.py
index 0fbf36ba99..013a7bacaa 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_typing_writer_handler(self) -> TypingWriterHandler:
-        if self.config.worker.writers.typing == self.get_instance_name():
+        if self.get_instance_name() in self.config.worker.writers.typing:
             return TypingWriterHandler(self)
         else:
             raise Exception("Workers cannot write typing")
@@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_typing_handler(self) -> FollowerTypingHandler:
-        if self.config.worker.writers.typing == self.get_instance_name():
+        if self.get_instance_name() in self.config.worker.writers.typing:
             # Use get_typing_writer_handler to ensure that we use the same
             # cached version.
             return self.get_typing_writer_handler()
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 98a0239759..1605411b00 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -247,7 +247,7 @@ class StateHandler:
         return await self.get_hosts_in_room_at_events(room_id, event_ids)
 
     async def get_hosts_in_room_at_events(
-        self, room_id: str, event_ids: List[str]
+        self, room_id: str, event_ids: Iterable[str]
     ) -> Set[str]:
         """Get the hosts that were in a room at the given event ids
 
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 25e9c1efe1..264e625bd7 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -561,6 +561,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
     REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
+    REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
 
     def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
@@ -581,6 +582,11 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
             self._remove_deleted_devices_from_device_inbox,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            self.REMOVE_HIDDEN_DEVICES,
+            self._remove_hidden_devices_from_device_inbox,
+        )
+
     async def _background_drop_index_device_inbox(self, progress, batch_size):
         def reindex_txn(conn):
             txn = conn.cursor()
@@ -676,6 +682,89 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
 
         return number_deleted
 
+    async def _remove_hidden_devices_from_device_inbox(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """A background update that deletes all device_inboxes for hidden devices.
+
+        This should only need to be run once (when users upgrade to v1.47.0)
+
+        Args:
+            progress: JsonDict used to store progress of this background update
+            batch_size: the maximum number of rows to retrieve in a single select query
+
+        Returns:
+            The number of deleted rows
+        """
+
+        def _remove_hidden_devices_from_device_inbox_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            """stream_id is not unique
+            we need to use an inclusive `stream_id >= ?` clause,
+            since we might not have deleted all hidden device messages for the stream_id
+            returned from the previous query
+
+            Then delete only rows matching the `(user_id, device_id, stream_id)` tuple,
+            to avoid problems of deleting a large number of rows all at once
+            due to a single device having lots of device messages.
+            """
+
+            last_stream_id = progress.get("stream_id", 0)
+
+            sql = """
+                SELECT device_id, user_id, stream_id
+                FROM device_inbox
+                WHERE
+                    stream_id >= ?
+                    AND (device_id, user_id) IN (
+                        SELECT device_id, user_id FROM devices WHERE hidden = ?
+                    )
+                ORDER BY stream_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_stream_id, True, batch_size))
+            rows = txn.fetchall()
+
+            num_deleted = 0
+            for row in rows:
+                num_deleted += self.db_pool.simple_delete_txn(
+                    txn,
+                    "device_inbox",
+                    {"device_id": row[0], "user_id": row[1], "stream_id": row[2]},
+                )
+
+            if rows:
+                # We don't save just the `stream_id` in the progress dict:
+                # on large deployments the stream_id may not change across
+                # several runs, which would make the logs suggest that no
+                # progress is being made.
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    self.REMOVE_HIDDEN_DEVICES,
+                    {
+                        "device_id": rows[-1][0],
+                        "user_id": rows[-1][1],
+                        "stream_id": rows[-1][2],
+                    },
+                )
+
+            return num_deleted
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_remove_hidden_devices_from_device_inbox",
+            _remove_hidden_devices_from_device_inbox_txn,
+        )
+
+        # The task is finished when no more rows are deleted.
+        if not number_deleted:
+            await self.db_pool.updates._end_background_update(
+                self.REMOVE_HIDDEN_DEVICES
+            )
+
+        return number_deleted
+
 
 class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore):
     pass
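
The background update resumes from an inclusive `stream_id >= ?` cursor and deletes by full tuple, so a crash mid-batch never skips rows. A compact sqlite3 sketch of the same pattern over a made-up miniature schema (row-value `IN` needs SQLite 3.15+):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE device_inbox (device_id TEXT, user_id TEXT, stream_id INT);
        CREATE TABLE devices (device_id TEXT, user_id TEXT, hidden BOOL);
        INSERT INTO devices VALUES ('d1', '@u:hs', 1), ('d2', '@u:hs', 0);
        INSERT INTO device_inbox VALUES
            ('d1', '@u:hs', 1), ('d1', '@u:hs', 1),
            ('d1', '@u:hs', 2), ('d2', '@u:hs', 3);
    """)

    progress, batch_size = {"stream_id": 0}, 2
    while True:
        rows = conn.execute(
            """SELECT device_id, user_id, stream_id FROM device_inbox
               WHERE stream_id >= ?  -- inclusive: stream_ids are not unique
                 AND (device_id, user_id) IN
                     (SELECT device_id, user_id FROM devices WHERE hidden = 1)
               ORDER BY stream_id LIMIT ?""",
            (progress["stream_id"], batch_size),
        ).fetchall()
        if not rows:
            break  # nothing deleted: the background update is finished
        for device_id, user_id, stream_id in rows:
            conn.execute(
                "DELETE FROM device_inbox"
                " WHERE device_id = ? AND user_id = ? AND stream_id = ?",
                (device_id, user_id, stream_id),
            )
        progress["stream_id"] = rows[-1][2]

    print(conn.execute("SELECT * FROM device_inbox").fetchall())
    # [('d2', '@u:hs', 3)] - only the non-hidden device's row survives
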
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b15cd030e0..9ccc66e589 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -427,7 +427,7 @@ class DeviceWorkerStore(SQLBaseStore):
             user_ids: the users who were signed
 
         Returns:
-            THe new stream ID.
+            The new stream ID.
         """
 
         async with self._device_list_id_gen.get_next() as stream_id:
@@ -1322,7 +1322,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
     async def add_device_change_to_streams(
         self, user_id: str, device_ids: Collection[str], hosts: List[str]
-    ):
+    ) -> int:
         """Persist that a user's devices have been updated, and which hosts
         (if any) should be poked.
         """
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 8d9086ecf0..596275c23c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -24,6 +24,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Sequence,
     Set,
     Tuple,
 )
@@ -494,7 +495,7 @@ class PersistEventsStore:
         event_chain_id_gen: SequenceGenerator,
         event_to_room_id: Dict[str, str],
         event_to_types: Dict[str, Tuple[str, str]],
-        event_to_auth_chain: Dict[str, List[str]],
+        event_to_auth_chain: Dict[str, Sequence[str]],
     ) -> None:
         """Calculate the chain cover index for the given events.
 
@@ -786,7 +787,7 @@ class PersistEventsStore:
         event_chain_id_gen: SequenceGenerator,
         event_to_room_id: Dict[str, str],
         event_to_types: Dict[str, Tuple[str, str]],
-        event_to_auth_chain: Dict[str, List[str]],
+        event_to_auth_chain: Dict[str, Sequence[str]],
         events_to_calc_chain_id_for: Set[str],
         chain_map: Dict[str, Tuple[int, int]],
     ) -> Dict[str, Tuple[int, int]]:
@@ -1794,7 +1795,7 @@ class PersistEventsStore:
         )
 
         # Insert an edge for every prev_event connection
-        for prev_event_id in event.prev_events:
+        for prev_event_id in event.prev_event_ids():
             self.db_pool.simple_insert_txn(
                 txn,
                 table="insertion_event_edges",
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index ae37901be9..c6bf316d5b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -28,6 +28,7 @@ from typing import (
 
 import attr
 from constantly import NamedConstant, Names
+from prometheus_client import Gauge
 from typing_extensions import Literal
 
 from twisted.internet import defer
@@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3  # No. times we block waiting for requests for events
 EVENT_QUEUE_TIMEOUT_S = 0.1  # Timeout when waiting for requests for events
 
 
+event_fetch_ongoing_gauge = Gauge(
+    "synapse_event_fetch_ongoing",
+    "The number of event fetchers that are running",
+)
+
+
 @attr.s(slots=True, auto_attribs=True)
 class _EventCacheEntry:
     event: EventBase
@@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore):
         self._event_fetch_lock = threading.Condition()
         self._event_fetch_list = []
         self._event_fetch_ongoing = 0
+        event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
         # We define this sequence here so that it can be referenced from both
         # the DataStore and PersistEventStore.
@@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
         """
-        i = 0
-        while True:
-            with self._event_fetch_lock:
-                event_list = self._event_fetch_list
-                self._event_fetch_list = []
-
-                if not event_list:
-                    single_threaded = self.database_engine.single_threaded
-                    if (
-                        not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
-                        or single_threaded
-                        or i > EVENT_QUEUE_ITERATIONS
-                    ):
-                        self._event_fetch_ongoing -= 1
-                        return
-                    else:
-                        self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
-                        i += 1
-                        continue
-                i = 0
-
-            self._fetch_event_list(conn, event_list)
+        try:
+            i = 0
+            while True:
+                with self._event_fetch_lock:
+                    event_list = self._event_fetch_list
+                    self._event_fetch_list = []
+
+                    if not event_list:
+                        single_threaded = self.database_engine.single_threaded
+                        if (
+                            not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING
+                            or single_threaded
+                            or i > EVENT_QUEUE_ITERATIONS
+                        ):
+                            break
+                        else:
+                            self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
+                            i += 1
+                            continue
+                    i = 0
+
+                self._fetch_event_list(conn, event_list)
+        finally:
+            self._event_fetch_ongoing -= 1
+            event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
 
     def _fetch_event_list(
         self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]]
@@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore):
 
             if self._event_fetch_ongoing < EVENT_QUEUE_THREADS:
                 self._event_fetch_ongoing += 1
+                event_fetch_ongoing_gauge.set(self._event_fetch_ongoing)
                 should_start = True
             else:
                 should_start = False
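
The new gauge is updated inside `try`/`finally`, so an unexpected exception in the fetch loop cannot leave the count stuck high. A minimal `prometheus_client` demo of that discipline:

    from prometheus_client import Gauge, generate_latest

    event_fetch_ongoing_gauge = Gauge(
        "synapse_event_fetch_ongoing",
        "The number of event fetchers that are running",
    )

    ongoing = 0

    def fetch_loop():
        global ongoing
        ongoing += 1
        event_fetch_ongoing_gauge.set(ongoing)
        try:
            raise RuntimeError("simulated crash in the fetch loop")
        finally:
            # Decrement in `finally`: even a crash cannot leak the count.
            ongoing -= 1
            event_fetch_ongoing_gauge.set(ongoing)

    try:
        fetch_loop()
    except RuntimeError:
        pass

    print(generate_latest().decode())  # the gauge reads 0.0 again
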
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index e70d3649ff..bb621df0dd 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -13,15 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
 
 from typing_extensions import TypedDict
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage.database import DatabasePool
+from synapse.storage.types import Connection
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 # The category ID for the "default" category. We don't store as null in the
 # database to avoid the fun of null != null
 _DEFAULT_CATEGORY_ID = ""
@@ -35,6 +40,16 @@ class _RoomInGroup(TypedDict):
 
 
 class GroupServerWorkerStore(SQLBaseStore):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
+        database.updates.register_background_index_update(
+            update_name="local_group_updates_index",
+            index_name="local_group_updates_stream_id_index",
+            table="local_group_updates",
+            columns=("stream_id",),
+            unique=True,
+        )
+        super().__init__(database, db_conn, hs)
+
     async def get_group(self, group_id: str) -> Optional[Dict[str, Any]]:
         return await self.db_pool.simple_select_one(
             table="groups",
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 3d1dff660b..3d0df0cbd4 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -14,6 +14,7 @@
 import logging
 from types import TracebackType
 from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type
+from weakref import WeakValueDictionary
 
 from twisted.internet.interfaces import IReactorCore
 
@@ -61,7 +62,7 @@ class LockStore(SQLBaseStore):
 
         # A map from `(lock_name, lock_key)` to any lock that we think we
         # currently hold.
-        self._live_tokens: Dict[Tuple[str, str], str] = {}
+        self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary()
 
         # When we shut down we want to remove the locks. Technically this can
         # lead to a race, as we may drop the lock while we are still processing.
@@ -80,10 +81,10 @@ class LockStore(SQLBaseStore):
 
         # We need to take a copy of the tokens dict as dropping the locks will
         # cause the dictionary to change.
-        tokens = dict(self._live_tokens)
+        locks = dict(self._live_tokens)
 
-        for (lock_name, lock_key), token in tokens.items():
-            await self._drop_lock(lock_name, lock_key, token)
+        for lock in locks.values():
+            await lock.release()
 
         logger.info("Dropped locks due to shutdown")
 
@@ -93,6 +94,11 @@ class LockStore(SQLBaseStore):
         used (otherwise the lock will leak).
         """
 
+        # Check if this process has taken out a lock and if it's still valid.
+        lock = self._live_tokens.get((lock_name, lock_key))
+        if lock and await lock.is_still_valid():
+            return None
+
         now = self._clock.time_msec()
         token = random_string(6)
 
@@ -100,7 +106,9 @@ class LockStore(SQLBaseStore):
 
             def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
                 # We take out the lock if either a) there is no row for the lock
-                # already or b) the existing row has timed out.
+                # already, b) the existing row has timed out, or c) the row is
+                # for this instance (which means the process got killed and
+                # restarted)
                 sql = """
                     INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
                     VALUES (?, ?, ?, ?, ?)
@@ -112,6 +120,7 @@ class LockStore(SQLBaseStore):
                             last_renewed_ts = EXCLUDED.last_renewed_ts
                         WHERE
                             worker_locks.last_renewed_ts < ?
+                            OR worker_locks.instance_name = EXCLUDED.instance_name
                 """
                 txn.execute(
                     sql,
@@ -148,11 +157,11 @@ class LockStore(SQLBaseStore):
                     WHERE
                         lock_name = ?
                         AND lock_key = ?
-                        AND last_renewed_ts < ?
+                        AND (last_renewed_ts < ? OR instance_name = ?)
                 """
                 txn.execute(
                     sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS),
+                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
                 )
 
                 inserted = self.db_pool.simple_upsert_txn_emulated(
@@ -179,9 +188,7 @@ class LockStore(SQLBaseStore):
             if not did_lock:
                 return None
 
-        self._live_tokens[(lock_name, lock_key)] = token
-
-        return Lock(
+        lock = Lock(
             self._reactor,
             self._clock,
             self,
@@ -190,6 +197,10 @@ class LockStore(SQLBaseStore):
             token=token,
         )
 
+        self._live_tokens[(lock_name, lock_key)] = lock
+
+        return lock
+
     async def _is_lock_still_valid(
         self, lock_name: str, lock_key: str, token: str
     ) -> bool:
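
The lock.py hunk swaps the _live_tokens map from tokens to Lock objects held in a WeakValueDictionary. The point of the weak values: once a Lock is released and dropped, its entry disappears on its own, so the new short-circuit in try_acquire_lock (if lock and await lock.is_still_valid()) cannot be fooled by a stale entry. A runnable sketch of that property (names illustrative):

    import gc
    from weakref import WeakValueDictionary

    class Lock:
        """Stand-in for synapse.storage.databases.main.lock.Lock."""
        def __init__(self, token: str):
            self.token = token

    live_locks = WeakValueDictionary()

    lock = Lock("abc123")
    live_locks[("purge_history", "!room:example.org")] = lock
    assert ("purge_history", "!room:example.org") in live_locks

    del lock      # last strong reference goes away ...
    gc.collect()  # ... and the map entry is dropped with it
    assert ("purge_history", "!room:example.org") not in live_locks
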
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index 12cf6995eb..cc0eebdb46 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -92,7 +92,7 @@ class PresenceStore(PresenceBackgroundUpdateStore):
             prefilled_cache=presence_cache_prefill,
         )
 
-    async def update_presence(self, presence_states):
+    async def update_presence(self, presence_states) -> Tuple[int, int]:
         assert self._can_persist_presence
 
         stream_ordering_manager = self._presence_id_gen.get_next_mult(
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index f879bbe7c7..cefc77fa0f 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -412,22 +412,33 @@ class RoomWorkerStore(SQLBaseStore):
             limit: maximum number of rooms to retrieve
             order_by: the sort order of the returned list
             reverse_order: whether to reverse the room list
-            search_term: a string to filter room names by
+            search_term: a string to filter room names, canonical aliases
+                and room IDs by. A room ID must match exactly; a canonical
+                alias must contain the search term in its local part.
         Returns:
             A list of room dicts and an integer representing the total number of
             rooms that exist given this query
         """
         # Filter room names by a string
         where_statement = ""
+        search_pattern = []
         if search_term:
-            where_statement = "WHERE LOWER(state.name) LIKE ?"
+            where_statement = """
+                WHERE LOWER(state.name) LIKE ?
+                OR LOWER(state.canonical_alias) LIKE ?
+                OR state.room_id = ?
+            """
 
             # Our postgres db driver converts ? -> %s in SQL strings as that's the
             # placeholder for postgres.
             # HOWEVER, if you put a % into your SQL then everything goes wibbly.
             # To get around this, we're going to surround search_term with %'s
             # before giving it to the database in python instead
-            search_term = "%" + search_term.lower() + "%"
+            search_pattern = [
+                "%" + search_term.lower() + "%",
+                "#%" + search_term.lower() + "%:%",
+                search_term,
+            ]
 
         # Set ordering
         if RoomSortOrder(order_by) == RoomSortOrder.SIZE:
@@ -519,12 +530,9 @@ class RoomWorkerStore(SQLBaseStore):
         )
 
         def _get_rooms_paginate_txn(txn):
-            # Execute the data query
-            sql_values = (limit, start)
-            if search_term:
-                # Add the search term into the WHERE clause
-                sql_values = (search_term,) + sql_values
-            txn.execute(info_sql, sql_values)
+            # Add the search term into the WHERE clause
+            # and execute the data query
+            txn.execute(info_sql, search_pattern + [limit, start])
 
             # Refactor room query data into a structured dictionary
             rooms = []
@@ -551,8 +559,7 @@ class RoomWorkerStore(SQLBaseStore):
             # Execute the count query
 
             # Add the search term into the WHERE clause if present
-            sql_values = (search_term,) if search_term else ()
-            txn.execute(count_sql, sql_values)
+            txn.execute(count_sql, search_pattern)
 
             room_count = txn.fetchone()
             return rooms, room_count[0]
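
The admin room-search change above binds three placeholders in the WHERE clause to three differently shaped patterns. The correspondence, pulled out into a standalone sketch (the real query lives in RoomWorkerStore.get_rooms_paginate):

    def build_room_search(search_term: str):
        where = (
            "WHERE LOWER(state.name) LIKE ? "
            "OR LOWER(state.canonical_alias) LIKE ? "
            "OR state.room_id = ?"
        )
        # one bind value per placeholder, in the same order as the clause
        args = [
            "%" + search_term.lower() + "%",     # substring of the room name
            "#%" + search_term.lower() + "%:%",  # substring of the alias local part
            search_term,                         # exact, case-sensitive room ID
        ]
        return where, args

    print(build_room_search("lorem"))

The alias pattern anchors on the leading "#" and the trailing ":domain", which is why the test later in this diff expects "alias1" to match but the fully qualified "#Room_Alias1:test" not to.
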
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4b288bb2e7..033a9831d6 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -570,7 +570,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     async def get_joined_users_from_context(
         self, event: EventBase, context: EventContext
-    ):
+    ) -> Dict[str, ProfileInfo]:
         state_group = context.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
@@ -584,7 +584,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             event.room_id, state_group, current_state_ids, event=event, context=context
         )
 
-    async def get_joined_users_from_state(self, room_id, state_entry):
+    async def get_joined_users_from_state(
+        self, room_id, state_entry
+    ) -> Dict[str, ProfileInfo]:
         state_group = state_entry.state_group
         if not state_group:
             # If state_group is None it means it has yet to be assigned a
@@ -607,7 +609,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         cache_context,
         event=None,
         context=None,
-    ):
+    ) -> Dict[str, ProfileInfo]:
         # We don't use `state_group`, it's there so that we can cache based
         # on it. However, it's important that it's never None, since two current_states
         # with a state_group of None are likely to be different.
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1629d2a53c..b5c1c14ee3 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -133,22 +133,23 @@ def prepare_database(
 
             # if it's a worker app, refuse to upgrade the database, to avoid multiple
             # workers doing it at once.
-            if (
-                config.worker.worker_app is not None
-                and version_info.current_version != SCHEMA_VERSION
-            ):
+            if config.worker.worker_app is None:
+                _upgrade_existing_database(
+                    cur,
+                    version_info,
+                    database_engine,
+                    config,
+                    databases=databases,
+                )
+            elif version_info.current_version < SCHEMA_VERSION:
+                # If the DB is on an older version than we expect, we refuse
+                # to start the worker (as the main process needs to run first to
+                # update the schema).
                 raise UpgradeDatabaseException(
                     OUTDATED_SCHEMA_ON_WORKER_ERROR
                     % (SCHEMA_VERSION, version_info.current_version)
                 )
 
-            _upgrade_existing_database(
-                cur,
-                version_info,
-                database_engine,
-                config,
-                databases=databases,
-            )
         else:
             logger.info("%r: Initialising new database", databases)
 
diff --git a/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql
new file mode 100644
index 0000000000..7b3592dcf0
--- /dev/null
+++ b/synapse/storage/schema/main/delta/65/03remove_hidden_devices_from_device_inbox.sql
@@ -0,0 +1,22 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- Remove messages from the device_inbox table which were orphaned
+-- because a device was hidden using a Synapse version earlier than 1.47.0.
+-- This runs as a background task and may take a while to finish.
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (6503, 'remove_hidden_devices_from_device_inbox', '{}');
diff --git a/synapse/storage/schema/main/delta/65/04_local_group_updates.sql b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql
new file mode 100644
index 0000000000..a178abfe12
--- /dev/null
+++ b/synapse/storage/schema/main/delta/65/04_local_group_updates.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a unique index on `local_group_updates.stream_id`.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (6504, 'local_group_updates_index', '{}');
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 5df80ea8e7..96efc5f3e3 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -22,11 +22,11 @@ from typing import (
     Any,
     Awaitable,
     Callable,
+    Collection,
     Dict,
     Generic,
     Hashable,
     Iterable,
-    List,
     Optional,
     Set,
     TypeVar,
@@ -76,12 +76,17 @@ class ObservableDeferred(Generic[_T]):
     def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
         object.__setattr__(self, "_deferred", deferred)
         object.__setattr__(self, "_result", None)
-        object.__setattr__(self, "_observers", set())
+        object.__setattr__(self, "_observers", [])
 
         def callback(r):
             object.__setattr__(self, "_result", (True, r))
-            while self._observers:
-                observer = self._observers.pop()
+
+            # once we have set _result, no more entries will be added to _observers,
+            # so it's safe to replace it with the empty tuple.
+            observers = self._observers
+            object.__setattr__(self, "_observers", ())
+
+            for observer in observers:
                 try:
                     observer.callback(r)
                 except Exception as e:
@@ -95,12 +100,16 @@ class ObservableDeferred(Generic[_T]):
 
         def errback(f):
             object.__setattr__(self, "_result", (False, f))
-            while self._observers:
+
+            # once we have set _result, no more entries will be added to _observers,
+            # so it's safe to replace it with the empty tuple.
+            observers = self._observers
+            object.__setattr__(self, "_observers", ())
+
+            for observer in observers:
                 # This is a little bit of magic to correctly propagate stack
                 # traces when we `await` on one of the observer deferreds.
                 f.value.__failure__ = f
-
-                observer = self._observers.pop()
                 try:
                     observer.errback(f)
                 except Exception as e:
@@ -127,20 +136,13 @@ class ObservableDeferred(Generic[_T]):
         """
         if not self._result:
             d: "defer.Deferred[_T]" = defer.Deferred()
-
-            def remove(r):
-                self._observers.discard(d)
-                return r
-
-            d.addBoth(remove)
-
-            self._observers.add(d)
+            self._observers.append(d)
             return d
         else:
             success, res = self._result
             return defer.succeed(res) if success else defer.fail(res)
 
-    def observers(self) -> "List[defer.Deferred[_T]]":
+    def observers(self) -> "Collection[defer.Deferred[_T]]":
         return self._observers
 
     def has_called(self) -> bool:
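
The ObservableDeferred rework replaces the observer set with a list, so observers now fire strictly in the order observe() was called, and swaps in an empty tuple before the callback loop so nothing can be missed or fired twice once the result lands. A simplified, runnable stand-in (not the real class, success path only) showing the ordering guarantee:

    from twisted.internet import defer

    class MiniObservable:
        """Toy version of ObservableDeferred, success path only."""

        def __init__(self, d: "defer.Deferred"):
            self._observers = []
            d.addCallback(self._fire)

        def observe(self) -> "defer.Deferred":
            new_d: "defer.Deferred" = defer.Deferred()
            self._observers.append(new_d)
            return new_d

        def _fire(self, result):
            # once the result is set no more observers are added, so it is
            # safe to replace the list with the empty tuple before looping
            observers, self._observers = self._observers, ()
            for observer in observers:
                observer.callback(result)
            return result

    origin = defer.Deferred()
    observable = MiniObservable(origin)
    first, second = observable.observe(), observable.observe()
    order = []
    first.addCallback(lambda r: order.append("first") or r)
    second.addCallback(lambda r: order.append("second") or r)
    origin.callback(42)
    assert order == ["first", "second"]

tests/util/test_async_helpers.py below asserts the same first-observer-first behaviour against the real class, which is also why the previously commented-out ordering check in test_deferred_cache.py can be re-enabled.
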
diff --git a/sytest-blacklist b/sytest-blacklist
index 65bf1774e3..57e603a4a6 100644
--- a/sytest-blacklist
+++ b/sytest-blacklist
@@ -32,3 +32,6 @@ We can't peek into rooms with invited history_visibility
 We can't peek into rooms with joined history_visibility
 Local users can peek by room alias
 Peeked rooms only turn up in the sync for the device who peeked them
+
+# Validation needs to be added to Synapse: #10554
+Rejects invalid device keys
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 43998020b2..1f6a924452 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -40,6 +40,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         hs.get_application_service_scheduler.return_value = self.mock_scheduler
         hs.get_clock.return_value = MockClock()
         self.handler = ApplicationServicesHandler(hs)
+        self.event_source = hs.get_event_sources()
 
     def test_notify_interested_services(self):
         interested_service = self._mkservice(is_interested=True)
@@ -252,6 +253,56 @@ class AppServiceHandlerTestCase(unittest.TestCase):
             },
         )
 
+    def test_notify_interested_services_ephemeral(self):
+        """
+        Test that ephemeral events sent to the appservice handler are
+        scheduled to be pushed out to interested appservices, and that the
+        stream ID is updated accordingly.
+        """
+        interested_service = self._mkservice(is_interested=True)
+        services = [interested_service]
+
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+            579
+        )
+
+        event = Mock(event_id="event_1")
+        self.event_source.sources.receipt.get_new_events_as.return_value = (
+            make_awaitable(([event], None))
+        )
+
+        self.handler.notify_interested_services_ephemeral("receipt_key", 580)
+        self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
+            interested_service, [event]
+        )
+        self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
+            interested_service,
+            "read_receipt",
+            580,
+        )
+
+    def test_notify_interested_services_ephemeral_out_of_order(self):
+        """
+        Test that out-of-order ephemeral events sent to the appservice
+        handler are ignored.
+        """
+        interested_service = self._mkservice(is_interested=True)
+        services = [interested_service]
+
+        self.mock_store.get_app_services.return_value = services
+        self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
+            580
+        )
+
+        event = Mock(event_id="event_1")
+        self.event_source.sources.receipt.get_new_events_as.return_value = (
+            make_awaitable(([event], None))
+        )
+
+        self.handler.notify_interested_services_ephemeral("receipt_key", 579)
+        self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
+
     def _mkservice(self, is_interested, protocols=None):
         service = Mock()
         service.is_interested.return_value = make_awaitable(is_interested)
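
The two new tests above pivot on a simple monotonic-token rule in the appservice handler: ephemeral events are only pushed (and the stored stream ID only advanced) when the incoming token is ahead of what the appservice has already been sent. Distilled into a sketch (not the handler's actual code):

    def should_push(last_seen_stream_id: int, new_token: int) -> bool:
        # push and persist only when the token advances
        return new_token > last_seen_stream_id

    assert should_push(579, 580) is True    # first test: pushed, 580 stored
    assert should_push(580, 579) is False   # second test: out of order, ignored
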
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 525b83141b..d16cd141a7 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -116,7 +116,6 @@ class ModuleApiTestCase(HomeserverTestCase):
 
         # Insert a second ip, agent at a later date. We should be able to retrieve it.
         last_seen_2 = last_seen_1 + 10000
-        print("%s => %s" % (last_seen_1, last_seen_2))
         self.get_success(
             self.store.insert_client_ip(
                 user_id, "access_token", "ip_2", "user_agent_2", "device_2", last_seen_2
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 0fa55e03b4..46116644ce 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -17,8 +17,6 @@ import urllib.parse
 from typing import List, Optional
 from unittest.mock import Mock
 
-from parameterized import parameterized_class
-
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import Codes
@@ -29,13 +27,6 @@ from tests import unittest
 """Tests admin REST events for /rooms paths."""
 
 
-@parameterized_class(
-    ("method", "url_template"),
-    [
-        ("POST", "/_synapse/admin/v1/rooms/%s/delete"),
-        ("DELETE", "/_synapse/admin/v1/rooms/%s"),
-    ],
-)
 class DeleteRoomTestCase(unittest.HomeserverTestCase):
     servlets = [
         synapse.rest.admin.register_servlets,
@@ -67,7 +58,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         self.room_id = self.helper.create_room_as(
             self.other_user, tok=self.other_user_tok
         )
-        self.url = self.url_template % self.room_id
+        self.url = "/_synapse/admin/v1/rooms/%s" % self.room_id
 
     def test_requester_is_no_admin(self):
         """
@@ -75,7 +66,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         """
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             json.dumps({}),
             access_token=self.other_user_tok,
@@ -88,10 +79,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         """
         Check that unknown rooms/servers return a 404 error.
         """
-        url = self.url_template % "!unknown:test"
+        url = "/_synapse/admin/v1/rooms/%s" % "!unknown:test"
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             url,
             json.dumps({}),
             access_token=self.admin_user_tok,
@@ -104,10 +95,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         """
         Check that invalid room names return a 400 error.
         """
-        url = self.url_template % "invalidroom"
+        url = "/_synapse/admin/v1/rooms/%s" % "invalidroom"
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             url,
             json.dumps({}),
             access_token=self.admin_user_tok,
@@ -126,7 +117,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"new_room_user_id": "@unknown:test"})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -145,7 +136,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"new_room_user_id": "@not:exist.bla"})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -164,7 +155,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"block": "NotBool"})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -180,7 +171,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"purge": "NotBool"})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -206,7 +197,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"block": True, "purge": True})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url.encode("ascii"),
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -239,7 +230,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         body = json.dumps({"block": False, "purge": True})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url.encode("ascii"),
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -270,10 +261,10 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
         # Assert one user in room
         self._is_member(room_id=self.room_id, user_id=self.other_user)
 
-        body = json.dumps({"block": False, "purge": False})
+        body = json.dumps({"block": True, "purge": False})
 
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url.encode("ascii"),
             content=body.encode(encoding="utf_8"),
             access_token=self.admin_user_tok,
@@ -287,7 +278,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
 
         with self.assertRaises(AssertionError):
             self._is_purged(self.room_id)
-        self._is_blocked(self.room_id, expect=False)
+        self._is_blocked(self.room_id, expect=True)
         self._has_no_members(self.room_id)
 
     def test_shutdown_room_consent(self):
@@ -319,7 +310,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
 
         # Test that the admin can still send shutdown
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             json.dumps({"new_room_user_id": self.admin_user}),
             access_token=self.admin_user_tok,
@@ -365,7 +356,7 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase):
 
         # Test that the admin can still send shutdown
         channel = self.make_request(
-            self.method,
+            "DELETE",
             self.url,
             json.dumps({"new_room_user_id": self.admin_user}),
             access_token=self.admin_user_tok,
@@ -689,36 +680,6 @@ class RoomTestCase(unittest.HomeserverTestCase):
         reversing the order, etc.
         """
 
-        def _set_canonical_alias(room_id: str, test_alias: str, admin_user_tok: str):
-            # Create a new alias to this room
-            url = "/_matrix/client/r0/directory/room/%s" % (
-                urllib.parse.quote(test_alias),
-            )
-            channel = self.make_request(
-                "PUT",
-                url.encode("ascii"),
-                {"room_id": room_id},
-                access_token=admin_user_tok,
-            )
-            self.assertEqual(
-                200, int(channel.result["code"]), msg=channel.result["body"]
-            )
-
-            # Set this new alias as the canonical alias for this room
-            self.helper.send_state(
-                room_id,
-                "m.room.aliases",
-                {"aliases": [test_alias]},
-                tok=admin_user_tok,
-                state_key="test",
-            )
-            self.helper.send_state(
-                room_id,
-                "m.room.canonical_alias",
-                {"alias": test_alias},
-                tok=admin_user_tok,
-            )
-
         def _order_test(
             order_type: str,
             expected_room_list: List[str],
@@ -790,9 +751,9 @@ class RoomTestCase(unittest.HomeserverTestCase):
         )
 
         # Set room canonical room aliases
-        _set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok)
-        _set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok)
-        _set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok)
+        self._set_canonical_alias(room_id_1, "#A_alias:test", self.admin_user_tok)
+        self._set_canonical_alias(room_id_2, "#B_alias:test", self.admin_user_tok)
+        self._set_canonical_alias(room_id_3, "#C_alias:test", self.admin_user_tok)
 
         # Set room member size in the reverse order. room 1 -> 1 member, 2 -> 2, 3 -> 3
         user_1 = self.register_user("bob1", "pass")
@@ -859,7 +820,7 @@ class RoomTestCase(unittest.HomeserverTestCase):
         room_id_2 = self.helper.create_room_as(self.admin_user, tok=self.admin_user_tok)
 
         room_name_1 = "something"
-        room_name_2 = "else"
+        room_name_2 = "LoremIpsum"
 
         # Set the name for each room
         self.helper.send_state(
@@ -875,6 +836,8 @@ class RoomTestCase(unittest.HomeserverTestCase):
             tok=self.admin_user_tok,
         )
 
+        self._set_canonical_alias(room_id_1, "#Room_Alias1:test", self.admin_user_tok)
+
         def _search_test(
             expected_room_id: Optional[str],
             search_term: str,
@@ -923,24 +886,36 @@ class RoomTestCase(unittest.HomeserverTestCase):
                 r = rooms[0]
                 self.assertEqual(expected_room_id, r["room_id"])
 
-        # Perform search tests
+        # Test searching by room name
         _search_test(room_id_1, "something")
         _search_test(room_id_1, "thing")
 
-        _search_test(room_id_2, "else")
-        _search_test(room_id_2, "se")
+        _search_test(room_id_2, "LoremIpsum")
+        _search_test(room_id_2, "lorem")
 
         # Test case insensitive
         _search_test(room_id_1, "SOMETHING")
         _search_test(room_id_1, "THING")
 
-        _search_test(room_id_2, "ELSE")
-        _search_test(room_id_2, "SE")
+        _search_test(room_id_2, "LOREMIPSUM")
+        _search_test(room_id_2, "LOREM")
 
         _search_test(None, "foo")
         _search_test(None, "bar")
         _search_test(None, "", expected_http_code=400)
 
+        # Test that the whole room ID returns the room
+        _search_test(room_id_1, room_id_1)
+        # Test that searching by room ID is case sensitive
+        _search_test(None, room_id_1.lower())
+        # Test that a partial room ID (part of the local part) does not match
+        _search_test(None, room_id_1[1:10])
+
+        # Test that the whole room alias returns no result, because of the domain part
+        _search_test(None, "#Room_Alias1:test")
+        # Test that the local part of the alias matches
+        _search_test(room_id_1, "alias1")
+
     def test_search_term_non_ascii(self):
         """Test that searching for a room with non-ASCII characters works correctly"""
 
@@ -1123,6 +1098,32 @@ class RoomTestCase(unittest.HomeserverTestCase):
         # the create_room already does the right thing, so no need to verify that we got
         # the state events it created.
 
+    def _set_canonical_alias(self, room_id: str, test_alias: str, admin_user_tok: str):
+        # Create a new alias to this room
+        url = "/_matrix/client/r0/directory/room/%s" % (urllib.parse.quote(test_alias),)
+        channel = self.make_request(
+            "PUT",
+            url.encode("ascii"),
+            {"room_id": room_id},
+            access_token=admin_user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Set this new alias as the canonical alias for this room
+        self.helper.send_state(
+            room_id,
+            "m.room.aliases",
+            {"aliases": [test_alias]},
+            tok=admin_user_tok,
+            state_key="test",
+        )
+        self.helper.send_state(
+            room_id,
+            "m.room.canonical_alias",
+            {"alias": test_alias},
+            tok=admin_user_tok,
+        )
+
 
 class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
 
diff --git a/tests/rest/client/test_third_party_rules.py b/tests/rest/client/test_third_party_rules.py
index 1c42c46630..4e71b6ec12 100644
--- a/tests/rest/client/test_third_party_rules.py
+++ b/tests/rest/client/test_third_party_rules.py
@@ -216,19 +216,9 @@ class ThirdPartyRulesTestCase(unittest.FederatingHomeserverTestCase):
             {"x": "x"},
             access_token=self.tok,
         )
-        # check_event_allowed has some error handling, so it shouldn't 500 just because a
-        # module did something bad.
-        self.assertEqual(channel.code, 200, channel.result)
-        event_id = channel.json_body["event_id"]
-
-        channel = self.make_request(
-            "GET",
-            "/_matrix/client/r0/rooms/%s/event/%s" % (self.room_id, event_id),
-            access_token=self.tok,
-        )
-        self.assertEqual(channel.code, 200, channel.result)
-        ev = channel.json_body
-        self.assertEqual(ev["content"]["x"], "x")
+        # Because check_event_allowed raises an exception, it leads to a
+        # 500 Internal Server Error
+        self.assertEqual(channel.code, 500, channel.result)
 
     def test_modify_event(self):
         """The module can return a modified version of the event"""
diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py
index 4ae00755c9..4cf1ed5ddf 100644
--- a/tests/rest/media/v1/test_media_storage.py
+++ b/tests/rest/media/v1/test_media_storage.py
@@ -248,7 +248,7 @@ class MediaRepoTests(unittest.HomeserverTestCase):
 
         self.media_id = "example.com/12345"
 
-    def _req(self, content_disposition):
+    def _req(self, content_disposition, include_content_type=True):
 
         channel = make_request(
             self.reactor,
@@ -271,8 +271,11 @@ class MediaRepoTests(unittest.HomeserverTestCase):
 
         headers = {
             b"Content-Length": [b"%d" % (len(self.test_image.data))],
-            b"Content-Type": [self.test_image.content_type],
         }
+
+        if include_content_type:
+            headers[b"Content-Type"] = [self.test_image.content_type]
+
         if content_disposition:
             headers[b"Content-Disposition"] = [content_disposition]
 
@@ -285,6 +288,17 @@ class MediaRepoTests(unittest.HomeserverTestCase):
 
         return channel
 
+    def test_handle_missing_content_type(self):
+        channel = self._req(
+            b"inline; filename=out" + self.test_image.extension,
+            include_content_type=False,
+        )
+        headers = channel.headers
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(
+            headers.getRawHeaders(b"Content-Type"), [b"application/octet-stream"]
+        )
+
     def test_disposition_filename_ascii(self):
         """
         If the filename is filename=<ascii> then Synapse will decode it as an
diff --git a/tests/rest/test_well_known.py b/tests/rest/test_well_known.py
index b2c0279ba0..118aa93a32 100644
--- a/tests/rest/test_well_known.py
+++ b/tests/rest/test_well_known.py
@@ -11,17 +11,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.web.resource import Resource
 
-
-from synapse.rest.well_known import WellKnownResource
+from synapse.rest.well_known import well_known_resource
 
 from tests import unittest
 
 
 class WellKnownTests(unittest.HomeserverTestCase):
     def create_test_resource(self):
-        # replace the JsonResource with a WellKnownResource
-        return WellKnownResource(self.hs)
+        # replace the JsonResource with a Resource wrapping the WellKnownResource
+        res = Resource()
+        res.putChild(b".well-known", well_known_resource(self.hs))
+        return res
 
     @unittest.override_config(
         {
@@ -29,7 +31,7 @@ class WellKnownTests(unittest.HomeserverTestCase):
             "default_identity_server": "https://testis",
         }
     )
-    def test_well_known(self):
+    def test_client_well_known(self):
         channel = self.make_request(
             "GET", "/.well-known/matrix/client", shorthand=False
         )
@@ -48,9 +50,27 @@ class WellKnownTests(unittest.HomeserverTestCase):
             "public_baseurl": None,
         }
     )
-    def test_well_known_no_public_baseurl(self):
+    def test_client_well_known_no_public_baseurl(self):
         channel = self.make_request(
             "GET", "/.well-known/matrix/client", shorthand=False
         )
 
         self.assertEqual(channel.code, 404)
+
+    @unittest.override_config({"serve_server_wellknown": True})
+    def test_server_well_known(self):
+        channel = self.make_request(
+            "GET", "/.well-known/matrix/server", shorthand=False
+        )
+
+        self.assertEqual(channel.code, 200)
+        self.assertEqual(
+            channel.json_body,
+            {"m.server": "test:443"},
+        )
+
+    def test_server_well_known_disabled(self):
+        channel = self.make_request(
+            "GET", "/.well-known/matrix/server", shorthand=False
+        )
+        self.assertEqual(channel.code, 404)
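
test_server_well_known above pins down the payload shape for the new serve_server_wellknown option: a single m.server key pointing at the server name and port (here "test:443", per the test homeserver's config). A sketch of the document being served (simplified; the real resource derives the host and port from the configuration):

    import json

    def server_well_known(server_name: str, port: int = 443) -> bytes:
        return json.dumps({"m.server": "%s:%d" % (server_name, port)}).encode("utf-8")

    assert json.loads(server_well_known("test")) == {"m.server": "test:443"}
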
diff --git a/tests/storage/databases/main/test_deviceinbox.py b/tests/storage/databases/main/test_deviceinbox.py
index 4cfd2677f7..4b67bd15b7 100644
--- a/tests/storage/databases/main/test_deviceinbox.py
+++ b/tests/storage/databases/main/test_deviceinbox.py
@@ -88,3 +88,77 @@ class DeviceInboxBackgroundUpdateStoreTestCase(HomeserverTestCase):
         )
         self.assertEqual(1, len(res))
         self.assertEqual(res[0], "cur_device")
+
+    def test_background_remove_hidden_devices_from_device_inbox(self):
+        """Test that the background task to delete hidden devices
+        from device_inboxes works properly."""
+
+        # create a valid device
+        self.get_success(
+            self.store.store_device(self.user_id, "cur_device", "display_name")
+        )
+
+        # create a hidden device
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "devices",
+                values={
+                    "user_id": self.user_id,
+                    "device_id": "hidden_device",
+                    "display_name": "hidden_display_name",
+                    "hidden": True,
+                },
+            )
+        )
+
+        # Add device_inbox to devices
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "device_inbox",
+                {
+                    "user_id": self.user_id,
+                    "device_id": "cur_device",
+                    "stream_id": 1,
+                    "message_json": "{}",
+                },
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "device_inbox",
+                {
+                    "user_id": self.user_id,
+                    "device_id": "hidden_device",
+                    "stream_id": 2,
+                    "message_json": "{}",
+                },
+            )
+        )
+
+        # Insert and run the background update.
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "background_updates",
+                {
+                    "update_name": "remove_hidden_devices_from_device_inbox",
+                    "progress_json": "{}",
+                },
+            )
+        )
+
+        # ... and tell the DataStore that it hasn't finished all updates yet
+        self.store.db_pool.updates._all_done = False
+
+        self.wait_for_background_updates()
+
+        # Make sure the background task deleted hidden devices from device_inbox
+        res = self.get_success(
+            self.store.db_pool.simple_select_onecol(
+                table="device_inbox",
+                keyvalues={},
+                retcol="device_id",
+                desc="get_device_id_from_device_inbox",
+            )
+        )
+        self.assertEqual(1, len(res))
+        self.assertEqual(res[0], "cur_device")
diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
new file mode 100644
index 0000000000..a6be9a1bb1
--- /dev/null
+++ b/tests/storage/test_rollback_worker.py
@@ -0,0 +1,69 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.app.generic_worker import GenericWorkerServer
+from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
+from synapse.storage.schema import SCHEMA_VERSION
+
+from tests.unittest import HomeserverTestCase
+
+
+class WorkerSchemaTests(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+        hs = self.setup_test_homeserver(
+            federation_http_client=None, homeserver_to_use=GenericWorkerServer
+        )
+        return hs
+
+    def default_config(self):
+        conf = super().default_config()
+
+        # Mark this as a worker app.
+        conf["worker_app"] = "yes"
+
+        return conf
+
+    def test_rolling_back(self):
+        """Test that workers can start if the DB is a newer schema version"""
+
+        db_pool = self.hs.get_datastore().db_pool
+        db_conn = LoggingDatabaseConnection(
+            db_pool._db_pool.connect(),
+            db_pool.engine,
+            "tests",
+        )
+
+        cur = db_conn.cursor()
+        cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION + 1,))
+
+        db_conn.commit()
+
+        prepare_database(db_conn, db_pool.engine, self.hs.config)
+
+    def test_not_upgraded(self):
+        """Test that workers don't start if the DB has an older schema version"""
+        db_pool = self.hs.get_datastore().db_pool
+        db_conn = LoggingDatabaseConnection(
+            db_pool._db_pool.connect(),
+            db_pool.engine,
+            "tests",
+        )
+
+        cur = db_conn.cursor()
+        cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION - 1,))
+
+        db_conn.commit()
+
+        with self.assertRaises(PrepareDatabaseException):
+            prepare_database(db_conn, db_pool.engine, self.hs.config)
diff --git a/tests/test_preview.py b/tests/test_preview.py
index 9a576f9a4e..40b89fb2ef 100644
--- a/tests/test_preview.py
+++ b/tests/test_preview.py
@@ -277,6 +277,21 @@ class CalcOgTestCase(unittest.TestCase):
         tree = decode_body(html, "http://example.com/test.html")
         self.assertIsNone(tree)
 
+    def test_xml(self):
+        """Test decoding XML and ensure it works properly."""
+        # Note that the strip() call is important to ensure the XML
+        # declaration starts at the first byte.
+        html = b"""
+        <?xml version="1.0" encoding="UTF-8"?>
+
+        <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+        <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="en" lang="en">
+        <head><title>Foo</title></head><body>Some text.</body></html>
+        """.strip()
+        tree = decode_body(html, "http://example.com/test.html")
+        og = _calc_og(tree, "http://example.com/test.html")
+        self.assertEqual(og, {"og:title": "Foo", "og:description": "Some text."})
+
     def test_invalid_encoding(self):
         """An invalid character encoding should be ignored and treated as UTF-8, if possible."""
         html = b"""
diff --git a/tests/util/caches/test_deferred_cache.py b/tests/util/caches/test_deferred_cache.py
index 54a88a8325..c613ce3f10 100644
--- a/tests/util/caches/test_deferred_cache.py
+++ b/tests/util/caches/test_deferred_cache.py
@@ -47,9 +47,7 @@ class DeferredCacheTestCase(TestCase):
             self.assertTrue(set_d.called)
             return r
 
-        # TODO: Actually ObservableDeferred *doesn't* run its tests in order on py3.8.
-        #   maybe we should fix that?
-        # get_d.addCallback(check1)
+        get_d.addCallback(check1)
 
         # now fire off all the deferreds
         origin_d.callback(99)
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index 39947a166b..ced3efd93f 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -17,6 +17,7 @@ from typing import Set
 from unittest import mock
 
 from twisted.internet import defer, reactor
+from twisted.internet.defer import Deferred
 
 from synapse.api.errors import SynapseError
 from synapse.logging.context import (
@@ -703,6 +704,48 @@ class CachedListDescriptorTestCase(unittest.TestCase):
             obj.mock.assert_called_once_with((40,), 2)
             self.assertEqual(r, {10: "fish", 40: "gravy"})
 
+    def test_concurrent_lookups(self):
+        """All concurrent lookups should get the same result"""
+
+        class Cls:
+            def __init__(self):
+                self.mock = mock.Mock()
+
+            @descriptors.cached()
+            def fn(self, arg1):
+                pass
+
+            @descriptors.cachedList("fn", "args1")
+            def list_fn(self, args1) -> "Deferred[dict]":
+                return self.mock(args1)
+
+        obj = Cls()
+        deferred_result = Deferred()
+        obj.mock.return_value = deferred_result
+
+        # start off several concurrent lookups of the same key
+        d1 = obj.list_fn([10])
+        d2 = obj.list_fn([10])
+        d3 = obj.list_fn([10])
+
+        # the mock should have been called exactly once
+        obj.mock.assert_called_once_with((10,))
+        obj.mock.reset_mock()
+
+        # ... and none of the calls should yet be complete
+        self.assertFalse(d1.called)
+        self.assertFalse(d2.called)
+        self.assertFalse(d3.called)
+
+        # complete the lookup. @cachedList functions need to complete with a map
+        # of input->result
+        deferred_result.callback({10: "peas"})
+
+        # ... which should give the right result to all the callers
+        self.assertEqual(self.successResultOf(d1), {10: "peas"})
+        self.assertEqual(self.successResultOf(d2), {10: "peas"})
+        self.assertEqual(self.successResultOf(d3), {10: "peas"})
+
     @defer.inlineCallbacks
     def test_invalidate(self):
         """Make sure that invalidation callbacks are called."""
diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_helpers.py
index 069f875962..ab89cab812 100644
--- a/tests/util/test_async_utils.py
+++ b/tests/util/test_async_helpers.py
@@ -21,11 +21,78 @@ from synapse.logging.context import (
     PreserveLoggingContext,
     current_context,
 )
-from synapse.util.async_helpers import timeout_deferred
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 
 from tests.unittest import TestCase
 
 
+class ObservableDeferredTest(TestCase):
+    def test_succeed(self):
+        origin_d = Deferred()
+        observable = ObservableDeferred(origin_d)
+
+        observer1 = observable.observe()
+        observer2 = observable.observe()
+
+        self.assertFalse(observer1.called)
+        self.assertFalse(observer2.called)
+
+        # check the first observer is called first
+        def check_called_first(res):
+            self.assertFalse(observer2.called)
+            return res
+
+        observer1.addBoth(check_called_first)
+
+        # store the results
+        results = [None, None]
+
+        def check_val(res, idx):
+            results[idx] = res
+            return res
+
+        observer1.addCallback(check_val, 0)
+        observer2.addCallback(check_val, 1)
+
+        origin_d.callback(123)
+        self.assertEqual(results[0], 123, "observer 1 callback result")
+        self.assertEqual(results[1], 123, "observer 2 callback result")
+
+    def test_failure(self):
+        origin_d = Deferred()
+        observable = ObservableDeferred(origin_d, consumeErrors=True)
+
+        observer1 = observable.observe()
+        observer2 = observable.observe()
+
+        self.assertFalse(observer1.called)
+        self.assertFalse(observer2.called)
+
+        # check the first observer is called first
+        def check_called_first(res):
+            self.assertFalse(observer2.called)
+            return res
+
+        observer1.addBoth(check_called_first)
+
+        # store the results
+        results = [None, None]
+
+        def check_val(res, idx):
+            results[idx] = res
+            return None
+
+        observer1.addErrback(check_val, 0)
+        observer2.addErrback(check_val, 1)
+
+        try:
+            raise Exception("gah!")
+        except Exception as e:
+            origin_d.errback(e)
+        self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result")
+        self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result")
+
+
 class TimeoutDeferredTest(TestCase):
     def setUp(self):
         self.clock = Clock()