diff options
51 files changed, 1634 insertions, 198 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0a01e82984..fb117380d0 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -35,7 +35,7 @@ jobs: steps: - uses: actions/checkout@v3 - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - uses: matrix-org/setup-python-poetry@v1 with: @@ -93,7 +93,7 @@ jobs: uses: actions/checkout@v3 - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - name: Setup Poetry @@ -150,7 +150,7 @@ jobs: with: ref: ${{ github.event.pull_request.head.sha }} - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - uses: matrix-org/setup-python-poetry@v1 with: @@ -167,7 +167,7 @@ jobs: - uses: actions/checkout@v3 - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 with: components: clippy - uses: Swatinem/rust-cache@v2 @@ -268,7 +268,7 @@ jobs: postgres:${{ matrix.job.postgres-version }} - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - uses: matrix-org/setup-python-poetry@v1 @@ -308,7 +308,7 @@ jobs: - uses: actions/checkout@v3 - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 # There aren't wheels for some of the older deps, so we need to install @@ -416,7 +416,7 @@ jobs: run: cat sytest-blacklist .ci/worker-blacklist > synapse-blacklist-with-workers - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - name: Run SyTest @@ -556,7 +556,7 @@ jobs: path: synapse - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - uses: actions/setup-go@v4 @@ -584,7 +584,7 @@ jobs: - uses: actions/checkout@v3 - name: Install Rust - uses: dtolnay/rust-toolchain@1.60.0 + uses: dtolnay/rust-toolchain@1.61.0 - uses: Swatinem/rust-cache@v2 - run: cargo test diff --git a/Cargo.lock b/Cargo.lock index 4d60f8dcb6..95a713e437 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,9 +138,9 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" -version = "2.5.0" +version = "2.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" @@ -291,9 +291,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.9.4" +version = "1.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12de2eff854e5fa4b1295edd650e227e9d8fb0c9e90b12e7f36d6a6811791a29" +checksum = "697061221ea1b4a94a624f67d0ae2bfe4e22b8a17b6a192afb11046542cc8c47" dependencies = [ "aho-corasick", "memchr", @@ -303,9 +303,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.3.7" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49530408a136e16e5b486e883fbb6ba058e8e4e8ae6621a77b048b314336e629" +checksum = "c2f401f4955220693b56f8ec66ee9c78abffd8d1c4f23dc41a23839eb88f0795" dependencies = [ "aho-corasick", "memchr", diff --git a/changelog.d/15997.misc b/changelog.d/15997.misc new file mode 100644 index 0000000000..94768c3cb8 --- /dev/null +++ b/changelog.d/15997.misc @@ -0,0 +1 @@ +Allow modules to delete rooms. \ No newline at end of file diff --git a/changelog.d/16066.bugfix b/changelog.d/16066.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16066.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16090.misc b/changelog.d/16090.misc new file mode 100644 index 0000000000..d54ef936c7 --- /dev/null +++ b/changelog.d/16090.misc @@ -0,0 +1 @@ +Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled. \ No newline at end of file diff --git a/changelog.d/16137.feature b/changelog.d/16137.feature new file mode 100644 index 0000000000..bba6f161cd --- /dev/null +++ b/changelog.d/16137.feature @@ -0,0 +1 @@ +Support resolving homeservers using `matrix-fed` DNS SRV records from [MSC4040](https://github.com/matrix-org/matrix-spec-proposals/pull/4040). diff --git a/changelog.d/16170.bugfix b/changelog.d/16170.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16170.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16171.bugfix b/changelog.d/16171.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16171.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16172.bugfix b/changelog.d/16172.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16172.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16174.bugfix b/changelog.d/16174.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16174.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16219.feature b/changelog.d/16219.feature new file mode 100644 index 0000000000..c789f2abb7 --- /dev/null +++ b/changelog.d/16219.feature @@ -0,0 +1 @@ +Add the ability to use `G` (GiB) and `T` (TiB) suffixes in configuration options that refer to numbers of bytes. \ No newline at end of file diff --git a/changelog.d/16235.misc b/changelog.d/16235.misc new file mode 100644 index 0000000000..b1533f93b6 --- /dev/null +++ b/changelog.d/16235.misc @@ -0,0 +1 @@ +Fix type checking when using the new version of Twisted. diff --git a/changelog.d/16240.misc b/changelog.d/16240.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16240.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/changelog.d/16248.misc b/changelog.d/16248.misc new file mode 100644 index 0000000000..0a5ed6dccb --- /dev/null +++ b/changelog.d/16248.misc @@ -0,0 +1 @@ +Bump minimum supported Rust version to 1.61.0. diff --git a/changelog.d/16251.bugfix b/changelog.d/16251.bugfix new file mode 100644 index 0000000000..6d3157c7aa --- /dev/null +++ b/changelog.d/16251.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where appservices using MSC2409 to receive to_device messages, would only get messages for one user. \ No newline at end of file diff --git a/changelog.d/16257.bugfix b/changelog.d/16257.bugfix new file mode 100644 index 0000000000..28a5319749 --- /dev/null +++ b/changelog.d/16257.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where we kept re-requesting a remote server's key repeatedly, potentially causing delays in receiving events over federation. diff --git a/changelog.d/16260.misc b/changelog.d/16260.misc new file mode 100644 index 0000000000..9f3289d7d4 --- /dev/null +++ b/changelog.d/16260.misc @@ -0,0 +1 @@ +Update rust to version 1.71.1 in the nix development environment. \ No newline at end of file diff --git a/changelog.d/16263.misc b/changelog.d/16263.misc new file mode 100644 index 0000000000..d54ef936c7 --- /dev/null +++ b/changelog.d/16263.misc @@ -0,0 +1 @@ +Add GCC and GNU Make to the Nix flake development environment so that `ruff` can be compiled. \ No newline at end of file diff --git a/docs/upgrade.md b/docs/upgrade.md index f50a279e98..2f888b6f12 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -88,6 +88,14 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.93.0 + +## Minimum supported Rust version +The minimum supported Rust version has been increased from v1.60.0 to v1.61.0. +Users building from source will need to ensure their `rustc` version is up to +date. + + # Upgrading to v1.90.0 ## App service query parameter authorization is now a configuration option diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 0b1725816e..97fd1beb39 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -25,8 +25,10 @@ messages from the database after 5 minutes, rather than 5 months. In addition, configuration options referring to size use the following suffixes: -* `M` = MiB, or 1,048,576 bytes * `K` = KiB, or 1024 bytes +* `M` = MiB, or 1,048,576 bytes +* `G` = GiB, or 1,073,741,824 bytes +* `T` = TiB, or 1,099,511,627,776 bytes For example, setting `max_avatar_size: 10M` means that Synapse will not accept files larger than 10,485,760 bytes for a user avatar. diff --git a/flake.lock b/flake.lock index d53be767a7..9b360fa33e 100644 --- a/flake.lock +++ b/flake.lock @@ -258,11 +258,11 @@ "nixpkgs": "nixpkgs_3" }, "locked": { - "lastModified": 1690510705, - "narHash": "sha256-6mjs3Gl9/xrseFh9iNcNq1u5yJ/MIoAmjoaG7SXZDIE=", + "lastModified": 1693966243, + "narHash": "sha256-a2CA1aMIPE67JWSVIGoGtD3EGlFdK9+OlJQs0FOWCKY=", "owner": "oxalica", "repo": "rust-overlay", - "rev": "851ae4c128905a62834d53ce7704ebc1ba481bea", + "rev": "a8b4bb4cbb744baaabc3e69099f352f99164e2c1", "type": "github" }, "original": { diff --git a/flake.nix b/flake.nix index b89b6d9218..31f2832939 100644 --- a/flake.nix +++ b/flake.nix @@ -82,7 +82,7 @@ # # NOTE: We currently need to set the Rust version unnecessarily high # in order to work around https://github.com/matrix-org/synapse/issues/15939 - (rust-bin.stable."1.70.0".default.override { + (rust-bin.stable."1.71.1".default.override { # Additionally install the "rust-src" extension to allow diving into the # Rust source code in an IDE (rust-analyzer will also make use of it). extensions = [ "rust-src" ]; @@ -90,6 +90,11 @@ # The rust-analyzer language server implementation. rust-analyzer + # GCC includes a linker; needed for building `ruff` + gcc + # Needed for building `ruff` + gnumake + # Native dependencies for running Synapse. icu libffi @@ -236,6 +241,19 @@ URI YAMLLibYAML ]}"; + + # Clear the LD_LIBRARY_PATH environment variable on shell init. + # + # By default, devenv will set LD_LIBRARY_PATH to point to .devenv/profile/lib. This causes + # issues when we include `gcc` as a dependency to build C libraries, as the version of glibc + # that the development environment's cc compiler uses may differ from that of the system. + # + # When LD_LIBRARY_PATH is set, system tools will attempt to use the development environment's + # libraries. Which, when built against a different glibc version lead, to "version 'GLIBC_X.YY' + # not found" errors. + enterShell = '' + unset LD_LIBRARY_PATH + ''; } ]; }; diff --git a/poetry.lock b/poetry.lock index 1cefabb358..872a863edc 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2866,44 +2866,43 @@ urllib3 = ">=1.26.0" [[package]] name = "twisted" -version = "22.10.0" +version = "23.8.0" description = "An asynchronous networking framework written in Python" optional = false python-versions = ">=3.7.1" files = [ - {file = "Twisted-22.10.0-py3-none-any.whl", hash = "sha256:86c55f712cc5ab6f6d64e02503352464f0400f66d4f079096d744080afcccbd0"}, - {file = "Twisted-22.10.0.tar.gz", hash = "sha256:32acbd40a94f5f46e7b42c109bfae2b302250945561783a8b7a059048f2d4d31"}, + {file = "twisted-23.8.0-py3-none-any.whl", hash = "sha256:b8bdba145de120ffb36c20e6e071cce984e89fba798611ed0704216fb7f884cd"}, + {file = "twisted-23.8.0.tar.gz", hash = "sha256:3c73360add17336a622c0d811c2a2ce29866b6e59b1125fd6509b17252098a24"}, ] [package.dependencies] -attrs = ">=19.2.0" -Automat = ">=0.8.0" +attrs = ">=21.3.0" +automat = ">=0.8.0" constantly = ">=15.1" hyperlink = ">=17.1.1" idna = {version = ">=2.4", optional = true, markers = "extra == \"tls\""} -incremental = ">=21.3.0" +incremental = ">=22.10.0" pyopenssl = {version = ">=21.0.0", optional = true, markers = "extra == \"tls\""} service-identity = {version = ">=18.1.0", optional = true, markers = "extra == \"tls\""} twisted-iocpsupport = {version = ">=1.0.2,<2", markers = "platform_system == \"Windows\""} -typing-extensions = ">=3.6.5" -"zope.interface" = ">=4.4.2" +typing-extensions = ">=3.10.0" +zope-interface = ">=5" [package.extras] -all-non-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"] -conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"] -conch-nacl = ["PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "cryptography (>=2.6)", "pyasn1"] +all-non-platform = ["twisted[conch,contextvars,http2,serial,test,tls]", "twisted[conch,contextvars,http2,serial,test,tls]"] +conch = ["appdirs (>=1.4.0)", "bcrypt (>=3.1.3)", "cryptography (>=3.3)"] contextvars = ["contextvars (>=2.4,<3)"] -dev = ["coverage (>=6b1,<7)", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)"] -dev-release = ["pydoctor (>=22.9.0,<22.10.0)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)"] -gtk-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pygobject", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"] +dev = ["coverage (>=6b1,<7)", "pyflakes (>=2.2,<3.0)", "python-subunit (>=1.4,<2.0)", "twisted[dev-release]", "twistedchecker (>=0.7,<1.0)"] +dev-release = ["pydoctor (>=23.4.0,<23.5.0)", "pydoctor (>=23.4.0,<23.5.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "readthedocs-sphinx-ext (>=2.2,<3.0)", "sphinx (>=5,<7)", "sphinx (>=5,<7)", "sphinx-rtd-theme (>=1.2,<2.0)", "sphinx-rtd-theme (>=1.2,<2.0)", "towncrier (>=22.12,<23.0)", "towncrier (>=22.12,<23.0)", "urllib3 (<2)", "urllib3 (<2)"] +gtk-platform = ["pygobject", "pygobject", "twisted[all-non-platform]", "twisted[all-non-platform]"] http2 = ["h2 (>=3.0,<5.0)", "priority (>=1.1.0,<2.0)"] -macos-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"] -mypy = ["PyHamcrest (>=1.9.0)", "PyNaCl", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "coverage (>=6b1,<7)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "mypy (==0.930)", "mypy-zope (==0.3.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pydoctor (>=22.9.0,<22.10.0)", "pyflakes (>=2.2,<3.0)", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "python-subunit (>=1.4,<2.0)", "pywin32 (!=226)", "readthedocs-sphinx-ext (>=2.1,<3.0)", "service-identity (>=18.1.0)", "sphinx (>=5.0,<6)", "sphinx-rtd-theme (>=1.0,<2.0)", "towncrier (>=22.8,<23.0)", "twistedchecker (>=0.7,<1.0)", "types-pyOpenSSL", "types-setuptools"] -osx-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyobjc-core", "pyobjc-framework-CFNetwork", "pyobjc-framework-Cocoa", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "service-identity (>=18.1.0)"] +macos-platform = ["pyobjc-core", "pyobjc-core", "pyobjc-framework-cfnetwork", "pyobjc-framework-cfnetwork", "pyobjc-framework-cocoa", "pyobjc-framework-cocoa", "twisted[all-non-platform]", "twisted[all-non-platform]"] +mypy = ["mypy (==0.981)", "mypy-extensions (==0.4.3)", "mypy-zope (==0.3.11)", "twisted[all-non-platform,dev]", "types-pyopenssl", "types-setuptools"] +osx-platform = ["twisted[macos-platform]", "twisted[macos-platform]"] serial = ["pyserial (>=3.0)", "pywin32 (!=226)"] -test = ["PyHamcrest (>=1.9.0)", "cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.0,<7.0)"] +test = ["cython-test-exception-raiser (>=1.0.2,<2)", "hypothesis (>=6.56)", "pyhamcrest (>=2)"] tls = ["idna (>=2.4)", "pyopenssl (>=21.0.0)", "service-identity (>=18.1.0)"] -windows-platform = ["PyHamcrest (>=1.9.0)", "appdirs (>=1.4.0)", "bcrypt (>=3.0.0)", "contextvars (>=2.4,<3)", "cryptography (>=2.6)", "cython-test-exception-raiser (>=1.0.2,<2)", "h2 (>=3.0,<5.0)", "hypothesis (>=6.0,<7.0)", "idna (>=2.4)", "priority (>=1.1.0,<2.0)", "pyasn1", "pyopenssl (>=21.0.0)", "pyserial (>=3.0)", "pywin32 (!=226)", "pywin32 (!=226)", "service-identity (>=18.1.0)"] +windows-platform = ["pywin32 (!=226)", "pywin32 (!=226)", "twisted[all-non-platform]", "twisted[all-non-platform]"] [[package]] name = "twisted-iocpsupport" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 3ead01c052..16917136db 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ name = "synapse" version = "0.1.0" edition = "2021" -rust-version = "1.60.0" +rust-version = "1.61.0" [lib] name = "synapse" @@ -23,7 +23,12 @@ name = "synapse.synapse_rust" anyhow = "1.0.63" lazy_static = "1.4.0" log = "0.4.17" -pyo3 = { version = "0.17.1", features = ["macros", "anyhow", "abi3", "abi3-py37"] } +pyo3 = { version = "0.17.1", features = [ + "macros", + "anyhow", + "abi3", + "abi3-py37", +] } pyo3-log = "0.8.1" pythonize = "0.17.0" regex = "1.6.0" diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index 5ad334b4d8..e8baeac5e2 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -329,6 +329,17 @@ class MatrixConnectionAdapter(HTTPAdapter): raise ValueError("Invalid host:port '%s'" % (server_name,)) return out[0], port, out[0] + # Look up SRV for Matrix 1.8 `matrix-fed` service first + try: + srv = srvlookup.lookup("matrix-fed", "tcp", server_name)[0] + print( + f"SRV lookup on _matrix-fed._tcp.{server_name} gave {srv}", + file=sys.stderr, + ) + return srv.host, srv.port, server_name + except Exception: + pass + # Fall back to deprecated `matrix` service try: srv = srvlookup.lookup("matrix", "tcp", server_name)[0] print( @@ -337,6 +348,7 @@ class MatrixConnectionAdapter(HTTPAdapter): ) return srv.host, srv.port, server_name except Exception: + # Fall even further back to just port 8448 return server_name, 8448, server_name @staticmethod diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b80aa83cb3..b78f419994 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState from synapse.types import JsonDict +@attr.s(slots=True, auto_attribs=True) +class UserDevicePresenceState: + """ + Represents the current presence state of a user's device. + + user_id: The user ID. + device_id: The user's device ID. + state: The presence state, see PresenceState. + last_active_ts: Time in msec that the device last interacted with server. + last_sync_ts: Time in msec that the device last *completed* a sync + (or event stream). + """ + + user_id: str + device_id: Optional[str] + state: str + last_active_ts: int + last_sync_ts: int + + @classmethod + def default( + cls, user_id: str, device_id: Optional[str] + ) -> "UserDevicePresenceState": + """Returns a default presence state.""" + return cls( + user_id=user_id, + device_id=device_id, + state=PresenceState.OFFLINE, + last_active_ts=0, + last_sync_ts=0, + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) class UserPresenceState: """Represents the current presence state of the user. - user_id - last_active: Time in msec that the user last interacted with server. - last_federation_update: Time in msec since either a) we sent a presence + user_id: The user ID. + state: The presence state, see PresenceState. + last_active_ts: Time in msec that the user last interacted with server. + last_federation_update_ts: Time in msec since either a) we sent a presence update to other servers or b) we received a presence update, depending on if is a local user or not. - last_user_sync: Time in msec that the user last *completed* a sync + last_user_sync_ts: Time in msec that the user last *completed* a sync (or event stream). status_msg: User set status message. + currently_active: True if the user is currently syncing. """ user_id: str diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 69a8318127..58856839e1 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -179,8 +179,9 @@ class Config: If an integer is provided it is treated as bytes and is unchanged. - String byte sizes can have a suffix of 'K' or `M`, representing kibibytes and - mebibytes respectively. No suffix is understood as a plain byte count. + String byte sizes can have a suffix of 'K', `M`, `G` or `T`, + representing kibibytes, mebibytes, gibibytes and tebibytes respectively. + No suffix is understood as a plain byte count. Raises: TypeError, if given something other than an integer or a string @@ -189,7 +190,7 @@ class Config: if type(value) is int: # noqa: E721 return value elif isinstance(value, str): - sizes = {"K": 1024, "M": 1024 * 1024} + sizes = {"K": 1024, "M": 1024 * 1024, "G": 1024**3, "T": 1024**4} size = 1 suffix = value[-1] if suffix in sizes: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 763f56dfc1..9e52af5f13 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + JsonMapping, + ScheduledTask, StrCollection, StreamKeyType, StreamToken, + TaskStatus, UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -62,6 +65,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 @@ -78,6 +82,7 @@ class DeviceWorkerHandler: self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() + self._event_sources = hs.get_event_sources() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled self._query_appservices_for_keys = ( @@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler): self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler): self._delete_stale_devices, ) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id: The user to delete devices from. device_ids: The list of device IDs to delete """ + to_device_stream_id = self._event_sources.get_current_token().to_device_key try: await self.store.delete_devices(user_id, device_ids) @@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler): f"org.matrix.msc3890.local_notification_settings.{device_id}", ) + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=device_id, + params={ + "user_id": user_id, + "device_id": device_id, + "up_to_stream_id": to_device_stream_id, + }, + ) + # Pushers are deleted after `delete_access_tokens_for_user` is called so that # modules using `on_logged_out` hook can use them if needed. await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) await self.notify_device_update(user_id, device_ids) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). + return TaskStatus.ACTIVE, None, None + async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index b3be7a86f0..5dc76ef588 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.constants import ( AccountDataTypes, @@ -23,7 +23,6 @@ from synapse.api.constants import ( Membership, ) from synapse.api.errors import SynapseError -from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state @@ -35,7 +34,6 @@ from synapse.types import ( JsonDict, Requester, RoomStreamToken, - StateMap, StreamKeyType, StreamToken, UserID, @@ -199,9 +197,7 @@ class InitialSyncHandler: deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, [event.event_id], - ).addCallback( - lambda states: cast(StateMap[EventBase], states[event.event_id]) - ) + ).addCallback(lambda states: states[event.event_id]) (messages, token), current_state = await make_deferred_yieldable( gather_results( diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e5ac9096cc..19cf5a2b43 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -713,7 +713,7 @@ class PaginationHandler: self, delete_id: str, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -732,6 +732,10 @@ class PaginationHandler: requester_user_id: User who requested the action. Will be recorded as putting the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be @@ -818,7 +822,7 @@ class PaginationHandler: def start_shutdown_and_purge_room( self, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -833,6 +837,10 @@ class PaginationHandler: requester_user_id: User who requested the action and put the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f31e18328b..375c7d0901 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -13,13 +13,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""This module is responsible for keeping track of presence status of local +""" +This module is responsible for keeping track of presence status of local and remote users. The methods that define policy are: - PresenceHandler._update_states - PresenceHandler._handle_timeouts - should_notify + +# Tracking local presence + +For local users, presence is tracked on a per-device basis. When a user has multiple +devices the user presence state is derived by coalescing the presence from each +device: + + BUSY > ONLINE > UNAVAILABLE > OFFLINE + +The time that each device was last active and last synced is tracked in order to +automatically downgrade a device's presence state: + + A device may move from ONLINE -> UNAVAILABLE, if it has not been active for + a period of time. + + A device may go from any state -> OFFLINE, if it is not active and has not + synced for a period of time. + +The timeouts are handled using a wheel timer, which has coarse buckets. Timings +do not need to be exact. + +Generally a device's presence state is updated whenever a user syncs (via the +set_presence parameter), when the presence API is called, or if "pro-active" +events occur, including: + +* Sending an event, receipt, read marker. +* Updating typing status. + +The busy state has special status that it cannot is not downgraded by a call to +sync with a lower priority state *and* it takes a long period of time to transition +to offline. + +# Persisting (and restoring) presence + +For all users, presence is persisted on a per-user basis. Data is kept in-memory +and persisted periodically. When Synapse starts each worker loads the current +presence state and then tracks the presence stream to keep itself up-to-date. + +When restoring presence for local users a pseudo-device is created to match the +user state; this device follows the normal timeout logic (see above) and will +automatically be replaced with any information from currently available devices. + """ import abc import contextlib @@ -30,6 +73,7 @@ from contextlib import contextmanager from types import TracebackType from typing import ( TYPE_CHECKING, + AbstractSet, Any, Callable, Collection, @@ -49,7 +93,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background @@ -111,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000 # How long to wait until a new /events or /sync request before assuming # the client has gone. SYNC_ONLINE_TIMEOUT = 30 * 1000 +# Busy status waits longer, but does eventually go offline. +BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000 # How long to wait before marking the user as idle. Compared against last active IDLE_TIMER = 5 * 60 * 1000 @@ -137,6 +183,7 @@ class BasePresenceHandler(abc.ABC): writer""" def __init__(self, hs: "HomeServer"): + self.hs = hs self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -162,6 +209,7 @@ class BasePresenceHandler(abc.ABC): self.VALID_PRESENCE += (PresenceState.BUSY,) active_presence = self.store.take_presence_startup_info() + # The combined status across all user devices. self.user_to_current_state = {state.user_id: state for state in active_presence} @abc.abstractmethod @@ -426,8 +474,6 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs - self._presence_writer_instance = hs.config.worker.writers.presence[0] # Route presence EDUs to the right worker @@ -691,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler): class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() @@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler): lambda: len(self.user_to_current_state), ) + # The per-device presence state, maps user to devices to per-device presence state. + self._user_to_device_to_current_state: Dict[ + str, Dict[Optional[str], UserDevicePresenceState] + ] = {} + now = self.clock.time_msec() if self._presence_enabled: for state in self.user_to_current_state.values(): + # Create a psuedo-device to properly handle time outs. This will + # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. + pseudo_device_id = None + self._user_to_device_to_current_state[state.user_id] = { + pseudo_device_id: UserDevicePresenceState( + user_id=state.user_id, + device_id=pseudo_device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) + } + self.wheel_timer.insert( now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) @@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler): # Keeps track of the number of *ongoing* syncs on other processes. # - # While any sync is ongoing on another process the user will never + # While any sync is ongoing on another process the user's device will never # go offline. # # Each process has a unique identifier and an update frequency. If @@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler): timers_fired_counter.inc(len(states)) - syncing_user_ids = { - user_id - for (user_id, _), count in self._user_device_to_num_current_syncs.items() + # Set of user ID & device IDs which are currently syncing. + syncing_user_devices = { + user_id_device_id + for user_id_device_id, count in self._user_device_to_num_current_syncs.items() if count } - syncing_user_ids.update( - user_id - for user_id, _ in itertools.chain( - *self.external_process_to_current_syncs.values() - ) + syncing_user_devices.update( + itertools.chain(*self.external_process_to_current_syncs.values()) ) changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - syncing_user_ids=syncing_user_ids, + syncing_user_devices=syncing_user_devices, + user_to_devices=self._user_to_device_to_current_state, now=now, ) @@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler): bump_active_time_counter.inc() - prev_state = await self.current_state_for_user(user_id) + now = self.clock.time_msec() - new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} - if prev_state.state == PresenceState.UNAVAILABLE: - new_fields["state"] = PresenceState.ONLINE + # Update the device information & mark the device as online if it was + # unavailable. + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.last_active_ts = now + if device_state.state == PresenceState.UNAVAILABLE: + device_state.state = PresenceState.ONLINE + + # Update the user state, this will always update last_active_ts and + # might update the presence state. + prev_state = await self.current_state_for_user(user_id) + new_fields: Dict[str, Any] = { + "last_active_ts": now, + "state": _combine_device_states(devices.values()), + } await self._update_states([prev_state.copy_and_replace(**new_fields)]) @@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler): if is_syncing and (user_id, device_id) not in process_presence: process_presence.add((user_id, device_id)) elif not is_syncing and (user_id, device_id) in process_presence: + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + device_state.last_sync_ts = sync_time_msec + new_state = prev_state.copy_and_replace( last_user_sync_ts=sync_time_msec ) @@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = await self.current_state_for_users( - {user_id for user_id, device_id in process_presence} - ) + time_now_ms = self.clock.time_msec() + # Mark each device as having a last sync time. + updated_users = set() + for user_id, device_id in process_presence: + device_state = self._user_to_device_to_current_state.setdefault( + user_id, {} + ).setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + + device_state.last_sync_ts = time_now_ms + updated_users.add(user_id) + + # Update each user (and insert into the appropriate timers to check if + # they've gone offline). + prev_states = await self.current_state_for_users(updated_users) await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) @@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler): if prev_state.state == PresenceState.BUSY and is_sync: presence = PresenceState.BUSY + # Update the device specific information. + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.state = presence + device_state.last_active_ts = now + if is_sync: + device_state.last_sync_ts = now + + # Based on the state of each user's device calculate the new presence state. + presence = _combine_device_states(devices.values()) + new_fields = {"state": presence} if presence == PresenceState.ONLINE or presence == PresenceState.BUSY: @@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): def handle_timeouts( user_states: List[UserPresenceState], is_mine_fn: Callable[[str], bool], - syncing_user_ids: Set[str], + syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]], + user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]], now: int, ) -> List[UserPresenceState]: """Checks the presence of users that have timed out and updates as @@ -1882,7 +1993,8 @@ def handle_timeouts( Args: user_states: List of UserPresenceState's to check. is_mine_fn: Function that returns if a user_id is ours - syncing_user_ids: Set of user_ids with active syncs. + syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.. + user_to_devices: A map of user ID to device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1891,9 +2003,16 @@ def handle_timeouts( changes = {} # Actual changes we need to notify people about for state in user_states: - is_mine = is_mine_fn(state.user_id) - - new_state = handle_timeout(state, is_mine, syncing_user_ids, now) + user_id = state.user_id + is_mine = is_mine_fn(user_id) + + new_state = handle_timeout( + state, + is_mine, + syncing_user_devices, + user_to_devices.get(user_id, {}), + now, + ) if new_state: changes[state.user_id] = new_state @@ -1901,14 +2020,19 @@ def handle_timeouts( def handle_timeout( - state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int + state: UserPresenceState, + is_mine: bool, + syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]], + user_devices: Dict[Optional[str], UserDevicePresenceState], + now: int, ) -> Optional[UserPresenceState]: """Checks the presence of the user to see if any of the timers have elapsed Args: - state + state: UserPresenceState to check. is_mine: Whether the user is ours - syncing_user_ids: Set of user_ids with active syncs. + syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.. + user_devices: A map of device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1919,34 +2043,63 @@ def handle_timeout( return None changed = False - user_id = state.user_id if is_mine: - if state.state == PresenceState.ONLINE: - if now - state.last_active_ts > IDLE_TIMER: - # Currently online, but last activity ages ago so auto - # idle - state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) - changed = True - elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: - # So that we send down a notification that we've - # stopped updating. + # Check per-device whether the device should be considered idle or offline + # due to timeouts. + device_changed = False + offline_devices = [] + for device_id, device_state in user_devices.items(): + if device_state.state == PresenceState.ONLINE: + if now - device_state.last_active_ts > IDLE_TIMER: + # Currently online, but last activity ages ago so auto + # idle + device_state.state = PresenceState.UNAVAILABLE + device_changed = True + + # If there are have been no sync for a while (and none ongoing), + # set presence to offline. + if (state.user_id, device_id) not in syncing_device_ids: + # If the user has done something recently but hasn't synced, + # don't set them as offline. + sync_or_active = max( + device_state.last_sync_ts, device_state.last_active_ts + ) + + # Implementations aren't meant to timeout a device with a busy + # state, but it needs to timeout *eventually* or else the user + # will be stuck in that state. + online_timeout = ( + BUSY_ONLINE_TIMEOUT + if device_state.state == PresenceState.BUSY + else SYNC_ONLINE_TIMEOUT + ) + if now - sync_or_active > online_timeout: + # Mark the device as going offline. + offline_devices.append(device_id) + device_changed = True + + # Offline devices are not needed and do not add information. + for device_id in offline_devices: + user_devices.pop(device_id) + + # If the presence state of the devices changed, then (maybe) update + # the user's overall presence state. + if device_changed: + new_presence = _combine_device_states(user_devices.values()) + if new_presence != state.state: + state = state.copy_and_replace(state=new_presence) changed = True + if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # So that we send down a notification that we've + # stopped updating. + changed = True + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: # Need to send ping to other servers to ensure they don't # timeout and set us to offline changed = True - - # If there are have been no sync for a while (and none ongoing), - # set presence to offline - if user_id not in syncing_user_ids: - # If the user has done something recently but hasn't synced, - # don't set them as offline. - sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) - if now - sync_or_active > SYNC_ONLINE_TIMEOUT: - state = state.copy_and_replace(state=PresenceState.OFFLINE) - changed = True else: # We expect to be poked occasionally by the other side. # This is to protect against forgetful/buggy servers, so that @@ -2021,6 +2174,13 @@ def handle_update( new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True + if new_state.state == PresenceState.BUSY: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT, + ) + else: wheel_timer.insert( now=now, @@ -2036,6 +2196,46 @@ def handle_update( return new_state, persist_and_notify, federation_ping +PRESENCE_BY_PRIORITY = { + PresenceState.BUSY: 4, + PresenceState.ONLINE: 3, + PresenceState.UNAVAILABLE: 2, + PresenceState.OFFLINE: 1, +} + + +def _combine_device_states( + device_states: Iterable[UserDevicePresenceState], +) -> str: + """ + Find the device to use presence information from. + + Orders devices by priority, then last_active_ts. + + Args: + device_states: An iterable of device presence states + + Return: + The combined presence state. + """ + + # Based on (all) the user's devices calculate the new presence state. + presence = PresenceState.OFFLINE + last_active_ts = -1 + + # Find the device to use the presence state of based on the presence priority, + # but tie-break with how recently the device has been seen. + for device_state in device_states: + if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > ( + PRESENCE_BY_PRIORITY[presence], + last_active_ts, + ): + presence = device_state.state + last_active_ts = device_state.last_active_ts + + return presence + + async def get_interested_parties( store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0513e28aab..7a762c8511 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1787,7 +1787,7 @@ class RoomShutdownHandler: async def shutdown_room( self, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -1811,6 +1811,10 @@ class RoomShutdownHandler: requester_user_id: User who requested the action and put the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be @@ -1863,6 +1867,10 @@ class RoomShutdownHandler: # Action the block first (even if the room doesn't exist yet) if block: + if requester_user_id is None: + raise ValueError( + "shutdown_room: block=True not allowed when requester_user_id is None." + ) # This will work even if the room is already blocked, but that is # desirable in case the first attempt at blocking the room failed below. await self.store.block_room(room_id, requester_user_id) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 60a9f341b5..0ccd7d250c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase +from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME from synapse.handlers.relations import BundledAggregations from synapse.logging import issue9533_logger from synapse.logging.context import current_context @@ -268,6 +269,7 @@ class SyncHandler: self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state self._device_handler = hs.get_device_handler() + self._task_scheduler = hs.get_task_scheduler() self.should_calculate_push_rules = hs.config.push.enable_push @@ -360,11 +362,19 @@ class SyncHandler: # (since we now know that the device has received them) if since_token is not None: since_stream_id = since_token.to_device_key - deleted = await self.store.delete_messages_for_device( - sync_config.user.to_string(), sync_config.device_id, since_stream_id + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=sync_config.device_id, + params={ + "user_id": sync_config.user.to_string(), + "device_id": sync_config.device_id, + "up_to_stream_id": since_stream_id, + }, ) logger.debug( - "Deleted %d to-device messages up to %d", deleted, since_stream_id + "Deletion of to-device messages up to %d scheduled", + since_stream_id, ) if timeout == 0 or since_token is None or full_state: diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 91a24efcd0..a3a396bb37 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -399,15 +399,34 @@ class MatrixHostnameEndpoint: if port or _is_ip_literal(host): return [Server(host, port or 8448)] + # Check _matrix-fed._tcp SRV record. logger.debug("Looking up SRV record for %s", host.decode(errors="replace")) + server_list = await self._srv_resolver.resolve_service( + b"_matrix-fed._tcp." + host + ) + + if server_list: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Got %s from SRV lookup for %s", + ", ".join(map(str, server_list)), + host.decode(errors="replace"), + ) + return server_list + + # No _matrix-fed._tcp SRV record, fallback to legacy _matrix._tcp SRV record. + logger.debug( + "Looking up deprecated SRV record for %s", host.decode(errors="replace") + ) server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host) if server_list: - logger.debug( - "Got %s from SRV lookup for %s", - ", ".join(map(str, server_list)), - host.decode(errors="replace"), - ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Got %s from deprecated SRV lookup for %s", + ", ".join(map(str, server_list)), + host.decode(errors="replace"), + ) return server_list # No SRV records, so we fallback to host and 8448 diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 64c6ae4512..bf7e311026 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -728,7 +728,7 @@ async def _unwrap_awaitable(awaitable: Awaitable[R]) -> R: @overload -def preserve_fn( # type: ignore[misc] +def preserve_fn( f: Callable[P, Awaitable[R]], ) -> Callable[P, "defer.Deferred[R]"]: # The `type: ignore[misc]` above suppresses @@ -756,7 +756,7 @@ def preserve_fn( @overload -def run_in_background( # type: ignore[misc] +def run_in_background( f: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs ) -> "defer.Deferred[R]": # The `type: ignore[misc]` above suppresses diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2f00a7ba20..d6efe10a28 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1730,6 +1730,19 @@ class ModuleApi: room_alias_str = room_alias.to_string() if room_alias else None return room_id, room_alias_str + async def delete_room(self, room_id: str) -> None: + """ + Schedules the deletion of a room from Synapse's database. + + If the room is already being deleted, this method does nothing. + This method does not wait for the room to be deleted. + + Added in Synapse v1.89.0. + """ + # Future extensions to this method might want to e.g. allow use of `force_purge`. + # TODO In the future we should make sure this is persistent. + self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None) + async def set_displayname( self, user_id: UserID, diff --git a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py index 911f37ba42..ecaeef3511 100644 --- a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py +++ b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py @@ -40,7 +40,7 @@ CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[ [str, StateMap[EventBase], str], Awaitable[bool] ] ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable] -CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]] +CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[Optional[str], str], Awaitable[bool]] CHECK_CAN_DEACTIVATE_USER_CALLBACK = Callable[[str, bool], Awaitable[bool]] ON_PROFILE_UPDATE_CALLBACK = Callable[[str, ProfileInfo, bool, bool], Awaitable] ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK = Callable[[str, bool, bool], Awaitable] @@ -429,12 +429,17 @@ class ThirdPartyEventRulesModuleApiCallbacks: "Failed to run module API callback %s: %s", callback, e ) - async def check_can_shutdown_room(self, user_id: str, room_id: str) -> bool: + async def check_can_shutdown_room( + self, user_id: Optional[str], room_id: str + ) -> bool: """Intercept requests to shutdown a room. If `False` is returned, the room must not be shut down. Args: - requester: The ID of the user requesting the shutdown. + user_id: The ID of the user requesting the shutdown. + If no user ID is supplied, then the room is being shut down through + some mechanism other than a user's request, e.g. through a module's + request. room_id: The ID of the room. """ for callback in self._check_can_shutdown_room_callbacks: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b471fcb064..744e98c6d0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): table="devices", column="user_id", iterable=user_ids_to_query, - keyvalues={"user_id": user_id, "hidden": False}, + keyvalues={"hidden": False}, retcols=("device_id",), ) @@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: Optional[str], up_to_stream_id: int + self, + user_id: str, + device_id: Optional[str], + up_to_stream_id: int, + limit: int, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. + limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 + ROW_ID_NAME = self.database_engine.row_id_name + def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" - ) + sql = f""" + DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( + SELECT {ROW_ID_NAME} FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + LIMIT {limit} + ) + """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) return txn.rowcount @@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) + # In this case we don't know if we hit the limit or the delete is complete + # so let's not update the cache. + if count == limit: + return count + # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e4162f846b..324fdfa892 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_delete_many_txn( txn, - table="device_inbox", - column="device_id", - values=device_ids, - keyvalues={"user_id": user_id}, - ) - - self.db_pool.simple_delete_many_txn( - txn, table="device_auth_providers", column="device_id", values=device_ids, diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index a3b4744855..57aa4921e1 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -221,12 +221,17 @@ class KeyStore(CacheInvalidationWorkerStore): """Processes a batch of keys to fetch, and adds the result to `keys`.""" # batch_iter always returns tuples so it's safe to do len(batch) - sql = """ - SELECT server_name, key_id, key_json, ts_valid_until_ms - FROM server_keys_json WHERE 1=0 - """ + " OR (server_name=? AND key_id=?)" * len( - batch - ) + where_clause = " OR (server_name=? AND key_id=?)" * len(batch) + + # `server_keys_json` can have multiple entries per server (one per + # remote server we fetched from, if using perspectives). Order by + # `ts_added_ms` so the most recently fetched one always wins. + sql = f""" + SELECT server_name, key_id, key_json, ts_valid_until_ms + FROM server_keys_json WHERE 1=0 + {where_clause} + ORDER BY ts_added_ms + """ txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f..e4d10ff250 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - + ROW_ID_NAME = self.database_engine.row_id_name # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e..b1a2418cbd 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc554..6309363217 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ class PostgresEngine( else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c..802069e1e1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): """Gets a string giving the server version. For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af..622686d28f 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute( diff --git a/synapse/util/gai_resolver.py b/synapse/util/gai_resolver.py index 214eb17fbc..fecf829ade 100644 --- a/synapse/util/gai_resolver.py +++ b/synapse/util/gai_resolver.py @@ -136,7 +136,7 @@ class GAIResolver: # The types on IHostnameResolver is incorrect in Twisted, see # https://twistedmatrix.com/trac/ticket/10276 - def resolveHostName( # type: ignore[override] + def resolveHostName( self, resolutionReceiver: IResolutionReceiver, hostName: str, diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 9e89aeb748..9b2581e51a 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -77,6 +77,7 @@ class TaskScheduler: LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): + self._hs = hs self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() @@ -97,8 +98,6 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) - else: - self.replication_client = hs.get_replication_command_handler() def register_action( self, @@ -133,7 +132,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. A function matching the specified - `action` should have been previously registered with `register_action`. + `action` should have be registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -149,11 +148,6 @@ class TaskScheduler: Returns: The id of the scheduled task """ - if action not in self._actions: - raise Exception( - f"No function associated with action {action} of the scheduled task" - ) - status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() @@ -175,7 +169,7 @@ class TaskScheduler: if self._run_background_tasks: await self._launch_task(task) else: - self.replication_client.send_new_active_task(task.id) + self._hs.get_replication_command_handler().send_new_active_task(task.id) return task.id @@ -315,7 +309,10 @@ class TaskScheduler: """ assert self._run_background_tasks - assert task.action in self._actions + if task.action not in self._actions: + raise Exception( + f"No function associated with action {task.action} of the scheduled task {task.id}" + ) function = self._actions[task.action] async def wrapper() -> None: diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 46d022092e..a7e6cdd66a 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -422,6 +422,18 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): "exclusive_as_user", "password", self.exclusive_as_user_device_id ) + self.exclusive_as_user_2_device_id = "exclusive_as_device_2" + self.exclusive_as_user_2 = self.register_user("exclusive_as_user_2", "password") + self.exclusive_as_user_2_token = self.login( + "exclusive_as_user_2", "password", self.exclusive_as_user_2_device_id + ) + + self.exclusive_as_user_3_device_id = "exclusive_as_device_3" + self.exclusive_as_user_3 = self.register_user("exclusive_as_user_3", "password") + self.exclusive_as_user_3_token = self.login( + "exclusive_as_user_3", "password", self.exclusive_as_user_3_device_id + ) + def _notify_interested_services(self) -> None: # This is normally set in `notify_interested_services` but we need to call the # internal async version so the reactor gets pushed to completion. @@ -849,6 +861,119 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): for count in service_id_to_message_count.values(): self.assertEqual(count, number_of_messages) + @unittest.override_config( + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} + ) + def test_application_services_receive_local_to_device_for_many_users(self) -> None: + """ + Test that when a user sends a to-device message to many users + in an application service's user namespace, the + application service will receive all of them. + """ + interested_appservice = self._register_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@exclusive_as_user:.+", + "exclusive": True, + }, + { + "regex": "@exclusive_as_user_2:.+", + "exclusive": True, + }, + { + "regex": "@exclusive_as_user_3:.+", + "exclusive": True, + }, + ], + }, + ) + + # Have local_user send a to-device message to exclusive_as_users + message_content = {"some_key": "some really interesting value"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/3", + content={ + "messages": { + self.exclusive_as_user: { + self.exclusive_as_user_device_id: message_content + }, + self.exclusive_as_user_2: { + self.exclusive_as_user_2_device_id: message_content + }, + self.exclusive_as_user_3: { + self.exclusive_as_user_3_device_id: message_content + }, + } + }, + access_token=self.local_user_token, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Have exclusive_as_user send a to-device message to local_user + for user_token in [ + self.exclusive_as_user_token, + self.exclusive_as_user_2_token, + self.exclusive_as_user_3_token, + ]: + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", + content={ + "messages": { + self.local_user: {self.local_user_device_id: message_content} + } + }, + access_token=user_token, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Check if our application service - that is interested in exclusive_as_user - received + # the to-device message as part of an AS transaction. + # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS. + # + # The uninterested application service should not have been notified at all. + self.send_mock.assert_called_once() + ( + service, + _events, + _ephemeral, + to_device_messages, + _otks, + _fbks, + _device_list_summary, + ) = self.send_mock.call_args[0] + + # Assert that this was the same to-device message that local_user sent + self.assertEqual(service, interested_appservice) + + # Assert expected number of messages + self.assertEqual(len(to_device_messages), 3) + + for device_msg in to_device_messages: + self.assertEqual(device_msg["type"], "m.room_key_request") + self.assertEqual(device_msg["sender"], self.local_user) + self.assertEqual(device_msg["content"], message_content) + + self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user) + self.assertEqual( + to_device_messages[0]["to_device_id"], + self.exclusive_as_user_device_id, + ) + + self.assertEqual(to_device_messages[1]["to_user_id"], self.exclusive_as_user_2) + self.assertEqual( + to_device_messages[1]["to_device_id"], + self.exclusive_as_user_2_device_id, + ) + + self.assertEqual(to_device_messages[2]["to_user_id"], self.exclusive_as_user_3) + self.assertEqual( + to_device_messages[2]["to_device_id"], + self.exclusive_as_user_3_device_id, + ) + def _register_application_service( self, namespaces: Optional[Dict[str, Iterable[Dict]]] = None, diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 55a4f95ef3..9659a4a355 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.storage.databases.main.appservice import _make_exclusive_regex from synapse.types import JsonDict, create_requester from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler from tests import unittest from tests.unittest import override_config @@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): assert isinstance(handler, DeviceHandler) self.handler = handler self.store = hs.get_datastores().main + self.device_message_handler = hs.get_device_message_handler() return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase): ) self.assertIsNone(res) + def test_delete_device_and_big_device_inbox(self) -> None: + """Check that deleting a big device inbox is staged and batched asynchronously.""" + DEVICE_ID = "abc" + sender = "@sender:" + self.hs.hostname + receiver = "@receiver:" + self.hs.hostname + self._record_user(sender, DEVICE_ID, DEVICE_ID) + self._record_user(receiver, DEVICE_ID, DEVICE_ID) + + # queue a bunch of messages in the inbox + requester = create_requester(sender, device_id=DEVICE_ID) + for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): + self.get_success( + self.device_message_handler.send_device_message( + requester, "message_type", {receiver: {"*": {"val": i}}} + ) + ) + + # delete the device + self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID])) + + # messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(10, len(res)) + + # wait for the task scheduler to do a second delete pass + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + + # remaining messages should now be deleted + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(0, len(res)) + def test_update_device(self) -> None: self._record_users() diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 88a16193a3..638787b029 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -21,11 +21,12 @@ from signedjson.key import generate_signing_key from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, Membership, PresenceState -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.federation.sender import FederationSender from synapse.handlers.presence import ( + BUSY_ONLINE_TIMEOUT, EXTERNAL_PROCESS_EXPIRY, FEDERATION_PING_INTERVAL, FEDERATION_TIMEOUT, @@ -352,6 +353,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_idle_timer(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -362,8 +364,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -376,6 +391,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): presence state into unavailable. """ user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -386,8 +402,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -396,6 +425,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_sync_timeout(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -406,8 +436,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -416,6 +459,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_sync_online(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -426,9 +470,20 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) new_state = handle_timeout( - state, is_mine=True, syncing_user_ids={user_id}, now=now + state, + is_mine=True, + syncing_device_ids={(user_id, device_id)}, + user_devices={device_id: device_state}, + now=now, ) self.assertIsNotNone(new_state) @@ -438,6 +493,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_federation_ping(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -449,14 +505,28 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) self.assertEqual(state, new_state) def test_no_timeout(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" now = 5000000 state = UserPresenceState.default(user_id) @@ -466,8 +536,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, last_federation_update_ts=now, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNone(new_state) @@ -485,8 +568,9 @@ class PresenceTimeoutTestCase(unittest.TestCase): status_msg=status_msg, ) + # Note that this is a remote user so we do not have their device information. new_state = handle_timeout( - state, is_mine=False, syncing_user_ids=set(), now=now + state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now ) self.assertIsNotNone(new_state) @@ -496,6 +580,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_last_active(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -507,8 +592,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_federation_update_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) self.assertEqual(state, new_state) @@ -579,7 +677,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): [ (PresenceState.BUSY, PresenceState.BUSY), (PresenceState.ONLINE, PresenceState.ONLINE), - (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE), + (PresenceState.UNAVAILABLE, PresenceState.ONLINE), # Offline syncs don't update the state. (PresenceState.OFFLINE, PresenceState.ONLINE), ] @@ -800,6 +898,486 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # we should now be online self.assertEqual(state.state, PresenceState.ONLINE) + @parameterized.expand( + # A list of tuples of 4 strings: + # + # * The presence state of device 1. + # * The presence state of device 2. + # * The expected user presence state after both devices have synced. + # * The expected user presence state after device 1 has idled. + # * The expected user presence state after device 2 has idled. + # * True to use workers, False a monolith. + [ + (*cases, workers) + for workers in (False, True) + for cases in [ + # If both devices have the same state, online should eventually idle. + # Otherwise, the state doesn't change. + ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + ), + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.ONLINE, + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ] + ], + name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}", + ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) + def test_set_presence_from_syncing_multi_device( + self, + dev_1_state: str, + dev_2_state: str, + expected_state_1: str, + expected_state_2: str, + expected_state_3: str, + test_with_workers: bool, + ) -> None: + """ + Test the behaviour of multiple devices syncing at the same time. + + Roughly the user's presence state should be set to the "highest" priority + of all the devices. When a device then goes offline its state should be + discarded and the next highest should win. + + Note that these tests use the idle timer (and don't close the syncs), it + is unlikely that a *single* sync would last this long, but is close enough + to continually syncing with that current state. + """ + user_id = f"@test:{self.hs.config.server.server_name}" + + # By default, we call /sync against the main process. + worker_presence_handler = self.presence_handler + if test_with_workers: + # Create a worker and use it to handle /sync traffic instead. + # This is used to test that presence changes get replicated from workers + # to the main process correctly. + worker_to_sync_against = self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "synchrotron"} + ) + worker_presence_handler = worker_to_sync_against.get_presence_handler() + + # 1. Sync with the first device. + self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-1", + affect_presence=dev_1_state != PresenceState.OFFLINE, + presence_state=dev_1_state, + ), + by=0.01, + ) + + # 2. Wait half the idle timer. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.1]) + + # 3. Sync with the second device. + self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-2", + affect_presence=dev_2_state != PresenceState.OFFLINE, + presence_state=dev_2_state, + ), + by=0.01, + ) + + # 4. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + + # When testing with workers, make another random sync (with any *different* + # user) to keep the process information from expiring. + # + # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER. + if test_with_workers: + with self.get_success( + worker_presence_handler.user_syncing( + f"@other-user:{self.hs.config.server.server_name}", + "dev-3", + affect_presence=True, + presence_state=PresenceState.ONLINE, + ), + by=0.01, + ): + pass + + # 5. Advance such that the first device should be discarded (the idle timer), + # then pump so _handle_timeouts function to called. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.01]) + + # 6. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + + # 7. Advance such that the second device should be discarded (half the idle timer), + # then pump so _handle_timeouts function to called. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.1]) + + # 8. The devices are still "syncing" (the sync context managers were never + # closed), so might idle. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_3) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_3) + + @parameterized.expand( + # A list of tuples of 4 strings: + # + # * The presence state of device 1. + # * The presence state of device 2. + # * The expected user presence state after both devices have synced. + # * The expected user presence state after device 1 has stopped syncing. + # * True to use workers, False a monolith. + [ + (*cases, workers) + for workers in (False, True) + for cases in [ + # If both devices have the same state, nothing exciting should happen. + ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + ), + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.ONLINE, + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.OFFLINE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ), + # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ] + ], + name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}", + ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) + def test_set_presence_from_non_syncing_multi_device( + self, + dev_1_state: str, + dev_2_state: str, + expected_state_1: str, + expected_state_2: str, + test_with_workers: bool, + ) -> None: + """ + Test the behaviour of multiple devices syncing at the same time. + + Roughly the user's presence state should be set to the "highest" priority + of all the devices. When a device then goes offline its state should be + discarded and the next highest should win. + + Note that these tests use the idle timer (and don't close the syncs), it + is unlikely that a *single* sync would last this long, but is close enough + to continually syncing with that current state. + """ + user_id = f"@test:{self.hs.config.server.server_name}" + + # By default, we call /sync against the main process. + worker_presence_handler = self.presence_handler + if test_with_workers: + # Create a worker and use it to handle /sync traffic instead. + # This is used to test that presence changes get replicated from workers + # to the main process correctly. + worker_to_sync_against = self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "synchrotron"} + ) + worker_presence_handler = worker_to_sync_against.get_presence_handler() + + # 1. Sync with the first device. + sync_1 = self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-1", + affect_presence=dev_1_state != PresenceState.OFFLINE, + presence_state=dev_1_state, + ), + by=0.1, + ) + + # 2. Sync with the second device. + sync_2 = self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-2", + affect_presence=dev_2_state != PresenceState.OFFLINE, + presence_state=dev_2_state, + ), + by=0.1, + ) + + # 3. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + + # 4. Disconnect the first device. + with sync_1: + pass + + # 5. Advance such that the first device should be discarded (the sync timeout), + # then pump so _handle_timeouts function to called. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + self.reactor.pump([5]) + + # 6. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + + # 7. Disconnect the second device. + with sync_2: + pass + + # 8. Advance such that the second device should be discarded (the sync timeout), + # then pump so _handle_timeouts function to called. + if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY: + timeout = BUSY_ONLINE_TIMEOUT + else: + timeout = SYNC_ONLINE_TIMEOUT + self.reactor.advance(timeout / 1000) + self.reactor.pump([5]) + + # 9. There are no more devices, should be offline. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + def test_set_presence_from_syncing_keeps_status(self) -> None: """Test that presence set by syncing retains status message""" status_msg = "I'm here!" diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 0d17f2fe5b..9f63fa6fa8 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -15,7 +15,7 @@ import base64 import logging import os from typing import Generator, List, Optional, cast -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, call, patch import treq from netaddr import IPSet @@ -651,9 +651,9 @@ class MatrixFederationAgentTests(unittest.TestCase): # .well-known request fails. self.reactor.pump((0.4,)) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv1" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv1"), call(b"_matrix._tcp.testserv1")] ) # we should fall back to a direct connection @@ -737,9 +737,9 @@ class MatrixFederationAgentTests(unittest.TestCase): # .well-known request fails. self.reactor.pump((0.4,)) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # we should fall back to a direct connection @@ -788,9 +788,12 @@ class MatrixFederationAgentTests(unittest.TestCase): content=b'{ "m.server": "target-server" }', ) - # there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target server @@ -878,9 +881,12 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.1,)) - # there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target server @@ -942,9 +948,9 @@ class MatrixFederationAgentTests(unittest.TestCase): client_factory, expected_sni=b"testserv", content=b"NOT JSON" ) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # we should fall back to a direct connection @@ -1016,14 +1022,14 @@ class MatrixFederationAgentTests(unittest.TestCase): # there should be no requests self.assertEqual(len(http_proto.requests), 0) - # and there should be a SRV lookup instead - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # and there should be two SRV lookups instead + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) def test_get_hostname_srv(self) -> None: """ - Test the behaviour when there is a single SRV record + Test the behaviour when there is a single SRV record for _matrix-fed. """ self.agent = self._make_agent() @@ -1039,7 +1045,51 @@ class MatrixFederationAgentTests(unittest.TestCase): # the request for a .well-known will have failed with a DNS lookup error. self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + b"_matrix-fed._tcp.testserv" + ) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection(client_factory, expected_sni=b"testserv") + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"]) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_get_hostname_srv_legacy(self) -> None: + """ + Test the behaviour when there is a single SRV record for _matrix. + """ + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"srvtarget", port=8443)], + ] + self.reactor.lookups["srvtarget"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # the request for a .well-known will have failed with a DNS lookup error. + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # Make sure treq is trying to connect @@ -1065,7 +1115,7 @@ class MatrixFederationAgentTests(unittest.TestCase): def test_get_well_known_srv(self) -> None: """Test the behaviour when the .well-known redirects to a place where there - is a SRV. + is a _matrix-fed SRV record. """ self.agent = self._make_agent() @@ -1096,7 +1146,72 @@ class MatrixFederationAgentTests(unittest.TestCase): # there should be a SRV lookup self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + b"_matrix-fed._tcp.target-server" + ) + + # now we should get a connection to the target of the SRV record + self.assertEqual(len(clients), 2) + (host, port, client_factory, _timeout, _bindAddress) = clients[1] + self.assertEqual(host, "5.6.7.8") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, expected_sni=b"target-server" + ) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual( + request.requestHeaders.getRawHeaders(b"host"), [b"target-server"] + ) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_get_well_known_srv_legacy(self) -> None: + """Test the behaviour when the .well-known redirects to a place where there + is a _matrix SRV record. + """ + self.agent = self._make_agent() + + self.reactor.lookups["testserv"] = "1.2.3.4" + self.reactor.lookups["srvtarget"] = "5.6.7.8" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # there should be an attempt to connect on port 443 for the .well-known + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 443) + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"srvtarget", port=8443)], + ] + + self._handle_well_known_connection( + client_factory, + expected_sni=b"testserv", + content=b'{ "m.server": "target-server" }', + ) + + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target of the SRV record @@ -1158,8 +1273,11 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.4,)) # now there should have been a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.xn--bcher-kva.com" + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.xn--bcher-kva.com"), + call(b"_matrix._tcp.xn--bcher-kva.com"), + ] ) # We should fall back to port 8448 @@ -1188,7 +1306,7 @@ class MatrixFederationAgentTests(unittest.TestCase): self.successResultOf(test_d) def test_idna_srv_target(self) -> None: - """test the behaviour when the target of a SRV record has idna chars""" + """test the behaviour when the target of a _matrix-fed SRV record has idna chars""" self.agent = self._make_agent() self.mock_resolver.resolve_service.return_value = [ @@ -1204,7 +1322,57 @@ class MatrixFederationAgentTests(unittest.TestCase): self.assertNoResult(test_d) self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.xn--bcher-kva.com" + b"_matrix-fed._tcp.xn--bcher-kva.com" + ) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, expected_sni=b"xn--bcher-kva.com" + ) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual( + request.requestHeaders.getRawHeaders(b"host"), [b"xn--bcher-kva.com"] + ) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_idna_srv_target_legacy(self) -> None: + """test the behaviour when the target of a _matrix SRV record has idna chars""" + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"xn--trget-3qa.com", port=8443)], + ] # târget.com + self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4" + + test_d = self._make_get_request( + b"matrix-federation://xn--bcher-kva.com/foo/bar" + ) + + # Nothing happened yet + self.assertNoResult(test_d) + + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.xn--bcher-kva.com"), + call(b"_matrix._tcp.xn--bcher-kva.com"), + ] ) # Make sure treq is trying to connect @@ -1394,7 +1562,7 @@ class MatrixFederationAgentTests(unittest.TestCase): self.assertIsNone(r.delegated_server) def test_srv_fallbacks(self) -> None: - """Test that other SRV results are tried if the first one fails.""" + """Test that other SRV results are tried if the first one fails for _matrix-fed SRV.""" self.agent = self._make_agent() self.mock_resolver.resolve_service.return_value = [ @@ -1409,7 +1577,67 @@ class MatrixFederationAgentTests(unittest.TestCase): self.assertNoResult(test_d) self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + b"_matrix-fed._tcp.testserv" + ) + + # We should see an attempt to connect to the first server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # Fonx the connection + client_factory.clientConnectionFailed(None, Exception("nope")) + + # There's a 300ms delay in HostnameEndpoint + self.reactor.pump((0.4,)) + + # Hasn't failed yet + self.assertNoResult(test_d) + + # We shouldnow see an attempt to connect to the second server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8444) + + # make a test server, and wire up the client + http_server = self._make_connection(client_factory, expected_sni=b"testserv") + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"]) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_srv_fallbacks_legacy(self) -> None: + """Test that other SRV results are tried if the first one fails for _matrix SRV.""" + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [ + Server(host=b"target.com", port=8443), + Server(host=b"target.com", port=8444), + ], + ] + self.reactor.lookups["target.com"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # We should see an attempt to connect to the first server @@ -1449,6 +1677,43 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.1,)) self.successResultOf(test_d) + def test_srv_no_fallback_to_legacy(self) -> None: + """Test that _matrix SRV results are not tried if the _matrix-fed one fails.""" + self.agent = self._make_agent() + + # Return a failing entry for _matrix-fed. + self.mock_resolver.resolve_service.side_effect = [ + [Server(host=b"target.com", port=8443)], + [], + ] + self.reactor.lookups["target.com"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # Only the _matrix-fed is checked, _matrix is ignored. + self.mock_resolver.resolve_service.assert_called_once_with( + b"_matrix-fed._tcp.testserv" + ) + + # We should see an attempt to connect to the first server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # Fonx the connection + client_factory.clientConnectionFailed(None, Exception("nope")) + + # There's a 300ms delay in HostnameEndpoint + self.reactor.pump((0.4,)) + + # Failed to resolve a server. + self.assertFailure(test_d, Exception) + class TestCachePeriodFromHeaders(unittest.TestCase): def test_cache_control(self) -> None: |