summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--.github/workflows/tests.yml18
-rw-r--r--Cargo.lock12
-rw-r--r--changelog.d/15997.misc1
-rw-r--r--changelog.d/16066.bugfix1
-rw-r--r--changelog.d/16090.misc1
-rw-r--r--changelog.d/16137.feature1
-rw-r--r--changelog.d/16170.bugfix1
-rw-r--r--changelog.d/16171.bugfix1
-rw-r--r--changelog.d/16172.bugfix1
-rw-r--r--changelog.d/16174.bugfix1
-rw-r--r--changelog.d/16219.feature1
-rw-r--r--changelog.d/16235.misc1
-rw-r--r--changelog.d/16240.misc1
-rw-r--r--changelog.d/16248.misc1
-rw-r--r--changelog.d/16251.bugfix1
-rw-r--r--changelog.d/16257.bugfix1
-rw-r--r--changelog.d/16260.misc1
-rw-r--r--changelog.d/16263.misc1
-rw-r--r--docs/upgrade.md8
-rw-r--r--docs/usage/configuration/config_documentation.md4
-rw-r--r--flake.lock6
-rw-r--r--flake.nix20
-rw-r--r--poetry.lock37
-rw-r--r--rust/Cargo.toml9
-rwxr-xr-xscripts-dev/federation_client.py12
-rw-r--r--synapse/api/presence.py43
-rw-r--r--synapse/config/_base.py7
-rw-r--r--synapse/handlers/device.py48
-rw-r--r--synapse/handlers/initial_sync.py8
-rw-r--r--synapse/handlers/pagination.py12
-rw-r--r--synapse/handlers/presence.py300
-rw-r--r--synapse/handlers/room.py10
-rw-r--r--synapse/handlers/sync.py16
-rw-r--r--synapse/http/federation/matrix_federation_agent.py29
-rw-r--r--synapse/logging/context.py4
-rw-r--r--synapse/module_api/__init__.py13
-rw-r--r--synapse/module_api/callbacks/third_party_event_rules_callbacks.py11
-rw-r--r--synapse/storage/databases/main/deviceinbox.py28
-rw-r--r--synapse/storage/databases/main/devices.py8
-rw-r--r--synapse/storage/databases/main/keys.py17
-rw-r--r--synapse/storage/databases/main/receipts.py6
-rw-r--r--synapse/storage/engines/_base.py6
-rw-r--r--synapse/storage/engines/postgres.py4
-rw-r--r--synapse/storage/engines/sqlite.py4
-rw-r--r--synapse/storage/schema/main/delta/48/group_unique_indexes.py4
-rw-r--r--synapse/util/gai_resolver.py2
-rw-r--r--synapse/util/task_scheduler.py17
-rw-r--r--tests/handlers/test_appservice.py125
-rw-r--r--tests/handlers/test_device.py47
-rw-r--r--tests/handlers/test_presence.py598
-rw-r--r--tests/http/federation/test_matrix_federation_agent.py323
51 files changed, 1634 insertions, 198 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 0a01e82984..fb117380d0 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -35,7 +35,7 @@ jobs:
     steps:
       - uses: actions/checkout@v3
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
       - uses: matrix-org/setup-python-poetry@v1
         with:
@@ -93,7 +93,7 @@ jobs:
         uses: actions/checkout@v3
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       - name: Setup Poetry
@@ -150,7 +150,7 @@ jobs:
         with:
           ref: ${{ github.event.pull_request.head.sha }}
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
       - uses: matrix-org/setup-python-poetry@v1
         with:
@@ -167,7 +167,7 @@ jobs:
       - uses: actions/checkout@v3
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
         with:
             components: clippy
       - uses: Swatinem/rust-cache@v2
@@ -268,7 +268,7 @@ jobs:
             postgres:${{ matrix.job.postgres-version }}
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       - uses: matrix-org/setup-python-poetry@v1
@@ -308,7 +308,7 @@ jobs:
       - uses: actions/checkout@v3
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       # There aren't wheels for some of the older deps, so we need to install
@@ -416,7 +416,7 @@ jobs:
         run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       - name: Run SyTest
@@ -556,7 +556,7 @@ jobs:
           path: synapse
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       - uses: actions/setup-go@v4
@@ -584,7 +584,7 @@ jobs:
       - uses: actions/checkout@v3
 
       - name: Install Rust
-        uses: dtolnay/rust-toolchain@1.60.0
+        uses: dtolnay/rust-toolchain@1.61.0
       - uses: Swatinem/rust-cache@v2
 
       - run: cargo test
diff --git a/Cargo.lock b/Cargo.lock
index 4d60f8dcb6..95a713e437 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -138,9 +138,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
 
 [[package]]
 name = "memchr"
-version = "2.5.0"
+version = "2.6.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
+checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c"
 
 [[package]]
 name = "memoffset"
@@ -291,9 +291,9 @@ dependencies = [
 
 [[package]]
 name = "regex"
-version = "1.9.4"
+version = "1.9.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29"
+checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47"
 dependencies = [
  "aho-corasick",
  "memchr",
@@ -303,9 +303,9 @@ dependencies = [
 
 [[package]]
 name = "regex-automata"
-version = "0.3.7"
+version = "0.3.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629"
+checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795"
 dependencies = [
  "aho-corasick",
  "memchr",
diff --git a/changelog.d/15997.misc b/changelog.d/15997.misc
new file mode 100644
index 0000000000..94768c3cb8
--- /dev/null
+++ b/changelog.d/15997.misc
@@ -0,0 +1 @@
+Allow modules to delete rooms.
\ No newline at end of file
diff --git a/changelog.d/16066.bugfix b/changelog.d/16066.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16066.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16090.misc b/changelog.d/16090.misc
new file mode 100644
index 0000000000..d54ef936c7
--- /dev/null
+++ b/changelog.d/16090.misc
@@ -0,0 +1 @@
+Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled.
\ No newline at end of file
diff --git a/changelog.d/16137.feature b/changelog.d/16137.feature
new file mode 100644
index 0000000000..bba6f161cd
--- /dev/null
+++ b/changelog.d/16137.feature
@@ -0,0 +1 @@
+Support resolving homeservers using `matrix-fed` DNS SRV records from [MSC4040](https://github.com/matrix-org/matrix-spec-proposals/pull/4040).
diff --git a/changelog.d/16170.bugfix b/changelog.d/16170.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16170.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16171.bugfix b/changelog.d/16171.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16171.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16172.bugfix b/changelog.d/16172.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16172.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16174.bugfix b/changelog.d/16174.bugfix
new file mode 100644
index 0000000000..83649cf42a
--- /dev/null
+++ b/changelog.d/16174.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where multi-device accounts could cause high load due to presence.
diff --git a/changelog.d/16219.feature b/changelog.d/16219.feature
new file mode 100644
index 0000000000..c789f2abb7
--- /dev/null
+++ b/changelog.d/16219.feature
@@ -0,0 +1 @@
+Add the ability to use `G` (GiB) and `T` (TiB) suffixes in configuration options that refer to numbers of bytes.
\ No newline at end of file
diff --git a/changelog.d/16235.misc b/changelog.d/16235.misc
new file mode 100644
index 0000000000..b1533f93b6
--- /dev/null
+++ b/changelog.d/16235.misc
@@ -0,0 +1 @@
+Fix type checking when using the new version of Twisted.
diff --git a/changelog.d/16240.misc b/changelog.d/16240.misc
new file mode 100644
index 0000000000..4f266c1fb0
--- /dev/null
+++ b/changelog.d/16240.misc
@@ -0,0 +1 @@
+Delete device messages asynchronously and in staged batches using the task scheduler.
diff --git a/changelog.d/16248.misc b/changelog.d/16248.misc
new file mode 100644
index 0000000000..0a5ed6dccb
--- /dev/null
+++ b/changelog.d/16248.misc
@@ -0,0 +1 @@
+Bump minimum supported Rust version to 1.61.0.
diff --git a/changelog.d/16251.bugfix b/changelog.d/16251.bugfix
new file mode 100644
index 0000000000..6d3157c7aa
--- /dev/null
+++ b/changelog.d/16251.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where appservices using MSC2409 to receive to_device messages, would only get messages for one user.
\ No newline at end of file
diff --git a/changelog.d/16257.bugfix b/changelog.d/16257.bugfix
new file mode 100644
index 0000000000..28a5319749
--- /dev/null
+++ b/changelog.d/16257.bugfix
@@ -0,0 +1 @@
+Fix long-standing bug where we kept re-requesting a remote server's key repeatedly, potentially causing delays in receiving events over federation.
diff --git a/changelog.d/16260.misc b/changelog.d/16260.misc
new file mode 100644
index 0000000000..9f3289d7d4
--- /dev/null
+++ b/changelog.d/16260.misc
@@ -0,0 +1 @@
+Update rust to version 1.71.1 in the nix development environment.
\ No newline at end of file
diff --git a/changelog.d/16263.misc b/changelog.d/16263.misc
new file mode 100644
index 0000000000..d54ef936c7
--- /dev/null
+++ b/changelog.d/16263.misc
@@ -0,0 +1 @@
+Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled.
\ No newline at end of file
diff --git a/docs/upgrade.md b/docs/upgrade.md
index f50a279e98..2f888b6f12 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -88,6 +88,14 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```
 
+# Upgrading to v1.93.0
+
+## Minimum supported Rust version
+The minimum supported Rust version has been increased from v1.60.0 to v1.61.0.
+Users building from source will need to ensure their `rustc` version is up to
+date.
+
+
 # Upgrading to v1.90.0
 
 ## App service query parameter authorization is now a configuration option
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 0b1725816e..97fd1beb39 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -25,8 +25,10 @@ messages from the database after 5 minutes, rather than 5 months.
 
 In addition, configuration options referring to size use the following suffixes:
 
-* `M` = MiB, or 1,048,576 bytes
 * `K` = KiB, or 1024 bytes
+* `M` = MiB, or 1,048,576 bytes
+* `G` = GiB, or 1,073,741,824 bytes
+* `T` = TiB, or 1,099,511,627,776 bytes
 
 For example, setting `max_avatar_size: 10M` means that Synapse will not accept files larger than 10,485,760 bytes
 for a user avatar.
diff --git a/flake.lock b/flake.lock
index d53be767a7..9b360fa33e 100644
--- a/flake.lock
+++ b/flake.lock
@@ -258,11 +258,11 @@
         "nixpkgs": "nixpkgs_3"
       },
       "locked": {
-        "lastModified": 1690510705,
-        "narHash": "sha256-6mjs3Gl9/xrseFh9iNcNq1u5yJ/MIoAmjoaG7SXZDIE=",
+        "lastModified": 1693966243,
+        "narHash": "sha256-a2CA1aMIPE67JWSVIGoGtD3EGlFdK9+OlJQs0FOWCKY=",
         "owner": "oxalica",
         "repo": "rust-overlay",
-        "rev": "851ae4c128905a62834d53ce7704ebc1ba481bea",
+        "rev": "a8b4bb4cbb744baaabc3e69099f352f99164e2c1",
         "type": "github"
       },
       "original": {
diff --git a/flake.nix b/flake.nix
index b89b6d9218..31f2832939 100644
--- a/flake.nix
+++ b/flake.nix
@@ -82,7 +82,7 @@
                   #
                   # NOTE: We currently need to set the Rust version unnecessarily high
                   # in order to work around https://github.com/matrix-org/synapse/issues/15939
-                  (rust-bin.stable."1.70.0".default.override {
+                  (rust-bin.stable."1.71.1".default.override {
                     # Additionally install the "rust-src" extension to allow diving into the
                     # Rust source code in an IDE (rust-analyzer will also make use of it).
                     extensions = [ "rust-src" ];
@@ -90,6 +90,11 @@
                   # The rust-analyzer language server implementation.
                   rust-analyzer
 
+                  # GCC includes a linker; needed for building `ruff`
+                  gcc
+                  # Needed for building `ruff`
+                  gnumake
+
                   # Native dependencies for running Synapse.
                   icu
                   libffi
@@ -236,6 +241,19 @@
                   URI
                   YAMLLibYAML
                 ]}";
+
+                # Clear the LD_LIBRARY_PATH environment variable on shell init.
+                #
+                # By default, devenv will set LD_LIBRARY_PATH to point to .devenv/profile/lib. This causes
+                # issues when we include `gcc` as a dependency to build C libraries, as the version of glibc
+                # that the development environment's cc compiler uses may differ from that of the system.
+                #
+                # When LD_LIBRARY_PATH is set, system tools will attempt to use the development environment's
+                # libraries. Which, when built against a different glibc version lead, to "version 'GLIBC_X.YY'
+                # not found" errors.
+                enterShell = ''
+                  unset LD_LIBRARY_PATH
+                '';
               }
             ];
           };
diff --git a/poetry.lock b/poetry.lock
index 1cefabb358..872a863edc 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2866,44 +2866,43 @@ urllib3 = ">=1.26.0"
 
 [[package]]
 name = "twisted"
-version = "22.10.0"
+version = "23.8.0"
 description = "An asynchronous networking framework written in Python"
 optional = false
 python-versions = ">=3.7.1"
 files = [
-    {file = "Twisted-22.10.0-py3-none-any.whl", hash = "sha256:86c55f712cc5ab6f6d64e02503352464f0400f66d4f079096d744080afcccbd0"},
-    {file = "Twisted-22.10.0.tar.gz", hash = "sha256:32acbd40a94f5f46e7b42c109bfae2b302250945561783a8b7a059048f2d4d31"},
+    {file = "twisted-23.8.0-py3-none-any.whl", hash = "sha256:b8bdba145de120ffb36c20e6e071cce984e89fba798611ed0704216fb7f884cd"},
+    {file = "twisted-23.8.0.tar.gz", hash = "sha256:3c73360add17336a622c0d811c2a2ce29866b6e59b1125fd6509b17252098a24"},
 ]
 
 [package.dependencies]
-attrs = ">=19.2.0"
-Automat = ">=0.8.0"
+attrs = ">=21.3.0"
+automat = ">=0.8.0"
 constantly = ">=15.1"
 hyperlink = ">=17.1.1"
 idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""}
-incremental = ">=21.3.0"
+incremental = ">=22.10.0"
 pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""}
 service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""}
 twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""}
-typing-extensions = ">=3.6.5"
-"zope.interface" = ">=4.4.2"
+typing-extensions = ">=3.10.0"
+zope-interface = ">=5"
 
 [package.extras]
-all-non-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
-conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
-conch-nacl = ["PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"]
+all-non-platform = ["twisted[conch,contextvars,http2,serial,test,tls]", "twisted[conch,contextvars,http2,serial,test,tls]"]
+conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)"]
 contextvars = ["contextvars (>=2.4,<3)"]
-dev = ["coverage (>=6b1,<7)", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)"]
-dev-release = ["pydoctor (>=22.9.0,<22.10.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)"]
-gtk-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pygobject", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
+dev = ["coverage (>=6b1,<7)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "twisted[dev-release]", "twistedchecker (>=0.7,<1.0)"]
+dev-release = ["pydoctor (>=23.4.0,<23.5.0)", "pydoctor (>=23.4.0,<23.5.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "sphinx (>=5,<7)", "sphinx (>=5,<7)", "sphinx-rtd-theme (>=1.2,<2.0)", "sphinx-rtd-theme (>=1.2,<2.0)", "towncrier (>=22.12,<23.0)", "towncrier (>=22.12,<23.0)", "urllib3 (<2)", "urllib3 (<2)"]
+gtk-platform = ["pygobject", "pygobject", "twisted[all-non-platform]", "twisted[all-non-platform]"]
 http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"]
-macos-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
-mypy = ["PyHamcrest (>=1.9.0)", "PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "coverage (>=6b1,<7)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "mypy (==0.930)", "mypy-zope (==0.3.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "service-identity (>=18.1.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)", "types-pyOpenSSL", "types-setuptools"]
-osx-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
+macos-platform = ["pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "twisted[all-non-platform]", "twisted[all-non-platform]"]
+mypy = ["mypy (==0.981)", "mypy-extensions (==0.4.3)", "mypy-zope (==0.3.11)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"]
+osx-platform = ["twisted[macos-platform]", "twisted[macos-platform]"]
 serial = ["pyserial (>=3.0)", "pywin32 (!=226)"]
-test = ["PyHamcrest (>=1.9.0)", "cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.0,<7.0)"]
+test = ["cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pyhamcrest (>=2)"]
 tls = ["idna (>=2.4)", "pyopenssl (>=21.0.0)", "service-identity (>=18.1.0)"]
-windows-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)"]
+windows-platform = ["pywin32 (!=226)", "pywin32 (!=226)", "twisted[all-non-platform]", "twisted[all-non-platform]"]
 
 [[package]]
 name = "twisted-iocpsupport"
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 3ead01c052..16917136db 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -7,7 +7,7 @@ name = "synapse"
 version = "0.1.0"
 
 edition = "2021"
-rust-version = "1.60.0"
+rust-version = "1.61.0"
 
 [lib]
 name = "synapse"
@@ -23,7 +23,12 @@ name = "synapse.synapse_rust"
 anyhow = "1.0.63"
 lazy_static = "1.4.0"
 log = "0.4.17"
-pyo3 = { version = "0.17.1", features = ["macros", "anyhow", "abi3", "abi3-py37"] }
+pyo3 = { version = "0.17.1", features = [
+    "macros",
+    "anyhow",
+    "abi3",
+    "abi3-py37",
+] }
 pyo3-log = "0.8.1"
 pythonize = "0.17.0"
 regex = "1.6.0"
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py
index 5ad334b4d8..e8baeac5e2 100755
--- a/scripts-dev/federation_client.py
+++ b/scripts-dev/federation_client.py
@@ -329,6 +329,17 @@ class MatrixConnectionAdapter(HTTPAdapter):
                 raise ValueError("Invalid host:port '%s'" % (server_name,))
             return out[0], port, out[0]
 
+        # Look up SRV for Matrix 1.8 `matrix-fed` service first
+        try:
+            srv = srvlookup.lookup("matrix-fed", "tcp", server_name)[0]
+            print(
+                f"SRV lookup on _matrix-fed._tcp.{server_name} gave {srv}",
+                file=sys.stderr,
+            )
+            return srv.host, srv.port, server_name
+        except Exception:
+            pass
+        # Fall back to deprecated `matrix` service
         try:
             srv = srvlookup.lookup("matrix", "tcp", server_name)[0]
             print(
@@ -337,6 +348,7 @@ class MatrixConnectionAdapter(HTTPAdapter):
             )
             return srv.host, srv.port, server_name
         except Exception:
+            # Fall even further back to just port 8448
             return server_name, 8448, server_name
 
     @staticmethod
diff --git a/synapse/api/presence.py b/synapse/api/presence.py
index b80aa83cb3..b78f419994 100644
--- a/synapse/api/presence.py
+++ b/synapse/api/presence.py
@@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState
 from synapse.types import JsonDict
 
 
+@attr.s(slots=True, auto_attribs=True)
+class UserDevicePresenceState:
+    """
+    Represents the current presence state of a user's device.
+
+    user_id: The user ID.
+    device_id: The user's device ID.
+    state: The presence state, see PresenceState.
+    last_active_ts: Time in msec that the device last interacted with server.
+    last_sync_ts: Time in msec that the device last *completed* a sync
+        (or event stream).
+    """
+
+    user_id: str
+    device_id: Optional[str]
+    state: str
+    last_active_ts: int
+    last_sync_ts: int
+
+    @classmethod
+    def default(
+        cls, user_id: str, device_id: Optional[str]
+    ) -> "UserDevicePresenceState":
+        """Returns a default presence state."""
+        return cls(
+            user_id=user_id,
+            device_id=device_id,
+            state=PresenceState.OFFLINE,
+            last_active_ts=0,
+            last_sync_ts=0,
+        )
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class UserPresenceState:
     """Represents the current presence state of the user.
 
-    user_id
-    last_active: Time in msec that the user last interacted with server.
-    last_federation_update: Time in msec since either a) we sent a presence
+    user_id: The user ID.
+    state: The presence state, see PresenceState.
+    last_active_ts: Time in msec that the user last interacted with server.
+    last_federation_update_ts: Time in msec since either a) we sent a presence
         update to other servers or b) we received a presence update, depending
         on if is a local user or not.
-    last_user_sync: Time in msec that the user last *completed* a sync
+    last_user_sync_ts: Time in msec that the user last *completed* a sync
         (or event stream).
     status_msg: User set status message.
+    currently_active: True if the user is currently syncing.
     """
 
     user_id: str
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 69a8318127..58856839e1 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -179,8 +179,9 @@ class Config:
 
         If an integer is provided it is treated as bytes and is unchanged.
 
-        String byte sizes can have a suffix of 'K' or `M`, representing kibibytes and
-        mebibytes respectively. No suffix is understood as a plain byte count.
+        String byte sizes can have a suffix of 'K', `M`, `G` or `T`,
+        representing kibibytes, mebibytes, gibibytes and tebibytes respectively.
+        No suffix is understood as a plain byte count.
 
         Raises:
             TypeError, if given something other than an integer or a string
@@ -189,7 +190,7 @@ class Config:
         if type(value) is int:  # noqa: E721
             return value
         elif isinstance(value, str):
-            sizes = {"K": 1024, "M": 1024 * 1024}
+            sizes = {"K": 1024, "M": 1024 * 1024, "G": 1024**3, "T": 1024**4}
             size = 1
             suffix = value[-1]
             if suffix in sizes:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 763f56dfc1..9e52af5f13 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.types import (
     JsonDict,
+    JsonMapping,
+    ScheduledTask,
     StrCollection,
     StreamKeyType,
     StreamToken,
+    TaskStatus,
     UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -62,6 +65,7 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
 MAX_DEVICE_DISPLAY_NAME_LEN = 100
 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
 
@@ -78,6 +82,7 @@ class DeviceWorkerHandler:
         self._appservice_handler = hs.get_application_service_handler()
         self._state_storage = hs.get_storage_controllers().state
         self._auth_handler = hs.get_auth_handler()
+        self._event_sources = hs.get_event_sources()
         self.server_name = hs.hostname
         self._msc3852_enabled = hs.config.experimental.msc3852_enabled
         self._query_appservices_for_keys = (
@@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler):
         self._account_data_handler = hs.get_account_data_handler()
         self._storage_controllers = hs.get_storage_controllers()
         self.db_pool = hs.get_datastores().main.db_pool
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
 
@@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler):
                 self._delete_stale_devices,
             )
 
+        self._task_scheduler.register_action(
+            self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+        )
+
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler):
             user_id: The user to delete devices from.
             device_ids: The list of device IDs to delete
         """
+        to_device_stream_id = self._event_sources.get_current_token().to_device_key
 
         try:
             await self.store.delete_devices(user_id, device_ids)
@@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler):
                     f"org.matrix.msc3890.local_notification_settings.{device_id}",
                 )
 
+            # Delete device messages asynchronously and in batches using the task scheduler
+            await self._task_scheduler.schedule_task(
+                DELETE_DEVICE_MSGS_TASK_NAME,
+                resource_id=device_id,
+                params={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "up_to_stream_id": to_device_stream_id,
+                },
+            )
+
         # Pushers are deleted after `delete_access_tokens_for_user` is called so that
         # modules using `on_logged_out` hook can use them if needed.
         await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
 
         await self.notify_device_update(user_id, device_ids)
 
+    DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+
+    async def _delete_device_messages(
+        self,
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+        assert task.params is not None
+        user_id = task.params["user_id"]
+        device_id = task.params["device_id"]
+        up_to_stream_id = task.params["up_to_stream_id"]
+
+        res = await self.store.delete_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            up_to_stream_id=up_to_stream_id,
+            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+        )
+
+        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+            return TaskStatus.COMPLETE, None, None
+        else:
+            # There is probably still device messages to be deleted, let's keep the task active and it will be run
+            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
+            return TaskStatus.ACTIVE, None, None
+
     async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
         """Update the given device
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index b3be7a86f0..5dc76ef588 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from synapse.api.constants import (
     AccountDataTypes,
@@ -23,7 +23,6 @@ from synapse.api.constants import (
     Membership,
 )
 from synapse.api.errors import SynapseError
-from synapse.events import EventBase
 from synapse.events.utils import SerializeEventConfig
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
@@ -35,7 +34,6 @@ from synapse.types import (
     JsonDict,
     Requester,
     RoomStreamToken,
-    StateMap,
     StreamKeyType,
     StreamToken,
     UserID,
@@ -199,9 +197,7 @@ class InitialSyncHandler:
                     deferred_room_state = run_in_background(
                         self._state_storage_controller.get_state_for_events,
                         [event.event_id],
-                    ).addCallback(
-                        lambda states: cast(StateMap[EventBase], states[event.event_id])
-                    )
+                    ).addCallback(lambda states: states[event.event_id])
 
                 (messages, token), current_state = await make_deferred_yieldable(
                     gather_results(
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e5ac9096cc..19cf5a2b43 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -713,7 +713,7 @@ class PaginationHandler:
         self,
         delete_id: str,
         room_id: str,
-        requester_user_id: str,
+        requester_user_id: Optional[str],
         new_room_user_id: Optional[str] = None,
         new_room_name: Optional[str] = None,
         message: Optional[str] = None,
@@ -732,6 +732,10 @@ class PaginationHandler:
             requester_user_id:
                 User who requested the action. Will be recorded as putting the room on the
                 blocking list.
+                If None, the action was not manually requested but instead
+                triggered automatically, e.g. through a Synapse module
+                or some other policy.
+                MUST NOT be None if block=True.
             new_room_user_id:
                 If set, a new room will be created with this user ID
                 as the creator and admin, and all users in the old room will be
@@ -818,7 +822,7 @@ class PaginationHandler:
     def start_shutdown_and_purge_room(
         self,
         room_id: str,
-        requester_user_id: str,
+        requester_user_id: Optional[str],
         new_room_user_id: Optional[str] = None,
         new_room_name: Optional[str] = None,
         message: Optional[str] = None,
@@ -833,6 +837,10 @@ class PaginationHandler:
             requester_user_id:
                 User who requested the action and put the room on the
                 blocking list.
+                If None, the action was not manually requested but instead
+                triggered automatically, e.g. through a Synapse module
+                or some other policy.
+                MUST NOT be None if block=True.
             new_room_user_id:
                 If set, a new room will be created with this user ID
                 as the creator and admin, and all users in the old room will be
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f31e18328b..375c7d0901 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -13,13 +13,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""This module is responsible for keeping track of presence status of local
+"""
+This module is responsible for keeping track of presence status of local
 and remote users.
 
 The methods that define policy are:
     - PresenceHandler._update_states
     - PresenceHandler._handle_timeouts
     - should_notify
+
+# Tracking local presence
+
+For local users, presence is tracked on a per-device basis. When a user has multiple
+devices the user presence state is derived by coalescing the presence from each
+device:
+
+    BUSY > ONLINE > UNAVAILABLE > OFFLINE
+
+The time that each device was last active and last synced is tracked in order to
+automatically downgrade a device's presence state:
+
+    A device may move from ONLINE -> UNAVAILABLE, if it has not been active for
+    a period of time.
+
+    A device may go from any state -> OFFLINE, if it is not active and has not
+    synced for a period of time.
+
+The timeouts are handled using a wheel timer, which has coarse buckets. Timings
+do not need to be exact.
+
+Generally a device's presence state is updated whenever a user syncs (via the
+set_presence parameter), when the presence API is called, or if "pro-active"
+events occur, including:
+
+* Sending an event, receipt, read marker.
+* Updating typing status.
+
+The busy state has special status that it cannot is not downgraded by a call to
+sync with a lower priority state *and* it takes a long period of time to transition
+to offline.
+
+# Persisting (and restoring) presence
+
+For all users, presence is persisted on a per-user basis. Data is kept in-memory
+and persisted periodically. When Synapse starts each worker loads the current
+presence state and then tracks the presence stream to keep itself up-to-date.
+
+When restoring presence for local users a pseudo-device is created to match the
+user state; this device follows the normal timeout logic (see above) and will
+automatically be replaced with any information from currently available devices.
+
 """
 import abc
 import contextlib
@@ -30,6 +73,7 @@ from contextlib import contextmanager
 from types import TracebackType
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Callable,
     Collection,
@@ -49,7 +93,7 @@ from prometheus_client import Counter
 import synapse.metrics
 from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
 from synapse.appservice import ApplicationService
 from synapse.events.presence_router import PresenceRouter
 from synapse.logging.context import run_in_background
@@ -111,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000
 # How long to wait until a new /events or /sync request before assuming
 # the client has gone.
 SYNC_ONLINE_TIMEOUT = 30 * 1000
+# Busy status waits longer, but does eventually go offline.
+BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000
 
 # How long to wait before marking the user as idle. Compared against last active
 IDLE_TIMER = 5 * 60 * 1000
@@ -137,6 +183,7 @@ class BasePresenceHandler(abc.ABC):
     writer"""
 
     def __init__(self, hs: "HomeServer"):
+        self.hs = hs
         self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
@@ -162,6 +209,7 @@ class BasePresenceHandler(abc.ABC):
             self.VALID_PRESENCE += (PresenceState.BUSY,)
 
         active_presence = self.store.take_presence_startup_info()
+        # The combined status across all user devices.
         self.user_to_current_state = {state.user_id: state for state in active_presence}
 
     @abc.abstractmethod
@@ -426,8 +474,6 @@ class _NullContextManager(ContextManager[None]):
 class WorkerPresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
-        self.hs = hs
-
         self._presence_writer_instance = hs.config.worker.writers.presence[0]
 
         # Route presence EDUs to the right worker
@@ -691,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler):
 class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
-        self.hs = hs
         self.wheel_timer: WheelTimer[str] = WheelTimer()
         self.notifier = hs.get_notifier()
 
@@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler):
             lambda: len(self.user_to_current_state),
         )
 
+        # The per-device presence state, maps user to devices to per-device presence state.
+        self._user_to_device_to_current_state: Dict[
+            str, Dict[Optional[str], UserDevicePresenceState]
+        ] = {}
+
         now = self.clock.time_msec()
         if self._presence_enabled:
             for state in self.user_to_current_state.values():
+                # Create a psuedo-device to properly handle time outs. This will
+                # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT.
+                pseudo_device_id = None
+                self._user_to_device_to_current_state[state.user_id] = {
+                    pseudo_device_id: UserDevicePresenceState(
+                        user_id=state.user_id,
+                        device_id=pseudo_device_id,
+                        state=state.state,
+                        last_active_ts=state.last_active_ts,
+                        last_sync_ts=state.last_user_sync_ts,
+                    )
+                }
+
                 self.wheel_timer.insert(
                     now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
                 )
@@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler):
 
         # Keeps track of the number of *ongoing* syncs on other processes.
         #
-        # While any sync is ongoing on another process the user will never
+        # While any sync is ongoing on another process the user's device will never
         # go offline.
         #
         # Each process has a unique identifier and an update frequency. If
@@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler):
 
         timers_fired_counter.inc(len(states))
 
-        syncing_user_ids = {
-            user_id
-            for (user_id, _), count in self._user_device_to_num_current_syncs.items()
+        # Set of user ID & device IDs which are currently syncing.
+        syncing_user_devices = {
+            user_id_device_id
+            for user_id_device_id, count in self._user_device_to_num_current_syncs.items()
             if count
         }
-        syncing_user_ids.update(
-            user_id
-            for user_id, _ in itertools.chain(
-                *self.external_process_to_current_syncs.values()
-            )
+        syncing_user_devices.update(
+            itertools.chain(*self.external_process_to_current_syncs.values())
         )
 
         changes = handle_timeouts(
             states,
             is_mine_fn=self.is_mine_id,
-            syncing_user_ids=syncing_user_ids,
+            syncing_user_devices=syncing_user_devices,
+            user_to_devices=self._user_to_device_to_current_state,
             now=now,
         )
 
@@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler):
 
         bump_active_time_counter.inc()
 
-        prev_state = await self.current_state_for_user(user_id)
+        now = self.clock.time_msec()
 
-        new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()}
-        if prev_state.state == PresenceState.UNAVAILABLE:
-            new_fields["state"] = PresenceState.ONLINE
+        # Update the device information & mark the device as online if it was
+        # unavailable.
+        devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+        device_state = devices.setdefault(
+            device_id,
+            UserDevicePresenceState.default(user_id, device_id),
+        )
+        device_state.last_active_ts = now
+        if device_state.state == PresenceState.UNAVAILABLE:
+            device_state.state = PresenceState.ONLINE
+
+        # Update the user state, this will always update last_active_ts and
+        # might update the presence state.
+        prev_state = await self.current_state_for_user(user_id)
+        new_fields: Dict[str, Any] = {
+            "last_active_ts": now,
+            "state": _combine_device_states(devices.values()),
+        }
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
@@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler):
             if is_syncing and (user_id, device_id) not in process_presence:
                 process_presence.add((user_id, device_id))
             elif not is_syncing and (user_id, device_id) in process_presence:
+                devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+                device_state = devices.setdefault(
+                    device_id, UserDevicePresenceState.default(user_id, device_id)
+                )
+                device_state.last_sync_ts = sync_time_msec
+
                 new_state = prev_state.copy_and_replace(
                     last_user_sync_ts=sync_time_msec
                 )
@@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler):
             process_presence = self.external_process_to_current_syncs.pop(
                 process_id, set()
             )
-            prev_states = await self.current_state_for_users(
-                {user_id for user_id, device_id in process_presence}
-            )
+
             time_now_ms = self.clock.time_msec()
 
+            # Mark each device as having a last sync time.
+            updated_users = set()
+            for user_id, device_id in process_presence:
+                device_state = self._user_to_device_to_current_state.setdefault(
+                    user_id, {}
+                ).setdefault(
+                    device_id, UserDevicePresenceState.default(user_id, device_id)
+                )
+
+                device_state.last_sync_ts = time_now_ms
+                updated_users.add(user_id)
+
+            # Update each user (and insert into the appropriate timers to check if
+            # they've gone offline).
+            prev_states = await self.current_state_for_users(updated_users)
             await self._update_states(
                 [
                     prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
@@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler):
         if prev_state.state == PresenceState.BUSY and is_sync:
             presence = PresenceState.BUSY
 
+        # Update the device specific information.
+        devices = self._user_to_device_to_current_state.setdefault(user_id, {})
+        device_state = devices.setdefault(
+            device_id,
+            UserDevicePresenceState.default(user_id, device_id),
+        )
+        device_state.state = presence
+        device_state.last_active_ts = now
+        if is_sync:
+            device_state.last_sync_ts = now
+
+        # Based on the state of each user's device calculate the new presence state.
+        presence = _combine_device_states(devices.values())
+
         new_fields = {"state": presence}
 
         if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
@@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
 def handle_timeouts(
     user_states: List[UserPresenceState],
     is_mine_fn: Callable[[str], bool],
-    syncing_user_ids: Set[str],
+    syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]],
+    user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]],
     now: int,
 ) -> List[UserPresenceState]:
     """Checks the presence of users that have timed out and updates as
@@ -1882,7 +1993,8 @@ def handle_timeouts(
     Args:
         user_states: List of UserPresenceState's to check.
         is_mine_fn: Function that returns if a user_id is ours
-        syncing_user_ids: Set of user_ids with active syncs.
+        syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+        user_to_devices: A map of user ID to device ID to UserDevicePresenceState.
         now: Current time in ms.
 
     Returns:
@@ -1891,9 +2003,16 @@ def handle_timeouts(
     changes = {}  # Actual changes we need to notify people about
 
     for state in user_states:
-        is_mine = is_mine_fn(state.user_id)
-
-        new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
+        user_id = state.user_id
+        is_mine = is_mine_fn(user_id)
+
+        new_state = handle_timeout(
+            state,
+            is_mine,
+            syncing_user_devices,
+            user_to_devices.get(user_id, {}),
+            now,
+        )
         if new_state:
             changes[state.user_id] = new_state
 
@@ -1901,14 +2020,19 @@ def handle_timeouts(
 
 
 def handle_timeout(
-    state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+    state: UserPresenceState,
+    is_mine: bool,
+    syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]],
+    user_devices: Dict[Optional[str], UserDevicePresenceState],
+    now: int,
 ) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
-        state
+        state: UserPresenceState to check.
         is_mine: Whether the user is ours
-        syncing_user_ids: Set of user_ids with active syncs.
+        syncing_user_devices: A set of (user ID, device ID) tuples with active syncs..
+        user_devices: A map of device ID to UserDevicePresenceState.
         now: Current time in ms.
 
     Returns:
@@ -1919,34 +2043,63 @@ def handle_timeout(
         return None
 
     changed = False
-    user_id = state.user_id
 
     if is_mine:
-        if state.state == PresenceState.ONLINE:
-            if now - state.last_active_ts > IDLE_TIMER:
-                # Currently online, but last activity ages ago so auto
-                # idle
-                state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
-                changed = True
-            elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
-                # So that we send down a notification that we've
-                # stopped updating.
+        # Check per-device whether the device should be considered idle or offline
+        # due to timeouts.
+        device_changed = False
+        offline_devices = []
+        for device_id, device_state in user_devices.items():
+            if device_state.state == PresenceState.ONLINE:
+                if now - device_state.last_active_ts > IDLE_TIMER:
+                    # Currently online, but last activity ages ago so auto
+                    # idle
+                    device_state.state = PresenceState.UNAVAILABLE
+                    device_changed = True
+
+            # If there are have been no sync for a while (and none ongoing),
+            # set presence to offline.
+            if (state.user_id, device_id) not in syncing_device_ids:
+                # If the user has done something recently but hasn't synced,
+                # don't set them as offline.
+                sync_or_active = max(
+                    device_state.last_sync_ts, device_state.last_active_ts
+                )
+
+                # Implementations aren't meant to timeout a device with a busy
+                # state, but it needs to timeout *eventually* or else the user
+                # will be stuck in that state.
+                online_timeout = (
+                    BUSY_ONLINE_TIMEOUT
+                    if device_state.state == PresenceState.BUSY
+                    else SYNC_ONLINE_TIMEOUT
+                )
+                if now - sync_or_active > online_timeout:
+                    # Mark the device as going offline.
+                    offline_devices.append(device_id)
+                    device_changed = True
+
+        # Offline devices are not needed and do not add information.
+        for device_id in offline_devices:
+            user_devices.pop(device_id)
+
+        # If the presence state of the devices changed, then (maybe) update
+        # the user's overall presence state.
+        if device_changed:
+            new_presence = _combine_device_states(user_devices.values())
+            if new_presence != state.state:
+                state = state.copy_and_replace(state=new_presence)
                 changed = True
 
+        if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+            # So that we send down a notification that we've
+            # stopped updating.
+            changed = True
+
         if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
             # Need to send ping to other servers to ensure they don't
             # timeout and set us to offline
             changed = True
-
-        # If there are have been no sync for a while (and none ongoing),
-        # set presence to offline
-        if user_id not in syncing_user_ids:
-            # If the user has done something recently but hasn't synced,
-            # don't set them as offline.
-            sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
-            if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
-                state = state.copy_and_replace(state=PresenceState.OFFLINE)
-                changed = True
     else:
         # We expect to be poked occasionally by the other side.
         # This is to protect against forgetful/buggy servers, so that
@@ -2021,6 +2174,13 @@ def handle_update(
                 new_state = new_state.copy_and_replace(last_federation_update_ts=now)
                 federation_ping = True
 
+        if new_state.state == PresenceState.BUSY:
+            wheel_timer.insert(
+                now=now,
+                obj=user_id,
+                then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT,
+            )
+
     else:
         wheel_timer.insert(
             now=now,
@@ -2036,6 +2196,46 @@ def handle_update(
     return new_state, persist_and_notify, federation_ping
 
 
+PRESENCE_BY_PRIORITY = {
+    PresenceState.BUSY: 4,
+    PresenceState.ONLINE: 3,
+    PresenceState.UNAVAILABLE: 2,
+    PresenceState.OFFLINE: 1,
+}
+
+
+def _combine_device_states(
+    device_states: Iterable[UserDevicePresenceState],
+) -> str:
+    """
+    Find the device to use presence information from.
+
+    Orders devices by priority, then last_active_ts.
+
+    Args:
+        device_states: An iterable of device presence states
+
+    Return:
+        The combined presence state.
+    """
+
+    # Based on (all) the user's devices calculate the new presence state.
+    presence = PresenceState.OFFLINE
+    last_active_ts = -1
+
+    # Find the device to use the presence state of based on the presence priority,
+    # but tie-break with how recently the device has been seen.
+    for device_state in device_states:
+        if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > (
+            PRESENCE_BY_PRIORITY[presence],
+            last_active_ts,
+        ):
+            presence = device_state.state
+            last_active_ts = device_state.last_active_ts
+
+    return presence
+
+
 async def get_interested_parties(
     store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState]
 ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0513e28aab..7a762c8511 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1787,7 +1787,7 @@ class RoomShutdownHandler:
     async def shutdown_room(
         self,
         room_id: str,
-        requester_user_id: str,
+        requester_user_id: Optional[str],
         new_room_user_id: Optional[str] = None,
         new_room_name: Optional[str] = None,
         message: Optional[str] = None,
@@ -1811,6 +1811,10 @@ class RoomShutdownHandler:
             requester_user_id:
                 User who requested the action and put the room on the
                 blocking list.
+                If None, the action was not manually requested but instead
+                triggered automatically, e.g. through a Synapse module
+                or some other policy.
+                MUST NOT be None if block=True.
             new_room_user_id:
                 If set, a new room will be created with this user ID
                 as the creator and admin, and all users in the old room will be
@@ -1863,6 +1867,10 @@ class RoomShutdownHandler:
 
         # Action the block first (even if the room doesn't exist yet)
         if block:
+            if requester_user_id is None:
+                raise ValueError(
+                    "shutdown_room: block=True not allowed when requester_user_id is None."
+                )
             # This will work even if the room is already blocked, but that is
             # desirable in case the first attempt at blocking the room failed below.
             await self.store.block_room(room_id, requester_user_id)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60a9f341b5..0ccd7d250c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
+from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -268,6 +269,7 @@ class SyncHandler:
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
         self._device_handler = hs.get_device_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.should_calculate_push_rules = hs.config.push.enable_push
 
@@ -360,11 +362,19 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
-            deleted = await self.store.delete_messages_for_device(
-                sync_config.user.to_string(), sync_config.device_id, since_stream_id
+            # Delete device messages asynchronously and in batches using the task scheduler
+            await self._task_scheduler.schedule_task(
+                DELETE_DEVICE_MSGS_TASK_NAME,
+                resource_id=sync_config.device_id,
+                params={
+                    "user_id": sync_config.user.to_string(),
+                    "device_id": sync_config.device_id,
+                    "up_to_stream_id": since_stream_id,
+                },
             )
             logger.debug(
-                "Deleted %d to-device messages up to %d", deleted, since_stream_id
+                "Deletion of to-device messages up to %d scheduled",
+                since_stream_id,
             )
 
         if timeout == 0 or since_token is None or full_state:
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index 91a24efcd0..a3a396bb37 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -399,15 +399,34 @@ class MatrixHostnameEndpoint:
         if port or _is_ip_literal(host):
             return [Server(host, port or 8448)]
 
+        # Check _matrix-fed._tcp SRV record.
         logger.debug("Looking up SRV record for %s", host.decode(errors="replace"))
+        server_list = await self._srv_resolver.resolve_service(
+            b"_matrix-fed._tcp." + host
+        )
+
+        if server_list:
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug(
+                    "Got %s from SRV lookup for %s",
+                    ", ".join(map(str, server_list)),
+                    host.decode(errors="replace"),
+                )
+            return server_list
+
+        # No _matrix-fed._tcp SRV record, fallback to legacy _matrix._tcp SRV record.
+        logger.debug(
+            "Looking up deprecated SRV record for %s", host.decode(errors="replace")
+        )
         server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
 
         if server_list:
-            logger.debug(
-                "Got %s from SRV lookup for %s",
-                ", ".join(map(str, server_list)),
-                host.decode(errors="replace"),
-            )
+            if logger.isEnabledFor(logging.DEBUG):
+                logger.debug(
+                    "Got %s from deprecated SRV lookup for %s",
+                    ", ".join(map(str, server_list)),
+                    host.decode(errors="replace"),
+                )
             return server_list
 
         # No SRV records, so we fallback to host and 8448
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index 64c6ae4512..bf7e311026 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -728,7 +728,7 @@ async def _unwrap_awaitable(awaitable: Awaitable[R]) -> R:
 
 
 @overload
-def preserve_fn(  # type: ignore[misc]
+def preserve_fn(
     f: Callable[P, Awaitable[R]],
 ) -> Callable[P, "defer.Deferred[R]"]:
     # The `type: ignore[misc]` above suppresses
@@ -756,7 +756,7 @@ def preserve_fn(
 
 
 @overload
-def run_in_background(  # type: ignore[misc]
+def run_in_background(
     f: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs
 ) -> "defer.Deferred[R]":
     # The `type: ignore[misc]` above suppresses
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 2f00a7ba20..d6efe10a28 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -1730,6 +1730,19 @@ class ModuleApi:
         room_alias_str = room_alias.to_string() if room_alias else None
         return room_id, room_alias_str
 
+    async def delete_room(self, room_id: str) -> None:
+        """
+        Schedules the deletion of a room from Synapse's database.
+
+        If the room is already being deleted, this method does nothing.
+        This method does not wait for the room to be deleted.
+
+        Added in Synapse v1.89.0.
+        """
+        # Future extensions to this method might want to e.g. allow use of `force_purge`.
+        # TODO In the future we should make sure this is persistent.
+        self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None)
+
     async def set_displayname(
         self,
         user_id: UserID,
diff --git a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py
index 911f37ba42..ecaeef3511 100644
--- a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py
+++ b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py
@@ -40,7 +40,7 @@ CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[
     [str, StateMap[EventBase], str], Awaitable[bool]
 ]
 ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable]
-CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]]
+CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[Optional[str], str], Awaitable[bool]]
 CHECK_CAN_DEACTIVATE_USER_CALLBACK = Callable[[str, bool], Awaitable[bool]]
 ON_PROFILE_UPDATE_CALLBACK = Callable[[str, ProfileInfo, bool, bool], Awaitable]
 ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK = Callable[[str, bool, bool], Awaitable]
@@ -429,12 +429,17 @@ class ThirdPartyEventRulesModuleApiCallbacks:
                     "Failed to run module API callback %s: %s", callback, e
                 )
 
-    async def check_can_shutdown_room(self, user_id: str, room_id: str) -> bool:
+    async def check_can_shutdown_room(
+        self, user_id: Optional[str], room_id: str
+    ) -> bool:
         """Intercept requests to shutdown a room. If `False` is returned, the
          room must not be shut down.
 
         Args:
-            requester: The ID of the user requesting the shutdown.
+            user_id: The ID of the user requesting the shutdown.
+                If no user ID is supplied, then the room is being shut down through
+                some mechanism other than a user's request, e.g. through a module's
+                request.
             room_id: The ID of the room.
         """
         for callback in self._check_can_shutdown_room_callbacks:
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index b471fcb064..744e98c6d0 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                     table="devices",
                     column="user_id",
                     iterable=user_ids_to_query,
-                    keyvalues={"user_id": user_id, "hidden": False},
+                    keyvalues={"hidden": False},
                     retcols=("device_id",),
                 )
 
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
     @trace
     async def delete_messages_for_device(
-        self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        up_to_stream_id: int,
+        limit: int,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
+            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 log_kv({"message": "No changes in cache since last check"})
                 return 0
 
+        ROW_ID_NAME = self.database_engine.row_id_name
+
         def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            sql = (
-                "DELETE FROM device_inbox"
-                " WHERE user_id = ? AND device_id = ?"
-                " AND stream_id <= ?"
-            )
+            sql = f"""
+                DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+                  SELECT {ROW_ID_NAME} FROM device_inbox
+                  WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+                  LIMIT {limit}
+                )
+                """
             txn.execute(sql, (user_id, device_id, up_to_stream_id))
             return txn.rowcount
 
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
+        # In this case we don't know if we hit the limit or the delete is complete
+        # so let's not update the cache.
+        if count == limit:
+            return count
+
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e4162f846b..324fdfa892 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
 
             self.db_pool.simple_delete_many_txn(
                 txn,
-                table="device_inbox",
-                column="device_id",
-                values=device_ids,
-                keyvalues={"user_id": user_id},
-            )
-
-            self.db_pool.simple_delete_many_txn(
-                txn,
                 table="device_auth_providers",
                 column="device_id",
                 values=device_ids,
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index a3b4744855..57aa4921e1 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -221,12 +221,17 @@ class KeyStore(CacheInvalidationWorkerStore):
             """Processes a batch of keys to fetch, and adds the result to `keys`."""
 
             # batch_iter always returns tuples so it's safe to do len(batch)
-            sql = """
-            SELECT server_name, key_id, key_json, ts_valid_until_ms
-            FROM server_keys_json WHERE 1=0
-            """ + " OR (server_name=? AND key_id=?)" * len(
-                batch
-            )
+            where_clause = " OR (server_name=? AND key_id=?)" * len(batch)
+
+            # `server_keys_json` can have multiple entries per server (one per
+            # remote server we fetched from, if using perspectives). Order by
+            # `ts_added_ms` so the most recently fetched one always wins.
+            sql = f"""
+                SELECT server_name, key_id, key_json, ts_valid_until_ms
+                FROM server_keys_json WHERE 1=0
+                {where_clause}
+                ORDER BY ts_added_ms
+            """
 
             txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5ee5c7ad9f..e4d10ff250 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
         receipts."""
 
         def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
-            if isinstance(self.database_engine, PostgresEngine):
-                ROW_ID_NAME = "ctid"
-            else:
-                ROW_ID_NAME = "rowid"
-
+            ROW_ID_NAME = self.database_engine.row_id_name
             # Identify any duplicate receipts arising from
             # https://github.com/matrix-org/synapse/issues/14406.
             # The following query takes less than a minute on matrix.org.
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 0b5b3bf03e..b1a2418cbd 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
         """Gets a string giving the server version. For example: '3.22.0'"""
         ...
 
+    @property
+    @abc.abstractmethod
+    def row_id_name(self) -> str:
+        """Gets the literal name representing a row id for this engine."""
+        ...
+
     @abc.abstractmethod
     def in_transaction(self, conn: ConnectionType) -> bool:
         """Whether the connection is currently in a transaction."""
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 05a72dc554..6309363217 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -211,6 +211,10 @@ class PostgresEngine(
         else:
             return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
 
+    @property
+    def row_id_name(self) -> str:
+        return "ctid"
+
     def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
         return conn.status != psycopg2.extensions.STATUS_READY
 
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index ca8c59297c..802069e1e1 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
         """Gets a string giving the server version. For example: '3.22.0'."""
         return "%i.%i.%i" % sqlite3.sqlite_version_info
 
+    @property
+    def row_id_name(self) -> str:
+        return "rowid"
+
     def in_transaction(self, conn: sqlite3.Connection) -> bool:
         return conn.in_transaction
 
diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
index ad2da4c8af..622686d28f 100644
--- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
@@ -14,7 +14,7 @@
 
 
 from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.prepare_database import get_statements
 
 FIX_INDEXES = """
@@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
 
 
 def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+    rowid = database_engine.row_id_name
 
     # remove duplicates from group_users & group_invites tables
     cur.execute(
diff --git a/synapse/util/gai_resolver.py b/synapse/util/gai_resolver.py
index 214eb17fbc..fecf829ade 100644
--- a/synapse/util/gai_resolver.py
+++ b/synapse/util/gai_resolver.py
@@ -136,7 +136,7 @@ class GAIResolver:
 
     # The types on IHostnameResolver is incorrect in Twisted, see
     # https://twistedmatrix.com/trac/ticket/10276
-    def resolveHostName(  # type: ignore[override]
+    def resolveHostName(
         self,
         resolutionReceiver: IResolutionReceiver,
         hostName: str,
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py
index 9e89aeb748..9b2581e51a 100644
--- a/synapse/util/task_scheduler.py
+++ b/synapse/util/task_scheduler.py
@@ -77,6 +77,7 @@ class TaskScheduler:
     LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs
 
     def __init__(self, hs: "HomeServer"):
+        self._hs = hs
         self._store = hs.get_datastores().main
         self._clock = hs.get_clock()
         self._running_tasks: Set[str] = set()
@@ -97,8 +98,6 @@ class TaskScheduler:
                 "handle_scheduled_tasks",
                 self._handle_scheduled_tasks,
             )
-        else:
-            self.replication_client = hs.get_replication_command_handler()
 
     def register_action(
         self,
@@ -133,7 +132,7 @@ class TaskScheduler:
         params: Optional[JsonMapping] = None,
     ) -> str:
         """Schedule a new potentially resumable task. A function matching the specified
-        `action` should have been previously registered with `register_action`.
+        `action` should have be registered with `register_action` before the task is run.
 
         Args:
             action: the name of a previously registered action
@@ -149,11 +148,6 @@ class TaskScheduler:
         Returns:
             The id of the scheduled task
         """
-        if action not in self._actions:
-            raise Exception(
-                f"No function associated with action {action} of the scheduled task"
-            )
-
         status = TaskStatus.SCHEDULED
         if timestamp is None or timestamp < self._clock.time_msec():
             timestamp = self._clock.time_msec()
@@ -175,7 +169,7 @@ class TaskScheduler:
             if self._run_background_tasks:
                 await self._launch_task(task)
             else:
-                self.replication_client.send_new_active_task(task.id)
+                self._hs.get_replication_command_handler().send_new_active_task(task.id)
 
         return task.id
 
@@ -315,7 +309,10 @@ class TaskScheduler:
         """
         assert self._run_background_tasks
 
-        assert task.action in self._actions
+        if task.action not in self._actions:
+            raise Exception(
+                f"No function associated with action {task.action} of the scheduled task {task.id}"
+            )
         function = self._actions[task.action]
 
         async def wrapper() -> None:
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 46d022092e..a7e6cdd66a 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -422,6 +422,18 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
             "exclusive_as_user", "password", self.exclusive_as_user_device_id
         )
 
+        self.exclusive_as_user_2_device_id = "exclusive_as_device_2"
+        self.exclusive_as_user_2 = self.register_user("exclusive_as_user_2", "password")
+        self.exclusive_as_user_2_token = self.login(
+            "exclusive_as_user_2", "password", self.exclusive_as_user_2_device_id
+        )
+
+        self.exclusive_as_user_3_device_id = "exclusive_as_device_3"
+        self.exclusive_as_user_3 = self.register_user("exclusive_as_user_3", "password")
+        self.exclusive_as_user_3_token = self.login(
+            "exclusive_as_user_3", "password", self.exclusive_as_user_3_device_id
+        )
+
     def _notify_interested_services(self) -> None:
         # This is normally set in `notify_interested_services` but we need to call the
         # internal async version so the reactor gets pushed to completion.
@@ -849,6 +861,119 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
         for count in service_id_to_message_count.values():
             self.assertEqual(count, number_of_messages)
 
+    @unittest.override_config(
+        {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
+    )
+    def test_application_services_receive_local_to_device_for_many_users(self) -> None:
+        """
+        Test that when a user sends a to-device message to many users
+        in an application service's user namespace, the
+        application service will receive all of them.
+        """
+        interested_appservice = self._register_application_service(
+            namespaces={
+                ApplicationService.NS_USERS: [
+                    {
+                        "regex": "@exclusive_as_user:.+",
+                        "exclusive": True,
+                    },
+                    {
+                        "regex": "@exclusive_as_user_2:.+",
+                        "exclusive": True,
+                    },
+                    {
+                        "regex": "@exclusive_as_user_3:.+",
+                        "exclusive": True,
+                    },
+                ],
+            },
+        )
+
+        # Have local_user send a to-device message to exclusive_as_users
+        message_content = {"some_key": "some really interesting value"}
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.room_key_request/3",
+            content={
+                "messages": {
+                    self.exclusive_as_user: {
+                        self.exclusive_as_user_device_id: message_content
+                    },
+                    self.exclusive_as_user_2: {
+                        self.exclusive_as_user_2_device_id: message_content
+                    },
+                    self.exclusive_as_user_3: {
+                        self.exclusive_as_user_3_device_id: message_content
+                    },
+                }
+            },
+            access_token=self.local_user_token,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        # Have exclusive_as_user send a to-device message to local_user
+        for user_token in [
+            self.exclusive_as_user_token,
+            self.exclusive_as_user_2_token,
+            self.exclusive_as_user_3_token,
+        ]:
+            chan = self.make_request(
+                "PUT",
+                "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
+                content={
+                    "messages": {
+                        self.local_user: {self.local_user_device_id: message_content}
+                    }
+                },
+                access_token=user_token,
+            )
+            self.assertEqual(chan.code, 200, chan.result)
+
+        # Check if our application service - that is interested in exclusive_as_user - received
+        # the to-device message as part of an AS transaction.
+        # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
+        #
+        # The uninterested application service should not have been notified at all.
+        self.send_mock.assert_called_once()
+        (
+            service,
+            _events,
+            _ephemeral,
+            to_device_messages,
+            _otks,
+            _fbks,
+            _device_list_summary,
+        ) = self.send_mock.call_args[0]
+
+        # Assert that this was the same to-device message that local_user sent
+        self.assertEqual(service, interested_appservice)
+
+        # Assert expected number of messages
+        self.assertEqual(len(to_device_messages), 3)
+
+        for device_msg in to_device_messages:
+            self.assertEqual(device_msg["type"], "m.room_key_request")
+            self.assertEqual(device_msg["sender"], self.local_user)
+            self.assertEqual(device_msg["content"], message_content)
+
+        self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
+        self.assertEqual(
+            to_device_messages[0]["to_device_id"],
+            self.exclusive_as_user_device_id,
+        )
+
+        self.assertEqual(to_device_messages[1]["to_user_id"], self.exclusive_as_user_2)
+        self.assertEqual(
+            to_device_messages[1]["to_device_id"],
+            self.exclusive_as_user_2_device_id,
+        )
+
+        self.assertEqual(to_device_messages[2]["to_user_id"], self.exclusive_as_user_3)
+        self.assertEqual(
+            to_device_messages[2]["to_device_id"],
+            self.exclusive_as_user_3_device_id,
+        )
+
     def _register_application_service(
         self,
         namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 55a4f95ef3..9659a4a355 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -30,6 +30,7 @@ from synapse.server import HomeServer
 from synapse.storage.databases.main.appservice import _make_exclusive_regex
 from synapse.types import JsonDict, create_requester
 from synapse.util import Clock
+from synapse.util.task_scheduler import TaskScheduler
 
 from tests import unittest
 from tests.unittest import override_config
@@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
         assert isinstance(handler, DeviceHandler)
         self.handler = handler
         self.store = hs.get_datastores().main
+        self.device_message_handler = hs.get_device_message_handler()
         return hs
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase):
         )
         self.assertIsNone(res)
 
+    def test_delete_device_and_big_device_inbox(self) -> None:
+        """Check that deleting a big device inbox is staged and batched asynchronously."""
+        DEVICE_ID = "abc"
+        sender = "@sender:" + self.hs.hostname
+        receiver = "@receiver:" + self.hs.hostname
+        self._record_user(sender, DEVICE_ID, DEVICE_ID)
+        self._record_user(receiver, DEVICE_ID, DEVICE_ID)
+
+        # queue a bunch of messages in the inbox
+        requester = create_requester(sender, device_id=DEVICE_ID)
+        for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10):
+            self.get_success(
+                self.device_message_handler.send_device_message(
+                    requester, "message_type", {receiver: {"*": {"val": i}}}
+                )
+            )
+
+        # delete the device
+        self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID]))
+
+        # messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away
+        res = self.get_success(
+            self.store.db_pool.simple_select_list(
+                table="device_inbox",
+                keyvalues={"user_id": receiver},
+                retcols=("user_id", "device_id", "stream_id"),
+                desc="get_device_id_from_device_inbox",
+            )
+        )
+        self.assertEqual(10, len(res))
+
+        # wait for the task scheduler to do a second delete pass
+        self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)
+
+        # remaining messages should now be deleted
+        res = self.get_success(
+            self.store.db_pool.simple_select_list(
+                table="device_inbox",
+                keyvalues={"user_id": receiver},
+                retcols=("user_id", "device_id", "stream_id"),
+                desc="get_device_id_from_device_inbox",
+            )
+        )
+        self.assertEqual(0, len(res))
+
     def test_update_device(self) -> None:
         self._record_users()
 
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 88a16193a3..638787b029 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -21,11 +21,12 @@ from signedjson.key import generate_signing_key
 from twisted.test.proto_helpers import MemoryReactor
 
 from synapse.api.constants import EventTypes, Membership, PresenceState
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import UserDevicePresenceState, UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events.builder import EventBuilder
 from synapse.federation.sender import FederationSender
 from synapse.handlers.presence import (
+    BUSY_ONLINE_TIMEOUT,
     EXTERNAL_PROCESS_EXPIRY,
     FEDERATION_PING_INTERVAL,
     FEDERATION_TIMEOUT,
@@ -352,6 +353,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_idle_timer(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -362,8 +364,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -376,6 +391,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
         presence state into unavailable.
         """
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -386,8 +402,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -396,6 +425,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_sync_timeout(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -406,8 +436,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         assert new_state is not None
@@ -416,6 +459,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_sync_online(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -426,9 +470,20 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
         new_state = handle_timeout(
-            state, is_mine=True, syncing_user_ids={user_id}, now=now
+            state,
+            is_mine=True,
+            syncing_device_ids={(user_id, device_id)},
+            user_devices={device_id: device_state},
+            now=now,
         )
 
         self.assertIsNotNone(new_state)
@@ -438,6 +493,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_federation_ping(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -449,14 +505,28 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         self.assertEqual(state, new_state)
 
     def test_no_timeout(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         now = 5000000
 
         state = UserPresenceState.default(user_id)
@@ -466,8 +536,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_user_sync_ts=now,
             last_federation_update_ts=now,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNone(new_state)
 
@@ -485,8 +568,9 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             status_msg=status_msg,
         )
 
+        # Note that this is a remote user so we do not have their device information.
         new_state = handle_timeout(
-            state, is_mine=False, syncing_user_ids=set(), now=now
+            state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now
         )
 
         self.assertIsNotNone(new_state)
@@ -496,6 +580,7 @@ class PresenceTimeoutTestCase(unittest.TestCase):
 
     def test_last_active(self) -> None:
         user_id = "@foo:bar"
+        device_id = "dev-1"
         status_msg = "I'm here!"
         now = 5000000
 
@@ -507,8 +592,21 @@ class PresenceTimeoutTestCase(unittest.TestCase):
             last_federation_update_ts=now,
             status_msg=status_msg,
         )
+        device_state = UserDevicePresenceState(
+            user_id=user_id,
+            device_id=device_id,
+            state=state.state,
+            last_active_ts=state.last_active_ts,
+            last_sync_ts=state.last_user_sync_ts,
+        )
 
-        new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
+        new_state = handle_timeout(
+            state,
+            is_mine=True,
+            syncing_device_ids=set(),
+            user_devices={device_id: device_state},
+            now=now,
+        )
 
         self.assertIsNotNone(new_state)
         self.assertEqual(state, new_state)
@@ -579,7 +677,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
         [
             (PresenceState.BUSY, PresenceState.BUSY),
             (PresenceState.ONLINE, PresenceState.ONLINE),
-            (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
+            (PresenceState.UNAVAILABLE, PresenceState.ONLINE),
             # Offline syncs don't update the state.
             (PresenceState.OFFLINE, PresenceState.ONLINE),
         ]
@@ -800,6 +898,486 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
         # we should now be online
         self.assertEqual(state.state, PresenceState.ONLINE)
 
+    @parameterized.expand(
+        # A list of tuples of 4 strings:
+        #
+        # * The presence state of device 1.
+        # * The presence state of device 2.
+        # * The expected user presence state after both devices have synced.
+        # * The expected user presence state after device 1 has idled.
+        # * The expected user presence state after device 2 has idled.
+        # * True to use workers, False a monolith.
+        [
+            (*cases, workers)
+            for workers in (False, True)
+            for cases in [
+                # If both devices have the same state, online should eventually idle.
+                # Otherwise, the state doesn't change.
+                (
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "lower" state it should fallback to it,
+                # except for "busy" which overrides.
+                (
+                    PresenceState.BUSY,
+                    PresenceState.ONLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.BUSY,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.BUSY,
+                    PresenceState.OFFLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                # If the second device has a "higher" state it should override.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+            ]
+        ],
+        name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}",
+    )
+    @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
+    def test_set_presence_from_syncing_multi_device(
+        self,
+        dev_1_state: str,
+        dev_2_state: str,
+        expected_state_1: str,
+        expected_state_2: str,
+        expected_state_3: str,
+        test_with_workers: bool,
+    ) -> None:
+        """
+        Test the behaviour of multiple devices syncing at the same time.
+
+        Roughly the user's presence state should be set to the "highest" priority
+        of all the devices. When a device then goes offline its state should be
+        discarded and the next highest should win.
+
+        Note that these tests use the idle timer (and don't close the syncs), it
+        is unlikely that a *single* sync would last this long, but is close enough
+        to continually syncing with that current state.
+        """
+        user_id = f"@test:{self.hs.config.server.server_name}"
+
+        # By default, we call /sync against the main process.
+        worker_presence_handler = self.presence_handler
+        if test_with_workers:
+            # Create a worker and use it to handle /sync traffic instead.
+            # This is used to test that presence changes get replicated from workers
+            # to the main process correctly.
+            worker_to_sync_against = self.make_worker_hs(
+                "synapse.app.generic_worker", {"worker_name": "synchrotron"}
+            )
+            worker_presence_handler = worker_to_sync_against.get_presence_handler()
+
+        # 1. Sync with the first device.
+        self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-1",
+                affect_presence=dev_1_state != PresenceState.OFFLINE,
+                presence_state=dev_1_state,
+            ),
+            by=0.01,
+        )
+
+        # 2. Wait half the idle timer.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.1])
+
+        # 3. Sync with the second device.
+        self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-2",
+                affect_presence=dev_2_state != PresenceState.OFFLINE,
+                presence_state=dev_2_state,
+            ),
+            by=0.01,
+        )
+
+        # 4. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_1)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_1)
+
+        # When testing with workers, make another random sync (with any *different*
+        # user) to keep the process information from expiring.
+        #
+        # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER.
+        if test_with_workers:
+            with self.get_success(
+                worker_presence_handler.user_syncing(
+                    f"@other-user:{self.hs.config.server.server_name}",
+                    "dev-3",
+                    affect_presence=True,
+                    presence_state=PresenceState.ONLINE,
+                ),
+                by=0.01,
+            ):
+                pass
+
+        # 5. Advance such that the first device should be discarded (the idle timer),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.01])
+
+        # 6. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_2)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_2)
+
+        # 7. Advance such that the second device should be discarded (half the idle timer),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(IDLE_TIMER / 1000 / 2)
+        self.reactor.pump([0.1])
+
+        # 8. The devices are still "syncing" (the sync context managers were never
+        # closed), so might idle.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_3)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_3)
+
+    @parameterized.expand(
+        # A list of tuples of 4 strings:
+        #
+        # * The presence state of device 1.
+        # * The presence state of device 2.
+        # * The expected user presence state after both devices have synced.
+        # * The expected user presence state after device 1 has stopped syncing.
+        # * True to use workers, False a monolith.
+        [
+            (*cases, workers)
+            for workers in (False, True)
+            for cases in [
+                # If both devices have the same state, nothing exciting should happen.
+                (
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "lower" state it should fallback to it,
+                # except for "busy" which overrides.
+                (
+                    PresenceState.BUSY,
+                    PresenceState.ONLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.BUSY,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.BUSY,
+                    PresenceState.OFFLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.UNAVAILABLE,
+                ),
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.OFFLINE,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.OFFLINE,
+                ),
+                # If the second device has a "higher" state it should override.
+                (
+                    PresenceState.ONLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                    PresenceState.BUSY,
+                ),
+                (
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                    PresenceState.ONLINE,
+                ),
+                (
+                    PresenceState.OFFLINE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                    PresenceState.UNAVAILABLE,
+                ),
+            ]
+        ],
+        name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}",
+    )
+    @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
+    def test_set_presence_from_non_syncing_multi_device(
+        self,
+        dev_1_state: str,
+        dev_2_state: str,
+        expected_state_1: str,
+        expected_state_2: str,
+        test_with_workers: bool,
+    ) -> None:
+        """
+        Test the behaviour of multiple devices syncing at the same time.
+
+        Roughly the user's presence state should be set to the "highest" priority
+        of all the devices. When a device then goes offline its state should be
+        discarded and the next highest should win.
+
+        Note that these tests use the idle timer (and don't close the syncs), it
+        is unlikely that a *single* sync would last this long, but is close enough
+        to continually syncing with that current state.
+        """
+        user_id = f"@test:{self.hs.config.server.server_name}"
+
+        # By default, we call /sync against the main process.
+        worker_presence_handler = self.presence_handler
+        if test_with_workers:
+            # Create a worker and use it to handle /sync traffic instead.
+            # This is used to test that presence changes get replicated from workers
+            # to the main process correctly.
+            worker_to_sync_against = self.make_worker_hs(
+                "synapse.app.generic_worker", {"worker_name": "synchrotron"}
+            )
+            worker_presence_handler = worker_to_sync_against.get_presence_handler()
+
+        # 1. Sync with the first device.
+        sync_1 = self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-1",
+                affect_presence=dev_1_state != PresenceState.OFFLINE,
+                presence_state=dev_1_state,
+            ),
+            by=0.1,
+        )
+
+        # 2. Sync with the second device.
+        sync_2 = self.get_success(
+            worker_presence_handler.user_syncing(
+                user_id,
+                "dev-2",
+                affect_presence=dev_2_state != PresenceState.OFFLINE,
+                presence_state=dev_2_state,
+            ),
+            by=0.1,
+        )
+
+        # 3. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_1)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_1)
+
+        # 4. Disconnect the first device.
+        with sync_1:
+            pass
+
+        # 5. Advance such that the first device should be discarded (the sync timeout),
+        # then pump so _handle_timeouts function to called.
+        self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
+        self.reactor.pump([5])
+
+        # 6. Assert the expected presence state.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, expected_state_2)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, expected_state_2)
+
+        # 7. Disconnect the second device.
+        with sync_2:
+            pass
+
+        # 8. Advance such that the second device should be discarded (the sync timeout),
+        # then pump so _handle_timeouts function to called.
+        if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY:
+            timeout = BUSY_ONLINE_TIMEOUT
+        else:
+            timeout = SYNC_ONLINE_TIMEOUT
+        self.reactor.advance(timeout / 1000)
+        self.reactor.pump([5])
+
+        # 9. There are no more devices, should be offline.
+        state = self.get_success(
+            self.presence_handler.get_state(UserID.from_string(user_id))
+        )
+        self.assertEqual(state.state, PresenceState.OFFLINE)
+        if test_with_workers:
+            state = self.get_success(
+                worker_presence_handler.get_state(UserID.from_string(user_id))
+            )
+            self.assertEqual(state.state, PresenceState.OFFLINE)
+
     def test_set_presence_from_syncing_keeps_status(self) -> None:
         """Test that presence set by syncing retains status message"""
         status_msg = "I'm here!"
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index 0d17f2fe5b..9f63fa6fa8 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -15,7 +15,7 @@ import base64
 import logging
 import os
 from typing import Generator, List, Optional, cast
-from unittest.mock import AsyncMock, patch
+from unittest.mock import AsyncMock, call, patch
 
 import treq
 from netaddr import IPSet
@@ -651,9 +651,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # .well-known request fails.
         self.reactor.pump((0.4,))
 
-        # now there should be a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv1"
+        # now there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv1"), call(b"_matrix._tcp.testserv1")]
         )
 
         # we should fall back to a direct connection
@@ -737,9 +737,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # .well-known request fails.
         self.reactor.pump((0.4,))
 
-        # now there should be a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv"
+        # now there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
         )
 
         # we should fall back to a direct connection
@@ -788,9 +788,12 @@ class MatrixFederationAgentTests(unittest.TestCase):
             content=b'{ "m.server": "target-server" }',
         )
 
-        # there should be a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.target-server"
+        # there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [
+                call(b"_matrix-fed._tcp.target-server"),
+                call(b"_matrix._tcp.target-server"),
+            ]
         )
 
         # now we should get a connection to the target server
@@ -878,9 +881,12 @@ class MatrixFederationAgentTests(unittest.TestCase):
 
         self.reactor.pump((0.1,))
 
-        # there should be a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.target-server"
+        # there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [
+                call(b"_matrix-fed._tcp.target-server"),
+                call(b"_matrix._tcp.target-server"),
+            ]
         )
 
         # now we should get a connection to the target server
@@ -942,9 +948,9 @@ class MatrixFederationAgentTests(unittest.TestCase):
             client_factory, expected_sni=b"testserv", content=b"NOT JSON"
         )
 
-        # now there should be a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv"
+        # now there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
         )
 
         # we should fall back to a direct connection
@@ -1016,14 +1022,14 @@ class MatrixFederationAgentTests(unittest.TestCase):
         # there should be no requests
         self.assertEqual(len(http_proto.requests), 0)
 
-        # and there should be a SRV lookup instead
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv"
+        # and there should be two SRV lookups instead
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
         )
 
     def test_get_hostname_srv(self) -> None:
         """
-        Test the behaviour when there is a single SRV record
+        Test the behaviour when there is a single SRV record for _matrix-fed.
         """
         self.agent = self._make_agent()
 
@@ -1039,7 +1045,51 @@ class MatrixFederationAgentTests(unittest.TestCase):
 
         # the request for a .well-known will have failed with a DNS lookup error.
         self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv"
+            b"_matrix-fed._tcp.testserv"
+        )
+
+        # Make sure treq is trying to connect
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients[0]
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8443)
+
+        # make a test server, and wire up the client
+        http_server = self._make_connection(client_factory, expected_sni=b"testserv")
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(request.path, b"/foo/bar")
+        self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
+
+        # finish the request
+        request.finish()
+        self.reactor.pump((0.1,))
+        self.successResultOf(test_d)
+
+    def test_get_hostname_srv_legacy(self) -> None:
+        """
+        Test the behaviour when there is a single SRV record for _matrix.
+        """
+        self.agent = self._make_agent()
+
+        # Return no entries for the _matrix-fed lookup, and a response for _matrix.
+        self.mock_resolver.resolve_service.side_effect = [
+            [],
+            [Server(host=b"srvtarget", port=8443)],
+        ]
+        self.reactor.lookups["srvtarget"] = "1.2.3.4"
+
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        # the request for a .well-known will have failed with a DNS lookup error.
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
         )
 
         # Make sure treq is trying to connect
@@ -1065,7 +1115,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
 
     def test_get_well_known_srv(self) -> None:
         """Test the behaviour when the .well-known redirects to a place where there
-        is a SRV.
+        is a _matrix-fed SRV record.
         """
         self.agent = self._make_agent()
 
@@ -1096,7 +1146,72 @@ class MatrixFederationAgentTests(unittest.TestCase):
 
         # there should be a SRV lookup
         self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.target-server"
+            b"_matrix-fed._tcp.target-server"
+        )
+
+        # now we should get a connection to the target of the SRV record
+        self.assertEqual(len(clients), 2)
+        (host, port, client_factory, _timeout, _bindAddress) = clients[1]
+        self.assertEqual(host, "5.6.7.8")
+        self.assertEqual(port, 8443)
+
+        # make a test server, and wire up the client
+        http_server = self._make_connection(
+            client_factory, expected_sni=b"target-server"
+        )
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(request.path, b"/foo/bar")
+        self.assertEqual(
+            request.requestHeaders.getRawHeaders(b"host"), [b"target-server"]
+        )
+
+        # finish the request
+        request.finish()
+        self.reactor.pump((0.1,))
+        self.successResultOf(test_d)
+
+    def test_get_well_known_srv_legacy(self) -> None:
+        """Test the behaviour when the .well-known redirects to a place where there
+        is a _matrix SRV record.
+        """
+        self.agent = self._make_agent()
+
+        self.reactor.lookups["testserv"] = "1.2.3.4"
+        self.reactor.lookups["srvtarget"] = "5.6.7.8"
+
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        # there should be an attempt to connect on port 443 for the .well-known
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients[0]
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 443)
+
+        # Return no entries for the _matrix-fed lookup, and a response for _matrix.
+        self.mock_resolver.resolve_service.side_effect = [
+            [],
+            [Server(host=b"srvtarget", port=8443)],
+        ]
+
+        self._handle_well_known_connection(
+            client_factory,
+            expected_sni=b"testserv",
+            content=b'{ "m.server": "target-server" }',
+        )
+
+        # there should be two SRV lookups
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [
+                call(b"_matrix-fed._tcp.target-server"),
+                call(b"_matrix._tcp.target-server"),
+            ]
         )
 
         # now we should get a connection to the target of the SRV record
@@ -1158,8 +1273,11 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.pump((0.4,))
 
         # now there should have been a SRV lookup
-        self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.xn--bcher-kva.com"
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [
+                call(b"_matrix-fed._tcp.xn--bcher-kva.com"),
+                call(b"_matrix._tcp.xn--bcher-kva.com"),
+            ]
         )
 
         # We should fall back to port 8448
@@ -1188,7 +1306,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.successResultOf(test_d)
 
     def test_idna_srv_target(self) -> None:
-        """test the behaviour when the target of a SRV record has idna chars"""
+        """test the behaviour when the target of a _matrix-fed SRV record has idna chars"""
         self.agent = self._make_agent()
 
         self.mock_resolver.resolve_service.return_value = [
@@ -1204,7 +1322,57 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.assertNoResult(test_d)
 
         self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.xn--bcher-kva.com"
+            b"_matrix-fed._tcp.xn--bcher-kva.com"
+        )
+
+        # Make sure treq is trying to connect
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients[0]
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8443)
+
+        # make a test server, and wire up the client
+        http_server = self._make_connection(
+            client_factory, expected_sni=b"xn--bcher-kva.com"
+        )
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(request.path, b"/foo/bar")
+        self.assertEqual(
+            request.requestHeaders.getRawHeaders(b"host"), [b"xn--bcher-kva.com"]
+        )
+
+        # finish the request
+        request.finish()
+        self.reactor.pump((0.1,))
+        self.successResultOf(test_d)
+
+    def test_idna_srv_target_legacy(self) -> None:
+        """test the behaviour when the target of a _matrix SRV record has idna chars"""
+        self.agent = self._make_agent()
+
+        # Return no entries for the _matrix-fed lookup, and a response for _matrix.
+        self.mock_resolver.resolve_service.side_effect = [
+            [],
+            [Server(host=b"xn--trget-3qa.com", port=8443)],
+        ]  # târget.com
+        self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4"
+
+        test_d = self._make_get_request(
+            b"matrix-federation://xn--bcher-kva.com/foo/bar"
+        )
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [
+                call(b"_matrix-fed._tcp.xn--bcher-kva.com"),
+                call(b"_matrix._tcp.xn--bcher-kva.com"),
+            ]
         )
 
         # Make sure treq is trying to connect
@@ -1394,7 +1562,7 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.assertIsNone(r.delegated_server)
 
     def test_srv_fallbacks(self) -> None:
-        """Test that other SRV results are tried if the first one fails."""
+        """Test that other SRV results are tried if the first one fails for _matrix-fed SRV."""
         self.agent = self._make_agent()
 
         self.mock_resolver.resolve_service.return_value = [
@@ -1409,7 +1577,67 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.assertNoResult(test_d)
 
         self.mock_resolver.resolve_service.assert_called_once_with(
-            b"_matrix._tcp.testserv"
+            b"_matrix-fed._tcp.testserv"
+        )
+
+        # We should see an attempt to connect to the first server
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8443)
+
+        # Fonx the connection
+        client_factory.clientConnectionFailed(None, Exception("nope"))
+
+        # There's a 300ms delay in HostnameEndpoint
+        self.reactor.pump((0.4,))
+
+        # Hasn't failed yet
+        self.assertNoResult(test_d)
+
+        # We shouldnow see an attempt to connect to the second server
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8444)
+
+        # make a test server, and wire up the client
+        http_server = self._make_connection(client_factory, expected_sni=b"testserv")
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(request.path, b"/foo/bar")
+        self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
+
+        # finish the request
+        request.finish()
+        self.reactor.pump((0.1,))
+        self.successResultOf(test_d)
+
+    def test_srv_fallbacks_legacy(self) -> None:
+        """Test that other SRV results are tried if the first one fails for _matrix SRV."""
+        self.agent = self._make_agent()
+
+        # Return no entries for the _matrix-fed lookup, and a response for _matrix.
+        self.mock_resolver.resolve_service.side_effect = [
+            [],
+            [
+                Server(host=b"target.com", port=8443),
+                Server(host=b"target.com", port=8444),
+            ],
+        ]
+        self.reactor.lookups["target.com"] = "1.2.3.4"
+
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        self.mock_resolver.resolve_service.assert_has_calls(
+            [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")]
         )
 
         # We should see an attempt to connect to the first server
@@ -1449,6 +1677,43 @@ class MatrixFederationAgentTests(unittest.TestCase):
         self.reactor.pump((0.1,))
         self.successResultOf(test_d)
 
+    def test_srv_no_fallback_to_legacy(self) -> None:
+        """Test that _matrix SRV results are not tried if the _matrix-fed one fails."""
+        self.agent = self._make_agent()
+
+        # Return a failing entry for _matrix-fed.
+        self.mock_resolver.resolve_service.side_effect = [
+            [Server(host=b"target.com", port=8443)],
+            [],
+        ]
+        self.reactor.lookups["target.com"] = "1.2.3.4"
+
+        test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar")
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        # Only the _matrix-fed is checked, _matrix is ignored.
+        self.mock_resolver.resolve_service.assert_called_once_with(
+            b"_matrix-fed._tcp.testserv"
+        )
+
+        # We should see an attempt to connect to the first server
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8443)
+
+        # Fonx the connection
+        client_factory.clientConnectionFailed(None, Exception("nope"))
+
+        # There's a 300ms delay in HostnameEndpoint
+        self.reactor.pump((0.4,))
+
+        # Failed to resolve a server.
+        self.assertFailure(test_d, Exception)
+
 
 class TestCachePeriodFromHeaders(unittest.TestCase):
     def test_cache_control(self) -> None: