summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.circleci/config.yml2
-rw-r--r--.github/workflows/tests.yml6
-rw-r--r--INSTALL.md12
-rw-r--r--README.rst46
-rw-r--r--changelog.d/10040.feature1
-rw-r--r--changelog.d/10046.doc1
-rw-r--r--changelog.d/10048.misc1
-rw-r--r--changelog.d/10054.misc1
-rw-r--r--changelog.d/10055.misc1
-rw-r--r--changelog.d/10057.doc1
-rw-r--r--changelog.d/10059.misc1
-rw-r--r--changelog.d/10063.removal1
-rw-r--r--changelog.d/10069.misc1
-rw-r--r--changelog.d/10074.misc1
-rw-r--r--changelog.d/10077.feature1
-rw-r--r--changelog.d/10078.misc1
-rw-r--r--changelog.d/10082.bugfix1
-rw-r--r--changelog.d/10084.feature1
-rw-r--r--changelog.d/10091.misc1
-rw-r--r--changelog.d/10092.bugfix1
-rw-r--r--changelog.d/9221.doc1
-rw-r--r--changelog.d/9906.misc1
-rw-r--r--changelog.d/9953.feature1
-rw-r--r--changelog.d/9973.feature1
-rw-r--r--docs/CAPTCHA_SETUP.md50
-rw-r--r--docs/admin_api/event_reports.md4
-rw-r--r--docs/admin_api/media_admin_api.md21
-rw-r--r--docs/sample_config.yaml15
-rw-r--r--docs/workers.md3
-rw-r--r--scripts-dev/convert_server_keys.py108
-rw-r--r--synapse/api/auth.py8
-rw-r--r--synapse/api/room_versions.py2
-rw-r--r--synapse/app/_base.py5
-rw-r--r--synapse/app/generic_worker.py4
-rw-r--r--synapse/config/experimental.py23
-rw-r--r--synapse/config/tls.py22
-rw-r--r--synapse/federation/transport/server.py16
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/http/servlet.py196
-rw-r--r--synapse/logging/opentracing.py31
-rw-r--r--synapse/metrics/background_process_metrics.py10
-rw-r--r--synapse/replication/slave/storage/devices.py2
-rw-r--r--synapse/rest/admin/media.py28
-rw-r--r--synapse/rest/client/v1/room.py12
-rw-r--r--synapse/rest/client/v2_alpha/report_event.py13
-rw-r--r--synapse/storage/databases/main/cache.py7
-rw-r--r--synapse/storage/databases/main/devices.py2
-rw-r--r--synapse/storage/databases/main/event_push_actions.py2
-rw-r--r--synapse/storage/databases/main/events.py8
-rw-r--r--synapse/storage/databases/main/events_worker.py61
-rw-r--r--synapse/storage/databases/main/media_repository.py7
-rw-r--r--synapse/storage/databases/main/purge_events.py26
-rw-r--r--synapse/storage/databases/main/receipts.py6
-rw-r--r--synapse/storage/databases/main/room.py2
-rw-r--r--synapse/util/batching_queue.py70
-rw-r--r--synapse/util/caches/deferred_cache.py42
-rw-r--r--synapse/util/caches/descriptors.py8
-rw-r--r--synapse/util/caches/lrucache.py18
-rw-r--r--synapse/util/caches/treecache.py3
-rwxr-xr-xsynctl12
-rw-r--r--tests/config/test_tls.py3
-rw-r--r--tests/rest/admin/test_event_reports.py15
-rw-r--r--tests/rest/admin/test_media.py99
-rw-r--r--tests/rest/client/v2_alpha/test_report_event.py83
-rw-r--r--tests/storage/databases/__init__.py13
-rw-r--r--tests/storage/databases/main/__init__.py13
-rw-r--r--tests/storage/databases/main/test_events_worker.py96
-rw-r--r--tests/util/caches/test_descriptors.py6
-rw-r--r--tests/util/test_batching_queue.py78
69 files changed, 948 insertions, 403 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 1ac48a71ba..cf1989eff9 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -41,7 +41,7 @@ workflows:
       - dockerhubuploadlatest:
           filters:
             branches:
-              only: master
+              only: [ master, main ]
 
 commands:
   docker_prepare:
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index e7f3be1b4e..2ae81b5fcf 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -34,7 +34,13 @@ jobs:
     if: ${{ github.base_ref == 'develop'  || contains(github.base_ref, 'release-') }}
     runs-on: ubuntu-latest
     steps:
+      # Note: This and the script can be simplified once we drop Buildkite. See:
+      #   https://github.com/actions/checkout/issues/266#issuecomment-638346893
+      #   https://github.com/actions/checkout/issues/416
       - uses: actions/checkout@v2
+        with:
+          ref: ${{ github.event.pull_request.head.sha }}
+          fetch-depth: 0
       - uses: actions/setup-python@v2
       - run: pip install tox
       - name: Patch Buildkite-specific test script
diff --git a/INSTALL.md b/INSTALL.md
index 7b40689234..3c498edd29 100644
--- a/INSTALL.md
+++ b/INSTALL.md
@@ -399,11 +399,9 @@ Once you have installed synapse as above, you will need to configure it.
 
 ### Using PostgreSQL
 
-By default Synapse uses [SQLite](https://sqlite.org/) and in doing so trades performance for convenience.
-SQLite is only recommended in Synapse for testing purposes or for servers with
-very light workloads.
-
-Almost all installations should opt to use [PostgreSQL](https://www.postgresql.org). Advantages include:
+By default Synapse uses an [SQLite](https://sqlite.org/) database and in doing so trades
+performance for convenience. Almost all installations should opt to use [PostgreSQL](https://www.postgresql.org)
+instead. Advantages include:
 
 - significant performance improvements due to the superior threading and
   caching model, smarter query optimiser
@@ -412,6 +410,10 @@ Almost all installations should opt to use [PostgreSQL](https://www.postgresql.o
 For information on how to install and use PostgreSQL in Synapse, please see
 [docs/postgres.md](docs/postgres.md)
 
+SQLite is only acceptable for testing purposes. SQLite should not be used in
+a production server. Synapse will perform poorly when using
+SQLite, especially when participating in large rooms.
+
 ### TLS certificates
 
 The default configuration exposes a single HTTP port on the local
diff --git a/README.rst b/README.rst
index 1a5503572e..a14a687fd1 100644
--- a/README.rst
+++ b/README.rst
@@ -149,21 +149,45 @@ For details on having Synapse manage your federation TLS certificates
 automatically, please see `<docs/ACME.md>`_.
 
 
-Security Note
+Security note
 =============
 
-Matrix serves raw user generated data in some APIs - specifically the `content
-repository endpoints <https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid>`_.
+Matrix serves raw, user-supplied data in some APIs -- specifically the `content
+repository endpoints`_.
 
-Whilst we have tried to mitigate against possible XSS attacks (e.g.
-https://github.com/matrix-org/synapse/pull/1021) we recommend running
-matrix homeservers on a dedicated domain name, to limit any malicious user generated
-content served to web browsers a matrix API from being able to attack webapps hosted
-on the same domain.  This is particularly true of sharing a matrix webclient and
-server on the same domain.
+.. _content repository endpoints: https://matrix.org/docs/spec/client_server/latest.html#get-matrix-media-r0-download-servername-mediaid
 
-See https://github.com/vector-im/riot-web/issues/1977 and
-https://developer.github.com/changes/2014-04-25-user-content-security for more details.
+Whilst we make a reasonable effort to mitigate against XSS attacks (for
+instance, by using `CSP`_), a Matrix homeserver should not be hosted on a
+domain hosting other web applications. This especially applies to sharing
+the domain with Matrix web clients and other sensitive applications like
+webmail. See
+https://developer.github.com/changes/2014-04-25-user-content-security for more
+information.
+
+.. _CSP: https://github.com/matrix-org/synapse/pull/1021
+
+Ideally, the homeserver should not simply be on a different subdomain, but on
+a completely different `registered domain`_ (also known as top-level site or
+eTLD+1). This is because `some attacks`_ are still possible as long as the two
+applications share the same registered domain.
+
+.. _registered domain: https://tools.ietf.org/html/draft-ietf-httpbis-rfc6265bis-03#section-2.3
+
+.. _some attacks: https://en.wikipedia.org/wiki/Session_fixation#Attacks_using_cross-subdomain_cookie
+
+To illustrate this with an example, if your Element Web or other sensitive web
+application is hosted on ``A.example1.com``, you should ideally host Synapse on
+``example2.com``. Some amount of protection is offered by hosting on
+``B.example1.com`` instead, so this is also acceptable in some scenarios.
+However, you should *not* host your Synapse on ``A.example1.com``.
+
+Note that all of the above refers exclusively to the domain used in Synapse's
+``public_baseurl`` setting. In particular, it has no bearing on the domain
+mentioned in MXIDs hosted on that server.
+
+Following this advice ensures that even if an XSS is found in Synapse, the
+impact to other applications will be minimal.
 
 
 Upgrading an existing Synapse
diff --git a/changelog.d/10040.feature b/changelog.d/10040.feature
new file mode 100644
index 0000000000..ec78a30f00
--- /dev/null
+++ b/changelog.d/10040.feature
@@ -0,0 +1 @@
+Add an admin API for unprotecting local media from quarantine. Contributed by @dklimpel.
diff --git a/changelog.d/10046.doc b/changelog.d/10046.doc
new file mode 100644
index 0000000000..995960163b
--- /dev/null
+++ b/changelog.d/10046.doc
@@ -0,0 +1 @@
+Update CAPTCHA documentation to mention turning off the verify origin feature. Contributed by @aaronraimist.
diff --git a/changelog.d/10048.misc b/changelog.d/10048.misc
new file mode 100644
index 0000000000..a901f8431e
--- /dev/null
+++ b/changelog.d/10048.misc
@@ -0,0 +1 @@
+Add `parse_strings_from_args` for parsing an array from query parameters.
diff --git a/changelog.d/10054.misc b/changelog.d/10054.misc
new file mode 100644
index 0000000000..cebe39ce54
--- /dev/null
+++ b/changelog.d/10054.misc
@@ -0,0 +1 @@
+Remove some dead code regarding TLS certificate handling.
diff --git a/changelog.d/10055.misc b/changelog.d/10055.misc
new file mode 100644
index 0000000000..da84a2dde8
--- /dev/null
+++ b/changelog.d/10055.misc
@@ -0,0 +1 @@
+Remove redundant, unmaintained `convert_server_keys` script.
diff --git a/changelog.d/10057.doc b/changelog.d/10057.doc
new file mode 100644
index 0000000000..35437cb017
--- /dev/null
+++ b/changelog.d/10057.doc
@@ -0,0 +1 @@
+Tweak wording of database recommendation in `INSTALL.md`. Contributed by @aaronraimist.
\ No newline at end of file
diff --git a/changelog.d/10059.misc b/changelog.d/10059.misc
new file mode 100644
index 0000000000..ca6e0e8a5a
--- /dev/null
+++ b/changelog.d/10059.misc
@@ -0,0 +1 @@
+Improve the error message printed by synctl when synapse fails to start.
diff --git a/changelog.d/10063.removal b/changelog.d/10063.removal
new file mode 100644
index 0000000000..0f8889b6b4
--- /dev/null
+++ b/changelog.d/10063.removal
@@ -0,0 +1 @@
+Remove the experimental `spaces_enabled` flag. The spaces features are always available now.
diff --git a/changelog.d/10069.misc b/changelog.d/10069.misc
new file mode 100644
index 0000000000..a8d2629e9b
--- /dev/null
+++ b/changelog.d/10069.misc
@@ -0,0 +1 @@
+Fix GitHub Actions lint for newsfragments.
diff --git a/changelog.d/10074.misc b/changelog.d/10074.misc
new file mode 100644
index 0000000000..8dbe2cd2bc
--- /dev/null
+++ b/changelog.d/10074.misc
@@ -0,0 +1 @@
+Update opentracing to inject the right context into the carrier.
diff --git a/changelog.d/10077.feature b/changelog.d/10077.feature
new file mode 100644
index 0000000000..808feb2215
--- /dev/null
+++ b/changelog.d/10077.feature
@@ -0,0 +1 @@
+Make reason and score parameters optional for reporting content. Implements [MSC2414](https://github.com/matrix-org/matrix-doc/pull/2414). Contributed by Callum Brown.
diff --git a/changelog.d/10078.misc b/changelog.d/10078.misc
new file mode 100644
index 0000000000..a4b089d0fd
--- /dev/null
+++ b/changelog.d/10078.misc
@@ -0,0 +1 @@
+Fix up `BatchingQueue` implementation.
diff --git a/changelog.d/10082.bugfix b/changelog.d/10082.bugfix
new file mode 100644
index 0000000000..b4f8bcc4fa
--- /dev/null
+++ b/changelog.d/10082.bugfix
@@ -0,0 +1 @@
+Fixed a bug causing replication requests to fail when receiving a lot of events via federation.
diff --git a/changelog.d/10084.feature b/changelog.d/10084.feature
new file mode 100644
index 0000000000..602cb6ff51
--- /dev/null
+++ b/changelog.d/10084.feature
@@ -0,0 +1 @@
+Add support for routing more requests to workers.
diff --git a/changelog.d/10091.misc b/changelog.d/10091.misc
new file mode 100644
index 0000000000..dbe310fd17
--- /dev/null
+++ b/changelog.d/10091.misc
@@ -0,0 +1 @@
+Log method and path when dropping request due to size limit.
diff --git a/changelog.d/10092.bugfix b/changelog.d/10092.bugfix
new file mode 100644
index 0000000000..09b2aba7ff
--- /dev/null
+++ b/changelog.d/10092.bugfix
@@ -0,0 +1 @@
+Fix a bug in the `force_tracing_for_users` option introduced in Synapse v1.35 which meant that the OpenTracing spans produced were missing most tags.
diff --git a/changelog.d/9221.doc b/changelog.d/9221.doc
new file mode 100644
index 0000000000..9b3476064b
--- /dev/null
+++ b/changelog.d/9221.doc
@@ -0,0 +1 @@
+Clarify security note regarding hosting Synapse on the same domain as other web applications.
diff --git a/changelog.d/9906.misc b/changelog.d/9906.misc
new file mode 100644
index 0000000000..667d51a4c0
--- /dev/null
+++ b/changelog.d/9906.misc
@@ -0,0 +1 @@
+Tell CircleCI to build Docker images from `main` branch.
diff --git a/changelog.d/9953.feature b/changelog.d/9953.feature
new file mode 100644
index 0000000000..6b3d1adc70
--- /dev/null
+++ b/changelog.d/9953.feature
@@ -0,0 +1 @@
+Improve performance of incoming federation transactions in large rooms.
diff --git a/changelog.d/9973.feature b/changelog.d/9973.feature
new file mode 100644
index 0000000000..6b3d1adc70
--- /dev/null
+++ b/changelog.d/9973.feature
@@ -0,0 +1 @@
+Improve performance of incoming federation transactions in large rooms.
diff --git a/docs/CAPTCHA_SETUP.md b/docs/CAPTCHA_SETUP.md
index 331e5d059a..fabdd7b726 100644
--- a/docs/CAPTCHA_SETUP.md
+++ b/docs/CAPTCHA_SETUP.md
@@ -1,31 +1,37 @@
 # Overview
-Captcha can be enabled for this home server. This file explains how to do that.
-The captcha mechanism used is Google's ReCaptcha. This requires API keys from Google.
-
-## Getting keys
-
-Requires a site/secret key pair from:
-
-<https://developers.google.com/recaptcha/>
-
-Must be a reCAPTCHA v2 key using the "I'm not a robot" Checkbox option
-
-## Setting ReCaptcha Keys
-
-The keys are a config option on the home server config. If they are not
-visible, you can generate them via `--generate-config`. Set the following value:
-
+A captcha can be enabled on your homeserver to help prevent bots from registering
+accounts. Synapse currently uses Google's reCAPTCHA service which requires API keys
+from Google.
+
+## Getting API keys
+
+1. Create a new site at <https://www.google.com/recaptcha/admin/create>
+1. Set the label to anything you want
+1. Set the type to reCAPTCHA v2 using the "I'm not a robot" Checkbox option.
+This is the only type of captcha that works with Synapse.
+1. Add the public hostname for your server, as set in `public_baseurl`
+in `homeserver.yaml`, to the list of authorized domains. If you have not set
+`public_baseurl`, use `server_name`.
+1. Agree to the terms of service and submit.
+1. Copy your site key and secret key and add them to your `homeserver.yaml`
+configuration file:
+    ```
     recaptcha_public_key: YOUR_SITE_KEY
     recaptcha_private_key: YOUR_SECRET_KEY
-
-In addition, you MUST enable captchas via:
-
+    ```
+1. Enable the CAPTCHA for new registrations
+    ```
     enable_registration_captcha: true
+    ```
+1. Go to the settings page for the CAPTCHA you just created
+1. Uncheck the "Verify the origin of reCAPTCHA solutions" checkbox so that the
+captcha can be displayed in any client. If you do not disable this option then you
+must specify the domains of every client that is allowed to display the CAPTCHA.
 
 ## Configuring IP used for auth
 
-The ReCaptcha API requires that the IP address of the user who solved the
-captcha is sent. If the client is connecting through a proxy or load balancer,
+The reCAPTCHA API requires that the IP address of the user who solved the
+CAPTCHA is sent. If the client is connecting through a proxy or load balancer,
 it may be required to use the `X-Forwarded-For` (XFF) header instead of the origin
 IP address. This can be configured using the `x_forwarded` directive in the
-listeners section of the homeserver.yaml configuration file.
+listeners section of the `homeserver.yaml` configuration file.
diff --git a/docs/admin_api/event_reports.md b/docs/admin_api/event_reports.md
index 0159098138..bfec06f755 100644
--- a/docs/admin_api/event_reports.md
+++ b/docs/admin_api/event_reports.md
@@ -75,9 +75,9 @@ The following fields are returned in the JSON response body:
 * `name`: string - The name of the room.
 * `event_id`: string - The ID of the reported event.
 * `user_id`: string - This is the user who reported the event and wrote the reason.
-* `reason`: string - Comment made by the `user_id` in this report. May be blank.
+* `reason`: string - Comment made by the `user_id` in this report. May be blank or `null`.
 * `score`: integer - Content is reported based upon a negative score, where -100 is
-  "most offensive" and 0 is "inoffensive".
+  "most offensive" and 0 is "inoffensive". May be `null`.
 * `sender`: string - This is the ID of the user who sent the original message/event that
   was reported.
 * `canonical_alias`: string - The canonical alias of the room. `null` if the room does not
diff --git a/docs/admin_api/media_admin_api.md b/docs/admin_api/media_admin_api.md
index 9dbec68c19..d1b7e390d5 100644
--- a/docs/admin_api/media_admin_api.md
+++ b/docs/admin_api/media_admin_api.md
@@ -7,6 +7,7 @@
   * [Quarantining media in a room](#quarantining-media-in-a-room)
   * [Quarantining all media of a user](#quarantining-all-media-of-a-user)
   * [Protecting media from being quarantined](#protecting-media-from-being-quarantined)
+  * [Unprotecting media from being quarantined](#unprotecting-media-from-being-quarantined)
 - [Delete local media](#delete-local-media)
   * [Delete a specific local media](#delete-a-specific-local-media)
   * [Delete local media by date or size](#delete-local-media-by-date-or-size)
@@ -159,6 +160,26 @@ Response:
 {}
 ```
 
+## Unprotecting media from being quarantined
+
+This API removes the quarantine protection from a piece of media.
+
+Request:
+
+```
+POST /_synapse/admin/v1/media/unprotect/<media_id>
+
+{}
+```
+
+Where `media_id` is in the form of `abcdefg12345...`.
+
+Response:
+
+```json
+{}
+```
+
 # Delete local media
 This API deletes the *local* media from the disk of your own server.
 This includes any local thumbnails and copies of media downloaded from
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 6576b153d0..7b97f73a29 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -2916,18 +2916,3 @@ redis:
   # Optional password if configured on the Redis instance
   #
   #password: <secret_password>
-
-
-# Enable experimental features in Synapse.
-#
-# Experimental features might break or be removed without a deprecation
-# period.
-#
-experimental_features:
-  # Support for Spaces (MSC1772), it enables the following:
-  #
-  # * The Spaces Summary API (MSC2946).
-  # * Restricting room membership based on space membership (MSC3083).
-  #
-  # Uncomment to disable support for Spaces.
-  #spaces_enabled: false
diff --git a/docs/workers.md b/docs/workers.md
index c6282165b0..46b5e4b737 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -228,6 +228,9 @@ expressions:
     ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
     ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$
     ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/event/
+    ^/_matrix/client/(api/v1|r0|unstable)/joined_rooms$
+    ^/_matrix/client/(api/v1|r0|unstable)/search$
 
     # Registration/login requests
     ^/_matrix/client/(api/v1|r0|unstable)/login$
diff --git a/scripts-dev/convert_server_keys.py b/scripts-dev/convert_server_keys.py
deleted file mode 100644
index d4314a054c..0000000000
--- a/scripts-dev/convert_server_keys.py
+++ /dev/null
@@ -1,108 +0,0 @@
-import json
-import sys
-import time
-
-import psycopg2
-import yaml
-from canonicaljson import encode_canonical_json
-from signedjson.key import read_signing_keys
-from signedjson.sign import sign_json
-from unpaddedbase64 import encode_base64
-
-db_binary_type = memoryview
-
-
-def select_v1_keys(connection):
-    cursor = connection.cursor()
-    cursor.execute("SELECT server_name, key_id, verify_key FROM server_signature_keys")
-    rows = cursor.fetchall()
-    cursor.close()
-    results = {}
-    for server_name, key_id, verify_key in rows:
-        results.setdefault(server_name, {})[key_id] = encode_base64(verify_key)
-    return results
-
-
-def select_v1_certs(connection):
-    cursor = connection.cursor()
-    cursor.execute("SELECT server_name, tls_certificate FROM server_tls_certificates")
-    rows = cursor.fetchall()
-    cursor.close()
-    results = {}
-    for server_name, tls_certificate in rows:
-        results[server_name] = tls_certificate
-    return results
-
-
-def select_v2_json(connection):
-    cursor = connection.cursor()
-    cursor.execute("SELECT server_name, key_id, key_json FROM server_keys_json")
-    rows = cursor.fetchall()
-    cursor.close()
-    results = {}
-    for server_name, key_id, key_json in rows:
-        results.setdefault(server_name, {})[key_id] = json.loads(
-            str(key_json).decode("utf-8")
-        )
-    return results
-
-
-def convert_v1_to_v2(server_name, valid_until, keys, certificate):
-    return {
-        "old_verify_keys": {},
-        "server_name": server_name,
-        "verify_keys": {key_id: {"key": key} for key_id, key in keys.items()},
-        "valid_until_ts": valid_until,
-    }
-
-
-def rows_v2(server, json):
-    valid_until = json["valid_until_ts"]
-    key_json = encode_canonical_json(json)
-    for key_id in json["verify_keys"]:
-        yield (server, key_id, "-", valid_until, valid_until, db_binary_type(key_json))
-
-
-def main():
-    config = yaml.safe_load(open(sys.argv[1]))
-    valid_until = int(time.time() / (3600 * 24)) * 1000 * 3600 * 24
-
-    server_name = config["server_name"]
-    signing_key = read_signing_keys(open(config["signing_key_path"]))[0]
-
-    database = config["database"]
-    assert database["name"] == "psycopg2", "Can only convert for postgresql"
-    args = database["args"]
-    args.pop("cp_max")
-    args.pop("cp_min")
-    connection = psycopg2.connect(**args)
-    keys = select_v1_keys(connection)
-    certificates = select_v1_certs(connection)
-    json = select_v2_json(connection)
-
-    result = {}
-    for server in keys:
-        if server not in json:
-            v2_json = convert_v1_to_v2(
-                server, valid_until, keys[server], certificates[server]
-            )
-            v2_json = sign_json(v2_json, server_name, signing_key)
-            result[server] = v2_json
-
-    yaml.safe_dump(result, sys.stdout, default_flow_style=False)
-
-    rows = [row for server, json in result.items() for row in rows_v2(server, json)]
-
-    cursor = connection.cursor()
-    cursor.executemany(
-        "INSERT INTO server_keys_json ("
-        " server_name, key_id, from_server,"
-        " ts_added_ms, ts_valid_until_ms, key_json"
-        ") VALUES (%s, %s, %s, %s, %s, %s)",
-        rows,
-    )
-    connection.commit()
-
-
-if __name__ == "__main__":
-    main()
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 458306eba5..26a3b38918 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -206,11 +206,11 @@ class Auth:
                 requester = create_requester(user_id, app_service=app_service)
 
                 request.requester = user_id
+                if user_id in self._force_tracing_for_users:
+                    opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
                 opentracing.set_tag("authenticated_entity", user_id)
                 opentracing.set_tag("user_id", user_id)
                 opentracing.set_tag("appservice_id", app_service.id)
-                if user_id in self._force_tracing_for_users:
-                    opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
 
                 return requester
 
@@ -259,12 +259,12 @@ class Auth:
             )
 
             request.requester = requester
+            if user_info.token_owner in self._force_tracing_for_users:
+                opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
             opentracing.set_tag("authenticated_entity", user_info.token_owner)
             opentracing.set_tag("user_id", user_info.user_id)
             if device_id:
                 opentracing.set_tag("device_id", device_id)
-            if user_info.token_owner in self._force_tracing_for_users:
-                opentracing.set_tag(opentracing.tags.SAMPLING_PRIORITY, 1)
 
             return requester
         except KeyError:
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index c9f9596ada..373a4669d0 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -181,6 +181,6 @@ KNOWN_ROOM_VERSIONS = {
         RoomVersions.V5,
         RoomVersions.V6,
         RoomVersions.MSC2176,
+        RoomVersions.MSC3083,
     )
-    # Note that we do not include MSC3083 here unless it is enabled in the config.
 }  # type: Dict[str, RoomVersion]
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 59918d789e..1329af2e2b 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -261,13 +261,10 @@ def refresh_certificate(hs):
     Refresh the TLS certificates that Synapse is using by re-reading them from
     disk and updating the TLS context factories to use them.
     """
-
     if not hs.config.has_tls_listener():
-        # attempt to reload the certs for the good of the tls_fingerprints
-        hs.config.read_certificate_from_disk(require_cert_and_key=False)
         return
 
-    hs.config.read_certificate_from_disk(require_cert_and_key=True)
+    hs.config.read_certificate_from_disk()
     hs.tls_server_context_factory = context_factory.ServerContextFactory(hs.config)
 
     if hs._listening_services:
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 91ad326f19..57c2fc2e88 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -109,7 +109,7 @@ from synapse.storage.databases.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
 from synapse.storage.databases.main.presence import PresenceStore
-from synapse.storage.databases.main.search import SearchWorkerStore
+from synapse.storage.databases.main.search import SearchStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
@@ -242,7 +242,7 @@ class GenericWorkerSlavedStore(
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
     ServerMetricsStore,
-    SearchWorkerStore,
+    SearchStore,
     TransactionWorkerStore,
     BaseSlavedStore,
 ):
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index cc67377f0f..6ebce4b2f7 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.config._base import Config
 from synapse.types import JsonDict
 
@@ -28,27 +27,5 @@ class ExperimentalConfig(Config):
         # MSC2858 (multiple SSO identity providers)
         self.msc2858_enabled = experimental.get("msc2858_enabled", False)  # type: bool
 
-        # Spaces (MSC1772, MSC2946, MSC3083, etc)
-        self.spaces_enabled = experimental.get("spaces_enabled", True)  # type: bool
-        if self.spaces_enabled:
-            KNOWN_ROOM_VERSIONS[RoomVersions.MSC3083.identifier] = RoomVersions.MSC3083
-
         # MSC3026 (busy presence state)
         self.msc3026_enabled = experimental.get("msc3026_enabled", False)  # type: bool
-
-    def generate_config_section(self, **kwargs):
-        return """\
-        # Enable experimental features in Synapse.
-        #
-        # Experimental features might break or be removed without a deprecation
-        # period.
-        #
-        experimental_features:
-          # Support for Spaces (MSC1772), it enables the following:
-          #
-          # * The Spaces Summary API (MSC2946).
-          # * Restricting room membership based on space membership (MSC3083).
-          #
-          # Uncomment to disable support for Spaces.
-          #spaces_enabled: false
-        """
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index 26f1150ca5..0e9bba53c9 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -215,28 +215,12 @@ class TlsConfig(Config):
         days_remaining = (expires_on - now).days
         return days_remaining
 
-    def read_certificate_from_disk(self, require_cert_and_key: bool):
+    def read_certificate_from_disk(self):
         """
         Read the certificates and private key from disk.
-
-        Args:
-            require_cert_and_key: set to True to throw an error if the certificate
-                and key file are not given
         """
-        if require_cert_and_key:
-            self.tls_private_key = self.read_tls_private_key()
-            self.tls_certificate = self.read_tls_certificate()
-        elif self.tls_certificate_file:
-            # we only need the certificate for the tls_fingerprints. Reload it if we
-            # can, but it's not a fatal error if we can't.
-            try:
-                self.tls_certificate = self.read_tls_certificate()
-            except Exception as e:
-                logger.info(
-                    "Unable to read TLS certificate (%s). Ignoring as no "
-                    "tls listeners enabled.",
-                    e,
-                )
+        self.tls_private_key = self.read_tls_private_key()
+        self.tls_certificate = self.read_tls_certificate()
 
     def generate_config_section(
         self,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 40eab45549..fdeaa0f37c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -37,6 +37,7 @@ from synapse.http.servlet import (
 )
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
+    SynapseTags,
     start_active_span,
     start_active_span_from_request,
     tags,
@@ -314,7 +315,7 @@ class BaseFederationServlet:
                 raise
 
             request_tags = {
-                "request_id": request.get_request_id(),
+                SynapseTags.REQUEST_ID: request.get_request_id(),
                 tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
                 tags.HTTP_METHOD: request.get_method(),
                 tags.HTTP_URL: request.get_redacted_uri(),
@@ -1562,13 +1563,12 @@ def register_servlets(
                 server_name=hs.hostname,
             ).register(resource)
 
-        if hs.config.experimental.spaces_enabled:
-            FederationSpaceSummaryServlet(
-                handler=hs.get_space_summary_handler(),
-                authenticator=authenticator,
-                ratelimiter=ratelimiter,
-                server_name=hs.hostname,
-            ).register(resource)
+        FederationSpaceSummaryServlet(
+            handler=hs.get_space_summary_handler(),
+            authenticator=authenticator,
+            ratelimiter=ratelimiter,
+            server_name=hs.hostname,
+        ).register(resource)
 
     if "openid" in servlet_groups:
         for servletclass in OPENID_SERVLET_CLASSES:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index bf11315251..49ed7cabcc 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -577,7 +577,9 @@ class FederationHandler(BaseHandler):
 
         # Fetch the state events from the DB, and check we have the auth events.
         event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
-        auth_events_in_store = await self.store.have_seen_events(auth_event_ids)
+        auth_events_in_store = await self.store.have_seen_events(
+            room_id, auth_event_ids
+        )
 
         # Check for missing events. We handle state and auth event seperately,
         # as we want to pull the state from the DB, but we don't for the auth
@@ -610,7 +612,7 @@ class FederationHandler(BaseHandler):
 
             if missing_auth_events:
                 auth_events_in_store = await self.store.have_seen_events(
-                    missing_auth_events
+                    room_id, missing_auth_events
                 )
                 missing_auth_events.difference_update(auth_events_in_store)
 
@@ -710,7 +712,7 @@ class FederationHandler(BaseHandler):
 
         missing_auth_events = set(auth_event_ids) - fetched_events.keys()
         missing_auth_events.difference_update(
-            await self.store.have_seen_events(missing_auth_events)
+            await self.store.have_seen_events(room_id, missing_auth_events)
         )
         logger.debug("We are also missing %i auth events", len(missing_auth_events))
 
@@ -2475,7 +2477,7 @@ class FederationHandler(BaseHandler):
         #
         # we start by checking if they are in the store, and then try calling /event_auth/.
         if missing_auth:
-            have_events = await self.store.have_seen_events(missing_auth)
+            have_events = await self.store.have_seen_events(event.room_id, missing_auth)
             logger.debug("Events %s are in the store", have_events)
             missing_auth.difference_update(have_events)
 
@@ -2494,7 +2496,7 @@ class FederationHandler(BaseHandler):
                     return context
 
                 seen_remotes = await self.store.have_seen_events(
-                    [e.event_id for e in remote_auth_chain]
+                    event.room_id, [e.event_id for e in remote_auth_chain]
                 )
 
                 for e in remote_auth_chain:
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 31897546a9..3f4f2411fc 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -15,6 +15,9 @@
 """ This module contains base REST classes for constructing REST servlets. """
 
 import logging
+from typing import Iterable, List, Optional, Union, overload
+
+from typing_extensions import Literal
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.util import json_decoder
@@ -107,12 +110,11 @@ def parse_boolean_from_args(args, name, default=None, required=False):
 
 def parse_string(
     request,
-    name,
-    default=None,
-    required=False,
-    allowed_values=None,
-    param_type="string",
-    encoding="ascii",
+    name: Union[bytes, str],
+    default: Optional[str] = None,
+    required: bool = False,
+    allowed_values: Optional[Iterable[str]] = None,
+    encoding: Optional[str] = "ascii",
 ):
     """
     Parse a string parameter from the request query string.
@@ -122,18 +124,17 @@ def parse_string(
 
     Args:
         request: the twisted HTTP request.
-        name (bytes|unicode): the name of the query parameter.
-        default (bytes|unicode|None): value to use if the parameter is absent,
+        name: the name of the query parameter.
+        default: value to use if the parameter is absent,
             defaults to None. Must be bytes if encoding is None.
-        required (bool): whether to raise a 400 SynapseError if the
+        required: whether to raise a 400 SynapseError if the
             parameter is absent, defaults to False.
-        allowed_values (list[bytes|unicode]): List of allowed values for the
+        allowed_values: List of allowed values for the
             string, or None if any value is allowed, defaults to None. Must be
             the same type as name, if given.
-        encoding (str|None): The encoding to decode the string content with.
-
+        encoding: The encoding to decode the string content with.
     Returns:
-        bytes/unicode|None: A string value or the default. Unicode if encoding
+        A string value or the default. Unicode if encoding
         was given, bytes otherwise.
 
     Raises:
@@ -142,45 +143,105 @@ def parse_string(
             is not one of those allowed values.
     """
     return parse_string_from_args(
-        request.args, name, default, required, allowed_values, param_type, encoding
+        request.args, name, default, required, allowed_values, encoding
     )
 
 
-def parse_string_from_args(
-    args,
-    name,
-    default=None,
-    required=False,
-    allowed_values=None,
-    param_type="string",
-    encoding="ascii",
-):
+def _parse_string_value(
+    value: Union[str, bytes],
+    allowed_values: Optional[Iterable[str]],
+    name: str,
+    encoding: Optional[str],
+) -> Union[str, bytes]:
+    if encoding:
+        try:
+            value = value.decode(encoding)
+        except ValueError:
+            raise SynapseError(400, "Query parameter %r must be %s" % (name, encoding))
+
+    if allowed_values is not None and value not in allowed_values:
+        message = "Query parameter %r must be one of [%s]" % (
+            name,
+            ", ".join(repr(v) for v in allowed_values),
+        )
+        raise SynapseError(400, message)
+    else:
+        return value
+
+
+@overload
+def parse_strings_from_args(
+    args: List[str],
+    name: Union[bytes, str],
+    default: Optional[List[str]] = None,
+    required: bool = False,
+    allowed_values: Optional[Iterable[str]] = None,
+    encoding: Literal[None] = None,
+) -> Optional[List[bytes]]:
+    ...
+
+
+@overload
+def parse_strings_from_args(
+    args: List[str],
+    name: Union[bytes, str],
+    default: Optional[List[str]] = None,
+    required: bool = False,
+    allowed_values: Optional[Iterable[str]] = None,
+    encoding: str = "ascii",
+) -> Optional[List[str]]:
+    ...
+
+
+def parse_strings_from_args(
+    args: List[str],
+    name: Union[bytes, str],
+    default: Optional[List[str]] = None,
+    required: bool = False,
+    allowed_values: Optional[Iterable[str]] = None,
+    encoding: Optional[str] = "ascii",
+) -> Optional[List[Union[bytes, str]]]:
+    """
+    Parse a string parameter from the request query string list.
+
+    If encoding is not None, the content of the query param will be
+    decoded to Unicode using the encoding, otherwise it will be left as bytes.
+
+    Args:
+        args: the twisted HTTP request.args list.
+        name: the name of the query parameter.
+        default: value to use if the parameter is absent,
+            defaults to None. Must be bytes if encoding is None.
+        required: whether to raise a 400 SynapseError if the
+            parameter is absent, defaults to False.
+        allowed_values: List of allowed values for the
+            string, or None if any value is allowed, defaults to None. Must be
+            the same type as name, if given.
+        encoding: The encoding to decode the string content with.
+
+    Returns:
+        A list of string values or the default. Unicode if encoding
+        was given, bytes otherwise.
+
+    Raises:
+        SynapseError if the parameter is absent and required, or if the
+            parameter is present, must be one of a list of allowed values and
+            is not one of those allowed values.
+    """
 
     if not isinstance(name, bytes):
         name = name.encode("ascii")
 
     if name in args:
-        value = args[name][0]
-
-        if encoding:
-            try:
-                value = value.decode(encoding)
-            except ValueError:
-                raise SynapseError(
-                    400, "Query parameter %r must be %s" % (name, encoding)
-                )
-
-        if allowed_values is not None and value not in allowed_values:
-            message = "Query parameter %r must be one of [%s]" % (
-                name,
-                ", ".join(repr(v) for v in allowed_values),
-            )
-            raise SynapseError(400, message)
-        else:
-            return value
+        values = args[name]
+
+        return [
+            _parse_string_value(value, allowed_values, name=name, encoding=encoding)
+            for value in values
+        ]
     else:
         if required:
-            message = "Missing %s query parameter %r" % (param_type, name)
+            message = "Missing string query parameter %r" % (name)
             raise SynapseError(400, message, errcode=Codes.MISSING_PARAM)
         else:
 
@@ -190,6 +251,55 @@ def parse_string_from_args(
             return default
 
 
+def parse_string_from_args(
+    args: List[str],
+    name: Union[bytes, str],
+    default: Optional[str] = None,
+    required: bool = False,
+    allowed_values: Optional[Iterable[str]] = None,
+    encoding: Optional[str] = "ascii",
+) -> Optional[Union[bytes, str]]:
+    """
+    Parse the string parameter from the request query string list
+    and return the first result.
+
+    If encoding is not None, the content of the query param will be
+    decoded to Unicode using the encoding, otherwise it will be left as bytes.
+
+    Args:
+        args: the twisted HTTP request.args list.
+        name: the name of the query parameter.
+        default: value to use if the parameter is absent,
+            defaults to None. Must be bytes if encoding is None.
+        required: whether to raise a 400 SynapseError if the
+            parameter is absent, defaults to False.
+        allowed_values: List of allowed values for the
+            string, or None if any value is allowed, defaults to None. Must be
+            the same type as name, if given.
+        encoding: The encoding to decode the string content with.
+
+    Returns:
+        A string value or the default. Unicode if encoding
+        was given, bytes otherwise.
+
+    Raises:
+        SynapseError if the parameter is absent and required, or if the
+            parameter is present, must be one of a list of allowed values and
+            is not one of those allowed values.
+    """
+
+    strings = parse_strings_from_args(
+        args,
+        name,
+        default=[default],
+        required=required,
+        allowed_values=allowed_values,
+        encoding=encoding,
+    )
+
+    return strings[0]
+
+
 def parse_json_value_from_request(request, allow_empty_body=False):
     """Parse a JSON value from the body of a twisted HTTP request.
 
@@ -215,7 +325,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
     try:
         content = json_decoder.decode(content_bytes.decode("utf-8"))
     except Exception as e:
-        logger.warning("Unable to parse JSON: %s", e)
+        logger.warning("Unable to parse JSON: %s (%s)", e, content_bytes)
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
 
     return content
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index fba2fa3904..f64845b80c 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -265,6 +265,12 @@ class SynapseTags:
     # Whether the sync response has new data to be returned to the client.
     SYNC_RESULT = "sync.new_data"
 
+    # incoming HTTP request ID (as written in the logs)
+    REQUEST_ID = "request_id"
+
+    # HTTP request tag (used to distinguish full vs incremental syncs, etc)
+    REQUEST_TAG = "request_tag"
+
 
 # Block everything by default
 # A regex which matches the server_names to expose traces for.
@@ -588,7 +594,7 @@ def inject_active_span_twisted_headers(headers, destination, check_destination=T
 
     span = opentracing.tracer.active_span
     carrier = {}  # type: Dict[str, str]
-    opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
+    opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
 
     for key, value in carrier.items():
         headers.addRawHeaders(key, value)
@@ -625,7 +631,7 @@ def inject_active_span_byte_dict(headers, destination, check_destination=True):
     span = opentracing.tracer.active_span
 
     carrier = {}  # type: Dict[str, str]
-    opentracing.tracer.inject(span, opentracing.Format.HTTP_HEADERS, carrier)
+    opentracing.tracer.inject(span.context, opentracing.Format.HTTP_HEADERS, carrier)
 
     for key, value in carrier.items():
         headers[key.encode()] = [value.encode()]
@@ -659,7 +665,7 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
         return
 
     opentracing.tracer.inject(
-        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+        opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
     )
 
 
@@ -681,7 +687,7 @@ def get_active_span_text_map(destination=None):
 
     carrier = {}  # type: Dict[str, str]
     opentracing.tracer.inject(
-        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+        opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
     )
 
     return carrier
@@ -696,7 +702,7 @@ def active_span_context_as_string():
     carrier = {}  # type: Dict[str, str]
     if opentracing:
         opentracing.tracer.inject(
-            opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+            opentracing.tracer.active_span.context, opentracing.Format.TEXT_MAP, carrier
         )
     return json_encoder.encode(carrier)
 
@@ -824,7 +830,7 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
         return
 
     request_tags = {
-        "request_id": request.get_request_id(),
+        SynapseTags.REQUEST_ID: request.get_request_id(),
         tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
         tags.HTTP_METHOD: request.get_method(),
         tags.HTTP_URL: request.get_redacted_uri(),
@@ -833,9 +839,9 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
 
     request_name = request.request_metrics.name
     if extract_context:
-        scope = start_active_span_from_request(request, request_name, tags=request_tags)
+        scope = start_active_span_from_request(request, request_name)
     else:
-        scope = start_active_span(request_name, tags=request_tags)
+        scope = start_active_span(request_name)
 
     with scope:
         try:
@@ -845,4 +851,11 @@ def trace_servlet(request: "SynapseRequest", extract_context: bool = False):
             # with JsonResource).
             scope.span.set_operation_name(request.request_metrics.name)
 
-            scope.span.set_tag("request_tag", request.request_metrics.start_context.tag)
+            # set the tags *after* the servlet completes, in case it decided to
+            # prioritise the span (tags will get dropped on unprioritised spans)
+            request_tags[
+                SynapseTags.REQUEST_TAG
+            ] = request.request_metrics.start_context.tag
+
+            for k, v in request_tags.items():
+                scope.span.set_tag(k, v)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 714caf84c3..0d6d643d35 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -22,7 +22,11 @@ from prometheus_client.core import REGISTRY, Counter, Gauge
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
-from synapse.logging.opentracing import noop_context_manager, start_active_span
+from synapse.logging.opentracing import (
+    SynapseTags,
+    noop_context_manager,
+    start_active_span,
+)
 from synapse.util.async_helpers import maybe_awaitable
 
 if TYPE_CHECKING:
@@ -202,7 +206,9 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar
             try:
                 ctx = noop_context_manager()
                 if bg_start_span:
-                    ctx = start_active_span(desc, tags={"request_id": str(context)})
+                    ctx = start_active_span(
+                        desc, tags={SynapseTags.REQUEST_ID: str(context)}
+                    )
                 with ctx:
                     return await maybe_awaitable(func(*args, **kwargs))
             except Exception:
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index 70207420a6..26bdead565 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -68,7 +68,7 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
             if row.entity.startswith("@"):
                 self._device_list_stream_cache.entity_has_changed(row.entity, token)
                 self.get_cached_devices_for_user.invalidate((row.entity,))
-                self._get_cached_user_device.invalidate_many((row.entity,))
+                self._get_cached_user_device.invalidate((row.entity,))
                 self.get_device_list_last_stream_id_for_remote.invalidate((row.entity,))
 
             else:
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index 24dd46113a..2c71af4279 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -137,8 +137,31 @@ class ProtectMediaByID(RestServlet):
 
         logging.info("Protecting local media by ID: %s", media_id)
 
-        # Quarantine this media id
-        await self.store.mark_local_media_as_safe(media_id)
+        # Protect this media id
+        await self.store.mark_local_media_as_safe(media_id, safe=True)
+
+        return 200, {}
+
+
+class UnprotectMediaByID(RestServlet):
+    """Unprotect local media from being quarantined."""
+
+    PATTERNS = admin_patterns("/media/unprotect/(?P<media_id>[^/]+)")
+
+    def __init__(self, hs: "HomeServer"):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    async def on_POST(
+        self, request: SynapseRequest, media_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
+
+        logging.info("Unprotecting local media by ID: %s", media_id)
+
+        # Unprotect this media id
+        await self.store.mark_local_media_as_safe(media_id, safe=False)
 
         return 200, {}
 
@@ -269,6 +292,7 @@ def register_servlets_for_media_repo(hs: "HomeServer", http_server):
     QuarantineMediaByID(hs).register(http_server)
     QuarantineMediaByUser(hs).register(http_server)
     ProtectMediaByID(hs).register(http_server)
+    UnprotectMediaByID(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
     DeleteMediaByID(hs).register(http_server)
     DeleteMediaByDateSize(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 51813cccbe..70286b0ff7 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -1060,18 +1060,16 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
     RoomRedactEventRestServlet(hs).register(http_server)
     RoomTypingRestServlet(hs).register(http_server)
     RoomEventContextServlet(hs).register(http_server)
-
-    if hs.config.experimental.spaces_enabled:
-        RoomSpaceSummaryRestServlet(hs).register(http_server)
+    RoomSpaceSummaryRestServlet(hs).register(http_server)
+    RoomEventServlet(hs).register(http_server)
+    JoinedRoomsRestServlet(hs).register(http_server)
+    RoomAliasListServlet(hs).register(http_server)
+    SearchRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if not is_worker:
         RoomCreateRestServlet(hs).register(http_server)
         RoomForgetRestServlet(hs).register(http_server)
-        SearchRestServlet(hs).register(http_server)
-        JoinedRoomsRestServlet(hs).register(http_server)
-        RoomEventServlet(hs).register(http_server)
-        RoomAliasListServlet(hs).register(http_server)
 
 
 def register_deprecated_servlets(hs, http_server):
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
index 2c169abbf3..07ea39a8a3 100644
--- a/synapse/rest/client/v2_alpha/report_event.py
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -16,11 +16,7 @@ import logging
 from http import HTTPStatus
 
 from synapse.api.errors import Codes, SynapseError
-from synapse.http.servlet import (
-    RestServlet,
-    assert_params_in_dict,
-    parse_json_object_from_request,
-)
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
 
 from ._base import client_patterns
 
@@ -42,15 +38,14 @@ class ReportEventRestServlet(RestServlet):
         user_id = requester.user.to_string()
 
         body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ("reason", "score"))
 
-        if not isinstance(body["reason"], str):
+        if not isinstance(body.get("reason", ""), str):
             raise SynapseError(
                 HTTPStatus.BAD_REQUEST,
                 "Param 'reason' must be a string",
                 Codes.BAD_JSON,
             )
-        if not isinstance(body["score"], int):
+        if not isinstance(body.get("score", 0), int):
             raise SynapseError(
                 HTTPStatus.BAD_REQUEST,
                 "Param 'score' must be an integer",
@@ -61,7 +56,7 @@ class ReportEventRestServlet(RestServlet):
             room_id=room_id,
             event_id=event_id,
             user_id=user_id,
-            reason=body["reason"],
+            reason=body.get("reason"),
             content=body,
             received_ts=self.clock.time_msec(),
         )
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index ecc1f935e2..c57ae5ef15 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -168,10 +168,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         backfilled,
     ):
         self._invalidate_get_event_cache(event_id)
+        self.have_seen_event.invalidate((room_id, event_id))
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
+        self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -184,8 +185,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self.get_invited_rooms_for_local_user.invalidate((state_key,))
 
         if relates_to:
-            self.get_relations_for_event.invalidate_many((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate_many((relates_to,))
+            self.get_relations_for_event.invalidate((relates_to,))
+            self.get_aggregation_groups_for_event.invalidate((relates_to,))
             self.get_applicable_edit.invalidate((relates_to,))
 
     async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index fd87ba71ab..18f07d96dc 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1282,7 +1282,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
         txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
-        txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
+        txn.call_after(self._get_cached_user_device.invalidate, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 5845322118..d1237c65cc 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -860,7 +860,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                                   not be deleted.
         """
         txn.call_after(
-            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            self.get_unread_event_push_actions_by_room_for_user.invalidate,
             (room_id, user_id),
         )
 
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fd25c8112d..897fa06639 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1748,9 +1748,9 @@ class PersistEventsStore:
             },
         )
 
-        txn.call_after(self.store.get_relations_for_event.invalidate_many, (parent_id,))
+        txn.call_after(self.store.get_relations_for_event.invalidate, (parent_id,))
         txn.call_after(
-            self.store.get_aggregation_groups_for_event.invalidate_many, (parent_id,)
+            self.store.get_aggregation_groups_for_event.invalidate, (parent_id,)
         )
 
         if rel_type == RelationTypes.REPLACE:
@@ -1903,7 +1903,7 @@ class PersistEventsStore:
 
                 for user_id in user_ids:
                     txn.call_after(
-                        self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                        self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
                         (room_id, user_id),
                     )
 
@@ -1917,7 +1917,7 @@ class PersistEventsStore:
     def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
         # Sad that we have to blow away the cache for the whole room here
         txn.call_after(
-            self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            self.store.get_unread_event_push_actions_by_room_for_user.invalidate,
             (room_id,),
         )
         txn.execute(
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6963bbf7f4..403a5ddaba 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -22,6 +22,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     overload,
 )
@@ -55,7 +56,7 @@ from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
@@ -1045,32 +1046,74 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {r["event_id"] for r in rows}
 
-    async def have_seen_events(self, event_ids):
+    async def have_seen_events(
+        self, room_id: str, event_ids: Iterable[str]
+    ) -> Set[str]:
         """Given a list of event ids, check if we have already processed them.
 
+        The room_id is only used to structure the cache (so that it can later be
+        invalidated by room_id) - there is no guarantee that the events are actually
+        in the room in question.
+
         Args:
-            event_ids (iterable[str]):
+            room_id: Room we are polling
+            event_ids: events we are looking for
 
         Returns:
             set[str]: The events we have already seen.
         """
+        res = await self._have_seen_events_dict(
+            (room_id, event_id) for event_id in event_ids
+        )
+        return {eid for ((_rid, eid), have_event) in res.items() if have_event}
+
+    @cachedList("have_seen_event", "keys")
+    async def _have_seen_events_dict(
+        self, keys: Iterable[Tuple[str, str]]
+    ) -> Dict[Tuple[str, str], bool]:
+        """Helper for have_seen_events
+
+        Returns:
+            a dict {(room_id, event_id) -> bool}
+        """
         # if the event cache contains the event, obviously we've seen it.
-        results = {x for x in event_ids if self._get_event_cache.contains(x)}
 
-        def have_seen_events_txn(txn, chunk):
-            sql = "SELECT event_id FROM events as e WHERE "
+        cache_results = {
+            (rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
+        }
+        results = {x: True for x in cache_results}
+
+        def have_seen_events_txn(txn, chunk: Tuple[Tuple[str, str], ...]):
+            # we deliberately do *not* query the database for room_id, to make the
+            # query an index-only lookup on `events_event_id_key`.
+            #
+            # We therefore pull the events from the database into a set...
+
+            sql = "SELECT event_id FROM events AS e WHERE "
             clause, args = make_in_list_sql_clause(
-                txn.database_engine, "e.event_id", chunk
+                txn.database_engine, "e.event_id", [eid for (_rid, eid) in chunk]
             )
             txn.execute(sql + clause, args)
-            results.update(row[0] for row in txn)
+            found_events = {eid for eid, in txn}
 
-        for chunk in batch_iter((x for x in event_ids if x not in results), 100):
+            # ... and then we can update the results for each row in the batch
+            results.update({(rid, eid): (eid in found_events) for (rid, eid) in chunk})
+
+        # each batch requires its own index scan, so we make the batches as big as
+        # possible.
+        for chunk in batch_iter((k for k in keys if k not in cache_results), 500):
             await self.db_pool.runInteraction(
                 "have_seen_events", have_seen_events_txn, chunk
             )
+
         return results
 
+    @cached(max_entries=100000, tree=True)
+    async def have_seen_event(self, room_id: str, event_id: str):
+        # this only exists for the benefit of the @cachedList descriptor on
+        # _have_seen_events_dict
+        raise NotImplementedError()
+
     def _get_current_state_event_counts_txn(self, txn, room_id):
         """
         See get_current_state_event_counts.
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index c584868188..2fa945d171 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -143,6 +143,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "created_ts",
                 "quarantined_by",
                 "url_cache",
+                "safe_from_quarantine",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -296,12 +297,12 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )
 
-    async def mark_local_media_as_safe(self, media_id: str) -> None:
-        """Mark a local media as safe from quarantining."""
+    async def mark_local_media_as_safe(self, media_id: str, safe: bool = True) -> None:
+        """Mark a local media as safe or unsafe from quarantining."""
         await self.db_pool.simple_update_one(
             table="local_media_repository",
             keyvalues={"media_id": media_id},
-            updatevalues={"safe_from_quarantine": True},
+            updatevalues={"safe_from_quarantine": safe},
             desc="mark_local_media_as_safe",
         )
 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b5e..7fb7780d0f 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@ import logging
 from typing import Any, List, Set, Tuple
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.databases.main import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
 from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
 
-class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
+class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
     ) -> Set[int]:
@@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
             "DELETE FROM event_to_state_groups "
             "WHERE event_id IN (SELECT event_id from events_to_purge)"
         )
-        for event_id, _ in event_rows:
-            txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
 
         # Delete all remote non-state events
         for table in (
@@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         # so make sure to keep this actually last.
         txn.execute("DROP TABLE events_to_purge")
 
+        for event_id, should_delete in event_rows:
+            self._invalidate_cache_and_stream(
+                txn, self._get_state_group_for_event, (event_id,)
+            )
+
+            # XXX: This is racy, since have_seen_events could be called between the
+            #    transaction completing and the invalidation running. On the other hand,
+            #    that's no different to calling `have_seen_events` just before the
+            #    event is deleted from the database.
+            if should_delete:
+                self._invalidate_cache_and_stream(
+                    txn, self.have_seen_event, (room_id, event_id)
+                )
+
         logger.info("[purge] done")
 
         return referenced_state_groups
@@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         #       index on them. In any case we should be clearing out 'stream' tables
         #       periodically anyway (#5888)
 
-        # TODO: we could probably usefully do a bunch of cache invalidation here
+        # TODO: we could probably usefully do a bunch more cache invalidation here
+
+        # XXX: as with purge_history, this is racy, but no worse than other races
+        #   that already exist.
+        self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
 
         logger.info("[purge] done")
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3647276acb..edeaacd7a6 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -460,7 +460,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
     def invalidate_caches_for_receipt(self, room_id, receipt_type, user_id):
         self.get_receipts_for_user.invalidate((user_id, receipt_type))
-        self._get_linearized_receipts_for_room.invalidate_many((room_id,))
+        self._get_linearized_receipts_for_room.invalidate((room_id,))
         self.get_last_receipt_event_id_for_user.invalidate(
             (user_id, room_id, receipt_type)
         )
@@ -659,9 +659,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         )
         txn.call_after(self.get_receipts_for_user.invalidate, (user_id, receipt_type))
         # FIXME: This shouldn't invalidate the whole cache
-        txn.call_after(
-            self._get_linearized_receipts_for_room.invalidate_many, (room_id,)
-        )
+        txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
 
         self.db_pool.simple_delete_txn(
             txn,
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 5f38634f48..0cf450f81d 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1498,7 +1498,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         room_id: str,
         event_id: str,
         user_id: str,
-        reason: str,
+        reason: Optional[str],
         content: JsonDict,
         received_ts: int,
     ) -> None:
diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py
index 44bbb7b1a8..8fd5bfb69b 100644
--- a/synapse/util/batching_queue.py
+++ b/synapse/util/batching_queue.py
@@ -25,10 +25,11 @@ from typing import (
     TypeVar,
 )
 
+from prometheus_client import Gauge
+
 from twisted.internet import defer
 
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util import Clock
 
@@ -38,6 +39,24 @@ logger = logging.getLogger(__name__)
 V = TypeVar("V")
 R = TypeVar("R")
 
+number_queued = Gauge(
+    "synapse_util_batching_queue_number_queued",
+    "The number of items waiting in the queue across all keys",
+    labelnames=("name",),
+)
+
+number_in_flight = Gauge(
+    "synapse_util_batching_queue_number_pending",
+    "The number of items across all keys either being processed or waiting in a queue",
+    labelnames=("name",),
+)
+
+number_of_keys = Gauge(
+    "synapse_util_batching_queue_number_of_keys",
+    "The number of distinct keys that have items queued",
+    labelnames=("name",),
+)
+
 
 class BatchingQueue(Generic[V, R]):
     """A queue that batches up work, calling the provided processing function
@@ -48,10 +67,20 @@ class BatchingQueue(Generic[V, R]):
     called, and will keep being called until the queue has been drained (for the
     given key).
 
+    If the processing function raises an exception then the exception is proxied
+    through to the callers waiting on that batch of work.
+
     Note that the return value of `add_to_queue` will be the return value of the
     processing function that processed the given item. This means that the
     returned value will likely include data for other items that were in the
     batch.
+
+    Args:
+        name: A name for the queue, used for logging contexts and metrics.
+            This must be unique, otherwise the metrics will be wrong.
+        clock: The clock to use to schedule work.
+        process_batch_callback: The callback to be run to process a batch of
+            work.
     """
 
     def __init__(
@@ -73,19 +102,15 @@ class BatchingQueue(Generic[V, R]):
         # The function to call with batches of values.
         self._process_batch_callback = process_batch_callback
 
-        LaterGauge(
-            "synapse_util_batching_queue_number_queued",
-            "The number of items waiting in the queue across all keys",
-            labels=("name",),
-            caller=lambda: sum(len(v) for v in self._next_values.values()),
+        number_queued.labels(self._name).set_function(
+            lambda: sum(len(q) for q in self._next_values.values())
         )
 
-        LaterGauge(
-            "synapse_util_batching_queue_number_of_keys",
-            "The number of distinct keys that have items queued",
-            labels=("name",),
-            caller=lambda: len(self._next_values),
-        )
+        number_of_keys.labels(self._name).set_function(lambda: len(self._next_values))
+
+        self._number_in_flight_metric = number_in_flight.labels(
+            self._name
+        )  # type: Gauge
 
     async def add_to_queue(self, value: V, key: Hashable = ()) -> R:
         """Adds the value to the queue with the given key, returning the result
@@ -107,17 +132,18 @@ class BatchingQueue(Generic[V, R]):
         if key not in self._processing_keys:
             run_as_background_process(self._name, self._process_queue, key)
 
-        return await make_deferred_yieldable(d)
+        with self._number_in_flight_metric.track_inprogress():
+            return await make_deferred_yieldable(d)
 
     async def _process_queue(self, key: Hashable) -> None:
         """A background task to repeatedly pull things off the queue for the
         given key and call the `self._process_batch_callback` with the values.
         """
 
-        try:
-            if key in self._processing_keys:
-                return
+        if key in self._processing_keys:
+            return
 
+        try:
             self._processing_keys.add(key)
 
             while True:
@@ -137,16 +163,16 @@ class BatchingQueue(Generic[V, R]):
                     values = [value for value, _ in next_values]
                     results = await self._process_batch_callback(values)
 
-                    for _, deferred in next_values:
-                        with PreserveLoggingContext():
+                    with PreserveLoggingContext():
+                        for _, deferred in next_values:
                             deferred.callback(results)
 
                 except Exception as e:
-                    for _, deferred in next_values:
-                        if deferred.called:
-                            continue
+                    with PreserveLoggingContext():
+                        for _, deferred in next_values:
+                            if deferred.called:
+                                continue
 
-                        with PreserveLoggingContext():
                             deferred.errback(e)
 
         finally:
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 371e7e4dd0..1044139119 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -16,16 +16,7 @@
 
 import enum
 import threading
-from typing import (
-    Callable,
-    Generic,
-    Iterable,
-    MutableMapping,
-    Optional,
-    TypeVar,
-    Union,
-    cast,
-)
+from typing import Callable, Generic, Iterable, MutableMapping, Optional, TypeVar, Union
 
 from prometheus_client import Gauge
 
@@ -91,7 +82,7 @@ class DeferredCache(Generic[KT, VT]):
         # _pending_deferred_cache maps from the key value to a `CacheEntry` object.
         self._pending_deferred_cache = (
             cache_type()
-        )  # type: MutableMapping[KT, CacheEntry]
+        )  # type: Union[TreeCache, MutableMapping[KT, CacheEntry]]
 
         def metrics_cb():
             cache_pending_metric.labels(name).set(len(self._pending_deferred_cache))
@@ -287,8 +278,17 @@ class DeferredCache(Generic[KT, VT]):
         self.cache.set(key, value, callbacks=callbacks)
 
     def invalidate(self, key):
+        """Delete a key, or tree of entries
+
+        If the cache is backed by a regular dict, then "key" must be of
+        the right type for this cache
+
+        If the cache is backed by a TreeCache, then "key" must be a tuple, but
+        may be of lower cardinality than the TreeCache - in which case the whole
+        subtree is deleted.
+        """
         self.check_thread()
-        self.cache.pop(key, None)
+        self.cache.del_multi(key)
 
         # if we have a pending lookup for this key, remove it from the
         # _pending_deferred_cache, which will (a) stop it being returned
@@ -299,20 +299,10 @@ class DeferredCache(Generic[KT, VT]):
         # run the invalidation callbacks now, rather than waiting for the
         # deferred to resolve.
         if entry:
-            entry.invalidate()
-
-    def invalidate_many(self, key: KT):
-        self.check_thread()
-        if not isinstance(key, tuple):
-            raise TypeError("The cache key must be a tuple not %r" % (type(key),))
-        key = cast(KT, key)
-        self.cache.del_multi(key)
-
-        # if we have a pending lookup for this key, remove it from the
-        # _pending_deferred_cache, as above
-        entry_dict = self._pending_deferred_cache.pop(key, None)
-        if entry_dict is not None:
-            for entry in iterate_tree_cache_entry(entry_dict):
+            # _pending_deferred_cache.pop should either return a CacheEntry, or, in the
+            # case of a TreeCache, a dict of keys to cache entries. Either way calling
+            # iterate_tree_cache_entry on it will do the right thing.
+            for entry in iterate_tree_cache_entry(entry):
                 entry.invalidate()
 
     def invalidate_all(self):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 2ac24a2f25..d77e8edeea 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -48,7 +48,6 @@ F = TypeVar("F", bound=Callable[..., Any])
 class _CachedFunction(Generic[F]):
     invalidate = None  # type: Any
     invalidate_all = None  # type: Any
-    invalidate_many = None  # type: Any
     prefill = None  # type: Any
     cache = None  # type: Any
     num_args = None  # type: Any
@@ -262,6 +261,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
     ):
         super().__init__(orig, num_args=num_args, cache_context=cache_context)
 
+        if tree and self.num_args < 2:
+            raise RuntimeError(
+                "tree=True is nonsensical for cached functions with a single parameter"
+            )
+
         self.max_entries = max_entries
         self.tree = tree
         self.iterable = iterable
@@ -302,11 +306,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
         wrapped = cast(_CachedFunction, _wrapped)
 
         if self.num_args == 1:
+            assert not self.tree
             wrapped.invalidate = lambda key: cache.invalidate(key[0])
             wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
         else:
             wrapped.invalidate = cache.invalidate
-            wrapped.invalidate_many = cache.invalidate_many
             wrapped.prefill = cache.prefill
 
         wrapped.invalidate_all = cache.invalidate_all
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 54df407ff7..d89e9d9b1d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -152,7 +152,6 @@ class LruCache(Generic[KT, VT]):
     """
     Least-recently-used cache, supporting prometheus metrics and invalidation callbacks.
 
-    Supports del_multi only if cache_type=TreeCache
     If cache_type=TreeCache, all keys must be tuples.
     """
 
@@ -393,10 +392,16 @@ class LruCache(Generic[KT, VT]):
 
         @synchronized
         def cache_del_multi(key: KT) -> None:
+            """Delete an entry, or tree of entries
+
+            If the LruCache is backed by a regular dict, then "key" must be of
+            the right type for this cache
+
+            If the LruCache is backed by a TreeCache, then "key" must be a tuple, but
+            may be of lower cardinality than the TreeCache - in which case the whole
+            subtree is deleted.
             """
-            This will only work if constructed with cache_type=TreeCache
-            """
-            popped = cache.pop(key)
+            popped = cache.pop(key, None)
             if popped is None:
                 return
             # for each deleted node, we now need to remove it from the linked list
@@ -430,11 +435,10 @@ class LruCache(Generic[KT, VT]):
         self.set = cache_set
         self.setdefault = cache_set_default
         self.pop = cache_pop
+        self.del_multi = cache_del_multi
         # `invalidate` is exposed for consistency with DeferredCache, so that it can be
         # invalidated by the cache invalidation replication stream.
-        self.invalidate = cache_pop
-        if cache_type is TreeCache:
-            self.del_multi = cache_del_multi
+        self.invalidate = cache_del_multi
         self.len = synchronized(cache_len)
         self.contains = cache_contains
         self.clear = cache_clear
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 73502a8b06..a6df81ebff 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -89,6 +89,9 @@ class TreeCache:
             value. If the key is partial, the TreeCacheNode corresponding to the part
             of the tree that was removed.
         """
+        if not isinstance(key, tuple):
+            raise TypeError("The cache key must be a tuple not %r" % (type(key),))
+
         # a list of the nodes we have touched on the way down the tree
         nodes = []
 
diff --git a/synctl b/synctl
index 6ce19918d2..90559ded62 100755
--- a/synctl
+++ b/synctl
@@ -97,11 +97,15 @@ def start(pidfile: str, app: str, config_files: Iterable[str], daemonize: bool)
         write("started %s(%s)" % (app, ",".join(config_files)), colour=GREEN)
         return True
     except subprocess.CalledProcessError as e:
-        write(
-            "error starting %s(%s) (exit code: %d); see above for logs"
-            % (app, ",".join(config_files), e.returncode),
-            colour=RED,
+        err = "%s(%s) failed to start (exit code: %d). Check the Synapse logfile" % (
+            app,
+            ",".join(config_files),
+            e.returncode,
         )
+        if daemonize:
+            err += ", or run synctl with --no-daemonize"
+        err += "."
+        write(err, colour=RED, stream=sys.stderr)
         return False
 
 
diff --git a/tests/config/test_tls.py b/tests/config/test_tls.py
index 183034f7d4..dcf336416c 100644
--- a/tests/config/test_tls.py
+++ b/tests/config/test_tls.py
@@ -74,12 +74,11 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
 
         config = {
             "tls_certificate_path": os.path.join(config_dir, "cert.pem"),
-            "tls_fingerprints": [],
         }
 
         t = TestConfig()
         t.read_config(config, config_dir_path="", data_dir_path="")
-        t.read_certificate_from_disk(require_cert_and_key=False)
+        t.read_tls_certificate()
 
         warnings = self.flushWarnings()
         self.assertEqual(len(warnings), 1)
diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py
index 29341bc6e9..f15d1cf6f7 100644
--- a/tests/rest/admin/test_event_reports.py
+++ b/tests/rest/admin/test_event_reports.py
@@ -64,7 +64,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
                 user_tok=self.admin_user_tok,
             )
         for _ in range(5):
-            self._create_event_and_report(
+            self._create_event_and_report_without_parameters(
                 room_id=self.room_id2,
                 user_tok=self.admin_user_tok,
             )
@@ -378,6 +378,19 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
 
+    def _create_event_and_report_without_parameters(self, room_id, user_tok):
+        """Create and report an event, but omit reason and score"""
+        resp = self.helper.send(room_id, tok=user_tok)
+        event_id = resp["event_id"]
+
+        channel = self.make_request(
+            "POST",
+            "rooms/%s/report/%s" % (room_id, event_id),
+            json.dumps({}),
+            access_token=user_tok,
+        )
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
     def _check_fields(self, content):
         """Checks that all attributes are present in an event report"""
         for c in content:
diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py
index ac7b219700..f741121ea2 100644
--- a/tests/rest/admin/test_media.py
+++ b/tests/rest/admin/test_media.py
@@ -16,6 +16,8 @@ import json
 import os
 from binascii import unhexlify
 
+from parameterized import parameterized
+
 import synapse.rest.admin
 from synapse.api.errors import Codes
 from synapse.rest.client.v1 import login, profile, room
@@ -562,3 +564,100 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase):
             )
             # Test that the file is deleted
             self.assertFalse(os.path.exists(local_path))
+
+
+class ProtectMediaByIDTestCase(unittest.HomeserverTestCase):
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        synapse.rest.admin.register_servlets_for_media_repo,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        media_repo = hs.get_media_repository_resource()
+        self.store = hs.get_datastore()
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        # Create media
+        upload_resource = media_repo.children[b"upload"]
+        # file size is 67 bytes
+        image_data = unhexlify(
+            b"89504e470d0a1a0a0000000d4948445200000001000000010806"
+            b"0000001f15c4890000000a49444154789c63000100000500010d"
+            b"0a2db40000000049454e44ae426082"
+        )
+
+        # Upload some media into the room
+        response = self.helper.upload_media(
+            upload_resource, image_data, tok=self.admin_user_tok, expect_code=200
+        )
+        # Extract media ID from the response
+        server_and_media_id = response["content_uri"][6:]  # Cut off 'mxc://'
+        self.media_id = server_and_media_id.split("/")[1]
+
+        self.url = "/_synapse/admin/v1/media/%s/%s"
+
+    @parameterized.expand(["protect", "unprotect"])
+    def test_no_auth(self, action: str):
+        """
+        Try to protect or unprotect media without authentication.
+        """
+
+        channel = self.make_request("POST", self.url % (action, self.media_id), b"{}")
+
+        self.assertEqual(401, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.MISSING_TOKEN, channel.json_body["errcode"])
+
+    @parameterized.expand(["protect", "unprotect"])
+    def test_requester_is_no_admin(self, action: str):
+        """
+        If the user is not a server admin, an error is returned.
+        """
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_token = self.login("user", "pass")
+
+        channel = self.make_request(
+            "POST",
+            self.url % (action, self.media_id),
+            access_token=self.other_user_token,
+        )
+
+        self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+    def test_protect_media(self):
+        """
+        Tests that protecting and then unprotecting a media item succeeds.
+        """
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["safe_from_quarantine"])
+
+        # protect
+        channel = self.make_request(
+            "POST",
+            self.url % ("protect", self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertTrue(media_info["safe_from_quarantine"])
+
+        # unprotect
+        channel = self.make_request(
+            "POST",
+            self.url % ("unprotect", self.media_id),
+            access_token=self.admin_user_tok,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+        self.assertFalse(channel.json_body)
+
+        media_info = self.get_success(self.store.get_local_media(self.media_id))
+        self.assertFalse(media_info["safe_from_quarantine"])
diff --git a/tests/rest/client/v2_alpha/test_report_event.py b/tests/rest/client/v2_alpha/test_report_event.py
new file mode 100644
index 0000000000..1ec6b05e5b
--- /dev/null
+++ b/tests/rest/client/v2_alpha/test_report_event.py
@@ -0,0 +1,83 @@
+# Copyright 2021 Callum Brown
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import synapse.rest.admin
+from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v2_alpha import report_event
+
+from tests import unittest
+
+
+class ReportEventTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        report_event.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, hs):
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_tok = self.login("user", "pass")
+
+        self.room_id = self.helper.create_room_as(
+            self.other_user, tok=self.other_user_tok, is_public=True
+        )
+        self.helper.join(self.room_id, user=self.admin_user, tok=self.admin_user_tok)
+        resp = self.helper.send(self.room_id, tok=self.admin_user_tok)
+        self.event_id = resp["event_id"]
+        self.report_path = "rooms/{}/report/{}".format(self.room_id, self.event_id)
+
+    def test_reason_str_and_score_int(self):
+        data = {"reason": "this makes me sad", "score": -100}
+        self._assert_status(200, data)
+
+    def test_no_reason(self):
+        data = {"score": 0}
+        self._assert_status(200, data)
+
+    def test_no_score(self):
+        data = {"reason": "this makes me sad"}
+        self._assert_status(200, data)
+
+    def test_no_reason_and_no_score(self):
+        data = {}
+        self._assert_status(200, data)
+
+    def test_reason_int_and_score_str(self):
+        data = {"reason": 10, "score": "string"}
+        self._assert_status(400, data)
+
+    def test_reason_zero_and_score_blank(self):
+        data = {"reason": 0, "score": ""}
+        self._assert_status(400, data)
+
+    def test_reason_and_score_null(self):
+        data = {"reason": None, "score": None}
+        self._assert_status(400, data)
+
+    def _assert_status(self, response_status, data):
+        channel = self.make_request(
+            "POST",
+            self.report_path,
+            json.dumps(data),
+            access_token=self.other_user_tok,
+        )
+        self.assertEqual(
+            response_status, int(channel.result["code"]), msg=channel.result["body"]
+        )
diff --git a/tests/storage/databases/__init__.py b/tests/storage/databases/__init__.py
new file mode 100644
index 0000000000..c24c7ecd92
--- /dev/null
+++ b/tests/storage/databases/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/storage/databases/main/__init__.py b/tests/storage/databases/main/__init__.py
new file mode 100644
index 0000000000..c24c7ecd92
--- /dev/null
+++ b/tests/storage/databases/main/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
new file mode 100644
index 0000000000..932970fd9a
--- /dev/null
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -0,0 +1,96 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+
+from synapse.logging.context import LoggingContext
+from synapse.storage.databases.main.events_worker import EventsWorkerStore
+
+from tests import unittest
+
+
+class HaveSeenEventsTestCase(unittest.HomeserverTestCase):
+    def prepare(self, reactor, clock, hs):
+        self.store: EventsWorkerStore = hs.get_datastore()
+
+        # insert some test data
+        for rid in ("room1", "room2"):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "rooms",
+                    {"room_id": rid, "room_version": 4},
+                )
+            )
+
+        for idx, (rid, eid) in enumerate(
+            (
+                ("room1", "event10"),
+                ("room1", "event11"),
+                ("room1", "event12"),
+                ("room2", "event20"),
+            )
+        ):
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "events",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "topological_ordering": idx,
+                        "stream_ordering": idx,
+                        "type": "test",
+                        "processed": True,
+                        "outlier": False,
+                    },
+                )
+            )
+            self.get_success(
+                self.store.db_pool.simple_insert(
+                    "event_json",
+                    {
+                        "event_id": eid,
+                        "room_id": rid,
+                        "json": json.dumps({"type": "test", "room_id": rid}),
+                        "internal_metadata": "{}",
+                        "format_version": 3,
+                    },
+                )
+            )
+
+    def test_simple(self):
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+
+            # that should result in a single db query
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 1)
+
+        # a second lookup of the same events should cause no queries
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(
+                self.store.have_seen_events("room1", ["event10", "event19"])
+            )
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
+
+    def test_query_via_event_cache(self):
+        # fetch an event into the event cache
+        self.get_success(self.store.get_event("event10"))
+
+        # looking it up should now cause no db hits
+        with LoggingContext(name="test") as ctx:
+            res = self.get_success(self.store.have_seen_events("room1", ["event10"]))
+            self.assertEquals(res, {"event10"})
+            self.assertEquals(ctx.get_resource_usage().db_txn_count, 0)
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index bbbc276697..0277998cbe 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -622,17 +622,17 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase):
         self.assertEquals(callcount2[0], 1)
 
         a.func2.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 1)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 1)
 
         yield a.func2("foo")
         a.func2.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 2)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 2)
 
         self.assertEquals(callcount[0], 1)
         self.assertEquals(callcount2[0], 2)
 
         a.func.invalidate(("foo",))
-        self.assertEquals(a.func2.cache.cache.pop.call_count, 3)
+        self.assertEquals(a.func2.cache.cache.del_multi.call_count, 3)
         yield a.func("foo")
 
         self.assertEquals(callcount[0], 2)
diff --git a/tests/util/test_batching_queue.py b/tests/util/test_batching_queue.py
index 5def1e56c9..edf29e5b96 100644
--- a/tests/util/test_batching_queue.py
+++ b/tests/util/test_batching_queue.py
@@ -14,7 +14,12 @@
 from twisted.internet import defer
 
 from synapse.logging.context import make_deferred_yieldable
-from synapse.util.batching_queue import BatchingQueue
+from synapse.util.batching_queue import (
+    BatchingQueue,
+    number_in_flight,
+    number_of_keys,
+    number_queued,
+)
 
 from tests.server import get_clock
 from tests.unittest import TestCase
@@ -24,6 +29,14 @@ class BatchingQueueTestCase(TestCase):
     def setUp(self):
         self.clock, hs_clock = get_clock()
 
+        # Drop any "test_queue" samples left behind by a previous test. Each gauge
+        # is removed independently so that one missing label cannot skip the rest.
+        for gauge in (number_queued, number_of_keys, number_in_flight):
+            try:
+                gauge.remove("test_queue")
+            except KeyError:
+                pass
+
         self._pending_calls = []
         self.queue = BatchingQueue("test_queue", hs_clock, self._process_queue)
 
@@ -32,6 +45,41 @@ class BatchingQueueTestCase(TestCase):
         self._pending_calls.append((values, d))
         return await make_deferred_yieldable(d)
 
+    def _assert_metrics(self, queued, keys, in_flight):
+        """Assert each gauge has a single sample for this queue with the given value."""
+
+        self.assertEqual(len(number_queued.collect()), 1)
+        self.assertEqual(len(number_queued.collect()[0].samples), 1)
+        self.assertEqual(
+            number_queued.collect()[0].samples[0].labels,
+            {"name": self.queue._name},
+        )
+        self.assertEqual(
+            number_queued.collect()[0].samples[0].value,
+            queued,
+            "number_queued",
+        )
+
+        self.assertEqual(len(number_of_keys.collect()), 1)
+        self.assertEqual(len(number_of_keys.collect()[0].samples), 1)
+        self.assertEqual(
+            number_of_keys.collect()[0].samples[0].labels, {"name": self.queue._name}
+        )
+        self.assertEqual(
+            number_of_keys.collect()[0].samples[0].value, keys, "number_of_keys"
+        )
+
+        self.assertEqual(len(number_in_flight.collect()), 1)
+        self.assertEqual(len(number_in_flight.collect()[0].samples), 1)
+        self.assertEqual(
+            number_in_flight.collect()[0].samples[0].labels, {"name": self.queue._name}
+        )
+        self.assertEqual(
+            number_in_flight.collect()[0].samples[0].value,
+            in_flight,
+            "number_in_flight",
+        )
+        )
+
     def test_simple(self):
         """Tests the basic case of calling `add_to_queue` once and having
         `_process_queue` return.
@@ -41,6 +89,8 @@ class BatchingQueueTestCase(TestCase):
 
         queue_d = defer.ensureDeferred(self.queue.add_to_queue("foo"))
 
+        self._assert_metrics(queued=1, keys=1, in_flight=1)
+
         # The queue should wait a reactor tick before calling the processing
         # function.
         self.assertFalse(self._pending_calls)
@@ -52,12 +102,15 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo"])
         self.assertFalse(queue_d.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=1)
 
         # Return value of the `_process_queue` should be propagated back.
         self._pending_calls.pop()[1].callback("bar")
 
         self.assertEqual(self.successResultOf(queue_d), "bar")
 
+        self._assert_metrics(queued=0, keys=0, in_flight=0)
+
     def test_batching(self):
         """Test that multiple calls at the same time get batched up into one
         call to `_process_queue`.
@@ -68,6 +121,8 @@ class BatchingQueueTestCase(TestCase):
         queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
         queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))
 
+        self._assert_metrics(queued=2, keys=1, in_flight=2)
+
         self.clock.pump([0])
 
         # We should see only *one* call to `_process_queue`
@@ -75,12 +130,14 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self._pending_calls[0][0], ["foo1", "foo2"])
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=2)
 
         # Return value of the `_process_queue` should be propagated back to both.
         self._pending_calls.pop()[1].callback("bar")
 
         self.assertEqual(self.successResultOf(queue_d1), "bar")
         self.assertEqual(self.successResultOf(queue_d2), "bar")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)
 
     def test_queuing(self):
         """Test that we queue up requests while a `_process_queue` is being
@@ -92,13 +149,20 @@ class BatchingQueueTestCase(TestCase):
         queue_d1 = defer.ensureDeferred(self.queue.add_to_queue("foo1"))
         self.clock.pump([0])
 
+        self.assertEqual(len(self._pending_calls), 1)
+
+        # Queue up more work after the process function has been called, to test
+        # that it gets correctly queued up.
         queue_d2 = defer.ensureDeferred(self.queue.add_to_queue("foo2"))
+        queue_d3 = defer.ensureDeferred(self.queue.add_to_queue("foo3"))
 
         # We should see only *one* call to `_process_queue`
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo1"])
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=2, keys=1, in_flight=3)
 
         # Return value of the `_process_queue` should be propagated back to the
         # first.
@@ -106,18 +170,24 @@ class BatchingQueueTestCase(TestCase):
 
         self.assertEqual(self.successResultOf(queue_d1), "bar1")
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=2, keys=1, in_flight=2)
 
         # We should now see a second call to `_process_queue`
         self.clock.pump([0])
         self.assertEqual(len(self._pending_calls), 1)
-        self.assertEqual(self._pending_calls[0][0], ["foo2"])
+        self.assertEqual(self._pending_calls[0][0], ["foo2", "foo3"])
         self.assertFalse(queue_d2.called)
+        self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=2)
 
         # Return value of the `_process_queue` should be propagated back to the
         # second.
         self._pending_calls.pop()[1].callback("bar2")
 
         self.assertEqual(self.successResultOf(queue_d2), "bar2")
+        self.assertEqual(self.successResultOf(queue_d3), "bar2")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)
 
     def test_different_keys(self):
         """Test that calls to different keys get processed in parallel."""
@@ -140,6 +210,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertFalse(queue_d1.called)
         self.assertFalse(queue_d2.called)
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=1, keys=1, in_flight=3)
 
         # Return value of the `_process_queue` should be propagated back to the
         # first.
@@ -148,6 +219,7 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(self.successResultOf(queue_d1), "bar1")
         self.assertFalse(queue_d2.called)
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=1, keys=1, in_flight=2)
 
         # Return value of the `_process_queue` should be propagated back to the
         # second.
@@ -161,9 +233,11 @@ class BatchingQueueTestCase(TestCase):
         self.assertEqual(len(self._pending_calls), 1)
         self.assertEqual(self._pending_calls[0][0], ["foo3"])
         self.assertFalse(queue_d3.called)
+        self._assert_metrics(queued=0, keys=0, in_flight=1)
 
         # Return value of the `_process_queue` should be propagated back to the
         # third deferred.
         self._pending_calls.pop()[1].callback("bar4")
 
         self.assertEqual(self.successResultOf(queue_d3), "bar4")
+        self._assert_metrics(queued=0, keys=0, in_flight=0)