diff options
108 files changed, 2344 insertions, 574 deletions
diff --git a/changelog.d/11157.misc b/changelog.d/11157.misc new file mode 100644 index 0000000000..75444c51d1 --- /dev/null +++ b/changelog.d/11157.misc @@ -0,0 +1 @@ +Only allow old Element/Riot Android clients to send read receipts without a request body. All other clients must include a request body as required by the specification. Contributed by @rogersheu. diff --git a/changelog.d/11188.bugfix b/changelog.d/11188.bugfix new file mode 100644 index 0000000000..0688743c00 --- /dev/null +++ b/changelog.d/11188.bugfix @@ -0,0 +1 @@ +Allow an empty list of `state_events_at_start` to be sent when using the [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) `/batch_send` endpoint and the author of the historical messages is already part of the current room state at the given `?prev_event_id`. diff --git a/changelog.d/11207.bugfix b/changelog.d/11207.bugfix new file mode 100644 index 0000000000..7e98d565a1 --- /dev/null +++ b/changelog.d/11207.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug which could result in serialization errors and potentially duplicate transaction data when sending ephemeral events to application services. Contributed by @Fizzadar at Beeper. diff --git a/changelog.d/11210.feature b/changelog.d/11210.feature new file mode 100644 index 0000000000..8f8e386415 --- /dev/null +++ b/changelog.d/11210.feature @@ -0,0 +1 @@ +Calculate a default value for `public_baseurl` based on `server_name`. diff --git a/changelog.d/11228.feature b/changelog.d/11228.feature new file mode 100644 index 0000000000..33c1756b50 --- /dev/null +++ b/changelog.d/11228.feature @@ -0,0 +1 @@ +Allow the admin [Delete Room API](https://matrix-org.github.io/synapse/latest/admin_api/rooms.html#delete-room-api) to block a room without the need to join it. diff --git a/changelog.d/11233.misc b/changelog.d/11233.misc new file mode 100644 index 0000000000..fdf9e5642e --- /dev/null +++ b/changelog.d/11233.misc @@ -0,0 +1 @@ +Add `twine` and `towncrier` as dev dependencies, as they're used by the release script. diff --git a/changelog.d/11234.bugfix b/changelog.d/11234.bugfix new file mode 100644 index 0000000000..c0c02a58f6 --- /dev/null +++ b/changelog.d/11234.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where cross signing keys were not included in the response to `/r0/keys/query` the first time a remote user was queried. diff --git a/changelog.d/11236.feature b/changelog.d/11236.feature new file mode 100644 index 0000000000..e7aeee2aa6 --- /dev/null +++ b/changelog.d/11236.feature @@ -0,0 +1 @@ +Support filtering by relation senders & types per [MSC3440](https://github.com/matrix-org/matrix-doc/pull/3440). diff --git a/changelog.d/11237.misc b/changelog.d/11237.misc new file mode 100644 index 0000000000..b90efc6535 --- /dev/null +++ b/changelog.d/11237.misc @@ -0,0 +1 @@ +Allow `stream_writers.typing` config to be a list of one worker. diff --git a/changelog.d/11239.misc b/changelog.d/11239.misc new file mode 100644 index 0000000000..48a796bed0 --- /dev/null +++ b/changelog.d/11239.misc @@ -0,0 +1 @@ +Remove debugging statement in tests. diff --git a/changelog.d/11240.bugfix b/changelog.d/11240.bugfix new file mode 100644 index 0000000000..94d73f67e3 --- /dev/null +++ b/changelog.d/11240.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where all requests that read events from the database could get stuck as a result of losing the database connection. diff --git a/changelog.d/11242.misc b/changelog.d/11242.misc new file mode 100644 index 0000000000..3a98259edf --- /dev/null +++ b/changelog.d/11242.misc @@ -0,0 +1 @@ +Split out federated PDU retrieval function into a non-cached version. diff --git a/changelog.d/11244.misc b/changelog.d/11244.misc new file mode 100644 index 0000000000..c6e65df97f --- /dev/null +++ b/changelog.d/11244.misc @@ -0,0 +1 @@ +Fix [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) historical messages backfilling in random order on remote homeservers. diff --git a/changelog.d/11246.misc b/changelog.d/11246.misc new file mode 100644 index 0000000000..e5e912c1b0 --- /dev/null +++ b/changelog.d/11246.misc @@ -0,0 +1 @@ +Add an additional test for the `cachedList` method decorator. diff --git a/changelog.d/11247.misc b/changelog.d/11247.misc new file mode 100644 index 0000000000..5ce701560e --- /dev/null +++ b/changelog.d/11247.misc @@ -0,0 +1 @@ +Clean up code relating to to-device messages and sending ephemeral events to application services. \ No newline at end of file diff --git a/changelog.d/11253.misc b/changelog.d/11253.misc new file mode 100644 index 0000000000..71c55a2751 --- /dev/null +++ b/changelog.d/11253.misc @@ -0,0 +1 @@ +Make minor correction to the type of `auth_checkers` callbacks. diff --git a/changelog.d/11255.bugfix b/changelog.d/11255.bugfix new file mode 100644 index 0000000000..ce72592624 --- /dev/null +++ b/changelog.d/11255.bugfix @@ -0,0 +1 @@ +Fix rolling back Synapse version when using workers. diff --git a/changelog.d/11257.doc b/changelog.d/11257.doc new file mode 100644 index 0000000000..1205be2add --- /dev/null +++ b/changelog.d/11257.doc @@ -0,0 +1 @@ +Add documentation for using LemonLDAP as an OpenID Connect Identity Provider. Contributed by @l00ptr. diff --git a/changelog.d/11262.bugfix b/changelog.d/11262.bugfix new file mode 100644 index 0000000000..768fbb8973 --- /dev/null +++ b/changelog.d/11262.bugfix @@ -0,0 +1 @@ +Fix a bug where if a remote event is being processed by a worker when it gets killed then it won't get processed on restart. Introduced in v1.37.1. diff --git a/changelog.d/11263.feature b/changelog.d/11263.feature new file mode 100644 index 0000000000..831e76ec9f --- /dev/null +++ b/changelog.d/11263.feature @@ -0,0 +1 @@ +Add some background update admin APIs. diff --git a/changelog.d/11269.misc b/changelog.d/11269.misc new file mode 100644 index 0000000000..a2149c2d2d --- /dev/null +++ b/changelog.d/11269.misc @@ -0,0 +1 @@ +Clean up trivial aspects of the Debian package build tooling. diff --git a/changelog.d/11270.misc b/changelog.d/11270.misc new file mode 100644 index 0000000000..e2181b9b2a --- /dev/null +++ b/changelog.d/11270.misc @@ -0,0 +1 @@ +Blacklist new SyTest that checks that key uploads are valid pending the validation being implemented in Synapse. diff --git a/changelog.d/11273.misc b/changelog.d/11273.misc new file mode 100644 index 0000000000..a2149c2d2d --- /dev/null +++ b/changelog.d/11273.misc @@ -0,0 +1 @@ +Clean up trivial aspects of the Debian package build tooling. diff --git a/changelog.d/11276.bugfix b/changelog.d/11276.bugfix new file mode 100644 index 0000000000..ce72592624 --- /dev/null +++ b/changelog.d/11276.bugfix @@ -0,0 +1 @@ +Fix rolling back Synapse version when using workers. diff --git a/changelog.d/11278.misc b/changelog.d/11278.misc new file mode 100644 index 0000000000..9b014bc8b4 --- /dev/null +++ b/changelog.d/11278.misc @@ -0,0 +1 @@ +Fix a small typo in the error response when a relation type other than 'm.annotation' is passed to `GET /rooms/{room_id}/aggregations/{event_id}`. \ No newline at end of file diff --git a/changelog.d/11282.misc b/changelog.d/11282.misc new file mode 100644 index 0000000000..4720519cbc --- /dev/null +++ b/changelog.d/11282.misc @@ -0,0 +1 @@ +Require all files in synapse/ and tests/ to pass mypy unless specifically excluded. diff --git a/changelog.d/11285.misc b/changelog.d/11285.misc new file mode 100644 index 0000000000..4720519cbc --- /dev/null +++ b/changelog.d/11285.misc @@ -0,0 +1 @@ +Require all files in synapse/ and tests/ to pass mypy unless specifically excluded. diff --git a/changelog.d/11286.doc b/changelog.d/11286.doc new file mode 100644 index 0000000000..890d7b4ee4 --- /dev/null +++ b/changelog.d/11286.doc @@ -0,0 +1 @@ +Fix typo in the word `available` and fix HTTP method (should be `GET`) for the `username_available` admin API. Contributed by Stanislav Motylkov. diff --git a/debian/build_virtualenv b/debian/build_virtualenv index 3097371d59..e691163619 100755 --- a/debian/build_virtualenv +++ b/debian/build_virtualenv @@ -40,6 +40,7 @@ dh_virtualenv \ --upgrade-pip \ --preinstall="lxml" \ --preinstall="mock" \ + --preinstall="wheel" \ --extra-pip-arg="--no-cache-dir" \ --extra-pip-arg="--compile" \ --extras="all,systemd,test" diff --git a/debian/changelog b/debian/changelog index 14748f8c25..74a98f0866 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,8 +1,12 @@ matrix-synapse-py3 (1.47.0+nmu1) UNRELEASED; urgency=medium * Update scripts to pass Shellcheck lints. + * Remove unused Vagrant scripts from debian/ directory. + * Allow building Debian packages for any architecture, not just amd64. + * Preinstall the "wheel" package when building virtualenvs. + * Do not error if /etc/default/matrix-synapse is missing. - -- root <root@cae79a6e79d7> Fri, 22 Oct 2021 22:20:31 +0000 + -- Dan Callahan <danc@element.io> Fri, 22 Oct 2021 22:20:31 +0000 matrix-synapse-py3 (1.46.0) stable; urgency=medium diff --git a/debian/control b/debian/control index 763fabd6f6..412a9e1d4c 100644 --- a/debian/control +++ b/debian/control @@ -19,7 +19,7 @@ Standards-Version: 3.9.8 Homepage: https://github.com/matrix-org/synapse Package: matrix-synapse-py3 -Architecture: amd64 +Architecture: any Provides: matrix-synapse Conflicts: matrix-synapse (<< 0.34.0.1-0matrix2), diff --git a/debian/matrix-synapse.service b/debian/matrix-synapse.service index 553babf549..bde1c6cb9f 100644 --- a/debian/matrix-synapse.service +++ b/debian/matrix-synapse.service @@ -5,7 +5,7 @@ Description=Synapse Matrix homeserver Type=notify User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ ExecReload=/bin/kill -HUP $MAINPID diff --git a/debian/test/.gitignore b/debian/test/.gitignore deleted file mode 100644 index 95eda73fcc..0000000000 --- a/debian/test/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -.vagrant -*.log diff --git a/debian/test/provision.sh b/debian/test/provision.sh deleted file mode 100644 index 55d7b8e03a..0000000000 --- a/debian/test/provision.sh +++ /dev/null @@ -1,24 +0,0 @@ -#!/bin/bash -# -# provisioning script for vagrant boxes for testing the matrix-synapse debs. -# -# Will install the most recent matrix-synapse-py3 deb for this platform from -# the /debs directory. - -set -e - -apt-get update -apt-get install -y lsb-release - -deb=$(find /debs -name "matrix-synapse-py3_*+$(lsb_release -cs)*.deb" | sort | tail -n1) - -debconf-set-selections <<EOF -matrix-synapse matrix-synapse/report-stats boolean false -matrix-synapse matrix-synapse/server-name string localhost:18448 -EOF - -dpkg -i "$deb" - -sed -i -e 's/port: 8448$/port: 18448/; s/port: 8008$/port: 18008' /etc/matrix-synapse/homeserver.yaml -echo 'registration_shared_secret: secret' >> /etc/matrix-synapse/homeserver.yaml -systemctl restart matrix-synapse diff --git a/debian/test/stretch/Vagrantfile b/debian/test/stretch/Vagrantfile deleted file mode 100644 index d8eff6fe11..0000000000 --- a/debian/test/stretch/Vagrantfile +++ /dev/null @@ -1,13 +0,0 @@ -# -*- mode: ruby -*- -# vi: set ft=ruby : - -ver = `cd ../../..; dpkg-parsechangelog -S Version`.strip() - -Vagrant.configure("2") do |config| - config.vm.box = "debian/stretch64" - - config.vm.synced_folder ".", "/vagrant", disabled: true - config.vm.synced_folder "../../../../debs", "/debs", type: "nfs" - - config.vm.provision "shell", path: "../provision.sh" -end diff --git a/debian/test/xenial/Vagrantfile b/debian/test/xenial/Vagrantfile deleted file mode 100644 index 189236da17..0000000000 --- a/debian/test/xenial/Vagrantfile +++ /dev/null @@ -1,10 +0,0 @@ -# -*- mode: ruby -*- -# vi: set ft=ruby : - -Vagrant.configure("2") do |config| - config.vm.box = "ubuntu/xenial64" - - config.vm.synced_folder ".", "/vagrant", disabled: true - config.vm.synced_folder "../../../../debs", "/debs" - config.vm.provision "shell", path: "../provision.sh" -end diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 35412ea92c..04320ab07b 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -51,6 +51,7 @@ - [Administration](usage/administration/README.md) - [Admin API](usage/administration/admin_api/README.md) - [Account Validity](admin_api/account_validity.md) + - [Background Updates](usage/administration/admin_api/background_updates.md) - [Delete Group](admin_api/delete_group.md) - [Event Reports](admin_api/event_reports.md) - [Media](admin_api/media_admin_api.md) diff --git a/docs/admin_api/rooms.md b/docs/admin_api/rooms.md index 1fc3cc3c42..41a4961d00 100644 --- a/docs/admin_api/rooms.md +++ b/docs/admin_api/rooms.md @@ -385,7 +385,7 @@ A response body like the following is returned: # Delete Room API -The Delete Room admin API allows server admins to remove rooms from server +The Delete Room admin API allows server admins to remove rooms from the server and block these rooms. Shuts down a room. Moves all local users and room aliases automatically to a @@ -396,13 +396,17 @@ The new room will be created with the user specified by the `new_room_user_id` p as room administrator and will contain a message explaining what happened. Users invited to the new room will have power level `-10` by default, and thus be unable to speak. -If `block` is `True` it prevents new joins to the old room. +If `block` is `true`, users will be prevented from joining the old room. +This option can also be used to pre-emptively block a room, even if it's unknown +to this homeserver. In this case, the room will be blocked, and no further action +will be taken. If `block` is `false`, attempting to delete an unknown room is +invalid and will be rejected as a bad request. This API will remove all trace of the old room from your database after removing all local users. If `purge` is `true` (the default), all traces of the old room will be removed from your database after removing all local users. If you do not want this to happen, set `purge` to `false`. -Depending on the amount of history being purged a call to the API may take +Depending on the amount of history being purged, a call to the API may take several minutes or longer. The local server will only have the power to move local user and room aliases to @@ -464,8 +468,9 @@ The following JSON body parameters are available: `new_room_user_id` in the new room. Ideally this will clearly convey why the original room was shut down. Defaults to `Sharing illegal content on this server is not permitted and rooms in violation will be blocked.` -* `block` - Optional. If set to `true`, this room will be added to a blocking list, preventing - future attempts to join the room. Defaults to `false`. +* `block` - Optional. If set to `true`, this room will be added to a blocking list, + preventing future attempts to join the room. Rooms can be blocked + even if they're not yet known to the homeserver. Defaults to `false`. * `purge` - Optional. If set to `true`, it will remove all traces of the room from your database. Defaults to `true`. * `force_purge` - Optional, and ignored unless `purge` is `true`. If set to `true`, it @@ -483,7 +488,8 @@ The following fields are returned in the JSON response body: * `failed_to_kick_users` - An array of users (`user_id`) that that were not kicked. * `local_aliases` - An array of strings representing the local aliases that were migrated from the old room to the new. -* `new_room_id` - A string representing the room ID of the new room. +* `new_room_id` - A string representing the room ID of the new room, or `null` if + no such room was created. ## Undoing room deletions diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index f03539c9f0..16ec33b3c1 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -1107,7 +1107,7 @@ This endpoint will work even if registration is disabled on the server, unlike The API is: ``` -POST /_synapse/admin/v1/username_availabile?username=$localpart +GET /_synapse/admin/v1/username_available?username=$localpart ``` The request and response format is the same as the [/_matrix/client/r0/register/available](https://matrix.org/docs/spec/client_server/r0.6.0#get-matrix-client-r0-register-available) API. diff --git a/docs/modules/password_auth_provider_callbacks.md b/docs/modules/password_auth_provider_callbacks.md index 0de60b128a..e53abf6409 100644 --- a/docs/modules/password_auth_provider_callbacks.md +++ b/docs/modules/password_auth_provider_callbacks.md @@ -11,7 +11,7 @@ registered by using the Module API's `register_password_auth_provider_callbacks` _First introduced in Synapse v1.46.0_ ```python - auth_checkers: Dict[Tuple[str,Tuple], Callable] +auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Callable] ``` A dict mapping from tuples of a login type identifier (such as `m.login.password`) and a diff --git a/docs/openid.md b/docs/openid.md index 4a340ef107..c74e8bda60 100644 --- a/docs/openid.md +++ b/docs/openid.md @@ -22,6 +22,7 @@ such as [Github][github-idp]. [google-idp]: https://developers.google.com/identity/protocols/oauth2/openid-connect [auth0]: https://auth0.com/ [authentik]: https://goauthentik.io/ +[lemonldap]: https://lemonldap-ng.org/ [okta]: https://www.okta.com/ [dex-idp]: https://github.com/dexidp/dex [keycloak-idp]: https://www.keycloak.org/docs/latest/server_admin/#sso-protocols @@ -243,6 +244,43 @@ oidc_providers: display_name_template: "{{ user.preferred_username|capitalize }}" # TO BE FILLED: If your users have names in Authentik and you want those in Synapse, this should be replaced with user.name|capitalize. ``` +### LemonLDAP + +[LemonLDAP::NG][lemonldap] is an open-source IdP solution. + +1. Create an OpenID Connect Relying Parties in LemonLDAP::NG +2. The parameters are: +- Client ID under the basic menu of the new Relying Parties (`Options > Basic > + Client ID`) +- Client secret (`Options > Basic > Client secret`) +- JWT Algorithm: RS256 within the security menu of the new Relying Parties + (`Options > Security > ID Token signature algorithm` and `Options > Security > + Access Token signature algorithm`) +- Scopes: OpenID, Email and Profile +- Allowed redirection addresses for login (`Options > Basic > Allowed + redirection addresses for login` ) : + `[synapse public baseurl]/_synapse/client/oidc/callback` + +Synapse config: +```yaml +oidc_providers: + - idp_id: lemonldap + idp_name: lemonldap + discover: true + issuer: "https://auth.example.org/" # TO BE FILLED: replace with your domain + client_id: "your client id" # TO BE FILLED + client_secret: "your client secret" # TO BE FILLED + scopes: + - "openid" + - "profile" + - "email" + user_mapping_provider: + config: + localpart_template: "{{ user.preferred_username }}}" + # TO BE FILLED: If your users have names in LemonLDAP::NG and you want those in Synapse, this should be replaced with user.name|capitalize or any valid filter. + display_name_template: "{{ user.preferred_username|capitalize }}" +``` + ### GitHub [GitHub][github-idp] is a bit special as it is not an OpenID Connect compliant provider, but diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index c3a4148f74..d48c08f1d9 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -91,6 +91,8 @@ pid_file: DATADIR/homeserver.pid # Otherwise, it should be the URL to reach Synapse's client HTTP listener (see # 'listeners' below). # +# Defaults to 'https://<server_name>/'. +# #public_baseurl: https://example.com/ # Uncomment the following to tell other servers to send federation traffic on @@ -1265,7 +1267,7 @@ oembed: # in on this server. # # (By default, no suggestion is made, so it is left up to the client. -# This setting is ignored unless public_baseurl is also set.) +# This setting is ignored unless public_baseurl is also explicitly set.) # #default_identity_server: https://matrix.org @@ -1290,8 +1292,6 @@ oembed: # by the Matrix Identity Service API specification: # https://matrix.org/docs/spec/identity_service/latest # -# If a delegate is specified, the config option public_baseurl must also be filled out. -# account_threepid_delegates: #email: https://example.com # Delegate email sending to example.com #msisdn: http://localhost:8090 # Delegate SMS sending to this local process @@ -1981,11 +1981,10 @@ sso: # phishing attacks from evil.site. To avoid this, include a slash after the # hostname: "https://my.client/". # - # If public_baseurl is set, then the login fallback page (used by clients - # that don't natively support the required login flows) is whitelisted in - # addition to any URLs in this list. + # The login fallback page (used by clients that don't natively support the + # required login flows) is whitelisted in addition to any URLs in this list. # - # By default, this list is empty. + # By default, this list contains only the login fallback page. # #client_whitelist: # - https://riot.im/develop diff --git a/docs/systemd-with-workers/system/matrix-synapse-worker@.service b/docs/systemd-with-workers/system/matrix-synapse-worker@.service index d164e8ce1f..8f5c44c9d4 100644 --- a/docs/systemd-with-workers/system/matrix-synapse-worker@.service +++ b/docs/systemd-with-workers/system/matrix-synapse-worker@.service @@ -15,7 +15,7 @@ Type=notify NotifyAccess=main User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.generic_worker --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --config-path=/etc/matrix-synapse/workers/%i.yaml ExecReload=/bin/kill -HUP $MAINPID Restart=always diff --git a/docs/systemd-with-workers/system/matrix-synapse.service b/docs/systemd-with-workers/system/matrix-synapse.service index f6b6dfd3ce..0c73fb55fb 100644 --- a/docs/systemd-with-workers/system/matrix-synapse.service +++ b/docs/systemd-with-workers/system/matrix-synapse.service @@ -10,7 +10,7 @@ Type=notify NotifyAccess=main User=matrix-synapse WorkingDirectory=/var/lib/matrix-synapse -EnvironmentFile=/etc/default/matrix-synapse +EnvironmentFile=-/etc/default/matrix-synapse ExecStartPre=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ --generate-keys ExecStart=/opt/venvs/matrix-synapse/bin/python -m synapse.app.homeserver --config-path=/etc/matrix-synapse/homeserver.yaml --config-path=/etc/matrix-synapse/conf.d/ ExecReload=/bin/kill -HUP $MAINPID diff --git a/docs/usage/administration/admin_api/background_updates.md b/docs/usage/administration/admin_api/background_updates.md new file mode 100644 index 0000000000..b36d7fe398 --- /dev/null +++ b/docs/usage/administration/admin_api/background_updates.md @@ -0,0 +1,84 @@ +# Background Updates API + +This API allows a server administrator to manage the background updates being +run against the database. + +## Status + +This API gets the current status of the background updates. + + +The API is: + +``` +GET /_synapse/admin/v1/background_updates/status +``` + +Returning: + +```json +{ + "enabled": true, + "current_updates": { + "<db_name>": { + "name": "<background_update_name>", + "total_item_count": 50, + "total_duration_ms": 10000.0, + "average_items_per_ms": 2.2, + }, + } +} +``` + +`enabled` whether the background updates are enabled or disabled. + +`db_name` the database name (usually Synapse is configured with a single database named 'master'). + +For each update: + +`name` the name of the update. +`total_item_count` total number of "items" processed (the meaning of 'items' depends on the update in question). +`total_duration_ms` how long the background process has been running, not including time spent sleeping. +`average_items_per_ms` how many items are processed per millisecond based on an exponential average. + + + +## Enabled + +This API allow pausing background updates. + +Background updates should *not* be paused for significant periods of time, as +this can affect the performance of Synapse. + +*Note*: This won't persist over restarts. + +*Note*: This won't cancel any update query that is currently running. This is +usually fine since most queries are short lived, except for `CREATE INDEX` +background updates which won't be cancelled once started. + + +The API is: + +``` +POST /_synapse/admin/v1/background_updates/enabled +``` + +with the following body: + +```json +{ + "enabled": false +} +``` + +`enabled` sets whether the background updates are enabled or disabled. + +The API returns the `enabled` param. + +```json +{ + "enabled": false +} +``` + +There is also a `GET` version which returns the `enabled` state. diff --git a/mypy.ini b/mypy.ini index 600402a5d3..1752b82bc5 100644 --- a/mypy.ini +++ b/mypy.ini @@ -10,82 +10,173 @@ warn_unreachable = True local_partial_types = True no_implicit_optional = True -# To find all folders that pass mypy you run: -# -# find synapse/* -type d -not -name __pycache__ -exec bash -c "mypy '{}' > /dev/null" \; -print - files = scripts-dev/sign_json, - synapse/__init__.py, - synapse/api, - synapse/appservice, - synapse/config, - synapse/crypto, - synapse/event_auth.py, - synapse/events, - synapse/federation, - synapse/groups, - synapse/handlers, - synapse/http, - synapse/logging, - synapse/metrics, - synapse/module_api, - synapse/notifier.py, - synapse/push, - synapse/replication, - synapse/rest, - synapse/server.py, - synapse/server_notices, - synapse/spam_checker_api, - synapse/state, - synapse/storage/__init__.py, - synapse/storage/_base.py, - synapse/storage/background_updates.py, - synapse/storage/databases/main/appservice.py, - synapse/storage/databases/main/client_ips.py, - synapse/storage/databases/main/events.py, - synapse/storage/databases/main/keys.py, - synapse/storage/databases/main/pusher.py, - synapse/storage/databases/main/registration.py, - synapse/storage/databases/main/relations.py, - synapse/storage/databases/main/session.py, - synapse/storage/databases/main/stream.py, - synapse/storage/databases/main/ui_auth.py, - synapse/storage/databases/state, - synapse/storage/database.py, - synapse/storage/engines, - synapse/storage/keys.py, - synapse/storage/persist_events.py, - synapse/storage/prepare_database.py, - synapse/storage/purge_events.py, - synapse/storage/push_rule.py, - synapse/storage/relations.py, - synapse/storage/roommember.py, - synapse/storage/state.py, - synapse/storage/types.py, - synapse/storage/util, - synapse/streams, - synapse/types.py, - synapse/util, - synapse/visibility.py, - tests/replication, - tests/test_event_auth.py, - tests/test_utils, - tests/handlers/test_password_providers.py, - tests/handlers/test_room.py, - tests/handlers/test_room_summary.py, - tests/handlers/test_send_email.py, - tests/handlers/test_sync.py, - tests/handlers/test_user_directory.py, - tests/rest/client/test_login.py, - tests/rest/client/test_auth.py, - tests/rest/client/test_relations.py, - tests/rest/media/v1/test_filepath.py, - tests/rest/media/v1/test_oembed.py, - tests/storage/test_state.py, - tests/storage/test_user_directory.py, - tests/util/test_itertools.py, - tests/util/test_stream_change_cache.py + setup.py, + synapse/, + tests/ + +# Note: Better exclusion syntax coming in mypy > 0.910 +# https://github.com/python/mypy/pull/11329 +# +# For now, set the (?x) flag enable "verbose" regexes +# https://docs.python.org/3/library/re.html#re.X +exclude = (?x) + ^( + |synapse/_scripts/register_new_matrix_user.py + |synapse/_scripts/review_recent_signups.py + |synapse/app/__init__.py + |synapse/app/_base.py + |synapse/app/admin_cmd.py + |synapse/app/appservice.py + |synapse/app/client_reader.py + |synapse/app/event_creator.py + |synapse/app/federation_reader.py + |synapse/app/federation_sender.py + |synapse/app/frontend_proxy.py + |synapse/app/generic_worker.py + |synapse/app/homeserver.py + |synapse/app/media_repository.py + |synapse/app/phone_stats_home.py + |synapse/app/pusher.py + |synapse/app/synchrotron.py + |synapse/app/user_dir.py + |synapse/storage/databases/__init__.py + |synapse/storage/databases/main/__init__.py + |synapse/storage/databases/main/account_data.py + |synapse/storage/databases/main/cache.py + |synapse/storage/databases/main/censor_events.py + |synapse/storage/databases/main/deviceinbox.py + |synapse/storage/databases/main/devices.py + |synapse/storage/databases/main/directory.py + |synapse/storage/databases/main/e2e_room_keys.py + |synapse/storage/databases/main/end_to_end_keys.py + |synapse/storage/databases/main/event_federation.py + |synapse/storage/databases/main/event_push_actions.py + |synapse/storage/databases/main/events_bg_updates.py + |synapse/storage/databases/main/events_forward_extremities.py + |synapse/storage/databases/main/events_worker.py + |synapse/storage/databases/main/filtering.py + |synapse/storage/databases/main/group_server.py + |synapse/storage/databases/main/lock.py + |synapse/storage/databases/main/media_repository.py + |synapse/storage/databases/main/metrics.py + |synapse/storage/databases/main/monthly_active_users.py + |synapse/storage/databases/main/openid.py + |synapse/storage/databases/main/presence.py + |synapse/storage/databases/main/profile.py + |synapse/storage/databases/main/purge_events.py + |synapse/storage/databases/main/push_rule.py + |synapse/storage/databases/main/receipts.py + |synapse/storage/databases/main/rejections.py + |synapse/storage/databases/main/room.py + |synapse/storage/databases/main/room_batch.py + |synapse/storage/databases/main/roommember.py + |synapse/storage/databases/main/search.py + |synapse/storage/databases/main/signatures.py + |synapse/storage/databases/main/state.py + |synapse/storage/databases/main/state_deltas.py + |synapse/storage/databases/main/stats.py + |synapse/storage/databases/main/tags.py + |synapse/storage/databases/main/transactions.py + |synapse/storage/databases/main/user_directory.py + |synapse/storage/databases/main/user_erasure_store.py + |synapse/storage/schema/ + + |tests/api/test_auth.py + |tests/api/test_ratelimiting.py + |tests/app/test_openid_listener.py + |tests/appservice/test_scheduler.py + |tests/config/test_cache.py + |tests/config/test_tls.py + |tests/crypto/test_keyring.py + |tests/events/test_presence_router.py + |tests/events/test_utils.py + |tests/federation/test_federation_catch_up.py + |tests/federation/test_federation_sender.py + |tests/federation/test_federation_server.py + |tests/federation/transport/test_knocking.py + |tests/federation/transport/test_server.py + |tests/handlers/test_cas.py + |tests/handlers/test_directory.py + |tests/handlers/test_e2e_keys.py + |tests/handlers/test_federation.py + |tests/handlers/test_oidc.py + |tests/handlers/test_presence.py + |tests/handlers/test_profile.py + |tests/handlers/test_saml.py + |tests/handlers/test_typing.py + |tests/http/federation/test_matrix_federation_agent.py + |tests/http/federation/test_srv_resolver.py + |tests/http/test_fedclient.py + |tests/http/test_proxyagent.py + |tests/http/test_servlet.py + |tests/http/test_site.py + |tests/logging/__init__.py + |tests/logging/test_terse_json.py + |tests/module_api/test_api.py + |tests/push/test_email.py + |tests/push/test_http.py + |tests/push/test_presentable_names.py + |tests/push/test_push_rule_evaluator.py + |tests/rest/admin/test_admin.py + |tests/rest/admin/test_device.py + |tests/rest/admin/test_media.py + |tests/rest/admin/test_server_notice.py + |tests/rest/admin/test_user.py + |tests/rest/admin/test_username_available.py + |tests/rest/client/test_account.py + |tests/rest/client/test_events.py + |tests/rest/client/test_filter.py + |tests/rest/client/test_groups.py + |tests/rest/client/test_register.py + |tests/rest/client/test_report_event.py + |tests/rest/client/test_rooms.py + |tests/rest/client/test_third_party_rules.py + |tests/rest/client/test_transactions.py + |tests/rest/client/test_typing.py + |tests/rest/client/utils.py + |tests/rest/key/v2/test_remote_key_resource.py + |tests/rest/media/v1/test_base.py + |tests/rest/media/v1/test_media_storage.py + |tests/rest/media/v1/test_url_preview.py + |tests/scripts/test_new_matrix_user.py + |tests/server.py + |tests/server_notices/test_resource_limits_server_notices.py + |tests/state/test_v2.py + |tests/storage/test_account_data.py + |tests/storage/test_appservice.py + |tests/storage/test_background_update.py + |tests/storage/test_base.py + |tests/storage/test_client_ips.py + |tests/storage/test_database.py + |tests/storage/test_event_federation.py + |tests/storage/test_id_generators.py + |tests/storage/test_roommember.py + |tests/test_metrics.py + |tests/test_phone_home.py + |tests/test_server.py + |tests/test_state.py + |tests/test_terms_auth.py + |tests/test_visibility.py + |tests/unittest.py + |tests/util/caches/test_cached_call.py + |tests/util/caches/test_deferred_cache.py + |tests/util/caches/test_descriptors.py + |tests/util/caches/test_response_cache.py + |tests/util/caches/test_ttlcache.py + |tests/util/test_async_helpers.py + |tests/util/test_batching_queue.py + |tests/util/test_dict_cache.py + |tests/util/test_expiring_cache.py + |tests/util/test_file_consumer.py + |tests/util/test_linearizer.py + |tests/util/test_logcontext.py + |tests/util/test_lrucache.py + |tests/util/test_rwlock.py + |tests/util/test_wheel_timer.py + |tests/utils.py + )$ [mypy-synapse.api.*] disallow_untyped_defs = True @@ -272,6 +363,9 @@ ignore_missing_imports = True [mypy-opentracing] ignore_missing_imports = True +[mypy-parameterized.*] +ignore_missing_imports = True + [mypy-phonenumbers.*] ignore_missing_imports = True diff --git a/setup.py b/setup.py index 220084a49d..5d602db240 100755 --- a/setup.py +++ b/setup.py @@ -17,6 +17,7 @@ # limitations under the License. import glob import os +from typing import Any, Dict from setuptools import Command, find_packages, setup @@ -49,8 +50,6 @@ here = os.path.abspath(os.path.dirname(__file__)) # [1]: http://tox.readthedocs.io/en/2.5.0/example/basic.html#integration-with-setup-py-test-command # [2]: https://pypi.python.org/pypi/setuptools_trial class TestCommand(Command): - user_options = [] - def initialize_options(self): pass @@ -75,7 +74,7 @@ def read_file(path_segments): def exec_file(path_segments): """Execute a single python file to get the variables defined in it""" - result = {} + result: Dict[str, Any] = {} code = read_file(path_segments) exec(code, result) return result @@ -132,6 +131,9 @@ CONDITIONAL_REQUIREMENTS["dev"] = ( "GitPython==3.1.14", "commonmark==0.9.1", "pygithub==1.55", + # The following are executed as commands by the release script. + "twine", + "towncrier", ] ) diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py index 4b0a9b2974..13dd6ce248 100644 --- a/synapse/api/filtering.py +++ b/synapse/api/filtering.py @@ -1,7 +1,7 @@ # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd # Copyright 2018-2019 New Vector Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019-2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -86,6 +86,9 @@ ROOM_EVENT_FILTER_SCHEMA = { # cf https://github.com/matrix-org/matrix-doc/pull/2326 "org.matrix.labels": {"type": "array", "items": {"type": "string"}}, "org.matrix.not_labels": {"type": "array", "items": {"type": "string"}}, + # MSC3440, filtering by event relations. + "io.element.relation_senders": {"type": "array", "items": {"type": "string"}}, + "io.element.relation_types": {"type": "array", "items": {"type": "string"}}, }, } @@ -146,14 +149,16 @@ def matrix_user_id_validator(user_id_str: str) -> UserID: class Filtering: def __init__(self, hs: "HomeServer"): - super().__init__() + self._hs = hs self.store = hs.get_datastore() + self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {}) + async def get_user_filter( self, user_localpart: str, filter_id: Union[int, str] ) -> "FilterCollection": result = await self.store.get_user_filter(user_localpart, filter_id) - return FilterCollection(result) + return FilterCollection(self._hs, result) def add_user_filter( self, user_localpart: str, user_filter: JsonDict @@ -191,21 +196,22 @@ FilterEvent = TypeVar("FilterEvent", EventBase, UserPresenceState, JsonDict) class FilterCollection: - def __init__(self, filter_json: JsonDict): + def __init__(self, hs: "HomeServer", filter_json: JsonDict): self._filter_json = filter_json room_filter_json = self._filter_json.get("room", {}) self._room_filter = Filter( - {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")} + hs, + {k: v for k, v in room_filter_json.items() if k in ("rooms", "not_rooms")}, ) - self._room_timeline_filter = Filter(room_filter_json.get("timeline", {})) - self._room_state_filter = Filter(room_filter_json.get("state", {})) - self._room_ephemeral_filter = Filter(room_filter_json.get("ephemeral", {})) - self._room_account_data = Filter(room_filter_json.get("account_data", {})) - self._presence_filter = Filter(filter_json.get("presence", {})) - self._account_data = Filter(filter_json.get("account_data", {})) + self._room_timeline_filter = Filter(hs, room_filter_json.get("timeline", {})) + self._room_state_filter = Filter(hs, room_filter_json.get("state", {})) + self._room_ephemeral_filter = Filter(hs, room_filter_json.get("ephemeral", {})) + self._room_account_data = Filter(hs, room_filter_json.get("account_data", {})) + self._presence_filter = Filter(hs, filter_json.get("presence", {})) + self._account_data = Filter(hs, filter_json.get("account_data", {})) self.include_leave = filter_json.get("room", {}).get("include_leave", False) self.event_fields = filter_json.get("event_fields", []) @@ -232,25 +238,37 @@ class FilterCollection: def include_redundant_members(self) -> bool: return self._room_state_filter.include_redundant_members - def filter_presence( + async def filter_presence( self, events: Iterable[UserPresenceState] ) -> List[UserPresenceState]: - return self._presence_filter.filter(events) + return await self._presence_filter.filter(events) - def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]: - return self._account_data.filter(events) + async def filter_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]: + return await self._account_data.filter(events) - def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]: - return self._room_state_filter.filter(self._room_filter.filter(events)) + async def filter_room_state(self, events: Iterable[EventBase]) -> List[EventBase]: + return await self._room_state_filter.filter( + await self._room_filter.filter(events) + ) - def filter_room_timeline(self, events: Iterable[EventBase]) -> List[EventBase]: - return self._room_timeline_filter.filter(self._room_filter.filter(events)) + async def filter_room_timeline( + self, events: Iterable[EventBase] + ) -> List[EventBase]: + return await self._room_timeline_filter.filter( + await self._room_filter.filter(events) + ) - def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]: - return self._room_ephemeral_filter.filter(self._room_filter.filter(events)) + async def filter_room_ephemeral(self, events: Iterable[JsonDict]) -> List[JsonDict]: + return await self._room_ephemeral_filter.filter( + await self._room_filter.filter(events) + ) - def filter_room_account_data(self, events: Iterable[JsonDict]) -> List[JsonDict]: - return self._room_account_data.filter(self._room_filter.filter(events)) + async def filter_room_account_data( + self, events: Iterable[JsonDict] + ) -> List[JsonDict]: + return await self._room_account_data.filter( + await self._room_filter.filter(events) + ) def blocks_all_presence(self) -> bool: return ( @@ -274,7 +292,9 @@ class FilterCollection: class Filter: - def __init__(self, filter_json: JsonDict): + def __init__(self, hs: "HomeServer", filter_json: JsonDict): + self._hs = hs + self._store = hs.get_datastore() self.filter_json = filter_json self.limit = filter_json.get("limit", 10) @@ -297,6 +317,20 @@ class Filter: self.labels = filter_json.get("org.matrix.labels", None) self.not_labels = filter_json.get("org.matrix.not_labels", []) + # Ideally these would be rejected at the endpoint if they were provided + # and not supported, but that would involve modifying the JSON schema + # based on the homeserver configuration. + if hs.config.experimental.msc3440_enabled: + self.relation_senders = self.filter_json.get( + "io.element.relation_senders", None + ) + self.relation_types = self.filter_json.get( + "io.element.relation_types", None + ) + else: + self.relation_senders = None + self.relation_types = None + def filters_all_types(self) -> bool: return "*" in self.not_types @@ -306,7 +340,7 @@ class Filter: def filters_all_rooms(self) -> bool: return "*" in self.not_rooms - def check(self, event: FilterEvent) -> bool: + def _check(self, event: FilterEvent) -> bool: """Checks whether the filter matches the given event. Args: @@ -420,8 +454,30 @@ class Filter: return room_ids - def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]: - return list(filter(self.check, events)) + async def _check_event_relations( + self, events: Iterable[FilterEvent] + ) -> List[FilterEvent]: + # The event IDs to check, mypy doesn't understand the ifinstance check. + event_ids = [event.event_id for event in events if isinstance(event, EventBase)] # type: ignore[attr-defined] + event_ids_to_keep = set( + await self._store.events_have_relations( + event_ids, self.relation_senders, self.relation_types + ) + ) + + return [ + event + for event in events + if not isinstance(event, EventBase) or event.event_id in event_ids_to_keep + ] + + async def filter(self, events: Iterable[FilterEvent]) -> List[FilterEvent]: + result = [event for event in events if self._check(event)] + + if self.relation_senders or self.relation_types: + return await self._check_event_relations(result) + + return result def with_room_ids(self, room_ids: Iterable[str]) -> "Filter": """Returns a new filter with the given room IDs appended. @@ -433,7 +489,7 @@ class Filter: filter: A new filter including the given rooms and the old filter's rooms. """ - newFilter = Filter(self.filter_json) + newFilter = Filter(self._hs, self.filter_json) newFilter.rooms += room_ids return newFilter @@ -444,6 +500,3 @@ def _matches_wildcard(actual_value: Optional[str], filter_value: str) -> bool: return actual_value.startswith(type_prefix) else: return actual_value == filter_value - - -DEFAULT_FILTER_COLLECTION = FilterCollection({}) diff --git a/synapse/api/urls.py b/synapse/api/urls.py index 6e84b1524f..4486b3bc7d 100644 --- a/synapse/api/urls.py +++ b/synapse/api/urls.py @@ -38,9 +38,6 @@ class ConsentURIBuilder: def __init__(self, hs_config: HomeServerConfig): if hs_config.key.form_secret is None: raise ConfigError("form_secret not set in config") - if hs_config.server.public_baseurl is None: - raise ConfigError("public_baseurl not set in config") - self._hmac_secret = hs_config.key.form_secret.encode("utf-8") self._public_baseurl = hs_config.server.public_baseurl diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py index b56c2a24df..c533452cab 100644 --- a/synapse/config/account_validity.py +++ b/synapse/config/account_validity.py @@ -75,10 +75,6 @@ class AccountValidityConfig(Config): self.account_validity_period * 10.0 / 100.0 ) - if self.account_validity_renew_by_email_enabled: - if not self.root.server.public_baseurl: - raise ConfigError("Can't send renewal emails without 'public_baseurl'") - # Load account validity templates. account_validity_template_dir = account_validity_config.get("template_dir") if account_validity_template_dir is not None: diff --git a/synapse/config/cas.py b/synapse/config/cas.py index 9b58ecf3d8..3f81814043 100644 --- a/synapse/config/cas.py +++ b/synapse/config/cas.py @@ -16,7 +16,7 @@ from typing import Any, List from synapse.config.sso import SsoAttributeRequirement -from ._base import Config, ConfigError +from ._base import Config from ._util import validate_config @@ -35,14 +35,10 @@ class CasConfig(Config): if self.cas_enabled: self.cas_server_url = cas_config["server_url"] - # The public baseurl is required because it is used by the redirect - # template. - public_baseurl = self.root.server.public_baseurl - if not public_baseurl: - raise ConfigError("cas_config requires a public_baseurl to be set") - # TODO Update this to a _synapse URL. + public_baseurl = self.root.server.public_baseurl self.cas_service_url = public_baseurl + "_matrix/client/r0/login/cas/ticket" + self.cas_displayname_attribute = cas_config.get("displayname_attribute") required_attributes = cas_config.get("required_attributes") or {} self.cas_required_attributes = _parsed_required_attributes_def( diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 8ff59aa2f8..afd65fecd3 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -186,11 +186,6 @@ class EmailConfig(Config): if not self.email_notif_from: missing.append("email.notif_from") - # public_baseurl is required to build password reset and validation links that - # will be emailed to users - if config.get("public_baseurl") is None: - missing.append("public_baseurl") - if missing: raise ConfigError( MISSING_PASSWORD_RESET_CONFIG_ERROR % (", ".join(missing),) @@ -296,9 +291,6 @@ class EmailConfig(Config): if not self.email_notif_from: missing.append("email.notif_from") - if config.get("public_baseurl") is None: - missing.append("public_baseurl") - if missing: raise ConfigError( "email.enable_notifs is True but required keys are missing: %s" diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py index 10f5796330..42f113cd24 100644 --- a/synapse/config/oidc.py +++ b/synapse/config/oidc.py @@ -59,8 +59,6 @@ class OIDCConfig(Config): ) public_baseurl = self.root.server.public_baseurl - if public_baseurl is None: - raise ConfigError("oidc_config requires a public_baseurl to be set") self.oidc_callback_url = public_baseurl + "_synapse/client/oidc/callback" @property diff --git a/synapse/config/registration.py b/synapse/config/registration.py index a3d2a38c4c..5379e80715 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -45,17 +45,6 @@ class RegistrationConfig(Config): account_threepid_delegates = config.get("account_threepid_delegates") or {} self.account_threepid_delegate_email = account_threepid_delegates.get("email") self.account_threepid_delegate_msisdn = account_threepid_delegates.get("msisdn") - if ( - self.account_threepid_delegate_msisdn - and not self.root.server.public_baseurl - ): - raise ConfigError( - "The configuration option `public_baseurl` is required if " - "`account_threepid_delegate.msisdn` is set, such that " - "clients know where to submit validation tokens to. Please " - "configure `public_baseurl`." - ) - self.default_identity_server = config.get("default_identity_server") self.allow_guest_access = config.get("allow_guest_access", False) @@ -240,7 +229,7 @@ class RegistrationConfig(Config): # in on this server. # # (By default, no suggestion is made, so it is left up to the client. - # This setting is ignored unless public_baseurl is also set.) + # This setting is ignored unless public_baseurl is also explicitly set.) # #default_identity_server: https://matrix.org @@ -265,8 +254,6 @@ class RegistrationConfig(Config): # by the Matrix Identity Service API specification: # https://matrix.org/docs/spec/identity_service/latest # - # If a delegate is specified, the config option public_baseurl must also be filled out. - # account_threepid_delegates: #email: https://example.com # Delegate email sending to example.com #msisdn: http://localhost:8090 # Delegate SMS sending to this local process diff --git a/synapse/config/saml2.py b/synapse/config/saml2.py index 9c51b6a25a..ba2b0905ff 100644 --- a/synapse/config/saml2.py +++ b/synapse/config/saml2.py @@ -199,14 +199,11 @@ class SAML2Config(Config): """ import saml2 - public_baseurl = self.root.server.public_baseurl - if public_baseurl is None: - raise ConfigError("saml2_config requires a public_baseurl to be set") - if self.saml2_grandfathered_mxid_source_attribute: optional_attributes.add(self.saml2_grandfathered_mxid_source_attribute) optional_attributes -= required_attributes + public_baseurl = self.root.server.public_baseurl metadata_url = public_baseurl + "_synapse/client/saml2/metadata.xml" response_url = public_baseurl + "_synapse/client/saml2/authn_response" return { diff --git a/synapse/config/server.py b/synapse/config/server.py index a387fd9310..7bc0030a9e 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -16,6 +16,7 @@ import itertools import logging import os.path import re +import urllib.parse from textwrap import indent from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union @@ -264,10 +265,44 @@ class ServerConfig(Config): self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.serve_server_wellknown = config.get("serve_server_wellknown", False) - self.public_baseurl = config.get("public_baseurl") - if self.public_baseurl is not None: - if self.public_baseurl[-1] != "/": - self.public_baseurl += "/" + # Whether we should serve a "client well-known": + # (a) at .well-known/matrix/client on our client HTTP listener + # (b) in the response to /login + # + # ... which together help ensure that clients use our public_baseurl instead of + # whatever they were told by the user. + # + # For the sake of backwards compatibility with existing installations, this is + # True if public_baseurl is specified explicitly, and otherwise False. (The + # reasoning here is that we have no way of knowing that the default + # public_baseurl is actually correct for existing installations - many things + # will not work correctly, but that's (probably?) better than sending clients + # to a completely broken URL. + self.serve_client_wellknown = False + + public_baseurl = config.get("public_baseurl") + if public_baseurl is None: + public_baseurl = f"https://{self.server_name}/" + logger.info("Using default public_baseurl %s", public_baseurl) + else: + self.serve_client_wellknown = True + if public_baseurl[-1] != "/": + public_baseurl += "/" + self.public_baseurl = public_baseurl + + # check that public_baseurl is valid + try: + splits = urllib.parse.urlsplit(self.public_baseurl) + except Exception as e: + raise ConfigError(f"Unable to parse URL: {e}", ("public_baseurl",)) + if splits.scheme not in ("https", "http"): + raise ConfigError( + f"Invalid scheme '{splits.scheme}': only https and http are supported" + ) + if splits.query or splits.fragment: + raise ConfigError( + "public_baseurl cannot contain query parameters or a #-fragment" + ) # Whether to enable user presence. presence_config = config.get("presence") or {} @@ -773,6 +808,8 @@ class ServerConfig(Config): # Otherwise, it should be the URL to reach Synapse's client HTTP listener (see # 'listeners' below). # + # Defaults to 'https://<server_name>/'. + # #public_baseurl: https://example.com/ # Uncomment the following to tell other servers to send federation traffic on diff --git a/synapse/config/sso.py b/synapse/config/sso.py index 11a9b76aa0..60aacb13ea 100644 --- a/synapse/config/sso.py +++ b/synapse/config/sso.py @@ -101,13 +101,10 @@ class SSOConfig(Config): # gracefully to the client). This would make it pointless to ask the user for # confirmation, since the URL the confirmation page would be showing wouldn't be # the client's. - # public_baseurl is an optional setting, so we only add the fallback's URL to the - # list if it's provided (because we can't figure out what that URL is otherwise). - if self.root.server.public_baseurl: - login_fallback_url = ( - self.root.server.public_baseurl + "_matrix/static/client/login" - ) - self.sso_client_whitelist.append(login_fallback_url) + login_fallback_url = ( + self.root.server.public_baseurl + "_matrix/static/client/login" + ) + self.sso_client_whitelist.append(login_fallback_url) def generate_config_section(self, **kwargs): return """\ @@ -128,11 +125,10 @@ class SSOConfig(Config): # phishing attacks from evil.site. To avoid this, include a slash after the # hostname: "https://my.client/". # - # If public_baseurl is set, then the login fallback page (used by clients - # that don't natively support the required login flows) is whitelisted in - # addition to any URLs in this list. + # The login fallback page (used by clients that don't natively support the + # required login flows) is whitelisted in addition to any URLs in this list. # - # By default, this list is empty. + # By default, this list contains only the login fallback page. # #client_whitelist: # - https://riot.im/develop diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 462630201d..4507992031 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -63,7 +63,8 @@ class WriterLocations: Attributes: events: The instances that write to the event and backfill streams. - typing: The instance that writes to the typing stream. + typing: The instances that write to the typing stream. Currently + can only be a single instance. to_device: The instances that write to the to_device stream. Currently can only be a single instance. account_data: The instances that write to the account data streams. Currently @@ -75,9 +76,15 @@ class WriterLocations: """ events = attr.ib( - default=["master"], type=List[str], converter=_instance_to_list_converter + default=["master"], + type=List[str], + converter=_instance_to_list_converter, + ) + typing = attr.ib( + default=["master"], + type=List[str], + converter=_instance_to_list_converter, ) - typing = attr.ib(default="master", type=str) to_device = attr.ib( default=["master"], type=List[str], @@ -217,6 +224,11 @@ class WorkerConfig(Config): % (instance, stream) ) + if len(self.writers.typing) != 1: + raise ConfigError( + "Must only specify one instance to handle `typing` messages." + ) + if len(self.writers.to_device) != 1: raise ConfigError( "Must only specify one instance to handle `to_device` messages." diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 670186f548..3b85b135e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -277,6 +277,58 @@ class FederationClient(FederationBase): return pdus + async def get_pdu_from_destination_raw( + self, + destination: str, + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: + """Requests the PDU with given origin and ID from the remote home + server. Does not have any caching or rate limiting! + + Args: + destination: Which homeserver to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if + it's from an arbitrary point in the context as opposed to part + of the current block of PDUs. Defaults to `False` + timeout: How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. + + Returns: + The requested PDU, or None if we were unable to find it. + + Raises: + SynapseError, NotRetryingDestination, FederationDeniedError + """ + transaction_data = await self.transport_layer.get_event( + destination, event_id, timeout=timeout + ) + + logger.debug( + "retrieved event id %s from %s: %r", + event_id, + destination, + transaction_data, + ) + + pdu_list: List[EventBase] = [ + event_from_pdu_json(p, room_version, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] + + # Check signatures are correct. + signed_pdu = await self._check_sigs_and_hash(room_version, pdu) + return signed_pdu + + return None + async def get_pdu( self, destinations: Iterable[str], @@ -321,30 +373,14 @@ class FederationClient(FederationBase): continue try: - transaction_data = await self.transport_layer.get_event( - destination, event_id, timeout=timeout - ) - - logger.debug( - "retrieved event id %s from %s: %r", - event_id, - destination, - transaction_data, + signed_pdu = await self.get_pdu_from_destination_raw( + destination=destination, + event_id=event_id, + room_version=room_version, + outlier=outlier, + timeout=timeout, ) - pdu_list: List[EventBase] = [ - event_from_pdu_json(p, room_version, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] - - # Check signatures are correct. - signed_pdu = await self._check_sigs_and_hash(room_version, pdu) - - break - pdu_attempts[destination] = now except SynapseError as e: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 32a75993d9..9a8758e9a6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -213,6 +213,11 @@ class FederationServer(FederationBase): self._started_handling_of_staged_events = True self._handle_old_staged_events() + # Start a periodic check for old staged events. This is to handle + # the case where locks time out, e.g. if another process gets killed + # without dropping its locks. + self._clock.looping_call(self._handle_old_staged_events, 60 * 1000) + # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() @@ -1232,10 +1237,6 @@ class FederationHandlerRegistry: self.query_handlers[query_type] = handler - def register_instance_for_edu(self, edu_type: str, instance_name: str) -> None: - """Register that the EDU handler is on a different instance than master.""" - self._edu_type_to_instance[edu_type] = [instance_name] - def register_instances_for_edu( self, edu_type: str, instance_names: List[str] ) -> None: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 67f8ffcaff..9abdad262b 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.storage.databases.main.directory import RoomAliasMapping from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -58,6 +59,10 @@ class ApplicationServicesHandler: self.current_max = 0 self.is_processing = False + self._ephemeral_events_linearizer = Linearizer( + name="appservice_ephemeral_events" + ) + def notify_interested_services(self, max_token: RoomStreamToken) -> None: """Notifies (pushes) all application services interested in this event. @@ -183,7 +188,7 @@ class ApplicationServicesHandler: self, stream_key: str, new_token: Union[int, RoomStreamToken], - users: Optional[Collection[Union[str, UserID]]] = None, + users: Collection[Union[str, UserID]], ) -> None: """ This is called by the notifier in the background when an ephemeral event is handled @@ -198,7 +203,9 @@ class ApplicationServicesHandler: value for `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into - them. + receiving them by setting `push_ephemeral` to true in their registration + file. Note that while MSC2409 is experimental, this option is called + `de.sorunome.msc2409.push_ephemeral`. Appservices will only receive ephemeral events that fall within their registered user and room namespaces. @@ -209,6 +216,7 @@ class ApplicationServicesHandler: if not self.notify_appservices: return + # Ignore any unsupported streams if stream_key not in ("typing_key", "receipt_key", "presence_key"): return @@ -225,18 +233,25 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Check whether there are any appservices which have registered to receive + # ephemeral events. + # + # Note that whether these events are actually relevant to these appservices + # is decided later on. services = [ service for service in self.store.get_app_services() if service.supports_ephemeral ] if not services: + # Bail out early if none of the target appservices have explicitly registered + # to receive these ephemeral events. return # We only start a new background process if necessary rather than # optimistically (to cut down on overhead). self._notify_interested_services_ephemeral( - services, stream_key, new_token, users or [] + services, stream_key, new_token, users ) @wrap_as_background_process("notify_interested_services_ephemeral") @@ -247,7 +262,7 @@ class ApplicationServicesHandler: new_token: int, users: Collection[Union[str, UserID]], ) -> None: - logger.debug("Checking interested services for %s" % (stream_key)) + logger.debug("Checking interested services for %s", stream_key) with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": @@ -260,26 +275,37 @@ class ApplicationServicesHandler: events = await self._handle_typing(service, new_token) if events: self.scheduler.submit_ephemeral_events_for_as(service, events) + continue - elif stream_key == "receipt_key": - events = await self._handle_receipts(service) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) - - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "read_receipt", new_token + # Since we read/update the stream position for this AS/stream + with ( + await self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ) + ): + if stream_key == "receipt_key": + events = await self._handle_receipts(service, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "read_receipt", new_token + ) - elif stream_key == "presence_key": - events = await self._handle_presence(service, users) - if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + elif stream_key == "presence_key": + events = await self._handle_presence(service, users, new_token) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) - # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( - service, "presence", new_token - ) + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "presence", new_token + ) async def _handle_typing( self, service: ApplicationService, new_token: int @@ -316,7 +342,9 @@ class ApplicationServicesHandler: ) return typing - async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]: + async def _handle_receipts( + self, service: ApplicationService, new_token: Optional[int] + ) -> List[JsonDict]: """ Return the latest read receipts that the given application service should receive. @@ -327,6 +355,9 @@ class ApplicationServicesHandler: Args: service: The application service to check for which events it should receive. + new_token: A receipts event stream token. Purely used to double-check that the + from_token we pull from the database isn't greater than or equal to this + token. Prevents accidentally duplicating work. Returns: A list of JSON dictionaries containing data derived from the read receipts that @@ -335,6 +366,12 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "read_receipt" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + receipts_source = self.event_sources.sources.receipt receipts, _ = await receipts_source.get_new_events_as( service=service, from_key=from_key @@ -342,7 +379,10 @@ class ApplicationServicesHandler: return receipts async def _handle_presence( - self, service: ApplicationService, users: Collection[Union[str, UserID]] + self, + service: ApplicationService, + users: Collection[Union[str, UserID]], + new_token: Optional[int], ) -> List[JsonDict]: """ Return the latest presence updates that the given application service should receive. @@ -355,6 +395,9 @@ class ApplicationServicesHandler: Args: service: The application service that ephemeral events are being sent to. users: The users that should receive the presence update. + new_token: A presence update stream token. Purely used to double-check that the + from_token we pull from the database isn't greater than or equal to this + token. Prevents accidentally duplicating work. Returns: A list of json dictionaries containing data derived from the presence events @@ -365,6 +408,12 @@ class ApplicationServicesHandler: from_key = await self.store.get_type_stream_id_for_appservice( service, "presence" ) + if new_token is not None and new_token <= from_key: + logger.debug( + "Rejecting token lower than or equal to stored: %s" % (new_token,) + ) + return [] + for user in users: if isinstance(user, str): user = UserID.from_string(user) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index d508d7d32a..60e59d11a0 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1989,7 +1989,9 @@ class PasswordAuthProvider: self, check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None, on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None, - auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None, + auth_checkers: Optional[ + Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK] + ] = None, ) -> None: # Register check_3pid_auth callback if check_3pid_auth is not None: diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index b6a2a34ab7..b582266af9 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -89,6 +89,13 @@ class DeviceMessageHandler: ) async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: + """ + Handle receiving to-device messages from remote homeservers. + + Args: + origin: The remote homeserver. + content: The JSON dictionary containing the to-device messages. + """ local_messages = {} sender_user_id = content["sender"] if origin != get_domain_from_id(sender_user_id): @@ -135,12 +142,16 @@ class DeviceMessageHandler: message_type, sender_user_id, by_device ) - stream_id = await self.store.add_messages_from_remote_to_device_inbox( + # Add messages to the database. + # Retrieve the stream id of the last-processed to-device message. + last_stream_id = await self.store.add_messages_from_remote_to_device_inbox( origin, message_id, local_messages ) + # Notify listeners that there are new to-device messages to process, + # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + "to_device_key", last_stream_id, users=local_messages.keys() ) async def _check_for_unknown_devices( @@ -195,6 +206,14 @@ class DeviceMessageHandler: message_type: str, messages: Dict[str, Dict[str, JsonDict]], ) -> None: + """ + Handle a request from a user to send to-device message(s). + + Args: + requester: The user that is sending the to-device messages. + message_type: The type of to-device messages that are being sent. + messages: A dictionary containing recipients mapped to messages intended for them. + """ sender_user_id = requester.user.to_string() message_id = random_string(16) @@ -257,12 +276,16 @@ class DeviceMessageHandler: "org.matrix.opentracing_context": json_encoder.encode(context), } - stream_id = await self.store.add_messages_to_device_inbox( + # Add messages to the database. + # Retrieve the stream id of the last-processed to-device message. + last_stream_id = await self.store.add_messages_to_device_inbox( local_messages, remote_edu_contents ) + # Notify listeners that there are new to-device messages to process, + # handing them the latest stream id. self.notifier.on_new_event( - "to_device_key", stream_id, users=local_messages.keys() + "to_device_key", last_stream_id, users=local_messages.keys() ) if self.federation_sender: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d0fb2fc7dc..60c11e3d21 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -201,95 +201,19 @@ class E2eKeysHandler: r[user_id] = remote_queries[user_id] # Now fetch any devices that we don't have in our cache - @trace - async def do_remote_query(destination: str) -> None: - """This is called when we are querying the device list of a user on - a remote homeserver and their device list is not in the device list - cache. If we share a room with this user and we're not querying for - specific user we will update the cache with their device list. - """ - - destination_query = remote_queries_not_in_cache[destination] - - # We first consider whether we wish to update the device list cache with - # the users device list. We want to track a user's devices when the - # authenticated user shares a room with the queried user and the query - # has not specified a particular device. - # If we update the cache for the queried user we remove them from further - # queries. We use the more efficient batched query_client_keys for all - # remaining users - user_ids_updated = [] - for (user_id, device_list) in destination_query.items(): - if user_id in user_ids_updated: - continue - - if device_list: - continue - - room_ids = await self.store.get_rooms_for_user(user_id) - if not room_ids: - continue - - # We've decided we're sharing a room with this user and should - # probably be tracking their device lists. However, we haven't - # done an initial sync on the device list so we do it now. - try: - if self._is_master: - user_devices = await self.device_handler.device_list_updater.user_device_resync( - user_id - ) - else: - user_devices = await self._user_device_resync_client( - user_id=user_id - ) - - user_devices = user_devices["devices"] - user_results = results.setdefault(user_id, {}) - for device in user_devices: - user_results[device["device_id"]] = device["keys"] - user_ids_updated.append(user_id) - except Exception as e: - failures[destination] = _exception_to_failure(e) - - if len(destination_query) == len(user_ids_updated): - # We've updated all the users in the query and we do not need to - # make any further remote calls. - return - - # Remove all the users from the query which we have updated - for user_id in user_ids_updated: - destination_query.pop(user_id) - - try: - remote_result = await self.federation.query_client_keys( - destination, {"device_keys": destination_query}, timeout=timeout - ) - - for user_id, keys in remote_result["device_keys"].items(): - if user_id in destination_query: - results[user_id] = keys - - if "master_keys" in remote_result: - for user_id, key in remote_result["master_keys"].items(): - if user_id in destination_query: - cross_signing_keys["master_keys"][user_id] = key - - if "self_signing_keys" in remote_result: - for user_id, key in remote_result["self_signing_keys"].items(): - if user_id in destination_query: - cross_signing_keys["self_signing_keys"][user_id] = key - - except Exception as e: - failure = _exception_to_failure(e) - failures[destination] = failure - set_tag("error", True) - set_tag("reason", failure) - await make_deferred_yieldable( defer.gatherResults( [ - run_in_background(do_remote_query, destination) - for destination in remote_queries_not_in_cache + run_in_background( + self._query_devices_for_destination, + results, + cross_signing_keys, + failures, + destination, + queries, + timeout, + ) + for destination, queries in remote_queries_not_in_cache.items() ], consumeErrors=True, ).addErrback(unwrapFirstError) @@ -301,6 +225,121 @@ class E2eKeysHandler: return ret + @trace + async def _query_devices_for_destination( + self, + results: JsonDict, + cross_signing_keys: JsonDict, + failures: Dict[str, JsonDict], + destination: str, + destination_query: Dict[str, Iterable[str]], + timeout: int, + ) -> None: + """This is called when we are querying the device list of a user on + a remote homeserver and their device list is not in the device list + cache. If we share a room with this user and we're not querying for + specific user we will update the cache with their device list. + + Args: + results: A map from user ID to their device keys, which gets + updated with the newly fetched keys. + cross_signing_keys: Map from user ID to their cross signing keys, + which gets updated with the newly fetched keys. + failures: Map of destinations to failures that have occurred while + attempting to fetch keys. + destination: The remote server to query + destination_query: The query dict of devices to query the remote + server for. + timeout: The timeout for remote HTTP requests. + """ + + # We first consider whether we wish to update the device list cache with + # the users device list. We want to track a user's devices when the + # authenticated user shares a room with the queried user and the query + # has not specified a particular device. + # If we update the cache for the queried user we remove them from further + # queries. We use the more efficient batched query_client_keys for all + # remaining users + user_ids_updated = [] + for (user_id, device_list) in destination_query.items(): + if user_id in user_ids_updated: + continue + + if device_list: + continue + + room_ids = await self.store.get_rooms_for_user(user_id) + if not room_ids: + continue + + # We've decided we're sharing a room with this user and should + # probably be tracking their device lists. However, we haven't + # done an initial sync on the device list so we do it now. + try: + if self._is_master: + resync_results = await self.device_handler.device_list_updater.user_device_resync( + user_id + ) + else: + resync_results = await self._user_device_resync_client( + user_id=user_id + ) + + # Add the device keys to the results. + user_devices = resync_results["devices"] + user_results = results.setdefault(user_id, {}) + for device in user_devices: + user_results[device["device_id"]] = device["keys"] + user_ids_updated.append(user_id) + + # Add any cross signing keys to the results. + master_key = resync_results.get("master_key") + self_signing_key = resync_results.get("self_signing_key") + + if master_key: + cross_signing_keys["master_keys"][user_id] = master_key + + if self_signing_key: + cross_signing_keys["self_signing_keys"][user_id] = self_signing_key + except Exception as e: + failures[destination] = _exception_to_failure(e) + + if len(destination_query) == len(user_ids_updated): + # We've updated all the users in the query and we do not need to + # make any further remote calls. + return + + # Remove all the users from the query which we have updated + for user_id in user_ids_updated: + destination_query.pop(user_id) + + try: + remote_result = await self.federation.query_client_keys( + destination, {"device_keys": destination_query}, timeout=timeout + ) + + for user_id, keys in remote_result["device_keys"].items(): + if user_id in destination_query: + results[user_id] = keys + + if "master_keys" in remote_result: + for user_id, key in remote_result["master_keys"].items(): + if user_id in destination_query: + cross_signing_keys["master_keys"][user_id] = key + + if "self_signing_keys" in remote_result: + for user_id, key in remote_result["self_signing_keys"].items(): + if user_id in destination_query: + cross_signing_keys["self_signing_keys"][user_id] = key + + except Exception as e: + failure = _exception_to_failure(e) + failures[destination] = failure + set_tag("error", True) + set_tag("reason", failure) + + return + async def get_cross_signing_keys_from_cache( self, query: Iterable[str], from_user_id: Optional[str] ) -> Dict[str, Dict[str, dict]]: diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 6a315117ba..3dbe611f95 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -537,10 +537,6 @@ class IdentityHandler: except RequestTimedOutError: raise SynapseError(500, "Timed out contacting identity server") - # It is already checked that public_baseurl is configured since this code - # should only be used if account_threepid_delegate_msisdn is true. - assert self.hs.config.server.public_baseurl - # we need to tell the client to send the token back to us, since it doesn't # otherwise know where to send it, so add submit_url response parameter # (see also MSC2078) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index abfe7be0e3..aa26911aed 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -424,7 +424,7 @@ class PaginationHandler: if events: if event_filter: - events = event_filter.filter(events) + events = await event_filter.filter(events) events = await filter_events_for_client( self.storage, user_id, events, is_peeking=(member_event_id is None) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 969eb3b9b0..11af30eee7 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -12,8 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Contains functions for performing events on rooms.""" - +"""Contains functions for performing actions on rooms.""" import itertools import logging import math @@ -31,6 +30,8 @@ from typing import ( Tuple, ) +from typing_extensions import TypedDict + from synapse.api.constants import ( EventContentFields, EventTypes, @@ -1158,8 +1159,10 @@ class RoomContextHandler: ) if event_filter: - results["events_before"] = event_filter.filter(results["events_before"]) - results["events_after"] = event_filter.filter(results["events_after"]) + results["events_before"] = await event_filter.filter( + results["events_before"] + ) + results["events_after"] = await event_filter.filter(results["events_after"]) results["events_before"] = await filter_evts(results["events_before"]) results["events_after"] = await filter_evts(results["events_after"]) @@ -1195,7 +1198,7 @@ class RoomContextHandler: state_events = list(state[last_event_id].values()) if event_filter: - state_events = event_filter.filter(state_events) + state_events = await event_filter.filter(state_events) results["state"] = await filter_evts(state_events) @@ -1275,6 +1278,13 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]): return self.store.get_room_events_max_id(room_id) +class ShutdownRoomResponse(TypedDict): + kicked_users: List[str] + failed_to_kick_users: List[str] + local_aliases: List[str] + new_room_id: Optional[str] + + class RoomShutdownHandler: DEFAULT_MESSAGE = ( @@ -1300,7 +1310,7 @@ class RoomShutdownHandler: new_room_name: Optional[str] = None, message: Optional[str] = None, block: bool = False, - ) -> dict: + ) -> ShutdownRoomResponse: """ Shuts down a room. Moves all local users and room aliases automatically to a new room if `new_room_user_id` is set. Otherwise local users only @@ -1334,8 +1344,13 @@ class RoomShutdownHandler: Defaults to `Sharing illegal content on this server is not permitted and rooms in violation will be blocked.` block: - If set to `true`, this room will be added to a blocking list, - preventing future attempts to join the room. Defaults to `false`. + If set to `True`, users will be prevented from joining the old + room. This option can also be used to pre-emptively block a room, + even if it's unknown to this homeserver. In this case, the room + will be blocked, and no further action will be taken. If `False`, + attempting to delete an unknown room is invalid. + + Defaults to `False`. Returns: a dict containing the following keys: kicked_users: An array of users (`user_id`) that were kicked. @@ -1344,7 +1359,9 @@ class RoomShutdownHandler: local_aliases: An array of strings representing the local aliases that were migrated from the old room to the new. - new_room_id: A string representing the room ID of the new room. + new_room_id: + A string representing the room ID of the new room, or None if + no such room was created. """ if not new_room_name: @@ -1355,14 +1372,28 @@ class RoomShutdownHandler: if not RoomID.is_valid(room_id): raise SynapseError(400, "%s is not a legal room ID" % (room_id,)) - if not await self.store.get_room(room_id): - raise NotFoundError("Unknown room id %s" % (room_id,)) - - # This will work even if the room is already blocked, but that is - # desirable in case the first attempt at blocking the room failed below. + # Action the block first (even if the room doesn't exist yet) if block: + # This will work even if the room is already blocked, but that is + # desirable in case the first attempt at blocking the room failed below. await self.store.block_room(room_id, requester_user_id) + if not await self.store.get_room(room_id): + if block: + # We allow you to block an unknown room. + return { + "kicked_users": [], + "failed_to_kick_users": [], + "local_aliases": [], + "new_room_id": None, + } + else: + # But if you don't want to preventatively block another room, + # this function can't do anything useful. + raise NotFoundError( + "Cannot shut down room: unknown room id %s" % (room_id,) + ) + if new_room_user_id is not None: if not self.hs.is_mine_id(new_room_user_id): raise SynapseError( diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index 6e4dff8056..ab7eaab2fb 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -180,7 +180,7 @@ class SearchHandler: % (set(group_keys) - {"room_id", "sender"},), ) - search_filter = Filter(filter_dict) + search_filter = Filter(self.hs, filter_dict) # TODO: Search through left rooms too rooms = await self.store.get_rooms_for_local_user_where_membership_is( @@ -242,7 +242,7 @@ class SearchHandler: rank_map.update({r["event"].event_id: r["rank"] for r in results}) - filtered_events = search_filter.filter([r["event"] for r in results]) + filtered_events = await search_filter.filter([r["event"] for r in results]) events = await filter_events_for_client( self.storage, user.to_string(), filtered_events @@ -292,7 +292,9 @@ class SearchHandler: rank_map.update({r["event"].event_id: r["rank"] for r in results}) - filtered_events = search_filter.filter([r["event"] for r in results]) + filtered_events = await search_filter.filter( + [r["event"] for r in results] + ) events = await filter_events_for_client( self.storage, user.to_string(), filtered_events diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2c7c6d63a9..891435c14d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -510,7 +510,7 @@ class SyncHandler: log_kv({"limited": limited}) if potential_recents: - recents = sync_config.filter_collection.filter_room_timeline( + recents = await sync_config.filter_collection.filter_room_timeline( potential_recents ) log_kv({"recents_after_sync_filtering": len(recents)}) @@ -575,8 +575,8 @@ class SyncHandler: log_kv({"loaded_recents": len(events)}) - loaded_recents = sync_config.filter_collection.filter_room_timeline( - events + loaded_recents = ( + await sync_config.filter_collection.filter_room_timeline(events) ) log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)}) @@ -1015,7 +1015,7 @@ class SyncHandler: return { (e.type, e.state_key): e - for e in sync_config.filter_collection.filter_room_state( + for e in await sync_config.filter_collection.filter_room_state( list(state.values()) ) if e.type != EventTypes.Aliases # until MSC2261 or alternative solution @@ -1383,7 +1383,7 @@ class SyncHandler: sync_config.user ) - account_data_for_user = sync_config.filter_collection.filter_account_data( + account_data_for_user = await sync_config.filter_collection.filter_account_data( [ {"type": account_data_type, "content": content} for account_data_type, content in account_data.items() @@ -1448,7 +1448,7 @@ class SyncHandler: # Deduplicate the presence entries so that there's at most one per user presence = list({p.user_id: p for p in presence}.values()) - presence = sync_config.filter_collection.filter_presence(presence) + presence = await sync_config.filter_collection.filter_presence(presence) sync_result_builder.presence = presence @@ -2021,12 +2021,14 @@ class SyncHandler: ) account_data_events = ( - sync_config.filter_collection.filter_room_account_data( + await sync_config.filter_collection.filter_room_account_data( account_data_events ) ) - ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + ephemeral = await sync_config.filter_collection.filter_room_ephemeral( + ephemeral + ) if not ( always_include diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c411d69924..22c6174821 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -62,8 +62,8 @@ class FollowerTypingHandler: if hs.should_send_federation(): self.federation = hs.get_federation_sender() - if hs.config.worker.writers.typing != hs.get_instance_name(): - hs.get_federation_registry().register_instance_for_edu( + if hs.get_instance_name() not in hs.config.worker.writers.typing: + hs.get_federation_registry().register_instances_for_edu( "m.typing", hs.config.worker.writers.typing, ) @@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - assert hs.config.worker.writers.typing == hs.get_instance_name() + assert hs.get_instance_name() in hs.config.worker.writers.typing self.auth = hs.get_auth() self.notifier = hs.get_notifier() diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 06fd06fdf3..21293038ef 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -138,7 +138,7 @@ class ReplicationCommandHandler: if isinstance(stream, TypingStream): # Only add TypingStream as a source on the instance in charge of # typing. - if hs.config.worker.writers.typing == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.typing: self._streams_to_replicate.append(stream) continue diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index c8b188ae4e..743a01da08 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -328,8 +328,7 @@ class TypingStream(Stream): ROW_TYPE = TypingStreamRow def __init__(self, hs: "HomeServer"): - writer_instance = hs.config.worker.writers.typing - if writer_instance == hs.get_instance_name(): + if hs.get_instance_name() in hs.config.worker.writers.typing: # On the writer, query the typing handler typing_writer_handler = hs.get_typing_writer_handler() update_function: Callable[ diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index 70514e814f..81e98f81d6 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -25,6 +25,10 @@ from synapse.http.server import HttpServer, JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin +from synapse.rest.admin.background_updates import ( + BackgroundUpdateEnabledRestServlet, + BackgroundUpdateRestServlet, +) from synapse.rest.admin.devices import ( DeleteDevicesRestServlet, DeviceRestServlet, @@ -247,6 +251,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: # Some servlets only get registered for the main process. if hs.config.worker.worker_app is None: SendServerNoticeServlet(hs).register(http_server) + BackgroundUpdateEnabledRestServlet(hs).register(http_server) + BackgroundUpdateRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource( diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py new file mode 100644 index 0000000000..0d0183bf20 --- /dev/null +++ b/synapse/rest/admin/background_updates.py @@ -0,0 +1,107 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +from typing import TYPE_CHECKING, Tuple + +from synapse.api.errors import SynapseError +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.site import SynapseRequest +from synapse.rest.admin._base import admin_patterns, assert_user_is_admin +from synapse.types import JsonDict + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class BackgroundUpdateEnabledRestServlet(RestServlet): + """Allows temporarily disabling background updates""" + + PATTERNS = admin_patterns("/background_updates/enabled") + + def __init__(self, hs: "HomeServer"): + self.group_server = hs.get_groups_server_handler() + self.is_mine_id = hs.is_mine_id + self.auth = hs.get_auth() + + self.data_stores = hs.get_datastores() + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + # We need to check that all configured databases have updates enabled. + # (They *should* all be in sync.) + enabled = all(db.updates.enabled for db in self.data_stores.databases) + + return 200, {"enabled": enabled} + + async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + body = parse_json_object_from_request(request) + + enabled = body.get("enabled", True) + + if not isinstance(enabled, bool): + raise SynapseError(400, "'enabled' parameter must be a boolean") + + for db in self.data_stores.databases: + db.updates.enabled = enabled + + # If we're re-enabling them ensure that we start the background + # process again. + if enabled: + db.updates.start_doing_background_updates() + + return 200, {"enabled": enabled} + + +class BackgroundUpdateRestServlet(RestServlet): + """Fetch information about background updates""" + + PATTERNS = admin_patterns("/background_updates/status") + + def __init__(self, hs: "HomeServer"): + self.group_server = hs.get_groups_server_handler() + self.is_mine_id = hs.is_mine_id + self.auth = hs.get_auth() + + self.data_stores = hs.get_datastores() + + async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]: + requester = await self.auth.get_user_by_req(request) + await assert_user_is_admin(self.auth, requester.user) + + # We need to check that all configured databases have updates enabled. + # (They *should* all be in sync.) + enabled = all(db.updates.enabled for db in self.data_stores.databases) + + current_updates = {} + + for db in self.data_stores.databases: + update = db.updates.get_current_update() + if not update: + continue + + current_updates[db.name()] = { + "name": update.name, + "total_item_count": update.total_item_count, + "total_duration_ms": update.total_duration_ms, + "average_items_per_ms": update.average_items_per_ms(), + } + + return 200, {"enabled": enabled, "current_updates": current_updates} diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 05c5b4bf0c..a2f4edebb8 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -13,7 +13,7 @@ # limitations under the License. import logging from http import HTTPStatus -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, List, Optional, Tuple, cast from urllib import parse as urlparse from synapse.api.constants import EventTypes, JoinRules, Membership @@ -239,9 +239,22 @@ class RoomRestServlet(RestServlet): # Purge room if purge: - await pagination_handler.purge_room(room_id, force=force_purge) - - return 200, ret + try: + await pagination_handler.purge_room(room_id, force=force_purge) + except NotFoundError: + if block: + # We can block unknown rooms with this endpoint, in which case + # a failed purge is expected. + pass + else: + # But otherwise, we expect this purge to have succeeded. + raise + + # Cast safety: cast away the knowledge that this is a TypedDict. + # See https://github.com/python/mypy/issues/4976#issuecomment-579883622 + # for some discussion on why this is necessary. Either way, + # `ret` is an opaque dictionary blob as far as the rest of the app cares. + return 200, cast(JsonDict, ret) class RoomMembersRestServlet(RestServlet): @@ -583,6 +596,7 @@ class RoomEventContextServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() + self._hs = hs self.clock = hs.get_clock() self.room_context_handler = hs.get_room_context_handler() self._event_serializer = hs.get_event_client_serializer() @@ -600,7 +614,9 @@ class RoomEventContextServlet(RestServlet): filter_str = parse_string(request, "filter", encoding="utf-8") if filter_str: filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json)) + event_filter: Optional[Filter] = Filter( + self._hs, json_decoder.decode(filter_json) + ) else: event_filter = None diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 9770413c61..2b25b9aad6 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -13,10 +13,12 @@ # limitations under the License. import logging +import re from typing import TYPE_CHECKING, Tuple from synapse.api.constants import ReadReceiptEventFields from synapse.api.errors import Codes, SynapseError +from synapse.http import get_request_user_agent from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseRequest @@ -24,6 +26,8 @@ from synapse.types import JsonDict from ._base import client_patterns +pattern = re.compile(r"(?:Element|SchildiChat)/1\.[012]\.") + if TYPE_CHECKING: from synapse.server import HomeServer @@ -52,7 +56,13 @@ class ReceiptRestServlet(RestServlet): if receipt_type != "m.read": raise SynapseError(400, "Receipt type must be 'm.read'") - body = parse_json_object_from_request(request, allow_empty_body=True) + # Do not allow older SchildiChat and Element Android clients (prior to Element/1.[012].x) to send an empty body. + user_agent = get_request_user_agent(request) + allow_empty_body = False + if "Android" in user_agent: + if pattern.match(user_agent) or "Riot" in user_agent: + allow_empty_body = True + body = parse_json_object_from_request(request, allow_empty_body) hidden = body.get(ReadReceiptEventFields.MSC2285_HIDDEN, False) if not isinstance(hidden, bool): diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 58f6699073..184cfbe196 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -298,7 +298,9 @@ class RelationAggregationPaginationServlet(RestServlet): raise SynapseError(404, "Unknown parent event.") if relation_type not in (RelationTypes.ANNOTATION, None): - raise SynapseError(400, "Relation type must be 'annotation'") + raise SynapseError( + 400, f"Relation type must be '{RelationTypes.ANNOTATION}'" + ) limit = parse_integer(request, "limit", default=5) from_token_str = parse_string(request, "from") diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index ed95189b6d..03a353d53c 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -550,6 +550,7 @@ class RoomMessageListRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() + self._hs = hs self.pagination_handler = hs.get_pagination_handler() self.auth = hs.get_auth() self.store = hs.get_datastore() @@ -567,7 +568,9 @@ class RoomMessageListRestServlet(RestServlet): filter_str = parse_string(request, "filter", encoding="utf-8") if filter_str: filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json)) + event_filter: Optional[Filter] = Filter( + self._hs, json_decoder.decode(filter_json) + ) if ( event_filter and event_filter.filter_json.get("event_format", "client") @@ -672,6 +675,7 @@ class RoomEventContextServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() + self._hs = hs self.clock = hs.get_clock() self.room_context_handler = hs.get_room_context_handler() self._event_serializer = hs.get_event_client_serializer() @@ -688,7 +692,9 @@ class RoomEventContextServlet(RestServlet): filter_str = parse_string(request, "filter", encoding="utf-8") if filter_str: filter_json = urlparse.unquote(filter_str) - event_filter: Optional[Filter] = Filter(json_decoder.decode(filter_json)) + event_filter: Optional[Filter] = Filter( + self._hs, json_decoder.decode(filter_json) + ) else: event_filter = None @@ -914,7 +920,7 @@ class RoomTypingRestServlet(RestServlet): # If we're not on the typing writer instance we should scream if we get # requests. self._is_typing_writer = ( - hs.config.worker.writers.typing == hs.get_instance_name() + hs.get_instance_name() in hs.config.worker.writers.typing ) async def on_PUT( diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 982763f2d8..c9509d2ae3 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -139,20 +139,22 @@ class RoomBatchSendEventRestServlet(RestServlet): errcode=Codes.INVALID_PARAM, ) + state_event_ids_at_start = [] # Create and persist all of the state events that float off on their own # before the batch. These will most likely be all of the invite/member # state events used to auth the upcoming historical messages. - state_event_ids_at_start = ( - await self.room_batch_handler.persist_state_events_at_start( - state_events_at_start=body["state_events_at_start"], - room_id=room_id, - initial_auth_event_ids=auth_event_ids, - app_service_requester=requester, + if body["state_events_at_start"]: + state_event_ids_at_start = ( + await self.room_batch_handler.persist_state_events_at_start( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + app_service_requester=requester, + ) ) - ) - # Update our ongoing auth event ID list with all of the new state we - # just created - auth_event_ids.extend(state_event_ids_at_start) + # Update our ongoing auth event ID list with all of the new state we + # just created + auth_event_ids.extend(state_event_ids_at_start) inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( prev_event_ids_from_query @@ -205,8 +207,11 @@ class RoomBatchSendEventRestServlet(RestServlet): # Also connect the historical event chain to the end of the floating # state chain, which causes the HS to ask for the state at the start of - # the batch later. - prev_event_ids = [state_event_ids_at_start[-1]] + # the batch later. If there is no state chain to connect to, just make + # the insertion event float itself. + prev_event_ids = [] + if len(state_event_ids_at_start): + prev_event_ids = [state_event_ids_at_start[-1]] # Create and persist all of the historical events as well as insertion # and batch meta events to make the batch navigable in the DAG. diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index 913216a7c4..8c0fdb1940 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -29,7 +29,7 @@ from typing import ( from synapse.api.constants import Membership, PresenceState from synapse.api.errors import Codes, StoreError, SynapseError -from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection +from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.events.utils import ( @@ -150,7 +150,7 @@ class SyncRestServlet(RestServlet): request_key = (user, timeout, since, filter_id, full_state, device_id) if filter_id is None: - filter_collection = DEFAULT_FILTER_COLLECTION + filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION elif filter_id.startswith("{"): try: filter_object = json_decoder.decode(filter_id) @@ -160,7 +160,7 @@ class SyncRestServlet(RestServlet): except Exception: raise SynapseError(400, "Invalid filter JSON") self.filtering.check_valid_filter(filter_object) - filter_collection = FilterCollection(filter_object) + filter_collection = FilterCollection(self.hs, filter_object) else: try: filter_collection = await self.filtering.get_user_filter( diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py index edbf5ce5d0..04b035a1b1 100644 --- a/synapse/rest/well_known.py +++ b/synapse/rest/well_known.py @@ -34,8 +34,7 @@ class WellKnownBuilder: self._config = hs.config def get_well_known(self) -> Optional[JsonDict]: - # if we don't have a public_baseurl, we can't help much here. - if self._config.server.public_baseurl is None: + if not self._config.server.serve_client_wellknown: return None result = {"m.homeserver": {"base_url": self._config.server.public_baseurl}} diff --git a/synapse/server.py b/synapse/server.py index 0fbf36ba99..013a7bacaa 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -463,7 +463,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_typing_writer_handler(self) -> TypingWriterHandler: - if self.config.worker.writers.typing == self.get_instance_name(): + if self.get_instance_name() in self.config.worker.writers.typing: return TypingWriterHandler(self) else: raise Exception("Workers cannot write typing") @@ -474,7 +474,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_typing_handler(self) -> FollowerTypingHandler: - if self.config.worker.writers.typing == self.get_instance_name(): + if self.get_instance_name() in self.config.worker.writers.typing: # Use get_typing_writer_handler to ensure that we use the same # cached version. return self.get_typing_writer_handler() diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 82b31d24f1..b9a8ca997e 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -100,29 +100,58 @@ class BackgroundUpdater: ] = {} self._all_done = False + # Whether we're currently running updates + self._running = False + + # Whether background updates are enabled. This allows us to + # enable/disable background updates via the admin API. + self.enabled = True + + def get_current_update(self) -> Optional[BackgroundUpdatePerformance]: + """Returns the current background update, if any.""" + + update_name = self._current_background_update + if not update_name: + return None + + perf = self._background_update_performance.get(update_name) + if not perf: + perf = BackgroundUpdatePerformance(update_name) + + return perf + def start_doing_background_updates(self) -> None: - run_as_background_process("background_updates", self.run_background_updates) + if self.enabled: + run_as_background_process("background_updates", self.run_background_updates) async def run_background_updates(self, sleep: bool = True) -> None: - logger.info("Starting background schema updates") - while True: - if sleep: - await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + if self._running or not self.enabled: + return - try: - result = await self.do_next_background_update( - self.BACKGROUND_UPDATE_DURATION_MS - ) - except Exception: - logger.exception("Error doing update") - else: - if result: - logger.info( - "No more background updates to do." - " Unscheduling background update task." + self._running = True + + try: + logger.info("Starting background schema updates") + while self.enabled: + if sleep: + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + + try: + result = await self.do_next_background_update( + self.BACKGROUND_UPDATE_DURATION_MS ) - self._all_done = True - return None + except Exception: + logger.exception("Error doing update") + else: + if result: + logger.info( + "No more background updates to do." + " Unscheduling background update task." + ) + self._all_done = True + return None + finally: + self._running = False async def has_completed_background_updates(self) -> bool: """Check if all the background updates have completed diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 5c71e27518..d4cab69ebf 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -446,6 +446,10 @@ class DatabasePool: self._check_safe_to_upsert, ) + def name(self) -> str: + "Return the name of this database" + return self._database_config.name + def is_running(self) -> bool: """Is the database pool currently running""" return self._db_pool.running diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 2da2659f41..baec35ee27 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -412,16 +412,16 @@ class ApplicationServiceTransactionWorkerStore( ) async def set_type_stream_id_for_appservice( - self, service: ApplicationService, type: str, pos: Optional[int] + self, service: ApplicationService, stream_type: str, pos: Optional[int] ) -> None: - if type not in ("read_receipt", "presence"): + if stream_type not in ("read_receipt", "presence"): raise ValueError( "Expected type to be a valid application stream id type, got %s" - % (type,) + % (stream_type,) ) def set_type_stream_id_for_appservice_txn(txn): - stream_id_type = "%s_stream_id" % type + stream_id_type = "%s_stream_id" % stream_type txn.execute( "UPDATE application_services_state SET %s = ? WHERE as_id=?" % stream_id_type, diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 264e625bd7..ae3afdd5d2 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -134,7 +134,10 @@ class DeviceInboxWorkerStore(SQLBaseStore): limit: The maximum number of messages to retrieve. Returns: - A list of messages for the device and where in the stream the messages got to. + A tuple containing: + * A list of messages for the device. + * The max stream token of these messages. There may be more to retrieve + if the given limit was reached. """ has_changed = self._device_inbox_stream_cache.has_entity_changed( user_id, last_stream_id @@ -153,12 +156,19 @@ class DeviceInboxWorkerStore(SQLBaseStore): txn.execute( sql, (user_id, device_id, last_stream_id, current_stream_id, limit) ) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -260,13 +270,20 @@ class DeviceInboxWorkerStore(SQLBaseStore): " LIMIT ?" ) txn.execute(sql, (destination, last_stream_id, current_stream_id, limit)) + messages = [] + stream_pos = current_stream_id + for row in txn: stream_pos = row[0] messages.append(db_to_json(row[1])) + + # If the limit was not reached we know that there's no more data for this + # user/device pair up to current_stream_id. if len(messages) < limit: log_kv({"message": "Set stream position to current position"}) stream_pos = current_stream_id + return messages, stream_pos return await self.db_pool.runInteraction( @@ -372,8 +389,8 @@ class DeviceInboxWorkerStore(SQLBaseStore): """Used to send messages from this server. Args: - local_messages_by_user_and_device: - Dictionary of user_id to device_id to message. + local_messages_by_user_then_device: + Dictionary of recipient user_id to recipient device_id to message. remote_messages_by_destination: Dictionary of destination server_name to the EDU JSON to send. diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index ae37901be9..c6bf316d5b 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -28,6 +28,7 @@ from typing import ( import attr from constantly import NamedConstant, Names +from prometheus_client import Gauge from typing_extensions import Literal from twisted.internet import defer @@ -81,6 +82,12 @@ EVENT_QUEUE_ITERATIONS = 3 # No. times we block waiting for requests for events EVENT_QUEUE_TIMEOUT_S = 0.1 # Timeout when waiting for requests for events +event_fetch_ongoing_gauge = Gauge( + "synapse_event_fetch_ongoing", + "The number of event fetchers that are running", +) + + @attr.s(slots=True, auto_attribs=True) class _EventCacheEntry: event: EventBase @@ -222,6 +229,7 @@ class EventsWorkerStore(SQLBaseStore): self._event_fetch_lock = threading.Condition() self._event_fetch_list = [] self._event_fetch_ongoing = 0 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) # We define this sequence here so that it can be referenced from both # the DataStore and PersistEventStore. @@ -732,28 +740,31 @@ class EventsWorkerStore(SQLBaseStore): """Takes a database connection and waits for requests for events from the _event_fetch_list queue. """ - i = 0 - while True: - with self._event_fetch_lock: - event_list = self._event_fetch_list - self._event_fetch_list = [] - - if not event_list: - single_threaded = self.database_engine.single_threaded - if ( - not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING - or single_threaded - or i > EVENT_QUEUE_ITERATIONS - ): - self._event_fetch_ongoing -= 1 - return - else: - self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) - i += 1 - continue - i = 0 - - self._fetch_event_list(conn, event_list) + try: + i = 0 + while True: + with self._event_fetch_lock: + event_list = self._event_fetch_list + self._event_fetch_list = [] + + if not event_list: + single_threaded = self.database_engine.single_threaded + if ( + not self.USE_DEDICATED_DB_THREADS_FOR_EVENT_FETCHING + or single_threaded + or i > EVENT_QUEUE_ITERATIONS + ): + break + else: + self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S) + i += 1 + continue + i = 0 + + self._fetch_event_list(conn, event_list) + finally: + self._event_fetch_ongoing -= 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) def _fetch_event_list( self, conn: Connection, event_list: List[Tuple[List[str], defer.Deferred]] @@ -977,6 +988,7 @@ class EventsWorkerStore(SQLBaseStore): if self._event_fetch_ongoing < EVENT_QUEUE_THREADS: self._event_fetch_ongoing += 1 + event_fetch_ongoing_gauge.set(self._event_fetch_ongoing) should_start = True else: should_start = False diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index 3d1dff660b..3d0df0cbd4 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -14,6 +14,7 @@ import logging from types import TracebackType from typing import TYPE_CHECKING, Dict, Optional, Tuple, Type +from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -61,7 +62,7 @@ class LockStore(SQLBaseStore): # A map from `(lock_name, lock_key)` to the token of any locks that we # think we currently hold. - self._live_tokens: Dict[Tuple[str, str], str] = {} + self._live_tokens: Dict[Tuple[str, str], Lock] = WeakValueDictionary() # When we shut down we want to remove the locks. Technically this can # lead to a race, as we may drop the lock while we are still processing. @@ -80,10 +81,10 @@ class LockStore(SQLBaseStore): # We need to take a copy of the tokens dict as dropping the locks will # cause the dictionary to change. - tokens = dict(self._live_tokens) + locks = dict(self._live_tokens) - for (lock_name, lock_key), token in tokens.items(): - await self._drop_lock(lock_name, lock_key, token) + for lock in locks.values(): + await lock.release() logger.info("Dropped locks due to shutdown") @@ -93,6 +94,11 @@ class LockStore(SQLBaseStore): used (otherwise the lock will leak). """ + # Check if this process has taken out a lock and if it's still valid. + lock = self._live_tokens.get((lock_name, lock_key)) + if lock and await lock.is_still_valid(): + return None + now = self._clock.time_msec() token = random_string(6) @@ -100,7 +106,9 @@ class LockStore(SQLBaseStore): def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool: # We take out the lock if either a) there is no row for the lock - # already or b) the existing row has timed out. + # already, b) the existing row has timed out, or c) the row is + # for this instance (which means the process got killed and + # restarted) sql = """ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts) VALUES (?, ?, ?, ?, ?) @@ -112,6 +120,7 @@ class LockStore(SQLBaseStore): last_renewed_ts = EXCLUDED.last_renewed_ts WHERE worker_locks.last_renewed_ts < ? + OR worker_locks.instance_name = EXCLUDED.instance_name """ txn.execute( sql, @@ -148,11 +157,11 @@ class LockStore(SQLBaseStore): WHERE lock_name = ? AND lock_key = ? - AND last_renewed_ts < ? + AND (last_renewed_ts < ? OR instance_name = ?) """ txn.execute( sql, - (lock_name, lock_key, now - _LOCK_TIMEOUT_MS), + (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name), ) inserted = self.db_pool.simple_upsert_txn_emulated( @@ -179,9 +188,7 @@ class LockStore(SQLBaseStore): if not did_lock: return None - self._live_tokens[(lock_name, lock_key)] = token - - return Lock( + lock = Lock( self._reactor, self._clock, self, @@ -190,6 +197,10 @@ class LockStore(SQLBaseStore): token=token, ) + self._live_tokens[(lock_name, lock_key)] = lock + + return lock + async def _is_lock_still_valid( self, lock_name: str, lock_key: str, token: str ) -> bool: diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 53576ad52f..907af10995 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -20,7 +20,7 @@ import attr from synapse.api.constants import RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction +from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -334,6 +334,62 @@ class RelationsWorkerStore(SQLBaseStore): return count, latest_event + async def events_have_relations( + self, + parent_ids: List[str], + relation_senders: Optional[List[str]], + relation_types: Optional[List[str]], + ) -> List[str]: + """Check which events have a relationship from the given senders of the + given types. + + Args: + parent_ids: The events being annotated + relation_senders: The relation senders to check. + relation_types: The relation types to check. + + Returns: + True if the event has at least one relationship from one of the given senders of the given type. + """ + # If no restrictions are given then the event has the required relations. + if not relation_senders and not relation_types: + return parent_ids + + sql = """ + SELECT relates_to_id FROM event_relations + INNER JOIN events USING (event_id) + WHERE + %s; + """ + + def _get_if_event_has_relations(txn) -> List[str]: + clauses: List[str] = [] + clause, args = make_in_list_sql_clause( + txn.database_engine, "relates_to_id", parent_ids + ) + clauses.append(clause) + + if relation_senders: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "sender", relation_senders + ) + clauses.append(clause) + args.extend(temp_args) + if relation_types: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "relation_type", relation_types + ) + clauses.append(clause) + args.extend(temp_args) + + txn.execute(sql % " AND ".join(clauses), args) + + return [row[0] for row in txn] + + return await self.db_pool.runInteraction( + "get_if_event_has_relations", _get_if_event_has_relations + ) + async def has_user_annotated_event( self, parent_id: str, event_type: str, aggregation_key: str, sender: str ) -> bool: diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index cefc77fa0f..17b398bb69 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1751,7 +1751,12 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): ) async def block_room(self, room_id: str, user_id: str) -> None: - """Marks the room as blocked. Can be called multiple times. + """Marks the room as blocked. + + Can be called multiple times (though we'll only track the last user to + block this room). + + Can be called on a room unknown to this homeserver. Args: room_id: Room to block diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index dc7884b1c0..42dc807d17 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -272,31 +272,37 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: args = [] if event_filter.types: - clauses.append("(%s)" % " OR ".join("type = ?" for _ in event_filter.types)) + clauses.append( + "(%s)" % " OR ".join("event.type = ?" for _ in event_filter.types) + ) args.extend(event_filter.types) for typ in event_filter.not_types: - clauses.append("type != ?") + clauses.append("event.type != ?") args.append(typ) if event_filter.senders: - clauses.append("(%s)" % " OR ".join("sender = ?" for _ in event_filter.senders)) + clauses.append( + "(%s)" % " OR ".join("event.sender = ?" for _ in event_filter.senders) + ) args.extend(event_filter.senders) for sender in event_filter.not_senders: - clauses.append("sender != ?") + clauses.append("event.sender != ?") args.append(sender) if event_filter.rooms: - clauses.append("(%s)" % " OR ".join("room_id = ?" for _ in event_filter.rooms)) + clauses.append( + "(%s)" % " OR ".join("event.room_id = ?" for _ in event_filter.rooms) + ) args.extend(event_filter.rooms) for room_id in event_filter.not_rooms: - clauses.append("room_id != ?") + clauses.append("event.room_id != ?") args.append(room_id) if event_filter.contains_url: - clauses.append("contains_url = ?") + clauses.append("event.contains_url = ?") args.append(event_filter.contains_url) # We're only applying the "labels" filter on the database query, because applying the @@ -307,6 +313,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]: clauses.append("(%s)" % " OR ".join("label = ?" for _ in event_filter.labels)) args.extend(event_filter.labels) + # Filter on relation_senders / relation types from the joined tables. + if event_filter.relation_senders: + clauses.append( + "(%s)" + % " OR ".join( + "related_event.sender = ?" for _ in event_filter.relation_senders + ) + ) + args.extend(event_filter.relation_senders) + + if event_filter.relation_types: + clauses.append( + "(%s)" + % " OR ".join("relation_type = ?" for _ in event_filter.relation_types) + ) + args.extend(event_filter.relation_types) + return " AND ".join(clauses), args @@ -1116,7 +1139,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): bounds = generate_pagination_where_clause( direction=direction, - column_names=("topological_ordering", "stream_ordering"), + column_names=("event.topological_ordering", "event.stream_ordering"), from_token=from_bound, to_token=to_bound, engine=self.database_engine, @@ -1133,32 +1156,51 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): select_keywords = "SELECT" join_clause = "" + # Using DISTINCT in this SELECT query is quite expensive, because it + # requires the engine to sort on the entire (not limited) result set, + # i.e. the entire events table. Only use it in scenarios that could result + # in the same event ID occurring multiple times in the results. + needs_distinct = False if event_filter and event_filter.labels: # If we're not filtering on a label, then joining on event_labels will # return as many row for a single event as the number of labels it has. To # avoid this, only join if we're filtering on at least one label. - join_clause = """ + join_clause += """ LEFT JOIN event_labels USING (event_id, room_id, topological_ordering) """ if len(event_filter.labels) > 1: - # Using DISTINCT in this SELECT query is quite expensive, because it - # requires the engine to sort on the entire (not limited) result set, - # i.e. the entire events table. We only need to use it when we're - # filtering on more than two labels, because that's the only scenario - # in which we can possibly to get multiple times the same event ID in - # the results. - select_keywords += "DISTINCT" + # Multiple labels could cause the same event to appear multiple times. + needs_distinct = True + + # If there is a filter on relation_senders and relation_types join to the + # relations table. + if event_filter and ( + event_filter.relation_senders or event_filter.relation_types + ): + # Filtering by relations could cause the same event to appear multiple + # times (since there's no limit on the number of relations to an event). + needs_distinct = True + join_clause += """ + LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id) + """ + if event_filter.relation_senders: + join_clause += """ + LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id) + """ + + if needs_distinct: + select_keywords += " DISTINCT" sql = """ %(select_keywords)s - event_id, instance_name, - topological_ordering, stream_ordering - FROM events + event.event_id, event.instance_name, + event.topological_ordering, event.stream_ordering + FROM events AS event %(join_clause)s - WHERE outlier = ? AND room_id = ? AND %(bounds)s - ORDER BY topological_ordering %(order)s, - stream_ordering %(order)s LIMIT ? + WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s + ORDER BY event.topological_ordering %(order)s, + event.stream_ordering %(order)s LIMIT ? """ % { "select_keywords": select_keywords, "join_clause": join_clause, diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 1629d2a53c..8b9c6adae2 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -133,22 +133,23 @@ def prepare_database( # if it's a worker app, refuse to upgrade the database, to avoid multiple # workers doing it at once. - if ( - config.worker.worker_app is not None - and version_info.current_version != SCHEMA_VERSION - ): + if config.worker.worker_app is None: + _upgrade_existing_database( + cur, + version_info, + database_engine, + config, + databases=databases, + ) + elif version_info.current_version < SCHEMA_VERSION: + # If the DB is on an older version than we expect then we refuse + # to start the worker (as the main process needs to run first to + # update the schema). raise UpgradeDatabaseException( OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, version_info.current_version) ) - _upgrade_existing_database( - cur, - version_info, - database_engine, - config, - databases=databases, - ) else: logger.info("%r: Initialising new database", databases) diff --git a/sytest-blacklist b/sytest-blacklist index 65bf1774e3..57e603a4a6 100644 --- a/sytest-blacklist +++ b/sytest-blacklist @@ -32,3 +32,6 @@ We can't peek into rooms with invited history_visibility We can't peek into rooms with joined history_visibility Local users can peek by room alias Peeked rooms only turn up in the sync for the device who peeked them + +# Validation needs to be added to Synapse: #10554 +Rejects invalid device keys diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index f44c91a373..b7fc33dc94 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -15,6 +15,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from unittest.mock import patch + import jsonschema from synapse.api.constants import EventContentFields @@ -51,9 +53,8 @@ class FilteringTestCase(unittest.HomeserverTestCase): {"presence": {"senders": ["@bar;pik.test.com"]}}, ] for filter in invalid_filters: - with self.assertRaises(SynapseError) as check_filter_error: + with self.assertRaises(SynapseError): self.filtering.check_valid_filter(filter) - self.assertIsInstance(check_filter_error.exception, SynapseError) def test_valid_filters(self): valid_filters = [ @@ -119,12 +120,12 @@ class FilteringTestCase(unittest.HomeserverTestCase): definition = {"types": ["m.room.message", "org.matrix.foo.bar"]} event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar") - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_types_works_with_wildcards(self): definition = {"types": ["m.*", "org.matrix.foo.bar"]} event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar") - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_types_works_with_unknowns(self): definition = {"types": ["m.room.message", "org.matrix.foo.bar"]} @@ -133,24 +134,24 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="now.for.something.completely.different", room_id="!foo:bar", ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_types_works_with_literals(self): definition = {"not_types": ["m.room.message", "org.matrix.foo.bar"]} event = MockEvent(sender="@foo:bar", type="m.room.message", room_id="!foo:bar") - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_types_works_with_wildcards(self): definition = {"not_types": ["m.room.message", "org.matrix.*"]} event = MockEvent( sender="@foo:bar", type="org.matrix.custom.event", room_id="!foo:bar" ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_types_works_with_unknowns(self): definition = {"not_types": ["m.*", "org.*"]} event = MockEvent(sender="@foo:bar", type="com.nom.nom.nom", room_id="!foo:bar") - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_not_types_takes_priority_over_types(self): definition = { @@ -158,35 +159,35 @@ class FilteringTestCase(unittest.HomeserverTestCase): "types": ["m.room.message", "m.room.topic"], } event = MockEvent(sender="@foo:bar", type="m.room.topic", room_id="!foo:bar") - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_senders_works_with_literals(self): definition = {"senders": ["@flibble:wibble"]} event = MockEvent( sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar" ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_senders_works_with_unknowns(self): definition = {"senders": ["@flibble:wibble"]} event = MockEvent( sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar" ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_senders_works_with_literals(self): definition = {"not_senders": ["@flibble:wibble"]} event = MockEvent( sender="@flibble:wibble", type="com.nom.nom.nom", room_id="!foo:bar" ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_senders_works_with_unknowns(self): definition = {"not_senders": ["@flibble:wibble"]} event = MockEvent( sender="@challenger:appears", type="com.nom.nom.nom", room_id="!foo:bar" ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_not_senders_takes_priority_over_senders(self): definition = { @@ -196,14 +197,14 @@ class FilteringTestCase(unittest.HomeserverTestCase): event = MockEvent( sender="@misspiggy:muppets", type="m.room.topic", room_id="!foo:bar" ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_rooms_works_with_literals(self): definition = {"rooms": ["!secretbase:unknown"]} event = MockEvent( sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown" ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_rooms_works_with_unknowns(self): definition = {"rooms": ["!secretbase:unknown"]} @@ -212,7 +213,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", room_id="!anothersecretbase:unknown", ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_rooms_works_with_literals(self): definition = {"not_rooms": ["!anothersecretbase:unknown"]} @@ -221,7 +222,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", room_id="!anothersecretbase:unknown", ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_not_rooms_works_with_unknowns(self): definition = {"not_rooms": ["!secretbase:unknown"]} @@ -230,7 +231,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", room_id="!anothersecretbase:unknown", ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_not_rooms_takes_priority_over_rooms(self): definition = { @@ -240,7 +241,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): event = MockEvent( sender="@foo:bar", type="m.room.message", room_id="!secretbase:unknown" ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_combined_event(self): definition = { @@ -256,7 +257,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", # yup room_id="!stage:unknown", # yup ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_definition_combined_event_bad_sender(self): definition = { @@ -272,7 +273,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", # yup room_id="!stage:unknown", # yup ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_combined_event_bad_room(self): definition = { @@ -288,7 +289,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="m.room.message", # yup room_id="!piggyshouse:muppets", # nope ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_definition_combined_event_bad_type(self): definition = { @@ -304,7 +305,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): type="muppets.misspiggy.kisses", # nope room_id="!stage:unknown", # yup ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_filter_labels(self): definition = {"org.matrix.labels": ["#fun"]} @@ -315,7 +316,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): content={EventContentFields.LABELS: ["#fun"]}, ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) event = MockEvent( sender="@foo:bar", @@ -324,7 +325,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): content={EventContentFields.LABELS: ["#notfun"]}, ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) def test_filter_not_labels(self): definition = {"org.matrix.not_labels": ["#fun"]} @@ -335,7 +336,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): content={EventContentFields.LABELS: ["#fun"]}, ) - self.assertFalse(Filter(definition).check(event)) + self.assertFalse(Filter(self.hs, definition)._check(event)) event = MockEvent( sender="@foo:bar", @@ -344,7 +345,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): content={EventContentFields.LABELS: ["#notfun"]}, ) - self.assertTrue(Filter(definition).check(event)) + self.assertTrue(Filter(self.hs, definition)._check(event)) def test_filter_presence_match(self): user_filter_json = {"presence": {"types": ["m.*"]}} @@ -362,7 +363,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): ) ) - results = user_filter.filter_presence(events=events) + results = self.get_success(user_filter.filter_presence(events=events)) self.assertEquals(events, results) def test_filter_presence_no_match(self): @@ -386,7 +387,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): ) ) - results = user_filter.filter_presence(events=events) + results = self.get_success(user_filter.filter_presence(events=events)) self.assertEquals([], results) def test_filter_room_state_match(self): @@ -405,7 +406,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): ) ) - results = user_filter.filter_room_state(events=events) + results = self.get_success(user_filter.filter_room_state(events=events)) self.assertEquals(events, results) def test_filter_room_state_no_match(self): @@ -426,7 +427,7 @@ class FilteringTestCase(unittest.HomeserverTestCase): ) ) - results = user_filter.filter_room_state(events) + results = self.get_success(user_filter.filter_room_state(events)) self.assertEquals([], results) def test_filter_rooms(self): @@ -441,10 +442,52 @@ class FilteringTestCase(unittest.HomeserverTestCase): "!not_included:example.com", # Disallowed because not in rooms. ] - filtered_room_ids = list(Filter(definition).filter_rooms(room_ids)) + filtered_room_ids = list(Filter(self.hs, definition).filter_rooms(room_ids)) self.assertEquals(filtered_room_ids, ["!allowed:example.com"]) + @unittest.override_config({"experimental_features": {"msc3440_enabled": True}}) + def test_filter_relations(self): + events = [ + # An event without a relation. + MockEvent( + event_id="$no_relation", + sender="@foo:bar", + type="org.matrix.custom.event", + room_id="!foo:bar", + ), + # An event with a relation. + MockEvent( + event_id="$with_relation", + sender="@foo:bar", + type="org.matrix.custom.event", + room_id="!foo:bar", + ), + # Non-EventBase objects get passed through. + {}, + ] + + # For the following tests we patch the datastore method (intead of injecting + # events). This is a bit cheeky, but tests the logic of _check_event_relations. + + # Filter for a particular sender. + definition = { + "io.element.relation_senders": ["@foo:bar"], + } + + async def events_have_relations(*args, **kwargs): + return ["$with_relation"] + + with patch.object( + self.datastore, "events_have_relations", new=events_have_relations + ): + filtered_events = list( + self.get_success( + Filter(self.hs, definition)._check_event_relations(events) + ) + ) + self.assertEquals(filtered_events, events[1:]) + def test_add_filter(self): user_filter_json = {"room": {"state": {"types": ["m.*"]}}} diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 43998020b2..d6f14e2dba 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -40,6 +40,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): hs.get_application_service_scheduler.return_value = self.mock_scheduler hs.get_clock.return_value = MockClock() self.handler = ApplicationServicesHandler(hs) + self.event_source = hs.get_event_sources() def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) @@ -252,6 +253,60 @@ class AppServiceHandlerTestCase(unittest.TestCase): }, ) + def test_notify_interested_services_ephemeral(self): + """ + Test sending ephemeral events to the appservice handler are scheduled + to be pushed out to interested appservices, and that the stream ID is + updated accordingly. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 579 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral( + "receipt_key", 580, ["@fakerecipient:example.com"] + ) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + interested_service, [event] + ) + self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( + interested_service, + "read_receipt", + 580, + ) + + def test_notify_interested_services_ephemeral_out_of_order(self): + """ + Test sending out of order ephemeral events to the appservice handler + are ignored. + """ + interested_service = self._mkservice(is_interested=True) + services = [interested_service] + + self.mock_store.get_app_services.return_value = services + self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable( + 580 + ) + + event = Mock(event_id="event_1") + self.event_source.sources.receipt.get_new_events_as.return_value = ( + make_awaitable(([event], None)) + ) + + self.handler.notify_interested_services_ephemeral( + "receipt_key", 580, ["@fakerecipient:example.com"] + ) + self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + def _mkservice(self, is_interested, protocols=None): service = Mock() service.is_interested.return_value = make_awaitable(is_interested) diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index 39e7b1ab25..0c3b86fda9 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -17,6 +17,8 @@ from unittest import mock from signedjson import key as key, sign as sign +from twisted.internet import defer + from synapse.api.constants import RoomEncryptionAlgorithms from synapse.api.errors import Codes, SynapseError @@ -630,3 +632,152 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): ], other_master_key["signatures"][local_user]["ed25519:" + usersigning_pubkey], ) + + def test_query_devices_remote_no_sync(self): + """Tests that querying keys for a remote user that we don't share a room + with returns the cross signing keys correctly. + """ + + remote_user_id = "@test:other" + local_user_id = "@test:test" + + remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY" + remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ" + + self.hs.get_federation_client().query_client_keys = mock.Mock( + return_value=defer.succeed( + { + "device_keys": {remote_user_id: {}}, + "master_keys": { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["master"], + "keys": {"ed25519:" + remote_master_key: remote_master_key}, + }, + }, + "self_signing_keys": { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["self_signing"], + "keys": { + "ed25519:" + + remote_self_signing_key: remote_self_signing_key + }, + } + }, + } + ) + ) + + e2e_handler = self.hs.get_e2e_keys_handler() + + query_result = self.get_success( + e2e_handler.query_devices( + { + "device_keys": {remote_user_id: []}, + }, + timeout=10, + from_user_id=local_user_id, + from_device_id="some_device_id", + ) + ) + + self.assertEqual(query_result["failures"], {}) + self.assertEqual( + query_result["master_keys"], + { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["master"], + "keys": {"ed25519:" + remote_master_key: remote_master_key}, + }, + }, + ) + self.assertEqual( + query_result["self_signing_keys"], + { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["self_signing"], + "keys": { + "ed25519:" + remote_self_signing_key: remote_self_signing_key + }, + } + }, + ) + + def test_query_devices_remote_sync(self): + """Tests that querying keys for a remote user that we share a room with, + but haven't yet fetched the keys for, returns the cross signing keys + correctly. + """ + + remote_user_id = "@test:other" + local_user_id = "@test:test" + + self.store.get_rooms_for_user = mock.Mock( + return_value=defer.succeed({"some_room_id"}) + ) + + remote_master_key = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY" + remote_self_signing_key = "QeIiFEjluPBtI7WQdG365QKZcFs9kqmHir6RBD0//nQ" + + self.hs.get_federation_client().query_user_devices = mock.Mock( + return_value=defer.succeed( + { + "user_id": remote_user_id, + "stream_id": 1, + "devices": [], + "master_key": { + "user_id": remote_user_id, + "usage": ["master"], + "keys": {"ed25519:" + remote_master_key: remote_master_key}, + }, + "self_signing_key": { + "user_id": remote_user_id, + "usage": ["self_signing"], + "keys": { + "ed25519:" + + remote_self_signing_key: remote_self_signing_key + }, + }, + } + ) + ) + + e2e_handler = self.hs.get_e2e_keys_handler() + + query_result = self.get_success( + e2e_handler.query_devices( + { + "device_keys": {remote_user_id: []}, + }, + timeout=10, + from_user_id=local_user_id, + from_device_id="some_device_id", + ) + ) + + self.assertEqual(query_result["failures"], {}) + self.assertEqual( + query_result["master_keys"], + { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["master"], + "keys": {"ed25519:" + remote_master_key: remote_master_key}, + } + }, + ) + self.assertEqual( + query_result["self_signing_keys"], + { + remote_user_id: { + "user_id": remote_user_id, + "usage": ["self_signing"], + "keys": { + "ed25519:" + remote_self_signing_key: remote_self_signing_key + }, + } + }, + ) diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 339c039914..638186f173 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -13,10 +13,11 @@ # limitations under the License. from typing import Optional +from unittest.mock import Mock from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError -from synapse.api.filtering import DEFAULT_FILTER_COLLECTION +from synapse.api.filtering import Filtering from synapse.api.room_versions import RoomVersions from synapse.handlers.sync import SyncConfig from synapse.rest import admin @@ -197,7 +198,7 @@ def generate_sync_config( _request_key += 1 return SyncConfig( user=UserID.from_string(user_id), - filter_collection=DEFAULT_FILTER_COLLECTION, + filter_collection=Filtering(Mock()).DEFAULT_FILTER_COLLECTION, is_guest=False, request_key=("request_key", _request_key), device_id=device_id, diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 525b83141b..d16cd141a7 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -116,7 +116,6 @@ class ModuleApiTestCase(HomeserverTestCase): # Insert a second ip, agent at a later date. We should be able to retrieve it. last_seen_2 = last_seen_1 + 10000 - print("%s => %s" % (last_seen_1, last_seen_2)) self.get_success( self.store.insert_client_ip( user_id, "access_token", "ip_2", "user_agent_2", "device_2", last_seen_2 diff --git a/tests/push/test_email.py b/tests/push/test_email.py index fa8018e5a7..90f800e564 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -65,7 +65,7 @@ class EmailPusherTests(HomeserverTestCase): "notif_from": "test@example.com", "riot_base_url": None, } - config["public_baseurl"] = "aaa" + config["public_baseurl"] = "http://aaa" config["start_pushers"] = True hs = self.setup_test_homeserver(config=config) diff --git a/tests/rest/admin/test_background_updates.py b/tests/rest/admin/test_background_updates.py new file mode 100644 index 0000000000..78c48db552 --- /dev/null +++ b/tests/rest/admin/test_background_updates.py @@ -0,0 +1,218 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse.rest.admin +from synapse.rest.client import login +from synapse.server import HomeServer + +from tests import unittest + + +class BackgroundUpdatesTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs: HomeServer): + self.store = hs.get_datastore() + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + def _register_bg_update(self): + "Adds a bg update but doesn't start it" + + async def _fake_update(progress, batch_size) -> int: + await self.clock.sleep(0.2) + return batch_size + + self.store.db_pool.updates.register_background_update_handler( + "test_update", + _fake_update, + ) + + self.get_success( + self.store.db_pool.simple_insert( + table="background_updates", + values={ + "update_name": "test_update", + "progress_json": "{}", + }, + ) + ) + + def test_status_empty(self): + """Test the status API works.""" + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/status", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # Background updates should be enabled, but none should be running. + self.assertDictEqual( + channel.json_body, {"current_updates": {}, "enabled": True} + ) + + def test_status_bg_update(self): + """Test the status API works with a background update.""" + + # Create a new background update + + self._register_bg_update() + + self.store.db_pool.updates.start_doing_background_updates() + self.reactor.pump([1.0, 1.0]) + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/status", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # Background updates should be enabled, and one should be running. + self.assertDictEqual( + channel.json_body, + { + "current_updates": { + "master": { + "name": "test_update", + "average_items_per_ms": 0.1, + "total_duration_ms": 1000.0, + "total_item_count": 100, + } + }, + "enabled": True, + }, + ) + + def test_enabled(self): + """Test the enabled API works.""" + + # Create a new background update + + self._register_bg_update() + self.store.db_pool.updates.start_doing_background_updates() + + # Test that GET works and returns enabled is True. + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/enabled", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertDictEqual(channel.json_body, {"enabled": True}) + + # Disable the BG updates + channel = self.make_request( + "POST", + "/_synapse/admin/v1/background_updates/enabled", + content={"enabled": False}, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertDictEqual(channel.json_body, {"enabled": False}) + + # Advance a bit and get the current status, note this will finish the in + # flight background update so we call it the status API twice and check + # there was no change. + self.reactor.pump([1.0, 1.0]) + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/status", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertDictEqual( + channel.json_body, + { + "current_updates": { + "master": { + "name": "test_update", + "average_items_per_ms": 0.1, + "total_duration_ms": 1000.0, + "total_item_count": 100, + } + }, + "enabled": False, + }, + ) + + # Run the reactor for a bit so the BG updates would have a chance to run + # if they were to. + self.reactor.pump([1.0, 1.0]) + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/status", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # There should be no change from the previous /status response. + self.assertDictEqual( + channel.json_body, + { + "current_updates": { + "master": { + "name": "test_update", + "average_items_per_ms": 0.1, + "total_duration_ms": 1000.0, + "total_item_count": 100, + } + }, + "enabled": False, + }, + ) + + # Re-enable the background updates. + + channel = self.make_request( + "POST", + "/_synapse/admin/v1/background_updates/enabled", + content={"enabled": True}, + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + self.assertDictEqual(channel.json_body, {"enabled": True}) + + self.reactor.pump([1.0, 1.0]) + + channel = self.make_request( + "GET", + "/_synapse/admin/v1/background_updates/status", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + + # Background updates should be enabled and making progress. + self.assertDictEqual( + channel.json_body, + { + "current_updates": { + "master": { + "name": "test_update", + "average_items_per_ms": 0.1, + "total_duration_ms": 2000.0, + "total_item_count": 200, + } + }, + "enabled": True, + }, + ) diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 46116644ce..11ec54c82e 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -14,9 +14,12 @@ import json import urllib.parse +from http import HTTPStatus from typing import List, Optional from unittest.mock import Mock +from parameterized import parameterized + import synapse.rest.admin from synapse.api.constants import EventTypes, Membership from synapse.api.errors import Codes @@ -281,6 +284,31 @@ class DeleteRoomTestCase(unittest.HomeserverTestCase): self._is_blocked(self.room_id, expect=True) self._has_no_members(self.room_id) + @parameterized.expand([(True,), (False,)]) + def test_block_unknown_room(self, purge: bool) -> None: + """ + We can block an unknown room. In this case, the `purge` argument + should be ignored. + """ + room_id = "!unknown:test" + + # The room isn't already in the blocked rooms table + self._is_blocked(room_id, expect=False) + + # Request the room be blocked. + channel = self.make_request( + "DELETE", + f"/_synapse/admin/v1/rooms/{room_id}", + {"block": True, "purge": purge}, + access_token=self.admin_user_tok, + ) + + # The room is now blocked. + self.assertEqual( + HTTPStatus.OK, int(channel.result["code"]), msg=channel.result["body"] + ) + self._is_blocked(room_id) + def test_shutdown_room_consent(self): """Test that we can shutdown rooms with local users who have not yet accepted the privacy policy. This used to fail when we tried to diff --git a/tests/rest/client/test_consent.py b/tests/rest/client/test_consent.py index 84d092ca82..fcdc565814 100644 --- a/tests/rest/client/test_consent.py +++ b/tests/rest/client/test_consent.py @@ -35,7 +35,6 @@ class ConsentResourceTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): config = self.default_config() - config["public_baseurl"] = "aaaa" config["form_secret"] = "123abc" # Make some temporary templates... diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py index 66dcfc9f88..6e7c0f11df 100644 --- a/tests/rest/client/test_register.py +++ b/tests/rest/client/test_register.py @@ -891,7 +891,6 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase): "smtp_pass": None, "notif_from": "test@example.com", } - config["public_baseurl"] = "aaa" self.hs = self.setup_test_homeserver(config=config) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index 376853fd65..10a4a4dc5e 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -25,7 +25,12 @@ from urllib import parse as urlparse from twisted.internet import defer import synapse.rest.admin -from synapse.api.constants import EventContentFields, EventTypes, Membership +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RelationTypes, +) from synapse.api.errors import Codes, HttpResponseException from synapse.handlers.pagination import PurgeStatus from synapse.rest import admin @@ -2157,6 +2162,153 @@ class LabelsTestCase(unittest.HomeserverTestCase): return event_id +class RelationsTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def default_config(self): + config = super().default_config() + config["experimental_features"] = {"msc3440_enabled": True} + return config + + def prepare(self, reactor, clock, homeserver): + self.user_id = self.register_user("test", "test") + self.tok = self.login("test", "test") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + + self.second_user_id = self.register_user("second", "test") + self.second_tok = self.login("second", "test") + self.helper.join( + room=self.room_id, user=self.second_user_id, tok=self.second_tok + ) + + self.third_user_id = self.register_user("third", "test") + self.third_tok = self.login("third", "test") + self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok) + + # An initial event with a relation from second user. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "Message 1"}, + tok=self.tok, + ) + self.event_id_1 = res["event_id"] + self.helper.send_event( + room_id=self.room_id, + type="m.reaction", + content={ + "m.relates_to": { + "rel_type": RelationTypes.ANNOTATION, + "event_id": self.event_id_1, + "key": "👍", + } + }, + tok=self.second_tok, + ) + + # Another event with a relation from third user. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "Message 2"}, + tok=self.tok, + ) + self.event_id_2 = res["event_id"] + self.helper.send_event( + room_id=self.room_id, + type="m.reaction", + content={ + "m.relates_to": { + "rel_type": RelationTypes.REFERENCE, + "event_id": self.event_id_2, + } + }, + tok=self.third_tok, + ) + + # An event with no relations. + self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "No relations"}, + tok=self.tok, + ) + + def _filter_messages(self, filter: JsonDict) -> List[JsonDict]: + """Make a request to /messages with a filter, returns the chunk of events.""" + channel = self.make_request( + "GET", + "/rooms/%s/messages?filter=%s&dir=b" % (self.room_id, json.dumps(filter)), + access_token=self.tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + return channel.json_body["chunk"] + + def test_filter_relation_senders(self): + # Messages which second user reacted to. + filter = {"io.element.relation_senders": [self.second_user_id]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0]["event_id"], self.event_id_1) + + # Messages which third user reacted to. + filter = {"io.element.relation_senders": [self.third_user_id]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0]["event_id"], self.event_id_2) + + # Messages which either user reacted to. + filter = { + "io.element.relation_senders": [self.second_user_id, self.third_user_id] + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 2, chunk) + self.assertCountEqual( + [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2] + ) + + def test_filter_relation_type(self): + # Messages which have annotations. + filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0]["event_id"], self.event_id_1) + + # Messages which have references. + filter = {"io.element.relation_types": [RelationTypes.REFERENCE]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0]["event_id"], self.event_id_2) + + # Messages which have either annotations or references. + filter = { + "io.element.relation_types": [ + RelationTypes.ANNOTATION, + RelationTypes.REFERENCE, + ] + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 2, chunk) + self.assertCountEqual( + [c["event_id"] for c in chunk], [self.event_id_1, self.event_id_2] + ) + + def test_filter_relation_senders_and_type(self): + # Messages which second user reacted to. + filter = { + "io.element.relation_senders": [self.second_user_id], + "io.element.relation_types": [RelationTypes.ANNOTATION], + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0]["event_id"], self.event_id_1) + + class ContextTestCase(unittest.HomeserverTestCase): servlets = [ diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 95be369d4b..c427686376 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -14,6 +14,8 @@ # limitations under the License. import json +from parameterized import parameterized + import synapse.rest.admin from synapse.api.constants import ( EventContentFields, @@ -417,7 +419,30 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase): # Test that the first user can't see the other user's hidden read receipt self.assertEqual(self._get_read_receipt(), None) - def test_read_receipt_with_empty_body(self): + @parameterized.expand( + [ + # Old Element version, expected to send an empty body + ( + "agent1", + "Element/1.2.2 (Linux; U; Android 9; MatrixAndroidSDK_X 0.0.1)", + 200, + ), + # Old SchildiChat version, expected to send an empty body + ("agent2", "SchildiChat/1.2.1 (Android 10)", 200), + # Expected 400: Denies empty body starting at version 1.3+ + ("agent3", "Element/1.3.6 (Android 10)", 400), + ("agent4", "SchildiChat/1.3.6 (Android 11)", 400), + # Contains "Riot": Receipts with empty bodies expected + ("agent5", "Element (Riot.im) (Android 9)", 200), + # Expected 400: Does not contain "Android" + ("agent6", "Element/1.2.1", 400), + # Expected 400: Different format, missing "/" after Element; existing build that should allow empty bodies, but minimal ongoing usage + ("agent7", "Element dbg/1.1.8-dev (Android)", 400), + ] + ) + def test_read_receipt_with_empty_body( + self, name, user_agent: str, expected_status_code: int + ): # Send a message as the first user res = self.helper.send(self.room_id, body="hello", tok=self.tok) @@ -426,8 +451,9 @@ class ReadReceiptsTestCase(unittest.HomeserverTestCase): "POST", "/rooms/%s/receipt/m.read/%s" % (self.room_id, res["event_id"]), access_token=self.tok2, + custom_headers=[("User-Agent", user_agent)], ) - self.assertEqual(channel.code, 200) + self.assertEqual(channel.code, expected_status_code) def _get_read_receipt(self): """Syncs and returns the read receipt.""" diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py new file mode 100644 index 0000000000..a6be9a1bb1 --- /dev/null +++ b/tests/storage/test_rollback_worker.py @@ -0,0 +1,69 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.app.generic_worker import GenericWorkerServer +from synapse.storage.database import LoggingDatabaseConnection +from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database +from synapse.storage.schema import SCHEMA_VERSION + +from tests.unittest import HomeserverTestCase + + +class WorkerSchemaTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver( + federation_http_client=None, homeserver_to_use=GenericWorkerServer + ) + return hs + + def default_config(self): + conf = super().default_config() + + # Mark this as a worker app. + conf["worker_app"] = "yes" + + return conf + + def test_rolling_back(self): + """Test that workers can start if the DB is a newer schema version""" + + db_pool = self.hs.get_datastore().db_pool + db_conn = LoggingDatabaseConnection( + db_pool._db_pool.connect(), + db_pool.engine, + "tests", + ) + + cur = db_conn.cursor() + cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION + 1,)) + + db_conn.commit() + + prepare_database(db_conn, db_pool.engine, self.hs.config) + + def test_not_upgraded(self): + """Test that workers don't start if the DB has an older schema version""" + db_pool = self.hs.get_datastore().db_pool + db_conn = LoggingDatabaseConnection( + db_pool._db_pool.connect(), + db_pool.engine, + "tests", + ) + + cur = db_conn.cursor() + cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION - 1,)) + + db_conn.commit() + + with self.assertRaises(PrepareDatabaseException): + prepare_database(db_conn, db_pool.engine, self.hs.config) diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py new file mode 100644 index 0000000000..ce782c7e1d --- /dev/null +++ b/tests/storage/test_stream.py @@ -0,0 +1,207 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List + +from synapse.api.constants import EventTypes, RelationTypes +from synapse.api.filtering import Filter +from synapse.events import EventBase +from synapse.rest import admin +from synapse.rest.client import login, room +from synapse.types import JsonDict + +from tests.unittest import HomeserverTestCase + + +class PaginationTestCase(HomeserverTestCase): + """ + Test the pre-filtering done in the pagination code. + + This is similar to some of the tests in tests.rest.client.test_rooms but here + we ensure that the filtering done in the database is applied successfully. + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def default_config(self): + config = super().default_config() + config["experimental_features"] = {"msc3440_enabled": True} + return config + + def prepare(self, reactor, clock, homeserver): + self.user_id = self.register_user("test", "test") + self.tok = self.login("test", "test") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + + self.second_user_id = self.register_user("second", "test") + self.second_tok = self.login("second", "test") + self.helper.join( + room=self.room_id, user=self.second_user_id, tok=self.second_tok + ) + + self.third_user_id = self.register_user("third", "test") + self.third_tok = self.login("third", "test") + self.helper.join(room=self.room_id, user=self.third_user_id, tok=self.third_tok) + + # An initial event with a relation from second user. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "Message 1"}, + tok=self.tok, + ) + self.event_id_1 = res["event_id"] + self.helper.send_event( + room_id=self.room_id, + type="m.reaction", + content={ + "m.relates_to": { + "rel_type": RelationTypes.ANNOTATION, + "event_id": self.event_id_1, + "key": "👍", + } + }, + tok=self.second_tok, + ) + + # Another event with a relation from third user. + res = self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "Message 2"}, + tok=self.tok, + ) + self.event_id_2 = res["event_id"] + self.helper.send_event( + room_id=self.room_id, + type="m.reaction", + content={ + "m.relates_to": { + "rel_type": RelationTypes.REFERENCE, + "event_id": self.event_id_2, + } + }, + tok=self.third_tok, + ) + + # An event with no relations. + self.helper.send_event( + room_id=self.room_id, + type=EventTypes.Message, + content={"msgtype": "m.text", "body": "No relations"}, + tok=self.tok, + ) + + def _filter_messages(self, filter: JsonDict) -> List[EventBase]: + """Make a request to /messages with a filter, returns the chunk of events.""" + + from_token = self.get_success( + self.hs.get_event_sources().get_current_token_for_pagination() + ) + + events, next_key = self.get_success( + self.hs.get_datastore().paginate_room_events( + room_id=self.room_id, + from_key=from_token.room_key, + to_key=None, + direction="b", + limit=10, + event_filter=Filter(self.hs, filter), + ) + ) + + return events + + def test_filter_relation_senders(self): + # Messages which second user reacted to. + filter = {"io.element.relation_senders": [self.second_user_id]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_1) + + # Messages which third user reacted to. + filter = {"io.element.relation_senders": [self.third_user_id]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_2) + + # Messages which either user reacted to. + filter = { + "io.element.relation_senders": [self.second_user_id, self.third_user_id] + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 2, chunk) + self.assertCountEqual( + [c.event_id for c in chunk], [self.event_id_1, self.event_id_2] + ) + + def test_filter_relation_type(self): + # Messages which have annotations. + filter = {"io.element.relation_types": [RelationTypes.ANNOTATION]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_1) + + # Messages which have references. + filter = {"io.element.relation_types": [RelationTypes.REFERENCE]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_2) + + # Messages which have either annotations or references. + filter = { + "io.element.relation_types": [ + RelationTypes.ANNOTATION, + RelationTypes.REFERENCE, + ] + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 2, chunk) + self.assertCountEqual( + [c.event_id for c in chunk], [self.event_id_1, self.event_id_2] + ) + + def test_filter_relation_senders_and_type(self): + # Messages which second user reacted to. + filter = { + "io.element.relation_senders": [self.second_user_id], + "io.element.relation_types": [RelationTypes.ANNOTATION], + } + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_1) + + def test_duplicate_relation(self): + """An event should only be returned once if there are multiple relations to it.""" + self.helper.send_event( + room_id=self.room_id, + type="m.reaction", + content={ + "m.relates_to": { + "rel_type": RelationTypes.ANNOTATION, + "event_id": self.event_id_1, + "key": "A", + } + }, + tok=self.second_tok, + ) + + filter = {"io.element.relation_senders": [self.second_user_id]} + chunk = self._filter_messages(filter) + self.assertEqual(len(chunk), 1, chunk) + self.assertEqual(chunk[0].event_id, self.event_id_1) diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 39947a166b..ced3efd93f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -17,6 +17,7 @@ from typing import Set from unittest import mock from twisted.internet import defer, reactor +from twisted.internet.defer import Deferred from synapse.api.errors import SynapseError from synapse.logging.context import ( @@ -703,6 +704,48 @@ class CachedListDescriptorTestCase(unittest.TestCase): obj.mock.assert_called_once_with((40,), 2) self.assertEqual(r, {10: "fish", 40: "gravy"}) + def test_concurrent_lookups(self): + """All concurrent lookups should get the same result""" + + class Cls: + def __init__(self): + self.mock = mock.Mock() + + @descriptors.cached() + def fn(self, arg1): + pass + + @descriptors.cachedList("fn", "args1") + def list_fn(self, args1) -> "Deferred[dict]": + return self.mock(args1) + + obj = Cls() + deferred_result = Deferred() + obj.mock.return_value = deferred_result + + # start off several concurrent lookups of the same key + d1 = obj.list_fn([10]) + d2 = obj.list_fn([10]) + d3 = obj.list_fn([10]) + + # the mock should have been called exactly once + obj.mock.assert_called_once_with((10,)) + obj.mock.reset_mock() + + # ... and none of the calls should yet be complete + self.assertFalse(d1.called) + self.assertFalse(d2.called) + self.assertFalse(d3.called) + + # complete the lookup. @cachedList functions need to complete with a map + # of input->result + deferred_result.callback({10: "peas"}) + + # ... which should give the right result to all the callers + self.assertEqual(self.successResultOf(d1), {10: "peas"}) + self.assertEqual(self.successResultOf(d2), {10: "peas"}) + self.assertEqual(self.successResultOf(d3), {10: "peas"}) + @defer.inlineCallbacks def test_invalidate(self): """Make sure that invalidation callbacks are called.""" |