diff options
107 files changed, 1383 insertions, 1018 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 96750cb6c8..7dbd83908e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -280,7 +280,6 @@ jobs: - check-lockfile - lint-clippy - lint-rustfmt - - check-signoff runs-on: ubuntu-latest steps: - run: "true" diff --git a/Cargo.lock b/Cargo.lock index 084b8b91c3..f2b44b5448 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,9 +144,9 @@ checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" [[package]] name = "memoffset" -version = "0.6.5" +version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +checksum = "5a634b1c61a95585bd15607c6ab0c4e5b226e695ff2800ba0cdccddf208c406c" dependencies = [ "autocfg", ] @@ -191,9 +191,9 @@ dependencies = [ [[package]] name = "pyo3" -version = "0.17.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "268be0c73583c183f2b14052337465768c07726936a260f480f0857cb95ba543" +checksum = "e681a6cfdc4adcc93b4d3cf993749a4552018ee0a9b65fc0ccfad74352c72a38" dependencies = [ "anyhow", "cfg-if", @@ -209,9 +209,9 @@ dependencies = [ [[package]] name = "pyo3-build-config" -version = "0.17.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "28fcd1e73f06ec85bf3280c48c67e731d8290ad3d730f8be9dc07946923005c8" +checksum = "076c73d0bc438f7a4ef6fdd0c3bb4732149136abd952b110ac93e4edb13a6ba5" dependencies = [ "once_cell", "target-lexicon", @@ -219,9 +219,9 @@ dependencies = [ [[package]] name = "pyo3-ffi" -version = "0.17.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f6cb136e222e49115b3c51c32792886defbfb0adead26a688142b346a0b9ffc" +checksum = "e53cee42e77ebe256066ba8aa77eff722b3bb91f3419177cf4cd0f304d3284d9" dependencies = [ "libc", "pyo3-build-config", @@ -240,9 +240,9 @@ dependencies = [ [[package]] name = "pyo3-macros" -version = "0.17.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94144a1266e236b1c932682136dc35a9dee8d3589728f68130c7c3861ef96b28" +checksum = "dfeb4c99597e136528c6dd7d5e3de5434d1ceaf487436a3f03b2d56b6fc9efd1" dependencies = [ "proc-macro2", "pyo3-macros-backend", @@ -252,9 +252,9 @@ dependencies = [ [[package]] name = "pyo3-macros-backend" -version = "0.17.3" +version = "0.19.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8df9be978a2d2f0cdebabb03206ed73b11314701a5bfe71b0d753b81997777f" +checksum = "947dc12175c254889edc0c02e399476c2f652b4b9ebd123aa655c224de259536" dependencies = [ "proc-macro2", "quote", @@ -263,9 +263,9 @@ dependencies = [ [[package]] name = "pythonize" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f7f0c136f5fbc01868185eef462800e49659eb23acca83b9e884367a006acb6" +checksum = "8e35b716d430ace57e2d1b4afb51c9e5b7c46d2bce72926e07f9be6a98ced03e" dependencies = [ "pyo3", "serde", diff --git a/changelog.d/16162.misc b/changelog.d/16162.misc new file mode 100644 index 0000000000..b6c77229c1 --- /dev/null +++ b/changelog.d/16162.misc @@ -0,0 +1 @@ +Bump pyo3 from 0.17.1 to 0.19.2. diff --git a/changelog.d/16268.misc b/changelog.d/16268.misc new file mode 100644 index 0000000000..26059b108e --- /dev/null +++ b/changelog.d/16268.misc @@ -0,0 +1 @@ +Clean-up unused tables. diff --git a/changelog.d/16403.bugfix b/changelog.d/16403.bugfix new file mode 100644 index 0000000000..453c975a63 --- /dev/null +++ b/changelog.d/16403.bugfix @@ -0,0 +1 @@ +Remove legacy unspecced `knock_state_events` field returned in some responses. diff --git a/changelog.d/16404.bugfix b/changelog.d/16404.bugfix new file mode 100644 index 0000000000..3fd5028b33 --- /dev/null +++ b/changelog.d/16404.bugfix @@ -0,0 +1 @@ +Fixes possbile `AttributeError` when `_matrix/client/v3/account/whoami` is called over a unix socket. Contributed by @Sir-Photch. diff --git a/changelog.d/16419.misc b/changelog.d/16419.misc new file mode 100644 index 0000000000..591f371d00 --- /dev/null +++ b/changelog.d/16419.misc @@ -0,0 +1 @@ +Update registration of media repository URLs. diff --git a/changelog.d/16420.doc b/changelog.d/16420.doc new file mode 100644 index 0000000000..1c0c6b9577 --- /dev/null +++ b/changelog.d/16420.doc @@ -0,0 +1 @@ +Document internal background update mechanism. diff --git a/changelog.d/16421.misc b/changelog.d/16421.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/16421.misc @@ -0,0 +1 @@ +Improve type hints. diff --git a/changelog.d/16426.misc b/changelog.d/16426.misc new file mode 100644 index 0000000000..208a007171 --- /dev/null +++ b/changelog.d/16426.misc @@ -0,0 +1 @@ +Refactor some code to simplify and better type receipts stream adjacent code. diff --git a/changelog.d/16427.misc b/changelog.d/16427.misc new file mode 100644 index 0000000000..44f0e0595e --- /dev/null +++ b/changelog.d/16427.misc @@ -0,0 +1 @@ +Factor out `MultiWriter` token from `RoomStreamToken`. diff --git a/changelog.d/16428.misc b/changelog.d/16428.misc new file mode 100644 index 0000000000..75c9c3b757 --- /dev/null +++ b/changelog.d/16428.misc @@ -0,0 +1 @@ +Improve code comments. diff --git a/changelog.d/16429.misc b/changelog.d/16429.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16429.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/changelog.d/16431.misc b/changelog.d/16431.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16431.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/changelog.d/16433.misc b/changelog.d/16433.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16433.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/changelog.d/16435.misc b/changelog.d/16435.misc new file mode 100644 index 0000000000..e541607161 --- /dev/null +++ b/changelog.d/16435.misc @@ -0,0 +1 @@ +Remove unused method. diff --git a/changelog.d/16438.misc b/changelog.d/16438.misc new file mode 100644 index 0000000000..bd7cdd42af --- /dev/null +++ b/changelog.d/16438.misc @@ -0,0 +1 @@ +Reduce memory allocations. diff --git a/changelog.d/16441.misc b/changelog.d/16441.misc new file mode 100644 index 0000000000..32264a62b2 --- /dev/null +++ b/changelog.d/16441.misc @@ -0,0 +1 @@ +Improve rate limiting logic. diff --git a/changelog.d/16454.misc b/changelog.d/16454.misc new file mode 100644 index 0000000000..1e75dc436f --- /dev/null +++ b/changelog.d/16454.misc @@ -0,0 +1 @@ +Do not block running of CI behind the check for sign-off on PRs. diff --git a/changelog.d/16455.bugfix b/changelog.d/16455.bugfix new file mode 100644 index 0000000000..653a25d3b6 --- /dev/null +++ b/changelog.d/16455.bugfix @@ -0,0 +1 @@ +Prevent the purging of large rooms from timing out when Postgres is in use. The timeout which causes this issue was introduced in Synapse 1.88.0. diff --git a/changelog.d/16457.bugfix b/changelog.d/16457.bugfix new file mode 100644 index 0000000000..b9a95cc510 --- /dev/null +++ b/changelog.d/16457.bugfix @@ -0,0 +1 @@ +Improve the performance of purging rooms, particularly encrypted rooms. diff --git a/docs/development/database_schema.md b/docs/development/database_schema.md index 675080ae1b..37a06acc12 100644 --- a/docs/development/database_schema.md +++ b/docs/development/database_schema.md @@ -150,6 +150,67 @@ def run_upgrade( ... ``` +## Background updates + +It is sometimes appropriate to perform database migrations as part of a background +process (instead of blocking Synapse until the migration is done). In particular, +this is useful for migrating data when adding new columns or tables. + +Pending background updates stored in the `background_updates` table and are denoted +by a unique name, the current status (stored in JSON), and some dependency information: + +* Whether the update requires a previous update to be complete. +* A rough ordering for which to complete updates. + +A new background updates needs to be added to the `background_updates` table: + +```sql +INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES + (7706, 'my_background_update', 'a_previous_background_update' '{}'); +``` + +And then needs an associated handler in the appropriate datastore: + +```python +self.db_pool.updates.register_background_update_handler( + "my_background_update", + update_handler=self._my_background_update, +) +``` + +There are a few types of updates that can be performed, see the `BackgroundUpdater`: + +* `register_background_update_handler`: A generic handler for custom SQL +* `register_background_index_update`: Create an index in the background +* `register_background_validate_constraint`: Validate a constraint in the background + (PostgreSQL-only) +* `register_background_validate_constraint_and_delete_rows`: Similar to + `register_background_validate_constraint`, but deletes rows which don't fit + the constraint. + +For `register_background_update_handler`, the generic handler must track progress +and then finalize the background update: + +```python +async def _my_background_update(self, progress: JsonDict, batch_size: int) -> int: + def _do_something(txn: LoggingTransaction) -> int: + ... + self.db_pool.updates._background_update_progress_txn( + txn, "my_background_update", {"last_processed": last_processed} + ) + return last_processed - prev_last_processed + + num_processed = await self.db_pool.runInteraction("_do_something", _do_something) + await self.db_pool.updates._end_background_update("my_background_update") + + return num_processed +``` + +Synapse will attempt to rate-limit how often background updates are run via the +given batch-size and the returned number of processed entries (and how long the +function took to run). See +[background update controller callbacks](../modules/background_update_controller_callbacks.md). + ## Boolean columns Boolean columns require special treatment, since SQLite treats booleans the diff --git a/mypy.ini b/mypy.ini index 88aea301b9..fdfe9432fc 100644 --- a/mypy.ini +++ b/mypy.ini @@ -32,6 +32,7 @@ files = docker/, scripts-dev/, synapse/, + synmark/, tests/, build_rust.py @@ -80,6 +81,9 @@ ignore_missing_imports = True [mypy-pympler.*] ignore_missing_imports = True +[mypy-pyperf.*] +ignore_missing_imports = True + [mypy-rust_python_jaeger_reporter.*] ignore_missing_imports = True diff --git a/poetry.lock b/poetry.lock index 13884e6698..a1a2b83764 100644 --- a/poetry.lock +++ b/poetry.lock @@ -208,13 +208,13 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "bleach" -version = "6.0.0" +version = "6.1.0" description = "An easy safelist-based HTML-sanitizing tool." optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "bleach-6.0.0-py3-none-any.whl", hash = "sha256:33c16e3353dbd13028ab4799a0f89a83f113405c766e9c122df8a06f5b85b3f4"}, - {file = "bleach-6.0.0.tar.gz", hash = "sha256:1a1a85c1595e07d8db14c5f09f09e6433502c51c595970edc090551f0db99414"}, + {file = "bleach-6.1.0-py3-none-any.whl", hash = "sha256:3225f354cfc436b9789c66c4ee030194bee0568fbf9cbdad3bc8b5c26c5f12b6"}, + {file = "bleach-6.1.0.tar.gz", hash = "sha256:0a31f1837963c41d46bbf1331b8778e1308ea0791db03cc4e7357b97cf42a8fe"}, ] [package.dependencies] @@ -222,7 +222,7 @@ six = ">=1.9.0" webencodings = "*" [package.extras] -css = ["tinycss2 (>=1.1.0,<1.2)"] +css = ["tinycss2 (>=1.1.0,<1.3)"] [[package]] name = "canonicaljson" @@ -1557,13 +1557,13 @@ testing-docutils = ["pygments", "pytest (>=7,<8)", "pytest-param-files (>=0.3.4, [[package]] name = "netaddr" -version = "0.8.0" +version = "0.9.0" description = "A network address manipulation library for Python" optional = false python-versions = "*" files = [ - {file = "netaddr-0.8.0-py2.py3-none-any.whl", hash = "sha256:9666d0232c32d2656e5e5f8d735f58fd6c7457ce52fc21c98d45f2af78f990ac"}, - {file = "netaddr-0.8.0.tar.gz", hash = "sha256:d6cc57c7a07b1d9d2e917aa8b36ae8ce61c35ba3fcd1b83ca31c5a0ee2b5a243"}, + {file = "netaddr-0.9.0-py3-none-any.whl", hash = "sha256:5148b1055679d2a1ec070c521b7db82137887fabd6d7e37f5199b44f775c3bb1"}, + {file = "netaddr-0.9.0.tar.gz", hash = "sha256:7b46fa9b1a2d71fd5de9e4a3784ef339700a53a08c8040f08baf5f1194da0128"}, ] [[package]] @@ -1749,22 +1749,22 @@ twisted = ["twisted"] [[package]] name = "psycopg2" -version = "2.9.8" +version = "2.9.9" description = "psycopg2 - Python-PostgreSQL Database Adapter" optional = true -python-versions = ">=3.6" +python-versions = ">=3.7" files = [ - {file = "psycopg2-2.9.8-cp310-cp310-win32.whl", hash = "sha256:2f8594f92bbb5d8b59ffec04e2686c416401e2d4297de1193f8e75235937e71d"}, - {file = "psycopg2-2.9.8-cp310-cp310-win_amd64.whl", hash = "sha256:f9ecbf504c4eaff90139d5c9b95d47275f2b2651e14eba56392b4041fbf4c2b3"}, - {file = "psycopg2-2.9.8-cp311-cp311-win32.whl", hash = "sha256:65f81e72136d8b9ac8abf5206938d60f50da424149a43b6073f1546063c0565e"}, - {file = "psycopg2-2.9.8-cp311-cp311-win_amd64.whl", hash = "sha256:f7e62095d749359b7854143843f27edd7dccfcd3e1d833b880562aa5702d92b0"}, - {file = "psycopg2-2.9.8-cp37-cp37m-win32.whl", hash = "sha256:81b21424023a290a40884c7f8b0093ba6465b59bd785c18f757e76945f65594c"}, - {file = "psycopg2-2.9.8-cp37-cp37m-win_amd64.whl", hash = "sha256:67c2f32f3aba79afb15799575e77ee2db6b46b8acf943c21d34d02d4e1041d50"}, - {file = "psycopg2-2.9.8-cp38-cp38-win32.whl", hash = "sha256:287a64ef168ef7fb9f382964705ff664b342bfff47e7242bf0a04ef203269dd5"}, - {file = "psycopg2-2.9.8-cp38-cp38-win_amd64.whl", hash = "sha256:dcde3cad4920e29e74bf4e76c072649764914facb2069e6b7fa1ddbebcd49e9f"}, - {file = "psycopg2-2.9.8-cp39-cp39-win32.whl", hash = "sha256:d4ad050ea50a16731d219c3a85e8f2debf49415a070f0b8331ccc96c81700d9b"}, - {file = "psycopg2-2.9.8-cp39-cp39-win_amd64.whl", hash = "sha256:d39bb3959788b2c9d7bf5ff762e29f436172b241cd7b47529baac77746fd7918"}, - {file = "psycopg2-2.9.8.tar.gz", hash = "sha256:3da6488042a53b50933244085f3f91803f1b7271f970f3e5536efa69314f6a49"}, + {file = "psycopg2-2.9.9-cp310-cp310-win32.whl", hash = "sha256:38a8dcc6856f569068b47de286b472b7c473ac7977243593a288ebce0dc89516"}, + {file = "psycopg2-2.9.9-cp310-cp310-win_amd64.whl", hash = "sha256:426f9f29bde126913a20a96ff8ce7d73fd8a216cfb323b1f04da402d452853c3"}, + {file = "psycopg2-2.9.9-cp311-cp311-win32.whl", hash = "sha256:ade01303ccf7ae12c356a5e10911c9e1c51136003a9a1d92f7aa9d010fb98372"}, + {file = "psycopg2-2.9.9-cp311-cp311-win_amd64.whl", hash = "sha256:121081ea2e76729acfb0673ff33755e8703d45e926e416cb59bae3a86c6a4981"}, + {file = "psycopg2-2.9.9-cp37-cp37m-win32.whl", hash = "sha256:5e0d98cade4f0e0304d7d6f25bbfbc5bd186e07b38eac65379309c4ca3193efa"}, + {file = "psycopg2-2.9.9-cp37-cp37m-win_amd64.whl", hash = "sha256:7e2dacf8b009a1c1e843b5213a87f7c544b2b042476ed7755be813eaf4e8347a"}, + {file = "psycopg2-2.9.9-cp38-cp38-win32.whl", hash = "sha256:ff432630e510709564c01dafdbe996cb552e0b9f3f065eb89bdce5bd31fabf4c"}, + {file = "psycopg2-2.9.9-cp38-cp38-win_amd64.whl", hash = "sha256:bac58c024c9922c23550af2a581998624d6e02350f4ae9c5f0bc642c633a2d5e"}, + {file = "psycopg2-2.9.9-cp39-cp39-win32.whl", hash = "sha256:c92811b2d4c9b6ea0285942b2e7cac98a59e166d59c588fe5cfe1eda58e72d59"}, + {file = "psycopg2-2.9.9-cp39-cp39-win_amd64.whl", hash = "sha256:de80739447af31525feddeb8effd640782cf5998e1a4e9192ebdf829717e3913"}, + {file = "psycopg2-2.9.9.tar.gz", hash = "sha256:d1454bde93fb1e224166811694d600e746430c006fbb031ea06ecc2ea41bf156"}, ] [[package]] @@ -2427,28 +2427,28 @@ files = [ [[package]] name = "ruff" -version = "0.0.290" +version = "0.0.292" description = "An extremely fast Python linter, written in Rust." optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.0.290-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:0e2b09ac4213b11a3520221083866a5816616f3ae9da123037b8ab275066fbac"}, - {file = "ruff-0.0.290-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:4ca6285aa77b3d966be32c9a3cd531655b3d4a0171e1f9bf26d66d0372186767"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:35e3550d1d9f2157b0fcc77670f7bb59154f223bff281766e61bdd1dd854e0c5"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:d748c8bd97874f5751aed73e8dde379ce32d16338123d07c18b25c9a2796574a"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:982af5ec67cecd099e2ef5e238650407fb40d56304910102d054c109f390bf3c"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:bbd37352cea4ee007c48a44c9bc45a21f7ba70a57edfe46842e346651e2b995a"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:1d9be6351b7889462912e0b8185a260c0219c35dfd920fb490c7f256f1d8313e"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:75cdc7fe32dcf33b7cec306707552dda54632ac29402775b9e212a3c16aad5e6"}, - {file = "ruff-0.0.290-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:eb07f37f7aecdbbc91d759c0c09870ce0fb3eed4025eebedf9c4b98c69abd527"}, - {file = "ruff-0.0.290-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:2ab41bc0ba359d3f715fc7b705bdeef19c0461351306b70a4e247f836b9350ed"}, - {file = "ruff-0.0.290-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:150bf8050214cea5b990945b66433bf9a5e0cef395c9bc0f50569e7de7540c86"}, - {file = "ruff-0.0.290-py3-none-musllinux_1_2_i686.whl", hash = "sha256:75386ebc15fe5467248c039f5bf6a0cfe7bfc619ffbb8cd62406cd8811815fca"}, - {file = "ruff-0.0.290-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:ac93eadf07bc4ab4c48d8bb4e427bf0f58f3a9c578862eb85d99d704669f5da0"}, - {file = "ruff-0.0.290-py3-none-win32.whl", hash = "sha256:461fbd1fb9ca806d4e3d5c745a30e185f7cf3ca77293cdc17abb2f2a990ad3f7"}, - {file = "ruff-0.0.290-py3-none-win_amd64.whl", hash = "sha256:f1f49f5ec967fd5778813780b12a5650ab0ebcb9ddcca28d642c689b36920796"}, - {file = "ruff-0.0.290-py3-none-win_arm64.whl", hash = "sha256:ae5a92dfbdf1f0c689433c223f8dac0782c2b2584bd502dfdbc76475669f1ba1"}, - {file = "ruff-0.0.290.tar.gz", hash = "sha256:949fecbc5467bb11b8db810a7fa53c7e02633856ee6bd1302b2f43adcd71b88d"}, + {file = "ruff-0.0.292-py3-none-macosx_10_7_x86_64.whl", hash = "sha256:02f29db018c9d474270c704e6c6b13b18ed0ecac82761e4fcf0faa3728430c96"}, + {file = "ruff-0.0.292-py3-none-macosx_10_9_x86_64.macosx_11_0_arm64.macosx_10_9_universal2.whl", hash = "sha256:69654e564342f507edfa09ee6897883ca76e331d4bbc3676d8a8403838e9fade"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:6c3c91859a9b845c33778f11902e7b26440d64b9d5110edd4e4fa1726c41e0a4"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:f4476f1243af2d8c29da5f235c13dca52177117935e1f9393f9d90f9833f69e4"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:be8eb50eaf8648070b8e58ece8e69c9322d34afe367eec4210fdee9a555e4ca7"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:9889bac18a0c07018aac75ef6c1e6511d8411724d67cb879103b01758e110a81"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6bdfabd4334684a4418b99b3118793f2c13bb67bf1540a769d7816410402a205"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:aa7c77c53bfcd75dbcd4d1f42d6cabf2485d2e1ee0678da850f08e1ab13081a8"}, + {file = "ruff-0.0.292-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8e087b24d0d849c5c81516ec740bf4fd48bf363cfb104545464e0fca749b6af9"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:f160b5ec26be32362d0774964e218f3fcf0a7da299f7e220ef45ae9e3e67101a"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:ac153eee6dd4444501c4bb92bff866491d4bfb01ce26dd2fff7ca472c8df9ad0"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_i686.whl", hash = "sha256:87616771e72820800b8faea82edd858324b29bb99a920d6aa3d3949dd3f88fb0"}, + {file = "ruff-0.0.292-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b76deb3bdbea2ef97db286cf953488745dd6424c122d275f05836c53f62d4016"}, + {file = "ruff-0.0.292-py3-none-win32.whl", hash = "sha256:e854b05408f7a8033a027e4b1c7f9889563dd2aca545d13d06711e5c39c3d003"}, + {file = "ruff-0.0.292-py3-none-win_amd64.whl", hash = "sha256:f27282bedfd04d4c3492e5c3398360c9d86a295be00eccc63914438b4ac8a83c"}, + {file = "ruff-0.0.292-py3-none-win_arm64.whl", hash = "sha256:7f67a69c8f12fbc8daf6ae6d36705037bde315abf8b82b6e1f4c9e74eb750f68"}, + {file = "ruff-0.0.292.tar.gz", hash = "sha256:1093449e37dd1e9b813798f6ad70932b57cf614e5c2b5c51005bf67d55db33ac"}, ] [[package]] @@ -3037,13 +3037,13 @@ twisted = "*" [[package]] name = "types-bleach" -version = "6.0.0.4" +version = "6.1.0.0" description = "Typing stubs for bleach" optional = false -python-versions = "*" +python-versions = ">=3.7" files = [ - {file = "types-bleach-6.0.0.4.tar.gz", hash = "sha256:357b0226f65c4f20ab3b13ca8d78a6b91c78aad256d8ec168d4e90fc3303ebd4"}, - {file = "types_bleach-6.0.0.4-py3-none-any.whl", hash = "sha256:2b8767eb407c286b7f02803678732e522e04db8d56cbc9f1270bee49627eae92"}, + {file = "types-bleach-6.1.0.0.tar.gz", hash = "sha256:3cf0e55d4618890a00af1151f878b2e2a7a96433850b74e12bede7663d774532"}, + {file = "types_bleach-6.1.0.0-py3-none-any.whl", hash = "sha256:f0bc75d0f6475036ac69afebf37c41d116dfba78dae55db80437caf0fcd35c28"}, ] [[package]] @@ -3444,4 +3444,4 @@ user-search = ["pyicu"] [metadata] lock-version = "2.0" python-versions = "^3.8.0" -content-hash = "364c309486e9d93d4da8a1a3784d5ecd7d2a9734cf84dcd4a991f2cd54f0b5b5" +content-hash = "a08543c65f18cc7e9dea648e89c18ab88fc1747aa2e029aa208f777fc3db06dd" diff --git a/pyproject.toml b/pyproject.toml index 672dfa8a8f..0831510890 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -321,7 +321,7 @@ all = [ # This helps prevents merge conflicts when running a batch of dependabot updates. isort = ">=5.10.1" black = ">=22.7.0" -ruff = "0.0.290" +ruff = "0.0.292" # Type checking only works with the pydantic.v1 compat module from pydantic v2 pydantic = "^2" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 26403d58cc..f62da35a6f 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -25,14 +25,14 @@ name = "synapse.synapse_rust" anyhow = "1.0.63" lazy_static = "1.4.0" log = "0.4.17" -pyo3 = { version = "0.17.1", features = [ +pyo3 = { version = "0.19.2", features = [ "macros", "anyhow", "abi3", - "abi3-py37", + "abi3-py38", ] } pyo3-log = "0.8.1" -pythonize = "0.17.0" +pythonize = "0.19.0" regex = "1.6.0" serde = { version = "1.0.144", features = ["derive"] } serde_json = "1.0.85" diff --git a/rust/src/push/evaluator.rs b/rust/src/push/evaluator.rs index 48e670478b..3bde075528 100644 --- a/rust/src/push/evaluator.rs +++ b/rust/src/push/evaluator.rs @@ -105,6 +105,17 @@ impl PushRuleEvaluator { /// Create a new `PushRuleEvaluator`. See struct docstring for details. #[allow(clippy::too_many_arguments)] #[new] + #[pyo3(signature = ( + flattened_keys, + has_mentions, + room_member_count, + sender_power_level, + notification_power_levels, + related_events_flattened, + related_event_match_enabled, + room_version_feature_flags, + msc3931_enabled, + ))] pub fn py_new( flattened_keys: BTreeMap<String, JsonValue>, has_mentions: bool, diff --git a/synapse/api/auth/internal.py b/synapse/api/auth/internal.py index a75f6f2cc4..36ee9c8b8f 100644 --- a/synapse/api/auth/internal.py +++ b/synapse/api/auth/internal.py @@ -115,7 +115,7 @@ class InternalAuth(BaseAuth): Once get_user_by_req has set up the opentracing span, this does the actual work. """ try: - ip_addr = request.getClientAddress().host + ip_addr = request.get_client_ip_if_available() user_agent = get_request_user_agent(request) access_token = self.get_access_token_from_request(request) diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b78f419994..afef6712e1 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -80,10 +80,6 @@ class UserPresenceState: def as_dict(self) -> JsonDict: return attr.asdict(self) - @staticmethod - def from_dict(d: JsonDict) -> "UserPresenceState": - return UserPresenceState(**d) - def copy_and_replace(self, **kwargs: Any) -> "UserPresenceState": return attr.evolve(self, **kwargs) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c8bc46415d..1a7fa175ec 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1402,7 +1402,7 @@ class FederationClient(FederationBase): The remote homeserver return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. @@ -1429,7 +1429,7 @@ class FederationClient(FederationBase): The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. """ diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ec8e770430..6ac8d16095 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -850,14 +850,7 @@ class FederationServer(FederationBase): context, self._room_prejoin_state_types ) ) - return { - "knock_room_state": stripped_room_state, - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also populate a 'knock_state_events' with the same content to - # support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - "knock_state_events": stripped_room_state, - } + return {"knock_room_state": stripped_room_state} async def _on_send_membership_event( self, origin: str, content: JsonDict, membership_type: str, room_id: str diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 6520795635..525968bcba 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -395,7 +395,7 @@ class PresenceDestinationsRow(BaseFederationRow): @staticmethod def from_data(data: JsonDict) -> "PresenceDestinationsRow": return PresenceDestinationsRow( - state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"] + state=UserPresenceState(**data["state"]), destinations=data["dests"] ) def to_data(self) -> JsonDict: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index fb20fd8a10..7b6b1da090 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -67,7 +67,7 @@ The loop continues so long as there is anything to send. At each iteration of th When the `PerDestinationQueue` has the catch-up flag set, the *Catch-Up Transmission Loop* (`_catch_up_transmission_loop`) is used in lieu of the regular `_transaction_transmission_loop`. -(Only once the catch-up mode has been exited can the regular tranaction transmission behaviour +(Only once the catch-up mode has been exited can the regular transaction transmission behaviour be resumed.) *Catch-Up Mode*, entered upon Synapse startup or once a homeserver has fallen behind due to diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index b5e4b2680e..fab4800717 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -431,7 +431,7 @@ class TransportLayerClient: The remote homeserver can optionally return some state from the room. The response dictionary is in the form: - {"knock_state_events": [<state event dict>, ...]} + {"knock_room_state": [<state event dict>, ...]} The list of state events may be empty. """ diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index ba9704a065..97fd1fd427 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -171,8 +171,8 @@ class AdminHandler: else: stream_ordering = room.stream_ordering - from_key = RoomStreamToken(0, 0) - to_key = RoomStreamToken(None, stream_ordering) + from_key = RoomStreamToken(topological=0, stream=0) + to_key = RoomStreamToken(stream=stream_ordering) # Events that we've processed in this room written_events: Set[str] = set() diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 7de7bd3289..c200a45f3a 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -216,7 +216,7 @@ class ApplicationServicesHandler: def notify_interested_services_ephemeral( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Collection[Union[str, UserID]], ) -> None: @@ -326,7 +326,7 @@ class ApplicationServicesHandler: async def _notify_interested_services_ephemeral( self, services: List[ApplicationService], - stream_key: str, + stream_key: StreamKeyType, new_token: int, users: Collection[Union[str, UserID]], ) -> None: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 86ad96d030..50df4f2b06 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -845,7 +845,6 @@ class DeviceHandler(DeviceWorkerHandler): else: assert max_stream_id == stream_id # Avoid moving `room_id` backwards. - pass if self._handle_new_device_update_new_data: continue diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 29cd45550a..9d72794e8b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -868,19 +868,10 @@ class FederationHandler: # This is a bit of a hack and is cribbing off of invites. Basically we # store the room state here and retrieve it again when this event appears # in the invitee's sync stream. It is stripped out for all other local users. - stripped_room_state = ( - knock_response.get("knock_room_state") - # Since v1.37, Synapse incorrectly used "knock_state_events" for this field. - # Thus, we also check for a 'knock_state_events' to support old instances. - # See https://github.com/matrix-org/synapse/issues/14088. - or knock_response.get("knock_state_events") - ) + stripped_room_state = knock_response.get("knock_room_state") if stripped_room_state is None: - raise KeyError( - "Missing 'knock_room_state' (or legacy 'knock_state_events') field in " - "send_knock response" - ) + raise KeyError("Missing 'knock_room_state' field in send_knock response") event.unsigned["knock_room_state"] = stripped_room_state @@ -1506,7 +1497,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass else: destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} @@ -1582,7 +1572,6 @@ class FederationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass async def add_display_name_to_third_party_invite( self, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 5737f8014d..c34bd7db95 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -192,8 +192,7 @@ class InitialSyncHandler: ) elif event.membership == Membership.LEAVE: room_end_token = RoomStreamToken( - None, - event.stream_ordering, + stream=event.stream_ordering, ) deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 44dbbf81dd..8de4b8e816 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -693,13 +693,9 @@ class EventCreationHandler: if require_consent and not is_exempt: await self.assert_accepted_privacy_policy(requester) - # Save the access token ID, the device ID and the transaction ID in the event - # internal metadata. This is useful to determine if we should echo the - # transaction_id in events. + # Save the the device ID and the transaction ID in the event internal metadata. + # This is useful to determine if we should echo the transaction_id in events. # See `synapse.events.utils.EventClientSerializer.serialize_event` - if requester.access_token_id is not None: - builder.internal_metadata.token_id = requester.access_token_id - if requester.device_id is not None: builder.internal_metadata.device_id = requester.device_id @@ -1133,7 +1129,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert ev.internal_metadata.stream_ordering @@ -2038,7 +2033,6 @@ class EventCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass return True except AuthError: logger.info( diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py index 7ed88a3611..87b428ab1c 100644 --- a/synapse/handlers/push_rules.py +++ b/synapse/handlers/push_rules.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, UnrecognizedRequestError from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.push_rule import RuleNotFoundException from synapse.synapse_rust.push import get_base_rule_ids -from synapse.types import JsonDict, UserID +from synapse.types import JsonDict, StreamKeyType, UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -114,7 +114,9 @@ class PushRulesHandler: user_id: the user ID the change is for. """ stream_id = self._main_store.get_max_push_rules_stream_id() - self._notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) + self._notifier.on_new_event( + StreamKeyType.PUSH_RULES, stream_id, users=[user_id] + ) async def push_rules_for_user( self, user: UserID diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index a7a29b758b..69ac468f75 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -130,11 +130,10 @@ class ReceiptsHandler: async def _handle_new_receipts(self, receipts: List[ReadReceipt]) -> bool: """Takes a list of receipts, stores them and informs the notifier.""" - min_batch_id: Optional[int] = None - max_batch_id: Optional[int] = None + receipts_persisted: List[ReadReceipt] = [] for receipt in receipts: - res = await self.store.insert_receipt( + stream_id = await self.store.insert_receipt( receipt.room_id, receipt.receipt_type, receipt.user_id, @@ -143,30 +142,26 @@ class ReceiptsHandler: receipt.data, ) - if not res: - # res will be None if this receipt is 'old' + if stream_id is None: + # stream_id will be None if this receipt is 'old' continue - stream_id, max_persisted_id = res + receipts_persisted.append(receipt) - if min_batch_id is None or stream_id < min_batch_id: - min_batch_id = stream_id - if max_batch_id is None or max_persisted_id > max_batch_id: - max_batch_id = max_persisted_id - - # Either both of these should be None or neither. - if min_batch_id is None or max_batch_id is None: + if not receipts_persisted: # no new receipts return False - affected_room_ids = list({r.room_id for r in receipts}) + max_batch_id = self.store.get_max_receipt_stream_id() + + affected_room_ids = list({r.room_id for r in receipts_persisted}) self.notifier.on_new_event( StreamKeyType.RECEIPT, max_batch_id, rooms=affected_room_ids ) # Note that the min here shouldn't be relied upon to be accurate. await self.hs.get_pusherpool().on_new_receipts( - min_batch_id, max_batch_id, affected_room_ids + {r.user_id for r in receipts_persisted} ) return True diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index a0c3b16819..97c9f01245 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -261,7 +261,6 @@ class RoomCreationHandler: # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # This is to satisfy mypy and should never happen raise PartialStateConflictError() @@ -1708,7 +1707,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): if from_key.topological: logger.warning("Stream has topological part!!!! %r", from_key) - from_key = RoomStreamToken(None, from_key.stream) + from_key = RoomStreamToken(stream=from_key.stream) app_service = self.store.get_app_service_by_user_id(user.to_string()) if app_service: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 90343c2306..130eee7e1d 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -382,8 +382,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): and persist a new event for the new membership change. Args: - requester: - target: + requester: User requesting the membership change, i.e. the sender of the + desired membership event. + target: Use whose membership should change, i.e. the state_key of the + desired membership event. room_id: membership: @@ -415,7 +417,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Returns: Tuple of event ID and stream ordering position """ - user_id = target.to_string() if content is None: @@ -475,21 +476,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): (EventTypes.Member, user_id), None ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event( - prev_member_event_id - ) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False - ) with opentracing.start_active_span("handle_new_client_event"): result_event = ( await self.event_creation_handler.handle_new_client_event( @@ -514,7 +500,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering @@ -618,6 +603,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): Raises: ShadowBanError if a shadow-banned requester attempts to send an invite. """ + if ratelimit: + if action == Membership.JOIN: + # Only rate-limit if the user isn't already joined to the room, otherwise + # we'll end up blocking profile updates. + ( + current_membership, + _, + ) = await self.store.get_local_current_membership_for_user_in_room( + requester.user.to_string(), + room_id, + ) + if current_membership != Membership.JOIN: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + elif action == Membership.INVITE: + await self.ratelimit_invite(requester, room_id, target.to_string()) + if action == Membership.INVITE and requester.shadow_banned: # We randomly sleep a bit just to annoy the requester. await self.clock.sleep(random.randint(1, 10)) @@ -794,8 +798,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): if effective_membership_state == Membership.INVITE: target_id = target.to_string() - if ratelimit: - await self.ratelimit_invite(requester, room_id, target_id) # block any attempts to invite the server notices mxid if target_id == self._server_notices_mxid: @@ -2002,7 +2004,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): # in the meantime and context needs to be recomputed, so let's do so. if i == max_retries - 1: raise e - pass # we know it was persisted, so must have a stream ordering assert result_event.internal_metadata.stream_ordering diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7bd42f635f..744e080309 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -2333,7 +2333,7 @@ class SyncHandler: continue leave_token = now_token.copy_and_replace( - StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering) + StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering) ) room_entries.append( RoomSyncResultBuilder( diff --git a/synapse/http/server.py b/synapse/http/server.py index 3bbf91298e..1e4e56f36b 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -266,7 +266,7 @@ class HttpServer(Protocol): def register_paths( self, method: str, - path_patterns: Iterable[Pattern], + path_patterns: Iterable[Pattern[str]], callback: ServletCallback, servlet_classname: str, ) -> None: diff --git a/synapse/media/_base.py b/synapse/media/_base.py index 80c448de2b..13345acf75 100644 --- a/synapse/media/_base.py +++ b/synapse/media/_base.py @@ -26,11 +26,11 @@ from twisted.internet.interfaces import IConsumer from twisted.protocols.basic import FileSender from twisted.web.server import Request -from synapse.api.errors import Codes, SynapseError, cs_error +from synapse.api.errors import Codes, cs_error from synapse.http.server import finish_request, respond_with_json from synapse.http.site import SynapseRequest from synapse.logging.context import make_deferred_yieldable -from synapse.util.stringutils import is_ascii, parse_and_validate_server_name +from synapse.util.stringutils import is_ascii logger = logging.getLogger(__name__) @@ -84,52 +84,12 @@ INLINE_CONTENT_TYPES = [ ] -def parse_media_id(request: Request) -> Tuple[str, str, Optional[str]]: - """Parses the server name, media ID and optional file name from the request URI - - Also performs some rough validation on the server name. - - Args: - request: The `Request`. - - Returns: - A tuple containing the parsed server name, media ID and optional file name. - - Raises: - SynapseError(404): if parsing or validation fail for any reason - """ - try: - # The type on postpath seems incorrect in Twisted 21.2.0. - postpath: List[bytes] = request.postpath # type: ignore - assert postpath - - # This allows users to append e.g. /test.png to the URL. Useful for - # clients that parse the URL to see content type. - server_name_bytes, media_id_bytes = postpath[:2] - server_name = server_name_bytes.decode("utf-8") - media_id = media_id_bytes.decode("utf8") - - # Validate the server name, raising if invalid - parse_and_validate_server_name(server_name) - - file_name = None - if len(postpath) > 2: - try: - file_name = urllib.parse.unquote(postpath[-1].decode("utf-8")) - except UnicodeDecodeError: - pass - return server_name, media_id, file_name - except Exception: - raise SynapseError( - 404, "Invalid media id token %r" % (request.postpath,), Codes.UNKNOWN - ) - - def respond_404(request: SynapseRequest) -> None: + assert request.path is not None respond_with_json( request, 404, - cs_error("Not found %r" % (request.postpath,), code=Codes.NOT_FOUND), + cs_error("Not found '%s'" % (request.path.decode(),), code=Codes.NOT_FOUND), send_cors=True, ) @@ -372,7 +332,7 @@ class ThumbnailInfo: # Content type of thumbnail, e.g. image/png type: str # The size of the media file, in bytes. - length: Optional[int] = None + length: int @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py index 1b7b014f9a..7fd46901f7 100644 --- a/synapse/media/media_repository.py +++ b/synapse/media/media_repository.py @@ -48,6 +48,7 @@ from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage from synapse.media.storage_provider import StorageProviderWrapper from synapse.media.thumbnailer import Thumbnailer, ThumbnailError +from synapse.media.url_previewer import UrlPreviewer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID from synapse.util.async_helpers import Linearizer @@ -114,7 +115,7 @@ class MediaRepository: ) storage_providers.append(provider) - self.media_storage = MediaStorage( + self.media_storage: MediaStorage = MediaStorage( self.hs, self.primary_base_path, self.filepaths, storage_providers ) @@ -142,6 +143,13 @@ class MediaRepository: MEDIA_RETENTION_CHECK_PERIOD_MS, ) + if hs.config.media.url_preview_enabled: + self.url_previewer: Optional[UrlPreviewer] = UrlPreviewer( + hs, self, self.media_storage + ) + else: + self.url_previewer = None + def _start_update_recently_accessed(self) -> Deferred: return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed @@ -616,6 +624,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -686,6 +695,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) @@ -831,6 +841,7 @@ class MediaRepository: height=t_height, method=t_method, type=t_type, + length=t_byte_source.tell(), ), ) diff --git a/synapse/notifier.py b/synapse/notifier.py index fc39e5c963..99e7715896 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -126,7 +126,7 @@ class _NotifierUserStream: def notify( self, - stream_key: str, + stream_key: StreamKeyType, stream_id: Union[int, RoomStreamToken], time_now_ms: int, ) -> None: @@ -454,7 +454,7 @@ class Notifier: def on_new_event( self, - stream_key: str, + stream_key: StreamKeyType, new_token: Union[int, RoomStreamToken], users: Optional[Collection[Union[str, UserID]]] = None, rooms: Optional[StrCollection] = None, @@ -655,30 +655,29 @@ class Notifier: events: List[Union[JsonDict, EventBase]] = [] end_token = from_token - for name, source in self.event_sources.sources.get_sources(): - keyname = "%s_key" % name - before_id = getattr(before_token, keyname) - after_id = getattr(after_token, keyname) + for keyname, source in self.event_sources.sources.get_sources(): + before_id = before_token.get_field(keyname) + after_id = after_token.get_field(keyname) if before_id == after_id: continue new_events, new_key = await source.get_new_events( user=user, - from_key=getattr(from_token, keyname), + from_key=from_token.get_field(keyname), limit=limit, is_guest=is_peeking, room_ids=room_ids, explicit_room_id=explicit_room_id, ) - if name == "room": + if keyname == StreamKeyType.ROOM: new_events = await filter_events_for_client( self._storage_controllers, user.to_string(), new_events, is_peeking=is_peeking, ) - elif name == "presence": + elif keyname == StreamKeyType.PRESENCE: now = self.clock.time_msec() new_events[:] = [ { diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 9e3a98741a..4d405f2a0c 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -101,7 +101,7 @@ if TYPE_CHECKING: class PusherConfig: """Parameters necessary to configure a pusher.""" - id: Optional[str] + id: Optional[int] user_name: str profile_tag: str @@ -182,7 +182,7 @@ class Pusher(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: raise NotImplementedError() @abc.abstractmethod diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 1710dd51b9..cf45fd09a8 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -99,7 +99,7 @@ class EmailPusher(Pusher): pass self.timed_call = None - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 50027680cb..725910a659 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -160,7 +160,7 @@ class HttpPusher(Pusher): if should_check_for_notifs: self._start_processing() - def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: + def on_new_receipts(self) -> None: # Note that the min here shouldn't be relied upon to be accurate. # We could check the receipts are actually m.read receipts here, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 6517e3566f..15a2cc932f 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -292,20 +292,12 @@ class PusherPool: except Exception: logger.exception("Exception in pusher on_new_notifications") - async def on_new_receipts( - self, min_stream_id: int, max_stream_id: int, affected_room_ids: Iterable[str] - ) -> None: + async def on_new_receipts(self, users_affected: StrCollection) -> None: if not self.pushers: # nothing to do here. return try: - # Need to subtract 1 from the minimum because the lower bound here - # is not inclusive - users_affected = await self.store.get_users_sent_receipts_between( - min_stream_id - 1, max_stream_id - ) - for u in users_affected: # Don't push if the user account has expired expired = await self._account_validity_handler.is_user_expired(u) @@ -314,7 +306,7 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): - p.on_new_receipts(min_stream_id, max_stream_id) + p.on_new_receipts() except Exception: logger.exception("Exception in pusher on_new_receipts") diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index f4f2b29e96..d5337fe588 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -129,9 +129,7 @@ class ReplicationDataHandler: self.notifier.on_new_event( StreamKeyType.RECEIPT, token, rooms=[row.room_id for row in rows] ) - await self._pusher_pool.on_new_receipts( - token, token, {row.room_id for row in rows} - ) + await self._pusher_pool.on_new_receipts({row.user_id for row in rows}) elif stream_name == ToDeviceStream.NAME: entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index e616b5e1c8..0f0f851b79 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -18,7 +18,7 @@ allowed to be sent by which side. """ import abc import logging -from typing import Optional, Tuple, Type, TypeVar +from typing import List, Optional, Tuple, Type, TypeVar from synapse.replication.tcp.streams._base import StreamRow from synapse.util import json_decoder, json_encoder @@ -74,6 +74,8 @@ SC = TypeVar("SC", bound="_SimpleCommand") class _SimpleCommand(Command): """An implementation of Command whose argument is just a 'data' string.""" + __slots__ = ["data"] + def __init__(self, data: str): self.data = data @@ -122,6 +124,8 @@ class RdataCommand(Command): RDATA presence master 59 ["@baz:example.com", "online", ...] """ + __slots__ = ["stream_name", "instance_name", "token", "row"] + NAME = "RDATA" def __init__( @@ -179,6 +183,8 @@ class PositionCommand(Command): of the stream. """ + __slots__ = ["stream_name", "instance_name", "prev_token", "new_token"] + NAME = "POSITION" def __init__( @@ -235,6 +241,8 @@ class ReplicateCommand(Command): REPLICATE """ + __slots__: List[str] = [] + NAME = "REPLICATE" def __init__(self) -> None: @@ -264,6 +272,8 @@ class UserSyncCommand(Command): Where <state> is either "start" or "end" """ + __slots__ = ["instance_id", "user_id", "device_id", "is_syncing", "last_sync_ms"] + NAME = "USER_SYNC" def __init__( @@ -316,6 +326,8 @@ class ClearUserSyncsCommand(Command): CLEAR_USER_SYNC <instance_id> """ + __slots__ = ["instance_id"] + NAME = "CLEAR_USER_SYNC" def __init__(self, instance_id: str): @@ -343,6 +355,8 @@ class FederationAckCommand(Command): FEDERATION_ACK <instance_name> <token> """ + __slots__ = ["instance_name", "token"] + NAME = "FEDERATION_ACK" def __init__(self, instance_name: str, token: int): @@ -368,6 +382,15 @@ class UserIpCommand(Command): USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent> """ + __slots__ = [ + "user_id", + "access_token", + "ip", + "user_agent", + "device_id", + "last_seen", + ] + NAME = "USER_IP" def __init__( @@ -423,8 +446,6 @@ class RemoteServerUpCommand(_SimpleCommand): """Sent when a worker has detected that a remote server is no longer "down" and retry timings should be reset. - If sent from a client the server will relay to all other workers. - Format:: REMOTE_SERVER_UP <server> @@ -441,6 +462,8 @@ class LockReleasedCommand(Command): LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"] """ + __slots__ = ["instance_name", "lock_name", "lock_key"] + NAME = "LOCK_RELEASED" def __init__( diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index e42dade246..9bd0d764f8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -146,7 +146,7 @@ class PurgeHistoryRestServlet(RestServlet): # RoomStreamToken expects [int] not Optional[int] assert event.internal_metadata.stream_ordering is not None room_token = RoomStreamToken( - event.depth, event.internal_metadata.stream_ordering + topological=event.depth, stream=event.internal_metadata.stream_ordering ) token = await room_token.to_string(self.store) diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index e0ee55bd0e..8a617af599 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -198,7 +198,13 @@ class DestinationMembershipRestServlet(RestServlet): rooms, total = await self._store.get_destination_rooms_paginate( destination, start, limit, direction ) - response = {"rooms": rooms, "total": total} + response = { + "rooms": [ + {"room_id": room_id, "stream_ordering": stream_ordering} + for room_id, stream_ordering in rooms + ], + "total": total, + } if (start + limit) < total: response["next_token"] = str(start + len(rooms)) diff --git a/synapse/rest/media/config_resource.py b/synapse/rest/media/config_resource.py index a95804d327..dbf5133c72 100644 --- a/synapse/rest/media/config_resource.py +++ b/synapse/rest/media/config_resource.py @@ -14,17 +14,19 @@ # limitations under the License. # +import re from typing import TYPE_CHECKING -from synapse.http.server import DirectServeJsonResource, respond_with_json +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet from synapse.http.site import SynapseRequest if TYPE_CHECKING: from synapse.server import HomeServer -class MediaConfigResource(DirectServeJsonResource): - isLeaf = True +class MediaConfigResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/config$")] def __init__(self, hs: "HomeServer"): super().__init__() @@ -33,9 +35,6 @@ class MediaConfigResource(DirectServeJsonResource): self.auth = hs.get_auth() self.limits_dict = {"m.upload.size": config.media.max_upload_size} - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: await self.auth.get_user_by_req(request) respond_with_json(request, 200, self.limits_dict, send_cors=True) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py index 3c618ef60a..65b9ff52fa 100644 --- a/synapse/rest/media/download_resource.py +++ b/synapse/rest/media/download_resource.py @@ -13,16 +13,14 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING +import re +from typing import TYPE_CHECKING, Optional -from synapse.http.server import ( - DirectServeJsonResource, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_boolean +from synapse.http.server import set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_boolean from synapse.http.site import SynapseRequest -from synapse.media._base import parse_media_id, respond_404 +from synapse.media._base import respond_404 +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -31,15 +29,28 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class DownloadResource(DirectServeJsonResource): - isLeaf = True +class DownloadResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/download/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)(/(?P<file_name>[^/]*))?$" + ) + ] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() self.media_repo = media_repo self._is_mine_server_name = hs.is_mine_server_name - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, + request: SynapseRequest, + server_name: str, + media_id: str, + file_name: Optional[str] = None, + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) request.setHeader( @@ -58,9 +69,8 @@ class DownloadResource(DirectServeJsonResource): b"Referrer-Policy", b"no-referrer", ) - server_name, media_id, name = parse_media_id(request) if self._is_mine_server_name(server_name): - await self.media_repo.get_local_media(request, media_id, name) + await self.media_repo.get_local_media(request, media_id, file_name) else: allow_remote = parse_boolean(request, "allow_remote", default=True) if not allow_remote: @@ -72,4 +82,6 @@ class DownloadResource(DirectServeJsonResource): respond_404(request) return - await self.media_repo.get_remote_media(request, server_name, media_id, name) + await self.media_repo.get_remote_media( + request, server_name, media_id, file_name + ) diff --git a/synapse/rest/media/media_repository_resource.py b/synapse/rest/media/media_repository_resource.py index 5ebaa3b032..2089bb1029 100644 --- a/synapse/rest/media/media_repository_resource.py +++ b/synapse/rest/media/media_repository_resource.py @@ -15,7 +15,7 @@ from typing import TYPE_CHECKING from synapse.config._base import ConfigError -from synapse.http.server import UnrecognizedRequestResource +from synapse.http.server import HttpServer, JsonResource from .config_resource import MediaConfigResource from .download_resource import DownloadResource @@ -27,7 +27,7 @@ if TYPE_CHECKING: from synapse.server import HomeServer -class MediaRepositoryResource(UnrecognizedRequestResource): +class MediaRepositoryResource(JsonResource): """File uploading and downloading. Uploads are POSTed to a resource which returns a token which is used to GET @@ -70,6 +70,11 @@ class MediaRepositoryResource(UnrecognizedRequestResource): width and height are close to the requested size and the aspect matches the requested size. The client should scale the image if it needs to fit within a given rectangle. + + This gets mounted at various points under /_matrix/media, including: + * /_matrix/media/r0 + * /_matrix/media/v1 + * /_matrix/media/v3 """ def __init__(self, hs: "HomeServer"): @@ -77,17 +82,23 @@ class MediaRepositoryResource(UnrecognizedRequestResource): if not hs.config.media.can_load_media_repo: raise ConfigError("Synapse is not configured to use a media repo.") - super().__init__() + JsonResource.__init__(self, hs, canonical_json=False) + self.register_servlets(self, hs) + + @staticmethod + def register_servlets(http_server: HttpServer, hs: "HomeServer") -> None: media_repo = hs.get_media_repository() - self.putChild(b"upload", UploadResource(hs, media_repo)) - self.putChild(b"download", DownloadResource(hs, media_repo)) - self.putChild( - b"thumbnail", ThumbnailResource(hs, media_repo, media_repo.media_storage) + # Note that many of these should not exist as v1 endpoints, but empirically + # a lot of traffic still goes to them. + + UploadResource(hs, media_repo).register(http_server) + DownloadResource(hs, media_repo).register(http_server) + ThumbnailResource(hs, media_repo, media_repo.media_storage).register( + http_server ) if hs.config.media.url_preview_enabled: - self.putChild( - b"preview_url", - PreviewUrlResource(hs, media_repo, media_repo.media_storage), + PreviewUrlResource(hs, media_repo, media_repo.media_storage).register( + http_server ) - self.putChild(b"config", MediaConfigResource(hs)) + MediaConfigResource(hs).register(http_server) diff --git a/synapse/rest/media/preview_url_resource.py b/synapse/rest/media/preview_url_resource.py index 58513c4be4..c8acb65dca 100644 --- a/synapse/rest/media/preview_url_resource.py +++ b/synapse/rest/media/preview_url_resource.py @@ -13,24 +13,20 @@ # See the License for the specific language governing permissions and # limitations under the License. +import re from typing import TYPE_CHECKING -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - respond_with_json_bytes, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json_bytes +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media.media_storage import MediaStorage -from synapse.media.url_previewer import UrlPreviewer if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository from synapse.server import HomeServer -class PreviewUrlResource(DirectServeJsonResource): +class PreviewUrlResource(RestServlet): """ The `GET /_matrix/media/r0/preview_url` endpoint provides a generic preview API for URLs which outputs Open Graph (https://ogp.me/) responses (with some Matrix @@ -48,7 +44,7 @@ class PreviewUrlResource(DirectServeJsonResource): * Matrix cannot be used to distribute the metadata between homeservers. """ - isLeaf = True + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/preview_url$")] def __init__( self, @@ -62,14 +58,10 @@ class PreviewUrlResource(DirectServeJsonResource): self.clock = hs.get_clock() self.media_repo = media_repo self.media_storage = media_storage + assert self.media_repo.url_previewer is not None + self.url_previewer = self.media_repo.url_previewer - self._url_previewer = UrlPreviewer(hs, media_repo, media_storage) - - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - request.setHeader(b"Allow", b"OPTIONS, GET") - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET(self, request: SynapseRequest) -> None: # XXX: if get_user_by_req fails, what should we do in an async render? requester = await self.auth.get_user_by_req(request) url = parse_string(request, "url", required=True) @@ -77,5 +69,5 @@ class PreviewUrlResource(DirectServeJsonResource): if ts is None: ts = self.clock.time_msec() - og = await self._url_previewer.preview(url, requester.user, ts) + og = await self.url_previewer.preview(url, requester.user, ts) respond_with_json_bytes(request, 200, og, send_cors=True) diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py index 661e604b85..85b6bdbe72 100644 --- a/synapse/rest/media/thumbnail_resource.py +++ b/synapse/rest/media/thumbnail_resource.py @@ -13,29 +13,24 @@ # See the License for the specific language governing permissions and # limitations under the License. - import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple +import re +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.errors import Codes, SynapseError, cs_error from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP -from synapse.http.server import ( - DirectServeJsonResource, - respond_with_json, - set_corp_headers, - set_cors_headers, -) -from synapse.http.servlet import parse_integer, parse_string +from synapse.http.server import respond_with_json, set_corp_headers, set_cors_headers +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest from synapse.media._base import ( FileInfo, ThumbnailInfo, - parse_media_id, respond_404, respond_with_file, respond_with_responder, ) from synapse.media.media_storage import MediaStorage +from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: from synapse.media.media_repository import MediaRepository @@ -44,8 +39,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class ThumbnailResource(DirectServeJsonResource): - isLeaf = True +class ThumbnailResource(RestServlet): + PATTERNS = [ + re.compile( + "/_matrix/media/(r0|v3|v1)/thumbnail/(?P<server_name>[^/]*)/(?P<media_id>[^/]*)$" + ) + ] def __init__( self, @@ -60,12 +59,17 @@ class ThumbnailResource(DirectServeJsonResource): self.media_storage = media_storage self.dynamic_thumbnails = hs.config.media.dynamic_thumbnails self._is_mine_server_name = hs.is_mine_server_name + self._server_name = hs.hostname self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from - async def _async_render_GET(self, request: SynapseRequest) -> None: + async def on_GET( + self, request: SynapseRequest, server_name: str, media_id: str + ) -> None: + # Validate the server name, raising if invalid + parse_and_validate_server_name(server_name) + set_cors_headers(request) set_corp_headers(request) - server_name, media_id, _ = parse_media_id(request) width = parse_integer(request, "width", required=True) height = parse_integer(request, "height", required=True) method = parse_string(request, "method", "scale") @@ -155,30 +159,24 @@ class ThumbnailResource(DirectServeJsonResource): thumbnail_infos = await self.store.get_local_media_thumbnails(media_id) for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=None, file_id=media_id, url_cache=media_info["url_cache"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. Generating") @@ -218,29 +216,23 @@ class ThumbnailResource(DirectServeJsonResource): file_id = media_info["filesystem_id"] for info in thumbnail_infos: - t_w = info["thumbnail_width"] == desired_width - t_h = info["thumbnail_height"] == desired_height - t_method = info["thumbnail_method"] == desired_method - t_type = info["thumbnail_type"] == desired_type + t_w = info.width == desired_width + t_h = info.height == desired_height + t_method = info.method == desired_method + t_type = info.type == desired_type if t_w and t_h and t_method and t_type: file_info = FileInfo( server_name=server_name, file_id=media_info["filesystem_id"], - thumbnail=ThumbnailInfo( - width=info["thumbnail_width"], - height=info["thumbnail_height"], - type=info["thumbnail_type"], - method=info["thumbnail_method"], - ), + thumbnail=info, ) - t_type = file_info.thumbnail_type - t_length = info["thumbnail_length"] - responder = await self.media_storage.fetch_media(file_info) if responder: - await respond_with_responder(request, responder, t_type, t_length) + await respond_with_responder( + request, responder, info.type, info.length + ) return logger.debug("We don't have a thumbnail of that size. Generating") @@ -300,7 +292,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], media_id: str, file_id: str, url_cache: bool, @@ -315,7 +307,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail info of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -418,13 +410,14 @@ class ThumbnailResource(DirectServeJsonResource): # `dynamic_thumbnails` is disabled. logger.info("Failed to find any generated thumbnails") + assert request.path is not None respond_with_json( request, 400, cs_error( - "Cannot find any thumbnails for the requested media (%r). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" + "Cannot find any thumbnails for the requested media ('%s'). This might mean the media is not a supported_media_format=(%s) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)" % ( - request.postpath, + request.path.decode(), ", ".join(THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP.keys()), ), code=Codes.UNKNOWN, @@ -438,7 +431,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: int, desired_method: str, desired_type: str, - thumbnail_infos: List[Dict[str, Any]], + thumbnail_infos: List[ThumbnailInfo], file_id: str, url_cache: bool, server_name: Optional[str], @@ -451,7 +444,7 @@ class ThumbnailResource(DirectServeJsonResource): desired_height: The desired height, the returned thumbnail may be larger than this. desired_method: The desired method used to generate the thumbnail. desired_type: The desired content-type of the thumbnail. - thumbnail_infos: A list of dictionaries of candidate thumbnails. + thumbnail_infos: A list of thumbnail infos of candidate thumbnails. file_id: The ID of the media that a thumbnail is being requested for. url_cache: True if this is from a URL cache. server_name: The server name, if this is a remote thumbnail. @@ -469,21 +462,25 @@ class ThumbnailResource(DirectServeJsonResource): if desired_method == "crop": # Thumbnails that match equal or larger sizes of desired width/height. - crop_info_list: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] # Other thumbnails. - crop_info_list2: List[Tuple[int, int, int, bool, int, Dict[str, Any]]] = [] + crop_info_list2: List[ + Tuple[int, int, int, bool, Optional[int], ThumbnailInfo] + ] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. - if info["thumbnail_method"] != "crop": + if info.method != "crop": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height aspect_quality = abs(d_w * t_h - d_h * t_w) min_quality = 0 if d_w <= t_w and d_h <= t_h else 1 size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: crop_info_list.append( ( @@ -508,7 +505,7 @@ class ThumbnailResource(DirectServeJsonResource): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if crop_info_list: thumbnail_info = min(crop_info_list, key=lambda t: t[:-1])[-1] @@ -516,20 +513,20 @@ class ThumbnailResource(DirectServeJsonResource): thumbnail_info = min(crop_info_list2, key=lambda t: t[:-1])[-1] elif desired_method == "scale": # Thumbnails that match equal or larger sizes of desired width/height. - info_list: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list: List[Tuple[int, bool, int, ThumbnailInfo]] = [] # Other thumbnails. - info_list2: List[Tuple[int, bool, int, Dict[str, Any]]] = [] + info_list2: List[Tuple[int, bool, int, ThumbnailInfo]] = [] for info in thumbnail_infos: # Skip thumbnails generated with different methods. - if info["thumbnail_method"] != "scale": + if info.method != "scale": continue - t_w = info["thumbnail_width"] - t_h = info["thumbnail_height"] + t_w = info.width + t_h = info.height size_quality = abs((d_w - t_w) * (d_h - t_h)) - type_quality = desired_type != info["thumbnail_type"] - length_quality = info["thumbnail_length"] + type_quality = desired_type != info.type + length_quality = info.length if t_w >= d_w or t_h >= d_h: info_list.append((size_quality, type_quality, length_quality, info)) else: @@ -538,7 +535,7 @@ class ThumbnailResource(DirectServeJsonResource): ) # Pick the most appropriate thumbnail. Some values of `desired_width` and # `desired_height` may result in a tie, in which case we avoid comparing on - # the thumbnail info dictionary and pick the thumbnail that appears earlier + # the thumbnail info and pick the thumbnail that appears earlier # in the list of candidates. if info_list: thumbnail_info = min(info_list, key=lambda t: t[:-1])[-1] @@ -550,13 +547,7 @@ class ThumbnailResource(DirectServeJsonResource): file_id=file_id, url_cache=url_cache, server_name=server_name, - thumbnail=ThumbnailInfo( - width=thumbnail_info["thumbnail_width"], - height=thumbnail_info["thumbnail_height"], - type=thumbnail_info["thumbnail_type"], - method=thumbnail_info["thumbnail_method"], - length=thumbnail_info["thumbnail_length"], - ), + thumbnail=thumbnail_info, ) # No matching thumbnail was found. diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py index 043e8d6077..949326d85d 100644 --- a/synapse/rest/media/upload_resource.py +++ b/synapse/rest/media/upload_resource.py @@ -14,11 +14,12 @@ # limitations under the License. import logging +import re from typing import IO, TYPE_CHECKING, Dict, List, Optional from synapse.api.errors import Codes, SynapseError -from synapse.http.server import DirectServeJsonResource, respond_with_json -from synapse.http.servlet import parse_bytes_from_args +from synapse.http.server import respond_with_json +from synapse.http.servlet import RestServlet, parse_bytes_from_args from synapse.http.site import SynapseRequest from synapse.media.media_storage import SpamMediaException @@ -29,8 +30,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class UploadResource(DirectServeJsonResource): - isLeaf = True +class UploadResource(RestServlet): + PATTERNS = [re.compile("/_matrix/media/(r0|v3|v1)/upload")] def __init__(self, hs: "HomeServer", media_repo: "MediaRepository"): super().__init__() @@ -43,10 +44,7 @@ class UploadResource(DirectServeJsonResource): self.max_upload_size = hs.config.media.max_upload_size self.clock = hs.get_clock() - async def _async_render_OPTIONS(self, request: SynapseRequest) -> None: - respond_with_json(request, 200, {}, send_cors=True) - - async def _async_render_POST(self, request: SynapseRequest) -> None: + async def on_POST(self, request: SynapseRequest) -> None: requester = await self.auth.get_user_by_req(request) raw_content_length = request.getHeader("Content-Length") if raw_content_length is None: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index ca894edd5a..7d8af5c610 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -2418,7 +2418,7 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, exclude_keyvalues: Optional[Dict[str, Any]] = None, order_direction: str = "ASC", - ) -> List[Dict[str, Any]]: + ) -> List[Tuple[Any, ...]]: """ Executes a SELECT query on the named table with start and limit, of row numbers, which may return zero or number of rows from start to limit, @@ -2447,7 +2447,7 @@ class DatabasePool: order_direction: Whether the results should be ordered "ASC" or "DESC". Returns: - The result as a list of dictionaries. + The result as a list of tuples. """ if order_direction not in ["ASC", "DESC"]: raise ValueError("order_direction must be one of 'ASC' or 'DESC'.") @@ -2474,7 +2474,7 @@ class DatabasePool: ) txn.execute(sql, arg_list + [limit, start]) - return cls.cursor_to_dict(txn) + return txn.fetchall() async def simple_search_list( self, diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 101403578c..dfcbf0a175 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -142,26 +142,6 @@ class DataStore( super().__init__(database, db_conn, hs) - async def get_users(self) -> List[JsonDict]: - """Function to retrieve a list of users in users table. - - Returns: - A list of dictionaries representing users. - """ - return await self.db_pool.simple_select_list( - table="users", - keyvalues={}, - retcols=[ - "name", - "password_hash", - "is_guest", - "admin", - "user_type", - "deactivated", - ], - desc="get_users", - ) - async def get_users_paginate( self, start: int, diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 80f146dd53..39498d52c6 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -103,6 +103,13 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "AccountDataAndTagsChangeCache", account_max ) + self.db_pool.updates.register_background_index_update( + update_name="room_account_data_index_room_id", + index_name="room_account_data_room_id", + table="room_account_data", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_account_data_for_deactivated_users", self._delete_account_data_for_deactivated_users, @@ -151,10 +158,10 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) return { - row["account_data_type"]: db_to_json(row["content"]) for row in rows + account_data_type: db_to_json(content) + for account_data_type, content in txn } return await self.db_pool.runInteraction( @@ -196,13 +203,12 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) sql += " AND content != '{}'" txn.execute(sql, (user_id,)) - rows = self.db_pool.cursor_to_dict(txn) by_room: Dict[str, Dict[str, JsonDict]] = {} - for row in rows: - room_data = by_room.setdefault(row["room_id"], {}) + for room_id, account_data_type, content in txn: + room_data = by_room.setdefault(room_id, {}) - room_data[row["account_data_type"]] = db_to_json(row["content"]) + room_data[account_data_type] = db_to_json(content) return by_room diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 0553a0621a..073a99cd84 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -14,17 +14,7 @@ # limitations under the License. import logging import re -from typing import ( - TYPE_CHECKING, - Any, - Dict, - List, - Optional, - Pattern, - Sequence, - Tuple, - cast, -) +from typing import TYPE_CHECKING, List, Optional, Pattern, Sequence, Tuple, cast from synapse.appservice import ( ApplicationService, @@ -353,21 +343,15 @@ class ApplicationServiceTransactionWorkerStore( def _get_oldest_unsent_txn( txn: LoggingTransaction, - ) -> Optional[Dict[str, Any]]: + ) -> Optional[Tuple[int, str]]: # Monotonically increasing txn ids, so just select the smallest # one in the txns table (we delete them when they are sent) txn.execute( - "SELECT * FROM application_services_txns WHERE as_id=?" + "SELECT txn_id, event_ids FROM application_services_txns WHERE as_id=?" " ORDER BY txn_id ASC LIMIT 1", (service.id,), ) - rows = self.db_pool.cursor_to_dict(txn) - if not rows: - return None - - entry = rows[0] - - return entry + return cast(Optional[Tuple[int, str]], txn.fetchone()) entry = await self.db_pool.runInteraction( "get_oldest_unsent_appservice_txn", _get_oldest_unsent_txn @@ -376,8 +360,9 @@ class ApplicationServiceTransactionWorkerStore( if not entry: return None - event_ids = db_to_json(entry["event_ids"]) + txn_id, event_ids_str = entry + event_ids = db_to_json(event_ids_str) events = await self.get_events_as_list(event_ids) # TODO: to-device messages, one-time key counts, device list summaries and unused @@ -385,7 +370,7 @@ class ApplicationServiceTransactionWorkerStore( # We likely want to populate those for reliability. return AppServiceTransaction( service=service, - id=entry["txn_id"], + id=txn_id, events=events, ephemeral=[], to_device_messages=[], diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index df596f35f9..9f3804a504 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1413,13 +1413,13 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): def get_devices_not_accessed_since_txn( txn: LoggingTransaction, - ) -> List[Dict[str, str]]: + ) -> List[Tuple[str, str]]: sql = """ SELECT user_id, device_id FROM devices WHERE last_seen < ? AND hidden = FALSE """ txn.execute(sql, (since_ms,)) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_devices_not_accessed_since", @@ -1427,11 +1427,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ) devices: Dict[str, List[str]] = {} - for row in rows: + for user_id, device_id in rows: # Remote devices are never stale from our point of view. - if self.hs.is_mine_id(row["user_id"]): - user_devices = devices.setdefault(row["user_id"], []) - user_devices.append(row["device_id"]) + if self.hs.is_mine_id(user_id): + user_devices = devices.setdefault(user_id, []) + user_devices.append(device_id) return devices diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index d01f28cc80..aac4cfb054 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -53,6 +53,13 @@ class EndToEndRoomKeyBackgroundStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) + self.db_pool.updates.register_background_index_update( + update_name="e2e_room_keys_index_room_id", + index_name="e2e_room_keys_room_id", + table="e2e_room_keys", + columns=("room_id",), + ) + self.db_pool.updates.register_background_update_handler( "delete_e2e_backup_keys_for_deactivated_users", self._delete_e2e_backup_keys_for_deactivated_users, @@ -208,7 +215,7 @@ class EndToEndRoomKeyStore(EndToEndRoomKeyBackgroundStore): "message": "Set room key", "room_id": room_id, "session_id": session_id, - StreamKeyType.ROOM: room_key, + StreamKeyType.ROOM.value: room_key, } ) diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 89fac23f93..749ae54e20 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -921,14 +921,10 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker } txn.execute(sql, params) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - user_id = row["user_id"] - key_type = row["keytype"] - key = db_to_json(row["keydata"]) + for user_id, key_type, key_data, _ in txn: user_keys = result.setdefault(user_id, {}) - user_keys[key_type] = key + user_keys[key_type] = db_to_json(key_data) return result @@ -988,13 +984,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker query_params.extend(item) txn.execute(sql, query_params) - rows = self.db_pool.cursor_to_dict(txn) # and add the signatures to the appropriate keys - for row in rows: - key_id: str = row["key_id"] - target_user_id: str = row["target_user_id"] - target_device_id: str = row["target_device_id"] + for target_user_id, target_device_id, key_id, signature in txn: key_type = devices[(target_user_id, target_device_id)] # We need to copy everything, because the result may have come # from the cache. dict.copy only does a shallow copy, so we @@ -1012,13 +1004,11 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker ].copy() if from_user_id in signatures: user_sigs = signatures[from_user_id] = signatures[from_user_id] - user_sigs[key_id] = row["signature"] + user_sigs[key_id] = signature else: - signatures[from_user_id] = {key_id: row["signature"]} + signatures[from_user_id] = {key_id: signature} else: - target_user_key["signatures"] = { - from_user_id: {key_id: row["signature"]} - } + target_user_key["signatures"] = {from_user_id: {key_id: signature}} return keys diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 790d058c43..d4dcdb898c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1654,8 +1654,6 @@ class PersistEventsStore: ) -> None: to_prefill = [] - rows = [] - ev_map = {e.event_id: e for e, _ in events_and_contexts} if not ev_map: return @@ -1676,10 +1674,9 @@ class PersistEventsStore: ) txn.execute(sql + clause, args) - rows = self.db_pool.cursor_to_dict(txn) - for row in rows: - event = ev_map[row["event_id"]] - if not row["rejects"] and not row["redacts"]: + for event_id, redacts, rejects in txn: + event = ev_map[event_id] + if not rejects and not redacts: to_prefill.append(EventCacheEntry(event=event, redacted_event=None)) async def external_prefill() -> None: diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index 8cebeb5189..2e6b176bd2 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -28,6 +28,7 @@ from typing import ( from synapse.api.constants import Direction from synapse.logging.opentracing import trace +from synapse.media._base import ThumbnailInfo from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -435,8 +436,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): desc="store_url_cache", ) - async def get_local_media_thumbnails(self, media_id: str) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + async def get_local_media_thumbnails(self, media_id: str) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "local_media_repository_thumbnails", {"media_id": media_id}, ( @@ -448,6 +449,16 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): ), desc="get_local_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def store_local_thumbnail( @@ -556,8 +567,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): async def get_remote_media_thumbnails( self, origin: str, media_id: str - ) -> List[Dict[str, Any]]: - return await self.db_pool.simple_select_list( + ) -> List[ThumbnailInfo]: + rows = await self.db_pool.simple_select_list( "remote_media_cache_thumbnails", {"media_origin": origin, "media_id": media_id}, ( @@ -566,10 +577,19 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore): "thumbnail_method", "thumbnail_type", "thumbnail_length", - "filesystem_id", ), desc="get_remote_media_thumbnails", ) + return [ + ThumbnailInfo( + width=row["thumbnail_width"], + height=row["thumbnail_height"], + method=row["thumbnail_method"], + type=row["thumbnail_type"], + length=row["thumbnail_length"], + ) + for row in rows + ] @trace async def get_remote_media_thumbnail( diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 194b4e031f..519f05fb60 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -20,6 +20,7 @@ from typing import ( Mapping, Optional, Tuple, + Union, cast, ) @@ -385,28 +386,47 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) limit = 100 offset = 0 while True: - rows = await self.db_pool.runInteraction( - "get_presence_for_all_users", - self.db_pool.simple_select_list_paginate_txn, - "presence_stream", - orderby="stream_id", - start=offset, - limit=limit, - exclude_keyvalues=exclude_keyvalues, - retcols=( - "user_id", - "state", - "last_active_ts", - "last_federation_update_ts", - "last_user_sync_ts", - "status_msg", - "currently_active", + rows = cast( + List[Tuple[str, str, int, int, int, Optional[str], Union[int, bool]]], + await self.db_pool.runInteraction( + "get_presence_for_all_users", + self.db_pool.simple_select_list_paginate_txn, + "presence_stream", + orderby="stream_id", + start=offset, + limit=limit, + exclude_keyvalues=exclude_keyvalues, + retcols=( + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + ), + order_direction="ASC", ), - order_direction="ASC", ) - for row in rows: - users_to_state[row["user_id"]] = UserPresenceState(**row) + for ( + user_id, + state, + last_active_ts, + last_federation_update_ts, + last_user_sync_ts, + status_msg, + currently_active, + ) in rows: + users_to_state[user_id] = UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) # We've run out of updates to query if len(rows) < limit: @@ -434,13 +454,21 @@ class PresenceStore(PresenceBackgroundUpdateStore, CacheInvalidationWorkerStore) txn = db_conn.cursor() txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() txn.close() - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] + return [ + UserPresenceState( + user_id=user_id, + state=state, + last_active_ts=last_active_ts, + last_federation_update_ts=last_federation_update_ts, + last_user_sync_ts=last_user_sync_ts, + status_msg=status_msg, + currently_active=bool(currently_active), + ) + for user_id, state, last_active_ts, last_federation_update_ts, last_user_sync_ts, status_msg, currently_active in rows + ] def take_presence_startup_info(self) -> List[UserPresenceState]: active_on_startup = self._presence_on_startup diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index dea0e0458c..1e11bf2706 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -89,6 +89,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # furthermore, we might already have the table from a previous (failed) # purge attempt, so let's drop the table first. + if isinstance(self.database_engine, PostgresEngine): + # Disable statement timeouts for this transaction; purging rooms can + # take a while! + txn.execute("SET LOCAL statement_timeout = 0") + txn.execute("DROP TABLE IF EXISTS events_to_purge") txn.execute( diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 87e28e22d3..c7eb7fc478 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -47,6 +47,27 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +# The type of a row in the pushers table. +PusherRow = Tuple[ + int, # id + str, # user_name + Optional[int], # access_token + str, # profile_tag + str, # kind + str, # app_id + str, # app_display_name + str, # device_display_name + str, # pushkey + int, # ts + str, # lang + str, # data + int, # last_stream_ordering + int, # last_success + int, # failing_since + bool, # enabled + str, # device_id +] + class PusherWorkerStore(SQLBaseStore): def __init__( @@ -83,30 +104,66 @@ class PusherWorkerStore(SQLBaseStore): self._remove_deleted_email_pushers, ) - def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[PusherConfig]: + def _decode_pushers_rows( + self, + rows: Iterable[PusherRow], + ) -> Iterator[PusherConfig]: """JSON-decode the data in the rows returned from the `pushers` table Drops any rows whose data cannot be decoded """ - for r in rows: - data_json = r["data"] + for ( + id, + user_name, + access_token, + profile_tag, + kind, + app_id, + app_display_name, + device_display_name, + pushkey, + ts, + lang, + data, + last_stream_ordering, + last_success, + failing_since, + enabled, + device_id, + ) in rows: try: - r["data"] = db_to_json(data_json) + data_json = db_to_json(data) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", - r["id"], - data_json, + id, + data, e.args[0], ) continue - # If we're using SQLite, then boolean values are integers. This is - # troublesome since some code using the return value of this method might - # expect it to be a boolean, or will expose it to clients (in responses). - r["enabled"] = bool(r["enabled"]) - - yield PusherConfig(**r) + yield PusherConfig( + id=id, + user_name=user_name, + profile_tag=profile_tag, + kind=kind, + app_id=app_id, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + ts=ts, + lang=lang, + data=data_json, + last_stream_ordering=last_stream_ordering, + last_success=last_success, + failing_since=failing_since, + # If we're using SQLite, then boolean values are integers. This is + # troublesome since some code using the return value of this method might + # expect it to be a boolean, or will expose it to clients (in responses). + enabled=bool(enabled), + device_id=device_id, + access_token=access_token, + ) def get_pushers_stream_token(self) -> int: return self._pushers_id_gen.get_current_token() @@ -136,7 +193,7 @@ class PusherWorkerStore(SQLBaseStore): The pushers for which the given columns have the given values. """ - def get_pushers_by_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_pushers_by_txn(txn: LoggingTransaction) -> List[PusherRow]: # We could technically use simple_select_list here, but we need to call # COALESCE on the 'enabled' column. While it is technically possible to give # simple_select_list the whole `COALESCE(...) AS ...` as a column name, it @@ -154,7 +211,7 @@ class PusherWorkerStore(SQLBaseStore): txn.execute(sql, list(keyvalues.values())) - return self.db_pool.cursor_to_dict(txn) + return cast(List[PusherRow], txn.fetchall()) ret = await self.db_pool.runInteraction( desc="get_pushers_by", @@ -164,14 +221,22 @@ class PusherWorkerStore(SQLBaseStore): return self._decode_pushers_rows(ret) async def get_enabled_pushers(self) -> Iterator[PusherConfig]: - def get_enabled_pushers_txn(txn: LoggingTransaction) -> Iterator[PusherConfig]: - txn.execute("SELECT * FROM pushers WHERE COALESCE(enabled, TRUE)") - rows = self.db_pool.cursor_to_dict(txn) - - return self._decode_pushers_rows(rows) + def get_enabled_pushers_txn(txn: LoggingTransaction) -> List[PusherRow]: + txn.execute( + """ + SELECT id, user_name, access_token, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, ts, lang, data, + last_stream_ordering, last_success, failing_since, + enabled, device_id + FROM pushers WHERE COALESCE(enabled, TRUE) + """ + ) + return cast(List[PusherRow], txn.fetchall()) - return await self.db_pool.runInteraction( - "get_enabled_pushers", get_enabled_pushers_txn + return self._decode_pushers_rows( + await self.db_pool.runInteraction( + "get_enabled_pushers", get_enabled_pushers_txn + ) ) async def get_all_updated_pushers_rows( @@ -304,7 +369,7 @@ class PusherWorkerStore(SQLBaseStore): ) async def get_throttle_params_by_room( - self, pusher_id: str + self, pusher_id: int ) -> Dict[str, ThrottleParams]: res = await self.db_pool.simple_select_list( "pusher_throttle", @@ -323,7 +388,7 @@ class PusherWorkerStore(SQLBaseStore): return params_by_room async def set_throttle_params( - self, pusher_id: str, room_id: str, params: ThrottleParams + self, pusher_id: int, room_id: str, params: ThrottleParams ) -> None: await self.db_pool.simple_upsert( "pusher_throttle", @@ -534,7 +599,7 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): (last_pusher_id, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if len(rows) == 0: return 0 @@ -550,19 +615,19 @@ class PusherBackgroundUpdatesStore(SQLBaseStore): txn=txn, table="pushers", key_names=("id",), - key_values=[(row["pusher_id"],) for row in rows], + key_values=[row[0] for row in rows], value_names=("device_id", "access_token"), # If there was already a device_id on the pusher, we only want to clear # the access_token column, so we keep the existing device_id. Otherwise, # we set the device_id we got from joining the access_tokens table. value_values=[ - (row["pusher_device_id"] or row["token_device_id"], None) - for row in rows + (pusher_device_id or token_device_id, None) + for _, pusher_device_id, token_device_id in rows ], ) self.db_pool.updates._background_update_progress_txn( - txn, "set_device_id_for_pushers", {"pusher_id": rows[-1]["pusher_id"]} + txn, "set_device_id_for_pushers", {"pusher_id": rows[-1][0]} ) return len(rows) diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 0231f9407b..b2645ab43c 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -313,25 +313,25 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> Sequence[JsonMapping]: """See get_linearized_receipts_for_room""" - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str]]: if from_key: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id > ? AND stream_id <= ?" ) txn.execute(sql, (room_id, from_key, to_key)) else: sql = ( - "SELECT * FROM receipts_linearized WHERE" + "SELECT receipt_type, user_id, event_id, data" + " FROM receipts_linearized WHERE" " room_id = ? AND stream_id <= ?" ) txn.execute(sql, (room_id, to_key)) - rows = self.db_pool.cursor_to_dict(txn) - - return rows + return cast(List[Tuple[str, str, str, str]], txn.fetchall()) rows = await self.db_pool.runInteraction("get_linearized_receipts_for_room", f) @@ -339,10 +339,10 @@ class ReceiptsWorkerStore(SQLBaseStore): return [] content: JsonDict = {} - for row in rows: - content.setdefault(row["event_id"], {}).setdefault(row["receipt_type"], {})[ - row["user_id"] - ] = db_to_json(row["data"]) + for receipt_type, user_id, event_id, data in rows: + content.setdefault(event_id, {}).setdefault(receipt_type, {})[ + user_id + ] = db_to_json(data) return [{"type": EduTypes.RECEIPT, "room_id": room_id, "content": content}] @@ -357,10 +357,13 @@ class ReceiptsWorkerStore(SQLBaseStore): if not room_ids: return {} - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f( + txn: LoggingTransaction, + ) -> List[Tuple[str, str, str, str, Optional[str], str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? AND """ clause, args = make_in_list_sql_clause( @@ -370,7 +373,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [from_key, to_key] + list(args)) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, thread_id, data + FROM receipts_linearized WHERE stream_id <= ? AND """ @@ -380,29 +384,31 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql + clause, [to_key] + list(args)) - return self.db_pool.cursor_to_dict(txn) + return cast( + List[Tuple[str, str, str, str, Optional[str], str]], txn.fetchall() + ) txn_results = await self.db_pool.runInteraction( "_get_linearized_receipts_for_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, thread_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) - if row["thread_id"]: - receipt_type[row["user_id"]]["thread_id"] = row["thread_id"] + receipt_type_dict[user_id] = db_to_json(data) + if thread_id: + receipt_type_dict[user_id]["thread_id"] = thread_id results = { room_id: [results[room_id]] if room_id in results else [] @@ -428,10 +434,11 @@ class ReceiptsWorkerStore(SQLBaseStore): A dictionary of roomids to a list of receipts. """ - def f(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def f(txn: LoggingTransaction) -> List[Tuple[str, str, str, str, str]]: if from_key: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -439,7 +446,8 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [from_key, to_key]) else: sql = """ - SELECT * FROM receipts_linearized WHERE + SELECT room_id, receipt_type, user_id, event_id, data + FROM receipts_linearized WHERE stream_id <= ? ORDER BY stream_id DESC LIMIT 100 @@ -447,27 +455,27 @@ class ReceiptsWorkerStore(SQLBaseStore): txn.execute(sql, [to_key]) - return self.db_pool.cursor_to_dict(txn) + return cast(List[Tuple[str, str, str, str, str]], txn.fetchall()) txn_results = await self.db_pool.runInteraction( "get_linearized_receipts_for_all_rooms", f ) results: JsonDict = {} - for row in txn_results: + for room_id, receipt_type, user_id, event_id, data in txn_results: # We want a single event per room, since we want to batch the # receipts by room, event and type. room_event = results.setdefault( - row["room_id"], - {"type": EduTypes.RECEIPT, "room_id": row["room_id"], "content": {}}, + room_id, + {"type": EduTypes.RECEIPT, "room_id": room_id, "content": {}}, ) # The content is of the form: # {"$foo:bar": { "read": { "@user:host": <receipt> }, .. }, .. } - event_entry = room_event["content"].setdefault(row["event_id"], {}) - receipt_type = event_entry.setdefault(row["receipt_type"], {}) + event_entry = room_event["content"].setdefault(event_id, {}) + receipt_type_dict = event_entry.setdefault(receipt_type, {}) - receipt_type[row["user_id"]] = db_to_json(row["data"]) + receipt_type_dict[user_id] = db_to_json(data) return results @@ -742,7 +750,7 @@ class ReceiptsWorkerStore(SQLBaseStore): event_ids: List[str], thread_id: Optional[str], data: dict, - ) -> Optional[Tuple[int, int]]: + ) -> Optional[int]: """Insert a receipt, either from local client or remote server. Automatically does conversion between linearized and graph @@ -804,9 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore): data, ) - max_persisted_id = self._receipts_id_gen.get_current_token() - - return stream_id, max_persisted_id + return stream_id async def _insert_graph_receipt( self, diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index cc964604e2..64a2c31a5d 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -195,7 +195,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]: """Returns info about the user account, if it exists.""" - def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]: + def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[UserInfo]: # We could technically use simple_select_one here, but it would not perform # the COALESCEs (unless hacked into the column names), which could yield # confusing results. @@ -213,35 +213,46 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) - - if len(rows) == 0: + row = txn.fetchone() + if not row: return None - return rows[0] + ( + name, + is_guest, + admin, + consent_version, + consent_ts, + consent_server_notice_sent, + appservice_id, + creation_ts, + user_type, + deactivated, + shadow_banned, + approved, + locked, + ) = row + + return UserInfo( + appservice_id=appservice_id, + consent_server_notice_sent=consent_server_notice_sent, + consent_version=consent_version, + consent_ts=consent_ts, + creation_ts=creation_ts, + is_admin=bool(admin), + is_deactivated=bool(deactivated), + is_guest=bool(is_guest), + is_shadow_banned=bool(shadow_banned), + user_id=UserID.from_string(name), + user_type=user_type, + approved=bool(approved), + locked=bool(locked), + ) - row = await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( desc="get_user_by_id", func=get_user_by_id_txn, ) - if row is None: - return None - - return UserInfo( - appservice_id=row["appservice_id"], - consent_server_notice_sent=row["consent_server_notice_sent"], - consent_version=row["consent_version"], - consent_ts=row["consent_ts"], - creation_ts=row["creation_ts"], - is_admin=bool(row["admin"]), - is_deactivated=bool(row["deactivated"]), - is_guest=bool(row["is_guest"]), - is_shadow_banned=bool(row["shadow_banned"]), - user_id=UserID.from_string(row["name"]), - user_type=row["user_type"], - approved=bool(row["approved"]), - locked=bool(row["locked"]), - ) async def is_trial_user(self, user_id: str) -> bool: """Checks if user is in the "trial" period, i.e. within the first @@ -579,16 +590,31 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """ txn.execute(sql, (token,)) - rows = self.db_pool.cursor_to_dict(txn) - - if rows: - row = rows[0] - - # This field is nullable, ensure it comes out as a boolean - if row["token_used"] is None: - row["token_used"] = False + row = txn.fetchone() - return TokenLookupResult(**row) + if row: + ( + user_id, + is_guest, + shadow_banned, + token_id, + device_id, + valid_until_ms, + token_owner, + token_used, + ) = row + + return TokenLookupResult( + user_id=user_id, + is_guest=is_guest, + shadow_banned=shadow_banned, + token_id=token_id, + device_id=device_id, + valid_until_ms=valid_until_ms, + token_owner=token_owner, + # This field is nullable, ensure it comes out as a boolean + token_used=bool(token_used), + ) return None @@ -833,11 +859,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_users", _count_users) @@ -891,11 +916,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): """Counts all users without a special user_type registered on the homeserver.""" def _count_users(txn: LoggingTransaction) -> int: - txn.execute("SELECT COUNT(*) AS users FROM users where user_type is null") - rows = self.db_pool.cursor_to_dict(txn) - if rows: - return rows[0]["users"] - return 0 + txn.execute("SELECT COUNT(*) FROM users where user_type is null") + row = txn.fetchone() + assert row is not None + return row[0] return await self.db_pool.runInteraction("count_real_users", _count_users) @@ -1252,12 +1276,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) txn.execute(sql, []) - res = self.db_pool.cursor_to_dict(txn) - if res: - for user in res: - self.set_expiration_date_for_user_txn( - txn, user["name"], use_delta=True - ) + for (name,) in txn.fetchall(): + self.set_expiration_date_for_user_txn(txn, name, use_delta=True) await self.db_pool.runInteraction( "get_users_with_no_expiration_date", @@ -1963,11 +1983,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): (user_id,), ) - rows = self.db_pool.cursor_to_dict(txn) + row = txn.fetchone() + assert row is not None # We cast to bool because the value returned by the database engine might # be an integer if we're using SQLite. - return bool(rows[0]["approved"]) + return bool(row[0]) return await self.db_pool.runInteraction( desc="is_user_pending_approval", @@ -2045,22 +2066,22 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): (last_user, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True, 0 rows_processed_nb = 0 - for user in rows: - if not user["count_tokens"] and not user["count_threepids"]: - self.set_user_deactivated_status_txn(txn, user["name"], True) + for name, count_tokens, count_threepids in rows: + if not count_tokens and not count_threepids: + self.set_user_deactivated_status_txn(txn, name, True) rows_processed_nb += 1 logger.info("Marked %d rows as deactivated", rows_processed_nb) self.db_pool.updates._background_update_progress_txn( - txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]} + txn, "users_set_deactivated_flag", {"user_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 719e11aea6..1d4d99932b 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -831,7 +831,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): def get_retention_policy_for_room_txn( txn: LoggingTransaction, - ) -> List[Dict[str, Optional[int]]]: + ) -> Optional[Tuple[Optional[int], Optional[int]]]: txn.execute( """ SELECT min_lifetime, max_lifetime FROM room_retention @@ -841,7 +841,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): (room_id,), ) - return self.db_pool.cursor_to_dict(txn) + return cast(Optional[Tuple[Optional[int], Optional[int]]], txn.fetchone()) ret = await self.db_pool.runInteraction( "get_retention_policy_for_room", @@ -856,8 +856,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): max_lifetime=self.config.retention.retention_default_max_lifetime, ) - min_lifetime = ret[0]["min_lifetime"] - max_lifetime = ret[0]["max_lifetime"] + min_lifetime, max_lifetime = ret # If one of the room's policy's attributes isn't defined, use the matching # attribute from the default policy. @@ -1162,14 +1161,13 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, args) - rows = self.db_pool.cursor_to_dict(txn) - rooms_dict = {} - - for row in rows: - rooms_dict[row["room_id"]] = RetentionPolicy( - min_lifetime=row["min_lifetime"], - max_lifetime=row["max_lifetime"], + rooms_dict = { + room_id: RetentionPolicy( + min_lifetime=min_lifetime, + max_lifetime=max_lifetime, ) + for room_id, min_lifetime, max_lifetime in txn + } if include_null: # If required, do a second query that retrieves all of the rooms we know @@ -1178,13 +1176,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql) - rows = self.db_pool.cursor_to_dict(txn) - # If a room isn't already in the dict (i.e. it doesn't have a retention # policy in its state), add it with a null policy. - for row in rows: - if row["room_id"] not in rooms_dict: - rooms_dict[row["room_id"]] = RetentionPolicy() + for (room_id,) in txn: + if room_id not in rooms_dict: + rooms_dict[room_id] = RetentionPolicy() return rooms_dict @@ -1703,24 +1699,24 @@ class RoomBackgroundUpdateStore(SQLBaseStore): (last_room, batch_size), ) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return True - for row in rows: - if not row["json"]: + for room_id, event_id, json in rows: + if not json: retention_policy = {} else: - ev = db_to_json(row["json"]) + ev = db_to_json(json) retention_policy = ev["content"] self.db_pool.simple_insert_txn( txn=txn, table="room_retention", values={ - "room_id": row["room_id"], - "event_id": row["event_id"], + "room_id": room_id, + "event_id": event_id, "min_lifetime": retention_policy.get("min_lifetime"), "max_lifetime": retention_policy.get("max_lifetime"), }, @@ -1729,7 +1725,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): logger.info("Inserted %d rows into room_retention", len(rows)) self.db_pool.updates._background_update_progress_txn( - txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]} + txn, "insert_room_retention", {"room_id": rows[-1][0]} ) if batch_size > len(rows): diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index e93573f315..bbe08368db 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1349,18 +1349,16 @@ class RoomMemberBackgroundUpdateStore(SQLBaseStore): txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size)) - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] to_update = [] - for row in rows: - event_id = row["event_id"] - room_id = row["room_id"] + for _, event_id, room_id, json in rows: try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index a7aae661d8..1d69c4a5f0 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -179,22 +179,24 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): # store_search_entries_txn with a generator function, but that # would mean having two cursors open on the database at once. # Instead we just build a list of results. - rows = self.db_pool.cursor_to_dict(txn) + rows = txn.fetchall() if not rows: return 0 - min_stream_id = rows[-1]["stream_ordering"] + min_stream_id = rows[-1][0] event_search_rows = [] - for row in rows: + for ( + stream_ordering, + event_id, + room_id, + etype, + json, + origin_server_ts, + ) in rows: try: - event_id = row["event_id"] - room_id = row["room_id"] - etype = row["type"] - stream_ordering = row["stream_ordering"] - origin_server_ts = row["origin_server_ts"] try: - event_json = db_to_json(row["json"]) + event_json = db_to_json(json) content = event_json["content"] except Exception: continue diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 5a3611c415..ea06e4eee0 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -266,7 +266,7 @@ def generate_next_token( # when we are going backwards so we subtract one from the # stream part. last_stream_ordering -= 1 - return RoomStreamToken(last_topo_ordering, last_stream_ordering) + return RoomStreamToken(topological=last_topo_ordering, stream=last_stream_ordering) def _make_generic_sql_bound( @@ -558,7 +558,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if p > min_pos } - return RoomStreamToken(None, min_pos, immutabledict(positions)) + return RoomStreamToken(stream=min_pos, instance_map=immutabledict(positions)) async def get_room_events_stream_for_rooms( self, @@ -708,7 +708,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ret.reverse() if rows: - key = RoomStreamToken(None, min(r.stream_ordering for r in rows)) + key = RoomStreamToken(stream=min(r.stream_ordering for r in rows)) else: # Assume we didn't get anything because there was nothing to # get. @@ -969,7 +969,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): topo = await self.db_pool.runInteraction( "_get_max_topological_txn", self._get_max_topological_txn, room_id ) - return RoomStreamToken(topo, stream_ordering) + return RoomStreamToken(topological=topo, stream=stream_ordering) @overload def get_stream_id_for_event_txn( @@ -1033,7 +1033,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcols=("stream_ordering", "topological_ordering"), desc="get_topological_token_for_event", ) - return RoomStreamToken(row["topological_ordering"], row["stream_ordering"]) + return RoomStreamToken( + topological=row["topological_ordering"], stream=row["stream_ordering"] + ) async def get_current_topological_token(self, room_id: str, stream_key: int) -> int: """Gets the topological token in a room after or at the given stream @@ -1114,8 +1116,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): else: topo = None internal = event.internal_metadata - internal.before = RoomStreamToken(topo, stream - 1) - internal.after = RoomStreamToken(topo, stream) + internal.before = RoomStreamToken(topological=topo, stream=stream - 1) + internal.after = RoomStreamToken(topological=topo, stream=stream) internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( @@ -1191,11 +1193,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): # Paginating backwards includes the event at the token, but paginating # forward doesn't. before_token = RoomStreamToken( - results["topological_ordering"] - 1, results["stream_ordering"] + topological=results["topological_ordering"] - 1, + stream=results["stream_ordering"], ) after_token = RoomStreamToken( - results["topological_ordering"], results["stream_ordering"] + topological=results["topological_ordering"], + stream=results["stream_ordering"], ) rows, start_token = self._paginate_room_events_txn( diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py index 5c5372a825..5555b53575 100644 --- a/synapse/storage/databases/main/task_scheduler.py +++ b/synapse/storage/databases/main/task_scheduler.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Any, Dict, List, Optional +from typing import TYPE_CHECKING, Any, List, Optional, Tuple, cast from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -27,6 +27,8 @@ from synapse.util import json_encoder if TYPE_CHECKING: from synapse.server import HomeServer +ScheduledTaskRow = Tuple[str, str, str, int, str, str, str, str] + class TaskSchedulerWorkerStore(SQLBaseStore): def __init__( @@ -38,13 +40,18 @@ class TaskSchedulerWorkerStore(SQLBaseStore): super().__init__(database, db_conn, hs) @staticmethod - def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: - row["status"] = TaskStatus(row["status"]) - if row["params"] is not None: - row["params"] = db_to_json(row["params"]) - if row["result"] is not None: - row["result"] = db_to_json(row["result"]) - return ScheduledTask(**row) + def _convert_row_to_task(row: ScheduledTaskRow) -> ScheduledTask: + task_id, action, status, timestamp, resource_id, params, result, error = row + return ScheduledTask( + id=task_id, + action=action, + status=TaskStatus(status), + timestamp=timestamp, + resource_id=resource_id, + params=db_to_json(params) if params is not None else None, + result=db_to_json(result) if result is not None else None, + error=error, + ) async def get_scheduled_tasks( self, @@ -68,7 +75,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): Returns: a list of `ScheduledTask`, ordered by increasing timestamps """ - def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[ScheduledTaskRow]: clauses: List[str] = [] args: List[Any] = [] if resource_id: @@ -101,7 +108,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore): args.append(limit) txn.execute(sql, args) - return self.db_pool.cursor_to_dict(txn) + return cast(List[ScheduledTaskRow], txn.fetchall()) rows = await self.db_pool.runInteraction( "get_scheduled_tasks", get_scheduled_tasks_txn @@ -193,7 +200,22 @@ class TaskSchedulerWorkerStore(SQLBaseStore): desc="get_scheduled_task", ) - return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + return ( + TaskSchedulerWorkerStore._convert_row_to_task( + ( + row["id"], + row["action"], + row["status"], + row["timestamp"], + row["resource_id"], + row["params"], + row["result"], + row["error"], + ) + ) + if row + else None + ) async def delete_scheduled_task(self, id: str) -> None: """Delete a specific task from its id. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 8f70eff809..f35757280d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -526,7 +526,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): start: int, limit: int, direction: Direction = Direction.FORWARDS, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: """Function to retrieve a paginated list of destination's rooms. This will return a json list of rooms and the total number of rooms. @@ -537,12 +537,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): limit: number of rows to retrieve direction: sort ascending or descending by room_id Returns: - A tuple of a dict of rooms and a count of total rooms. + A tuple of a list of room tuples and a count of total rooms. + + Each room tuple is room_id, stream_ordering. """ def get_destination_rooms_paginate_txn( txn: LoggingTransaction, - ) -> Tuple[List[JsonDict], int]: + ) -> Tuple[List[Tuple[str, int]], int]: if direction == Direction.BACKWARDS: order = "DESC" else: @@ -556,14 +558,17 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): txn.execute(sql, [destination]) count = cast(Tuple[int], txn.fetchone())[0] - rooms = self.db_pool.simple_select_list_paginate_txn( - txn=txn, - table="destination_rooms", - orderby="room_id", - start=start, - limit=limit, - retcols=("room_id", "stream_ordering"), - order_direction=order, + rooms = cast( + List[Tuple[str, int]], + self.db_pool.simple_select_list_paginate_txn( + txn=txn, + table="destination_rooms", + orderby="room_id", + start=start, + limit=limit, + retcols=("room_id", "stream_ordering"), + order_direction=order, + ), ) return rooms, count diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 5b50bd66bc..de89de7d74 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -125,8 +125,8 @@ Changes in SCHEMA_VERSION = 82 SCHEMA_COMPAT_VERSION = ( - # The `event_txn_id_device_id` must be written to for new events. - 80 + # The event_txn_id table and tables from MSC2716 no longer exist. + 82 ) """Limit on how far the synapse codebase can be rolled back without breaking db compat diff --git a/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql new file mode 100644 index 0000000000..149020bbd7 --- /dev/null +++ b/synapse/storage/schema/main/delta/82/03_drop_old_tables.sql @@ -0,0 +1,24 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Drop the old event transaction ID table, the event_txn_id_device_id table +-- should be used instead. +DROP TABLE IF EXISTS event_txn_id; + +-- Drop tables related to MSC2716 since the implementation is being removed +DROP TABLE insertion_events; +DROP TABLE insertion_event_edges; +DROP TABLE insertion_event_extremities; +DROP TABLE batch_events; diff --git a/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql new file mode 100644 index 0000000000..fc948166e6 --- /dev/null +++ b/synapse/storage/schema/main/delta/82/04_add_indices_for_purging_rooms.sql @@ -0,0 +1,20 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'e2e_room_keys_index_room_id', '{}'); + +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (8204, 'room_account_data_index_room_id', '{}'); diff --git a/synapse/streams/events.py b/synapse/streams/events.py index d7084d2358..609a0978a9 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Iterator, Tuple +from typing import TYPE_CHECKING, Sequence, Tuple import attr @@ -23,7 +23,7 @@ from synapse.handlers.room import RoomEventSource from synapse.handlers.typing import TypingNotificationEventSource from synapse.logging.opentracing import trace from synapse.streams import EventSource -from synapse.types import StreamToken +from synapse.types import StreamKeyType, StreamToken if TYPE_CHECKING: from synapse.server import HomeServer @@ -37,9 +37,14 @@ class _EventSourcesInner: receipt: ReceiptEventSource account_data: AccountDataEventSource - def get_sources(self) -> Iterator[Tuple[str, EventSource]]: - for attribute in attr.fields(_EventSourcesInner): - yield attribute.name, getattr(self, attribute.name) + def get_sources(self) -> Sequence[Tuple[StreamKeyType, EventSource]]: + return [ + (StreamKeyType.ROOM, self.room), + (StreamKeyType.PRESENCE, self.presence), + (StreamKeyType.TYPING, self.typing), + (StreamKeyType.RECEIPT, self.receipt), + (StreamKeyType.ACCOUNT_DATA, self.account_data), + ] class EventSources: diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 76b0e3e694..09a88c86a7 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -22,8 +22,8 @@ from typing import ( Any, ClassVar, Dict, - Final, List, + Literal, Mapping, Match, MutableMapping, @@ -34,6 +34,7 @@ from typing import ( Type, TypeVar, Union, + overload, ) import attr @@ -60,6 +61,8 @@ from synapse.util.cancellation import cancellable from synapse.util.stringutils import parse_and_validate_server_name if TYPE_CHECKING: + from typing_extensions import Self + from synapse.appservice.api import ApplicationService from synapse.storage.databases.main import DataStore, PurgeEventsStore from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore @@ -436,7 +439,78 @@ def map_username_to_mxid_localpart( @attr.s(frozen=True, slots=True, order=False) -class RoomStreamToken: +class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): + """An abstract stream token class for streams that supports multiple + writers. + + This works by keeping track of the stream position of each writer, + represented by a default `stream` attribute and a map of instance name to + stream position of any writers that are ahead of the default stream + position. + """ + + stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True) + + instance_map: "immutabledict[str, int]" = attr.ib( + factory=immutabledict, + validator=attr.validators.deep_mapping( + key_validator=attr.validators.instance_of(str), + value_validator=attr.validators.instance_of(int), + mapping_validator=attr.validators.instance_of(immutabledict), + ), + kw_only=True, + ) + + @classmethod + @abc.abstractmethod + async def parse(cls, store: "DataStore", string: str) -> "Self": + """Parse the string representation of the token.""" + ... + + @abc.abstractmethod + async def to_string(self, store: "DataStore") -> str: + """Serialize the token into its string representation.""" + ... + + def copy_and_advance(self, other: "Self") -> "Self": + """Return a new token such that if an event is after both this token and + the other token, then its after the returned token too. + """ + + max_stream = max(self.stream, other.stream) + + instance_map = { + instance: max( + self.instance_map.get(instance, self.stream), + other.instance_map.get(instance, other.stream), + ) + for instance in set(self.instance_map).union(other.instance_map) + } + + return attr.evolve( + self, stream=max_stream, instance_map=immutabledict(instance_map) + ) + + def get_max_stream_pos(self) -> int: + """Get the maximum stream position referenced in this token. + + The corresponding "min" position is, by definition just `self.stream`. + + This is used to handle tokens that have non-empty `instance_map`, and so + reference stream positions after the `self.stream` position. + """ + return max(self.instance_map.values(), default=self.stream) + + def get_stream_pos_for_instance(self, instance_name: str) -> int: + """Get the stream position that the given writer was at at this token.""" + + # If we don't have an entry for the instance we can assume that it was + # at `self.stream`. + return self.instance_map.get(instance_name, self.stream) + + +@attr.s(frozen=True, slots=True, order=False) +class RoomStreamToken(AbstractMultiWriterStreamToken): """Tokens are positions between events. The token "s1" comes after event 1. s0 s1 @@ -513,16 +587,8 @@ class RoomStreamToken: topological: Optional[int] = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(int)), - ) - stream: int = attr.ib(validator=attr.validators.instance_of(int)) - - instance_map: "immutabledict[str, int]" = attr.ib( - factory=immutabledict, - validator=attr.validators.deep_mapping( - key_validator=attr.validators.instance_of(str), - value_validator=attr.validators.instance_of(int), - mapping_validator=attr.validators.instance_of(immutabledict), - ), + kw_only=True, + default=None, ) def __attrs_post_init__(self) -> None: @@ -582,17 +648,7 @@ class RoomStreamToken: if self.topological or other.topological: raise Exception("Can't advance topological tokens") - max_stream = max(self.stream, other.stream) - - instance_map = { - instance: max( - self.instance_map.get(instance, self.stream), - other.instance_map.get(instance, other.stream), - ) - for instance in set(self.instance_map).union(other.instance_map) - } - - return RoomStreamToken(None, max_stream, immutabledict(instance_map)) + return super().copy_and_advance(other) def as_historical_tuple(self) -> Tuple[int, int]: """Returns a tuple of `(topological, stream)` for historical tokens. @@ -618,16 +674,6 @@ class RoomStreamToken: # at `self.stream`. return self.instance_map.get(instance_name, self.stream) - def get_max_stream_pos(self) -> int: - """Get the maximum stream position referenced in this token. - - The corresponding "min" position is, by definition just `self.stream`. - - This is used to handle tokens that have non-empty `instance_map`, and so - reference stream positions after the `self.stream` position. - """ - return max(self.instance_map.values(), default=self.stream) - async def to_string(self, store: "DataStore") -> str: if self.topological is not None: return "t%d-%d" % (self.topological, self.stream) @@ -649,20 +695,20 @@ class RoomStreamToken: return "s%d" % (self.stream,) -class StreamKeyType: +class StreamKeyType(Enum): """Known stream types. A stream is a list of entities ordered by an incrementing "stream token". """ - ROOM: Final = "room_key" - PRESENCE: Final = "presence_key" - TYPING: Final = "typing_key" - RECEIPT: Final = "receipt_key" - ACCOUNT_DATA: Final = "account_data_key" - PUSH_RULES: Final = "push_rules_key" - TO_DEVICE: Final = "to_device_key" - DEVICE_LIST: Final = "device_list_key" + ROOM = "room_key" + PRESENCE = "presence_key" + TYPING = "typing_key" + RECEIPT = "receipt_key" + ACCOUNT_DATA = "account_data_key" + PUSH_RULES = "push_rules_key" + TO_DEVICE = "to_device_key" + DEVICE_LIST = "device_list_key" UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key" @@ -784,7 +830,7 @@ class StreamToken: def room_stream_id(self) -> int: return self.room_key.stream - def copy_and_advance(self, key: str, new_value: Any) -> "StreamToken": + def copy_and_advance(self, key: StreamKeyType, new_value: Any) -> "StreamToken": """Advance the given key in the token to a new value if and only if the new value is after the old value. @@ -797,35 +843,68 @@ class StreamToken: return new_token new_token = self.copy_and_replace(key, new_value) - new_id = int(getattr(new_token, key)) - old_id = int(getattr(self, key)) + new_id = new_token.get_field(key) + old_id = self.get_field(key) if old_id < new_id: return new_token else: return self - def copy_and_replace(self, key: str, new_value: Any) -> "StreamToken": - return attr.evolve(self, **{key: new_value}) + def copy_and_replace(self, key: StreamKeyType, new_value: Any) -> "StreamToken": + return attr.evolve(self, **{key.value: new_value}) + @overload + def get_field(self, key: Literal[StreamKeyType.ROOM]) -> RoomStreamToken: + ... -StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0, 0) + @overload + def get_field( + self, + key: Literal[ + StreamKeyType.ACCOUNT_DATA, + StreamKeyType.DEVICE_LIST, + StreamKeyType.PRESENCE, + StreamKeyType.PUSH_RULES, + StreamKeyType.RECEIPT, + StreamKeyType.TO_DEVICE, + StreamKeyType.TYPING, + StreamKeyType.UN_PARTIAL_STATED_ROOMS, + ], + ) -> int: + ... + @overload + def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]: + ... -@attr.s(slots=True, frozen=True, auto_attribs=True) -class PersistedEventPosition: - """Position of a newly persisted event with instance that persisted it. + def get_field(self, key: StreamKeyType) -> Union[int, RoomStreamToken]: + """Returns the stream ID for the given key.""" + return getattr(self, key.value) - This can be used to test whether the event is persisted before or after a - RoomStreamToken. - """ + +StreamToken.START = StreamToken(RoomStreamToken(stream=0), 0, 0, 0, 0, 0, 0, 0, 0, 0) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PersistedPosition: + """Position of a newly persisted row with instance that persisted it.""" instance_name: str stream: int - def persisted_after(self, token: RoomStreamToken) -> bool: + def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: return token.get_stream_pos_for_instance(self.instance_name) < self.stream + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class PersistedEventPosition(PersistedPosition): + """Position of a newly persisted event with instance that persisted it. + + This can be used to test whether the event is persisted before or after a + RoomStreamToken. + """ + def to_room_stream_token(self) -> RoomStreamToken: """Converts the position to a room stream token such that events persisted in the same room after this position will be after the @@ -836,7 +915,7 @@ class PersistedEventPosition: """ # Doing the naive thing satisfies the desired properties described in # the docstring. - return RoomStreamToken(None, self.stream) + return RoomStreamToken(stream=self.stream) @attr.s(slots=True, frozen=True, auto_attribs=True) diff --git a/synmark/__init__.py b/synmark/__init__.py index 2cc00b0f03..f213319542 100644 --- a/synmark/__init__.py +++ b/synmark/__init__.py @@ -13,15 +13,18 @@ # limitations under the License. import sys +from typing import cast + +from synapse.types import ISynapseReactor try: from twisted.internet.epollreactor import EPollReactor as Reactor except ImportError: - from twisted.internet.pollreactor import PollReactor as Reactor + from twisted.internet.pollreactor import PollReactor as Reactor # type: ignore[assignment] from twisted.internet.main import installReactor -def make_reactor(): +def make_reactor() -> ISynapseReactor: """ Instantiate and install a Twisted reactor suitable for testing (i.e. not the default global one). @@ -32,4 +35,4 @@ def make_reactor(): del sys.modules["twisted.internet.reactor"] installReactor(reactor) - return reactor + return cast(ISynapseReactor, reactor) diff --git a/synmark/__main__.py b/synmark/__main__.py index 19de639187..397dd86576 100644 --- a/synmark/__main__.py +++ b/synmark/__main__.py @@ -12,9 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import sys -from argparse import REMAINDER +from argparse import REMAINDER, Namespace from contextlib import redirect_stderr from io import StringIO +from typing import Any, Callable, Coroutine, List, TypeVar import pyperf @@ -22,44 +23,50 @@ from twisted.internet.defer import Deferred, ensureDeferred from twisted.logger import globalLogBeginner, textFileLogObserver from twisted.python.failure import Failure +from synapse.types import ISynapseReactor from synmark import make_reactor from synmark.suites import SUITES from tests.utils import setupdb +T = TypeVar("T") -def make_test(main): + +def make_test( + main: Callable[[ISynapseReactor, int], Coroutine[Any, Any, float]] +) -> Callable[[int], float]: """ Take a benchmark function and wrap it in a reactor start and stop. """ - def _main(loops): + def _main(loops: int) -> float: reactor = make_reactor() file_out = StringIO() with redirect_stderr(file_out): - d = Deferred() + d: "Deferred[float]" = Deferred() d.addCallback(lambda _: ensureDeferred(main(reactor, loops))) - def on_done(_): - if isinstance(_, Failure): - _.printTraceback() + def on_done(res: T) -> T: + if isinstance(res, Failure): + res.printTraceback() print(file_out.getvalue()) reactor.stop() - return _ + return res d.addBoth(on_done) reactor.callWhenRunning(lambda: d.callback(True)) reactor.run() - return d.result + # mypy thinks this is an object for some reason. + return d.result # type: ignore[return-value] return _main if __name__ == "__main__": - def add_cmdline_args(cmd, args): + def add_cmdline_args(cmd: List[str], args: Namespace) -> None: if args.log: cmd.extend(["--log"]) cmd.extend(args.tests) @@ -82,17 +89,26 @@ if __name__ == "__main__": setupdb() if runner.args.tests: - SUITES = list( - filter(lambda x: x[0].__name__.split(".")[-1] in runner.args.tests, SUITES) + existing_suites = {s.__name__.split(".")[-1] for s, _ in SUITES} + for test in runner.args.tests: + if test not in existing_suites: + print(f"Test suite {test} does not exist.") + exit(-1) + + suites = list( + filter(lambda t: t[0].__name__.split(".")[-1] in runner.args.tests, SUITES) ) + else: + suites = SUITES - for suite, loops in SUITES: + for suite, loops in suites: if loops: runner.args.loops = loops + loops_desc = str(loops) else: runner.args.loops = orig_loops - loops = "auto" + loops_desc = "auto" runner.bench_time_func( - suite.__name__ + "_" + str(loops), + suite.__name__ + "_" + loops_desc, make_test(suite.main), ) diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index 04e5b29dc9..e160443643 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -11,14 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging +import logging.config import warnings from io import StringIO +from typing import Optional from unittest.mock import Mock from pyperf import perf_counter +from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.defer import Deferred from twisted.internet.protocol import ServerFactory from twisted.logger import LogBeginner, LogPublisher @@ -26,45 +28,53 @@ from twisted.protocols.basic import LineOnlyReceiver from synapse.config.logger import _setup_stdlib_logging from synapse.logging import RemoteHandler +from synapse.synapse_rust import reset_logging_config +from synapse.types import ISynapseReactor from synapse.util import Clock class LineCounter(LineOnlyReceiver): delimiter = b"\n" + count = 0 - def __init__(self, *args, **kwargs): - self.count = 0 - super().__init__(*args, **kwargs) - - def lineReceived(self, line): + def lineReceived(self, line: bytes) -> None: self.count += 1 + assert isinstance(self.factory, Factory) + if self.count >= self.factory.wait_for and self.factory.on_done: on_done = self.factory.on_done self.factory.on_done = None on_done.callback(True) -async def main(reactor, loops): +class Factory(ServerFactory): + protocol = LineCounter + wait_for: int + on_done: Optional[Deferred] + + +async def main(reactor: ISynapseReactor, loops: int) -> float: """ Benchmark how long it takes to send `loops` messages. """ - servers = [] - def protocol(): - p = LineCounter() - servers.append(p) - return p - - logger_factory = ServerFactory.forProtocol(protocol) + logger_factory = Factory() logger_factory.wait_for = loops logger_factory.on_done = Deferred() - port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1") + port = reactor.listenTCP(0, logger_factory, backlog=50, interface="127.0.0.1") # A fake homeserver config. class Config: - server_name = "synmark-" + str(loops) - no_redirect_stdio = True + class server: + server_name = "synmark-" + str(loops) + + # This odd construct is to avoid mypy thinking that logging escapes the + # scope of Config. + class _logging: + no_redirect_stdio = True + + logging = _logging hs_config = Config() @@ -78,28 +88,34 @@ async def main(reactor, loops): publisher, errors, mock_sys, warnings, initialBufferSize=loops ) + address = port.getHost() + assert isinstance(address, (IPv4Address, IPv6Address)) log_config = { "version": 1, - "loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}}, + "loggers": {"synapse": {"level": "DEBUG", "handlers": ["remote"]}}, "formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}}, "handlers": { - "tersejson": { + "remote": { "class": "synapse.logging.RemoteHandler", - "host": "127.0.0.1", - "port": port.getHost().port, + "formatter": "tersejson", + "host": address.host, + "port": address.port, "maximum_buffer": 100, - "_reactor": reactor, } }, } - logger = logging.getLogger("synapse.logging.test_terse_json") + logger = logging.getLogger("synapse") _setup_stdlib_logging( - hs_config, - log_config, + hs_config, # type: ignore[arg-type] + None, logBeginner=beginner, ) + # Force a new logging config without having to load it from a file. + logging.config.dictConfig(log_config) + reset_logging_config() + # Wait for it to connect... for handler in logging.getLogger("synapse").handlers: if isinstance(handler, RemoteHandler): @@ -107,7 +123,7 @@ async def main(reactor, loops): else: raise RuntimeError("Improperly configured: no RemoteHandler found.") - await handler._service.whenConnected() + await handler._service.whenConnected(failAfterFailures=10) start = perf_counter() diff --git a/synmark/suites/lrucache.py b/synmark/suites/lrucache.py index 9b4a424149..cfa0163c62 100644 --- a/synmark/suites/lrucache.py +++ b/synmark/suites/lrucache.py @@ -14,14 +14,15 @@ from pyperf import perf_counter +from synapse.types import ISynapseReactor from synapse.util.caches.lrucache import LruCache -async def main(reactor, loops): +async def main(reactor: ISynapseReactor, loops: int) -> float: """ Benchmark `loops` number of insertions into LruCache without eviction. """ - cache = LruCache(loops) + cache: LruCache[int, bool] = LruCache(loops) start = perf_counter() diff --git a/synmark/suites/lrucache_evict.py b/synmark/suites/lrucache_evict.py index 0ee202ed36..02238c2627 100644 --- a/synmark/suites/lrucache_evict.py +++ b/synmark/suites/lrucache_evict.py @@ -14,15 +14,16 @@ from pyperf import perf_counter +from synapse.types import ISynapseReactor from synapse.util.caches.lrucache import LruCache -async def main(reactor, loops): +async def main(reactor: ISynapseReactor, loops: int) -> float: """ Benchmark `loops` number of insertions into LruCache where half of them are evicted. """ - cache = LruCache(loops // 2) + cache: LruCache[int, bool] = LruCache(loops // 2) start = perf_counter() diff --git a/tests/federation/transport/test_knocking.py b/tests/federation/transport/test_knocking.py index 3f42f79f26..b63ef3d4ed 100644 --- a/tests/federation/transport/test_knocking.py +++ b/tests/federation/transport/test_knocking.py @@ -308,7 +308,7 @@ class FederationKnockingTestCase( self.assertEqual(200, channel.code, channel.result) # Check that we got the stripped room state in return - room_state_events = channel.json_body["knock_state_events"] + room_state_events = channel.json_body["knock_room_state"] # Validate the stripped room state events self.check_knock_room_state_against_room_state( diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index a7e6cdd66a..867dbd6001 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -31,7 +31,7 @@ from synapse.appservice import ( from synapse.handlers.appservice import ApplicationServicesHandler from synapse.rest.client import login, receipts, register, room, sendtodevice from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType from synapse.util import Clock from synapse.util.stringutils import random_string @@ -86,7 +86,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): [event], ] ) - self.handler.notify_interested_services(RoomStreamToken(None, 1)) + self.handler.notify_interested_services(RoomStreamToken(stream=1)) self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( interested_service, events=[event] @@ -107,7 +107,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ] ) self.mock_store.get_events_as_list = AsyncMock(side_effect=[[event]]) - self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.handler.notify_interested_services(RoomStreamToken(stream=0)) self.mock_as_api.query_user.assert_called_once_with(services[0], user_id) @@ -126,7 +126,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ] ) - self.handler.notify_interested_services(RoomStreamToken(None, 0)) + self.handler.notify_interested_services(RoomStreamToken(stream=0)) self.assertFalse( self.mock_as_api.query_user.called, @@ -304,7 +304,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.handler.notify_interested_services_ephemeral( - "receipt_key", 580, ["@fakerecipient:example.com"] + StreamKeyType.RECEIPT, 580, ["@fakerecipient:example.com"] ) self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( interested_service, ephemeral=[event] @@ -332,7 +332,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): ) self.handler.notify_interested_services_ephemeral( - "receipt_key", 580, ["@fakerecipient:example.com"] + StreamKeyType.RECEIPT, 580, ["@fakerecipient:example.com"] ) # This method will be called, but with an empty list of events self.mock_scheduler.enqueue_for_appservice.assert_called_once_with( @@ -441,7 +441,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): self.get_success( self.hs.get_application_service_handler()._notify_interested_services( RoomStreamToken( - None, self.hs.get_application_service_handler().current_max + stream=self.hs.get_application_service_handler().current_max ) ) ) @@ -634,7 +634,7 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): self.get_success( self.hs.get_application_service_handler()._notify_interested_services_ephemeral( services=[interested_appservice], - stream_key="receipt_key", + stream_key=StreamKeyType.RECEIPT, new_token=stream_token, users=[self.exclusive_as_user], ) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 95106ec8f3..3060bc9744 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -28,7 +28,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.handlers.typing import TypingWriterHandler from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.server import HomeServer -from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.types import JsonDict, Requester, StreamKeyType, UserID, create_requester from synapse.util import Clock from tests import unittest @@ -203,7 +203,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 1) events = self.get_success( @@ -273,7 +275,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) self.assertEqual(channel.code, 200) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 1) events = self.get_success( @@ -349,7 +353,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.mock_federation_client.put_json.assert_called_once_with( "farm", @@ -399,7 +405,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 1, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 1, rooms=[ROOM_ID])] + ) self.on_new_event.reset_mock() self.assertEqual(self.event_source.get_current_key(), 1) @@ -425,7 +433,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.reactor.pump([16]) - self.on_new_event.assert_has_calls([call("typing_key", 2, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 2, rooms=[ROOM_ID])] + ) self.assertEqual(self.event_source.get_current_key(), 2) events = self.get_success( @@ -459,7 +469,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): ) ) - self.on_new_event.assert_has_calls([call("typing_key", 3, rooms=[ROOM_ID])]) + self.on_new_event.assert_has_calls( + [call(StreamKeyType.TYPING, 3, rooms=[ROOM_ID])] + ) self.on_new_event.reset_mock() self.assertEqual(self.event_source.get_current_key(), 3) diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 04fc7bdcef..15f5d644e4 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -28,12 +28,13 @@ from typing_extensions import Literal from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource from synapse.api.errors import Codes from synapse.events import EventBase from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable -from synapse.media._base import FileInfo +from synapse.media._base import FileInfo, ThumbnailInfo from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage, ReadableFileWrapper from synapse.media.storage_provider import FileStorageProviderBackend @@ -41,12 +42,13 @@ from synapse.module_api import ModuleApi from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers from synapse.rest import admin from synapse.rest.client import login +from synapse.rest.media.thumbnail_resource import ThumbnailResource from synapse.server import HomeServer from synapse.types import JsonDict, RoomAlias from synapse.util import Clock from tests import unittest -from tests.server import FakeChannel, FakeSite, make_request +from tests.server import FakeChannel from tests.test_utils import SMALL_PNG from tests.utils import default_config @@ -288,22 +290,22 @@ class MediaRepoTests(unittest.HomeserverTestCase): return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_resource = hs.get_media_repository_resource() - self.download_resource = media_resource.children[b"download"] - self.thumbnail_resource = media_resource.children[b"thumbnail"] self.store = hs.get_datastores().main self.media_repo = hs.get_media_repository() self.media_id = "example.com/12345" + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def _req( self, content_disposition: Optional[bytes], include_content_type: bool = True ) -> FakeChannel: - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id, + f"/_matrix/media/v3/download/{self.media_id}", shorthand=False, await_result=False, ) @@ -481,11 +483,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): # Fetching again should work, without re-requesting the image from the # remote. params = "?width=32&height=32&method=scale" - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/v3/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -511,11 +511,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): ) shutil.rmtree(thumbnail_dir, ignore_errors=True) - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/v3/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -549,11 +547,9 @@ class MediaRepoTests(unittest.HomeserverTestCase): """ params = "?width=32&height=32&method=" + method - channel = make_request( - self.reactor, - FakeSite(self.thumbnail_resource, self.reactor), + channel = self.make_request( "GET", - self.media_id + params, + f"/_matrix/media/r0/thumbnail/{self.media_id}{params}", shorthand=False, await_result=False, ) @@ -590,7 +586,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): channel.json_body, { "errcode": "M_UNKNOWN", - "error": "Cannot find any thumbnails for the requested media ([b'example.com', b'12345']). This might mean the media is not a supported_media_format=(image/jpeg, image/jpg, image/webp, image/gif, image/png) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)", + "error": "Cannot find any thumbnails for the requested media ('/_matrix/media/r0/thumbnail/example.com/12345'). This might mean the media is not a supported_media_format=(image/jpeg, image/jpg, image/webp, image/gif, image/png) or that thumbnailing failed for some other reason. (Dynamic thumbnails are disabled on this server.)", }, ) else: @@ -600,7 +596,7 @@ class MediaRepoTests(unittest.HomeserverTestCase): channel.json_body, { "errcode": "M_NOT_FOUND", - "error": "Not found [b'example.com', b'12345']", + "error": "Not found '/_matrix/media/r0/thumbnail/example.com/12345'", }, ) @@ -609,34 +605,39 @@ class MediaRepoTests(unittest.HomeserverTestCase): """Test that choosing between thumbnails with the same quality rating succeeds. We are not particular about which thumbnail is chosen.""" + + content_type = self.test_image.content_type.decode() + media_repo = self.hs.get_media_repository() + thumbnail_resouce = ThumbnailResource( + self.hs, media_repo, media_repo.media_storage + ) + self.assertIsNotNone( - self.thumbnail_resource._select_thumbnail( + thumbnail_resouce._select_thumbnail( desired_width=desired_size, desired_height=desired_size, desired_method=method, - desired_type=self.test_image.content_type, + desired_type=content_type, # Provide two identical thumbnails which are guaranteed to have the same # quality rating. thumbnail_infos=[ - { - "thumbnail_width": 32, - "thumbnail_height": 32, - "thumbnail_method": method, - "thumbnail_type": self.test_image.content_type, - "thumbnail_length": 256, - "filesystem_id": f"thumbnail1{self.test_image.extension.decode()}", - }, - { - "thumbnail_width": 32, - "thumbnail_height": 32, - "thumbnail_method": method, - "thumbnail_type": self.test_image.content_type, - "thumbnail_length": 256, - "filesystem_id": f"thumbnail2{self.test_image.extension.decode()}", - }, + ThumbnailInfo( + width=32, + height=32, + method=method, + type=content_type, + length=256, + ), + ThumbnailInfo( + width=32, + height=32, + method=method, + type=content_type, + length=256, + ), ], file_id=f"image{self.test_image.extension.decode()}", - url_cache=None, + url_cache=False, server_name=None, ) ) @@ -725,13 +726,13 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): self.user = self.register_user("user", "pass") self.tok = self.login("user", "pass") - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] - load_legacy_spam_checkers(hs) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def default_config(self) -> Dict[str, Any]: config = default_config("test") @@ -751,9 +752,7 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): def test_upload_innocent(self) -> None: """Attempt to upload some innocent data that should be allowed.""" - self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200) def test_upload_ban(self) -> None: """Attempt to upload some data that includes bytes "evil", which should @@ -762,9 +761,7 @@ class SpamCheckerTestCaseLegacy(unittest.HomeserverTestCase): data = b"Some evil data" - self.helper.upload_media( - self.upload_resource, data, tok=self.tok, expect_code=400 - ) + self.helper.upload_media(data, tok=self.tok, expect_code=400) EVIL_DATA = b"Some evil data" @@ -781,15 +778,15 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase): self.user = self.register_user("user", "pass") self.tok = self.login("user", "pass") - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] - hs.get_module_api().register_spam_checker_callbacks( check_media_file_for_spam=self.check_media_file_for_spam ) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + async def check_media_file_for_spam( self, file_wrapper: ReadableFileWrapper, file_info: FileInfo ) -> Union[Codes, Literal["NOT_SPAM"], Tuple[Codes, JsonDict]]: @@ -805,21 +802,16 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase): def test_upload_innocent(self) -> None: """Attempt to upload some innocent data that should be allowed.""" - self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=self.tok, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=self.tok, expect_code=200) def test_upload_ban(self) -> None: """Attempt to upload some data that includes bytes "evil", which should get rejected by the spam checker. """ - self.helper.upload_media( - self.upload_resource, EVIL_DATA, tok=self.tok, expect_code=400 - ) + self.helper.upload_media(EVIL_DATA, tok=self.tok, expect_code=400) self.helper.upload_media( - self.upload_resource, EVIL_DATA_EXPERIMENT, tok=self.tok, expect_code=400, diff --git a/tests/media/test_url_previewer.py b/tests/media/test_url_previewer.py index 46ecde5344..04b69f378a 100644 --- a/tests/media/test_url_previewer.py +++ b/tests/media/test_url_previewer.py @@ -61,9 +61,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): return self.setup_test_homeserver(config=config) def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo_resource = hs.get_media_repository_resource() - preview_url = media_repo_resource.children[b"preview_url"] - self.url_previewer = preview_url._url_previewer + media_repo = hs.get_media_repository() + assert media_repo.url_previewer is not None + self.url_previewer = media_repo.url_previewer def test_all_urls_allowed(self) -> None: self.assertFalse(self.url_previewer._is_url_blocked("http://matrix.org")) diff --git a/tests/replication/test_multi_media_repo.py b/tests/replication/test_multi_media_repo.py index 6e78daa830..b230a6c361 100644 --- a/tests/replication/test_multi_media_repo.py +++ b/tests/replication/test_multi_media_repo.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import os -from typing import Optional, Tuple +from typing import Any, Optional, Tuple from twisted.internet.interfaces import IOpenSSLServerConnectionCreator from twisted.internet.protocol import Factory @@ -29,7 +29,7 @@ from synapse.util import Clock from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file from tests.replication._base import BaseMultiWorkerStreamTestCase -from tests.server import FakeChannel, FakeSite, FakeTransport, make_request +from tests.server import FakeChannel, FakeTransport, make_request from tests.test_utils import SMALL_PNG logger = logging.getLogger(__name__) @@ -56,6 +56,16 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): conf["federation_custom_ca_list"] = [get_test_ca_cert_file()] return conf + def make_worker_hs( + self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any + ) -> HomeServer: + worker_hs = super().make_worker_hs(worker_app, extra_config, **kwargs) + # Force the media paths onto the replication resource. + worker_hs.get_media_repository_resource().register_servlets( + self._hs_to_site[worker_hs].resource, worker_hs + ) + return worker_hs + def _get_media_req( self, hs: HomeServer, target: str, media_id: str ) -> Tuple[FakeChannel, Request]: @@ -68,12 +78,11 @@ class MediaRepoShardTestCase(BaseMultiWorkerStreamTestCase): The channel for the *client* request and the *outbound* request for the media which the caller should respond to. """ - resource = hs.get_media_repository_resource().children[b"download"] channel = make_request( self.reactor, - FakeSite(resource, self.reactor), + self._hs_to_site[hs], "GET", - f"/{target}/{media_id}", + f"/_matrix/media/r0/download/{target}/{media_id}", shorthand=False, access_token=self.access_token, await_result=False, diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 359d131b37..8646b2f0fd 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -13,10 +13,12 @@ # limitations under the License. import urllib.parse +from typing import Dict from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.http.server import JsonResource @@ -26,7 +28,6 @@ from synapse.server import HomeServer from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG @@ -55,21 +56,18 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): room.register_servlets, ] - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - # Allow for uploading and downloading to/from the media repo - self.media_repo = hs.get_media_repository_resource() - self.download_resource = self.media_repo.children[b"download"] - self.upload_resource = self.media_repo.children[b"upload"] + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources def _ensure_quarantined( self, admin_user_tok: str, server_and_media_id: str ) -> None: """Ensure a piece of media is quarantined when trying to access it.""" - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=admin_user_tok, ) @@ -117,20 +115,16 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): non_admin_user_tok = self.login("id_nonadmin", "pass") # Upload some media into the room - response = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=admin_user_tok - ) + response = self.helper.upload_media(SMALL_PNG, tok=admin_user_tok) # Extract media ID from the response server_name_and_media_id = response["content_uri"][6:] # Cut off 'mxc://' server_name, media_id = server_name_and_media_id.split("/") # Attempt to access the media - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_name_and_media_id, + f"/_matrix/media/v3/download/{server_name_and_media_id}", shorthand=False, access_token=non_admin_user_tok, ) @@ -173,12 +167,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): self.helper.join(room_id, non_admin_user, tok=non_admin_user_tok) # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract mxcs mxc_1 = response_1["content_uri"] @@ -227,12 +217,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] @@ -265,12 +251,8 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): non_admin_user_tok = self.login("user_nonadmin", "pass") # Upload some media - response_1 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) - response_2 = self.helper.upload_media( - self.upload_resource, SMALL_PNG, tok=non_admin_user_tok - ) + response_1 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) + response_2 = self.helper.upload_media(SMALL_PNG, tok=non_admin_user_tok) # Extract media IDs server_and_media_id_1 = response_1["content_uri"][6:] @@ -304,11 +286,9 @@ class QuarantineMediaTestCase(unittest.HomeserverTestCase): self._ensure_quarantined(admin_user_tok, server_and_media_id_1) # Attempt to access each piece of media - channel = make_request( - self.reactor, - FakeSite(self.download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id_2, + f"/_matrix/media/v3/download/{server_and_media_id_2}", shorthand=False, access_token=non_admin_user_tok, ) diff --git a/tests/rest/admin/test_media.py b/tests/rest/admin/test_media.py index 6d04911d67..278808abb5 100644 --- a/tests/rest/admin/test_media.py +++ b/tests/rest/admin/test_media.py @@ -13,10 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +from typing import Dict from parameterized import parameterized from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.errors import Codes @@ -26,22 +28,27 @@ from synapse.server import HomeServer from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG VALID_TIMESTAMP = 1609459200000 # 2021-01-01 in milliseconds INVALID_TIMESTAMP_IN_S = 1893456000 # 2030-01-01 in seconds -class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): +class _AdminMediaTests(unittest.HomeserverTestCase): servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, login.register_servlets, ] + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + + +class DeleteMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.media_repo = hs.get_media_repository_resource() self.server_name = hs.hostname self.admin_user = self.register_user("admin", "pass", admin=True) @@ -117,12 +124,8 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): Tests that delete a media is successfully """ - download_resource = self.media_repo.children[b"download"] - upload_resource = self.media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -134,11 +137,9 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): self.assertEqual(server_name, self.server_name) # Attempt to access media - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -173,11 +174,9 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): ) # Attempt to access media - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -194,7 +193,7 @@ class DeleteMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(os.path.exists(local_path)) -class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): +class DeleteMediaByDateSizeTestCase(_AdminMediaTests): servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, @@ -529,11 +528,8 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): """ Create a media and return media_id and server_and_media_id """ - upload_resource = self.media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -553,16 +549,12 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): """ Try to access a media and check the result """ - download_resource = self.media_repo.children[b"download"] - media_id = server_and_media_id.split("/")[1] local_path = self.filepaths.local_media_filepath(media_id) - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=self.admin_user_tok, ) @@ -591,27 +583,16 @@ class DeleteMediaByDateSizeTestCase(unittest.HomeserverTestCase): self.assertFalse(os.path.exists(local_path)) -class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase): - servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.admin.register_servlets_for_media_repo, - login.register_servlets, - ] - +class QuarantineMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo = hs.get_media_repository_resource() self.store = hs.get_datastores().main self.server_name = hs.hostname self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - # Create media - upload_resource = media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -720,26 +701,16 @@ class QuarantineMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(media_info["quarantined_by"]) -class ProtectMediaByIDTestCase(unittest.HomeserverTestCase): - servlets = [ - synapse.rest.admin.register_servlets, - synapse.rest.admin.register_servlets_for_media_repo, - login.register_servlets, - ] - +class ProtectMediaByIDTestCase(_AdminMediaTests): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - media_repo = hs.get_media_repository_resource() + hs.get_media_repository_resource() self.store = hs.get_datastores().main self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") - # Create media - upload_resource = media_repo.children[b"upload"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, SMALL_PNG, tok=self.admin_user_tok, expect_code=200, @@ -816,7 +787,7 @@ class ProtectMediaByIDTestCase(unittest.HomeserverTestCase): self.assertFalse(media_info["safe_from_quarantine"]) -class PurgeMediaCacheTestCase(unittest.HomeserverTestCase): +class PurgeMediaCacheTestCase(_AdminMediaTests): servlets = [ synapse.rest.admin.register_servlets, synapse.rest.admin.register_servlets_for_media_repo, diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py index b60f16b914..cd8ee274d8 100644 --- a/tests/rest/admin/test_statistics.py +++ b/tests/rest/admin/test_statistics.py @@ -12,9 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Optional +from typing import Dict, List, Optional from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.errors import Codes @@ -34,8 +35,6 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: - self.media_repo = hs.get_media_repository_resource() - self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") @@ -44,6 +43,11 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): self.url = "/_synapse/admin/v1/statistics/users/media" + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + def test_no_auth(self) -> None: """ Try to list users without authentication. @@ -470,12 +474,9 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): user_token: Access token of the user number_media: Number of media to be created for the user """ - upload_resource = self.media_repo.children[b"upload"] for _ in range(number_media): # Upload some media into the room - self.helper.upload_media( - upload_resource, SMALL_PNG, tok=user_token, expect_code=200 - ) + self.helper.upload_media(SMALL_PNG, tok=user_token, expect_code=200) def _check_fields(self, content: List[JsonDict]) -> None: """Checks that all attributes are present in content diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index b326ad2c90..37f37a09d8 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -17,12 +17,13 @@ import hmac import os import urllib.parse from binascii import unhexlify -from typing import List, Optional +from typing import Dict, List, Optional from unittest.mock import AsyncMock, Mock, patch from parameterized import parameterized, parameterized_class from twisted.test.proto_helpers import MemoryReactor +from twisted.web.resource import Resource import synapse.rest.admin from synapse.api.constants import ApprovalNoticeMedium, LoginType, UserTypes @@ -45,7 +46,6 @@ from synapse.types import JsonDict, UserID, create_requester from synapse.util import Clock from tests import unittest -from tests.server import FakeSite, make_request from tests.test_utils import SMALL_PNG from tests.unittest import override_config @@ -3421,7 +3421,6 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.store = hs.get_datastores().main - self.media_repo = hs.get_media_repository_resource() self.filepaths = MediaFilePaths(hs.config.media.media_store_path) self.admin_user = self.register_user("admin", "pass", admin=True) @@ -3432,6 +3431,11 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): self.other_user ) + def create_resource_dict(self) -> Dict[str, Resource]: + resources = super().create_resource_dict() + resources["/_matrix/media"] = self.hs.get_media_repository_resource() + return resources + @parameterized.expand(["GET", "DELETE"]) def test_no_auth(self, method: str) -> None: """Try to list media of an user without authentication.""" @@ -3907,12 +3911,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): Returns: The ID of the newly created media. """ - upload_resource = self.media_repo.children[b"upload"] - download_resource = self.media_repo.children[b"download"] - # Upload some media into the room response = self.helper.upload_media( - upload_resource, image_data, user_token, filename, expect_code=200 + image_data, user_token, filename, expect_code=200 ) # Extract media ID from the response @@ -3920,11 +3921,9 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): media_id = server_and_media_id.split("/")[1] # Try to access a media and to create `last_access_ts` - channel = make_request( - self.reactor, - FakeSite(download_resource, self.reactor), + channel = self.make_request( "GET", - server_and_media_id, + f"/_matrix/media/v3/download/{server_and_media_id}", shorthand=False, access_token=user_token, ) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 7627823d3f..aaa4f3bba0 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -1447,6 +1447,30 @@ class RoomJoinRatelimitTestCase(RoomBase): @unittest.override_config( {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} ) + def test_join_attempts_local_ratelimit(self) -> None: + """Tests that unsuccessful joins that end up being denied are rate-limited.""" + # Create 4 rooms + room_ids = [ + self.helper.create_room_as(self.user_id, is_public=True) for _ in range(4) + ] + # Pre-emptively ban the user who will attempt to join. + joiner_user_id = self.register_user("joiner", "secret") + for room_id in room_ids: + self.helper.ban(room_id, self.user_id, joiner_user_id) + + # Now make a new user try to join some of them. + # The user can make 3 requests, each of which should be denied. + for room_id in room_ids[0:3]: + self.helper.join(room_id, joiner_user_id, expect_code=HTTPStatus.FORBIDDEN) + + # The fourth attempt should be rate limited. + self.helper.join( + room_ids[3], joiner_user_id, expect_code=HTTPStatus.TOO_MANY_REQUESTS + ) + + @unittest.override_config( + {"rc_joins": {"local": {"per_second": 0.5, "burst_count": 3}}} + ) def test_join_local_ratelimit_profile_change(self) -> None: """Tests that sending a profile update into all of the user's joined rooms isn't rate-limited by the rate-limiter on joins.""" diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 9532e5ddc1..465b696c0b 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -37,7 +37,6 @@ import attr from typing_extensions import Literal from twisted.test.proto_helpers import MemoryReactorClock -from twisted.web.resource import Resource from twisted.web.server import Site from synapse.api.constants import Membership @@ -45,7 +44,7 @@ from synapse.api.errors import Codes from synapse.server import HomeServer from synapse.types import JsonDict -from tests.server import FakeChannel, FakeSite, make_request +from tests.server import FakeChannel, make_request from tests.test_utils.html_parsers import TestHtmlParser from tests.test_utils.oidc import FakeAuthorizationGrant, FakeOidcServer @@ -558,7 +557,6 @@ class RestHelper: def upload_media( self, - resource: Resource, image_data: bytes, tok: str, filename: str = "test.png", @@ -576,7 +574,7 @@ class RestHelper: path = "/_matrix/media/r0/upload?filename=%s" % (filename,) channel = make_request( self.reactor, - FakeSite(resource, self.reactor), + self.site, "POST", path, content=image_data, diff --git a/tests/rest/media/test_url_preview.py b/tests/rest/media/test_url_preview.py index 05d5e39cab..24459c6af4 100644 --- a/tests/rest/media/test_url_preview.py +++ b/tests/rest/media/test_url_preview.py @@ -24,10 +24,10 @@ from twisted.internet.address import IPv4Address, IPv6Address from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import IAddress, IResolutionReceiver from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor +from twisted.web.resource import Resource from synapse.config.oembed import OEmbedEndpointConfig from synapse.media.url_previewer import IMAGE_CACHE_EXPIRY_MS -from synapse.rest.media.media_repository_resource import MediaRepositoryResource from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -117,8 +117,8 @@ class URLPreviewTests(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.media_repo = hs.get_media_repository() - media_repo_resource = hs.get_media_repository_resource() - self.preview_url = media_repo_resource.children[b"preview_url"] + assert self.media_repo.url_previewer is not None + self.url_previewer = self.media_repo.url_previewer self.lookups: Dict[str, Any] = {} @@ -143,8 +143,15 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.reactor.nameResolver = Resolver() # type: ignore[assignment] - def create_test_resource(self) -> MediaRepositoryResource: - return self.hs.get_media_repository_resource() + def create_resource_dict(self) -> Dict[str, Resource]: + """Create a resource tree for the test server + + A resource tree is a mapping from path to twisted.web.resource. + + The default implementation creates a JsonResource and calls each function in + `servlets` to register servlets against it. + """ + return {"/_matrix/media": self.hs.get_media_repository_resource()} def _assert_small_png(self, json_body: JsonDict) -> None: """Assert properties from the SMALL_PNG test image.""" @@ -159,7 +166,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -183,7 +190,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check the cache returns the correct response channel = self.make_request( - "GET", "preview_url?url=http://matrix.org", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://matrix.org", + shorthand=False, ) # Check the cache response has the same content @@ -193,13 +202,15 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) # Clear the in-memory cache - self.assertIn("http://matrix.org", self.preview_url._url_previewer._cache) - self.preview_url._url_previewer._cache.pop("http://matrix.org") - self.assertNotIn("http://matrix.org", self.preview_url._url_previewer._cache) + self.assertIn("http://matrix.org", self.url_previewer._cache) + self.url_previewer._cache.pop("http://matrix.org") + self.assertNotIn("http://matrix.org", self.url_previewer._cache) # Check the database cache returns the correct response channel = self.make_request( - "GET", "preview_url?url=http://matrix.org", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://matrix.org", + shorthand=False, ) # Check the cache response has the same content @@ -221,7 +232,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -251,7 +262,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -287,7 +298,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -328,7 +339,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -363,7 +374,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -396,7 +407,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -425,7 +436,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv4Address, "192.168.1.1")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) # No requests made. @@ -446,7 +459,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv4Address, "1.1.1.2")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) @@ -463,7 +478,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): Blocked IP addresses, accessed directly, are not spidered. """ channel = self.make_request( - "GET", "preview_url?url=http://192.168.1.1", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://192.168.1.1", + shorthand=False, ) # No requests made. @@ -479,7 +496,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): Blocked IP ranges, accessed directly, are not spidered. """ channel = self.make_request( - "GET", "preview_url?url=http://1.1.1.2", shorthand=False + "GET", "/_matrix/media/v3/preview_url?url=http://1.1.1.2", shorthand=False ) self.assertEqual(channel.code, 403) @@ -497,7 +514,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -533,7 +550,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): ] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) self.assertEqual( @@ -553,7 +572,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): ] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) # No requests made. @@ -574,7 +595,9 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.lookups["example.com"] = [(IPv6Address, "2001:800::1")] channel = self.make_request( - "GET", "preview_url?url=http://example.com", shorthand=False + "GET", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) self.assertEqual(channel.code, 502) @@ -591,10 +614,11 @@ class URLPreviewTests(unittest.HomeserverTestCase): OPTIONS returns the OPTIONS. """ channel = self.make_request( - "OPTIONS", "preview_url?url=http://example.com", shorthand=False + "OPTIONS", + "/_matrix/media/v3/preview_url?url=http://example.com", + shorthand=False, ) - self.assertEqual(channel.code, 200) - self.assertEqual(channel.json_body, {}) + self.assertEqual(channel.code, 204) def test_accept_language_config_option(self) -> None: """ @@ -605,7 +629,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Build and make a request to the server channel = self.make_request( "GET", - "preview_url?url=http://example.com", + "/_matrix/media/v3/preview_url?url=http://example.com", shorthand=False, await_result=False, ) @@ -658,7 +682,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -708,7 +732,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -750,7 +774,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -790,7 +814,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -831,7 +855,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"preview_url?{query_params}", + f"/_matrix/media/v3/preview_url?{query_params}", shorthand=False, ) self.pump() @@ -852,7 +876,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://matrix.org", + "/_matrix/media/v3/preview_url?url=http://matrix.org", shorthand=False, await_result=False, ) @@ -889,7 +913,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -949,7 +973,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -998,7 +1022,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.hulu.com/watch/12345", + "/_matrix/media/v3/preview_url?url=http://www.hulu.com/watch/12345", shorthand=False, await_result=False, ) @@ -1043,7 +1067,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -1072,7 +1096,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -1164,7 +1188,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", + "/_matrix/media/v3/preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", shorthand=False, await_result=False, ) @@ -1205,7 +1229,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=http://cdn.twitter.com/matrixdotorg", + "/_matrix/media/v3/preview_url?url=http://cdn.twitter.com/matrixdotorg", shorthand=False, await_result=False, ) @@ -1247,7 +1271,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check fetching channel = self.make_request( "GET", - f"download/{host}/{media_id}", + f"/_matrix/media/v3/download/{host}/{media_id}", shorthand=False, await_result=False, ) @@ -1260,7 +1284,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"download/{host}/{media_id}", + f"/_matrix/media/v3/download/{host}/{media_id}", shorthand=False, await_result=False, ) @@ -1295,7 +1319,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): # Check fetching channel = self.make_request( "GET", - f"thumbnail/{host}/{media_id}?width=32&height=32&method=scale", + f"/_matrix/media/v3/thumbnail/{host}/{media_id}?width=32&height=32&method=scale", shorthand=False, await_result=False, ) @@ -1313,7 +1337,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - f"thumbnail/{host}/{media_id}?width=32&height=32&method=scale", + f"/_matrix/media/v3/thumbnail/{host}/{media_id}?width=32&height=32&method=scale", shorthand=False, await_result=False, ) @@ -1343,7 +1367,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.assertTrue(os.path.isdir(thumbnail_dir)) self.reactor.advance(IMAGE_CACHE_EXPIRY_MS * 1000 + 1) - self.get_success(self.preview_url._url_previewer._expire_url_cache_data()) + self.get_success(self.url_previewer._expire_url_cache_data()) for path in [file_path] + file_dirs + [thumbnail_dir] + thumbnail_dirs: self.assertFalse( @@ -1363,7 +1387,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + bad_url, + "/_matrix/media/v3/preview_url?url=" + bad_url, shorthand=False, await_result=False, ) @@ -1372,7 +1396,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + good_url, + "/_matrix/media/v3/preview_url?url=" + good_url, shorthand=False, await_result=False, ) @@ -1404,7 +1428,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "preview_url?url=" + bad_url, + "/_matrix/media/v3/preview_url?url=" + bad_url, shorthand=False, await_result=False, ) diff --git a/tests/unittest.py b/tests/unittest.py index dbaff361b4..99ad02eb06 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -60,7 +60,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.server import DEFAULT_ROOM_VERSION from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.federation.transport.server import TransportLayerServer -from synapse.http.server import JsonResource +from synapse.http.server import JsonResource, OptionsResource from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import ( SENTINEL_CONTEXT, @@ -459,7 +459,7 @@ class HomeserverTestCase(TestCase): The default calls `self.create_resource_dict` and builds the resultant dict into a tree. """ - root_resource = Resource() + root_resource = OptionsResource() create_resource_tree(self.create_resource_dict(), root_resource) return root_resource |