-rwxr-xr-x  .ci/scripts/prepare_old_deps.sh (renamed from .ci/scripts/test_old_deps.sh) | 25
-rw-r--r--  .dockerignore | 4
-rw-r--r--  .github/workflows/latest_deps.yml | 34
-rw-r--r--  .github/workflows/release-artifacts.yml | 65
-rw-r--r--  .github/workflows/tests.yml | 158
-rw-r--r--  .github/workflows/twisted_trunk.yml | 24
-rw-r--r--  .gitignore | 7
-rw-r--r--  Cargo.toml | 5
-rw-r--r--  build_rust.py | 20
-rw-r--r--  changelog.d/12595.misc | 1
-rw-r--r--  changelog.d/13480.doc | 1
-rw-r--r--  changelog.d/13506.bugfix | 1
-rw-r--r--  changelog.d/13672.feature | 1
-rw-r--r--  changelog.d/13680.feature | 1
-rw-r--r--  changelog.d/13687.feature | 1
-rw-r--r--  changelog.d/13703.misc | 1
-rw-r--r--  changelog.d/13706.misc | 1
-rw-r--r--  changelog.d/13707.misc | 1
-rw-r--r--  changelog.d/13714.misc | 1
-rw-r--r--  changelog.d/13717.misc | 1
-rw-r--r--  changelog.d/13718.misc | 1
-rw-r--r--  changelog.d/13724.misc | 1
-rw-r--r--  changelog.d/13725.misc | 1
-rw-r--r--  changelog.d/13726.doc | 1
-rw-r--r--  changelog.d/13727.doc | 1
-rw-r--r--  changelog.d/13728.doc | 1
-rw-r--r--  changelog.d/13729.misc | 1
-rw-r--r--  changelog.d/13730.misc | 1
-rw-r--r--  changelog.d/13734.misc | 1
-rw-r--r--  changelog.d/13735.misc | 1
-rw-r--r--  changelog.d/13738.bugfix | 1
-rw-r--r--  changelog.d/13741.feature | 1
-rw-r--r--  changelog.d/13743.misc | 1
-rw-r--r--  changelog.d/13745.misc | 1
-rw-r--r--  changelog.d/13746.bugfix | 1
-rw-r--r--  changelog.d/13748.misc | 1
-rw-r--r--  changelog.d/13750.misc | 1
-rw-r--r--  changelog.d/13752.misc | 1
-rw-r--r--  changelog.d/13754.misc | 1
-rw-r--r--  changelog.d/13756.misc | 1
-rw-r--r--  changelog.d/13759.misc | 1
-rw-r--r--  changelog.d/13760.removal | 1
-rw-r--r--  changelog.d/13761.misc | 1
-rw-r--r--  changelog.d/13763.misc | 1
-rw-r--r--  changelog.d/13765.misc | 1
-rw-r--r--  changelog.d/13769.misc | 1
-rw-r--r--  changelog.d/13770.misc | 1
-rw-r--r--  changelog.d/13778.misc | 1
-rw-r--r--  changelog.d/13784.misc | 1
-rw-r--r--  contrib/grafana/synapse.json | 138
-rw-r--r--  contrib/prometheus/synapse-v1.rules | 21
-rw-r--r--  contrib/prometheus/synapse-v2.rules | 49
-rwxr-xr-x  debian/build_virtualenv | 7
-rw-r--r--  debian/changelog | 4
-rwxr-xr-x  debian/rules | 2
-rw-r--r--  docker/Dockerfile | 14
-rw-r--r--  docker/Dockerfile-dhvirtualenv | 10
-rw-r--r--  docs/admin_api/rooms.md | 145
-rw-r--r--  docs/admin_api/user_admin_api.md | 2
-rw-r--r--  docs/deprecation_policy.md | 26
-rw-r--r--  docs/development/contributing_guide.md | 10
-rw-r--r--  docs/setup/installation.md | 9
-rw-r--r--  docs/usage/configuration/config_documentation.md | 8
-rw-r--r--  mypy.ini | 6
-rw-r--r--  poetry.lock | 35
-rw-r--r--  pyproject.toml | 41
-rw-r--r--  rust/Cargo.toml | 25
-rw-r--r--  rust/build.rs | 45
-rw-r--r--  rust/src/lib.rs | 24
-rwxr-xr-x  scripts-dev/make_full_schema.sh | 48
-rw-r--r--  stubs/synapse/__init__.pyi | 0
-rw-r--r--  stubs/synapse/synapse_rust.pyi | 2
-rw-r--r--  synapse/__init__.py | 5
-rwxr-xr-x  synapse/_scripts/synapse_port_db.py | 2
-rw-r--r--  synapse/api/auth.py | 12
-rw-r--r--  synapse/api/filtering.py | 8
-rw-r--r--  synapse/api/room_versions.py | 45
-rw-r--r--  synapse/app/phone_stats_home.py | 6
-rw-r--r--  synapse/config/key.py | 13
-rw-r--r--  synapse/event_auth.py | 4
-rw-r--r--  synapse/events/__init__.py | 12
-rw-r--r--  synapse/events/builder.py | 4
-rw-r--r--  synapse/events/validator.py | 2
-rw-r--r--  synapse/federation/federation_base.py | 2
-rw-r--r--  synapse/federation/federation_client.py | 2
-rw-r--r--  synapse/federation/sender/__init__.py | 4
-rw-r--r--  synapse/handlers/admin.py | 1
-rw-r--r--  synapse/handlers/device.py | 3
-rw-r--r--  synapse/handlers/e2e_keys.py | 40
-rw-r--r--  synapse/handlers/pagination.py | 37
-rw-r--r--  synapse/handlers/room_summary.py | 1
-rw-r--r--  synapse/handlers/sync.py | 20
-rw-r--r--  synapse/http/servlet.py | 19
-rw-r--r--  synapse/logging/opentracing.py | 19
-rw-r--r--  synapse/metrics/_legacy_exposition.py | 16
-rw-r--r--  synapse/push/push_tools.py | 13
-rw-r--r--  synapse/rest/admin/__init__.py | 4
-rw-r--r--  synapse/rest/admin/rooms.py | 104
-rw-r--r--  synapse/rest/client/account.py | 54
-rw-r--r--  synapse/rest/client/keys.py | 6
-rw-r--r--  synapse/rest/client/models.py | 24
-rw-r--r--  synapse/server.py | 12
-rw-r--r--  synapse/storage/controllers/state.py | 4
-rw-r--r--  synapse/storage/database.py | 47
-rw-r--r--  synapse/storage/databases/main/devices.py | 4
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 5
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 4
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 10
-rw-r--r--  synapse/storage/databases/main/lock.py | 121
-rw-r--r--  synapse/storage/databases/main/receipts.py | 86
-rw-r--r--  synapse/storage/databases/main/registration.py | 6
-rw-r--r--  synapse/storage/databases/main/roommember.py | 226
-rw-r--r--  synapse/storage/databases/main/state.py | 4
-rw-r--r--  synapse/storage/databases/main/stats.py | 86
-rw-r--r--  synapse/storage/databases/main/stream.py | 2
-rw-r--r--  synapse/storage/databases/main/transactions.py | 30
-rw-r--r--  synapse/storage/databases/state/store.py | 3
-rw-r--r--  synapse/storage/engines/_base.py | 8
-rw-r--r--  synapse/storage/engines/postgres.py | 7
-rw-r--r--  synapse/storage/engines/sqlite.py | 13
-rw-r--r--  synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql | 19
-rw-r--r--  synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql | 16
-rw-r--r--  synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py | 52
-rw-r--r--  synapse/storage/schema/state/delta/30/state_stream.sql | 4
-rw-r--r--  synapse/storage/util/partial_state_events_tracker.py | 3
-rw-r--r--  synapse/types.py | 5
-rw-r--r--  synapse/util/caches/__init__.py | 60
-rw-r--r--  synapse/util/metrics.py | 34
-rw-r--r--  synapse/util/rust.py | 84
-rw-r--r--  tests/http/server/_base.py | 10
-rw-r--r--  tests/rest/admin/test_room.py | 158
-rw-r--r--  tests/rest/admin/test_user.py | 1
-rw-r--r--  tests/rest/client/test_keys.py | 29
-rw-r--r--  tests/storage/databases/main/test_events_worker.py | 2
-rw-r--r--  tests/storage/test_base.py | 1
-rw-r--r--  tests/storage/test_event_federation.py | 2
-rw-r--r--  tests/storage/test_registration.py | 33
-rw-r--r--  tests/test_event_auth.py | 4
-rw-r--r--  tests/test_rust.py | 11
-rw-r--r--  tests/test_types.py | 26
140 files changed, 2013 insertions, 754 deletions
diff --git a/.ci/scripts/test_old_deps.sh b/.ci/scripts/prepare_old_deps.sh
index 478c8d639a..7e4f060b17 100755
--- a/.ci/scripts/test_old_deps.sh
+++ b/.ci/scripts/prepare_old_deps.sh
@@ -5,18 +5,8 @@
 # - creates a venv with these old versions using poetry; and finally
 # - invokes `trial` to run the tests with old deps.
 
-# Prevent tzdata from asking for user input
-export DEBIAN_FRONTEND=noninteractive
-
 set -ex
 
-apt-get update
-apt-get install -y \
-        python3 python3-dev python3-pip python3-venv pipx \
-        libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev
-
-export LANG="C.UTF-8"
-
 # Prevent virtualenv from auto-updating pip to an incompatible version
 export VIRTUALENV_NO_DOWNLOAD=1
 
@@ -33,12 +23,6 @@ export VIRTUALENV_NO_DOWNLOAD=1
 #   a `cryptography` compiled against OpenSSL 1.1.
 # - Omit systemd: we're not logging to journal here.
 
-# TODO: also replace caret bounds, see https://python-poetry.org/docs/dependency-specification/#version-constraints
-# We don't use these yet, but IIRC they are the default bound used when you `poetry add`.
-# The sed expression 's/\^/==/g' ought to do the trick. But it would also change
-# `python = "^3.7"` to `python = "==3.7", which would mean we fail because olddeps
-# runs on 3.8 (#12343).
-
 sed -i \
    -e "s/[~>]=/==/g" \
    -e '/^python = "^/!s/\^/==/g' \
@@ -55,7 +39,7 @@ sed -i \
 # toml file. This means we don't have to ensure compatibility between old deps and
 # dev tools.
 
-pip install --user toml
+pip install toml wheel
 
 REMOVE_DEV_DEPENDENCIES="
 import toml
@@ -69,8 +53,8 @@ with open('pyproject.toml', 'w') as f:
 "
 python3 -c "$REMOVE_DEV_DEPENDENCIES"
 
-pipx install poetry==1.1.14
-~/.local/bin/poetry lock
+pip install poetry==1.2.0
+poetry lock
 
 echo "::group::Patched pyproject.toml"
 cat pyproject.toml
@@ -78,6 +62,3 @@ echo "::endgroup::"
 echo "::group::Lockfile after patch"
 cat poetry.lock
 echo "::endgroup::"
-
-~/.local/bin/poetry install -E "all test"
-~/.local/bin/poetry run trial --jobs=2 tests
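As a rough illustration of what the sed expressions above do to pyproject.toml, here is a small Python sketch. It is not part of this commit; it merely mirrors the two expressions, pinning `~=`/`>=` bounds to `==` and pinning caret bounds on every line except the `python = "^..."` one:

    import re

    def pin_old_deps(text: str) -> str:
        # Mirrors: sed -i -e "s/[~>]=/==/g" -e '/^python = "^/!s/\^/==/g'
        pinned = []
        for line in text.splitlines():
            line = re.sub(r"[~>]=", "==", line)        # ~= and >= become ==
            if not line.startswith('python = "^'):     # keep the python caret bound
                line = line.replace("^", "==")         # ^x.y becomes ==x.y
            pinned.append(line)
        return "\n".join(pinned)

    print(pin_old_deps('attrs = ">=19.2.0"\npython = "^3.7"'))
    # attrs = "==19.2.0"
    # python = "^3.7"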
diff --git a/.dockerignore b/.dockerignore
index 7809863ef3..8eb1e4df8a 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -4,8 +4,12 @@
 # things to include
 !docker
 !synapse
+!rust
 !README.rst
 !pyproject.toml
 !poetry.lock
+!build_rust.py
+
+rust/target
 
 **/__pycache__
diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index 7dac617c4b..8366ac9393 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -5,7 +5,7 @@
 #
 # As an overview this workflow:
 # - checks out develop,
-# - installs from source, pulling in the dependencies like a fresh `pip install` would, and 
+# - installs from source, pulling in the dependencies like a fresh `pip install` would, and
 # - runs mypy and test suites in that checkout.
 #
 # Based on the twisted trunk CI job.
@@ -26,12 +26,19 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       # The dev dependencies aren't exposed in the wheel metadata (at least with current
       # poetry-core versions), so we install with poetry.
       - uses: matrix-org/setup-python-poetry@v1
         with:
           python-version: "3.x"
-          poetry-version: "1.2.0b1"
+          poetry-version: "1.2.0"
           extras: "all"
       # Dump installed versions for debugging.
       - run: poetry run pip list > before.txt
@@ -53,6 +60,14 @@ jobs:
 
     steps:
       - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - run: sudo apt-get -qq install xmlsec1
       - name: Set up PostgreSQL ${{ matrix.postgres-version }}
         if: ${{ matrix.postgres-version }}
@@ -69,6 +84,12 @@ jobs:
         if: ${{ matrix.postgres-version }}
         timeout-minutes: 2
         run: until pg_isready -h localhost; do sleep 1; done
+
+      # We nuke the local copy, as we've installed synapse into the virtualenv
+      # (rather than use an editable install, which we no longer support). If we
+      # don't do this then python can't find the native lib.
+      - run: rm -rf synapse/
+
       - run: python -m twisted.trial --jobs=2 tests
         env:
           SYNAPSE_POSTGRES: ${{ matrix.database == 'postgres' || '' }}
@@ -113,6 +134,14 @@ jobs:
 
     steps:
       - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - name: Ensure sytest runs `pip install`
         # Delete the lockfile so sytest will `pip install` rather than `poetry install`
         run: rm /src/poetry.lock
@@ -187,4 +216,3 @@ jobs:
         with:
           update_existing: true
           filename: .ci/latest_deps_build_failed_issue_template.md
-
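The `rm -rf synapse/` step is easy to gloss over, so here is a minimal sketch (not from this commit) of the shadowing problem it avoids. When run from the repository root, `import synapse` resolves to the local source tree, which lacks the compiled native extension, rather than to the wheel installed into the virtualenv:

    import importlib.util

    spec = importlib.util.find_spec("synapse")
    if spec is not None:
        # Prints ./synapse/__init__.py while the checkout shadows the installed
        # wheel; after `rm -rf synapse/` it points into site-packages instead.
        print(spec.origin)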
diff --git a/.github/workflows/release-artifacts.yml b/.github/workflows/release-artifacts.yml
index ed4fc6179d..0708d631cd 100644
--- a/.github/workflows/release-artifacts.yml
+++ b/.github/workflows/release-artifacts.yml
@@ -15,7 +15,7 @@ on:
 concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
-  
+
 permissions:
   contents: write
 
@@ -89,9 +89,67 @@ jobs:
           name: debs
           path: debs/*
 
+  build-wheels:
+    name: Build wheels on ${{ matrix.os }}
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        os: [ubuntu-20.04, macos-10.15]
+        is_pr:
+          - ${{ startsWith(github.ref, 'refs/pull/') }}
+
+        exclude:
+          # Don't build macos wheels on PR CI.
+          - is_pr: true
+            os: "macos-10.15"
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v3
+
+      - name: Install cibuildwheel
+        run: python -m pip install cibuildwheel==2.9.0 poetry==1.2.0
+
+      # Only build a single wheel in CI.
+      - name: Set env vars.
+        run: |
+          echo "CIBW_BUILD="cp37-manylinux_x86_64"" >> $GITHUB_ENV
+        if: startsWith(github.ref, 'refs/pull/')
+
+      - name: Build wheels
+        run: python -m cibuildwheel --output-dir wheelhouse
+        env:
+          # Skip testing for platforms which various libraries don't have wheels
+          # for, and so need extra build deps.
+          CIBW_TEST_SKIP: pp39-* *i686* *musl* pp37-macosx*
+
+      - uses: actions/upload-artifact@v3
+        with:
+          name: Wheel
+          path: ./wheelhouse/*.whl
+
   build-sdist:
-    name: "Build pypi distribution files"
-    uses: "matrix-org/backend-meta/.github/workflows/packaging.yml@v1"
+    name: Build sdist
+    runs-on: ubuntu-latest
+    if: ${{ !startsWith(github.ref, 'refs/pull/') }}
+
+    steps:
+      - uses: actions/checkout@v3
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
+
+      - run: pip install build
+
+      - name: Build sdist
+        run: python -m build --sdist
+
+      - uses: actions/upload-artifact@v2
+        with:
+          name: Sdist
+          path: dist/*.tar.gz
+
 
   # if it's a tag, create a release and attach the artifacts to it
   attach-assets:
@@ -99,6 +157,7 @@ jobs:
     if: ${{ !failure() && !cancelled() && startsWith(github.ref, 'refs/tags/') }}
     needs:
       - build-debs
+      - build-wheels
       - build-sdist
     runs-on: ubuntu-latest
     steps:
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index bc1de2893c..a5a217d015 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -10,6 +10,23 @@ concurrency:
   cancel-in-progress: true
 
 jobs:
+  # Job to detect what has changed so we don't run e.g. Rust checks on PRs that
+  # don't modify Rust code.
+  changes:
+    runs-on: ubuntu-latest
+    outputs:
+      rust: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.rust }}
+    steps:
+    - uses: dorny/paths-filter@v2
+      id: filter
+      # We only check on PRs
+      if: startsWith(github.ref, 'refs/pull/')
+      with:
+        filters: |
+          rust:
+            - 'rust/**'
+            - 'Cargo.toml'
+
   check-sampleconfig:
     runs-on: ubuntu-latest
     steps:
@@ -65,10 +82,54 @@ jobs:
           extras: "all"
       - run: poetry run scripts-dev/check_pydantic_models.py
 
+  lint-clippy:
+    runs-on: ubuntu-latest
+    needs: changes
+    if: ${{ needs.changes.outputs.rust == 'true' }}
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: 1.61.0
+            override: true
+            components: clippy
+      - uses: Swatinem/rust-cache@v2
+
+      - run: cargo clippy
+
+  lint-rustfmt:
+    runs-on: ubuntu-latest
+    needs: changes
+    if: ${{ needs.changes.outputs.rust == 'true' }}
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: 1.61.0
+            override: true
+            components: rustfmt
+      - uses: Swatinem/rust-cache@v2
+
+      - run: cargo fmt --check
+
   # Dummy step to gate other tests on without repeating the whole list
   linting-done:
     if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
-    needs: [lint, lint-crlf, lint-newsfile, lint-pydantic, check-sampleconfig, check-schema-delta]
+    needs:
+      - lint
+      - lint-crlf
+      - lint-newsfile
+      - lint-pydantic
+      - check-sampleconfig
+      - check-schema-delta
+      - lint-clippy
+      - lint-rustfmt
     runs-on: ubuntu-latest
     steps:
       - run: "true"
@@ -135,16 +196,54 @@ jobs:
     # Note: sqlite only; no postgres
     if: ${{ !cancelled() && !failure() }} # Allow previous steps to be skipped, but not fail
     needs: linting-done
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-20.04
     steps:
       - uses: actions/checkout@v2
-      - name: Test with old deps
-        uses: docker://ubuntu:focal # For old python and sqlite
-        # Note: focal seems to be using 3.8, but the oldest is 3.7?
-        # See https://github.com/matrix-org/synapse/issues/12343
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
         with:
-          workdir: /github/workspace
-          entrypoint: .ci/scripts/test_old_deps.sh
+            toolchain: 1.61.0
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
+      # There aren't wheels for some of the older deps, so we need to install
+      # their build dependencies
+      - run: |
+          sudo apt-get -qq install build-essential libffi-dev python-dev \
+          libxml2-dev libxslt-dev xmlsec1 zlib1g-dev libjpeg-dev libwebp-dev
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: '3.7'
+
+      # Calculating the old-deps actually takes a bunch of time, so we cache the
+      # pyproject.toml / poetry.lock. We need to cache pyproject.toml as
+      # otherwise the `poetry install` step will error due to the poetry.lock
+      # file being outdated.
+      #
+      # This caches the output of `Prepare old deps`, which should generate the
+      # same `pyproject.toml` and `poetry.lock` for a given `pyproject.toml` input.
+      - uses: actions/cache@v3
+        id: cache-poetry-old-deps
+        name: Cache poetry.lock
+        with:
+          path: |
+            poetry.lock
+            pyproject.toml
+          key: poetry-old-deps2-${{ hashFiles('pyproject.toml') }}
+      - name: Prepare old deps
+        if: steps.cache-poetry-old-deps.outputs.cache-hit != 'true'
+        run: .ci/scripts/prepare_old_deps.sh
+
+      # We only now install poetry so that `setup-python-poetry` caches the
+      # right poetry.lock's dependencies.
+      - uses: matrix-org/setup-python-poetry@v1
+        with:
+          python-version: '3.7'
+          extras: "all test"
+
+      - run: poetry run trial -j2 tests
       - name: Dump logs
         # Logs are most useful when the command fails, always include them.
         if: ${{ always() }}
@@ -216,6 +315,14 @@ jobs:
       - uses: actions/checkout@v2
       - name: Prepare test blacklist
         run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: 1.61.0
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - name: Run SyTest
         run: /bootstrap.sh synapse
         working-directory: /src
@@ -322,6 +429,13 @@ jobs:
         with:
           path: synapse
 
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: 1.61.0
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - name: Prepare Complement's Prerequisites
         run: synapse/.ci/scripts/setup_complement_prerequisites.sh
 
@@ -331,20 +445,36 @@ jobs:
         shell: bash
         name: Run Complement Tests
 
+  cargo-test:
+    if: ${{ needs.changes.outputs.rust == 'true' }}
+    runs-on: ubuntu-latest
+    needs:
+      - linting-done
+      - changes
+
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: 1.61.0
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
+      - run: cargo test
+
   # a job which marks all the other jobs as complete, thus allowing PRs to be merged.
   tests-done:
     if: ${{ always() }}
     needs:
-      - check-sampleconfig
-      - lint
-      - lint-crlf
-      - lint-newsfile
       - trial
       - trial-olddeps
       - sytest
       - export-data
       - portdb
       - complement
+      - cargo-test
     runs-on: ubuntu-latest
     steps:
       - uses: matrix-org/done-action@v2
@@ -352,5 +482,7 @@ jobs:
           needs: ${{ toJSON(needs) }}
 
           # The newsfile lint may be skipped on non PR builds
-          skippable:
+          # Cargo test is skipped if there are no changes to Rust code
+          skippable: |
             lint-newsfile
+            cargo-test
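Two pieces of logic in this workflow are worth unpacking. First, the `rust` output of the `changes` job: Rust jobs always run on non-PR refs, and on PRs only when the paths filter matched. In rough Python terms (my paraphrase, not GitHub's expression evaluator):

    def should_run_rust(ref: str, filter_matched_rust: bool) -> bool:
        # rust: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.rust }}
        return not ref.startswith("refs/pull/") or filter_matched_rust

Second, the old-deps cache key. `hashFiles('pyproject.toml')` hashes the file contents (SHA-256, per the GitHub Actions docs), so a rough local analogue, assuming that behaviour, is:

    import hashlib

    with open("pyproject.toml", "rb") as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    # The key changes whenever pyproject.toml changes, so a stale poetry.lock
    # is never restored against a newer pyproject.toml.
    print(f"poetry-old-deps2-{digest}")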
diff --git a/.github/workflows/twisted_trunk.yml b/.github/workflows/twisted_trunk.yml
index 0906101cc1..8fa2fbdea0 100644
--- a/.github/workflows/twisted_trunk.yml
+++ b/.github/workflows/twisted_trunk.yml
@@ -16,6 +16,14 @@ jobs:
 
     steps:
       - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - uses: matrix-org/setup-python-poetry@v1
         with:
           python-version: "3.x"
@@ -34,6 +42,14 @@ jobs:
     steps:
       - uses: actions/checkout@v2
       - run: sudo apt-get -qq install xmlsec1
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - uses: matrix-org/setup-python-poetry@v1
         with:
           python-version: "3.x"
@@ -66,6 +82,14 @@ jobs:
 
     steps:
       - uses: actions/checkout@v2
+
+      - name: Install Rust
+        uses: actions-rs/toolchain@v1
+        with:
+            toolchain: stable
+            override: true
+      - uses: Swatinem/rust-cache@v2
+
       - name: Patch dependencies
         # Note: The poetry commands want to create a virtualenv in /src/.venv/,
         #       but the sytest-synapse container expects it to be in /venv/.
diff --git a/.gitignore b/.gitignore
index e58affb241..31a60bb7bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,3 +60,10 @@ book/
 # complement
 /complement-*
 /master.tar.gz
+
+# rust
+/target/
+/synapse/*.so
+
+# Poetry will create a setup.py, which we don't want to include.
+/setup.py
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644
index 0000000000..de141bdee9
--- /dev/null
+++ b/Cargo.toml
@@ -0,0 +1,5 @@
+# We make the whole Synapse folder a workspace so that we can run `cargo`
+# commands from the root (rather than having to cd into rust/).
+
+[workspace]
+members = ["rust"]
diff --git a/build_rust.py b/build_rust.py
new file mode 100644
index 0000000000..5c5e557ee8
--- /dev/null
+++ b/build_rust.py
@@ -0,0 +1,20 @@
+# A build script for poetry that adds the rust extension.
+
+import os
+from typing import Any, Dict
+
+from setuptools_rust import Binding, RustExtension
+
+
+def build(setup_kwargs: Dict[str, Any]) -> None:
+    original_project_dir = os.path.dirname(os.path.realpath(__file__))
+    cargo_toml_path = os.path.join(original_project_dir, "rust", "Cargo.toml")
+
+    extension = RustExtension(
+        target="synapse.synapse_rust",
+        path=cargo_toml_path,
+        binding=Binding.PyO3,
+        py_limited_api=True,
+    )
+    setup_kwargs.setdefault("rust_extensions", []).append(extension)
+    setup_kwargs["zip_safe"] = False
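Since build_rust.py registers the extension under the target name `synapse.synapse_rust`, a quick smoke test after `poetry install` is to import it by that name. A minimal sketch (not part of this diff):

    import importlib

    # Assumes the package was built and installed, e.g. via `poetry install`.
    mod = importlib.import_module("synapse.synapse_rust")
    print(mod.__file__)  # path to the compiled .so / .pyd inside the package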
diff --git a/changelog.d/12595.misc b/changelog.d/12595.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/12595.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13480.doc b/changelog.d/13480.doc
new file mode 100644
index 0000000000..ae5df16367
--- /dev/null
+++ b/changelog.d/13480.doc
@@ -0,0 +1 @@
+Note that `libpq` is required on ARM-based Macs.
diff --git a/changelog.d/13506.bugfix b/changelog.d/13506.bugfix
new file mode 100644
index 0000000000..2e43668865
--- /dev/null
+++ b/changelog.d/13506.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse v1.41.0 where the `/hierarchy` API returned non-standard information (a `room_id` field under each entry in `children_state`).
\ No newline at end of file
diff --git a/changelog.d/13672.feature b/changelog.d/13672.feature
new file mode 100644
index 0000000000..2334e6fe15
--- /dev/null
+++ b/changelog.d/13672.feature
@@ -0,0 +1 @@
+Add admin APIs to fetch messages within a particular window of time.
diff --git a/changelog.d/13680.feature b/changelog.d/13680.feature
new file mode 100644
index 0000000000..4234c7e082
--- /dev/null
+++ b/changelog.d/13680.feature
@@ -0,0 +1 @@
+Cancel the processing of key query requests when they time out.
\ No newline at end of file
diff --git a/changelog.d/13687.feature b/changelog.d/13687.feature
new file mode 100644
index 0000000000..dac53ec122
--- /dev/null
+++ b/changelog.d/13687.feature
@@ -0,0 +1 @@
+Improve validation of request bodies for the following client-server API endpoints: [`/account/3pid/msisdn/requestToken`](https://spec.matrix.org/v1.3/client-server-api/#post_matrixclientv3account3pidmsisdnrequesttoken) and [`/org.matrix.msc3720/account_status`](https://github.com/matrix-org/matrix-spec-proposals/blob/babolivier/user_status/proposals/3720-account-status.md#post-_matrixclientv1account_status).
\ No newline at end of file
diff --git a/changelog.d/13703.misc b/changelog.d/13703.misc
new file mode 100644
index 0000000000..685a29b17d
--- /dev/null
+++ b/changelog.d/13703.misc
@@ -0,0 +1 @@
+Add & populate `event_stream_ordering` column on receipts table for future optimisation of push action processing. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/13706.misc b/changelog.d/13706.misc
new file mode 100644
index 0000000000..65c854c7a9
--- /dev/null
+++ b/changelog.d/13706.misc
@@ -0,0 +1 @@
+Rename the `EventFormatVersions` enum values so that they line up with room version numbers.
\ No newline at end of file
diff --git a/changelog.d/13707.misc b/changelog.d/13707.misc
new file mode 100644
index 0000000000..e72c322d2e
--- /dev/null
+++ b/changelog.d/13707.misc
@@ -0,0 +1 @@
+Update trial old deps CI to use poetry 1.2.0.
diff --git a/changelog.d/13714.misc b/changelog.d/13714.misc
new file mode 100644
index 0000000000..07ace50b12
--- /dev/null
+++ b/changelog.d/13714.misc
@@ -0,0 +1 @@
+Add experimental configuration option to allow disabling legacy Prometheus metric names.
\ No newline at end of file
diff --git a/changelog.d/13717.misc b/changelog.d/13717.misc
new file mode 100644
index 0000000000..07ace50b12
--- /dev/null
+++ b/changelog.d/13717.misc
@@ -0,0 +1 @@
+Add experimental configuration option to allow disabling legacy Prometheus metric names.
\ No newline at end of file
diff --git a/changelog.d/13718.misc b/changelog.d/13718.misc
new file mode 100644
index 0000000000..07ace50b12
--- /dev/null
+++ b/changelog.d/13718.misc
@@ -0,0 +1 @@
+Add experimental configuration option to allow disabling legacy Prometheus metric names.
\ No newline at end of file
diff --git a/changelog.d/13724.misc b/changelog.d/13724.misc
new file mode 100644
index 0000000000..2c4f6b19f6
--- /dev/null
+++ b/changelog.d/13724.misc
@@ -0,0 +1 @@
+Fix typechecking with latest types-jsonschema.
diff --git a/changelog.d/13725.misc b/changelog.d/13725.misc
new file mode 100644
index 0000000000..e72c322d2e
--- /dev/null
+++ b/changelog.d/13725.misc
@@ -0,0 +1 @@
+Update trial old deps CI to use poetry 1.2.0.
diff --git a/changelog.d/13726.doc b/changelog.d/13726.doc
new file mode 100644
index 0000000000..ab840e1a92
--- /dev/null
+++ b/changelog.d/13726.doc
@@ -0,0 +1 @@
+Fix a mistake in the config manual: the `event_cache_size` _is_ scaled by `caches.global_factor`. The documentation was incorrect since Synapse 1.22.
diff --git a/changelog.d/13727.doc b/changelog.d/13727.doc
new file mode 100644
index 0000000000..ba530b409d
--- /dev/null
+++ b/changelog.d/13727.doc
@@ -0,0 +1 @@
+Fix a typo in the documentation for the login ratelimiting configuration.
diff --git a/changelog.d/13728.doc b/changelog.d/13728.doc
new file mode 100644
index 0000000000..75ca7b7ec3
--- /dev/null
+++ b/changelog.d/13728.doc
@@ -0,0 +1 @@
+Define Synapse's compatibility policy for SQLite versions.
diff --git a/changelog.d/13729.misc b/changelog.d/13729.misc
new file mode 100644
index 0000000000..c6a6f617e3
--- /dev/null
+++ b/changelog.d/13729.misc
@@ -0,0 +1 @@
+Strip the number suffix from instance names to consolidate the services that traces are spread over.
diff --git a/changelog.d/13730.misc b/changelog.d/13730.misc
new file mode 100644
index 0000000000..06da6581a4
--- /dev/null
+++ b/changelog.d/13730.misc
@@ -0,0 +1 @@
+Instrument `get_metadata_for_events` for understandable traces in Jaeger.
diff --git a/changelog.d/13734.misc b/changelog.d/13734.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13734.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13735.misc b/changelog.d/13735.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13735.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13738.bugfix b/changelog.d/13738.bugfix
new file mode 100644
index 0000000000..d64fa0b4de
--- /dev/null
+++ b/changelog.d/13738.bugfix
@@ -0,0 +1 @@
+Fix a bug where Synapse fails to start if a signing key file contains an empty line.
\ No newline at end of file
diff --git a/changelog.d/13741.feature b/changelog.d/13741.feature
new file mode 100644
index 0000000000..dff46f373f
--- /dev/null
+++ b/changelog.d/13741.feature
@@ -0,0 +1 @@
+Document the timestamp when a user accepts the consent, if [consent tracking](https://matrix-org.github.io/synapse/latest/consent_tracking.html) is used.
\ No newline at end of file
diff --git a/changelog.d/13743.misc b/changelog.d/13743.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13743.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13745.misc b/changelog.d/13745.misc
new file mode 100644
index 0000000000..e97a789c0e
--- /dev/null
+++ b/changelog.d/13745.misc
@@ -0,0 +1 @@
+Remove old queries to join room memberships to current state events. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/13746.bugfix b/changelog.d/13746.bugfix
new file mode 100644
index 0000000000..b692af8fd5
--- /dev/null
+++ b/changelog.d/13746.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where Synapse would fail to handle malformed user IDs or room aliases gracefully in certain cases.
diff --git a/changelog.d/13748.misc b/changelog.d/13748.misc
new file mode 100644
index 0000000000..2f419bb659
--- /dev/null
+++ b/changelog.d/13748.misc
@@ -0,0 +1 @@
+Avoid raising an error due to malformed user IDs in `get_current_hosts_in_room`. Users with malformed IDs cannot currently join a room, so this error would not be hit.
diff --git a/changelog.d/13750.misc b/changelog.d/13750.misc
new file mode 100644
index 0000000000..3bccc21fc5
--- /dev/null
+++ b/changelog.d/13750.misc
@@ -0,0 +1 @@
+Update the docstrings for `get_users_in_room` and `get_current_hosts_in_room` to explain the impact of partial state.
diff --git a/changelog.d/13752.misc b/changelog.d/13752.misc
new file mode 100644
index 0000000000..7624861b9f
--- /dev/null
+++ b/changelog.d/13752.misc
@@ -0,0 +1 @@
+Use an additional database query when persisting receipts.
diff --git a/changelog.d/13754.misc b/changelog.d/13754.misc
new file mode 100644
index 0000000000..662ee00e99
--- /dev/null
+++ b/changelog.d/13754.misc
@@ -0,0 +1 @@
+Re-type hint some collections as read-only.
diff --git a/changelog.d/13756.misc b/changelog.d/13756.misc
new file mode 100644
index 0000000000..06e9cd09bf
--- /dev/null
+++ b/changelog.d/13756.misc
@@ -0,0 +1 @@
+Remove unused Prometheus recording rules from `synapse-v2.rules` and add comments describing where the rest are used.
\ No newline at end of file
diff --git a/changelog.d/13759.misc b/changelog.d/13759.misc
new file mode 100644
index 0000000000..f91c512483
--- /dev/null
+++ b/changelog.d/13759.misc
@@ -0,0 +1 @@
+Add a check for editable installs that detects whether the Rust library needs rebuilding.
diff --git a/changelog.d/13760.removal b/changelog.d/13760.removal
new file mode 100644
index 0000000000..624e7c3678
--- /dev/null
+++ b/changelog.d/13760.removal
@@ -0,0 +1 @@
+Synapse will now refuse to start if configured to use SQLite < 3.27.
diff --git a/changelog.d/13761.misc b/changelog.d/13761.misc
new file mode 100644
index 0000000000..f7aa8c459a
--- /dev/null
+++ b/changelog.d/13761.misc
@@ -0,0 +1 @@
+Tag traces with the instance name to be able to easily jump into the right logs and filter traces by instance.
diff --git a/changelog.d/13763.misc b/changelog.d/13763.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13763.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13765.misc b/changelog.d/13765.misc
new file mode 100644
index 0000000000..fdda5cf3b6
--- /dev/null
+++ b/changelog.d/13765.misc
@@ -0,0 +1 @@
+Concurrently fetch room push actions when calculating badge counts. Contributed by Nick @ Beeper (@fizzadar).
diff --git a/changelog.d/13769.misc b/changelog.d/13769.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13769.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13770.misc b/changelog.d/13770.misc
new file mode 100644
index 0000000000..36ac91400a
--- /dev/null
+++ b/changelog.d/13770.misc
@@ -0,0 +1 @@
+Update the script which makes full schema dumps.
diff --git a/changelog.d/13778.misc b/changelog.d/13778.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13778.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13784.misc b/changelog.d/13784.misc
new file mode 100644
index 0000000000..e7a542cd80
--- /dev/null
+++ b/changelog.d/13784.misc
@@ -0,0 +1 @@
+Simplify the dependency DAG in the tests workflow.
diff --git a/contrib/grafana/synapse.json b/contrib/grafana/synapse.json
index 248cd6d9ad..58061e2fce 100644
--- a/contrib/grafana/synapse.json
+++ b/contrib/grafana/synapse.json
@@ -335,7 +335,7 @@
           "datasource": {
             "uid": "$datasource"
           },
-          "expr": "sum(rate(synapse_storage_events_persisted_events{instance=\"$instance\"}[$bucket_size]))",
+          "expr": "sum(rate(synapse_storage_events_persisted_events_total{instance=\"$instance\"}[$bucket_size]))",
           "hide": false,
           "instant": false,
           "legendFormat": "Events",
@@ -1423,7 +1423,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_background_process_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_background_process_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_background_process_ru_utime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_background_process_ru_stime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "hide": false,
               "instant": false,
@@ -1804,7 +1804,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_storage_events_persisted_events{instance=\"$instance\"}[$bucket_size])) without (job,index)",
+              "expr": "sum(rate(synapse_storage_events_persisted_events_total{instance=\"$instance\"}[$bucket_size])) without (job,index)",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -2437,7 +2437,7 @@
                 "uid": "$datasource"
               },
               "exemplar": false,
-              "expr": "sum(rate(synapse_state_res_db_for_biggest_room_seconds{instance=\"$instance\"}[1m]))",
+              "expr": "sum(rate(synapse_state_res_db_for_biggest_room_seconds_total{instance=\"$instance\"}[1m]))",
               "format": "time_series",
               "hide": false,
               "instant": false,
@@ -2451,7 +2451,7 @@
                 "uid": "$datasource"
               },
               "exemplar": false,
-              "expr": "sum(rate(synapse_state_res_cpu_for_biggest_room_seconds{instance=\"$instance\"}[1m]))",
+              "expr": "sum(rate(synapse_state_res_cpu_for_biggest_room_seconds_total{instance=\"$instance\"}[1m]))",
               "format": "time_series",
               "hide": false,
               "instant": false,
@@ -3425,7 +3425,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_background_process_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_background_process_ru_stime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_background_process_ru_utime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])+rate(synapse_background_process_ru_stime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 1,
@@ -3518,7 +3518,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_background_process_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) +  rate(synapse_background_process_db_sched_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_background_process_db_txn_duration_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) +  rate(synapse_background_process_db_sched_duration_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "hide": false,
               "intervalFactor": 1,
@@ -3726,7 +3726,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_federation_client_sent_transactions{instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_federation_client_sent_transactions_total{instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "successful txn rate",
@@ -3736,7 +3736,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_util_metrics_block_count{block_name=\"_send_new_transaction\",instance=\"$instance\"}[$bucket_size]) - ignoring (block_name) rate(synapse_federation_client_sent_transactions{instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_metrics_block_count_total{block_name=\"_send_new_transaction\",instance=\"$instance\"}[$bucket_size]) - ignoring (block_name) rate(synapse_federation_client_sent_transactions_total{instance=\"$instance\"}[$bucket_size]))",
               "legendFormat": "failed txn rate",
               "refId": "B"
             }
@@ -3826,7 +3826,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_federation_server_received_pdus{instance=~\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_federation_server_received_pdus_total{instance=~\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "pdus",
@@ -3836,7 +3836,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_federation_server_received_edus{instance=~\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_federation_server_received_edus_total{instance=~\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "edus",
@@ -3928,7 +3928,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_federation_client_sent_pdu_destinations:total{instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_federation_client_sent_pdu_destinations:total_total{instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 1,
@@ -3939,7 +3939,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_federation_client_sent_edus{instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_federation_client_sent_edus_total{instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "edus",
@@ -5042,7 +5042,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_http_httppusher_http_pushes_processed{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) and on (instance, job, index) (synapse_http_httppusher_http_pushes_failed + synapse_http_httppusher_http_pushes_processed) > 0",
+              "expr": "rate(synapse_http_httppusher_http_pushes_processed_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) and on (instance, job, index) (synapse_http_httppusher_http_pushes_failed_total + synapse_http_httppusher_http_pushes_processed_total) > 0",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -5054,7 +5054,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_http_httppusher_http_pushes_failed{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) and on (instance, job, index) (synapse_http_httppusher_http_pushes_failed + synapse_http_httppusher_http_pushes_processed) > 0",
+              "expr": "rate(synapse_http_httppusher_http_pushes_failed_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) and on (instance, job, index) (synapse_http_httppusher_http_pushes_failed_total + synapse_http_httppusher_http_pushes_processed_total) > 0",
               "format": "time_series",
               "intervalFactor": 2,
               "legendFormat": "failed {{job}}",
@@ -5268,12 +5268,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
               "legendFormat": "{{index}}",
-              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter",
+              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter_total",
               "refId": "A",
               "step": 2
             }
@@ -5369,12 +5369,12 @@
                 "uid": "$datasource"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
               "legendFormat": "{{index}}",
-              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
+              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter_total",
               "refId": "A",
               "step": 2
             }
@@ -5475,12 +5475,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:hits{job=\"$job\",index=~\"$index\",name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache_hits{job=\"$job\",index=~\"$index\",name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
               "legendFormat": "Hit Rate",
-              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
+              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter_total",
               "refId": "A",
               "step": 2
             },
@@ -5490,7 +5490,7 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"push_rules_delta_state_cache_metric\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -5598,12 +5598,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:hits{job=\"$job\",index=~\"$index\",name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache_hits{job=\"$job\",index=~\"$index\",name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
               "legendFormat": "Hit Rate",
-              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
+              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter_total",
               "refId": "A",
               "step": 2
             },
@@ -5613,7 +5613,7 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"room_push_rule_cache\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -5719,12 +5719,12 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:hits{job=\"$job\",index=~\"$index\",name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache_hits{job=\"$job\",index=~\"$index\",name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))/sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
               "legendFormat": "Hit Rate",
-              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter",
+              "metric": "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter_total",
               "refId": "A",
               "step": 2
             },
@@ -5734,7 +5734,7 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "sum(rate(synapse_util_caches_cache:total{job=\"$job\",index=~\"$index\", name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_util_caches_cache{job=\"$job\",index=~\"$index\", name=\"_get_rules_for_room\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6087,7 +6087,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "topk(10, rate(synapse_storage_transaction_time_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+              "expr": "topk(10, rate(synapse_storage_transaction_time_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6187,7 +6187,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_storage_transaction_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_storage_transaction_time_sum_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "instant": false,
               "interval": "",
@@ -6287,7 +6287,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_storage_transaction_time_sum{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(synapse_storage_transaction_time_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_storage_transaction_time_sum_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])/rate(synapse_storage_transaction_time_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "instant": false,
               "interval": "",
@@ -6538,7 +6538,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_metrics_block_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\",block_name!=\"wrapped_request_handler\"}[$bucket_size]) + rate(synapse_util_metrics_block_ru_stime_seconds[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_ru_utime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\",block_name!=\"wrapped_request_handler\"}[$bucket_size]) + rate(synapse_util_metrics_block_ru_stime_seconds_total[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6636,7 +6636,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "(rate(synapse_util_metrics_block_ru_utime_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) + rate(synapse_util_metrics_block_ru_stime_seconds[$bucket_size])) / rate(synapse_util_metrics_block_count[$bucket_size])",
+              "expr": "(rate(synapse_util_metrics_block_ru_utime_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) + rate(synapse_util_metrics_block_ru_stime_seconds_total[$bucket_size])) / rate(synapse_util_metrics_block_count_total[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6737,7 +6737,7 @@
                 "uid": "${DS_PROMETHEUS}"
               },
               "exemplar": true,
-              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6839,7 +6839,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_db_txn_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_db_txn_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -6936,7 +6936,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_db_txn_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_db_txn_duration_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_db_txn_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -7033,7 +7033,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_metrics_block_time_seconds{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_time_seconds_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]) / rate(synapse_util_metrics_block_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -7122,7 +7122,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_metrics_block_count{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_metrics_block_count_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "{{job}}-{{index}} {{block_name}}",
               "refId": "A"
@@ -7246,7 +7246,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_caches_cache:hits{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])/rate(synapse_util_caches_cache:total{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_util_caches_cache_hits{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])/rate(synapse_util_caches_cache{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "format": "time_series",
               "intervalFactor": 2,
               "legendFormat": "{{name}} {{job}}-{{index}}",
@@ -7347,7 +7347,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "synapse_util_caches_cache:size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "expr": "synapse_util_caches_cache_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
               "format": "time_series",
               "hide": false,
               "interval": "",
@@ -7447,7 +7447,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_caches_cache:total{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_util_caches_cache{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -7547,7 +7547,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "topk(10, rate(synapse_util_caches_cache:total{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]) - rate(synapse_util_caches_cache:hits{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
+              "expr": "topk(10, rate(synapse_util_caches_cache{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]) - rate(synapse_util_caches_cache_hits{job=~\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size]))",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 2,
@@ -7643,7 +7643,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_caches_cache:evicted_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_caches_cache_evicted_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size])",
               "format": "time_series",
               "interval": "",
               "intervalFactor": 1,
@@ -7763,7 +7763,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "synapse_util_caches_response_cache:size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
+              "expr": "synapse_util_caches_response_cache_size{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}",
               "interval": "",
               "legendFormat": "{{name}} {{job}}-{{index}}",
               "refId": "A"
@@ -7853,7 +7853,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_util_caches_response_cache:hits{instance=\"$instance\", job=~\"$job\", index=~\"$index\"}[$bucket_size])/rate(synapse_util_caches_response_cache:total{instance=\"$instance\", job=~\"$job\", index=~\"$index\"}[$bucket_size])",
+              "expr": "rate(synapse_util_caches_response_cache_hits{instance=\"$instance\", job=~\"$job\", index=~\"$index\"}[$bucket_size])/rate(synapse_util_caches_response_cache{instance=\"$instance\", job=~\"$job\", index=~\"$index\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "{{name}} {{job}}-{{index}}",
               "refId": "A"
@@ -9556,7 +9556,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "synapse_forward_extremities_bucket{instance=\"$instance\"} and on (index, instance, job) (synapse_storage_events_persisted_events > 0)",
+              "expr": "synapse_forward_extremities_bucket{instance=\"$instance\"} and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0)",
               "format": "heatmap",
               "intervalFactor": 1,
               "legendFormat": "{{le}}",
@@ -9716,7 +9716,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0)",
+              "expr": "rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0)",
               "format": "heatmap",
               "intervalFactor": 1,
               "legendFormat": "{{le}}",
@@ -9793,7 +9793,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.5, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.5, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "50%",
@@ -9803,7 +9803,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.75, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.75, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "75%",
@@ -9813,7 +9813,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.90, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.90, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "90%",
@@ -9823,7 +9823,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.99, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.99, rate(synapse_storage_events_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "99%",
@@ -9905,7 +9905,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0)",
+              "expr": "rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0)",
               "format": "heatmap",
               "intervalFactor": 1,
               "legendFormat": "{{le}}",
@@ -9982,7 +9982,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.5, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.5, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "50%",
@@ -9992,7 +9992,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.75, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.75, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "75%",
@@ -10002,7 +10002,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.90, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.90, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "90%",
@@ -10012,7 +10012,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "histogram_quantile(0.99, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events > 0))",
+              "expr": "histogram_quantile(0.99, rate(synapse_storage_events_stale_forward_extremities_persisted_bucket{instance=\"$instance\"}[$bucket_size]) and on (index, instance, job) (synapse_storage_events_persisted_events_total > 0))",
               "format": "time_series",
               "intervalFactor": 1,
               "legendFormat": "99%",
@@ -10297,7 +10297,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_storage_events_state_resolutions_during_persistence{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_storage_events_state_resolutions_during_persistence_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
               "interval": "",
               "legendFormat": "State res ",
               "refId": "A"
@@ -10306,7 +10306,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_storage_events_potential_times_prune_extremities{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_storage_events_potential_times_prune_extremities_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
               "interval": "",
               "legendFormat": "Potential to prune",
               "refId": "B"
@@ -10315,7 +10315,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "sum(rate(synapse_storage_events_times_pruned_extremities{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
+              "expr": "sum(rate(synapse_storage_events_times_pruned_extremities_total{instance=\"$instance\",job=~\"$job\",index=~\"$index\"}[$bucket_size]))",
               "interval": "",
               "legendFormat": "Pruned",
               "refId": "C"
@@ -11069,7 +11069,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_handler_presence_notified_presence{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_handler_presence_notified_presence_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "Notified",
               "refId": "A"
@@ -11078,7 +11078,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_handler_presence_federation_presence_out{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_handler_presence_federation_presence_out_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "Remote ping",
               "refId": "B"
@@ -11087,7 +11087,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_handler_presence_presence_updates{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_handler_presence_presence_updates_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "Total updates",
               "refId": "C"
@@ -11096,7 +11096,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_handler_presence_federation_presence{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_handler_presence_federation_presence_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "Remote updates",
               "refId": "D"
@@ -11105,7 +11105,7 @@
               "datasource": {
                 "uid": "$datasource"
               },
-              "expr": "rate(synapse_handler_presence_bump_active_time{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
+              "expr": "rate(synapse_handler_presence_bump_active_time_total{job=\"$job\",index=~\"$index\",instance=\"$instance\"}[$bucket_size])",
               "interval": "",
               "legendFormat": "Bump active time",
               "refId": "E"
@@ -11789,7 +11789,7 @@
         "name": "instance",
         "options": [],
         "query": {
-          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, instance)",
+          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds_total, instance)",
           "refId": "Prometheus-instance-Variable-Query"
         },
         "refresh": 2,
@@ -11818,7 +11818,7 @@
         "name": "job",
         "options": [],
         "query": {
-          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, job)",
+          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds_total, job)",
           "refId": "Prometheus-job-Variable-Query"
         },
         "refresh": 2,
@@ -11848,7 +11848,7 @@
         "name": "index",
         "options": [],
         "query": {
-          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds, index)",
+          "query": "label_values(synapse_util_metrics_block_ru_utime_seconds_total, index)",
           "refId": "Prometheus-index-Variable-Query"
         },
         "refresh": 2,
@@ -11896,6 +11896,6 @@
   "timezone": "",
   "title": "Synapse",
   "uid": "000000012",
-  "version": 132,
+  "version": 133,
   "weekStart": ""
-}
\ No newline at end of file
+}
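
The dashboard hunks above track Synapse's move to the native Prometheus client's metric names: counters gain a `_total` suffix and the legacy colon-separated names (e.g. `synapse_util_caches_cache:hits`) become underscore-separated. A quick way to check which form a given Prometheus server is scraping is its series API; a minimal sketch (not part of this commit; the base URL and rename map are illustrative):

```python
# Probe Prometheus for the pre- and post-rename series names, so you know
# whether the updated dashboard panels will have data. Adjust the base URL
# and the rename map for the metrics you care about.
import requests

PROMETHEUS = "http://localhost:9090"  # assumed Prometheus base URL
RENAMES = {
    "synapse_util_metrics_block_count": "synapse_util_metrics_block_count_total",
    "synapse_util_caches_cache:hits": "synapse_util_caches_cache_hits",
    "synapse_util_caches_cache:total": "synapse_util_caches_cache",
}

def has_series(name: str) -> bool:
    resp = requests.get(f"{PROMETHEUS}/api/v1/series", params={"match[]": name})
    resp.raise_for_status()
    return bool(resp.json()["data"])

for old, new in RENAMES.items():
    if has_series(new):
        print(f"{new}: found (post-rename)")
    elif has_series(old):
        print(f"{old}: found (pre-rename; the updated panels will be empty)")
    else:
        print(f"{old} / {new}: no data")
```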
diff --git a/contrib/prometheus/synapse-v1.rules b/contrib/prometheus/synapse-v1.rules
deleted file mode 100644
index 4c900ba537..0000000000
--- a/contrib/prometheus/synapse-v1.rules
+++ /dev/null
@@ -1,21 +0,0 @@
-synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
-synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
-
-synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
-synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
-
-synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
-
-synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
-synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
-
-synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0
-synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0
-synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job)
-
-synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0
-synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0
-synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job)
-
-synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0
-synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0
diff --git a/contrib/prometheus/synapse-v2.rules b/contrib/prometheus/synapse-v2.rules
index 7e405bf7f0..cbe6f7beba 100644
--- a/contrib/prometheus/synapse-v2.rules
+++ b/contrib/prometheus/synapse-v2.rules
@@ -1,55 +1,35 @@
 groups:
 - name: synapse
   rules:
-  - record: "synapse_federation_transaction_queue_pendingEdus:total"
-    expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
-  - record: "synapse_federation_transaction_queue_pendingPdus:total"
-    expr:   "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
-  - record: 'synapse_http_server_request_count:method'
-    labels:
-      servlet: ""
-    expr: "sum(synapse_http_server_request_count) by (method)"
-  - record: 'synapse_http_server_request_count:servlet'
-    labels:
-      method: ""
-    expr: 'sum(synapse_http_server_request_count) by (servlet)'
-
-  - record: 'synapse_http_server_request_count:total'
-    labels:
-      servlet: ""
-    expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
-
-  - record: 'synapse_cache:hit_ratio_5m'
-    expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
-  - record: 'synapse_cache:hit_ratio_30s'
-    expr: 'rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])'
-
+  # These 3 rules are used in the included Prometheus console
   - record: 'synapse_federation_client_sent'
     labels:
       type: "EDU"
-    expr: 'synapse_federation_client_sent_edus + 0'
+    expr: 'synapse_federation_client_sent_edus_total + 0'
   - record: 'synapse_federation_client_sent'
     labels:
       type: "PDU"
-    expr: 'synapse_federation_client_sent_pdu_destinations:count + 0'
+    expr: 'synapse_federation_client_sent_pdu_destinations_count_total + 0'
   - record: 'synapse_federation_client_sent'
     labels:
       type: "Query"
     expr: 'sum(synapse_federation_client_sent_queries) by (job)'
 
+  # These 3 rules are used in the included Prometheus console
   - record: 'synapse_federation_server_received'
     labels:
       type: "EDU"
-    expr: 'synapse_federation_server_received_edus + 0'
+    expr: 'synapse_federation_server_received_edus_total + 0'
   - record: 'synapse_federation_server_received'
     labels:
       type: "PDU"
-    expr: 'synapse_federation_server_received_pdus + 0'
+    expr: 'synapse_federation_server_received_pdus_total + 0'
   - record: 'synapse_federation_server_received'
     labels:
       type: "Query"
     expr: 'sum(synapse_federation_server_received_queries) by (job)'
 
+  # These 2 rules are used in the included Prometheus console
   - record: 'synapse_federation_transaction_queue_pending'
     labels:
       type: "EDU"
@@ -59,20 +39,25 @@ groups:
       type: "PDU"
     expr: 'synapse_federation_transaction_queue_pending_pdus + 0'
 
+  # These 3 rules are used in the included Grafana dashboard
   - record: synapse_storage_events_persisted_by_source_type
-    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_type="remote"})
+    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_type="remote"})
     labels:
       type: remote
   - record: synapse_storage_events_persisted_by_source_type
-    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity="*client*",origin_type="local"})
+    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity="*client*",origin_type="local"})
     labels:
       type: local
   - record: synapse_storage_events_persisted_by_source_type
-    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity!="*client*",origin_type="local"})
+    expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity!="*client*",origin_type="local"})
     labels:
       type: bridges
+
+  # This rule is used in the included Grafana dashboard
   - record: synapse_storage_events_persisted_by_event_type
-    expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep)
+    expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep_total)
+
+  # This rule is used in the included Grafana dashboard
   - record: synapse_storage_events_persisted_by_origin
-    expr: sum without(type) (synapse_storage_events_persisted_events_sep)
+    expr: sum without(type) (synapse_storage_events_persisted_events_sep_total)
 
diff --git a/debian/build_virtualenv b/debian/build_virtualenv
index ed916ac97a..dd97e888ba 100755
--- a/debian/build_virtualenv
+++ b/debian/build_virtualenv
@@ -61,7 +61,7 @@ dh_virtualenv \
     --extras="all,systemd,test" \
     --requirements="exported_requirements.txt"
 
-PACKAGE_BUILD_DIR="debian/matrix-synapse-py3"
+PACKAGE_BUILD_DIR="$(pwd)/debian/matrix-synapse-py3"
 VIRTUALENV_DIR="${PACKAGE_BUILD_DIR}${DH_VIRTUALENV_INSTALL_ROOT}/matrix-synapse"
 TARGET_PYTHON="${VIRTUALENV_DIR}/bin/python"
 
@@ -78,9 +78,14 @@ case "$DEB_BUILD_OPTIONS" in
 
         cp -r tests "$tmpdir"
 
+        # To avoid pulling in the unbuilt Synapse in the local directory
+        pushd /
+
         PYTHONPATH="$tmpdir" \
             "${TARGET_PYTHON}" -m twisted.trial --reporter=text -j2 tests
 
+        popd
+
         ;;
 esac
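
For context on the `pushd /` above: with `python -m`, the current working directory is prepended to `sys.path`, so running the tests from the source tree would import the unbuilt `./synapse` checkout instead of the package installed into the virtualenv. A minimal illustration (the paths are placeholders, not from the commit):

```python
# Show which `synapse` package would be imported from two working
# directories. Run from the source tree, the in-tree (unbuilt) package wins;
# run from /, the installed one is found instead.
import subprocess
import sys

probe = "import synapse; print(synapse.__file__)"

# From a checkout containing ./synapse: resolves to the local source tree.
subprocess.run([sys.executable, "-c", probe], cwd="/path/to/synapse-checkout")

# From an unrelated directory (what `pushd /` achieves): resolves to the
# virtualenv's site-packages install.
subprocess.run([sys.executable, "-c", probe], cwd="/")
```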
 
diff --git a/debian/changelog b/debian/changelog
index 77c8f7ad0b..0b2ad35bc1 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -22,11 +22,15 @@ matrix-synapse-py3 (1.66.0) stable; urgency=medium
 
 matrix-synapse-py3 (1.66.0~rc2+nmu1) UNRELEASED; urgency=medium
 
+  [ Jörg Behrmann ]
   * Update debhelper to compatibility level 12.
   * Drop the preinst script stopping synapse.
   * Allocate a group for the system user.
   * Change dpkg-statoverride to --force-statoverride-add.
 
+  [ Erik Johnston ]
+  * Disable `dh_auto_configure` as it broke during Rust build.
+
  -- Jörg Behrmann <behrmann@physik.fu-berlin.de>  Tue, 23 Aug 2022 17:17:00 +0100
 
 matrix-synapse-py3 (1.66.0~rc2) stable; urgency=medium
diff --git a/debian/rules b/debian/rules
index 3b79d56074..914d068f2a 100755
--- a/debian/rules
+++ b/debian/rules
@@ -12,6 +12,8 @@ override_dh_installsystemd:
 # we don't really want to strip the symbols from our object files.
 override_dh_strip:
 
+override_dh_auto_configure:
+
 # many libraries pulled from PyPI have allocatable sections after
 # non-allocatable ones on which dwz errors out. For those without the issue the
 # gains are only marginal
diff --git a/docker/Dockerfile b/docker/Dockerfile
index b87d263cff..a057bf397b 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -92,11 +92,20 @@ RUN \
     libxml++2.6-dev \
     libxslt1-dev \
     openssl \
-    rustc \
     zlib1g-dev \
     git \
+    curl \
     && rm -rf /var/lib/apt/lists/*
 
+
+# Install rust and ensure it's in the PATH
+ENV RUSTUP_HOME=/rust
+ENV CARGO_HOME=/cargo
+ENV PATH=/cargo/bin:/rust/bin:$PATH
+RUN mkdir /rust /cargo
+
+RUN curl -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --default-toolchain stable
+
 # To speed up rebuilds, install all of the dependencies before we copy over
 # the whole synapse project, so that this layer in the Docker cache can be
 # used while you develop on the source
@@ -108,8 +117,9 @@ RUN --mount=type=cache,target=/root/.cache/pip \
 
 # Copy over the rest of the synapse source code.
 COPY synapse /synapse/synapse/
+COPY rust /synapse/rust/
 # ... and what we need to `pip install`.
-COPY pyproject.toml README.rst /synapse/
+COPY pyproject.toml README.rst build_rust.py /synapse/
 
 # Repeat of earlier build argument declaration, as this is a new build stage.
 ARG TEST_ONLY_IGNORE_POETRY_LOCKFILE
diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv
index fbc1d2346f..ca3a259081 100644
--- a/docker/Dockerfile-dhvirtualenv
+++ b/docker/Dockerfile-dhvirtualenv
@@ -72,6 +72,7 @@ RUN apt-get update -qq -o Acquire::Languages=none \
     && env DEBIAN_FRONTEND=noninteractive apt-get install \
         -yqq --no-install-recommends -o Dpkg::Options::=--force-unsafe-io \
         build-essential \
+        curl \
         debhelper \
         devscripts \
         libsystemd-dev \
@@ -85,6 +86,15 @@ RUN apt-get update -qq -o Acquire::Languages=none \
         libpq-dev \
         xmlsec1
 
+# Install rust and ensure it's in the PATH
+ENV RUSTUP_HOME=/rust
+ENV CARGO_HOME=/cargo
+ENV PATH=/cargo/bin:/rust/bin:$PATH
+RUN mkdir /rust /cargo
+
+RUN curl -sSf https://sh.rustup.rs | sh -s -- -y --no-modify-path --default-toolchain stable
+
+
 COPY --from=builder /dh-virtualenv_1.2.2-1_all.deb /
 
 # install dhvirtualenv. Update the apt cache again first, in case we got a
diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md
index 7526956bec..8f727b363e 100644
--- a/docs/admin_api/rooms.md
+++ b/docs/admin_api/rooms.md
@@ -393,6 +393,151 @@ A response body like the following is returned:
 }
 ```
 
+# Room Messages API
+
+The Room Messages admin API allows server admins to get all messages
+sent to a room in a given timeframe. There are various parameters available
+that allow for filtering and ordering the returned list. This API supports pagination.
+
+To use it, you will need to authenticate by providing an `access_token`
+for a server admin: see [Admin API](../usage/administration/admin_api).
+
+This endpoint mirrors the [Matrix Spec defined Messages API](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
+
+The API is:
+```
+GET /_synapse/admin/v1/rooms/<room_id>/messages
+```
+
+**Parameters**
+
+The following path parameters are required:
+
+* `room_id` - The ID of the room you wish to fetch messages from.
+
+The following query parameters are available:
+
+* `from` (required) - The token to start returning events from. This token can be obtained from a prev_batch
+  or next_batch token returned by the /sync endpoint, or from an end token returned by a previous request to this endpoint.
+* `to` - The token to stop returning events at.
+* `limit` - The maximum number of events to return. Defaults to `10`.
+* `filter` - A JSON RoomEventFilter to filter returned events with.
+* `dir` - The direction to return events from. Either `f` for forwards or `b` for backwards. Setting
+  this value to `b` will reverse the above sort order. Defaults to `f`.
+
+**Response**
+
+The following fields are possible in the JSON response body:
+
+* `chunk` - A list of room events. The order depends on the `dir` parameter.
+          Note that an empty chunk does not necessarily imply that no more events are available. Clients should continue to paginate until no `end` property is returned.
+* `end` - A token corresponding to the end of chunk. This token can be passed back to this endpoint to request further events.
+          If no further events are available, this property is omitted from the response.
+* `start` - A token corresponding to the start of chunk.
+* `state` - A list of state events relevant to showing the chunk.
+
+**Example**
+
+For more details on each chunk, read [the Matrix specification](https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3roomsroomidmessages).
+
+```json
+{
+  "chunk": [
+    {
+      "content": {
+        "body": "This is an example text message",
+        "format": "org.matrix.custom.html",
+        "formatted_body": "<b>This is an example text message</b>",
+        "msgtype": "m.text"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "type": "m.room.message",
+      "unsigned": {
+        "age": 1234
+      }
+    },
+    {
+      "content": {
+        "name": "The room name"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "state_key": "",
+      "type": "m.room.name",
+      "unsigned": {
+        "age": 1234
+      }
+    },
+    {
+      "content": {
+        "body": "Gangnam Style",
+        "info": {
+          "duration": 2140786,
+          "h": 320,
+          "mimetype": "video/mp4",
+          "size": 1563685,
+          "thumbnail_info": {
+            "h": 300,
+            "mimetype": "image/jpeg",
+            "size": 46144,
+            "w": 300
+          },
+          "thumbnail_url": "mxc://example.org/FHyPlCeYUSFFxlgbQYZmoEoe",
+          "w": 480
+        },
+        "msgtype": "m.video",
+        "url": "mxc://example.org/a526eYUSFFxlgbQYZmo442"
+      },
+      "event_id": "$143273582443PhrSn:example.org",
+      "origin_server_ts": 1432735824653,
+      "room_id": "!636q39766251:example.com",
+      "sender": "@example:example.org",
+      "type": "m.room.message",
+      "unsigned": {
+        "age": 1234
+      }
+    }
+  ],
+  "end": "t47409-4357353_219380_26003_2265",
+  "start": "t47429-4392820_219380_26003_2265"
+}
+```
+
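A hypothetical client for this endpoint might paginate as follows (a sketch, not part of this change; the homeserver URL, room ID and token are placeholders):

```python
# Page through the admin Messages API until the response omits `end`.
# All concrete values here are placeholders.
from urllib.parse import quote

import requests

BASE = "https://synapse.example.com"
ROOM_ID = "!636q39766251:example.com"
HEADERS = {"Authorization": "Bearer <admin_access_token>"}

def iter_room_messages(from_token: str):
    token = from_token
    while True:
        resp = requests.get(
            f"{BASE}/_synapse/admin/v1/rooms/{quote(ROOM_ID)}/messages",
            headers=HEADERS,
            params={"from": token, "limit": 10, "dir": "b"},
        )
        resp.raise_for_status()
        body = resp.json()
        yield from body["chunk"]
        # An empty chunk does not mean we are done; stop only when `end`
        # is omitted from the response.
        if "end" not in body:
            return
        token = body["end"]
```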
+# Room Timestamp to Event API
+
+The Room Timestamp to Event API endpoint fetches the `event_id` of the closest event to the given
+timestamp (`ts` query parameter) in the given direction (`dir` query parameter).
+
+This is useful for cases like "jump to date", where you want to start
+paginating messages from a given date in the archive.
+
+The API is:
+```
+  GET /_synapse/admin/v1/rooms/<room_id>/timestamp_to_event
+```
+
+**Parameters**
+
+The following path parameters are required:
+
+* `room_id` - The ID of the room you wish to check.
+
+The following query parameters are available:
+
+* `ts` - the timestamp to find the closest event to, in milliseconds. The
+  search proceeds in the direction given by `dir`.
+* `dir` - can be `f` or `b` to indicate forwards and backwards in time from the
+  given timestamp. Defaults to `f`.
+
+**Response**
+
+* `event_id` - The ID of the event closest to the given timestamp.
+
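For example, a hypothetical "jump to date" helper (a sketch; the URL, room ID and token are placeholders):

```python
# Find the event closest to midnight UTC on a given date, which can then be
# used as a starting point for pagination. Placeholder values throughout.
import datetime
from urllib.parse import quote

import requests

BASE = "https://synapse.example.com"
ROOM_ID = "!636q39766251:example.com"
HEADERS = {"Authorization": "Bearer <admin_access_token>"}

ts_ms = int(
    datetime.datetime(2022, 9, 1, tzinfo=datetime.timezone.utc).timestamp() * 1000
)
resp = requests.get(
    f"{BASE}/_synapse/admin/v1/rooms/{quote(ROOM_ID)}/timestamp_to_event",
    headers=HEADERS,
    params={"ts": ts_ms, "dir": "f"},
)
resp.raise_for_status()
print(resp.json()["event_id"])
```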
 # Block Room API
 The Block Room admin API allows server admins to block and unblock rooms,
 and query to see if a given room is blocked.
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index c1ca0c8a64..975f05c929 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -42,6 +42,7 @@ It returns a JSON body like the following:
     "appservice_id": null,
     "consent_server_notice_sent": null,
     "consent_version": null,
+    "consent_ts": null,
     "external_ids": [
         {
             "auth_provider": "<provider1>",
@@ -364,6 +365,7 @@ The following actions are **NOT** performed. The list may be incomplete.
 - Remove the user's creation (registration) timestamp
 - [Remove rate limit overrides](#override-ratelimiting-for-users)
 - Remove from monthly active users
+- Remove user's consent information (consent version and timestamp)
 
 ## Reset password
 
diff --git a/docs/deprecation_policy.md b/docs/deprecation_policy.md
index 359dac07c3..46c18d7d32 100644
--- a/docs/deprecation_policy.md
+++ b/docs/deprecation_policy.md
@@ -1,9 +1,9 @@
 Deprecation Policy for Platform Dependencies
 ============================================
 
-Synapse has a number of platform dependencies, including Python and PostgreSQL.
-This document outlines the policy towards which versions we support, and when we
-drop support for versions in the future.
+Synapse has a number of platform dependencies, including Python, Rust,
+PostgreSQL and SQLite. This document outlines the policy on which versions we
+support, and when we drop support for versions in the future.
 
 
 Policy
@@ -17,6 +17,14 @@ Details on the upstream support life cycles for Python and PostgreSQL are
 documented at [https://endoflife.date/python](https://endoflife.date/python) and
 [https://endoflife.date/postgresql](https://endoflife.date/postgresql).
 
+A Rust compiler is required to build Synapse from source. For any given release
+the minimum required version may be bumped up to a recent Rust version, and so
+people building from source should ensure they can fetch recent versions of Rust
+(e.g. by using [rustup](https://rustup.rs/)).
+
+The oldest supported version of SQLite is the version
+[provided](https://packages.debian.org/buster/libsqlite3-0) by
+[Debian oldstable](https://wiki.debian.org/DebianOldStable).
 
 Context
 -------
@@ -31,3 +39,15 @@ long process.
 By following the upstream support life cycles Synapse can ensure that its
 dependencies continue to get security patches, while not requiring system admins
 to constantly update their platform dependencies to the latest versions.
+
+For Rust, the situation is a bit different given that a) the Rust foundation
+does not generally support older Rust versions, and b) the library ecosystem
+generally bumps its minimum supported Rust version frequently. In general, the
+Synapse team will try to avoid updating the dependency on Rust to the absolute
+latest version, but introducing a formal policy is hard given the constraints of
+the ecosystem.
+
+On a similar note, SQLite does not generally have a concept of "supported 
+release"; bugfixes are published for the latest minor release only. We chose to
+track Debian's oldstable as this is relatively conservative, predictably
+updated and consistent with the `.deb` packages released by Matrix.org.
\ No newline at end of file
diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md
index 4e1df51164..cb0d727efa 100644
--- a/docs/development/contributing_guide.md
+++ b/docs/development/contributing_guide.md
@@ -28,6 +28,9 @@ The source code of Synapse is hosted on GitHub. You will also need [a recent ver
 
 For some tests, you will need [a recent version of Docker](https://docs.docker.com/get-docker/).
 
+A recent version of the Rust compiler is needed to build the native modules. The
+easiest way of installing the latest version is to use [rustup](https://rustup.rs/).
+
 
 # 3. Get the source.
 
@@ -114,6 +117,11 @@ Some documentation also exists in [Synapse's GitHub
 Wiki](https://github.com/matrix-org/synapse/wiki), although this is primarily
 contributed to by community authors.
 
+Whenever you make changes to any Rust code, you must run either `poetry install`
+or `maturin develop` (if installed) to rebuild it. Using [`maturin`](https://github.com/PyO3/maturin)
+is quicker than `poetry install`, so it is recommended when making frequent
+changes to the Rust code.
+
 
 # 8. Test, test, test!
 <a name="test-test-test"></a>
@@ -195,7 +203,7 @@ The database file can then be inspected with:
 sqlite3 _trial_temp/test.db
 ```
 
-Note that the database file is cleared at the beginning of each test run. Thus it 
+Note that the database file is cleared at the beginning of each test run. Thus it
 will always only contain the data generated by the *last run test*. Though generally
 when debugging, one is only running a single test anyway.
 
diff --git a/docs/setup/installation.md b/docs/setup/installation.md
index bb78b3267a..96833effc6 100644
--- a/docs/setup/installation.md
+++ b/docs/setup/installation.md
@@ -196,6 +196,10 @@ System requirements:
 - Python 3.7 or later, up to Python 3.10.
 - At least 1GB of free RAM if you want to join large public rooms like #matrix:matrix.org
 
+If building on an uncommon architecture for which pre-built wheels are
+unavailable, you will need to have a recent Rust compiler installed. The easiest
+way of installing the latest version is to use [rustup](https://rustup.rs/).
+
 To install the Synapse homeserver run:
 
 ```sh
@@ -299,9 +303,10 @@ You may need to install the latest Xcode developer tools:
 xcode-select --install
 ```
 
-On ARM-based Macs you may need to explicitly install libjpeg which is a pillow dependency. You can use Homebrew (https://brew.sh):
+On ARM-based Macs you may need to install libjpeg and libpq. 
+You can use Homebrew (https://brew.sh):
 ```sh
- brew install jpeg
+ brew install jpeg libpq
  ```
 
 On macOS Catalina (10.15) you may need to explicitly install OpenSSL
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 757957a1d5..cd546041b2 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1069,8 +1069,10 @@ Options related to caching.
 ---
 ### `event_cache_size`
 
-The number of events to cache in memory. Not affected by
-`caches.global_factor` and is not part of the `caches` section. Defaults to 10K.
+The number of events to cache in memory. Defaults to 10K. Like other caches,
+this is affected by `caches.global_factor` (see below).
+
+Note that this option is not part of the `caches` section.
 
 Example configuration:
 ```yaml
@@ -1391,7 +1393,7 @@ This option specifies several limits for login:
   client is attempting to log into. Defaults to `per_second: 0.17`,
   `burst_count: 3`.
 
-* `failted_attempts` ratelimits login requests based on the account the
+* `failed_attempts` ratelimits login requests based on the account the
   client is attempting to log into, based on the amount of failed login
   attempts for this account. Defaults to `per_second: 0.17`, `burst_count: 3`.
 
diff --git a/mypy.ini b/mypy.ini
index e2034e411f..64f9097206 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -16,7 +16,8 @@ files =
   docker/,
   scripts-dev/,
   synapse/,
-  tests/
+  tests/,
+  build_rust.py
 
 # Note: Better exclusion syntax coming in mypy > 0.910
 # https://github.com/python/mypy/pull/11329
@@ -181,3 +182,6 @@ ignore_missing_imports = True
 
 [mypy-incremental.*]
 ignore_missing_imports = True
+
+[mypy-setuptools_rust.*]
+ignore_missing_imports = True
diff --git a/poetry.lock b/poetry.lock
index 35021390bf..cdc69f8ea9 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -1036,6 +1036,18 @@ cryptography = ">=2.0"
 jeepney = ">=0.6"
 
 [[package]]
+name = "semantic-version"
+version = "2.10.0"
+description = "A library implementing the 'SemVer' scheme."
+category = "main"
+optional = false
+python-versions = ">=2.7"
+
+[package.extras]
+dev = ["Django (>=1.11)", "check-manifest", "colorama (<=0.4.1)", "coverage", "flake8", "nose2", "readme-renderer (<25.0)", "tox", "wheel", "zest.releaser[recommended]"]
+doc = ["Sphinx", "sphinx-rtd-theme"]
+
+[[package]]
 name = "sentry-sdk"
 version = "1.5.11"
 description = "Python client for Sentry (https://sentry.io)"
@@ -1100,6 +1112,19 @@ testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-202
 testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
 
 [[package]]
+name = "setuptools-rust"
+version = "1.5.1"
+description = "Setuptools Rust extension plugin"
+category = "main"
+optional = false
+python-versions = ">=3.7"
+
+[package.dependencies]
+semantic-version = ">=2.8.2,<3"
+setuptools = ">=62.4"
+typing-extensions = ">=3.7.4.3"
+
+[[package]]
 name = "signedjson"
 version = "1.1.4"
 description = "Sign JSON with Ed25519 signatures"
@@ -1600,7 +1625,7 @@ url_preview = ["lxml"]
 [metadata]
 lock-version = "1.1"
 python-versions = "^3.7.1"
-content-hash = "7de518bf27967b3547eab8574342cfb67f87d6b47b4145c13de11112141dbf2d"
+content-hash = "79cfa09d59f9f8b5ef24318fb860df1915f54328692aa56d04331ecbdd92a8cb"
 
 [metadata.files]
 attrs = [
@@ -2472,6 +2497,10 @@ secretstorage = [
     {file = "SecretStorage-3.3.1-py3-none-any.whl", hash = "sha256:422d82c36172d88d6a0ed5afdec956514b189ddbfb72fefab0c8a1cee4eaf71f"},
     {file = "SecretStorage-3.3.1.tar.gz", hash = "sha256:fd666c51a6bf200643495a04abb261f83229dcb6fd8472ec393df7ffc8b6f195"},
 ]
+semantic-version = [
+    {file = "semantic_version-2.10.0-py2.py3-none-any.whl", hash = "sha256:de78a3b8e0feda74cabc54aab2da702113e33ac9d9eb9d2389bcf1f58b7d9177"},
+    {file = "semantic_version-2.10.0.tar.gz", hash = "sha256:bdabb6d336998cbb378d4b9db3a4b56a1e3235701dc05ea2690d9a997ed5041c"},
+]
 sentry-sdk = [
     {file = "sentry-sdk-1.5.11.tar.gz", hash = "sha256:6c01d9d0b65935fd275adc120194737d1df317dce811e642cbf0394d0d37a007"},
     {file = "sentry_sdk-1.5.11-py2.py3-none-any.whl", hash = "sha256:c17179183cac614e900cbd048dab03f49a48e2820182ec686c25e7ce46f8548f"},
@@ -2484,6 +2513,10 @@ setuptools = [
     {file = "setuptools-65.3.0-py3-none-any.whl", hash = "sha256:2e24e0bec025f035a2e72cdd1961119f557d78ad331bb00ff82efb2ab8da8e82"},
     {file = "setuptools-65.3.0.tar.gz", hash = "sha256:7732871f4f7fa58fb6bdcaeadb0161b2bd046c85905dbaa066bdcbcc81953b57"},
 ]
+setuptools-rust = [
+    {file = "setuptools-rust-1.5.1.tar.gz", hash = "sha256:0e05e456645d59429cb1021370aede73c0760e9360bbfdaaefb5bced530eb9d7"},
+    {file = "setuptools_rust-1.5.1-py3-none-any.whl", hash = "sha256:306b236ff3aa5229180e58292610d0c2c51bb488191122d2fc559ae4caeb7d5e"},
+]
 signedjson = [
     {file = "signedjson-1.1.4-py3-none-any.whl", hash = "sha256:45569ec54241c65d2403fe3faf7169be5322547706a231e884ca2b427f23d228"},
     {file = "signedjson-1.1.4.tar.gz", hash = "sha256:cd91c56af53f169ef032c62e9c4a3292dc158866933318d0592e3462db3d6492"},
diff --git a/pyproject.toml b/pyproject.toml
index 97be0f15eb..157385ad8a 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -52,6 +52,9 @@ include_trailing_comma = true
 combine_as_imports = true
 skip_gitignore = true
 
+[tool.maturin]
+manifest-path = "rust/Cargo.toml"
+
 [tool.poetry]
 name = "matrix-synapse"
 version = "1.67.0"
@@ -82,8 +85,17 @@ include = [
     { path = "sytest-blacklist", format = "sdist" },
     { path = "tests", format = "sdist" },
     { path = "UPGRADE.rst", format = "sdist" },
+    { path = "Cargo.toml", format = "sdist" },
+    { path = "rust/Cargo.toml", format = "sdist" },
+    { path = "rust/Cargo.lock", format = "sdist" },
+    { path = "rust/src/**", format = "sdist" },
+]
+exclude = [
+    { path = "synapse/*.so", format = "sdist"}
 ]
 
+build = "build_rust.py"
+
 [tool.poetry.scripts]
 synapse_homeserver = "synapse.app.homeserver:main"
 synapse_worker = "synapse.app.generic_worker:main"
@@ -126,7 +138,7 @@ pyOpenSSL = ">=16.0.0"
 PyYAML = ">=3.11"
 pyasn1 = ">=0.1.9"
 pyasn1-modules = ">=0.0.7"
-bcrypt = ">=3.1.0"
+bcrypt = ">=3.1.7"
 Pillow = ">=5.4.0"
 sortedcontainers = ">=1.4.4"
 pymacaroons = ">=0.13.0"
@@ -161,6 +173,15 @@ importlib_metadata = { version = ">=1.4", python = "<3.8" }
 # This is the most recent version of Pydantic available on common distros.
 pydantic = ">=1.7.4"
 
+# This is for building the rust components during "poetry install", which
+# currently ignores the `build-system.requires` directive (c.f.
+# https://github.com/python-poetry/poetry/issues/6154). Both `pip install` and
+# `poetry build` do the right thing without this explicit dependency.
+#
+# This isn't really a dev-dependency, as `poetry install --no-dev` will fail,
+# but the alternative is to add it to the main list of deps where it isn't
+# needed.
+setuptools_rust = ">=1.3"
 
 
 # Optional Dependencies
@@ -285,5 +306,21 @@ twine = "*"
 towncrier = ">=18.6.0rc1"
 
 [build-system]
-requires = ["poetry-core>=1.0.0"]
+requires = ["poetry-core>=1.0.0", "setuptools_rust>=1.3"]
 build-backend = "poetry.core.masonry.api"
+
+
+[tool.cibuildwheel]
+# Skip unsupported platforms (by us or by Rust).
+skip = "cp36* *-musllinux_i686"
+
+# We need a rust compiler
+before-all =  "curl https://sh.rustup.rs -sSf | sh -s -- --default-toolchain stable -y"
+environment= { PATH = "$PATH:$HOME/.cargo/bin" }
+
+# For some reason if we don't manually clean the build directory we
+# can end up polluting the next build with a .so that is for the wrong
+# Python version.
+before-build = "rm -rf {project}/build"
+build-frontend = "build"
+test-command = "python -c 'from synapse.synapse_rust import sum_as_string; print(sum_as_string(1, 2))'"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
new file mode 100644
index 0000000000..deddf3cec2
--- /dev/null
+++ b/rust/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+# We name the package `synapse` so that things like logging have the right
+# logging target.
+name = "synapse"
+
+# Dummy version. See pyproject.toml for Synapse's version number.
+version = "0.1.0"
+
+edition = "2021"
+rust-version = "1.61.0"
+
+[lib]
+name = "synapse"
+crate-type = ["cdylib"]
+
+[package.metadata.maturin]
+# This is where we tell maturin where to place the built library.
+name = "synapse.synapse_rust"
+
+[dependencies]
+pyo3 = { version = "0.16.5", features = ["extension-module", "macros", "abi3", "abi3-py37"] }
+
+[build-dependencies]
+blake2 = "0.10.4"
+hex = "0.4.3"
diff --git a/rust/build.rs b/rust/build.rs
new file mode 100644
index 0000000000..2117975e56
--- /dev/null
+++ b/rust/build.rs
@@ -0,0 +1,45 @@
+//! This build script calculates the hash of all files in the `src/`
+//! directory and adds it as an environment variable during build time.
+//!
+//! This is used so that the python code can detect when the built native module
+//! does not match the source in-tree, helping to detect the case where the
+//! source has been updated but the library hasn't been rebuilt.
+
+use std::path::PathBuf;
+
+use blake2::{Blake2b512, Digest};
+
+fn main() -> Result<(), std::io::Error> {
+    let mut dirs = vec![PathBuf::from("src")];
+
+    let mut paths = Vec::new();
+    while let Some(path) = dirs.pop() {
+        let mut entries = std::fs::read_dir(path)?
+            .map(|res| res.map(|e| e.path()))
+            .collect::<Result<Vec<_>, std::io::Error>>()?;
+
+        entries.sort();
+
+        for entry in entries {
+            if entry.is_dir() {
+                dirs.push(entry)
+            } else {
+                paths.push(entry.to_str().expect("valid rust paths").to_string());
+            }
+        }
+    }
+
+    paths.sort();
+
+    let mut hasher = Blake2b512::new();
+
+    for path in paths {
+        let bytes = std::fs::read(path)?;
+        hasher.update(bytes);
+    }
+
+    let hex_digest = hex::encode(hasher.finalize());
+    println!("cargo:rustc-env=SYNAPSE_RUST_DIGEST={hex_digest}");
+
+    Ok(())
+}
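
The digest this script embeds can be recomputed on the Python side to detect a stale build; that is what the `check_rust_lib_up_to_date()` call added to `synapse/__init__.py` below is for. A minimal sketch of the recomputation, assuming it mirrors the hashing above:

```python
# Recompute the BLAKE2b-512 digest of everything under rust/src, in sorted
# path order, for comparison against the compiled-in SYNAPSE_RUST_DIGEST.
import hashlib
from pathlib import Path


def rust_source_digest(src_dir: str = "rust/src") -> str:
    hasher = hashlib.blake2b(digest_size=64)  # equivalent to Blake2b512
    for path in sorted(Path(src_dir).rglob("*")):
        if path.is_file():
            hasher.update(path.read_bytes())
    return hasher.hexdigest()
```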
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
new file mode 100644
index 0000000000..ba42465fb8
--- /dev/null
+++ b/rust/src/lib.rs
@@ -0,0 +1,24 @@
+use pyo3::prelude::*;
+
+/// Returns the hash of all the rust source files at the time it was compiled.
+///
+/// Used by python to detect if the rust library is outdated.
+#[pyfunction]
+fn get_rust_file_digest() -> &'static str {
+    env!("SYNAPSE_RUST_DIGEST")
+}
+
+/// Formats the sum of two numbers as string.
+#[pyfunction]
+#[pyo3(text_signature = "(a, b, /)")]
+fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
+    Ok((a + b).to_string())
+}
+
+/// The entry point for defining the Python module.
+#[pymodule]
+fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
+    m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
+    m.add_function(wrap_pyfunction!(get_rust_file_digest, m)?)?;
+    Ok(())
+}
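
Once built (via `poetry install` or `maturin develop`), the module is importable as a normal Python package; this is the same smoke test that the cibuildwheel `test-command` above runs:

```python
from synapse.synapse_rust import get_rust_file_digest, sum_as_string

print(sum_as_string(1, 2))     # "3"
print(get_rust_file_digest())  # hex digest baked in at compile time
```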
diff --git a/scripts-dev/make_full_schema.sh b/scripts-dev/make_full_schema.sh
index f0e22d4ca2..61394360ce 100755
--- a/scripts-dev/make_full_schema.sh
+++ b/scripts-dev/make_full_schema.sh
@@ -9,8 +9,10 @@
 export PGHOST="localhost"
 POSTGRES_DB_NAME="synapse_full_schema.$$"
 
-SQLITE_FULL_SCHEMA_OUTPUT_FILE="full.sql.sqlite"
-POSTGRES_FULL_SCHEMA_OUTPUT_FILE="full.sql.postgres"
+SQLITE_SCHEMA_FILE="schema.sql.sqlite"
+SQLITE_ROWS_FILE="rows.sql.sqlite"
+POSTGRES_SCHEMA_FILE="full.sql.postgres"
+POSTGRES_ROWS_FILE="rows.sql.postgres"
 
 REQUIRED_DEPS=("matrix-synapse" "psycopg2")
 
@@ -22,7 +24,7 @@ usage() {
   echo "  Username to connect to local postgres instance. The password will be requested"
   echo "  during script execution."
   echo "-c"
-  echo "  CI mode. Enables coverage tracking and prints every command that the script runs."
+  echo "  CI mode. Prints every command that the script runs."
   echo "-o <path>"
   echo "  Directory to output full schema files to."
   echo "-h"
@@ -37,11 +39,6 @@ while getopts "p:co:h" opt; do
     c)
       # Print all commands that are being executed
       set -x
-
-      # Modify required dependencies for coverage
-      REQUIRED_DEPS+=("coverage" "coverage-enable-subprocess")
-
-      COVERAGE=1
       ;;
     o)
       command -v realpath > /dev/null || (echo "The -o flag requires the 'realpath' binary to be installed" && exit 1)
@@ -102,6 +99,7 @@ SQLITE_DB=$TMPDIR/homeserver.db
 POSTGRES_CONFIG=$TMPDIR/postgres.conf
 
 # Ensure these files are deleted on script exit
+# TODO: the trap should also drop the temp postgres DB
 trap 'rm -rf $TMPDIR' EXIT
 
 cat > "$SQLITE_CONFIG" <<EOF
@@ -147,48 +145,34 @@ python -m synapse.app.homeserver --generate-keys -c "$SQLITE_CONFIG"
 
 # Make sure the SQLite3 database is using the latest schema and has no pending background update.
 echo "Running db background jobs..."
-synapse/_scripts/update_synapse_database.py --database-config --run-background-updates "$SQLITE_CONFIG"
+synapse/_scripts/update_synapse_database.py --database-config "$SQLITE_CONFIG" --run-background-updates
 
 # Create the PostgreSQL database.
 echo "Creating postgres database..."
 createdb --lc-collate=C --lc-ctype=C --template=template0 "$POSTGRES_DB_NAME"
 
-echo "Copying data from SQLite3 to Postgres with synapse_port_db..."
-if [ -z "$COVERAGE" ]; then
-  # No coverage needed
-  synapse/_scripts/synapse_port_db.py --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
-else
-  # Coverage desired
-  coverage run synapse/_scripts/synapse_port_db.py --sqlite-database "$SQLITE_DB" --postgres-config "$POSTGRES_CONFIG"
-fi
+echo "Running db background jobs..."
+synapse/_scripts/update_synapse_database.py --database-config "$POSTGRES_CONFIG" --run-background-updates
+
 
 # Delete schema_version, applied_schema_deltas and applied_module_schemas tables
 # Also delete any shadow tables from fts4
-# This needs to be done after synapse_port_db is run
 echo "Dropping unwanted db tables..."
 SQL="
 DROP TABLE schema_version;
 DROP TABLE applied_schema_deltas;
 DROP TABLE applied_module_schemas;
-DROP TABLE event_search_content;
-DROP TABLE event_search_segments;
-DROP TABLE event_search_segdir;
-DROP TABLE event_search_docsize;
-DROP TABLE event_search_stat;
-DROP TABLE user_directory_search_content;
-DROP TABLE user_directory_search_segments;
-DROP TABLE user_directory_search_segdir;
-DROP TABLE user_directory_search_docsize;
-DROP TABLE user_directory_search_stat;
 "
 sqlite3 "$SQLITE_DB" <<< "$SQL"
 psql "$POSTGRES_DB_NAME" -w <<< "$SQL"
 
-echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE'..."
-sqlite3 "$SQLITE_DB" ".dump" > "$OUTPUT_DIR/$SQLITE_FULL_SCHEMA_OUTPUT_FILE"
+echo "Dumping SQLite3 schema to '$OUTPUT_DIR/$SQLITE_SCHEMA_FILE' and '$OUTPUT_DIR/$SQLITE_ROWS_FILE'..."
+sqlite3 "$SQLITE_DB" ".schema --indent" > "$OUTPUT_DIR/$SQLITE_SCHEMA_FILE"
+sqlite3 "$SQLITE_DB" ".dump --data-only --nosys" > "$OUTPUT_DIR/$SQLITE_ROWS_FILE"
 
-echo "Dumping Postgres schema to '$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE'..."
-pg_dump --format=plain --no-tablespaces --no-acl --no-owner $POSTGRES_DB_NAME | sed -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_FULL_SCHEMA_OUTPUT_FILE"
+echo "Dumping Postgres schema to '$OUTPUT_DIR/$POSTGRES_SCHEMA_FILE' and '$OUTPUT_DIR/$POSTGRES_ROWS_FILE'..."
+pg_dump --format=plain --schema-only         --no-tablespaces --no-acl --no-owner "$POSTGRES_DB_NAME" | sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_SCHEMA_FILE"
+pg_dump --format=plain --data-only --inserts --no-tablespaces --no-acl --no-owner "$POSTGRES_DB_NAME" | sed -e '/^$/d' -e '/^--/d' -e 's/public\.//g' -e '/^SET /d' -e '/^SELECT /d' > "$OUTPUT_DIR/$POSTGRES_ROWS_FILE"
 
 echo "Cleaning up temporary Postgres database..."
 dropdb $POSTGRES_DB_NAME
diff --git a/stubs/synapse/__init__.pyi b/stubs/synapse/__init__.pyi
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/stubs/synapse/__init__.pyi
diff --git a/stubs/synapse/synapse_rust.pyi b/stubs/synapse/synapse_rust.pyi
new file mode 100644
index 0000000000..8658d3138f
--- /dev/null
+++ b/stubs/synapse/synapse_rust.pyi
@@ -0,0 +1,2 @@
+def sum_as_string(a: int, b: int) -> str: ...
+def get_rust_file_digest() -> str: ...
diff --git a/synapse/__init__.py b/synapse/__init__.py
index b1369aca8f..1bed6393bd 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -20,6 +20,8 @@ import json
 import os
 import sys
 
+from synapse.util.rust import check_rust_lib_up_to_date
+
 # Check that we're not running on an unsupported Python version.
 if sys.version_info < (3, 7):
     print("Synapse requires Python 3.7 or above.")
@@ -78,3 +80,6 @@ if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     from synapse.util.patch_inline_callbacks import do_patch
 
     do_patch()
+
+
+check_rust_lib_up_to_date()
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c2..30983c47fb 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
     PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    ReceiptsBackgroundUpdateStore,
 ):
     def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9a1aea083f..4a75eb6b21 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -32,12 +32,14 @@ from synapse.appservice import ApplicationService
 from synapse.http import get_request_user_agent
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import (
+    SynapseTags,
     active_span,
     force_tracing,
     start_active_span,
     trace,
 )
 from synapse.types import Requester, create_requester
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -118,6 +120,7 @@ class Auth:
             errcode=Codes.NOT_JOINED,
         )
 
+    @cancellable
     async def get_user_by_req(
         self,
         request: SynapseRequest,
@@ -159,6 +162,12 @@ class Auth:
                 parent_span.set_tag(
                     "authenticated_entity", requester.authenticated_entity
                 )
+                # We tag the Synapse instance name so that it's an easy jumping
+                # off point into the logs. Can also be used to filter for an
+                # instance that is under load.
+                parent_span.set_tag(
+                    SynapseTags.INSTANCE_NAME, self.hs.get_instance_name()
+                )
                 parent_span.set_tag("user_id", requester.user.to_string())
                 if requester.device_id is not None:
                     parent_span.set_tag("device_id", requester.device_id)
@@ -166,6 +175,7 @@ class Auth:
                     parent_span.set_tag("appservice_id", requester.app_service.id)
             return requester
 
+    @cancellable
     async def _wrapped_get_user_by_req(
         self,
         request: SynapseRequest,
@@ -281,6 +291,7 @@ class Auth:
                 403, "Application service has not registered this user (%s)" % user_id
             )
 
+    @cancellable
     async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
         """
         Given a request, reads the request parameters to determine:
@@ -523,6 +534,7 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
+    @cancellable
     def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 102889ac49..f7f46f8d80 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -140,13 +140,13 @@ USER_FILTER_SCHEMA = {
 
 
 @FormatChecker.cls_checks("matrix_room_id")
-def matrix_room_id_validator(room_id_str: str) -> bool:
-    return RoomID.is_valid(room_id_str)
+def matrix_room_id_validator(room_id: object) -> bool:
+    return isinstance(room_id, str) and RoomID.is_valid(room_id)
 
 
 @FormatChecker.cls_checks("matrix_user_id")
-def matrix_user_id_validator(user_id_str: str) -> bool:
-    return UserID.is_valid(user_id_str)
+def matrix_user_id_validator(user_id: object) -> bool:
+    return isinstance(user_id, str) and UserID.is_valid(user_id)
 
 
 class Filtering:
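
The `object` signature matters because jsonschema hands a format checker whatever value the JSON contains; before the isinstance guard, a non-string could reach RoomID.is_valid and raise a TypeError instead of failing validation. A minimal standalone sketch of the pattern (the room-ID check here is a stand-in, not Synapse's real parser):

    from jsonschema import FormatChecker, ValidationError, validate

    checker = FormatChecker()

    @checker.checks("matrix_room_id")
    def check_room_id(value: object) -> bool:
        # Guard first: the checker receives whatever the JSON contains,
        # so a non-string must return False rather than raise.
        return isinstance(value, str) and value.startswith("!") and ":" in value

    schema = {"format": "matrix_room_id"}
    validate("!room:example.com", schema, format_checker=checker)  # passes
    try:
        validate(12345, schema, format_checker=checker)  # fails cleanly
    except ValidationError as e:
        print(e.message)
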
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index a0e4ab6db6..e37acb0f1e 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -19,18 +19,23 @@ import attr
 
 class EventFormatVersions:
     """This is an internal enum for tracking the version of the event format,
-    independently from the room version.
+    independently of the room version.
+
+    To reduce confusion, the event format versions are named after the room
+    versions in which they were used or introduced.
+    The concept of an 'event format version' is specific to Synapse (the
+    specification does not mention this term).
     """
 
-    V1 = 1  # $id:server event id format
-    V2 = 2  # MSC1659-style $hash event id format: introduced for room v3
-    V3 = 3  # MSC1884-style $hash format: introduced for room v4
+    ROOM_V1_V2 = 1  # $id:server event id format: used for room v1 and v2
+    ROOM_V3 = 2  # MSC1659-style $hash event id format: used for room v3
+    ROOM_V4_PLUS = 3  # MSC1884-style $hash format: introduced for room v4
 
 
 KNOWN_EVENT_FORMAT_VERSIONS = {
-    EventFormatVersions.V1,
-    EventFormatVersions.V2,
-    EventFormatVersions.V3,
+    EventFormatVersions.ROOM_V1_V2,
+    EventFormatVersions.ROOM_V3,
+    EventFormatVersions.ROOM_V4_PLUS,
 }
 
 
@@ -92,7 +97,7 @@ class RoomVersions:
     V1 = RoomVersion(
         "1",
         RoomDisposition.STABLE,
-        EventFormatVersions.V1,
+        EventFormatVersions.ROOM_V1_V2,
         StateResolutionVersions.V1,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -110,7 +115,7 @@ class RoomVersions:
     V2 = RoomVersion(
         "2",
         RoomDisposition.STABLE,
-        EventFormatVersions.V1,
+        EventFormatVersions.ROOM_V1_V2,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -128,7 +133,7 @@ class RoomVersions:
     V3 = RoomVersion(
         "3",
         RoomDisposition.STABLE,
-        EventFormatVersions.V2,
+        EventFormatVersions.ROOM_V3,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -146,7 +151,7 @@ class RoomVersions:
     V4 = RoomVersion(
         "4",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -164,7 +169,7 @@ class RoomVersions:
     V5 = RoomVersion(
         "5",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=True,
@@ -182,7 +187,7 @@ class RoomVersions:
     V6 = RoomVersion(
         "6",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -200,7 +205,7 @@ class RoomVersions:
     MSC2176 = RoomVersion(
         "org.matrix.msc2176",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -218,7 +223,7 @@ class RoomVersions:
     V7 = RoomVersion(
         "7",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -236,7 +241,7 @@ class RoomVersions:
     V8 = RoomVersion(
         "8",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -254,7 +259,7 @@ class RoomVersions:
     V9 = RoomVersion(
         "9",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -272,7 +277,7 @@ class RoomVersions:
     MSC3787 = RoomVersion(
         "org.matrix.msc3787",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -290,7 +295,7 @@ class RoomVersions:
     V10 = RoomVersion(
         "10",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -308,7 +313,7 @@ class RoomVersions:
     MSC2716v4 = RoomVersion(
         "org.matrix.msc2716v4",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
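
The rename makes the room-version-to-event-format mapping legible at a glance. A standalone sketch of the mapping these constants now spell out (the integer values match the diff; the dict itself is only illustrative):

    ROOM_V1_V2, ROOM_V3, ROOM_V4_PLUS = 1, 2, 3

    EVENT_FORMAT_BY_ROOM_VERSION = {
        "1": ROOM_V1_V2,  # $id:server event IDs
        "2": ROOM_V1_V2,
        "3": ROOM_V3,  # MSC1659-style $hash event IDs
        # everything from room v4 onwards shares the MSC1884 format
        **{v: ROOM_V4_PLUS for v in ("4", "5", "6", "7", "8", "9", "10")},
    }

    assert EVENT_FORMAT_BY_ROOM_VERSION["10"] == ROOM_V4_PLUS
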
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 51c8d15711..53db1e85b3 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -32,15 +32,15 @@ logger = logging.getLogger("synapse.app.homeserver")
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
 current_mau_by_service_gauge = Gauge(
     "synapse_admin_mau_current_mau_by_service",
     "Current MAU by service",
     ["app_service"],
 )
-max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
-    "synapse_admin_mau:registered_reserved_users",
+    "synapse_admin_mau_registered_reserved_users",
     "Registered users with reserved threepids",
 )
 
diff --git a/synapse/config/key.py b/synapse/config/key.py
index cc75efdf8f..f3dc4df695 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -217,7 +217,18 @@ class KeyConfig(Config):
 
         signing_keys = self.read_file(signing_key_path, name)
         try:
-            return read_signing_keys(signing_keys.splitlines(True))
+            loaded_signing_keys = read_signing_keys(
+                [
+                    signing_key_line
+                    for signing_key_line in signing_keys.splitlines(keepends=False)
+                    if signing_key_line.strip()
+                ]
+            )
+
+            if not loaded_signing_keys:
+                raise ConfigError(f"No signing keys in file {signing_key_path}")
+
+            return loaded_signing_keys
         except Exception as e:
             raise ConfigError("Error reading %s: %s" % (name, str(e)))
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 389b0c5d53..c7d5ef92fc 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -109,7 +109,7 @@ def validate_event_for_room_version(event: "EventBase") -> None:
         if not is_invite_via_3pid:
             raise AuthError(403, "Event not signed by sender's server")
 
-    if event.format_version in (EventFormatVersions.V1,):
+    if event.format_version in (EventFormatVersions.ROOM_V1_V2,):
         # Only older room versions have event IDs to check.
         event_id_domain = get_domain_from_id(event.event_id)
 
@@ -716,7 +716,7 @@ def check_redaction(
     if user_level >= redact_level:
         return False
 
-    if room_version_obj.event_format == EventFormatVersions.V1:
+    if room_version_obj.event_format == EventFormatVersions.ROOM_V1_V2:
         redacter_domain = get_domain_from_id(event.event_id)
         if not isinstance(event.redacts, str):
             return False
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 39ad2793d9..b2c9119fd0 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -442,7 +442,7 @@ class EventBase(metaclass=abc.ABCMeta):
 
 
 class FrozenEvent(EventBase):
-    format_version = EventFormatVersions.V1  # All events of this type are V1
+    format_version = EventFormatVersions.ROOM_V1_V2  # All events of this type use the room v1/v2 format
 
     def __init__(
         self,
@@ -490,7 +490,7 @@ class FrozenEvent(EventBase):
 
 
 class FrozenEventV2(EventBase):
-    format_version = EventFormatVersions.V2  # All events of this type are V2
+    format_version = EventFormatVersions.ROOM_V3  # All events of this type use the room v3 format
 
     def __init__(
         self,
@@ -567,7 +567,7 @@ class FrozenEventV2(EventBase):
 class FrozenEventV3(FrozenEventV2):
     """FrozenEventV3, which differs from FrozenEventV2 only in the event_id format"""
 
-    format_version = EventFormatVersions.V3  # All events of this type are V3
+    format_version = EventFormatVersions.ROOM_V4_PLUS  # All events of this type use the room v4+ format
 
     @property
     def event_id(self) -> str:
@@ -597,11 +597,11 @@ def _event_type_from_format_version(
         `FrozenEvent`
     """
 
-    if format_version == EventFormatVersions.V1:
+    if format_version == EventFormatVersions.ROOM_V1_V2:
         return FrozenEvent
-    elif format_version == EventFormatVersions.V2:
+    elif format_version == EventFormatVersions.ROOM_V3:
         return FrozenEventV2
-    elif format_version == EventFormatVersions.V3:
+    elif format_version == EventFormatVersions.ROOM_V4_PLUS:
         return FrozenEventV3
     else:
         raise Exception("No event format %r" % (format_version,))
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 17f624b68f..746bd3978d 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -137,7 +137,7 @@ class EventBuilder:
         # The types of auth/prev events changes between event versions.
         prev_events: Union[List[str], List[Tuple[str, Dict[str, str]]]]
         auth_events: Union[List[str], List[Tuple[str, Dict[str, str]]]]
-        if format_version == EventFormatVersions.V1:
+        if format_version == EventFormatVersions.ROOM_V1_V2:
             auth_events = await self._store.add_event_hashes(auth_event_ids)
             prev_events = await self._store.add_event_hashes(prev_event_ids)
         else:
@@ -253,7 +253,7 @@ def create_local_event_from_event_dict(
 
     time_now = int(clock.time_msec())
 
-    if format_version == EventFormatVersions.V1:
+    if format_version == EventFormatVersions.ROOM_V1_V2:
         event_dict["event_id"] = _create_event_id(clock, hostname)
 
     event_dict["origin"] = hostname
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 27c8beba25..a6f0104396 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -45,7 +45,7 @@ class EventValidator:
         """
         self.validate_builder(event)
 
-        if event.format_version == EventFormatVersions.V1:
+        if event.format_version == EventFormatVersions.ROOM_V1_V2:
             EventID.from_string(event.event_id)
 
         required = [
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 4269a98db2..abe2c1971a 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -194,7 +194,7 @@ async def _check_sigs_on_pdu(
     # event id's domain (normally only the case for joins/leaves), and add additional
     # checks. Only do this if the room version has a concept of event ID domain
     # (ie, the room version uses old-style non-hash event IDs).
-    if room_version.event_format == EventFormatVersions.V1:
+    if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
         event_domain = get_domain_from_id(pdu.event_id)
         if event_domain != sender_domain:
             try:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7ee2974bb1..4a4289ee7c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1190,7 +1190,7 @@ class FederationClient(FederationBase):
             # Otherwise, consider it a legitimate error and raise.
             err = e.to_synapse_error()
             if self._is_unknown_endpoint(e, err):
-                if room_version.event_format != EventFormatVersions.V1:
+                if room_version.event_format != EventFormatVersions.ROOM_V1_V2:
                     raise SynapseError(
                         400,
                         "User's homeserver does not support this room version",
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bc60e3e3e..a6cb3ba58f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -62,12 +62,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 sent_pdus_destination_dist_count = Counter(
-    "synapse_federation_client_sent_pdu_destinations:count",
+    "synapse_federation_client_sent_pdu_destinations_count",
     "Number of PDUs queued for sending to one or more destinations",
 )
 
 sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations:total",
+    "synapse_federation_client_sent_pdu_destinations",
     "Total number of PDUs queued for sending across all destinations",
 )
 
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d4fe7df533..cf9f19608a 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -70,6 +70,7 @@ class AdminHandler:
             "appservice_id",
             "consent_server_notice_sent",
             "consent_version",
+            "consent_ts",
             "user_type",
             "is_guest",
         }
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9c2c3a0e68..c5ac169644 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -52,6 +52,7 @@ from synapse.types import (
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.cancellation import cancellable
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -124,6 +125,7 @@ class DeviceWorkerHandler:
 
         return device
 
+    @cancellable
     async def get_device_changes_in_shared_rooms(
         self, user_id: str, room_ids: Collection[str], from_token: StreamToken
     ) -> Collection[str]:
@@ -163,6 +165,7 @@ class DeviceWorkerHandler:
 
     @trace
     @measure_func("device.get_user_ids_changed")
+    @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
     ) -> JsonDict:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c938339ddd..ec81639c78 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -37,7 +37,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -91,6 +92,7 @@ class E2eKeysHandler:
         )
 
     @trace
+    @cancellable
     async def query_devices(
         self,
         query_body: JsonDict,
@@ -208,22 +210,26 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
+            # TODO It might make sense to propagate cancellations into the
+            #      deferreds which are querying remote homeservers.
             await make_deferred_yieldable(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
+                delay_cancellation(
+                    defer.gatherResults(
+                        [
+                            run_in_background(
+                                self._query_devices_for_destination,
+                                results,
+                                cross_signing_keys,
+                                failures,
+                                destination,
+                                queries,
+                                timeout,
+                            )
+                            for destination, queries in remote_queries_not_in_cache.items()
+                        ],
+                        consumeErrors=True,
+                    ).addErrback(unwrapFirstError)
+                )
             )
 
             ret = {"device_keys": results, "failures": failures}
@@ -347,6 +353,7 @@ class E2eKeysHandler:
 
         return
 
+    @cancellable
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
@@ -393,6 +400,7 @@ class E2eKeysHandler:
         }
 
     @trace
+    @cancellable
     async def query_local_devices(
         self, query: Mapping[str, Optional[List[str]]]
     ) -> Dict[str, Dict[str, dict]]:
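
delay_cancellation wraps the gatherResults deferred so that a cancelled /keys/query request no longer tears down the in-flight federation fetches; the work completes and the cancellation surfaces afterwards. A rough analogue of the idea in plain asyncio (asyncio.shield stands in for delay_cancellation here; this is a sketch of the behaviour, not Synapse's implementation):

    import asyncio

    async def fetch_remote_keys() -> str:
        await asyncio.sleep(1)  # stands in for a federation request
        return "keys"

    async def main() -> None:
        inner = asyncio.ensure_future(fetch_remote_keys())
        try:
            # Cancelling the outer wait does not cancel `inner`.
            await asyncio.wait_for(asyncio.shield(inner), timeout=0.1)
        except asyncio.TimeoutError:
            pass
        print(await inner)  # the fetch still finishes: "keys"

    asyncio.run(main())
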
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0c39778ab..1f83bab836 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -26,6 +26,7 @@ from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamKeyType
@@ -423,6 +424,7 @@ class PaginationHandler:
         pagin_config: PaginationConfig,
         as_client_event: bool = True,
         event_filter: Optional[Filter] = None,
+        use_admin_priviledge: bool = False,
     ) -> JsonDict:
         """Get messages in a room.
 
@@ -432,10 +434,16 @@ class PaginationHandler:
             pagin_config: The pagination config rules to apply, if any.
             as_client_event: True to get events in client-server format.
             event_filter: Filter to apply to results or None
+            use_admin_priviledge: if `True`, return all events, regardless
+                of whether the requesting user has access to them. To be used
+                **ONLY** from the admin API.
 
         Returns:
             Pagination API results
         """
+        if use_admin_priviledge:
+            await assert_user_is_admin(self.auth, requester)
+
         user_id = requester.user.to_string()
 
         if pagin_config.from_token:
@@ -458,12 +466,14 @@ class PaginationHandler:
         room_token = from_token.room_key
 
         async with self.pagination_lock.read(room_id):
-            (
-                membership,
-                member_event_id,
-            ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, requester, allow_departed_users=True
-            )
+            (membership, member_event_id) = (None, None)
+            if not use_admin_priviledge:
+                (
+                    membership,
+                    member_event_id,
+                ) = await self.auth.check_user_in_room_or_world_readable(
+                    room_id, requester, allow_departed_users=True
+                )
 
             if pagin_config.direction == "b":
                 # if we're going backwards, we might need to backfill. This
@@ -475,7 +485,7 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if membership == Membership.LEAVE:
+                if not use_admin_priviledge and membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
@@ -528,12 +538,13 @@ class PaginationHandler:
         if event_filter:
             events = await event_filter.filter(events)
 
-        events = await filter_events_for_client(
-            self._storage_controllers,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
+        if not use_admin_priviledge:
+            events = await filter_events_for_client(
+                self._storage_controllers,
+                user_id,
+                events,
+                is_peeking=(member_event_id is None),
+            )
 
         # if after the filter applied there are no more events
         # return immediately - but there might be more in next_token batch
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 732b0310bc..ebd445adca 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -453,7 +453,6 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
-                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2d95b1fa24..5293fa4d0e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,6 +15,7 @@ import itertools
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Collection,
     Dict,
@@ -1413,10 +1414,10 @@ class SyncHandler:
     async def _generate_sync_entry_for_device_list(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_or_knocked_users: Set[str],
-        newly_left_rooms: Set[str],
-        newly_left_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+        newly_left_rooms: AbstractSet[str],
+        newly_left_users: AbstractSet[str],
     ) -> DeviceListUpdates:
         """Generate the DeviceListUpdates section of sync
 
@@ -1434,8 +1435,7 @@ class SyncHandler:
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
-        # We're going to mutate these fields, so lets copy them rather than
-        # assume they won't get used later.
+        # Take a copy since these fields will be mutated later.
         newly_joined_or_invited_or_knocked_users = set(
             newly_joined_or_invited_or_knocked_users
         )
@@ -1635,8 +1635,8 @@ class SyncHandler:
     async def _generate_sync_entry_for_presence(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_users: AbstractSet[str],
     ) -> None:
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
@@ -1694,7 +1694,7 @@ class SyncHandler:
         self,
         sync_result_builder: "SyncResultBuilder",
         account_data_by_room: Dict[str, Dict[str, JsonDict]],
-    ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
+    ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -2534,7 +2534,7 @@ class SyncResultBuilder:
     archived: List[ArchivedSyncResult] = attr.Factory(list)
     to_device: List[JsonDict] = attr.Factory(list)
 
-    def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+    def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
         """Work out which other users have joined or left rooms we are joined to.
 
         This data is only useful for an incremental sync.
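
Widening these signatures to AbstractSet documents that the sync code treats the inputs as read-only: callers may now pass a set, frozenset, or dict-keys view, and any mutation has to go through an explicit copy, as the "take a copy" comment above notes. A small sketch of the convention:

    from typing import AbstractSet, Set

    def newly_left(users: AbstractSet[str]) -> Set[str]:
        # AbstractSet advertises read-only use; copy before mutating.
        mutable = set(users)
        mutable.discard("@server:example.com")
        return mutable

    newly_left(frozenset({"@a:example.com", "@server:example.com"}))
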
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 26aaabfb34..80acbdcf3c 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -28,7 +28,8 @@ from typing import (
     overload,
 )
 
-from pydantic import BaseModel, ValidationError
+from pydantic import BaseModel, MissingError, PydanticValueError, ValidationError
+from pydantic.error_wrappers import ErrorWrapper
 from typing_extensions import Literal
 
 from twisted.web.server import Request
@@ -714,7 +715,21 @@ def parse_and_validate_json_object_from_request(
     try:
         instance = model_type.parse_obj(content)
     except ValidationError as e:
-        raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=Codes.BAD_JSON)
+        # Choose a Matrix error code. The catch-all is BAD_JSON, but we try to find a
+        # more specific error if possible (which occasionally helps us to be spec-
+        # compliant). This is a bit awkward because the spec's error codes aren't very
+        # clear-cut: BAD_JSON arguably overlaps with MISSING_PARAM and INVALID_PARAM.
+        errcode = Codes.BAD_JSON
+
+        raw_errors = e.raw_errors
+        if len(raw_errors) == 1 and isinstance(raw_errors[0], ErrorWrapper):
+            raw_error = raw_errors[0].exc
+            if isinstance(raw_error, MissingError):
+                errcode = Codes.MISSING_PARAM
+            elif isinstance(raw_error, PydanticValueError):
+                errcode = Codes.INVALID_PARAM
+
+        raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=errcode)
 
     return instance
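
A sketch of the resulting behaviour, using pydantic v1's raw_errors exactly as above (the model and the bare errcode strings are illustrative; Codes.MISSING_PARAM et al. resolve to the same M_* values):

    from pydantic import BaseModel, MissingError, StrictStr, ValidationError
    from pydantic.error_wrappers import ErrorWrapper

    class ExampleBody(BaseModel):  # hypothetical request model
        user_id: StrictStr

    def errcode_for(e: ValidationError) -> str:
        raw_errors = e.raw_errors
        if len(raw_errors) == 1 and isinstance(raw_errors[0], ErrorWrapper):
            if isinstance(raw_errors[0].exc, MissingError):
                return "M_MISSING_PARAM"
        return "M_BAD_JSON"

    try:
        ExampleBody.parse_obj({})  # no user_id supplied at all
    except ValidationError as e:
        assert errcode_for(e) == "M_MISSING_PARAM"
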
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 482316a1ff..ca2735dd6d 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -203,6 +203,9 @@ if TYPE_CHECKING:
 
 # Helper class
 
+# Matches the number suffix in an instance name like "matrix.org client_reader-8"
+STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$")
+
 
 class _DummyTagNames:
     """wrapper of opentracings tags. We need to have them if we
@@ -295,6 +298,8 @@ class SynapseTags:
     # Whether the sync response has new data to be returned to the client.
     SYNC_RESULT = "sync.new_data"
 
+    INSTANCE_NAME = "instance_name"
+
     # incoming HTTP request ID  (as written in the logs)
     REQUEST_ID = "request_id"
 
@@ -441,9 +446,17 @@ def init_tracer(hs: "HomeServer") -> None:
 
     from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
 
+    # Instance names are opaque strings but by stripping off the number suffix,
+    # we can get something that looks like a "worker type", e.g.
+    # "client_reader-1" -> "client_reader" so we don't spread the traces across
+    # so many services.
+    instance_name_by_type = re.sub(
+        STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name()
+    )
+
     config = JaegerConfig(
         config=hs.config.tracing.jaeger_config,
-        service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}",
+        service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
         scope_manager=LogContextScopeManager(),
         metrics_factory=PrometheusMetricsFactory(),
     )
@@ -1032,11 +1045,11 @@ def trace_servlet(
             # with JsonResource).
             scope.span.set_operation_name(request.request_metrics.name)
 
-            # set the tags *after* the servlet completes, in case it decided to
-            # prioritise the span (tags will get dropped on unprioritised spans)
             request_tags[
                 SynapseTags.REQUEST_TAG
             ] = request.request_metrics.start_context.tag
 
+            # set the tags *after* the servlet completes, in case it decided to
+            # prioritise the span (tags will get dropped on unprioritised spans)
             for k, v in request_tags.items():
                 scope.span.set_tag(k, v)
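
The effect of the suffix-stripping regex, for the curious (the worker names are illustrative):

    import re

    STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$")

    for name in ("client_reader-8", "federation_sender_12", "master"):
        print(re.sub(STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", name))
    # -> client_reader, federation_sender, master
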
diff --git a/synapse/metrics/_legacy_exposition.py b/synapse/metrics/_legacy_exposition.py
index ff640a49af..563d8cc2c6 100644
--- a/synapse/metrics/_legacy_exposition.py
+++ b/synapse/metrics/_legacy_exposition.py
@@ -34,8 +34,6 @@ from prometheus_client.core import Sample
 from twisted.web.resource import Resource
 from twisted.web.server import Request
 
-from synapse.util import caches
-
 CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
 
 
@@ -88,11 +86,16 @@ LEGACY_METRIC_NAMES = {
     "synapse_util_caches_cache_hits": "synapse_util_caches_cache:hits",
     "synapse_util_caches_cache_size": "synapse_util_caches_cache:size",
     "synapse_util_caches_cache_evicted_size": "synapse_util_caches_cache:evicted_size",
-    "synapse_util_caches_cache_total": "synapse_util_caches_cache:total",
+    "synapse_util_caches_cache": "synapse_util_caches_cache:total",
     "synapse_util_caches_response_cache_size": "synapse_util_caches_response_cache:size",
     "synapse_util_caches_response_cache_hits": "synapse_util_caches_response_cache:hits",
     "synapse_util_caches_response_cache_evicted_size": "synapse_util_caches_response_cache:evicted_size",
-    "synapse_util_caches_response_cache_total": "synapse_util_caches_response_cache:total",
+    "synapse_util_caches_response_cache": "synapse_util_caches_response_cache:total",
+    "synapse_federation_client_sent_pdu_destinations": "synapse_federation_client_sent_pdu_destinations:total",
+    "synapse_federation_client_sent_pdu_destinations_count": "synapse_federation_client_sent_pdu_destinations:count",
+    "synapse_admin_mau_current": "synapse_admin_mau:current",
+    "synapse_admin_mau_max": "synapse_admin_mau:max",
+    "synapse_admin_mau_registered_reserved_users": "synapse_admin_mau:registered_reserved_users",
 }
 
 
@@ -102,11 +105,6 @@ def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> byt
     by prometheus-client.
     """
 
-    # Trigger the cache metrics to be rescraped, which updates the common
-    # metrics but do not produce metrics themselves
-    for collector in caches.collectors_by_name.values():
-        collector.collect()
-
     output = []
 
     for metric in registry.collect():
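
The direction of the mapping is modern underscore name (as registered with prometheus_client) to the legacy colon name that the back-compat endpoint keeps serving. A sketch of the lookup (entries copied from the table above):

    LEGACY_METRIC_NAMES = {
        "synapse_admin_mau_current": "synapse_admin_mau:current",
        "synapse_admin_mau_max": "synapse_admin_mau:max",
    }

    def legacy_name(modern: str) -> str:
        # Metrics without a legacy alias are exposed under their own name.
        return LEGACY_METRIC_NAMES.get(modern, modern)

    assert legacy_name("synapse_admin_mau_current") == "synapse_admin_mau:current"
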
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6661887d9f..658bf373b7 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,6 +17,7 @@ from synapse.events import EventBase
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage.controllers import StorageControllers
 from synapse.storage.databases.main import DataStore
+from synapse.util.async_helpers import concurrently_execute
 
 
 async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -25,13 +26,19 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
 
     badge = len(invites)
 
-    for room_id in joins:
-        notifs = await (
-            store.get_unread_event_push_actions_by_room_for_user(
+    room_notifs = []
+
+    async def get_room_unread_count(room_id: str) -> None:
+        room_notifs.append(
+            await store.get_unread_event_push_actions_by_room_for_user(
                 room_id,
                 user_id,
             )
         )
+
+    await concurrently_execute(get_room_unread_count, joins, 10)
+
+    for notifs in room_notifs:
         if notifs.notify_count == 0:
             continue
 
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fa3266720b..bac754e1b1 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -61,9 +61,11 @@ from synapse.rest.admin.rooms import (
     MakeRoomAdminRestServlet,
     RoomEventContextServlet,
     RoomMembersRestServlet,
+    RoomMessagesRestServlet,
     RoomRestServlet,
     RoomRestV2Servlet,
     RoomStateRestServlet,
+    RoomTimestampToEventRestServlet,
 )
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
@@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
     ListDestinationsRestServlet(hs).register(http_server)
+    RoomMessagesRestServlet(hs).register(http_server)
+    RoomTimestampToEventRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 3d870629c4..747e6fda83 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -35,6 +35,7 @@ from synapse.rest.admin._base import (
 )
 from synapse.storage.databases.main.room import RoomSortOrder
 from synapse.storage.state import StateFilter
+from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, RoomID, UserID, create_requester
 from synapse.util import json_decoder
 
@@ -858,3 +859,106 @@ class BlockRoomRestServlet(RestServlet):
             await self._store.unblock_room(room_id)
 
         return HTTPStatus.OK, {"block": block}
+
+
+class RoomMessagesRestServlet(RestServlet):
+    """
+    Get the list of messages in a room.
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._hs = hs
+        self._clock = hs.get_clock()
+        self._pagination_handler = hs.get_pagination_handler()
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        pagination_config = await PaginationConfig.from_request(
+            self._store, request, default_limit=10
+        )
+        # Twisted will have processed the args by now.
+        assert request.args is not None
+        as_client_event = b"raw" not in request.args
+        filter_str = parse_string(request, "filter", encoding="utf-8")
+        if filter_str:
+            filter_json = urlparse.unquote(filter_str)
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
+            if (
+                event_filter
+                and event_filter.filter_json.get("event_format", "client")
+                == "federation"
+            ):
+                as_client_event = False
+        else:
+            event_filter = None
+
+        msgs = await self._pagination_handler.get_messages(
+            room_id=room_id,
+            requester=requester,
+            pagin_config=pagination_config,
+            as_client_event=as_client_event,
+            event_filter=event_filter,
+            use_admin_priviledge=True,
+        )
+
+        return HTTPStatus.OK, msgs
+
+
+class RoomTimestampToEventRestServlet(RestServlet):
+    """
+    API endpoint to fetch the `event_id` of the closest event to the given
+    timestamp (`ts` query parameter) in the given direction (`dir` query
+    parameter).
+
+    Useful for cases like "jump to date", so you can start paginating messages
+    from a given date in the archive.
+
+    `ts` is a timestamp in milliseconds where we will find the closest event in
+    the given direction.
+
+    `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+    given timestamp.
+
+    GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
+    {
+        "event_id": ...
+    }
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+        self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        timestamp = parse_integer(request, "ts", required=True)
+        direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
+
+        (
+            event_id,
+            origin_server_ts,
+        ) = await self._timestamp_lookup_handler.get_event_for_timestamp(
+            requester, room_id, timestamp, direction
+        )
+
+        return HTTPStatus.OK, {
+            "event_id": event_id,
+            "origin_server_ts": origin_server_ts,
+        }
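
A sketch of calling the two new endpoints, assuming a homeserver reachable at localhost:8008 and an admin access token (both placeholders):

    import requests

    BASE = "http://localhost:8008/_synapse/admin/v1"
    HEADERS = {"Authorization": "Bearer <admin_access_token>"}
    room_id = "!example:localhost"

    # Page backwards through the room's messages, admin privileges implied.
    msgs = requests.get(f"{BASE}/rooms/{room_id}/messages",
                        params={"limit": 10, "dir": "b"}, headers=HEADERS)

    # Find the event closest to a timestamp, looking forwards.
    jump = requests.get(f"{BASE}/rooms/{room_id}/timestamp_to_event",
                        params={"ts": 1662000000000, "dir": "f"}, headers=HEADERS)
    print(msgs.json(), jump.json())
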
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 1f9a8ccc23..a09aaf3448 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import random
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
 from urllib.parse import urlparse
 
 from pydantic import StrictBool, StrictStr, constr
@@ -41,7 +41,11 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
-from synapse.rest.client.models import AuthenticationData, EmailRequestTokenBody
+from synapse.rest.client.models import (
+    AuthenticationData,
+    EmailRequestTokenBody,
+    MsisdnRequestTokenBody,
+)
 from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict
 from synapse.util.msisdn import phone_number_to_msisdn
@@ -400,23 +404,16 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
         self.identity_handler = hs.get_identity_handler()
 
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(
-            body, ["client_secret", "country", "phone_number", "send_attempt"]
+        body = parse_and_validate_json_object_from_request(
+            request, MsisdnRequestTokenBody
         )
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
-
-        country = body["country"]
-        phone_number = body["phone_number"]
-        send_attempt = body["send_attempt"]
-        next_link = body.get("next_link")  # Optional param
-
-        msisdn = phone_number_to_msisdn(country, phone_number)
+        msisdn = phone_number_to_msisdn(body.country, body.phone_number)
 
         if not await check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
                 403,
+                # TODO: is this error message accurate? Looks like we've only rejected
+                #       this phone number, not necessarily all phone numbers
                 "Account phone numbers are not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
@@ -425,9 +422,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
             request, "msisdn", msisdn
         )
 
-        if next_link:
+        if body.next_link:
             # Raise if the provided next_link value isn't valid
-            assert_valid_next_link(self.hs, next_link)
+            assert_valid_next_link(self.hs, body.next_link)
 
         existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn)
 
@@ -454,15 +451,15 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         ret = await self.identity_handler.requestMsisdnToken(
             self.hs.config.registration.account_threepid_delegate_msisdn,
-            country,
-            phone_number,
-            client_secret,
-            send_attempt,
-            next_link,
+            body.country,
+            body.phone_number,
+            body.client_secret,
+            body.send_attempt,
+            body.next_link,
         )
 
         threepid_send_requests.labels(type="msisdn", reason="add_threepid").observe(
-            send_attempt
+            body.send_attempt
         )
 
         return 200, ret
@@ -845,17 +842,18 @@ class AccountStatusRestServlet(RestServlet):
         self._auth = hs.get_auth()
         self._account_handler = hs.get_account_handler()
 
+    class PostBody(RequestBodyModel):
+        # TODO: we could validate that each user id is an mxid here, and/or parse it
+        #       as a UserID
+        user_ids: List[StrictStr]
+
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         await self._auth.get_user_by_req(request)
 
-        body = parse_json_object_from_request(request)
-        if "user_ids" not in body:
-            raise SynapseError(
-                400, "Required parameter 'user_ids' is missing", Codes.MISSING_PARAM
-            )
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         statuses, failures = await self._account_handler.get_account_statuses(
-            body["user_ids"],
+            body.user_ids,
             allow_remote=True,
         )
 
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index a395694fa5..f653d2a3e1 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -27,9 +27,9 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag
+from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.types import JsonDict, StreamToken
-
-from ._base import client_patterns, interactive_auth_handler
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -156,6 +156,7 @@ class KeyQueryServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @cancellable
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user_id = requester.user.to_string()
@@ -199,6 +200,7 @@ class KeyChangesServlet(RestServlet):
         self.device_handler = hs.get_device_handler()
         self.store = hs.get_datastores().main
 
+    @cancellable
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
 
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
index 3150602997..6278450c70 100644
--- a/synapse/rest/client/models.py
+++ b/synapse/rest/client/models.py
@@ -25,8 +25,8 @@ class AuthenticationData(RequestBodyModel):
 
     (The name "Authentication Data" is taken directly from the spec.)
 
-    Additional keys will be present, depending on the `type` field. Use `.dict()` to
-    access them.
+    Additional keys will be present, depending on the `type` field. Use
+    `.dict(exclude_unset=True)` to access them.
     """
 
     class Config:
@@ -36,7 +36,7 @@ class AuthenticationData(RequestBodyModel):
     type: Optional[StrictStr] = None
 
 
-class EmailRequestTokenBody(RequestBodyModel):
+class ThreePidRequestTokenBody(RequestBodyModel):
     if TYPE_CHECKING:
         client_secret: StrictStr
     else:
@@ -47,7 +47,7 @@ class EmailRequestTokenBody(RequestBodyModel):
             max_length=255,
             strict=True,
         )
-    email: StrictStr
+
     id_server: Optional[StrictStr]
     id_access_token: Optional[StrictStr]
     next_link: Optional[StrictStr]
@@ -61,9 +61,25 @@ class EmailRequestTokenBody(RequestBodyModel):
             raise ValueError("id_access_token is required if an id_server is supplied.")
         return token
 
+
+class EmailRequestTokenBody(ThreePidRequestTokenBody):
+    email: StrictStr
+
     # Canonicalise the email address. The addresses are all stored canonicalised
     # in the database. This allows the user to reset their password without having to
     # know the exact spelling (e.g. upper and lower case) of the address in the database.
     # Without this, an email stored in the database as "foo@bar.com" would cause
     # user requests for "FOO@bar.com" to raise a Not Found error.
     _email_validator = validator("email", allow_reuse=True)(validate_email)
+
+
+if TYPE_CHECKING:
+    ISO3116_1_Alpha_2 = StrictStr
+else:
+    # Per spec: two-letter uppercase ISO-3166-1-alpha-2
+    ISO3116_1_Alpha_2 = constr(regex="[A-Z]{2}", strict=True)
+
+
+class MsisdnRequestTokenBody(ThreePidRequestTokenBody):
+    country: ISO3116_1_Alpha_2
+    phone_number: StrictStr
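
Worth noting that pydantic v1 anchors constr regexes at the start of the string only, so the pattern above guarantees the value begins with two uppercase letters. A quick sketch (the model name is illustrative):

    from pydantic import BaseModel, ValidationError, constr

    class Example(BaseModel):  # illustrative, not the Synapse model
        country: constr(regex="[A-Z]{2}", strict=True)

    Example(country="GB")  # accepted
    try:
        Example(country="gb")  # rejected: lowercase fails the regex
    except ValidationError as e:
        print(e.errors()[0]["msg"])
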
diff --git a/synapse/server.py b/synapse/server.py
index 5a99c0b344..df3a1cb405 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -341,7 +341,17 @@ class HomeServer(metaclass=abc.ABCMeta):
         return domain_specific_string.domain == self.hostname
 
     def is_mine_id(self, string: str) -> bool:
-        return string.split(":", 1)[1] == self.hostname
+        """Determines whether a user ID or room alias originates from this homeserver.
+
+        Returns:
+            `True` if the hostname part of the user ID or room alias matches this
+            homeserver.
+            `False` otherwise, or if the user ID or room alias is malformed.
+        """
+        localpart_hostname = string.split(":", 1)
+        if len(localpart_hostname) < 2:
+            return False
+        return localpart_hostname[1] == self.hostname
 
     @cache_in_self
     def get_clock(self) -> Clock:
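
The guard turns a malformed ID (no colon at all) into a clean False instead of an IndexError. A standalone sketch of the same logic, with a placeholder hostname:

    def is_mine_id(string: str, hostname: str = "example.com") -> bool:
        # An ID with no ":" separator is simply not ours.
        localpart_hostname = string.split(":", 1)
        if len(localpart_hostname) < 2:
            return False
        return localpart_hostname[1] == hostname

    assert is_mine_id("@alice:example.com")
    assert not is_mine_id("@alice:elsewhere.org")
    assert not is_mine_id("not-a-matrix-id")
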
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index ba5380ce3e..bbe568bf05 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -36,6 +36,7 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialStateEventsTracker,
 )
 from synapse.types import MutableStateMap, StateMap
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -229,6 +230,7 @@ class StateStorageController:
 
     @trace
     @tag_args
+    @cancellable
     async def get_state_ids_for_events(
         self,
         event_ids: Collection[str],
@@ -350,6 +352,7 @@ class StateStorageController:
 
     @trace
     @tag_args
+    @cancellable
     async def get_state_group_for_events(
         self,
         event_ids: Collection[str],
@@ -398,6 +401,7 @@ class StateStorageController:
             event_id, room_id, prev_group, delta_ids, current_state_ids
         )
 
+    @cancellable
     async def get_current_state_ids(
         self,
         room_id: str,
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..e881bff7fb 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
         if isinstance(self.engine, Sqlite3Engine):
             self._unsafe_to_upsert_tables.add("user_directory_search")
 
-        if self.engine.can_native_upsert:
-            # Check ASAP (and then later, every 1s) to see if we have finished
-            # background updates of tables that aren't safe to update.
-            self._clock.call_later(
-                0.0,
-                run_as_background_process,
-                "upsert_safety_check",
-                self._check_safe_to_upsert,
-            )
+        # Check ASAP (and then later, every 1s) to see if we have finished
+        # background updates of tables that aren't safe to update.
+        self._clock.call_later(
+            0.0,
+            run_as_background_process,
+            "upsert_safety_check",
+            self._check_safe_to_upsert,
+        )
 
     def name(self) -> str:
         "Return the name of this database"
@@ -1160,11 +1159,8 @@ class DatabasePool:
         attempts = 0
         while True:
             try:
-                # We can autocommit if we are going to use native upserts
-                autocommit = (
-                    self.engine.can_native_upsert
-                    and table not in self._unsafe_to_upsert_tables
-                )
+                # We can autocommit if it is safe to upsert
+                autocommit = table not in self._unsafe_to_upsert_tables
 
                 return await self.runInteraction(
                     desc,
@@ -1199,7 +1195,7 @@ class DatabasePool:
     ) -> bool:
         """
         Pick the UPSERT method which works best on the platform. Either the
-        native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+        native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
 
         Args:
             txn: The transaction to use.
@@ -1207,14 +1203,15 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
-            lock: True to lock the table when doing the upsert.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
             not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_txn_native_upsert(
                 txn, table, keyvalues, values, insertion_values=insertion_values
             )
@@ -1365,14 +1362,12 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
 
-        # We can autocommit if we are going to use native upserts
-        autocommit = (
-            self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
-        )
+        # We can autocommit if it is safe to upsert
+        autocommit = table not in self._unsafe_to_upsert_tables
 
         await self.runInteraction(
             desc,
@@ -1406,10 +1401,10 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_many_txn_native_upsert(
                 txn, table, key_names, key_values, value_names, value_values
             )
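
With the can_native_upsert checks gone, Synapse assumes ON CONFLICT support everywhere: Postgres has had it since 9.5 and SQLite since 3.24. A sketch of the native form via the stdlib sqlite3 module:

    import sqlite3

    assert sqlite3.sqlite_version_info >= (3, 24, 0), "native upsert needs SQLite 3.24+"

    con = sqlite3.connect(":memory:")
    con.execute("CREATE TABLE profiles (user_id TEXT PRIMARY KEY, displayname TEXT)")
    # Same ON CONFLICT syntax on Postgres 9.5+; no emulated lock-and-retry needed.
    con.execute(
        "INSERT INTO profiles (user_id, displayname) VALUES (?, ?) "
        "ON CONFLICT (user_id) DO UPDATE SET displayname = excluded.displayname",
        ("@alice:example.com", "Alice"),
    )
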
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..5d700ca6c3 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -53,6 +53,7 @@ from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
 
@@ -668,6 +669,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
         ...
 
     @trace
+    @cancellable
     async def get_user_devices_from_cache(
         self, query_list: List[Tuple[str, Optional[str]]]
     ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]:
@@ -743,6 +745,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
 
         return self._device_list_stream_cache.get_all_entities_changed(from_key)
 
+    @cancellable
     async def get_users_whose_devices_changed(
         self,
         from_key: int,
@@ -1221,6 +1224,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
             desc="get_min_device_lists_changes_in_room",
         )
 
+    @cancellable
     async def get_device_list_changes_in_rooms(
         self, room_ids: Collection[str], from_id: int
     ) -> Optional[Set[str]]:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..8e9e1b0b4b 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -50,6 +50,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -135,6 +136,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         return now_stream_id, []
 
     @trace
+    @cancellable
     async def get_e2e_device_keys_for_cs_api(
         self, query_list: List[Tuple[str, Optional[str]]]
     ) -> Dict[str, Dict[str, JsonDict]]:
@@ -197,6 +199,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         ...
 
     @trace
+    @cancellable
     async def get_e2e_device_keys_and_signatures(
         self,
         query_list: Collection[Tuple[str, Optional[str]]],
@@ -887,6 +890,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return keys
 
+    @cancellable
     async def get_e2e_cross_signing_keys_bulk(
         self, user_ids: List[str], from_user_id: Optional[str] = None
     ) -> Dict[str, Optional[Dict[str, JsonDict]]]:
@@ -902,7 +906,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             keys were not found, either their user ID will not be in the dict,
             or their user ID will map to None.
         """
-
         result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
 
         if from_user_id:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c836078da6..ca47a22bf1 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -48,6 +48,7 @@ from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -976,6 +977,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return int(min_depth) if min_depth is not None else None
 
+    @cancellable
     async def get_forward_extremities_for_room_at_stream_ordering(
         self, room_id: str, stream_ordering: int
     ) -> List[str]:
@@ -1606,7 +1608,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 logger.info("Invalid prev_events for %s", event_id)
                 continue
 
-            if room_version.event_format == EventFormatVersions.V1:
+            if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
                 for prev_event_tuple in prev_events:
                     if (
                         not isinstance(prev_event_tuple, list)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 9b997c304d..52914febf9 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import AsyncLruCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -339,6 +340,7 @@ class EventsWorkerStore(SQLBaseStore):
     ) -> Optional[EventBase]:
         ...
 
+    @cancellable
     async def get_event(
         self,
         event_id: str,
@@ -433,6 +435,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     @trace
     @tag_args
+    @cancellable
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
@@ -584,6 +587,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         return events
 
+    @cancellable
     async def _get_events_from_cache_or_db(
         self, event_ids: Iterable[str], allow_rejected: bool = False
     ) -> Dict[str, EventCacheEntry]:
@@ -1156,7 +1160,7 @@ class EventsWorkerStore(SQLBaseStore):
             if format_version is None:
                 # This means that we stored the event before we had the concept
                 # of a event format version, so it must be a V1 event.
-                format_version = EventFormatVersions.V1
+                format_version = EventFormatVersions.ROOM_V1_V2
 
             room_version_id = row.room_version_id
 
@@ -1186,10 +1190,10 @@ class EventsWorkerStore(SQLBaseStore):
                 #
                 # So, the following approximations should be adequate.
 
-                if format_version == EventFormatVersions.V1:
+                if format_version == EventFormatVersions.ROOM_V1_V2:
                     # if it's event format v1 then it must be room v1 or v2
                     room_version = RoomVersions.V1
-                elif format_version == EventFormatVersions.V2:
+                elif format_version == EventFormatVersions.ROOM_V3:
                     # if it's event format v2 then it must be room v3
                     room_version = RoomVersions.V3
                 else:
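
For context on the renames in these hunks: the old `EventFormatVersions` member names (`V1`, `V2`) referred to the *event format* number, which is off by one from the *room* versions that use it, so `V2` actually meant room v3. The new names spell the mapping out. A sketch with illustrative values (the real constants live in `synapse.api.room_versions`):

```python
# Illustrative stand-ins for the renamed EventFormatVersions members:
ROOM_V1_V2 = 1  # event format 1, used by room versions 1 and 2
ROOM_V3 = 2     # event format 2, used by room version 3

def approximate_room_version(format_version: int) -> str:
    """Mirror the approximation in the hunk above."""
    if format_version == ROOM_V1_V2:
        return "1"  # could equally be room v2; both share event format 1
    if format_version == ROOM_V3:
        return "3"
    raise KeyError(format_version)

assert approximate_room_version(ROOM_V3) == "3"
```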
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
         now = self._clock.time_msec()
         token = random_string(6)
 
-        if self.db_pool.engine.can_native_upsert:
-
-            def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
-                # We take out the lock if either a) there is no row for the lock
-                # already, b) the existing row has timed out, or c) the row is
-                # for this instance (which means the process got killed and
-                # restarted)
-                sql = """
-                    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
-                    VALUES (?, ?, ?, ?, ?)
-                    ON CONFLICT (lock_name, lock_key)
-                    DO UPDATE
-                        SET
-                            token = EXCLUDED.token,
-                            instance_name = EXCLUDED.instance_name,
-                            last_renewed_ts = EXCLUDED.last_renewed_ts
-                        WHERE
-                            worker_locks.last_renewed_ts < ?
-                            OR worker_locks.instance_name = EXCLUDED.instance_name
-                """
-                txn.execute(
-                    sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        self._instance_name,
-                        token,
-                        now,
-                        now - _LOCK_TIMEOUT_MS,
-                    ),
-                )
-
-                # We only acquired the lock if we inserted or updated the table.
-                return bool(txn.rowcount)
-
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock",
-                _try_acquire_lock_txn,
-                # We can autocommit here as we're executing a single query, this
-                # will avoid serialization errors.
-                db_autocommit=True,
+        def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+            # We take out the lock if either a) there is no row for the lock
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
+            sql = """
+                INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+                VALUES (?, ?, ?, ?, ?)
+                ON CONFLICT (lock_name, lock_key)
+                DO UPDATE
+                    SET
+                        token = EXCLUDED.token,
+                        instance_name = EXCLUDED.instance_name,
+                        last_renewed_ts = EXCLUDED.last_renewed_ts
+                    WHERE
+                        worker_locks.last_renewed_ts < ?
+                        OR worker_locks.instance_name = EXCLUDED.instance_name
+            """
+            txn.execute(
+                sql,
+                (
+                    lock_name,
+                    lock_key,
+                    self._instance_name,
+                    token,
+                    now,
+                    now - _LOCK_TIMEOUT_MS,
+                ),
             )
-            if not did_lock:
-                return None
-
-        else:
-            # If we're on an old SQLite we emulate the above logic by first
-            # clearing out any existing stale locks and then upserting.
-
-            def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
-                sql = """
-                    DELETE FROM worker_locks
-                    WHERE
-                        lock_name = ?
-                        AND lock_key = ?
-                        AND (last_renewed_ts < ? OR instance_name = ?)
-                """
-                txn.execute(
-                    sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
-                )
-
-                inserted = self.db_pool.simple_upsert_txn_emulated(
-                    txn,
-                    table="worker_locks",
-                    keyvalues={
-                        "lock_name": lock_name,
-                        "lock_key": lock_key,
-                    },
-                    values={},
-                    insertion_values={
-                        "token": token,
-                        "last_renewed_ts": self._clock.time_msec(),
-                        "instance_name": self._instance_name,
-                    },
-                )
-
-                return inserted
 
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
-            )
+            # We only acquired the lock if we inserted or updated the table.
+            return bool(txn.rowcount)
 
-            if not did_lock:
-                return None
+        did_lock = await self.db_pool.runInteraction(
+            "try_acquire_lock",
+            _try_acquire_lock_txn,
+            # We can autocommit here as we're executing a single query; this
+            # avoids serialization errors.
+            db_autocommit=True,
+        )
+        if not did_lock:
+            return None
 
         lock = Lock(
             self._reactor,
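
The consolidated `_try_acquire_lock_txn` works because the `DO UPDATE` is guarded by a `WHERE` clause: the statement touches a row only when the lock is new, stale, or already ours, so `txn.rowcount` doubles as the "did we get it?" signal. A self-contained sketch of the trick (sqlite3 for brevity; the table and timings are hypothetical):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE locks (name TEXT PRIMARY KEY, owner TEXT, renewed_ts INT)")

def try_acquire(owner: str, now: int, timeout_ms: int = 1000) -> bool:
    cur = conn.execute(
        """
        INSERT INTO locks (name, owner, renewed_ts) VALUES ('lock', ?, ?)
        ON CONFLICT (name) DO UPDATE
            SET owner = EXCLUDED.owner, renewed_ts = EXCLUDED.renewed_ts
            -- only steal the lock if it is stale, or re-take our own
            WHERE locks.renewed_ts < ? OR locks.owner = EXCLUDED.owner
        """,
        (owner, now, now - timeout_ms),
    )
    return bool(cur.rowcount)  # 0 rows changed => someone else holds it

assert try_acquire("worker1", now=5000)      # free: acquired
assert not try_acquire("worker2", now=5500)  # worker1's lock still fresh
assert try_acquire("worker2", now=7000)      # worker1's lock timed out
```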
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
@@ -812,7 +813,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
 
-        self.db_pool.simple_delete_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_graph",
             keyvalues={
@@ -820,19 +821,86 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-        )
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="receipts_graph",
             values={
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
                 "event_ids": json_encoder.encode(event_ids),
                 "data": json_encoder.encode(data),
             },
+            # receipts_graph has a unique constraint on
+            # (user_id, room_id, receipt_type), so no need to lock
+            lock=False,
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_remove_devices_from_device_inbox_txn",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass
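
The new `_populate_receipt_event_stream_ordering` update follows Synapse's usual batched background-update shape: snapshot `max(stream_id)` once, process one `[start, start + batch_size)` window per transaction, persist the cursor, and stop once the cursor passes the snapshot. A stripped-down sketch of that loop (helper names are hypothetical):

```python
def backfill_batch(start: int, stop: int) -> None:
    """Stand-in for the UPDATE over stream_id >= start AND stream_id < stop."""

def run_once(progress: dict, batch_size: int, max_stream_id: int) -> bool:
    start = progress.get("stream_id", 0)
    stop = start + batch_size

    backfill_batch(start, stop)
    # In Synapse this is persisted via _background_update_progress_txn so
    # the job resumes from the same cursor after a restart.
    progress["stream_id"] = stop

    # Finished once the cursor has moved past the initial max(stream_id).
    return stop > max_stream_id

progress: dict = {}
while not run_once(progress, batch_size=100, max_stream_id=250):
    pass
assert progress["stream_id"] == 300
```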
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 7fb9c801da..ac821878b0 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 "is_guest",
                 "admin",
                 "consent_version",
+                "consent_ts",
                 "consent_server_notice_sent",
                 "appservice_id",
                 "creation_ts",
@@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
                 txn,
                 table="users",
                 keyvalues={"name": user_id},
-                updatevalues={"consent_version": consent_version},
+                updatevalues={
+                    "consent_version": consent_version,
+                    "consent_ts": self._clock.time_msec(),
+                },
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4f0adb136a..fdb4684e12 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -32,10 +32,7 @@ import attr
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import (
-    run_as_background_process,
-    wrap_as_background_process,
-)
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -55,6 +52,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -90,16 +88,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # at a time. Keyed by room_id.
         self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
 
-        # Is the current_state_events.membership up to date? Or is the
-        # background update still running?
-        self._current_state_events_membership_up_to_date = False
-
-        txn = db_conn.cursor(
-            txn_name="_check_safe_current_state_events_membership_updated"
-        )
-        self._check_safe_current_state_events_membership_updated_txn(txn)
-        txn.close()
-
         if (
             self.hs.config.worker.run_background_tasks
             and self.hs.config.metrics.metrics_flags.known_servers
@@ -156,34 +144,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self._known_servers_count = max([count, 1])
         return self._known_servers_count
 
-    def _check_safe_current_state_events_membership_updated_txn(
-        self, txn: LoggingTransaction
-    ) -> None:
-        """Checks if it is safe to assume the new current_state_events
-        membership column is up to date
-        """
-
-        pending_update = self.db_pool.simple_select_one_txn(
-            txn,
-            table="background_updates",
-            keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
-            retcols=["update_name"],
-            allow_none=True,
-        )
-
-        self._current_state_events_membership_up_to_date = not pending_update
-
-        # If the update is still running, reschedule to run.
-        if pending_update:
-            self._clock.call_later(
-                15.0,
-                run_as_background_process,
-                "_check_safe_current_state_events_membership_updated",
-                self.db_pool.runInteraction,
-                "_check_safe_current_state_events_membership_updated",
-                self._check_safe_current_state_events_membership_updated_txn,
-            )
-
     @cached(max_entries=100000, iterable=True)
     async def get_users_in_room(self, room_id: str) -> List[str]:
         """
@@ -191,8 +151,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         (aka. with the lowest depth). This is done to match the sort in
         `get_current_hosts_in_room()` and so we can re-use the cache but it's
         not horrible to have here either.
-        """
 
+        Uses `m.room.member`s in the room state at the current forward extremities to
+        determine which users are in the room.
+
+        Will return inaccurate results for rooms with partial state, since the state for
+        the forward extremities of those rooms will exclude most members. We may also
+        calculate room state incorrectly for such rooms and believe that a member is or
+        is not in the room when the opposite is true.
+        """
         return await self.db_pool.runInteraction(
             "get_users_in_room", self.get_users_in_room_txn, room_id
         )
@@ -204,31 +171,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         `get_current_hosts_in_room()` and so we can re-use the cache but it's
         not horrible to have here either.
         """
-        # If we can assume current_state_events.membership is up to date
-        # then we can avoid a join, which is a Very Good Thing given how
-        # frequently this function gets called.
-        if self._current_state_events_membership_up_to_date:
-            sql = """
-                SELECT c.state_key FROM current_state_events as c
-                /* Get the depth of the event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
-                /* Sorted by lowest depth first */
-                ORDER BY e.depth ASC;
-            """
-        else:
-            sql = """
-                SELECT c.state_key FROM room_memberships as m
-                /* Get the depth of the event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                INNER JOIN current_state_events as c
-                ON m.event_id = c.event_id
-                AND m.room_id = c.room_id
-                AND m.user_id = c.state_key
-                WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
-                /* Sorted by lowest depth first */
-                ORDER BY e.depth ASC;
-            """
+        sql = """
+            SELECT c.state_key FROM current_state_events as c
+            /* Get the depth of the event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
+            /* Sorted by lowest depth first */
+            ORDER BY e.depth ASC;
+        """
 
         txn.execute(sql, (room_id, Membership.JOIN))
         return [r[0] for r in txn]
@@ -345,28 +295,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # We do this all in one transaction to keep the cache small.
             # FIXME: get rid of this when we have room_stats
 
-            # If we can assume current_state_events.membership is up to date
-            # then we can avoid a join, which is a Very Good Thing given how
-            # frequently this function gets called.
-            if self._current_state_events_membership_up_to_date:
-                # Note, rejected events will have a null membership field, so
-                # we we manually filter them out.
-                sql = """
-                    SELECT count(*), membership FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ?
-                        AND membership IS NOT NULL
-                    GROUP BY membership
-                """
-            else:
-                sql = """
-                    SELECT count(*), m.membership FROM room_memberships as m
-                    INNER JOIN current_state_events as c
-                    ON m.event_id = c.event_id
-                    AND m.room_id = c.room_id
-                    AND m.user_id = c.state_key
-                    WHERE c.type = 'm.room.member' AND c.room_id = ?
-                    GROUP BY m.membership
-                """
+            # Note, rejected events will have a null membership field, so
+            # we manually filter them out.
+            sql = """
+                SELECT count(*), membership FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ?
+                    AND membership IS NOT NULL
+                GROUP BY membership
+            """
 
             txn.execute(sql, (room_id,))
             res: Dict[str, MemberSummary] = {}
@@ -375,30 +311,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
             # we order by membership and then fairly arbitrarily by event_id so
             # heroes are consistent
-            if self._current_state_events_membership_up_to_date:
-                # Note, rejected events will have a null membership field, so
-                # we we manually filter them out.
-                sql = """
-                    SELECT state_key, membership, event_id
-                    FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ?
-                        AND membership IS NOT NULL
-                    ORDER BY
-                        CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                        event_id ASC
-                    LIMIT ?
-                """
-            else:
-                sql = """
-                    SELECT c.state_key, m.membership, c.event_id
-                    FROM room_memberships as m
-                    INNER JOIN current_state_events as c USING (room_id, event_id)
-                    WHERE c.type = 'm.room.member' AND c.room_id = ?
-                    ORDER BY
-                        CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                        c.event_id ASC
-                    LIMIT ?
-                """
+            # Note, rejected events will have a null membership field, so
+            # we manually filter them out.
+            sql = """
+                SELECT state_key, membership, event_id
+                FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ?
+                    AND membership IS NOT NULL
+                ORDER BY
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
+                    event_id ASC
+                LIMIT ?
+            """
 
             # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
             txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
@@ -641,27 +565,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # We use `current_state_events` here and not `local_current_membership`
         # as a) this gets called with remote users and b) this only gets called
         # for rooms the server is participating in.
-        if self._current_state_events_membership_up_to_date:
-            sql = """
-                SELECT room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.state_key = ?
-                    AND c.membership = ?
-            """
-        else:
-            sql = """
-                SELECT room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (room_id, event_id)
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.state_key = ?
-                    AND m.membership = ?
-            """
+        sql = """
+            SELECT room_id, e.instance_name, e.stream_ordering
+            FROM current_state_events AS c
+            INNER JOIN events AS e USING (room_id, event_id)
+            WHERE
+                c.type = 'm.room.member'
+                AND c.state_key = ?
+                AND c.membership = ?
+        """
 
         txn.execute(sql, (user_id, Membership.JOIN))
         return frozenset(
@@ -699,27 +611,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             user_ids,
         )
 
-        if self._current_state_events_membership_up_to_date:
-            sql = f"""
-                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.membership = ?
-                    AND {clause}
-            """
-        else:
-            sql = f"""
-                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (room_id, event_id)
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND m.membership = ?
-                    AND {clause}
-            """
+        sql = f"""
+            SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+            FROM current_state_events AS c
+            INNER JOIN events AS e USING (room_id, event_id)
+            WHERE
+                c.type = 'm.room.member'
+                AND c.membership = ?
+                AND {clause}
+        """
 
         txn.execute(sql, [Membership.JOIN] + args)
 
@@ -770,6 +670,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _get_users_server_still_shares_room_with_txn,
         )
 
+    @cancellable
     async def get_rooms_for_user(
         self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
     ) -> FrozenSet[str]:
@@ -1020,6 +921,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         longest is good because they're most likely to have anything we ask
         about.
 
+        Uses `m.room.member`s in the room state at the current forward extremities to
+        determine which hosts are in the room.
+
+        Will return inaccurate results for rooms with partial state, since the state for
+        the forward extremities of those rooms will exclude most members. We may also
+        calculate room state incorrectly for such rooms and believe that a host is or
+        is not in the room when the opposite is true.
+
         Returns:
             Returns a list of servers sorted by longest in the room first. (aka.
             sorted by join with the lowest depth first).
@@ -1042,6 +951,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # We use a `Set` just for fast lookups
             domain_set: Set[str] = set()
             for u in users:
+                if ":" not in u:
+                    continue
                 domain = get_domain_from_id(u)
                 if domain not in domain_set:
                     domain_set.add(domain)
@@ -1075,7 +986,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 ORDER BY min(e.depth) ASC;
             """
             txn.execute(sql, (room_id,))
-            return [d for d, in txn]
+            # `server_domain` will be `NULL` for malformed MXIDs with no colons.
+            return [d for d, in txn if d is not None]
 
         return await self.db_pool.runInteraction(
             "get_current_hosts_in_room", get_current_hosts_in_room_txn
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 0b10af0e58..af7bebee80 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -23,6 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.logging.opentracing import trace
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -36,6 +37,7 @@ from synapse.storage.state import StateFilter
 from synapse.types import JsonDict, JsonMapping, StateMap
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -142,6 +144,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return room_version
 
+    @trace
     async def get_metadata_for_events(
         self, event_ids: Collection[str]
     ) -> Dict[str, EventMetadata]:
@@ -281,6 +284,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     # FIXME: how should this be cached?
+    @cancellable
     async def get_partial_filtered_current_state_ids(
         self, room_id: str, state_filter: Optional[StateFilter] = None
     ) -> StateMap[str]:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
             absolutes: Absolute (set) fields
             additive_relatives: Fields that will be added onto if existing row present.
         """
-        if self.database_engine.can_native_upsert:
-            absolute_updates = [
-                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
-                for field in absolutes.keys()
-            ]
-
-            relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
-                % {"table": table, "field": field}
-                for field in additive_relatives.keys()
-            ]
-
-            insert_cols = []
-            qargs = []
-
-            for (key, val) in chain(
-                keyvalues.items(), absolutes.items(), additive_relatives.items()
-            ):
-                insert_cols.append(key)
-                qargs.append(val)
+        absolute_updates = [
+            "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+            for field in absolutes.keys()
+        ]
+
+        relative_updates = [
+            "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+            % {"table": table, "field": field}
+            for field in additive_relatives.keys()
+        ]
+
+        insert_cols = []
+        qargs = []
+
+        for (key, val) in chain(
+            keyvalues.items(), absolutes.items(), additive_relatives.items()
+        ):
+            insert_cols.append(key)
+            qargs.append(val)
+
+        sql = """
+            INSERT INTO %(table)s (%(insert_cols_cs)s)
+            VALUES (%(insert_vals_qs)s)
+            ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+        """ % {
+            "table": table,
+            "insert_cols_cs": ", ".join(insert_cols),
+            "insert_vals_qs": ", ".join(
+                ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+            ),
+            "key_columns": ", ".join(keyvalues),
+            "updates": ", ".join(chain(absolute_updates, relative_updates)),
+        }
 
-            sql = """
-                INSERT INTO %(table)s (%(insert_cols_cs)s)
-                VALUES (%(insert_vals_qs)s)
-                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
-            """ % {
-                "table": table,
-                "insert_cols_cs": ", ".join(insert_cols),
-                "insert_vals_qs": ", ".join(
-                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
-                ),
-                "key_columns": ", ".join(keyvalues),
-                "updates": ", ".join(chain(absolute_updates, relative_updates)),
-            }
-
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, table)
-            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self.db_pool.simple_select_one_txn(
-                txn, table, keyvalues, retcols, allow_none=True
-            )
-            if current_row is None:
-                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self.db_pool.simple_insert_txn(txn, table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    if current_row[key] is None:
-                        current_row[key] = val
-                    else:
-                        current_row[key] += val
-                current_row.update(absolutes)
-                self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+        txn.execute(sql, qargs)
 
     async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
         """Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a347430aa7..3f9bfaeac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -72,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -597,6 +598,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return ret, key
 
+    @cancellable
     async def get_membership_changes_for_user(
         self,
         user_id: str,
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             retry_interval: how long until next retry in ms
         """
 
-        if self.database_engine.can_native_upsert:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_native,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-                db_autocommit=True,  # Safe as its a single upsert
-            )
-        else:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_emulated,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-            )
+        await self.db_pool.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings_native,
+            destination,
+            failure_ts,
+            retry_last_ts,
+            retry_interval,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
 
     def _set_destination_retry_timings_native(
         self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         retry_last_ts: int,
         retry_interval: int,
     ) -> None:
-        assert self.database_engine.can_native_upsert
-
         # Upsert retry time interval if retry_interval is zero (i.e. we're
         # resetting it) or greater than the existing retry interval.
         #
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bb64543c1f..f8cfcaca83 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -31,6 +31,7 @@ from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import MutableStateMap, StateKey, StateMap
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -156,6 +157,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             "get_state_group_delta", _get_state_group_delta_txn
         )
 
+    @cancellable
     async def _get_state_groups_from_groups(
         self, groups: List[int], state_filter: StateFilter
     ) -> Dict[int, StateMap[str]]:
@@ -235,6 +237,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return state_filter.filter_state(state_dict_ids), not missing_types
 
+    @cancellable
     async def _get_state_for_groups(
         self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
     ) -> Dict[int, MutableStateMap[str]]:
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
 
     @property
     @abc.abstractmethod
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs?
-        """
-        ...
-
-    @property
-    @abc.abstractmethod
     def supports_using_any_list(self) -> bool:
         """
         Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
         db_conn.commit()
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Can we use native UPSERTs?
-        """
-        return True
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         return True
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
-        more work we haven't done yet to tell what was inserted vs updated.
-        """
-        return sqlite3.sqlite_version_info >= (3, 24, 0)
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
     ) -> None:
         if not allow_outdated_version:
-            version = sqlite3.sqlite_version_info
             # Synapse is untested against older SQLite versions, and we don't want
             # to let users upgrade to a version of Synapse with broken support for their
             # sqlite version, because it risks leaving them with a half-upgraded db.
-            if version < (3, 22, 0):
-                raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+            if sqlite3.sqlite_version_info < (3, 27, 0):
+                raise RuntimeError("Synapse requires sqlite 3.27 or above.")
 
     def check_new_database(self, txn: Cursor) -> None:
         """Gets called when setting up a brand new database. This allows us to
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');
diff --git a/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
new file mode 100644
index 0000000000..609eb1750f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD consent_ts bigint;
diff --git a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
new file mode 100644
index 0000000000..b5853d125c
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
@@ -0,0 +1,52 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Forces through the `current_state_events_membership` background job so checks
+for its completion can be removed.
+
+Note the background job must still remain defined in the database class.
+"""
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    cur.execute("SELECT update_name FROM background_updates")
+    rows = cur.fetchall()
+    for row in rows:
+        if row[0] == "current_state_events_membership":
+            break
+    # No pending background job so nothing to do here
+    else:
+        return
+
+    # Populate membership field for all current_state_events, this may take
+    # a while but was originally handled via a background update in 2019.
+    cur.execute(
+        """
+        UPDATE current_state_events
+        SET membership = (
+            SELECT membership FROM room_memberships
+            WHERE event_id = current_state_events.event_id
+        )
+        """
+    )
+
+    # Finally, delete the background job because we've handled it above
+    cur.execute(
+        """
+        DELETE FROM background_updates
+        WHERE update_name = 'current_state_events_membership'
+        """
+    )
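
One subtlety in `run_upgrade` above: the `else` is attached to the `for` loop, not to the `if`, and runs only when the loop finishes without hitting `break`. A small self-contained illustration of that control flow:

```python
def has_pending(rows: list, name: str) -> bool:
    for (update_name,) in rows:
        if update_name == name:
            break  # found a matching pending background update
    else:
        return False  # loop exhausted without break: nothing pending
    return True

assert has_pending([("a",), ("current_state_events_membership",)],
                   "current_state_events_membership")
assert not has_pending([("a",)], "current_state_events_membership")
```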
diff --git a/synapse/storage/schema/state/delta/30/state_stream.sql b/synapse/storage/schema/state/delta/30/state_stream.sql
index e85699e82e..bdaf8b02d5 100644
--- a/synapse/storage/schema/state/delta/30/state_stream.sql
+++ b/synapse/storage/schema/state/delta/30/state_stream.sql
@@ -26,6 +26,10 @@
  * (event, state) pair, we can use that stream_ordering to identify when
  * the new state was assigned for the event.
  */
+
+/* NB: This table belongs to the `main` logical database; it should not be present
+ * in `state`.
+ */
 CREATE TABLE IF NOT EXISTS ex_outlier_stream(
     event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
     event_id TEXT NOT NULL,
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index b4bf49dace..8d8894d1d5 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -24,6 +24,7 @@ from synapse.logging.opentracing import trace_with_opname
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.util import unwrapFirstError
+from synapse.util.cancellation import cancellable
 
 logger = logging.getLogger(__name__)
 
@@ -60,6 +61,7 @@ class PartialStateEventsTracker:
                 o.callback(None)
 
     @trace_with_opname("PartialStateEventsTracker.await_full_state")
+    @cancellable
     async def await_full_state(self, event_ids: Collection[str]) -> None:
         """Wait for all the given events to have full state.
 
@@ -154,6 +156,7 @@ class PartialCurrentStateTracker:
                 o.callback(None)
 
     @trace_with_opname("PartialCurrentStateTracker.await_full_state")
+    @cancellable
     async def await_full_state(self, room_id: str) -> None:
         # We add the deferred immediately so that the DB call to check for
         # partial state doesn't race when we unpartial the room.
diff --git a/synapse/types.py b/synapse/types.py
index 668d48d646..ec44601f54 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -52,6 +52,7 @@ from twisted.internet.interfaces import (
 )
 
 from synapse.api.errors import Codes, SynapseError
+from synapse.util.cancellation import cancellable
 from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
@@ -699,7 +700,11 @@ class StreamToken:
     START: ClassVar["StreamToken"]
 
     @classmethod
+    @cancellable
     async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
+        """
+        Creates a StreamToken from its textual representation.
+        """
         try:
             keys = string.split(cls._SEPARATOR)
             while len(keys) < len(attr.fields(cls)):
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index bdf9b0dc8c..35c0be08b0 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -20,9 +20,11 @@ from sys import intern
 from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
 
 import attr
+from prometheus_client import REGISTRY
 from prometheus_client.core import Gauge
 
 from synapse.config.cache import add_resizable_cache
+from synapse.util.metrics import DynamicCollectorRegistry
 
 logger = logging.getLogger(__name__)
 
@@ -30,27 +32,62 @@ logger = logging.getLogger(__name__)
 # Whether to track estimated memory usage of the LruCaches.
 TRACK_MEMORY_USAGE = False
 
+# We track cache metrics in a special registry that lets us update the metrics
+# just before they are returned from the scrape endpoint.
+CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
 
 caches_by_name: Dict[str, Sized] = {}
-collectors_by_name: Dict[str, "CacheMetric"] = {}
 
-cache_size = Gauge("synapse_util_caches_cache_size", "", ["name"])
-cache_hits = Gauge("synapse_util_caches_cache_hits", "", ["name"])
-cache_evicted = Gauge("synapse_util_caches_cache_evicted_size", "", ["name", "reason"])
-cache_total = Gauge("synapse_util_caches_cache_total", "", ["name"])
-cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
+cache_size = Gauge(
+    "synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_hits = Gauge(
+    "synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_evicted = Gauge(
+    "synapse_util_caches_cache_evicted_size",
+    "",
+    ["name", "reason"],
+    registry=CACHE_METRIC_REGISTRY,
+)
+cache_total = Gauge(
+    "synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_max_size = Gauge(
+    "synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
 cache_memory_usage = Gauge(
     "synapse_util_caches_cache_size_bytes",
     "Estimated memory usage of the caches",
     ["name"],
+    registry=CACHE_METRIC_REGISTRY,
 )
 
-response_cache_size = Gauge("synapse_util_caches_response_cache_size", "", ["name"])
-response_cache_hits = Gauge("synapse_util_caches_response_cache_hits", "", ["name"])
+response_cache_size = Gauge(
+    "synapse_util_caches_response_cache_size",
+    "",
+    ["name"],
+    registry=CACHE_METRIC_REGISTRY,
+)
+response_cache_hits = Gauge(
+    "synapse_util_caches_response_cache_hits",
+    "",
+    ["name"],
+    registry=CACHE_METRIC_REGISTRY,
+)
 response_cache_evicted = Gauge(
-    "synapse_util_caches_response_cache_evicted_size", "", ["name", "reason"]
+    "synapse_util_caches_response_cache_evicted_size",
+    "",
+    ["name", "reason"],
+    registry=CACHE_METRIC_REGISTRY,
 )
-response_cache_total = Gauge("synapse_util_caches_response_cache_total", "", ["name"])
+response_cache_total = Gauge(
+    "synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+
+
+# Register our custom cache metrics registry with the global registry
+REGISTRY.register(CACHE_METRIC_REGISTRY)
 
 
 class EvictionReason(Enum):
@@ -168,9 +205,8 @@ def register_cache(
         add_resizable_cache(cache_name, resize_callback)
 
     metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
-    metric_name = "cache_%s_%s" % (cache_type, cache_name)
     caches_by_name[cache_name] = cache
-    collectors_by_name[metric_name] = metric
+    CACHE_METRIC_REGISTRY.register_hook(metric.collect)
     return metric
 
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index bc3b4938ea..9687120ebf 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -15,9 +15,9 @@
 import logging
 from functools import wraps
 from types import TracebackType
-from typing import Awaitable, Callable, Optional, Type, TypeVar
+from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
 
-from prometheus_client import Counter
+from prometheus_client import CollectorRegistry, Counter, Metric
 from typing_extensions import Concatenate, ParamSpec, Protocol
 
 from synapse.logging.context import (
@@ -208,3 +208,33 @@ class Measure:
         metrics.real_time_sum += duration
 
         # TODO: Add other in flight metrics.
+
+
+class DynamicCollectorRegistry(CollectorRegistry):
+    """
+    Custom Prometheus Collector registry that calls a hook first, allowing you
+    to update metrics on-demand.
+
+    Don't forget to register this registry with the main registry!
+    """
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._pre_update_hooks: List[Callable[[], None]] = []
+
+    def collect(self) -> Generator[Metric, None, None]:
+        """
+        Collects metrics, calling pre-update hooks first.
+        """
+
+        for pre_update_hook in self._pre_update_hooks:
+            pre_update_hook()
+
+        yield from super().collect()
+
+    def register_hook(self, hook: Callable[[], None]) -> None:
+        """
+        Registers a hook that is called before metric collection.
+        """
+
+        self._pre_update_hooks.append(hook)
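
A usage sketch for `DynamicCollectorRegistry`, mirroring how `synapse/util/caches/__init__.py` wires it up above: metrics are created against the custom registry, hooks refresh them just before collection, and the custom registry is then registered with the global one. The metric and cache here are illustrative:

```python
from prometheus_client import REGISTRY, Gauge

from synapse.util.metrics import DynamicCollectorRegistry

registry = DynamicCollectorRegistry()
cache_size = Gauge("example_cache_size", "", ["name"], registry=registry)

my_cache: dict = {}

def refresh() -> None:
    # Runs on every collect(), so scrapes always see the current size.
    cache_size.labels("my_cache").set(len(my_cache))

registry.register_hook(refresh)
REGISTRY.register(registry)  # expose through the default scrape endpoint
```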
diff --git a/synapse/util/rust.py b/synapse/util/rust.py
new file mode 100644
index 0000000000..30ecb9ffd9
--- /dev/null
+++ b/synapse/util/rust.py
@@ -0,0 +1,84 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from hashlib import blake2b
+
+import synapse
+from synapse.synapse_rust import get_rust_file_digest
+
+
+def check_rust_lib_up_to_date() -> None:
+    """For editable installs check if the rust library is outdated and needs to
+    be rebuilt.
+    """
+
+    if not _dist_is_editable():
+        return
+
+    synapse_dir = os.path.dirname(synapse.__file__)
+    synapse_root = os.path.abspath(os.path.join(synapse_dir, ".."))
+
+    # Double check we've not gone into site-packages...
+    if os.path.basename(synapse_root) == "site-packages":
+        return
+
+    # ... and it looks like the root of a python project.
+    if not os.path.exists(os.path.join(synapse_root, "pyproject.toml")):
+        return
+
+    # Get the hash of all Rust source files
+    hash = _hash_rust_files_in_directory(os.path.join(synapse_root, "rust", "src"))
+
+    if hash != get_rust_file_digest():
+        raise Exception("Rust module outdated. Please rebuild using `poetry install`")
+
+
+def _hash_rust_files_in_directory(directory: str) -> str:
+    """Get the hash of all files in a directory (recursively)"""
+
+    directory = os.path.abspath(directory)
+
+    paths = []
+
+    dirs = [directory]
+    while dirs:
+        dir = dirs.pop()
+        with os.scandir(dir) as d:
+            for entry in d:
+                if entry.is_dir():
+                    dirs.append(entry.path)
+                else:
+                    paths.append(entry.path)
+
+    # We sort to make sure that we get a consistent and well-defined ordering.
+    paths.sort()
+
+    hasher = blake2b()
+
+    for path in paths:
+        with open(path, "rb") as f:
+            hasher.update(f.read())
+
+    return hasher.hexdigest()
+
+
+def _dist_is_editable() -> bool:
+    """Is distribution an editable install?"""
+    for path_item in sys.path:
+        egg_link = os.path.join(path_item, "matrix-synapse.egg-link")
+        if os.path.isfile(egg_link):
+            return True
+    return False
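A minimal sketch of how this module is presumably used at startup (the diffstat shows synapse/__init__.py gaining a call into it): in an editable checkout with stale compiled Rust code the check raises, and otherwise it is a no-op.

from synapse.util.rust import check_rust_lib_up_to_date

try:
    check_rust_lib_up_to_date()
except Exception as e:
    # e.g. "Rust module outdated. Please rebuild using `poetry install`"
    print(e)
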
diff --git a/tests/http/server/_base.py b/tests/http/server/_base.py
index 5726e60cee..5071f83574 100644
--- a/tests/http/server/_base.py
+++ b/tests/http/server/_base.py
@@ -140,6 +140,8 @@ def make_request_with_cancellation_test(
     method: str,
     path: str,
     content: Union[bytes, str, JsonDict] = b"",
+    *,
+    token: Optional[str] = None,
 ) -> FakeChannel:
     """Performs a request repeatedly, disconnecting at successive `await`s, until
     one completes.
@@ -211,7 +213,13 @@ def make_request_with_cancellation_test(
                 with deferred_patch.patch():
                     # Start the request.
                     channel = make_request(
-                        reactor, site, method, path, content, await_result=False
+                        reactor,
+                        site,
+                        method,
+                        path,
+                        content,
+                        await_result=False,
+                        access_token=token,
                     )
                     request = channel.request
 
diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py
index 9d71a97524..d156be82b0 100644
--- a/tests/rest/admin/test_room.py
+++ b/tests/rest/admin/test_room.py
@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
+import time
 import urllib.parse
 from typing import List, Optional
 from unittest.mock import Mock
@@ -22,10 +24,11 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import EventTypes, Membership, RoomTypes
 from synapse.api.errors import Codes
-from synapse.handlers.pagination import PaginationHandler
+from synapse.handlers.pagination import PaginationHandler, PurgeStatus
 from synapse.rest.client import directory, events, login, room
 from synapse.server import HomeServer
 from synapse.util import Clock
+from synapse.util.stringutils import random_string
 
 from tests import unittest
 
@@ -1793,6 +1796,159 @@ class RoomTestCase(unittest.HomeserverTestCase):
         self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
 
 
+class RoomMessagesTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.user = self.register_user("foo", "pass")
+        self.user_tok = self.login("foo", "pass")
+        self.room_id = self.helper.create_room_as(self.user, tok=self.user_tok)
+
+    def test_timestamp_to_event(self) -> None:
+        """Test that providing the current timestamp can get the last event."""
+        self.helper.send(self.room_id, body="message 1", tok=self.user_tok)
+        second_event_id = self.helper.send(
+            self.room_id, body="message 2", tok=self.user_tok
+        )["event_id"]
+        ts = str(round(time.time() * 1000))
+
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/timestamp_to_event?dir=b&ts=%s"
+            % (self.room_id, ts),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("event_id", channel.json_body)
+        self.assertEqual(second_event_id, channel.json_body["event_id"])
+
+    def test_topo_token_is_accepted(self) -> None:
+        """Test Topo Token is accepted."""
+        token = "t1-0_0_0_0_0_0_0_0_0"
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("start", channel.json_body)
+        self.assertEqual(token, channel.json_body["start"])
+        self.assertIn("chunk", channel.json_body)
+        self.assertIn("end", channel.json_body)
+
+    def test_stream_token_is_accepted_for_fwd_pagination(self) -> None:
+        """Test that a stream token is accepted for forward pagination."""
+        token = "s0_0_0_0_0_0_0_0_0"
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(200, channel.code)
+        self.assertIn("start", channel.json_body)
+        self.assertEqual(token, channel.json_body["start"])
+        self.assertIn("chunk", channel.json_body)
+        self.assertIn("end", channel.json_body)
+
+    def test_room_messages_purge(self) -> None:
+        """Test room messages can be retrieved by an admin that isn't in the room."""
+        store = self.hs.get_datastores().main
+        pagination_handler = self.hs.get_pagination_handler()
+
+        # Send a first message in the room, which will be removed by the purge.
+        first_event_id = self.helper.send(
+            self.room_id, body="message 1", tok=self.user_tok
+        )["event_id"]
+        first_token = self.get_success(
+            store.get_topological_token_for_event(first_event_id)
+        )
+        first_token_str = self.get_success(first_token.to_string(store))
+
+        # Send a second message in the room, which won't be removed, and which we'll
+        # use as the marker to purge events before.
+        second_event_id = self.helper.send(
+            self.room_id, body="message 2", tok=self.user_tok
+        )["event_id"]
+        second_token = self.get_success(
+            store.get_topological_token_for_event(second_event_id)
+        )
+        second_token_str = self.get_success(second_token.to_string(store))
+
+        # Send a third event in the room so we don't hit any edge case caused
+        # by our marker being the latest forward extremity in the room.
+        self.helper.send(self.room_id, body="message 3", tok=self.user_tok)
+
+        # Check that we get the first and second message when querying /messages.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                second_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 2, [event["content"] for event in chunk])
+
+        # Purge every event before the second event.
+        purge_id = random_string(16)
+        pagination_handler._purges_by_id[purge_id] = PurgeStatus()
+        self.get_success(
+            pagination_handler._purge_history(
+                purge_id=purge_id,
+                room_id=self.room_id,
+                token=second_token_str,
+                delete_local_events=True,
+            )
+        )
+
+        # Check that we only get the second message through /messages now that
+        # the first has been purged.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                second_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 1, [event["content"] for event in chunk])
+
+        # Check that we get no events, but also no error, when querying
+        # /messages with the token that pointed at the first event, since that
+        # event has been purged.
+        channel = self.make_request(
+            "GET",
+            "/_synapse/admin/v1/rooms/%s/messages?from=%s&dir=b&filter=%s"
+            % (
+                self.room_id,
+                first_token_str,
+                json.dumps({"types": [EventTypes.Message]}),
+            ),
+            access_token=self.admin_user_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        chunk = channel.json_body["chunk"]
+        self.assertEqual(len(chunk), 0, [event["content"] for event in chunk])
+
+
 class JoinAliasRoomTestCase(unittest.HomeserverTestCase):
 
     servlets = [
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 1afd082707..ec5ccf6fca 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -2580,6 +2580,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
         self.assertIn("appservice_id", content)
         self.assertIn("consent_server_notice_sent", content)
         self.assertIn("consent_version", content)
+        self.assertIn("consent_ts", content)
         self.assertIn("external_ids", content)
 
         # This key was removed intentionally. Ensure it is not accidentally re-included.
diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py
index bbc8e74243..741fecea77 100644
--- a/tests/rest/client/test_keys.py
+++ b/tests/rest/client/test_keys.py
@@ -19,6 +19,7 @@ from synapse.rest import admin
 from synapse.rest.client import keys, login
 
 from tests import unittest
+from tests.http.server._base import make_request_with_cancellation_test
 
 
 class KeyQueryTestCase(unittest.HomeserverTestCase):
@@ -89,3 +90,31 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
             Codes.BAD_JSON,
             channel.result,
         )
+
+    def test_key_query_cancellation(self) -> None:
+        """
+        Tests that /keys/query is cancellable and does not swallow the
+        CancelledError.
+        """
+        self.register_user("alice", "wonderland")
+        alice_token = self.login("alice", "wonderland")
+
+        bob = self.register_user("bob", "uncle")
+
+        channel = make_request_with_cancellation_test(
+            "test_key_query_cancellation",
+            self.reactor,
+            self.site,
+            "POST",
+            "/_matrix/client/r0/keys/query",
+            {
+                "device_keys": {
+                    # An empty list requests keys for all of bob's devices.
+                    bob: [],
+                },
+            },
+            token=alice_token,
+        )
+
+        self.assertEqual(200, channel.code, msg=channel.result["body"])
+        self.assertIn(bob, channel.json_body["device_keys"])
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py
index 46d829b062..67401272ac 100644
--- a/tests/storage/databases/main/test_events_worker.py
+++ b/tests/storage/databases/main/test_events_worker.py
@@ -254,7 +254,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase):
                         "room_id": self.room_id,
                         "json": json.dumps(event_json),
                         "internal_metadata": "{}",
-                        "format_version": EventFormatVersions.V3,
+                        "format_version": EventFormatVersions.ROOM_V4_PLUS,
                     },
                 )
             )
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index cce8e75c74..40e58f8199 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -54,7 +54,6 @@ class SQLBaseStoreTestCase(unittest.TestCase):
         sqlite_config = {"name": "sqlite3"}
         engine = create_engine(sqlite_config)
         fake_engine = Mock(wraps=engine)
-        fake_engine.can_native_upsert = False
         fake_engine.in_transaction.return_value = False
 
         db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index d92a9ac5b7..a6679e1312 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -513,7 +513,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
 
         def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
             """Account for differences in prev_events format across room versions"""
-            if room_version.event_format == EventFormatVersions.V1:
+            if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
                 return prev_event_id, {}
 
             return prev_event_id
diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index a49ac1525e..853a93afab 100644
--- a/tests/storage/test_registration.py
+++ b/tests/storage/test_registration.py
@@ -11,15 +11,18 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import ThreepidValidationError
+from synapse.server import HomeServer
+from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 
 
 class RegistrationStoreTestCase(HomeserverTestCase):
-    def prepare(self, reactor, clock, hs):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
 
         self.user_id = "@my-user:test"
@@ -27,7 +30,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
         self.pwhash = "{xx1}123456789"
         self.device_id = "akgjhdjklgshg"
 
-    def test_register(self):
+    def test_register(self) -> None:
         self.get_success(self.store.register_user(self.user_id, self.pwhash))
 
         self.assertEqual(
@@ -38,6 +41,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
                 "admin": 0,
                 "is_guest": 0,
                 "consent_version": None,
+                "consent_ts": None,
                 "consent_server_notice_sent": None,
                 "appservice_id": None,
                 "creation_ts": 0,
@@ -48,7 +52,20 @@ class RegistrationStoreTestCase(HomeserverTestCase):
             (self.get_success(self.store.get_user_by_id(self.user_id))),
         )
 
-    def test_add_tokens(self):
+    def test_consent(self) -> None:
+        self.get_success(self.store.register_user(self.user_id, self.pwhash))
+        before_consent = self.clock.time_msec()
+        self.reactor.advance(5)
+        self.get_success(self.store.user_set_consent_version(self.user_id, "1"))
+        self.reactor.advance(5)
+
+        user = self.get_success(self.store.get_user_by_id(self.user_id))
+        assert user
+        self.assertEqual(user["consent_version"], "1")
+        self.assertGreater(user["consent_ts"], before_consent)
+        self.assertLess(user["consent_ts"], self.clock.time_msec())
+
+    def test_add_tokens(self) -> None:
         self.get_success(self.store.register_user(self.user_id, self.pwhash))
         self.get_success(
             self.store.add_access_token_to_user(
@@ -58,11 +75,12 @@ class RegistrationStoreTestCase(HomeserverTestCase):
 
         result = self.get_success(self.store.get_user_by_access_token(self.tokens[1]))
 
+        assert result
         self.assertEqual(result.user_id, self.user_id)
         self.assertEqual(result.device_id, self.device_id)
         self.assertIsNotNone(result.token_id)
 
-    def test_user_delete_access_tokens(self):
+    def test_user_delete_access_tokens(self) -> None:
         # add some tokens
         self.get_success(self.store.register_user(self.user_id, self.pwhash))
         self.get_success(
@@ -87,6 +105,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
 
         # check the one not associated with the device was not deleted
         user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
+        assert user
         self.assertEqual(self.user_id, user.user_id)
 
         # now delete the rest
@@ -95,11 +114,11 @@ class RegistrationStoreTestCase(HomeserverTestCase):
         user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
         self.assertIsNone(user, "access token was not deleted without device_id")
 
-    def test_is_support_user(self):
+    def test_is_support_user(self) -> None:
         TEST_USER = "@test:test"
         SUPPORT_USER = "@support:test"
 
-        res = self.get_success(self.store.is_support_user(None))
+        res = self.get_success(self.store.is_support_user(None))  # type: ignore[arg-type]
         self.assertFalse(res)
         self.get_success(
             self.store.register_user(user_id=TEST_USER, password_hash=None)
@@ -115,7 +134,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
         res = self.get_success(self.store.is_support_user(SUPPORT_USER))
         self.assertTrue(res)
 
-    def test_3pid_inhibit_invalid_validation_session_error(self):
+    def test_3pid_inhibit_invalid_validation_session_error(self) -> None:
         """Tests that enabling the configuration option to inhibit 3PID errors on
         /requestToken also inhibits validation errors caused by an unknown session ID.
         """
diff --git a/tests/test_event_auth.py b/tests/test_event_auth.py
index e42d7b9ba0..f4d9fba0a1 100644
--- a/tests/test_event_auth.py
+++ b/tests/test_event_auth.py
@@ -821,7 +821,7 @@ def _alias_event(room_version: RoomVersion, sender: str, **kwargs) -> EventBase:
 def _build_auth_dict_for_room_version(
     room_version: RoomVersion, auth_events: Iterable[EventBase]
 ) -> List:
-    if room_version.event_format == EventFormatVersions.V1:
+    if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
         return [(e.event_id, "not_used") for e in auth_events]
     else:
         return [e.event_id for e in auth_events]
@@ -871,7 +871,7 @@ event_count = 0
 
 def _maybe_get_event_id_dict_for_room_version(room_version: RoomVersion) -> dict:
     """If this room version needs it, generate an event id"""
-    if room_version.event_format != EventFormatVersions.V1:
+    if room_version.event_format != EventFormatVersions.ROOM_V1_V2:
         return {}
 
     global event_count
diff --git a/tests/test_rust.py b/tests/test_rust.py
new file mode 100644
index 0000000000..55d8b6b28c
--- /dev/null
+++ b/tests/test_rust.py
@@ -0,0 +1,11 @@
+from synapse.synapse_rust import sum_as_string
+
+from tests import unittest
+
+
+class RustTestCase(unittest.TestCase):
+    """Basic tests to ensure that we can call into Rust code."""
+
+    def test_basic(self) -> None:
+        result = sum_as_string(1, 2)
+        self.assertEqual("3", result)
diff --git a/tests/test_types.py b/tests/test_types.py
index d8d82a517e..1111169384 100644
--- a/tests/test_types.py
+++ b/tests/test_types.py
@@ -13,11 +13,35 @@
 # limitations under the License.
 
 from synapse.api.errors import SynapseError
-from synapse.types import RoomAlias, UserID, map_username_to_mxid_localpart
+from synapse.types import (
+    RoomAlias,
+    UserID,
+    get_domain_from_id,
+    get_localpart_from_id,
+    map_username_to_mxid_localpart,
+)
 
 from tests import unittest
 
 
+class IsMineIDTests(unittest.HomeserverTestCase):
+    def test_is_mine_id(self) -> None:
+        self.assertTrue(self.hs.is_mine_id("@user:test"))
+        self.assertTrue(self.hs.is_mine_id("#room:test"))
+        self.assertTrue(self.hs.is_mine_id("invalid:test"))
+
+        self.assertFalse(self.hs.is_mine_id("@user:test\0"))
+        self.assertFalse(self.hs.is_mine_id("@user"))
+
+    def test_two_colons(self) -> None:
+        """Test handling of IDs containing more than one colon."""
+        # The domain starts after the first colon.
+        # These functions must interpret things consistently.
+        self.assertFalse(self.hs.is_mine_id("@user:test:test"))
+        self.assertEqual("user", get_localpart_from_id("@user:test:test"))
+        self.assertEqual("test:test", get_domain_from_id("@user:test:test"))
+
+
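The assertions above all hinge on splitting at the first colon. A hedged sketch of that rule (not Synapse's actual helpers, which also validate the sigil and the rest of the ID):

from typing import Tuple

def _split_mxid(mxid: str) -> Tuple[str, str]:
    # Drop the sigil, then split at the *first* colon only.
    localpart, _, domain = mxid[1:].partition(":")
    return localpart, domain

assert _split_mxid("@user:test:test") == ("user", "test:test")
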
 class UserIDTestCase(unittest.HomeserverTestCase):
     def test_parse(self):
         user = UserID.from_string("@1234abcd:test")