summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--INSTALL.md62
-rw-r--r--README.rst5
-rw-r--r--changelog.d/7267.bugfix1
-rw-r--r--changelog.d/7385.feature1
-rw-r--r--changelog.d/7561.misc1
-rw-r--r--changelog.d/7567.misc1
-rw-r--r--changelog.d/7575.bugfix1
-rw-r--r--changelog.d/7584.misc1
-rw-r--r--changelog.d/7585.bugfix1
-rw-r--r--changelog.d/7587.doc1
-rw-r--r--changelog.d/7591.misc1
-rw-r--r--changelog.d/7594.bugfix1
-rw-r--r--changelog.d/7597.bugfix1
-rw-r--r--changelog.d/7599.bugfix1
-rw-r--r--changelog.d/7600.misc1
-rw-r--r--changelog.d/7602.doc1
-rw-r--r--changelog.d/7603.doc1
-rw-r--r--changelog.d/7607.bugfix1
-rw-r--r--changelog.d/7609.bugfix1
-rw-r--r--changelog.d/7614.misc1
-rw-r--r--contrib/grafana/synapse.json613
-rw-r--r--contrib/systemd/matrix-synapse.service3
-rw-r--r--docs/sample_config.yaml8
-rw-r--r--docs/sso_mapping_providers.md2
-rwxr-xr-xsetup.py1
-rw-r--r--synapse/app/generic_worker.py31
-rw-r--r--synapse/config/saml2_config.py18
-rw-r--r--synapse/groups/groups_server.py257
-rw-r--r--synapse/handlers/device.py94
-rw-r--r--synapse/handlers/e2e_keys.py22
-rw-r--r--synapse/handlers/groups_local.py82
-rw-r--r--synapse/handlers/identity.py94
-rw-r--r--synapse/handlers/room_list.py20
-rw-r--r--synapse/handlers/saml_handler.py53
-rw-r--r--synapse/handlers/ui_auth/checkers.py15
-rw-r--r--synapse/http/server.py43
-rw-r--r--synapse/metrics/background_process_metrics.py104
-rw-r--r--synapse/rest/admin/users.py16
-rw-r--r--synapse/rest/client/v1/login.py15
-rw-r--r--synapse/rest/saml2/response_resource.py26
-rw-r--r--synapse/storage/data_stores/main/receipts.py13
-rw-r--r--synapse/storage/data_stores/state/bg_updates.py12
-rw-r--r--synapse/storage/prepare_database.py5
-rw-r--r--synapse/util/async_helpers.py12
-rw-r--r--tests/rest/admin/test_user.py75
-rw-r--r--tests/rest/client/v1/test_login.py153
-rw-r--r--tests/test_federation.py56
-rw-r--r--tests/util/test_linearizer.py32
-rw-r--r--tox.ini1
49 files changed, 1427 insertions, 534 deletions
diff --git a/INSTALL.md b/INSTALL.md
index b8f8a67329..ef80a26c3f 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -180,35 +180,41 @@ sudo zypper in python-pip python-setuptools sqlite3 python-virtualenv \
 
 #### OpenBSD
 
-Installing prerequisites on OpenBSD:
+A port of Synapse is available under `net/synapse`. The filesystem
+underlying the homeserver directory (defaults to `/var/synapse`) has to be
+mounted with `wxallowed` (cf. `mount(8)`), so creating a separate filesystem
+and mounting it to `/var/synapse` should be taken into consideration.
+
+To be able to build Synapse's dependency on python the `WRKOBJDIR`
+(cf. `bsd.port.mk(5)`) for building python, too, needs to be on a filesystem
+mounted with `wxallowed` (cf. `mount(8)`).
+
+Creating a `WRKOBJDIR` for building python under `/usr/local` (which on a
+default OpenBSD installation is mounted with `wxallowed`):
 
 ```
-doas pkg_add python libffi py-pip py-setuptools sqlite3 py-virtualenv \
-              libxslt jpeg
+doas mkdir /usr/local/pobj_wxallowed
 ```
 
-There is currently no port for OpenBSD. Additionally, OpenBSD's security
-settings require a slightly more difficult installation process.
+Assuming `PORTS_PRIVSEP=Yes` (cf. `bsd.port.mk(5)`) and `SUDO=doas` are
+configured in `/etc/mk.conf`:
+
+```
+doas chown _pbuild:_pbuild /usr/local/pobj_wxallowed
+```
 
-(XXX: I suspect this is out of date)
+Setting the `WRKOBJDIR` for building python:
 
-1. Create a new directory in `/usr/local` called `_synapse`. Also, create a
-   new user called `_synapse` and set that directory as the new user's home.
-   This is required because, by default, OpenBSD only allows binaries which need
-   write and execute permissions on the same memory space to be run from
-   `/usr/local`.
-2. `su` to the new `_synapse` user and change to their home directory.
-3. Create a new virtualenv: `virtualenv -p python3 ~/.synapse`
-4. Source the virtualenv configuration located at
-   `/usr/local/_synapse/.synapse/bin/activate`. This is done in `ksh` by
-   using the `.` command, rather than `bash`'s `source`.
-5. Optionally, use `pip` to install `lxml`, which Synapse needs to parse
-   webpages for their titles.
-6. Use `pip` to install this repository: `pip install matrix-synapse`
-7. Optionally, change `_synapse`'s shell to `/bin/false` to reduce the
-   chance of a compromised Synapse server being used to take over your box.
+```
+echo WRKOBJDIR_lang/python/3.7=/usr/local/pobj_wxallowed  \\nWRKOBJDIR_lang/python/2.7=/usr/local/pobj_wxallowed >> /etc/mk.conf
+```
 
-After this, you may proceed with the rest of the install directions.
+Building Synapse:
+
+```
+cd /usr/ports/net/synapse
+make install
+```
 
 #### Windows
 
@@ -350,6 +356,18 @@ Synapse can be installed via FreeBSD Ports or Packages contributed by Brendan Mo
  - Ports: `cd /usr/ports/net-im/py-matrix-synapse && make install clean`
  - Packages: `pkg install py37-matrix-synapse`
 
+### OpenBSD
+
+As of OpenBSD 6.7 Synapse is available as a pre-compiled binary. The filesystem
+underlying the homeserver directory (defaults to `/var/synapse`) has to be
+mounted with `wxallowed` (cf. `mount(8)`), so creating a separate filesystem
+and mounting it to `/var/synapse` should be taken into consideration.
+
+Installing Synapse:
+
+```
+doas pkg_add synapse
+```
 
 ### NixOS
 
diff --git a/README.rst b/README.rst
index 677689bc03..31d375d19b 100644
--- a/README.rst
+++ b/README.rst
@@ -267,7 +267,7 @@ First calculate the hash of the new password::
     Confirm password:
     $2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
 
-Then update the `users` table in the database::
+Then update the ``users`` table in the database::
 
     UPDATE users SET password_hash='$2a$12$xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
         WHERE name='@test:test.com';
@@ -335,6 +335,9 @@ Building internal API documentation::
 Troubleshooting
 ===============
 
+Need help? Join our community support room on Matrix:
+`#synapse:matrix.org <https://matrix.to/#/#synapse:matrix.org>`_
+
 Running out of File Handles
 ---------------------------
 
diff --git a/changelog.d/7267.bugfix b/changelog.d/7267.bugfix
new file mode 100644
index 0000000000..0af316c1a2
--- /dev/null
+++ b/changelog.d/7267.bugfix
@@ -0,0 +1 @@
+Fix email notifications not being enabled for new users when created via the Admin API.
diff --git a/changelog.d/7385.feature b/changelog.d/7385.feature
new file mode 100644
index 0000000000..9d8fb2311a
--- /dev/null
+++ b/changelog.d/7385.feature
@@ -0,0 +1 @@
+For SAML authentication, add the ability to pass email addresses to be added to new users' accounts via SAML attributes. Contributed by Christopher Cooper.
diff --git a/changelog.d/7561.misc b/changelog.d/7561.misc
new file mode 100644
index 0000000000..448dbd5699
--- /dev/null
+++ b/changelog.d/7561.misc
@@ -0,0 +1 @@
+Convert the identity handler to async/await.
diff --git a/changelog.d/7567.misc b/changelog.d/7567.misc
new file mode 100644
index 0000000000..b086d5d026
--- /dev/null
+++ b/changelog.d/7567.misc
@@ -0,0 +1 @@
+Improve query performance for fetching state from a PostgreSQL database.
diff --git a/changelog.d/7575.bugfix b/changelog.d/7575.bugfix
new file mode 100644
index 0000000000..0ab5516eb3
--- /dev/null
+++ b/changelog.d/7575.bugfix
@@ -0,0 +1 @@
+Fix str placeholders in an instance of `PrepareDatabaseException`. Introduced in Synapse v1.8.0.
diff --git a/changelog.d/7584.misc b/changelog.d/7584.misc
new file mode 100644
index 0000000000..55d1689f77
--- /dev/null
+++ b/changelog.d/7584.misc
@@ -0,0 +1 @@
+Speed up processing of federation stream RDATA rows.
diff --git a/changelog.d/7585.bugfix b/changelog.d/7585.bugfix
new file mode 100644
index 0000000000..263295599d
--- /dev/null
+++ b/changelog.d/7585.bugfix
@@ -0,0 +1 @@
+Fix a bug in automatic user creation during first time login with `m.login.jwt`. Regression in v1.6.0. Contributed by @olof.
diff --git a/changelog.d/7587.doc b/changelog.d/7587.doc
new file mode 100644
index 0000000000..ec4a430436
--- /dev/null
+++ b/changelog.d/7587.doc
@@ -0,0 +1 @@
+Update the OpenBSD installation instructions.
\ No newline at end of file
diff --git a/changelog.d/7591.misc b/changelog.d/7591.misc
new file mode 100644
index 0000000000..17785b1f21
--- /dev/null
+++ b/changelog.d/7591.misc
@@ -0,0 +1 @@
+Add comment to systemd example to show postgresql dependency.
diff --git a/changelog.d/7594.bugfix b/changelog.d/7594.bugfix
new file mode 100644
index 0000000000..f0c067e184
--- /dev/null
+++ b/changelog.d/7594.bugfix
@@ -0,0 +1 @@
+Fix a bug causing the cross-signing keys to be ignored when resyncing a device list.
diff --git a/changelog.d/7597.bugfix b/changelog.d/7597.bugfix
new file mode 100644
index 0000000000..e2ff951915
--- /dev/null
+++ b/changelog.d/7597.bugfix
@@ -0,0 +1 @@
+Fix metrics failing when there is a large number of active background processes.
diff --git a/changelog.d/7599.bugfix b/changelog.d/7599.bugfix
new file mode 100644
index 0000000000..deefe5680f
--- /dev/null
+++ b/changelog.d/7599.bugfix
@@ -0,0 +1 @@
+Fix bug where returning rooms for a group would fail if it included a room that the server was not in.
diff --git a/changelog.d/7600.misc b/changelog.d/7600.misc
new file mode 100644
index 0000000000..3c2affbe6f
--- /dev/null
+++ b/changelog.d/7600.misc
@@ -0,0 +1 @@
+Convert groups handlers to async/await.
diff --git a/changelog.d/7602.doc b/changelog.d/7602.doc
new file mode 100644
index 0000000000..09039353db
--- /dev/null
+++ b/changelog.d/7602.doc
@@ -0,0 +1 @@
+Advertise Python 3.8 support in `setup.py`.
diff --git a/changelog.d/7603.doc b/changelog.d/7603.doc
new file mode 100644
index 0000000000..e450888d4b
--- /dev/null
+++ b/changelog.d/7603.doc
@@ -0,0 +1 @@
+Add a link to `#synapse:matrix.org` in the troubleshooting section of the README.
diff --git a/changelog.d/7607.bugfix b/changelog.d/7607.bugfix
new file mode 100644
index 0000000000..04b22e5ffe
--- /dev/null
+++ b/changelog.d/7607.bugfix
@@ -0,0 +1 @@
+Fix duplicate key violation when persisting read markers.
diff --git a/changelog.d/7609.bugfix b/changelog.d/7609.bugfix
new file mode 100644
index 0000000000..e2eceeef0c
--- /dev/null
+++ b/changelog.d/7609.bugfix
@@ -0,0 +1 @@
+Prevent an entire iteration of the device list resync loop from failing if one server responds with a malformed result.
diff --git a/changelog.d/7614.misc b/changelog.d/7614.misc
new file mode 100644
index 0000000000..f0e24f9f61
--- /dev/null
+++ b/changelog.d/7614.misc
@@ -0,0 +1 @@
+Clean up exception handling in `SAML2ResponseResource`.
diff --git a/contrib/grafana/synapse.json b/contrib/grafana/synapse.json
index 656a442597..30a8681f5a 100644
--- a/contrib/grafana/synapse.json
+++ b/contrib/grafana/synapse.json
@@ -18,7 +18,7 @@
   "gnetId": null,
   "graphTooltip": 0,
   "id": 1,
-  "iteration": 1584612489167,
+  "iteration": 1591098104645,
   "links": [
     {
       "asDropdown": true,
@@ -577,13 +577,15 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 29
+            "y": 2
           },
+          "hiddenSeries": false,
           "id": 5,
           "legend": {
             "alignAsTable": false,
@@ -602,6 +604,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -704,12 +709,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 29
+            "y": 2
           },
+          "hiddenSeries": false,
           "id": 37,
           "legend": {
             "avg": false,
@@ -724,6 +731,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -810,13 +820,15 @@
           "editable": true,
           "error": false,
           "fill": 0,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 36
+            "y": 9
           },
+          "hiddenSeries": false,
           "id": 34,
           "legend": {
             "avg": false,
@@ -831,6 +843,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -898,13 +913,16 @@
           "datasource": "$datasource",
           "description": "Shows the time in which the given percentage of reactor ticks completed, over the sampled timespan",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 36
+            "y": 9
           },
+          "hiddenSeries": false,
           "id": 105,
+          "interval": "",
           "legend": {
             "avg": false,
             "current": false,
@@ -918,6 +936,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -952,9 +973,10 @@
               "refId": "C"
             },
             {
-              "expr": "",
+              "expr": "rate(python_twisted_reactor_tick_time_sum{index=~\"$index\",instance=\"$instance\",job=~\"$job\"}[$bucket_size]) / rate(python_twisted_reactor_tick_time_count{index=~\"$index\",instance=\"$instance\",job=~\"$job\"}[$bucket_size])",
               "format": "time_series",
               "intervalFactor": 1,
+              "legendFormat": "{{job}}-{{index}} mean",
               "refId": "D"
             }
           ],
@@ -1005,14 +1027,16 @@
           "dashLength": 10,
           "dashes": false,
           "datasource": "$datasource",
-          "fill": 1,
+          "fill": 0,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 43
+            "y": 16
           },
-          "id": 50,
+          "hiddenSeries": false,
+          "id": 53,
           "legend": {
             "avg": false,
             "current": false,
@@ -1026,6 +1050,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -1037,20 +1064,18 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "rate(python_twisted_reactor_tick_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(python_twisted_reactor_tick_time_count[$bucket_size])",
+              "expr": "min_over_time(up{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
-              "interval": "",
               "intervalFactor": 2,
               "legendFormat": "{{job}}-{{index}}",
-              "refId": "A",
-              "step": 20
+              "refId": "A"
             }
           ],
           "thresholds": [],
           "timeFrom": null,
           "timeRegions": [],
           "timeShift": null,
-          "title": "Avg reactor tick time",
+          "title": "Up",
           "tooltip": {
             "shared": true,
             "sort": 0,
@@ -1066,7 +1091,7 @@
           },
           "yaxes": [
             {
-              "format": "s",
+              "format": "short",
               "label": null,
               "logBase": 1,
               "max": null,
@@ -1079,7 +1104,7 @@
               "logBase": 1,
               "max": null,
               "min": null,
-              "show": false
+              "show": true
             }
           ],
           "yaxis": {
@@ -1094,12 +1119,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 43
+            "y": 16
           },
+          "hiddenSeries": false,
           "id": 49,
           "legend": {
             "avg": false,
@@ -1114,6 +1141,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -1188,14 +1218,17 @@
           "dashLength": 10,
           "dashes": false,
           "datasource": "$datasource",
-          "fill": 0,
+          "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 50
+            "y": 23
           },
-          "id": 53,
+          "hiddenSeries": false,
+          "id": 136,
+          "interval": "",
           "legend": {
             "avg": false,
             "current": false,
@@ -1207,11 +1240,12 @@
           },
           "lines": true,
           "linewidth": 1,
-          "links": [],
           "nullPointMode": "null",
-          "paceLength": 10,
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
-          "pointradius": 5,
+          "pointradius": 2,
           "points": false,
           "renderer": "flot",
           "seriesOverrides": [],
@@ -1220,18 +1254,21 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "min_over_time(up{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
-              "format": "time_series",
-              "intervalFactor": 2,
-              "legendFormat": "{{job}}-{{index}}",
+              "expr": "rate(synapse_http_client_requests{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "legendFormat": "{{job}}-{{index}} {{method}}",
               "refId": "A"
+            },
+            {
+              "expr": "rate(synapse_http_matrixfederationclient_requests{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "legendFormat": "{{job}}-{{index}} {{method}} (federation)",
+              "refId": "B"
             }
           ],
           "thresholds": [],
           "timeFrom": null,
           "timeRegions": [],
           "timeShift": null,
-          "title": "Up",
+          "title": "Outgoing HTTP request rate",
           "tooltip": {
             "shared": true,
             "sort": 0,
@@ -1247,7 +1284,7 @@
           },
           "yaxes": [
             {
-              "format": "short",
+              "format": "reqps",
               "label": null,
               "logBase": 1,
               "max": null,
@@ -1275,12 +1312,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 50
+            "y": 23
           },
+          "hiddenSeries": false,
           "id": 120,
           "legend": {
             "avg": false,
@@ -1295,6 +1334,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 2,
           "points": false,
@@ -1307,6 +1349,7 @@
             {
               "expr": "rate(synapse_http_server_response_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_http_server_response_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
+              "hide": false,
               "instant": false,
               "intervalFactor": 1,
               "legendFormat": "{{job}}-{{index}} {{method}} {{servlet}} {{tag}}",
@@ -1315,6 +1358,7 @@
             {
               "expr": "rate(synapse_background_process_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_background_process_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
+              "hide": false,
               "instant": false,
               "interval": "",
               "intervalFactor": 1,
@@ -1770,6 +1814,7 @@
           "editable": true,
           "error": false,
           "fill": 2,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -1777,6 +1822,7 @@
             "x": 0,
             "y": 31
           },
+          "hiddenSeries": false,
           "id": 4,
           "legend": {
             "alignAsTable": true,
@@ -1795,7 +1841,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -1878,6 +1926,7 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -1885,6 +1934,7 @@
             "x": 12,
             "y": 31
           },
+          "hiddenSeries": false,
           "id": 32,
           "legend": {
             "avg": false,
@@ -1899,7 +1949,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -1968,6 +2020,7 @@
           "editable": true,
           "error": false,
           "fill": 2,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -1975,7 +2028,8 @@
             "x": 0,
             "y": 39
           },
-          "id": 23,
+          "hiddenSeries": false,
+          "id": 139,
           "legend": {
             "alignAsTable": true,
             "avg": false,
@@ -1993,7 +2047,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -2004,7 +2060,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "rate(synapse_http_server_response_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_http_server_response_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_http_server_in_flight_requests_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_http_server_in_flight_requests_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 1,
@@ -2079,6 +2135,7 @@
           "editable": true,
           "error": false,
           "fill": 2,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -2086,6 +2143,7 @@
             "x": 12,
             "y": 39
           },
+          "hiddenSeries": false,
           "id": 52,
           "legend": {
             "alignAsTable": true,
@@ -2104,7 +2162,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -2115,7 +2175,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "(rate(synapse_http_server_response_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_http_server_response_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])) / rate(synapse_http_server_response_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "(rate(synapse_http_server_in_flight_requests_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_http_server_in_flight_requests_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])) / rate(synapse_http_server_requests_received{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -2187,6 +2247,7 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -2194,6 +2255,7 @@
             "x": 0,
             "y": 47
           },
+          "hiddenSeries": false,
           "id": 7,
           "legend": {
             "alignAsTable": true,
@@ -2211,7 +2273,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -2222,7 +2286,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "rate(synapse_http_server_response_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_http_server_in_flight_requests_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -2280,6 +2344,7 @@
           "editable": true,
           "error": false,
           "fill": 2,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 8,
@@ -2287,6 +2352,7 @@
             "x": 12,
             "y": 47
           },
+          "hiddenSeries": false,
           "id": 47,
           "legend": {
             "alignAsTable": true,
@@ -2305,7 +2371,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -2372,12 +2440,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 55
           },
+          "hiddenSeries": false,
           "id": 103,
           "legend": {
             "avg": false,
@@ -2392,7 +2462,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -2475,12 +2547,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 32
           },
+          "hiddenSeries": false,
           "id": 99,
           "legend": {
             "avg": false,
@@ -2495,7 +2569,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -2563,12 +2639,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 12,
             "y": 32
           },
+          "hiddenSeries": false,
           "id": 101,
           "legend": {
             "avg": false,
@@ -2583,7 +2661,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -2649,6 +2729,93 @@
             "align": false,
             "alignLevel": null
           }
+        },
+        {
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "$datasource",
+          "fill": 1,
+          "fillGradient": 0,
+          "gridPos": {
+            "h": 8,
+            "w": 12,
+            "x": 0,
+            "y": 41
+          },
+          "hiddenSeries": false,
+          "id": 138,
+          "legend": {
+            "avg": false,
+            "current": false,
+            "max": false,
+            "min": false,
+            "show": true,
+            "total": false,
+            "values": false
+          },
+          "lines": true,
+          "linewidth": 1,
+          "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
+          "percentage": false,
+          "pointradius": 2,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "expr": "synapse_background_process_in_flight_count{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}",
+              "legendFormat": "{{job}}-{{index}} {{name}}",
+              "refId": "A"
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeRegions": [],
+          "timeShift": null,
+          "title": "Background jobs in flight",
+          "tooltip": {
+            "shared": false,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "yaxes": [
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ],
+          "yaxis": {
+            "align": false,
+            "alignLevel": null
+          }
         }
       ],
       "title": "Background jobs",
@@ -2679,6 +2846,7 @@
             "x": 0,
             "y": 33
           },
+          "hiddenSeries": false,
           "id": 79,
           "legend": {
             "avg": false,
@@ -2774,6 +2942,7 @@
             "x": 12,
             "y": 33
           },
+          "hiddenSeries": false,
           "id": 83,
           "legend": {
             "avg": false,
@@ -2871,6 +3040,7 @@
             "x": 0,
             "y": 42
           },
+          "hiddenSeries": false,
           "id": 109,
           "legend": {
             "avg": false,
@@ -2969,6 +3139,7 @@
             "x": 12,
             "y": 42
           },
+          "hiddenSeries": false,
           "id": 111,
           "legend": {
             "avg": false,
@@ -3045,6 +3216,144 @@
             "align": false,
             "alignLevel": null
           }
+        },
+        {
+          "aliasColors": {},
+          "bars": false,
+          "dashLength": 10,
+          "dashes": false,
+          "datasource": "$datasource",
+          "description": "",
+          "fill": 1,
+          "fillGradient": 0,
+          "gridPos": {
+            "h": 9,
+            "w": 12,
+            "x": 0,
+            "y": 51
+          },
+          "hiddenSeries": false,
+          "id": 140,
+          "legend": {
+            "avg": false,
+            "current": false,
+            "max": false,
+            "min": false,
+            "show": true,
+            "total": false,
+            "values": false
+          },
+          "lines": true,
+          "linewidth": 1,
+          "links": [],
+          "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
+          "paceLength": 10,
+          "percentage": false,
+          "pointradius": 5,
+          "points": false,
+          "renderer": "flot",
+          "seriesOverrides": [],
+          "spaceLength": 10,
+          "stack": false,
+          "steppedLine": false,
+          "targets": [
+            {
+              "expr": "synapse_federation_send_queue_presence_changed_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "presence changed",
+              "refId": "A"
+            },
+            {
+              "expr": "synapse_federation_send_queue_presence_map_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "hide": false,
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "presence map",
+              "refId": "B"
+            },
+            {
+              "expr": "synapse_federation_send_queue_presence_destinations_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "hide": false,
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "presence destinations",
+              "refId": "E"
+            },
+            {
+              "expr": "synapse_federation_send_queue_keyed_edu_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "hide": false,
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "keyed edus",
+              "refId": "C"
+            },
+            {
+              "expr": "synapse_federation_send_queue_edus_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "hide": false,
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "other edus",
+              "refId": "D"
+            },
+            {
+              "expr": "synapse_federation_send_queue_pos_time_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "format": "time_series",
+              "hide": false,
+              "interval": "",
+              "intervalFactor": 1,
+              "legendFormat": "stream positions",
+              "refId": "F"
+            }
+          ],
+          "thresholds": [],
+          "timeFrom": null,
+          "timeRegions": [],
+          "timeShift": null,
+          "title": "Outgoing EDU queues on master",
+          "tooltip": {
+            "shared": true,
+            "sort": 0,
+            "value_type": "individual"
+          },
+          "type": "graph",
+          "xaxis": {
+            "buckets": null,
+            "mode": "time",
+            "name": null,
+            "show": true,
+            "values": []
+          },
+          "yaxes": [
+            {
+              "format": "none",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": "0",
+              "show": true
+            },
+            {
+              "format": "short",
+              "label": null,
+              "logBase": 1,
+              "max": null,
+              "min": null,
+              "show": true
+            }
+          ],
+          "yaxis": {
+            "align": false,
+            "alignLevel": null
+          }
         }
       ],
       "title": "Federation",
@@ -3274,12 +3583,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 35
+            "y": 52
           },
+          "hiddenSeries": false,
           "id": 48,
           "legend": {
             "avg": false,
@@ -3294,7 +3605,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -3364,12 +3677,14 @@
           "datasource": "$datasource",
           "description": "Shows the time in which the given percentage of database queries were scheduled, over the sampled timespan",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 35
+            "y": 52
           },
+          "hiddenSeries": false,
           "id": 104,
           "legend": {
             "alignAsTable": true,
@@ -3385,7 +3700,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -3479,13 +3796,15 @@
           "editable": true,
           "error": false,
           "fill": 0,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
-            "y": 42
+            "y": 59
           },
+          "hiddenSeries": false,
           "id": 10,
           "legend": {
             "avg": false,
@@ -3502,7 +3821,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -3571,13 +3892,15 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
-            "y": 42
+            "y": 59
           },
+          "hiddenSeries": false,
           "id": 11,
           "legend": {
             "avg": false,
@@ -3594,7 +3917,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -3686,8 +4011,9 @@
             "h": 13,
             "w": 12,
             "x": 0,
-            "y": 36
+            "y": 67
           },
+          "hiddenSeries": false,
           "id": 12,
           "legend": {
             "alignAsTable": true,
@@ -3780,8 +4106,9 @@
             "h": 13,
             "w": 12,
             "x": 12,
-            "y": 36
+            "y": 67
           },
+          "hiddenSeries": false,
           "id": 26,
           "legend": {
             "alignAsTable": true,
@@ -3874,8 +4201,9 @@
             "h": 13,
             "w": 12,
             "x": 0,
-            "y": 49
+            "y": 80
           },
+          "hiddenSeries": false,
           "id": 13,
           "legend": {
             "alignAsTable": true,
@@ -3905,7 +4233,7 @@
           "steppedLine": false,
           "targets": [
             {
-              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\",block_name!=\"wrapped_request_handler\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -3959,6 +4287,7 @@
           "dashLength": 10,
           "dashes": false,
           "datasource": "$datasource",
+          "description": "The time each database transaction takes to execute, on average, broken down by metrics block.",
           "editable": true,
           "error": false,
           "fill": 1,
@@ -3968,8 +4297,9 @@
             "h": 13,
             "w": 12,
             "x": 12,
-            "y": 49
+            "y": 80
           },
+          "hiddenSeries": false,
           "id": 27,
           "legend": {
             "alignAsTable": true,
@@ -4012,7 +4342,7 @@
           "timeFrom": null,
           "timeRegions": [],
           "timeShift": null,
-          "title": "Average Database Time per Block",
+          "title": "Average Database Transaction time, by Block",
           "tooltip": {
             "shared": false,
             "sort": 0,
@@ -4056,13 +4386,15 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 13,
             "w": 12,
             "x": 0,
-            "y": 62
+            "y": 93
           },
+          "hiddenSeries": false,
           "id": 28,
           "legend": {
             "avg": false,
@@ -4077,7 +4409,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -4146,13 +4480,15 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 13,
             "w": 12,
             "x": 12,
-            "y": 62
+            "y": 93
           },
+          "hiddenSeries": false,
           "id": 25,
           "legend": {
             "avg": false,
@@ -4167,7 +4503,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -4253,6 +4591,7 @@
           "editable": true,
           "error": false,
           "fill": 0,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 10,
@@ -4260,6 +4599,7 @@
             "x": 0,
             "y": 37
           },
+          "hiddenSeries": false,
           "id": 1,
           "legend": {
             "alignAsTable": true,
@@ -4277,6 +4617,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4346,6 +4689,7 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 10,
@@ -4353,6 +4697,7 @@
             "x": 12,
             "y": 37
           },
+          "hiddenSeries": false,
           "id": 8,
           "legend": {
             "alignAsTable": true,
@@ -4369,6 +4714,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "connected",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4437,6 +4785,7 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 10,
@@ -4444,6 +4793,7 @@
             "x": 0,
             "y": 47
           },
+          "hiddenSeries": false,
           "id": 38,
           "legend": {
             "alignAsTable": true,
@@ -4460,6 +4810,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "connected",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4525,12 +4878,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 10,
             "w": 12,
             "x": 12,
             "y": 47
           },
+          "hiddenSeries": false,
           "id": 39,
           "legend": {
             "alignAsTable": true,
@@ -4546,6 +4901,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4612,12 +4970,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 57
           },
+          "hiddenSeries": false,
           "id": 65,
           "legend": {
             "alignAsTable": true,
@@ -4633,6 +4993,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4715,12 +5078,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 66
           },
+          "hiddenSeries": false,
           "id": 91,
           "legend": {
             "avg": false,
@@ -4735,6 +5100,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4805,6 +5173,7 @@
           "editable": true,
           "error": false,
           "fill": 1,
+          "fillGradient": 0,
           "grid": {},
           "gridPos": {
             "h": 9,
@@ -4812,6 +5181,7 @@
             "x": 12,
             "y": 66
           },
+          "hiddenSeries": false,
           "id": 21,
           "legend": {
             "alignAsTable": true,
@@ -4827,6 +5197,9 @@
           "linewidth": 2,
           "links": [],
           "nullPointMode": "null as zero",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4893,12 +5266,14 @@
           "datasource": "$datasource",
           "description": "'gen 0' shows the number of objects allocated since the last gen0 GC.\n'gen 1' / 'gen 2' show the number of gen0/gen1 GCs since the last gen1/gen2 GC.",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 75
           },
+          "hiddenSeries": false,
           "id": 89,
           "legend": {
             "avg": false,
@@ -4915,6 +5290,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -4986,12 +5364,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 12,
             "y": 75
           },
+          "hiddenSeries": false,
           "id": 93,
           "legend": {
             "avg": false,
@@ -5006,6 +5386,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "connected",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -5071,12 +5454,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 9,
             "w": 12,
             "x": 0,
             "y": 84
           },
+          "hiddenSeries": false,
           "id": 95,
           "legend": {
             "avg": false,
@@ -5091,6 +5476,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 5,
           "points": false,
@@ -5179,6 +5567,7 @@
             "show": true
           },
           "links": [],
+          "options": {},
           "reverseYBuckets": false,
           "targets": [
             {
@@ -5236,12 +5625,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
             "y": 39
           },
+          "hiddenSeries": false,
           "id": 2,
           "legend": {
             "avg": false,
@@ -5256,7 +5647,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5356,12 +5749,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
             "y": 39
           },
+          "hiddenSeries": false,
           "id": 41,
           "legend": {
             "avg": false,
@@ -5376,7 +5771,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5445,12 +5842,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
             "y": 46
           },
+          "hiddenSeries": false,
           "id": 42,
           "legend": {
             "avg": false,
@@ -5465,7 +5864,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5533,12 +5934,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
             "y": 46
           },
+          "hiddenSeries": false,
           "id": 43,
           "legend": {
             "avg": false,
@@ -5553,7 +5956,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5621,12 +6026,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 0,
             "y": 53
           },
+          "hiddenSeries": false,
           "id": 113,
           "legend": {
             "avg": false,
@@ -5641,7 +6048,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5658,6 +6067,13 @@
               "intervalFactor": 1,
               "legendFormat": "{{job}}-{{index}} {{stream_name}}",
               "refId": "A"
+            },
+            {
+              "expr": "synapse_replication_tcp_resource_total_connections{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}",
+              "format": "time_series",
+              "intervalFactor": 1,
+              "legendFormat": "{{job}}-{{index}}",
+              "refId": "B"
             }
           ],
           "thresholds": [],
@@ -5684,7 +6100,7 @@
               "label": null,
               "logBase": 1,
               "max": null,
-              "min": null,
+              "min": "0",
               "show": true
             },
             {
@@ -5708,12 +6124,14 @@
           "dashes": false,
           "datasource": "$datasource",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 7,
             "w": 12,
             "x": 12,
             "y": 53
           },
+          "hiddenSeries": false,
           "id": 115,
           "legend": {
             "avg": false,
@@ -5728,7 +6146,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "paceLength": 10,
           "percentage": false,
           "pointradius": 5,
@@ -5816,8 +6236,9 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 40
+            "y": 58
           },
+          "hiddenSeries": false,
           "id": 67,
           "legend": {
             "avg": false,
@@ -5907,8 +6328,9 @@
             "h": 9,
             "w": 12,
             "x": 12,
-            "y": 40
+            "y": 58
           },
+          "hiddenSeries": false,
           "id": 71,
           "legend": {
             "avg": false,
@@ -5998,8 +6420,9 @@
             "h": 9,
             "w": 12,
             "x": 0,
-            "y": 49
+            "y": 67
           },
+          "hiddenSeries": false,
           "id": 121,
           "interval": "",
           "legend": {
@@ -6116,7 +6539,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 14
+            "y": 41
           },
           "heatmap": {},
           "hideZeroBuckets": true,
@@ -6171,12 +6594,14 @@
           "datasource": "$datasource",
           "description": "Number of rooms with the given number of forward extremities or fewer.\n\nThis is only updated once an hour.",
           "fill": 0,
+          "fillGradient": 0,
           "gridPos": {
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 14
+            "y": 41
           },
+          "hiddenSeries": false,
           "id": 124,
           "interval": "",
           "legend": {
@@ -6192,7 +6617,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 2,
           "points": false,
@@ -6273,7 +6700,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 22
+            "y": 49
           },
           "heatmap": {},
           "hideZeroBuckets": true,
@@ -6328,12 +6755,14 @@
           "datasource": "$datasource",
           "description": "For a given percentage P, the number X where P% of events were persisted to rooms with X forward extremities or fewer.",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 22
+            "y": 49
           },
+          "hiddenSeries": false,
           "id": 128,
           "legend": {
             "avg": false,
@@ -6348,7 +6777,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 2,
           "points": false,
@@ -6448,7 +6879,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 30
+            "y": 57
           },
           "heatmap": {},
           "hideZeroBuckets": true,
@@ -6503,12 +6934,14 @@
           "datasource": "$datasource",
           "description": "For  given percentage P, the number X where P% of events were persisted to rooms with X stale forward extremities or fewer.\n\nStale forward extremities are those that were in the previous set of extremities as well as the new.",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 30
+            "y": 57
           },
+          "hiddenSeries": false,
           "id": 130,
           "legend": {
             "avg": false,
@@ -6523,7 +6956,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 2,
           "points": false,
@@ -6623,7 +7058,7 @@
             "h": 8,
             "w": 12,
             "x": 0,
-            "y": 38
+            "y": 65
           },
           "heatmap": {},
           "hideZeroBuckets": true,
@@ -6678,12 +7113,14 @@
           "datasource": "$datasource",
           "description": "For a given percentage P, the number X where P% of state resolution operations took place over X state groups or fewer.",
           "fill": 1,
+          "fillGradient": 0,
           "gridPos": {
             "h": 8,
             "w": 12,
             "x": 12,
-            "y": 38
+            "y": 65
           },
+          "hiddenSeries": false,
           "id": 132,
           "interval": "",
           "legend": {
@@ -6699,7 +7136,9 @@
           "linewidth": 1,
           "links": [],
           "nullPointMode": "null",
-          "options": {},
+          "options": {
+            "dataLinks": []
+          },
           "percentage": false,
           "pointradius": 2,
           "points": false,
@@ -6794,7 +7233,6 @@
     "list": [
       {
         "current": {
-          "selected": true,
           "text": "Prometheus",
           "value": "Prometheus"
         },
@@ -6929,10 +7367,9 @@
         "allFormat": "regex wildcard",
         "allValue": ".*",
         "current": {
+          "selected": false,
           "text": "All",
-          "value": [
-            "$__all"
-          ]
+          "value": "$__all"
         },
         "datasource": "$datasource",
         "definition": "",
@@ -6991,5 +7428,5 @@
   "timezone": "",
   "title": "Synapse",
   "uid": "000000012",
-  "version": 19
+  "version": 29
 }
\ No newline at end of file
diff --git a/contrib/systemd/matrix-synapse.service b/contrib/systemd/matrix-synapse.service
index 813717b032..a754078410 100644
--- a/contrib/systemd/matrix-synapse.service
+++ b/contrib/systemd/matrix-synapse.service
@@ -15,6 +15,9 @@
 
 [Unit]
 Description=Synapse Matrix homeserver
+# If you are using postgresql to persist data, uncomment this line to make sure
+# synapse starts after the postgresql service.
+# After=postgresql.service
 
 [Service]
 Type=notify
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index ce2c235994..6784234d5f 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -1512,7 +1512,13 @@ saml2_config:
   # * HTML page to display to users if something goes wrong during the
   #   authentication process: 'saml_error.html'.
   #
-  #   This template doesn't currently need any variable to render.
+  #   When rendering, this template is given the following variables:
+  #     * code: an HTML error code corresponding to the error that is being
+  #       returned (typically 400 or 500)
+  #
+  #     * msg: a textual message describing the error.
+  #
+  #   The variables will automatically be HTML-escaped.
   #
   # You can see the default templates at:
   # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
diff --git a/docs/sso_mapping_providers.md b/docs/sso_mapping_providers.md
index 4cd3a568f2..abea432343 100644
--- a/docs/sso_mapping_providers.md
+++ b/docs/sso_mapping_providers.md
@@ -138,6 +138,8 @@ A custom mapping provider must specify the following methods:
        * `mxid_localpart` - Required. The mxid localpart of the new user.
        * `displayname` - The displayname of the new user. If not provided, will default to
                          the value of `mxid_localpart`.
+       * `emails` - A list of emails for the new user. If not provided, will
+                    default to an empty list.
 
 ### Default SAML Mapping Provider
 
diff --git a/setup.py b/setup.py
index 5ce06c8987..54ddec8f9f 100755
--- a/setup.py
+++ b/setup.py
@@ -114,6 +114,7 @@ setup(
         "Programming Language :: Python :: 3.5",
         "Programming Language :: Python :: 3.6",
         "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
     ],
     scripts=["synctl"] + glob.glob("scripts/*"),
     cmdclass={"test": TestCommand},
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 5afe52f8d4..f3ec2a34ec 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -863,9 +863,24 @@ class FederationSenderHandler(object):
         a FEDERATION_ACK back to the master, and stores the token that we have processed
          in `federation_stream_position` so that we can restart where we left off.
         """
-        try:
-            self.federation_position = token
+        self.federation_position = token
+
+        # We save and send the ACK to master asynchronously, so we don't block
+        # processing on persistence. We don't need to do this operation for
+        # every single RDATA we receive, we just need to do it periodically.
+
+        if self._fed_position_linearizer.is_queued(None):
+            # There is already a task queued up to save and send the token, so
+            # no need to queue up another task.
+            return
+
+        run_as_background_process("_save_and_send_ack", self._save_and_send_ack)
 
+    async def _save_and_send_ack(self):
+        """Save the current federation position in the database and send an ACK
+        to master with where we're up to.
+        """
+        try:
             # We linearize here to ensure we don't have races updating the token
             #
             # XXX this appears to be redundant, since the ReplicationCommandHandler
@@ -875,16 +890,18 @@ class FederationSenderHandler(object):
             # we're not being re-entered?
 
             with (await self._fed_position_linearizer.queue(None)):
+                # We persist and ack the same position, so we take a copy of it
+                # here as otherwise it can get modified from underneath us.
+                current_position = self.federation_position
+
                 await self.store.update_federation_out_pos(
-                    "federation", self.federation_position
+                    "federation", current_position
                 )
 
                 # We ACK this token over replication so that the master can drop
                 # its in memory queues
-                self._hs.get_tcp_replication().send_federation_ack(
-                    self.federation_position
-                )
-                self._last_ack = self.federation_position
+                self._hs.get_tcp_replication().send_federation_ack(current_position)
+                self._last_ack = current_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index 726a27d7b2..38ec256984 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 
 import logging
-import os
 
+import jinja2
 import pkg_resources
 
 from synapse.python_dependencies import DependencyException, check_requirements
@@ -167,9 +167,11 @@ class SAML2Config(Config):
         if not template_dir:
             template_dir = pkg_resources.resource_filename("synapse", "res/templates",)
 
-        self.saml2_error_html_content = self.read_file(
-            os.path.join(template_dir, "saml_error.html"), "saml2_config.saml_error",
-        )
+        loader = jinja2.FileSystemLoader(template_dir)
+        # enable auto-escape here, to having to remember to escape manually in the
+        # template
+        env = jinja2.Environment(loader=loader, autoescape=True)
+        self.saml2_error_html_template = env.get_template("saml_error.html")
 
     def _default_saml_config_dict(
         self, required_attributes: set, optional_attributes: set
@@ -349,7 +351,13 @@ class SAML2Config(Config):
           # * HTML page to display to users if something goes wrong during the
           #   authentication process: 'saml_error.html'.
           #
-          #   This template doesn't currently need any variable to render.
+          #   When rendering, this template is given the following variables:
+          #     * code: an HTML error code corresponding to the error that is being
+          #       returned (typically 400 or 500)
+          #
+          #     * msg: a textual message describing the error.
+          #
+          #   The variables will automatically be HTML-escaped.
           #
           # You can see the default templates at:
           # https://github.com/matrix-org/synapse/tree/master/synapse/res/templates
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 4acb4fa489..8a9de913b3 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -19,8 +19,6 @@ import logging
 
 from six import string_types
 
-from twisted.internet import defer
-
 from synapse.api.errors import Codes, SynapseError
 from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -51,8 +49,7 @@ class GroupsServerWorkerHandler(object):
         self.transport_client = hs.get_federation_transport_client()
         self.profile_handler = hs.get_profile_handler()
 
-    @defer.inlineCallbacks
-    def check_group_is_ours(
+    async def check_group_is_ours(
         self, group_id, requester_user_id, and_exists=False, and_is_admin=None
     ):
         """Check that the group is ours, and optionally if it exists.
@@ -68,25 +65,24 @@ class GroupsServerWorkerHandler(object):
         if not self.is_mine_id(group_id):
             raise SynapseError(400, "Group not on this server")
 
-        group = yield self.store.get_group(group_id)
+        group = await self.store.get_group(group_id)
         if and_exists and not group:
             raise SynapseError(404, "Unknown group")
 
-        is_user_in_group = yield self.store.is_user_in_group(
+        is_user_in_group = await self.store.is_user_in_group(
             requester_user_id, group_id
         )
         if group and not is_user_in_group and not group["is_public"]:
             raise SynapseError(404, "Unknown group")
 
         if and_is_admin:
-            is_admin = yield self.store.is_user_admin_in_group(group_id, and_is_admin)
+            is_admin = await self.store.is_user_admin_in_group(group_id, and_is_admin)
             if not is_admin:
                 raise SynapseError(403, "User is not admin in group")
 
         return group
 
-    @defer.inlineCallbacks
-    def get_group_summary(self, group_id, requester_user_id):
+    async def get_group_summary(self, group_id, requester_user_id):
         """Get the summary for a group as seen by requester_user_id.
 
         The group summary consists of the profile of the room, and a curated
@@ -95,28 +91,28 @@ class GroupsServerWorkerHandler(object):
 
         A user/room may appear in multiple roles/categories.
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        is_user_in_group = yield self.store.is_user_in_group(
+        is_user_in_group = await self.store.is_user_in_group(
             requester_user_id, group_id
         )
 
-        profile = yield self.get_group_profile(group_id, requester_user_id)
+        profile = await self.get_group_profile(group_id, requester_user_id)
 
-        users, roles = yield self.store.get_users_for_summary_by_role(
+        users, roles = await self.store.get_users_for_summary_by_role(
             group_id, include_private=is_user_in_group
         )
 
         # TODO: Add profiles to users
 
-        rooms, categories = yield self.store.get_rooms_for_summary_by_category(
+        rooms, categories = await self.store.get_rooms_for_summary_by_category(
             group_id, include_private=is_user_in_group
         )
 
         for room_entry in rooms:
             room_id = room_entry["room_id"]
-            joined_users = yield self.store.get_users_in_room(room_id)
-            entry = yield self.room_list_handler.generate_room_entry(
+            joined_users = await self.store.get_users_in_room(room_id)
+            entry = await self.room_list_handler.generate_room_entry(
                 room_id, len(joined_users), with_alias=False, allow_private=True
             )
             entry = dict(entry)  # so we don't change whats cached
@@ -130,7 +126,7 @@ class GroupsServerWorkerHandler(object):
             user_id = entry["user_id"]
 
             if not self.is_mine_id(requester_user_id):
-                attestation = yield self.store.get_remote_attestation(group_id, user_id)
+                attestation = await self.store.get_remote_attestation(group_id, user_id)
                 if not attestation:
                     continue
 
@@ -140,12 +136,12 @@ class GroupsServerWorkerHandler(object):
                     group_id, user_id
                 )
 
-            user_profile = yield self.profile_handler.get_profile_from_cache(user_id)
+            user_profile = await self.profile_handler.get_profile_from_cache(user_id)
             entry.update(user_profile)
 
         users.sort(key=lambda e: e.get("order", 0))
 
-        membership_info = yield self.store.get_users_membership_info_in_group(
+        membership_info = await self.store.get_users_membership_info_in_group(
             group_id, requester_user_id
         )
 
@@ -164,22 +160,20 @@ class GroupsServerWorkerHandler(object):
             "user": membership_info,
         }
 
-    @defer.inlineCallbacks
-    def get_group_categories(self, group_id, requester_user_id):
+    async def get_group_categories(self, group_id, requester_user_id):
         """Get all categories in a group (as seen by user)
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        categories = yield self.store.get_group_categories(group_id=group_id)
+        categories = await self.store.get_group_categories(group_id=group_id)
         return {"categories": categories}
 
-    @defer.inlineCallbacks
-    def get_group_category(self, group_id, requester_user_id, category_id):
+    async def get_group_category(self, group_id, requester_user_id, category_id):
         """Get a specific category in a group (as seen by user)
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        res = yield self.store.get_group_category(
+        res = await self.store.get_group_category(
             group_id=group_id, category_id=category_id
         )
 
@@ -187,32 +181,29 @@ class GroupsServerWorkerHandler(object):
 
         return res
 
-    @defer.inlineCallbacks
-    def get_group_roles(self, group_id, requester_user_id):
+    async def get_group_roles(self, group_id, requester_user_id):
         """Get all roles in a group (as seen by user)
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        roles = yield self.store.get_group_roles(group_id=group_id)
+        roles = await self.store.get_group_roles(group_id=group_id)
         return {"roles": roles}
 
-    @defer.inlineCallbacks
-    def get_group_role(self, group_id, requester_user_id, role_id):
+    async def get_group_role(self, group_id, requester_user_id, role_id):
         """Get a specific role in a group (as seen by user)
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
+        res = await self.store.get_group_role(group_id=group_id, role_id=role_id)
         return res
 
-    @defer.inlineCallbacks
-    def get_group_profile(self, group_id, requester_user_id):
+    async def get_group_profile(self, group_id, requester_user_id):
         """Get the group profile as seen by requester_user_id
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id)
+        await self.check_group_is_ours(group_id, requester_user_id)
 
-        group = yield self.store.get_group(group_id)
+        group = await self.store.get_group(group_id)
 
         if group:
             cols = [
@@ -229,20 +220,19 @@ class GroupsServerWorkerHandler(object):
         else:
             raise SynapseError(404, "Unknown group")
 
-    @defer.inlineCallbacks
-    def get_users_in_group(self, group_id, requester_user_id):
+    async def get_users_in_group(self, group_id, requester_user_id):
         """Get the users in group as seen by requester_user_id.
 
         The ordering is arbitrary at the moment
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        is_user_in_group = yield self.store.is_user_in_group(
+        is_user_in_group = await self.store.is_user_in_group(
             requester_user_id, group_id
         )
 
-        user_results = yield self.store.get_users_in_group(
+        user_results = await self.store.get_users_in_group(
             group_id, include_private=is_user_in_group
         )
 
@@ -254,14 +244,14 @@ class GroupsServerWorkerHandler(object):
 
             entry = {"user_id": g_user_id}
 
-            profile = yield self.profile_handler.get_profile_from_cache(g_user_id)
+            profile = await self.profile_handler.get_profile_from_cache(g_user_id)
             entry.update(profile)
 
             entry["is_public"] = bool(is_public)
             entry["is_privileged"] = bool(is_privileged)
 
             if not self.is_mine_id(g_user_id):
-                attestation = yield self.store.get_remote_attestation(
+                attestation = await self.store.get_remote_attestation(
                     group_id, g_user_id
                 )
                 if not attestation:
@@ -279,30 +269,29 @@ class GroupsServerWorkerHandler(object):
 
         return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
 
-    @defer.inlineCallbacks
-    def get_invited_users_in_group(self, group_id, requester_user_id):
+    async def get_invited_users_in_group(self, group_id, requester_user_id):
         """Get the users that have been invited to a group as seen by requester_user_id.
 
         The ordering is arbitrary at the moment
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        is_user_in_group = yield self.store.is_user_in_group(
+        is_user_in_group = await self.store.is_user_in_group(
             requester_user_id, group_id
         )
 
         if not is_user_in_group:
             raise SynapseError(403, "User not in group")
 
-        invited_users = yield self.store.get_invited_users_in_group(group_id)
+        invited_users = await self.store.get_invited_users_in_group(group_id)
 
         user_profiles = []
 
         for user_id in invited_users:
             user_profile = {"user_id": user_id}
             try:
-                profile = yield self.profile_handler.get_profile_from_cache(user_id)
+                profile = await self.profile_handler.get_profile_from_cache(user_id)
                 user_profile.update(profile)
             except Exception as e:
                 logger.warning("Error getting profile for %s: %s", user_id, e)
@@ -310,20 +299,19 @@ class GroupsServerWorkerHandler(object):
 
         return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
 
-    @defer.inlineCallbacks
-    def get_rooms_in_group(self, group_id, requester_user_id):
+    async def get_rooms_in_group(self, group_id, requester_user_id):
         """Get the rooms in group as seen by requester_user_id
 
         This returns rooms in order of decreasing number of joined users
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        is_user_in_group = yield self.store.is_user_in_group(
+        is_user_in_group = await self.store.is_user_in_group(
             requester_user_id, group_id
         )
 
-        room_results = yield self.store.get_rooms_in_group(
+        room_results = await self.store.get_rooms_in_group(
             group_id, include_private=is_user_in_group
         )
 
@@ -331,8 +319,8 @@ class GroupsServerWorkerHandler(object):
         for room_result in room_results:
             room_id = room_result["room_id"]
 
-            joined_users = yield self.store.get_users_in_room(room_id)
-            entry = yield self.room_list_handler.generate_room_entry(
+            joined_users = await self.store.get_users_in_room(room_id)
+            entry = await self.room_list_handler.generate_room_entry(
                 room_id, len(joined_users), with_alias=False, allow_private=True
             )
 
@@ -355,13 +343,12 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
         # Ensure attestations get renewed
         hs.get_groups_attestation_renewer()
 
-    @defer.inlineCallbacks
-    def update_group_summary_room(
+    async def update_group_summary_room(
         self, group_id, requester_user_id, room_id, category_id, content
     ):
         """Add/update a room to the group summary
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
@@ -371,7 +358,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         is_public = _parse_visibility_from_contents(content)
 
-        yield self.store.add_room_to_summary(
+        await self.store.add_room_to_summary(
             group_id=group_id,
             room_id=room_id,
             category_id=category_id,
@@ -381,31 +368,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def delete_group_summary_room(
+    async def delete_group_summary_room(
         self, group_id, requester_user_id, room_id, category_id
     ):
         """Remove a room from the summary
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
-        yield self.store.remove_room_from_summary(
+        await self.store.remove_room_from_summary(
             group_id=group_id, room_id=room_id, category_id=category_id
         )
 
         return {}
 
-    @defer.inlineCallbacks
-    def set_group_join_policy(self, group_id, requester_user_id, content):
+    async def set_group_join_policy(self, group_id, requester_user_id, content):
         """Sets the group join policy.
 
         Currently supported policies are:
          - "invite": an invite must be received and accepted in order to join.
          - "open": anyone can join.
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
@@ -413,22 +398,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
         if join_policy is None:
             raise SynapseError(400, "No value specified for 'm.join_policy'")
 
-        yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
+        await self.store.set_group_join_policy(group_id, join_policy=join_policy)
 
         return {}
 
-    @defer.inlineCallbacks
-    def update_group_category(self, group_id, requester_user_id, category_id, content):
+    async def update_group_category(
+        self, group_id, requester_user_id, category_id, content
+    ):
         """Add/Update a group category
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
         is_public = _parse_visibility_from_contents(content)
         profile = content.get("profile")
 
-        yield self.store.upsert_group_category(
+        await self.store.upsert_group_category(
             group_id=group_id,
             category_id=category_id,
             is_public=is_public,
@@ -437,25 +423,23 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def delete_group_category(self, group_id, requester_user_id, category_id):
+    async def delete_group_category(self, group_id, requester_user_id, category_id):
         """Delete a group category
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
-        yield self.store.remove_group_category(
+        await self.store.remove_group_category(
             group_id=group_id, category_id=category_id
         )
 
         return {}
 
-    @defer.inlineCallbacks
-    def update_group_role(self, group_id, requester_user_id, role_id, content):
+    async def update_group_role(self, group_id, requester_user_id, role_id, content):
         """Add/update a role in a group
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
@@ -463,31 +447,29 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         profile = content.get("profile")
 
-        yield self.store.upsert_group_role(
+        await self.store.upsert_group_role(
             group_id=group_id, role_id=role_id, is_public=is_public, profile=profile
         )
 
         return {}
 
-    @defer.inlineCallbacks
-    def delete_group_role(self, group_id, requester_user_id, role_id):
+    async def delete_group_role(self, group_id, requester_user_id, role_id):
         """Remove role from group
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
-        yield self.store.remove_group_role(group_id=group_id, role_id=role_id)
+        await self.store.remove_group_role(group_id=group_id, role_id=role_id)
 
         return {}
 
-    @defer.inlineCallbacks
-    def update_group_summary_user(
+    async def update_group_summary_user(
         self, group_id, requester_user_id, user_id, role_id, content
     ):
         """Add/update a users entry in the group summary
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
@@ -495,7 +477,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         is_public = _parse_visibility_from_contents(content)
 
-        yield self.store.add_user_to_summary(
+        await self.store.add_user_to_summary(
             group_id=group_id,
             user_id=user_id,
             role_id=role_id,
@@ -505,25 +487,25 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def delete_group_summary_user(self, group_id, requester_user_id, user_id, role_id):
+    async def delete_group_summary_user(
+        self, group_id, requester_user_id, user_id, role_id
+    ):
         """Remove a user from the group summary
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
-        yield self.store.remove_user_from_summary(
+        await self.store.remove_user_from_summary(
             group_id=group_id, user_id=user_id, role_id=role_id
         )
 
         return {}
 
-    @defer.inlineCallbacks
-    def update_group_profile(self, group_id, requester_user_id, content):
+    async def update_group_profile(self, group_id, requester_user_id, content):
         """Update the group profile
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
@@ -535,40 +517,38 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
                     raise SynapseError(400, "%r value is not a string" % (keyname,))
                 profile[keyname] = value
 
-        yield self.store.update_group_profile(group_id, profile)
+        await self.store.update_group_profile(group_id, profile)
 
-    @defer.inlineCallbacks
-    def add_room_to_group(self, group_id, requester_user_id, room_id, content):
+    async def add_room_to_group(self, group_id, requester_user_id, room_id, content):
         """Add room to group
         """
         RoomID.from_string(room_id)  # Ensure valid room id
 
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
         is_public = _parse_visibility_from_contents(content)
 
-        yield self.store.add_room_to_group(group_id, room_id, is_public=is_public)
+        await self.store.add_room_to_group(group_id, room_id, is_public=is_public)
 
         return {}
 
-    @defer.inlineCallbacks
-    def update_room_in_group(
+    async def update_room_in_group(
         self, group_id, requester_user_id, room_id, config_key, content
     ):
         """Update room in group
         """
         RoomID.from_string(room_id)  # Ensure valid room id
 
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
         if config_key == "m.visibility":
             is_public = _parse_visibility_dict(content)
 
-            yield self.store.update_room_in_group_visibility(
+            await self.store.update_room_in_group_visibility(
                 group_id, room_id, is_public=is_public
             )
         else:
@@ -576,36 +556,34 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def remove_room_from_group(self, group_id, requester_user_id, room_id):
+    async def remove_room_from_group(self, group_id, requester_user_id, room_id):
         """Remove room from group
         """
-        yield self.check_group_is_ours(
+        await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
-        yield self.store.remove_room_from_group(group_id, room_id)
+        await self.store.remove_room_from_group(group_id, room_id)
 
         return {}
 
-    @defer.inlineCallbacks
-    def invite_to_group(self, group_id, user_id, requester_user_id, content):
+    async def invite_to_group(self, group_id, user_id, requester_user_id, content):
         """Invite user to group
         """
 
-        group = yield self.check_group_is_ours(
+        group = await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
         )
 
         # TODO: Check if user knocked
 
-        invited_users = yield self.store.get_invited_users_in_group(group_id)
+        invited_users = await self.store.get_invited_users_in_group(group_id)
         if user_id in invited_users:
             raise SynapseError(
                 400, "User already invited to group", errcode=Codes.BAD_STATE
             )
 
-        user_results = yield self.store.get_users_in_group(
+        user_results = await self.store.get_users_in_group(
             group_id, include_private=True
         )
         if user_id in (user_result["user_id"] for user_result in user_results):
@@ -618,18 +596,18 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         if self.hs.is_mine_id(user_id):
             groups_local = self.hs.get_groups_local_handler()
-            res = yield groups_local.on_invite(group_id, user_id, content)
+            res = await groups_local.on_invite(group_id, user_id, content)
             local_attestation = None
         else:
             local_attestation = self.attestations.create_attestation(group_id, user_id)
             content.update({"attestation": local_attestation})
 
-            res = yield self.transport_client.invite_to_group_notification(
+            res = await self.transport_client.invite_to_group_notification(
                 get_domain_from_id(user_id), group_id, user_id, content
             )
 
             user_profile = res.get("user_profile", {})
-            yield self.store.add_remote_profile_cache(
+            await self.store.add_remote_profile_cache(
                 user_id,
                 displayname=user_profile.get("displayname"),
                 avatar_url=user_profile.get("avatar_url"),
@@ -639,13 +617,13 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
             if not self.hs.is_mine_id(user_id):
                 remote_attestation = res["attestation"]
 
-                yield self.attestations.verify_attestation(
+                await self.attestations.verify_attestation(
                     remote_attestation, user_id=user_id, group_id=group_id
                 )
             else:
                 remote_attestation = None
 
-            yield self.store.add_user_to_group(
+            await self.store.add_user_to_group(
                 group_id,
                 user_id,
                 is_admin=False,
@@ -654,15 +632,14 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
                 remote_attestation=remote_attestation,
             )
         elif res["state"] == "invite":
-            yield self.store.add_group_invite(group_id, user_id)
+            await self.store.add_group_invite(group_id, user_id)
             return {"state": "invite"}
         elif res["state"] == "reject":
             return {"state": "reject"}
         else:
             raise SynapseError(502, "Unknown state returned by HS")
 
-    @defer.inlineCallbacks
-    def _add_user(self, group_id, user_id, content):
+    async def _add_user(self, group_id, user_id, content):
         """Add a user to a group based on a content dict.
 
         See accept_invite, join_group.
@@ -672,7 +649,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
             remote_attestation = content["attestation"]
 
-            yield self.attestations.verify_attestation(
+            await self.attestations.verify_attestation(
                 remote_attestation, user_id=user_id, group_id=group_id
             )
         else:
@@ -681,7 +658,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         is_public = _parse_visibility_from_contents(content)
 
-        yield self.store.add_user_to_group(
+        await self.store.add_user_to_group(
             group_id,
             user_id,
             is_admin=False,
@@ -692,59 +669,55 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
 
         return local_attestation
 
-    @defer.inlineCallbacks
-    def accept_invite(self, group_id, requester_user_id, content):
+    async def accept_invite(self, group_id, requester_user_id, content):
         """User tries to accept an invite to the group.
 
         This is different from them asking to join, and so should error if no
         invite exists (and they're not a member of the group)
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
-        is_invited = yield self.store.is_user_invited_to_local_group(
+        is_invited = await self.store.is_user_invited_to_local_group(
             group_id, requester_user_id
         )
         if not is_invited:
             raise SynapseError(403, "User not invited to group")
 
-        local_attestation = yield self._add_user(group_id, requester_user_id, content)
+        local_attestation = await self._add_user(group_id, requester_user_id, content)
 
         return {"state": "join", "attestation": local_attestation}
 
-    @defer.inlineCallbacks
-    def join_group(self, group_id, requester_user_id, content):
+    async def join_group(self, group_id, requester_user_id, content):
         """User tries to join the group.
 
         This will error if the group requires an invite/knock to join
         """
 
-        group_info = yield self.check_group_is_ours(
+        group_info = await self.check_group_is_ours(
             group_id, requester_user_id, and_exists=True
         )
         if group_info["join_policy"] != "open":
             raise SynapseError(403, "Group is not publicly joinable")
 
-        local_attestation = yield self._add_user(group_id, requester_user_id, content)
+        local_attestation = await self._add_user(group_id, requester_user_id, content)
 
         return {"state": "join", "attestation": local_attestation}
 
-    @defer.inlineCallbacks
-    def knock(self, group_id, requester_user_id, content):
+    async def knock(self, group_id, requester_user_id, content):
         """A user requests becoming a member of the group
         """
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
         raise NotImplementedError()
 
-    @defer.inlineCallbacks
-    def accept_knock(self, group_id, requester_user_id, content):
+    async def accept_knock(self, group_id, requester_user_id, content):
         """Accept a users knock to the room.
 
         Errors if the user hasn't knocked, rather than inviting them.
         """
 
-        yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+        await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
 
         raise NotImplementedError()
 
@@ -872,8 +845,6 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
             group_id (str)
             request_user_id (str)
 
-        Returns:
-            Deferred
         """
 
         await self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 29a19b4572..230d170258 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -15,6 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Any, Dict, Optional
 
 from six import iteritems, itervalues
 
@@ -30,7 +31,11 @@ from synapse.api.errors import (
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.types import (
+    RoomStreamToken,
+    get_domain_from_id,
+    get_verify_key_from_cross_signing_key,
+)
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -704,22 +709,27 @@ class DeviceListUpdater(object):
             need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
             # Iterate over the set of user IDs.
             for user_id in need_resync:
-                # Try to resync the current user's devices list. Exception handling
-                # isn't necessary here, since user_device_resync catches all instances
-                # of "Exception" that might be raised from the federation request. This
-                # means that if an exception is raised by this function, it must be
-                # because of a database issue, which means _maybe_retry_device_resync
-                # probably won't be able to go much further anyway.
-                result = yield self.user_device_resync(
-                    user_id=user_id, mark_failed_as_stale=False,
-                )
-                # user_device_resync only returns a result if it managed to successfully
-                # resync and update the database. Updating the table of users requiring
-                # resync isn't necessary here as user_device_resync already does it
-                # (through self.store.update_remote_device_list_cache).
-                if result:
+                try:
+                    # Try to resync the current user's devices list.
+                    result = yield self.user_device_resync(
+                        user_id=user_id, mark_failed_as_stale=False,
+                    )
+
+                    # user_device_resync only returns a result if it managed to
+                    # successfully resync and update the database. Updating the table
+                    # of users requiring resync isn't necessary here as
+                    # user_device_resync already does it (through
+                    # self.store.update_remote_device_list_cache).
+                    if result:
+                        logger.debug(
+                            "Successfully resynced the device list for %s", user_id,
+                        )
+                except Exception as e:
+                    # If there was an issue resyncing this user, e.g. if the remote
+                    # server sent a malformed result, just log the error instead of
+                    # aborting all the subsequent resyncs.
                     logger.debug(
-                        "Successfully resynced the device list for %s" % user_id,
+                        "Could not resync the device list for %s: %s", user_id, e,
                     )
         finally:
             # Allow future calls to retry resyncinc out of sync device lists.
@@ -738,6 +748,7 @@ class DeviceListUpdater(object):
             request:
             https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
         """
+        logger.debug("Attempting to resync the device list for %s", user_id)
         log_kv({"message": "Doing resync to update device list."})
         # Fetch all devices for the user.
         origin = get_domain_from_id(user_id)
@@ -789,6 +800,13 @@ class DeviceListUpdater(object):
         stream_id = result["stream_id"]
         devices = result["devices"]
 
+        # Get the master key and the self-signing key for this user if provided in the
+        # response (None if not in the response).
+        # The response will not contain the user signing key, as this key is only used by
+        # its owner, thus it doesn't make sense to send it over federation.
+        master_key = result.get("master_key")
+        self_signing_key = result.get("self_signing_key")
+
         # If the remote server has more than ~1000 devices for this user
         # we assume that something is going horribly wrong (e.g. a bot
         # that logs in and creates a new device every time it tries to
@@ -818,6 +836,13 @@ class DeviceListUpdater(object):
 
         yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
         device_ids = [device["device_id"] for device in devices]
+
+        # Handle cross-signing keys.
+        cross_signing_device_ids = yield self.process_cross_signing_key_update(
+            user_id, master_key, self_signing_key,
+        )
+        device_ids = device_ids + cross_signing_device_ids
+
         yield self.device_handler.notify_device_update(user_id, device_ids)
 
         # We clobber the seen updates since we've re-synced from a given
@@ -825,3 +850,40 @@ class DeviceListUpdater(object):
         self._seen_updates[user_id] = {stream_id}
 
         defer.returnValue(result)
+
+    @defer.inlineCallbacks
+    def process_cross_signing_key_update(
+        self,
+        user_id: str,
+        master_key: Optional[Dict[str, Any]],
+        self_signing_key: Optional[Dict[str, Any]],
+    ) -> list:
+        """Process the given new master and self-signing key for the given remote user.
+
+        Args:
+            user_id: The ID of the user these keys are for.
+            master_key: The dict of the cross-signing master key as returned by the
+                remote server.
+            self_signing_key: The dict of the cross-signing self-signing key as returned
+                by the remote server.
+
+        Return:
+            The device IDs for the given keys.
+        """
+        device_ids = []
+
+        if master_key:
+            yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
+            _, verify_key = get_verify_key_from_cross_signing_key(master_key)
+            # verify_key is a VerifyKey from signedjson, which uses
+            # .version to denote the portion of the key ID after the
+            # algorithm and colon, which is the device ID
+            device_ids.append(verify_key.version)
+        if self_signing_key:
+            yield self.store.set_e2e_cross_signing_key(
+                user_id, "self_signing", self_signing_key
+            )
+            _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key)
+            device_ids.append(verify_key.version)
+
+        return device_ids
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 8f1bc0323c..774a252619 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1291,6 +1291,7 @@ class SigningKeyEduUpdater(object):
         """
 
         device_handler = self.e2e_keys_handler.device_handler
+        device_list_updater = device_handler.device_list_updater
 
         with (yield self._remote_edu_linearizer.queue(user_id)):
             pending_updates = self._pending_updates.pop(user_id, [])
@@ -1303,22 +1304,9 @@ class SigningKeyEduUpdater(object):
             logger.info("pending updates: %r", pending_updates)
 
             for master_key, self_signing_key in pending_updates:
-                if master_key:
-                    yield self.store.set_e2e_cross_signing_key(
-                        user_id, "master", master_key
-                    )
-                    _, verify_key = get_verify_key_from_cross_signing_key(master_key)
-                    # verify_key is a VerifyKey from signedjson, which uses
-                    # .version to denote the portion of the key ID after the
-                    # algorithm and colon, which is the device ID
-                    device_ids.append(verify_key.version)
-                if self_signing_key:
-                    yield self.store.set_e2e_cross_signing_key(
-                        user_id, "self_signing", self_signing_key
-                    )
-                    _, verify_key = get_verify_key_from_cross_signing_key(
-                        self_signing_key
-                    )
-                    device_ids.append(verify_key.version)
+                new_device_ids = yield device_list_updater.process_cross_signing_key_update(
+                    user_id, master_key, self_signing_key,
+                )
+                device_ids = device_ids + new_device_ids
 
             yield device_handler.notify_device_update(user_id, device_ids)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ca5c83811a..ebe8d25bd8 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -18,8 +18,6 @@ import logging
 
 from six import iteritems
 
-from twisted.internet import defer
-
 from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.types import get_domain_from_id
 
@@ -92,19 +90,18 @@ class GroupsLocalWorkerHandler(object):
     get_group_role = _create_rerouter("get_group_role")
     get_group_roles = _create_rerouter("get_group_roles")
 
-    @defer.inlineCallbacks
-    def get_group_summary(self, group_id, requester_user_id):
+    async def get_group_summary(self, group_id, requester_user_id):
         """Get the group summary for a group.
 
         If the group is remote we check that the users have valid attestations.
         """
         if self.is_mine_id(group_id):
-            res = yield self.groups_server_handler.get_group_summary(
+            res = await self.groups_server_handler.get_group_summary(
                 group_id, requester_user_id
             )
         else:
             try:
-                res = yield self.transport_client.get_group_summary(
+                res = await self.transport_client.get_group_summary(
                     get_domain_from_id(group_id), group_id, requester_user_id
                 )
             except HttpResponseException as e:
@@ -122,7 +119,7 @@ class GroupsLocalWorkerHandler(object):
                 attestation = entry.pop("attestation", {})
                 try:
                     if get_domain_from_id(g_user_id) != group_server_name:
-                        yield self.attestations.verify_attestation(
+                        await self.attestations.verify_attestation(
                             attestation,
                             group_id=group_id,
                             user_id=g_user_id,
@@ -139,19 +136,18 @@ class GroupsLocalWorkerHandler(object):
 
         # Add `is_publicised` flag to indicate whether the user has publicised their
         # membership of the group on their profile
-        result = yield self.store.get_publicised_groups_for_user(requester_user_id)
+        result = await self.store.get_publicised_groups_for_user(requester_user_id)
         is_publicised = group_id in result
 
         res.setdefault("user", {})["is_publicised"] = is_publicised
 
         return res
 
-    @defer.inlineCallbacks
-    def get_users_in_group(self, group_id, requester_user_id):
+    async def get_users_in_group(self, group_id, requester_user_id):
         """Get users in a group
         """
         if self.is_mine_id(group_id):
-            res = yield self.groups_server_handler.get_users_in_group(
+            res = await self.groups_server_handler.get_users_in_group(
                 group_id, requester_user_id
             )
             return res
@@ -159,7 +155,7 @@ class GroupsLocalWorkerHandler(object):
         group_server_name = get_domain_from_id(group_id)
 
         try:
-            res = yield self.transport_client.get_users_in_group(
+            res = await self.transport_client.get_users_in_group(
                 get_domain_from_id(group_id), group_id, requester_user_id
             )
         except HttpResponseException as e:
@@ -174,7 +170,7 @@ class GroupsLocalWorkerHandler(object):
             attestation = entry.pop("attestation", {})
             try:
                 if get_domain_from_id(g_user_id) != group_server_name:
-                    yield self.attestations.verify_attestation(
+                    await self.attestations.verify_attestation(
                         attestation,
                         group_id=group_id,
                         user_id=g_user_id,
@@ -188,15 +184,13 @@ class GroupsLocalWorkerHandler(object):
 
         return res
 
-    @defer.inlineCallbacks
-    def get_joined_groups(self, user_id):
-        group_ids = yield self.store.get_joined_groups(user_id)
+    async def get_joined_groups(self, user_id):
+        group_ids = await self.store.get_joined_groups(user_id)
         return {"groups": group_ids}
 
-    @defer.inlineCallbacks
-    def get_publicised_groups_for_user(self, user_id):
+    async def get_publicised_groups_for_user(self, user_id):
         if self.hs.is_mine_id(user_id):
-            result = yield self.store.get_publicised_groups_for_user(user_id)
+            result = await self.store.get_publicised_groups_for_user(user_id)
 
             # Check AS associated groups for this user - this depends on the
             # RegExps in the AS registration file (under `users`)
@@ -206,7 +200,7 @@ class GroupsLocalWorkerHandler(object):
             return {"groups": result}
         else:
             try:
-                bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+                bulk_result = await self.transport_client.bulk_get_publicised_groups(
                     get_domain_from_id(user_id), [user_id]
                 )
             except HttpResponseException as e:
@@ -218,8 +212,7 @@ class GroupsLocalWorkerHandler(object):
             # TODO: Verify attestations
             return {"groups": result}
 
-    @defer.inlineCallbacks
-    def bulk_get_publicised_groups(self, user_ids, proxy=True):
+    async def bulk_get_publicised_groups(self, user_ids, proxy=True):
         destinations = {}
         local_users = set()
 
@@ -236,7 +229,7 @@ class GroupsLocalWorkerHandler(object):
         failed_results = []
         for destination, dest_user_ids in iteritems(destinations):
             try:
-                r = yield self.transport_client.bulk_get_publicised_groups(
+                r = await self.transport_client.bulk_get_publicised_groups(
                     destination, list(dest_user_ids)
                 )
                 results.update(r["users"])
@@ -244,7 +237,7 @@ class GroupsLocalWorkerHandler(object):
                 failed_results.extend(dest_user_ids)
 
         for uid in local_users:
-            results[uid] = yield self.store.get_publicised_groups_for_user(uid)
+            results[uid] = await self.store.get_publicised_groups_for_user(uid)
 
             # Check AS associated groups for this user - this depends on the
             # RegExps in the AS registration file (under `users`)
@@ -333,12 +326,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
         return res
 
-    @defer.inlineCallbacks
-    def join_group(self, group_id, user_id, content):
+    async def join_group(self, group_id, user_id, content):
         """Request to join a group
         """
         if self.is_mine_id(group_id):
-            yield self.groups_server_handler.join_group(group_id, user_id, content)
+            await self.groups_server_handler.join_group(group_id, user_id, content)
             local_attestation = None
             remote_attestation = None
         else:
@@ -346,7 +338,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
             content["attestation"] = local_attestation
 
             try:
-                res = yield self.transport_client.join_group(
+                res = await self.transport_client.join_group(
                     get_domain_from_id(group_id), group_id, user_id, content
                 )
             except HttpResponseException as e:
@@ -356,7 +348,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
             remote_attestation = res["attestation"]
 
-            yield self.attestations.verify_attestation(
+            await self.attestations.verify_attestation(
                 remote_attestation,
                 group_id=group_id,
                 user_id=user_id,
@@ -366,7 +358,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
         # TODO: Check that the group is public and we're being added publically
         is_publicised = content.get("publicise", False)
 
-        token = yield self.store.register_user_group_membership(
+        token = await self.store.register_user_group_membership(
             group_id,
             user_id,
             membership="join",
@@ -379,12 +371,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def accept_invite(self, group_id, user_id, content):
+    async def accept_invite(self, group_id, user_id, content):
         """Accept an invite to a group
         """
         if self.is_mine_id(group_id):
-            yield self.groups_server_handler.accept_invite(group_id, user_id, content)
+            await self.groups_server_handler.accept_invite(group_id, user_id, content)
             local_attestation = None
             remote_attestation = None
         else:
@@ -392,7 +383,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
             content["attestation"] = local_attestation
 
             try:
-                res = yield self.transport_client.accept_group_invite(
+                res = await self.transport_client.accept_group_invite(
                     get_domain_from_id(group_id), group_id, user_id, content
                 )
             except HttpResponseException as e:
@@ -402,7 +393,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
             remote_attestation = res["attestation"]
 
-            yield self.attestations.verify_attestation(
+            await self.attestations.verify_attestation(
                 remote_attestation,
                 group_id=group_id,
                 user_id=user_id,
@@ -412,7 +403,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
         # TODO: Check that the group is public and we're being added publically
         is_publicised = content.get("publicise", False)
 
-        token = yield self.store.register_user_group_membership(
+        token = await self.store.register_user_group_membership(
             group_id,
             user_id,
             membership="join",
@@ -425,18 +416,17 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
         return {}
 
-    @defer.inlineCallbacks
-    def invite(self, group_id, user_id, requester_user_id, config):
+    async def invite(self, group_id, user_id, requester_user_id, config):
         """Invite a user to a group
         """
         content = {"requester_user_id": requester_user_id, "config": config}
         if self.is_mine_id(group_id):
-            res = yield self.groups_server_handler.invite_to_group(
+            res = await self.groups_server_handler.invite_to_group(
                 group_id, user_id, requester_user_id, content
             )
         else:
             try:
-                res = yield self.transport_client.invite_to_group(
+                res = await self.transport_client.invite_to_group(
                     get_domain_from_id(group_id),
                     group_id,
                     user_id,
@@ -450,8 +440,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
         return res
 
-    @defer.inlineCallbacks
-    def on_invite(self, group_id, user_id, content):
+    async def on_invite(self, group_id, user_id, content):
         """One of our users were invited to a group
         """
         # TODO: Support auto join and rejection
@@ -466,7 +455,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
             if "avatar_url" in content["profile"]:
                 local_profile["avatar_url"] = content["profile"]["avatar_url"]
 
-        token = yield self.store.register_user_group_membership(
+        token = await self.store.register_user_group_membership(
             group_id,
             user_id,
             membership="invite",
@@ -474,7 +463,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
         )
         self.notifier.on_new_event("groups_key", token, users=[user_id])
         try:
-            user_profile = yield self.profile_handler.get_profile(user_id)
+            user_profile = await self.profile_handler.get_profile(user_id)
         except Exception as e:
             logger.warning("No profile for user %s: %s", user_id, e)
             user_profile = {}
@@ -516,12 +505,11 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
 
         return res
 
-    @defer.inlineCallbacks
-    def user_removed_from_group(self, group_id, user_id, content):
+    async def user_removed_from_group(self, group_id, user_id, content):
         """One of our users was removed/kicked from a group
         """
         # TODO: Check if user in group
-        token = yield self.store.register_user_group_membership(
+        token = await self.store.register_user_group_membership(
             group_id, user_id, membership="leave"
         )
         self.notifier.on_new_event("groups_key", token, users=[user_id])
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 9ed0d23b0f..4ba0042768 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -25,7 +25,6 @@ from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from twisted.internet import defer
 from twisted.internet.error import TimeoutError
 
 from synapse.api.errors import (
@@ -60,8 +59,7 @@ class IdentityHandler(BaseHandler):
         self.federation_http_client = hs.get_http_client()
         self.hs = hs
 
-    @defer.inlineCallbacks
-    def threepid_from_creds(self, id_server, creds):
+    async def threepid_from_creds(self, id_server, creds):
         """
         Retrieve and validate a threepid identifier from a "credentials" dictionary against a
         given identity server
@@ -97,7 +95,7 @@ class IdentityHandler(BaseHandler):
         url = id_server + "/_matrix/identity/api/v1/3pid/getValidated3pid"
 
         try:
-            data = yield self.http_client.get_json(url, query_params)
+            data = await self.http_client.get_json(url, query_params)
         except TimeoutError:
             raise SynapseError(500, "Timed out contacting identity server")
         except HttpResponseException as e:
@@ -120,8 +118,7 @@ class IdentityHandler(BaseHandler):
         logger.info("%s reported non-validated threepid: %s", id_server, creds)
         return None
 
-    @defer.inlineCallbacks
-    def bind_threepid(
+    async def bind_threepid(
         self, client_secret, sid, mxid, id_server, id_access_token=None, use_v2=True
     ):
         """Bind a 3PID to an identity server
@@ -161,12 +158,12 @@ class IdentityHandler(BaseHandler):
         try:
             # Use the blacklisting http client as this call is only to identity servers
             # provided by a client
-            data = yield self.blacklisting_http_client.post_json_get_json(
+            data = await self.blacklisting_http_client.post_json_get_json(
                 bind_url, bind_data, headers=headers
             )
 
             # Remember where we bound the threepid
-            yield self.store.add_user_bound_threepid(
+            await self.store.add_user_bound_threepid(
                 user_id=mxid,
                 medium=data["medium"],
                 address=data["address"],
@@ -185,13 +182,12 @@ class IdentityHandler(BaseHandler):
             return data
 
         logger.info("Got 404 when POSTing JSON %s, falling back to v1 URL", bind_url)
-        res = yield self.bind_threepid(
+        res = await self.bind_threepid(
             client_secret, sid, mxid, id_server, id_access_token, use_v2=False
         )
         return res
 
-    @defer.inlineCallbacks
-    def try_unbind_threepid(self, mxid, threepid):
+    async def try_unbind_threepid(self, mxid, threepid):
         """Attempt to remove a 3PID from an identity server, or if one is not provided, all
         identity servers we're aware the binding is present on
 
@@ -211,7 +207,7 @@ class IdentityHandler(BaseHandler):
         if threepid.get("id_server"):
             id_servers = [threepid["id_server"]]
         else:
-            id_servers = yield self.store.get_id_servers_user_bound(
+            id_servers = await self.store.get_id_servers_user_bound(
                 user_id=mxid, medium=threepid["medium"], address=threepid["address"]
             )
 
@@ -221,14 +217,13 @@ class IdentityHandler(BaseHandler):
 
         changed = True
         for id_server in id_servers:
-            changed &= yield self.try_unbind_threepid_with_id_server(
+            changed &= await self.try_unbind_threepid_with_id_server(
                 mxid, threepid, id_server
             )
 
         return changed
 
-    @defer.inlineCallbacks
-    def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
+    async def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
         """Removes a binding from an identity server
 
         Args:
@@ -266,7 +261,7 @@ class IdentityHandler(BaseHandler):
         try:
             # Use the blacklisting http client as this call is only to identity servers
             # provided by a client
-            yield self.blacklisting_http_client.post_json_get_json(
+            await self.blacklisting_http_client.post_json_get_json(
                 url, content, headers
             )
             changed = True
@@ -281,7 +276,7 @@ class IdentityHandler(BaseHandler):
         except TimeoutError:
             raise SynapseError(500, "Timed out contacting identity server")
 
-        yield self.store.remove_user_bound_threepid(
+        await self.store.remove_user_bound_threepid(
             user_id=mxid,
             medium=threepid["medium"],
             address=threepid["address"],
@@ -376,8 +371,7 @@ class IdentityHandler(BaseHandler):
 
         return session_id
 
-    @defer.inlineCallbacks
-    def requestEmailToken(
+    async def requestEmailToken(
         self, id_server, email, client_secret, send_attempt, next_link=None
     ):
         """
@@ -412,7 +406,7 @@ class IdentityHandler(BaseHandler):
             )
 
         try:
-            data = yield self.http_client.post_json_get_json(
+            data = await self.http_client.post_json_get_json(
                 id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
                 params,
             )
@@ -423,8 +417,7 @@ class IdentityHandler(BaseHandler):
         except TimeoutError:
             raise SynapseError(500, "Timed out contacting identity server")
 
-    @defer.inlineCallbacks
-    def requestMsisdnToken(
+    async def requestMsisdnToken(
         self,
         id_server,
         country,
@@ -466,7 +459,7 @@ class IdentityHandler(BaseHandler):
             )
 
         try:
-            data = yield self.http_client.post_json_get_json(
+            data = await self.http_client.post_json_get_json(
                 id_server + "/_matrix/identity/api/v1/validate/msisdn/requestToken",
                 params,
             )
@@ -487,8 +480,7 @@ class IdentityHandler(BaseHandler):
         )
         return data
 
-    @defer.inlineCallbacks
-    def validate_threepid_session(self, client_secret, sid):
+    async def validate_threepid_session(self, client_secret, sid):
         """Validates a threepid session with only the client secret and session ID
         Tries validating against any configured account_threepid_delegates as well as locally.
 
@@ -510,12 +502,12 @@ class IdentityHandler(BaseHandler):
         # Try to validate as email
         if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
             # Ask our delegated email identity server
-            validation_session = yield self.threepid_from_creds(
+            validation_session = await self.threepid_from_creds(
                 self.hs.config.account_threepid_delegate_email, threepid_creds
             )
         elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
             # Get a validated session matching these details
-            validation_session = yield self.store.get_threepid_validation_session(
+            validation_session = await self.store.get_threepid_validation_session(
                 "email", client_secret, sid=sid, validated=True
             )
 
@@ -525,14 +517,13 @@ class IdentityHandler(BaseHandler):
         # Try to validate as msisdn
         if self.hs.config.account_threepid_delegate_msisdn:
             # Ask our delegated msisdn identity server
-            validation_session = yield self.threepid_from_creds(
+            validation_session = await self.threepid_from_creds(
                 self.hs.config.account_threepid_delegate_msisdn, threepid_creds
             )
 
         return validation_session
 
-    @defer.inlineCallbacks
-    def proxy_msisdn_submit_token(self, id_server, client_secret, sid, token):
+    async def proxy_msisdn_submit_token(self, id_server, client_secret, sid, token):
         """Proxy a POST submitToken request to an identity server for verification purposes
 
         Args:
@@ -553,11 +544,9 @@ class IdentityHandler(BaseHandler):
         body = {"client_secret": client_secret, "sid": sid, "token": token}
 
         try:
-            return (
-                yield self.http_client.post_json_get_json(
-                    id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
-                    body,
-                )
+            return await self.http_client.post_json_get_json(
+                id_server + "/_matrix/identity/api/v1/validate/msisdn/submitToken",
+                body,
             )
         except TimeoutError:
             raise SynapseError(500, "Timed out contacting identity server")
@@ -565,8 +554,7 @@ class IdentityHandler(BaseHandler):
             logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
             raise SynapseError(400, "Error contacting the identity server")
 
-    @defer.inlineCallbacks
-    def lookup_3pid(self, id_server, medium, address, id_access_token=None):
+    async def lookup_3pid(self, id_server, medium, address, id_access_token=None):
         """Looks up a 3pid in the passed identity server.
 
         Args:
@@ -582,7 +570,7 @@ class IdentityHandler(BaseHandler):
         """
         if id_access_token is not None:
             try:
-                results = yield self._lookup_3pid_v2(
+                results = await self._lookup_3pid_v2(
                     id_server, id_access_token, medium, address
                 )
                 return results
@@ -601,10 +589,9 @@ class IdentityHandler(BaseHandler):
                     logger.warning("Error when looking up hashing details: %s", e)
                     return None
 
-        return (yield self._lookup_3pid_v1(id_server, medium, address))
+        return await self._lookup_3pid_v1(id_server, medium, address)
 
-    @defer.inlineCallbacks
-    def _lookup_3pid_v1(self, id_server, medium, address):
+    async def _lookup_3pid_v1(self, id_server, medium, address):
         """Looks up a 3pid in the passed identity server using v1 lookup.
 
         Args:
@@ -617,7 +604,7 @@ class IdentityHandler(BaseHandler):
             str: the matrix ID of the 3pid, or None if it is not recognized.
         """
         try:
-            data = yield self.blacklisting_http_client.get_json(
+            data = await self.blacklisting_http_client.get_json(
                 "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
                 {"medium": medium, "address": address},
             )
@@ -625,7 +612,7 @@ class IdentityHandler(BaseHandler):
             if "mxid" in data:
                 if "signatures" not in data:
                     raise AuthError(401, "No signatures on 3pid binding")
-                yield self._verify_any_signature(data, id_server)
+                await self._verify_any_signature(data, id_server)
                 return data["mxid"]
         except TimeoutError:
             raise SynapseError(500, "Timed out contacting identity server")
@@ -634,8 +621,7 @@ class IdentityHandler(BaseHandler):
 
         return None
 
-    @defer.inlineCallbacks
-    def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+    async def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
         """Looks up a 3pid in the passed identity server using v2 lookup.
 
         Args:
@@ -650,7 +636,7 @@ class IdentityHandler(BaseHandler):
         """
         # Check what hashing details are supported by this identity server
         try:
-            hash_details = yield self.blacklisting_http_client.get_json(
+            hash_details = await self.blacklisting_http_client.get_json(
                 "%s%s/_matrix/identity/v2/hash_details" % (id_server_scheme, id_server),
                 {"access_token": id_access_token},
             )
@@ -717,7 +703,7 @@ class IdentityHandler(BaseHandler):
         headers = {"Authorization": create_id_access_token_header(id_access_token)}
 
         try:
-            lookup_results = yield self.blacklisting_http_client.post_json_get_json(
+            lookup_results = await self.blacklisting_http_client.post_json_get_json(
                 "%s%s/_matrix/identity/v2/lookup" % (id_server_scheme, id_server),
                 {
                     "addresses": [lookup_value],
@@ -745,13 +731,12 @@ class IdentityHandler(BaseHandler):
         mxid = lookup_results["mappings"].get(lookup_value)
         return mxid
 
-    @defer.inlineCallbacks
-    def _verify_any_signature(self, data, server_hostname):
+    async def _verify_any_signature(self, data, server_hostname):
         if server_hostname not in data["signatures"]:
             raise AuthError(401, "No signature from server %s" % (server_hostname,))
         for key_name, signature in data["signatures"][server_hostname].items():
             try:
-                key_data = yield self.blacklisting_http_client.get_json(
+                key_data = await self.blacklisting_http_client.get_json(
                     "%s%s/_matrix/identity/api/v1/pubkey/%s"
                     % (id_server_scheme, server_hostname, key_name)
                 )
@@ -770,8 +755,7 @@ class IdentityHandler(BaseHandler):
             )
             return
 
-    @defer.inlineCallbacks
-    def ask_id_server_for_third_party_invite(
+    async def ask_id_server_for_third_party_invite(
         self,
         requester,
         id_server,
@@ -844,7 +828,7 @@ class IdentityHandler(BaseHandler):
             # Attempt a v2 lookup
             url = base_url + "/v2/store-invite"
             try:
-                data = yield self.blacklisting_http_client.post_json_get_json(
+                data = await self.blacklisting_http_client.post_json_get_json(
                     url,
                     invite_config,
                     {"Authorization": create_id_access_token_header(id_access_token)},
@@ -864,7 +848,7 @@ class IdentityHandler(BaseHandler):
             url = base_url + "/api/v1/store-invite"
 
             try:
-                data = yield self.blacklisting_http_client.post_json_get_json(
+                data = await self.blacklisting_http_client.post_json_get_json(
                     url, invite_config
                 )
             except TimeoutError:
@@ -882,7 +866,7 @@ class IdentityHandler(BaseHandler):
                 # types. This is especially true with old instances of Sydent, see
                 # https://github.com/matrix-org/sydent/pull/170
                 try:
-                    data = yield self.blacklisting_http_client.post_urlencoded_get_json(
+                    data = await self.blacklisting_http_client.post_urlencoded_get_json(
                         url, invite_config
                     )
                 except HttpResponseException as e:
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index e75dabcd77..4cbc02b0d0 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -253,10 +253,21 @@ class RoomListHandler(BaseHandler):
         """
         result = {"room_id": room_id, "num_joined_members": num_joined_users}
 
+        if with_alias:
+            aliases = yield self.store.get_aliases_for_room(
+                room_id, on_invalidate=cache_context.invalidate
+            )
+            if aliases:
+                result["aliases"] = aliases
+
         current_state_ids = yield self.store.get_current_state_ids(
             room_id, on_invalidate=cache_context.invalidate
         )
 
+        if not current_state_ids:
+            # We're not in the room, so may as well bail out here.
+            return result
+
         event_map = yield self.store.get_events(
             [
                 event_id
@@ -289,14 +300,7 @@ class RoomListHandler(BaseHandler):
         create_event = current_state.get((EventTypes.Create, ""))
         result["m.federate"] = create_event.content.get("m.federate", True)
 
-        if with_alias:
-            aliases = yield self.store.get_aliases_for_room(
-                room_id, on_invalidate=cache_context.invalidate
-            )
-            if aliases:
-                result["aliases"] = aliases
-
-        name_event = yield current_state.get((EventTypes.Name, ""))
+        name_event = current_state.get((EventTypes.Name, ""))
         if name_event:
             name = name_event.content.get("name", None)
             if name:
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index e7015c704f..abecaa8313 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -23,11 +23,9 @@ from saml2.client import Saml2Client
 
 from synapse.api.errors import SynapseError
 from synapse.config import ConfigError
-from synapse.http.server import finish_request
 from synapse.http.servlet import parse_string
 from synapse.http.site import SynapseRequest
 from synapse.module_api import ModuleApi
-from synapse.module_api.errors import RedirectException
 from synapse.types import (
     UserID,
     map_username_to_mxid_localpart,
@@ -80,8 +78,6 @@ class SamlHandler:
         # a lock on the mappings
         self._mapping_lock = Linearizer(name="saml_mapping", clock=self._clock)
 
-        self._error_html_content = hs.config.saml2_error_html_content
-
     def handle_redirect_request(
         self, client_redirect_url: bytes, ui_auth_session_id: Optional[str] = None
     ) -> bytes:
@@ -129,26 +125,9 @@ class SamlHandler:
         # the dict.
         self.expire_sessions()
 
-        try:
-            user_id, current_session = await self._map_saml_response_to_user(
-                resp_bytes, relay_state
-            )
-        except RedirectException:
-            # Raise the exception as per the wishes of the SAML module response
-            raise
-        except Exception as e:
-            # If decoding the response or mapping it to a user failed, then log the
-            # error and tell the user that something went wrong.
-            logger.error(e)
-
-            request.setResponseCode(400)
-            request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-            request.setHeader(
-                b"Content-Length", b"%d" % (len(self._error_html_content),)
-            )
-            request.write(self._error_html_content.encode("utf8"))
-            finish_request(request)
-            return
+        user_id, current_session = await self._map_saml_response_to_user(
+            resp_bytes, relay_state
+        )
 
         # Complete the interactive auth session or the login.
         if current_session and current_session.ui_auth_session_id:
@@ -171,6 +150,11 @@ class SamlHandler:
 
         Returns:
              Tuple of the user ID and SAML session associated with this response.
+
+        Raises:
+            SynapseError if there was a problem with the response.
+            RedirectException: some mapping providers may raise this if they need
+                to redirect to an interstitial page.
         """
         try:
             saml2_auth = self._saml_client.parse_authn_request_response(
@@ -179,11 +163,9 @@ class SamlHandler:
                 outstanding=self._outstanding_requests_dict,
             )
         except Exception as e:
-            logger.warning("Exception parsing SAML2 response: %s", e)
             raise SynapseError(400, "Unable to parse SAML2 response: %s" % (e,))
 
         if saml2_auth.not_signed:
-            logger.warning("SAML2 response was not signed")
             raise SynapseError(400, "SAML2 response was not signed")
 
         logger.debug("SAML2 response: %s", saml2_auth.origxml)
@@ -264,13 +246,13 @@ class SamlHandler:
 
                 localpart = attribute_dict.get("mxid_localpart")
                 if not localpart:
-                    logger.error(
-                        "SAML mapping provider plugin did not return a "
-                        "mxid_localpart object"
+                    raise Exception(
+                        "Error parsing SAML2 response: SAML mapping provider plugin "
+                        "did not return a mxid_localpart value"
                     )
-                    raise SynapseError(500, "Error parsing SAML2 response")
 
                 displayname = attribute_dict.get("displayname")
+                emails = attribute_dict.get("emails", [])
 
                 # Check if this mxid already exists
                 if not await self._datastore.get_users_by_id_case_insensitive(
@@ -288,7 +270,9 @@ class SamlHandler:
             logger.info("Mapped SAML user to local part %s", localpart)
 
             registered_user_id = await self._registration_handler.register_user(
-                localpart=localpart, default_display_name=displayname
+                localpart=localpart,
+                default_display_name=displayname,
+                bind_emails=emails,
             )
 
             await self._datastore.record_user_external_id(
@@ -381,6 +365,7 @@ class DefaultSamlMappingProvider(object):
             dict: A dict containing new user attributes. Possible keys:
                 * mxid_localpart (str): Required. The localpart of the user's mxid
                 * displayname (str): The displayname of the user
+                * emails (list[str]): Any emails for the user
         """
         try:
             mxid_source = saml_response.ava[self._mxid_source_attribute][0]
@@ -403,9 +388,13 @@ class DefaultSamlMappingProvider(object):
         # If displayname is None, the mxid_localpart will be used instead
         displayname = saml_response.ava.get("displayName", [None])[0]
 
+        # Retrieve any emails present in the saml response
+        emails = saml_response.ava.get("email", [])
+
         return {
             "mxid_localpart": localpart,
             "displayname": displayname,
+            "emails": emails,
         }
 
     @staticmethod
@@ -444,4 +433,4 @@ class DefaultSamlMappingProvider(object):
                 second set consists of those attributes which can be used if
                 available, but are not necessary
         """
-        return {"uid", config.mxid_source_attribute}, {"displayName"}
+        return {"uid", config.mxid_source_attribute}, {"displayName", "email"}
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 8363d887a9..8b24a73319 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -138,8 +138,7 @@ class _BaseThreepidAuthChecker:
         self.hs = hs
         self.store = hs.get_datastore()
 
-    @defer.inlineCallbacks
-    def _check_threepid(self, medium, authdict):
+    async def _check_threepid(self, medium, authdict):
         if "threepid_creds" not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -155,18 +154,18 @@ class _BaseThreepidAuthChecker:
                 raise SynapseError(
                     400, "Phone number verification is not enabled on this homeserver"
                 )
-            threepid = yield identity_handler.threepid_from_creds(
+            threepid = await identity_handler.threepid_from_creds(
                 self.hs.config.account_threepid_delegate_msisdn, threepid_creds
             )
         elif medium == "email":
             if self.hs.config.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
                 assert self.hs.config.account_threepid_delegate_email
-                threepid = yield identity_handler.threepid_from_creds(
+                threepid = await identity_handler.threepid_from_creds(
                     self.hs.config.account_threepid_delegate_email, threepid_creds
                 )
             elif self.hs.config.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
                 threepid = None
-                row = yield self.store.get_threepid_validation_session(
+                row = await self.store.get_threepid_validation_session(
                     medium,
                     threepid_creds["client_secret"],
                     sid=threepid_creds["sid"],
@@ -181,7 +180,7 @@ class _BaseThreepidAuthChecker:
                     }
 
                     # Valid threepid returned, delete from the db
-                    yield self.store.delete_threepid_session(threepid_creds["sid"])
+                    await self.store.delete_threepid_session(threepid_creds["sid"])
             else:
                 raise SynapseError(
                     400, "Email address verification is not enabled on this homeserver"
@@ -220,7 +219,7 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
         )
 
     def check_auth(self, authdict, clientip):
-        return self._check_threepid("email", authdict)
+        return defer.ensureDeferred(self._check_threepid("email", authdict))
 
 
 class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
@@ -234,7 +233,7 @@ class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
         return bool(self.hs.config.account_threepid_delegate_msisdn)
 
     def check_auth(self, authdict, clientip):
-        return self._check_threepid("msisdn", authdict)
+        return defer.ensureDeferred(self._check_threepid("msisdn", authdict))
 
 
 INTERACTIVE_AUTH_CHECKERS = [
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 9cc2e2e154..2487a72171 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -21,13 +21,15 @@ import logging
 import types
 import urllib
 from io import BytesIO
+from typing import Awaitable, Callable, TypeVar, Union
 
+import jinja2
 from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
 
 from twisted.internet import defer
 from twisted.python import failure
 from twisted.web import resource
-from twisted.web.server import NOT_DONE_YET
+from twisted.web.server import NOT_DONE_YET, Request
 from twisted.web.static import NoRangeStaticProducer
 from twisted.web.util import redirectTo
 
@@ -40,6 +42,7 @@ from synapse.api.errors import (
     SynapseError,
     UnrecognizedRequestError,
 )
+from synapse.http.site import SynapseRequest
 from synapse.logging.context import preserve_fn
 from synapse.logging.opentracing import trace_servlet
 from synapse.util.caches import intern_dict
@@ -130,7 +133,12 @@ def wrap_json_request_handler(h):
     return wrap_async_request_handler(wrapped_request_handler)
 
 
-def wrap_html_request_handler(h):
+TV = TypeVar("TV")
+
+
+def wrap_html_request_handler(
+    h: Callable[[TV, SynapseRequest], Awaitable]
+) -> Callable[[TV, SynapseRequest], Awaitable[None]]:
     """Wraps a request handler method with exception handling.
 
     Also does the wrapping with request.processing as per wrap_async_request_handler.
@@ -141,20 +149,26 @@ def wrap_html_request_handler(h):
 
     async def wrapped_request_handler(self, request):
         try:
-            return await h(self, request)
+            await h(self, request)
         except Exception:
             f = failure.Failure()
-            return _return_html_error(f, request)
+            return_html_error(f, request, HTML_ERROR_TEMPLATE)
 
     return wrap_async_request_handler(wrapped_request_handler)
 
 
-def _return_html_error(f, request):
-    """Sends an HTML error page corresponding to the given failure
+def return_html_error(
+    f: failure.Failure, request: Request, error_template: Union[str, jinja2.Template],
+) -> None:
+    """Sends an HTML error page corresponding to the given failure.
+
+    Handles RedirectException and other CodeMessageExceptions (such as SynapseError)
 
     Args:
-        f (twisted.python.failure.Failure):
-        request (twisted.web.server.Request):
+        f: the error to report
+        request: the failing request
+        error_template: the HTML template. Can be either a string (with `{code}`,
+            `{msg}` placeholders), or a jinja2 template
     """
     if f.check(CodeMessageException):
         cme = f.value
@@ -174,7 +188,7 @@ def _return_html_error(f, request):
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
-        code = http.client.INTERNAL_SERVER_ERROR
+        code = http.HTTPStatus.INTERNAL_SERVER_ERROR
         msg = "Internal server error"
 
         logger.error(
@@ -183,11 +197,16 @@ def _return_html_error(f, request):
             exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
-    body = HTML_ERROR_TEMPLATE.format(code=code, msg=html.escape(msg)).encode("utf-8")
+    if isinstance(error_template, str):
+        body = error_template.format(code=code, msg=html.escape(msg))
+    else:
+        body = error_template.render(code=code, msg=msg)
+
+    body_bytes = body.encode("utf-8")
     request.setResponseCode(code)
     request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-    request.setHeader(b"Content-Length", b"%i" % (len(body),))
-    request.write(body)
+    request.setHeader(b"Content-Length", b"%i" % (len(body_bytes),))
+    request.write(body_bytes)
     finish_request(request)
 
 
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 8449ef82f7..13785038ad 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -17,16 +17,18 @@ import logging
 import threading
 from asyncio import iscoroutine
 from functools import wraps
-from typing import Dict, Set
+from typing import TYPE_CHECKING, Dict, Optional, Set
 
-import six
-
-from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, Counter, Gauge
 
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
 
+if TYPE_CHECKING:
+    import resource
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -36,6 +38,12 @@ _background_process_start_count = Counter(
     ["name"],
 )
 
+_background_process_in_flight_count = Gauge(
+    "synapse_background_process_in_flight_count",
+    "Number of background processes in flight",
+    labelnames=["name"],
+)
+
 # we set registry=None in all of these to stop them getting registered with
 # the default registry. Instead we collect them all via the CustomCollector,
 # which ensures that we can update them before they are collected.
@@ -83,13 +91,17 @@ _background_process_db_sched_duration = Counter(
 # it's much simpler to do so than to try to combine them.)
 _background_process_counts = {}  # type: Dict[str, int]
 
-# map from description to the currently running background processes.
+# Set of all running background processes that became active active since the
+# last time metrics were scraped (i.e. background processes that performed some
+# work since the last scrape.)
 #
-# it's kept as a dict of sets rather than a big set so that we can keep track
-# of process descriptions that no longer have any active processes.
-_background_processes = {}  # type: Dict[str, Set[_BackgroundProcess]]
+# We do it like this to handle the case where we have a large number of
+# background processes stacking up behind a lock or linearizer, where we then
+# only need to iterate over and update metrics for the process that have
+# actually been active and can ignore the idle ones.
+_background_processes_active_since_last_scrape = set()  # type: Set[_BackgroundProcess]
 
-# A lock that covers the above dicts
+# A lock that covers the above set and dict
 _bg_metrics_lock = threading.Lock()
 
 
@@ -101,25 +113,16 @@ class _Collector(object):
     """
 
     def collect(self):
-        background_process_in_flight_count = GaugeMetricFamily(
-            "synapse_background_process_in_flight_count",
-            "Number of background processes in flight",
-            labels=["name"],
-        )
+        global _background_processes_active_since_last_scrape
 
-        # We copy the dict so that it doesn't change from underneath us.
-        # We also copy the process lists as that can also change
+        # We swap out the _background_processes set with an empty one so that
+        # we can safely iterate over the set without holding the lock.
         with _bg_metrics_lock:
-            _background_processes_copy = {
-                k: list(v) for k, v in six.iteritems(_background_processes)
-            }
+            _background_processes_copy = _background_processes_active_since_last_scrape
+            _background_processes_active_since_last_scrape = set()
 
-        for desc, processes in six.iteritems(_background_processes_copy):
-            background_process_in_flight_count.add_metric((desc,), len(processes))
-            for process in processes:
-                process.update_metrics()
-
-        yield background_process_in_flight_count
+        for process in _background_processes_copy:
+            process.update_metrics()
 
         # now we need to run collect() over each of the static Counters, and
         # yield each metric they return.
@@ -191,13 +194,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
             _background_process_counts[desc] = count + 1
 
         _background_process_start_count.labels(desc).inc()
+        _background_process_in_flight_count.labels(desc).inc()
 
-        with LoggingContext(desc) as context:
+        with BackgroundProcessLoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
-            proc = _BackgroundProcess(desc, context)
-
-            with _bg_metrics_lock:
-                _background_processes.setdefault(desc, set()).add(proc)
 
             try:
                 result = func(*args, **kwargs)
@@ -214,10 +214,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
             except Exception:
                 logger.exception("Background process '%s' threw an exception", desc)
             finally:
-                proc.update_metrics()
-
-                with _bg_metrics_lock:
-                    _background_processes[desc].remove(proc)
+                _background_process_in_flight_count.labels(desc).dec()
 
     with PreserveLoggingContext():
         return run()
@@ -238,3 +235,42 @@ def wrap_as_background_process(desc):
         return wrap_as_background_process_inner_2
 
     return wrap_as_background_process_inner
+
+
+class BackgroundProcessLoggingContext(LoggingContext):
+    """A logging context that tracks in flight metrics for background
+    processes.
+    """
+
+    __slots__ = ["_proc"]
+
+    def __init__(self, name: str):
+        super().__init__(name)
+
+        self._proc = _BackgroundProcess(name, self)
+
+    def start(self, rusage: "Optional[resource._RUsage]"):
+        """Log context has started running (again).
+        """
+
+        super().start(rusage)
+
+        # We've become active again so we make sure we're in the list of active
+        # procs. (Note that "start" here means we've become active, as opposed
+        # to starting for the first time.)
+        with _bg_metrics_lock:
+            _background_processes_active_since_last_scrape.add(self._proc)
+
+    def __exit__(self, type, value, traceback) -> None:
+        """Log context has finished.
+        """
+
+        super().__exit__(type, value, traceback)
+
+        # The background process has finished. We explictly remove and manually
+        # update the metrics here so that if nothing is scraping metrics the set
+        # doesn't infinitely grow.
+        with _bg_metrics_lock:
+            _background_processes_active_since_last_scrape.discard(self._proc)
+
+        self._proc.update_metrics()
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index e7f6928c85..82251dbe5f 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -142,6 +142,7 @@ class UserRestServletV2(RestServlet):
         self.set_password_handler = hs.get_set_password_handler()
         self.deactivate_account_handler = hs.get_deactivate_account_handler()
         self.registration_handler = hs.get_registration_handler()
+        self.pusher_pool = hs.get_pusherpool()
 
     async def on_GET(self, request, user_id):
         await assert_requester_is_admin(self.auth, request)
@@ -281,6 +282,21 @@ class UserRestServletV2(RestServlet):
                     await self.auth_handler.add_threepid(
                         user_id, threepid["medium"], threepid["address"], current_time
                     )
+                    if (
+                        self.hs.config.email_enable_notifs
+                        and self.hs.config.email_notif_for_new_users
+                    ):
+                        await self.pusher_pool.add_pusher(
+                            user_id=user_id,
+                            access_token=None,
+                            kind="email",
+                            app_id="m.email",
+                            app_display_name="Email Notifications",
+                            device_display_name=threepid["address"],
+                            pushkey=threepid["address"],
+                            lang=None,  # We don't know a user's language here
+                            data={},
+                        )
 
             if "avatar_url" in body and type(body["avatar_url"]) == str:
                 await self.profile_handler.set_avatar_url(
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index d89b2e5532..36aca82346 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -299,7 +299,7 @@ class LoginRestServlet(RestServlet):
         return result
 
     async def _complete_login(
-        self, user_id, login_submission, callback=None, create_non_existant_users=False
+        self, user_id, login_submission, callback=None, create_non_existent_users=False
     ):
         """Called when we've successfully authed the user and now need to
         actually login them in (e.g. create devices). This gets called on
@@ -312,7 +312,7 @@ class LoginRestServlet(RestServlet):
             user_id (str): ID of the user to register.
             login_submission (dict): Dictionary of login information.
             callback (func|None): Callback function to run after registration.
-            create_non_existant_users (bool): Whether to create the user if
+            create_non_existent_users (bool): Whether to create the user if
                 they don't exist. Defaults to False.
 
         Returns:
@@ -331,12 +331,13 @@ class LoginRestServlet(RestServlet):
             update=True,
         )
 
-        if create_non_existant_users:
-            user_id = await self.auth_handler.check_user_exists(user_id)
-            if not user_id:
-                user_id = await self.registration_handler.register_user(
+        if create_non_existent_users:
+            canonical_uid = await self.auth_handler.check_user_exists(user_id)
+            if not canonical_uid:
+                canonical_uid = await self.registration_handler.register_user(
                     localpart=UserID.from_string(user_id).localpart
                 )
+            user_id = canonical_uid
 
         device_id = login_submission.get("device_id")
         initial_display_name = login_submission.get("initial_device_display_name")
@@ -391,7 +392,7 @@ class LoginRestServlet(RestServlet):
 
         user_id = UserID(user, self.hs.hostname).to_string()
         result = await self._complete_login(
-            user_id, login_submission, create_non_existant_users=True
+            user_id, login_submission, create_non_existent_users=True
         )
         return result
 
diff --git a/synapse/rest/saml2/response_resource.py b/synapse/rest/saml2/response_resource.py
index a545c13db7..75e58043b4 100644
--- a/synapse/rest/saml2/response_resource.py
+++ b/synapse/rest/saml2/response_resource.py
@@ -13,12 +13,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.python import failure
 
-from synapse.http.server import (
-    DirectServeResource,
-    finish_request,
-    wrap_html_request_handler,
-)
+from synapse.api.errors import SynapseError
+from synapse.http.server import DirectServeResource, return_html_error
 
 
 class SAML2ResponseResource(DirectServeResource):
@@ -28,20 +26,22 @@ class SAML2ResponseResource(DirectServeResource):
 
     def __init__(self, hs):
         super().__init__()
-        self._error_html_content = hs.config.saml2_error_html_content
         self._saml_handler = hs.get_saml_handler()
+        self._error_html_template = hs.config.saml2.saml2_error_html_template
 
     async def _async_render_GET(self, request):
         # We're not expecting any GET request on that resource if everything goes right,
         # but some IdPs sometimes end up responding with a 302 redirect on this endpoint.
         # In this case, just tell the user that something went wrong and they should
         # try to authenticate again.
-        request.setResponseCode(400)
-        request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-        request.setHeader(b"Content-Length", b"%d" % (len(self._error_html_content),))
-        request.write(self._error_html_content.encode("utf8"))
-        finish_request(request)
+        f = failure.Failure(
+            SynapseError(400, "Unexpected GET request on /saml2/authn_response")
+        )
+        return_html_error(f, request, self._error_html_template)
 
-    @wrap_html_request_handler
     async def _async_render_POST(self, request):
-        return await self._saml_handler.handle_saml_response(request)
+        try:
+            await self._saml_handler.handle_saml_response(request)
+        except Exception:
+            f = failure.Failure()
+            return_html_error(f, request, self._error_html_template)
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index 0d932a0672..cebdcd409f 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -391,7 +391,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             (user_id, room_id, receipt_type),
         )
 
-        self.db.simple_delete_txn(
+        self.db.simple_upsert_txn(
             txn,
             table="receipts_linearized",
             keyvalues={
@@ -399,19 +399,14 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-        )
-
-        self.db.simple_insert_txn(
-            txn,
-            table="receipts_linearized",
             values={
                 "stream_id": stream_id,
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
                 "event_id": event_id,
                 "data": json.dumps(data),
             },
+            # receipts_linearized has a unique constraint on
+            # (user_id, room_id, receipt_type), so no need to lock
+            lock=False,
         )
 
         if receipt_type == "m.read" and stream_ordering is not None:
diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index e8edaf9f7b..ff000bc9ec 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -109,20 +109,20 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore):
                     SELECT prev_state_group FROM state_group_edges e, state s
                     WHERE s.state_group = e.state_group
                 )
-                SELECT DISTINCT type, state_key, last_value(event_id) OVER (
-                    PARTITION BY type, state_key ORDER BY state_group ASC
-                    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
-                ) AS event_id FROM state_groups_state
+                SELECT DISTINCT ON (type, state_key)
+                    type, state_key, event_id
+                FROM state_groups_state
                 WHERE state_group IN (
                     SELECT state_group FROM state
-                )
+                ) %s
+                ORDER BY type, state_key, state_group DESC
             """
 
             for group in groups:
                 args = [group]
                 args.extend(where_args)
 
-                txn.execute(sql + where_clause, args)
+                txn.execute(sql % (where_clause,), args)
                 for row in txn:
                     typ, state_key, event_id = row
                     key = (typ, state_key)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9afc145340..b95434f031 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -366,9 +366,8 @@ def _upgrade_existing_database(
         if duplicates:
             # We don't support using the same file name in the same delta version.
             raise PrepareDatabaseException(
-                "Found multiple delta files with the same name in v%d: %s",
-                v,
-                duplicates,
+                "Found multiple delta files with the same name in v%d: %s"
+                % (v, duplicates,)
             )
 
         # We sort to ensure that we apply the delta files in a consistent
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 581dffd8a0..f7af2bca7f 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -225,6 +225,18 @@ class Linearizer(object):
             {}
         )  # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]]
 
+    def is_queued(self, key) -> bool:
+        """Checks whether there is a process queued up waiting
+        """
+        entry = self.key_to_defer.get(key)
+        if not entry:
+            # No entry so nothing is waiting.
+            return False
+
+        # There are waiting deferreds only in the OrderedDict of deferreds is
+        # non-empty.
+        return bool(entry[1])
+
     def queue(self, key):
         # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly.
         # (https://twistedmatrix.com/trac/ticket/4632 meant that cancellations were not
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 6c88ab06e2..e29cc24a8a 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -516,6 +516,81 @@ class UserRestTestCase(unittest.HomeserverTestCase):
         self.assertEqual(False, channel.json_body["is_guest"])
         self.assertEqual(False, channel.json_body["deactivated"])
 
+    def test_create_user_email_notif_for_new_users(self):
+        """
+        Check that a new regular user is created successfully and
+        got an email pusher.
+        """
+        self.hs.config.registration_shared_secret = None
+        self.hs.config.email_enable_notifs = True
+        self.hs.config.email_notif_for_new_users = True
+        url = "/_synapse/admin/v2/users/@bob:test"
+
+        # Create user
+        body = json.dumps(
+            {
+                "password": "abc123",
+                "threepids": [{"medium": "email", "address": "bob@bob.bob"}],
+            }
+        )
+
+        request, channel = self.make_request(
+            "PUT",
+            url,
+            access_token=self.admin_user_tok,
+            content=body.encode(encoding="utf_8"),
+        )
+        self.render(request)
+
+        self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual("@bob:test", channel.json_body["name"])
+        self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
+        self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
+
+        pushers = self.get_success(
+            self.store.get_pushers_by({"user_name": "@bob:test"})
+        )
+        pushers = list(pushers)
+        self.assertEqual(len(pushers), 1)
+        self.assertEqual("@bob:test", pushers[0]["user_name"])
+
+    def test_create_user_email_no_notif_for_new_users(self):
+        """
+        Check that a new regular user is created successfully and
+        got not an email pusher.
+        """
+        self.hs.config.registration_shared_secret = None
+        self.hs.config.email_enable_notifs = False
+        self.hs.config.email_notif_for_new_users = False
+        url = "/_synapse/admin/v2/users/@bob:test"
+
+        # Create user
+        body = json.dumps(
+            {
+                "password": "abc123",
+                "threepids": [{"medium": "email", "address": "bob@bob.bob"}],
+            }
+        )
+
+        request, channel = self.make_request(
+            "PUT",
+            url,
+            access_token=self.admin_user_tok,
+            content=body.encode(encoding="utf_8"),
+        )
+        self.render(request)
+
+        self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual("@bob:test", channel.json_body["name"])
+        self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
+        self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"])
+
+        pushers = self.get_success(
+            self.store.get_pushers_by({"user_name": "@bob:test"})
+        )
+        pushers = list(pushers)
+        self.assertEqual(len(pushers), 0)
+
     def test_set_password(self):
         """
         Test setting a new password for another user.
diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py
index eb8f6264fd..0f0f7ca72d 100644
--- a/tests/rest/client/v1/test_login.py
+++ b/tests/rest/client/v1/test_login.py
@@ -1,8 +1,11 @@
 import json
+import time
 import urllib.parse
 
 from mock import Mock
 
+import jwt
+
 import synapse.rest.admin
 from synapse.rest.client.v1 import login, logout
 from synapse.rest.client.v2_alpha import devices
@@ -473,3 +476,153 @@ class CASTestCase(unittest.HomeserverTestCase):
         # Because the user is deactivated they are served an error template.
         self.assertEqual(channel.code, 403)
         self.assertIn(b"SSO account deactivated", channel.result["body"])
+
+
+class JWTTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets_for_client_rest_resource,
+        login.register_servlets,
+    ]
+
+    jwt_secret = "secret"
+
+    def make_homeserver(self, reactor, clock):
+        self.hs = self.setup_test_homeserver()
+        self.hs.config.jwt_enabled = True
+        self.hs.config.jwt_secret = self.jwt_secret
+        self.hs.config.jwt_algorithm = "HS256"
+        return self.hs
+
+    def jwt_encode(self, token, secret=jwt_secret):
+        return jwt.encode(token, secret, "HS256").decode("ascii")
+
+    def jwt_login(self, *args):
+        params = json.dumps({"type": "m.login.jwt", "token": self.jwt_encode(*args)})
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
+        self.render(request)
+        return channel
+
+    def test_login_jwt_valid_registered(self):
+        self.register_user("kermit", "monkey")
+        channel = self.jwt_login({"sub": "kermit"})
+        self.assertEqual(channel.result["code"], b"200", channel.result)
+        self.assertEqual(channel.json_body["user_id"], "@kermit:test")
+
+    def test_login_jwt_valid_unregistered(self):
+        channel = self.jwt_login({"sub": "frog"})
+        self.assertEqual(channel.result["code"], b"200", channel.result)
+        self.assertEqual(channel.json_body["user_id"], "@frog:test")
+
+    def test_login_jwt_invalid_signature(self):
+        channel = self.jwt_login({"sub": "frog"}, "notsecret")
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "Invalid JWT")
+
+    def test_login_jwt_expired(self):
+        channel = self.jwt_login({"sub": "frog", "exp": 864000})
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "JWT expired")
+
+    def test_login_jwt_not_before(self):
+        now = int(time.time())
+        channel = self.jwt_login({"sub": "frog", "nbf": now + 3600})
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "Invalid JWT")
+
+    def test_login_no_sub(self):
+        channel = self.jwt_login({"username": "root"})
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "Invalid JWT")
+
+    def test_login_no_token(self):
+        params = json.dumps({"type": "m.login.jwt"})
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
+        self.render(request)
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "Token field for JWT is missing")
+
+
+# The JWTPubKeyTestCase is a complement to JWTTestCase where we instead use
+# RSS256, with a public key configured in synapse as "jwt_secret", and tokens
+# signed by the private key.
+class JWTPubKeyTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        login.register_servlets,
+    ]
+
+    # This key's pubkey is used as the jwt_secret setting of synapse. Valid
+    # tokens are signed by this and validated using the pubkey. It is generated
+    # with `openssl genrsa 512` (not a secure way to generate real keys, but
+    # good enough for tests!)
+    jwt_privatekey = "\n".join(
+        [
+            "-----BEGIN RSA PRIVATE KEY-----",
+            "MIIBPAIBAAJBAM50f1Q5gsdmzifLstzLHb5NhfajiOt7TKO1vSEWdq7u9x8SMFiB",
+            "492RM9W/XFoh8WUfL9uL6Now6tPRDsWv3xsCAwEAAQJAUv7OOSOtiU+wzJq82rnk",
+            "yR4NHqt7XX8BvkZPM7/+EjBRanmZNSp5kYZzKVaZ/gTOM9+9MwlmhidrUOweKfB/",
+            "kQIhAPZwHazbjo7dYlJs7wPQz1vd+aHSEH+3uQKIysebkmm3AiEA1nc6mDdmgiUq",
+            "TpIN8A4MBKmfZMWTLq6z05y/qjKyxb0CIQDYJxCwTEenIaEa4PdoJl+qmXFasVDN",
+            "ZU0+XtNV7yul0wIhAMI9IhiStIjS2EppBa6RSlk+t1oxh2gUWlIh+YVQfZGRAiEA",
+            "tqBR7qLZGJ5CVKxWmNhJZGt1QHoUtOch8t9C4IdOZ2g=",
+            "-----END RSA PRIVATE KEY-----",
+        ]
+    )
+
+    # Generated with `openssl rsa -in foo.key -pubout`, with the the above
+    # private key placed in foo.key (jwt_privatekey).
+    jwt_pubkey = "\n".join(
+        [
+            "-----BEGIN PUBLIC KEY-----",
+            "MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBAM50f1Q5gsdmzifLstzLHb5NhfajiOt7",
+            "TKO1vSEWdq7u9x8SMFiB492RM9W/XFoh8WUfL9uL6Now6tPRDsWv3xsCAwEAAQ==",
+            "-----END PUBLIC KEY-----",
+        ]
+    )
+
+    # This key is used to sign tokens that shouldn't be accepted by synapse.
+    # Generated just like jwt_privatekey.
+    bad_privatekey = "\n".join(
+        [
+            "-----BEGIN RSA PRIVATE KEY-----",
+            "MIIBOgIBAAJBAL//SQrKpKbjCCnv/FlasJCv+t3k/MPsZfniJe4DVFhsktF2lwQv",
+            "gLjmQD3jBUTz+/FndLSBvr3F4OHtGL9O/osCAwEAAQJAJqH0jZJW7Smzo9ShP02L",
+            "R6HRZcLExZuUrWI+5ZSP7TaZ1uwJzGFspDrunqaVoPobndw/8VsP8HFyKtceC7vY",
+            "uQIhAPdYInDDSJ8rFKGiy3Ajv5KWISBicjevWHF9dbotmNO9AiEAxrdRJVU+EI9I",
+            "eB4qRZpY6n4pnwyP0p8f/A3NBaQPG+cCIFlj08aW/PbxNdqYoBdeBA0xDrXKfmbb",
+            "iwYxBkwL0JCtAiBYmsi94sJn09u2Y4zpuCbJeDPKzWkbuwQh+W1fhIWQJQIhAKR0",
+            "KydN6cRLvphNQ9c/vBTdlzWxzcSxREpguC7F1J1m",
+            "-----END RSA PRIVATE KEY-----",
+        ]
+    )
+
+    def make_homeserver(self, reactor, clock):
+        self.hs = self.setup_test_homeserver()
+        self.hs.config.jwt_enabled = True
+        self.hs.config.jwt_secret = self.jwt_pubkey
+        self.hs.config.jwt_algorithm = "RS256"
+        return self.hs
+
+    def jwt_encode(self, token, secret=jwt_privatekey):
+        return jwt.encode(token, secret, "RS256").decode("ascii")
+
+    def jwt_login(self, *args):
+        params = json.dumps({"type": "m.login.jwt", "token": self.jwt_encode(*args)})
+        request, channel = self.make_request(b"POST", LOGIN_URL, params)
+        self.render(request)
+        return channel
+
+    def test_login_jwt_valid(self):
+        channel = self.jwt_login({"sub": "kermit"})
+        self.assertEqual(channel.result["code"], b"200", channel.result)
+        self.assertEqual(channel.json_body["user_id"], "@kermit:test")
+
+    def test_login_jwt_invalid_signature(self):
+        channel = self.jwt_login({"sub": "frog"}, self.bad_privatekey)
+        self.assertEqual(channel.result["code"], b"401", channel.result)
+        self.assertEqual(channel.json_body["errcode"], "M_UNAUTHORIZED")
+        self.assertEqual(channel.json_body["error"], "Invalid JWT")
diff --git a/tests/test_federation.py b/tests/test_federation.py
index c5099dd039..c662195eec 100644
--- a/tests/test_federation.py
+++ b/tests/test_federation.py
@@ -206,3 +206,59 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
         # list.
         self.reactor.advance(30)
         self.assertEqual(self.resync_attempts, 2)
+
+    def test_cross_signing_keys_retry(self):
+        """Tests that resyncing a device list correctly processes cross-signing keys from
+        the remote server.
+        """
+        remote_user_id = "@john:test_remote"
+        remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+        remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ"
+
+        # Register mock device list retrieval on the federation client.
+        federation_client = self.homeserver.get_federation_client()
+        federation_client.query_user_devices = Mock(
+            return_value={
+                "user_id": remote_user_id,
+                "stream_id": 1,
+                "devices": [],
+                "master_key": {
+                    "user_id": remote_user_id,
+                    "usage": ["master"],
+                    "keys": {"ed25519:" + remote_master_key: remote_master_key},
+                },
+                "self_signing_key": {
+                    "user_id": remote_user_id,
+                    "usage": ["self_signing"],
+                    "keys": {
+                        "ed25519:" + remote_self_signing_key: remote_self_signing_key
+                    },
+                },
+            }
+        )
+
+        # Resync the device list.
+        device_handler = self.homeserver.get_device_handler()
+        self.get_success(
+            device_handler.device_list_updater.user_device_resync(remote_user_id),
+        )
+
+        # Retrieve the cross-signing keys for this user.
+        keys = self.get_success(
+            self.store.get_e2e_cross_signing_keys_bulk(user_ids=[remote_user_id]),
+        )
+        self.assertTrue(remote_user_id in keys)
+
+        # Check that the master key is the one returned by the mock.
+        master_key = keys[remote_user_id]["master"]
+        self.assertEqual(len(master_key["keys"]), 1)
+        self.assertTrue("ed25519:" + remote_master_key in master_key["keys"].keys())
+        self.assertTrue(remote_master_key in master_key["keys"].values())
+
+        # Check that the self-signing key is the one returned by the mock.
+        self_signing_key = keys[remote_user_id]["self_signing"]
+        self.assertEqual(len(self_signing_key["keys"]), 1)
+        self.assertTrue(
+            "ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(),
+        )
+        self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values())
diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py
index 852ef23185..ca3858b184 100644
--- a/tests/util/test_linearizer.py
+++ b/tests/util/test_linearizer.py
@@ -45,6 +45,38 @@ class LinearizerTestCase(unittest.TestCase):
         with (yield d2):
             pass
 
+    @defer.inlineCallbacks
+    def test_linearizer_is_queued(self):
+        linearizer = Linearizer()
+
+        key = object()
+
+        d1 = linearizer.queue(key)
+        cm1 = yield d1
+
+        # Since d1 gets called immediately, "is_queued" should return false.
+        self.assertFalse(linearizer.is_queued(key))
+
+        d2 = linearizer.queue(key)
+        self.assertFalse(d2.called)
+
+        # Now d2 is queued up behind successful completion of cm1
+        self.assertTrue(linearizer.is_queued(key))
+
+        with cm1:
+            self.assertFalse(d2.called)
+
+            # cm1 still not done, so d2 still queued.
+            self.assertTrue(linearizer.is_queued(key))
+
+        # And now d2 is called and nothing is in the queue again
+        self.assertFalse(linearizer.is_queued(key))
+
+        with (yield d2):
+            self.assertFalse(linearizer.is_queued(key))
+
+        self.assertFalse(linearizer.is_queued(key))
+
     def test_lots_of_queued_things(self):
         # we have one slow thing, and lots of fast things queued up behind it.
         # it should *not* explode the stack.
diff --git a/tox.ini b/tox.ini
index 9fefcb72b5..463a34d137 100644
--- a/tox.ini
+++ b/tox.ini
@@ -193,6 +193,7 @@ commands = mypy \
             synapse/handlers/saml_handler.py \
             synapse/handlers/sync.py \
             synapse/handlers/ui_auth \
+            synapse/http/server.py \
             synapse/http/site.py \
             synapse/logging/ \
             synapse/metrics \